diff --git a/mediapipe/calculators/core/mux_calculator_test.cc b/mediapipe/calculators/core/mux_calculator_test.cc index 86d2fab42..a3ac8a27a 100644 --- a/mediapipe/calculators/core/mux_calculator_test.cc +++ b/mediapipe/calculators/core/mux_calculator_test.cc @@ -398,6 +398,99 @@ TEST(MuxCalculatorTest, HandleTimestampBoundUpdates) { MP_ASSERT_OK(graph.WaitUntilDone()); } +TEST(MuxCalculatorTest, HandlesCloseGracefully) { + CalculatorGraphConfig config = + mediapipe::ParseTextProtoOrDie( + R"pb( + input_stream: "select" + input_stream: "value_0" + input_stream: "value_1" + node { + calculator: "MuxCalculator" + input_stream: "SELECT:select" + input_stream: "INPUT:0:value_0" + input_stream: "INPUT:1:value_1" + output_stream: "OUTPUT:output" + } + )pb"); + CalculatorGraph graph; + MP_ASSERT_OK(graph.Initialize(config)); + + // Observe packets. + std::vector output_packets; + MP_ASSERT_OK(graph.ObserveOutputStream( + "output", + [&output_packets](const Packet& p) -> absl::Status { + output_packets.push_back(p); + return absl::OkStatus(); + }, + /*observe_timestamp_bounds=*/true)); + + // Start graph. + MP_ASSERT_OK(graph.StartRun({})); + + // Add single packet wait for completion and close. + MP_ASSERT_OK(graph.AddPacketToInputStream( + "value_0", MakePacket(0).At(Timestamp(1000)))); + MP_ASSERT_OK(graph.WaitUntilIdle()); + MP_ASSERT_OK(graph.CloseAllInputStreams()); + MP_ASSERT_OK(graph.WaitUntilDone()); + + EXPECT_TRUE(output_packets.empty()); +} + +TEST(MuxCalculatorTest, CrashesOnCloseWithDeafultInputStreamHandler) { + CalculatorGraphConfig config = + mediapipe::ParseTextProtoOrDie( + R"pb( + # This is required in order for EXPECT_DEATH to work everywhere + executor { name: "" type: "ApplicationThreadExecutor" } + + input_stream: "select" + input_stream: "value_0" + input_stream: "value_1" + node { + calculator: "MuxCalculator" + input_stream: "SELECT:select" + input_stream: "INPUT:0:value_0" + input_stream: "INPUT:1:value_1" + output_stream: "OUTPUT:output" + input_stream_handler { + input_stream_handler: "DefaultInputStreamHandler" + } + } + )pb"); + CalculatorGraph graph; + MP_ASSERT_OK(graph.Initialize(config)); + + // Observe packets. + std::vector output_packets; + MP_ASSERT_OK(graph.ObserveOutputStream( + "output", + [&output_packets](const Packet& p) -> absl::Status { + output_packets.push_back(p); + return absl::OkStatus(); + }, + /*observe_timestamp_bounds=*/true)); + + // Start graph. + MP_ASSERT_OK(graph.StartRun({})); + + // Add single packet wait for completion and close. + MP_ASSERT_OK(graph.AddPacketToInputStream( + "value_0", MakePacket(0).At(Timestamp(1000)))); + MP_ASSERT_OK(graph.WaitUntilIdle()); + // Currently MuxCalculator crashes with a correct packet set from + // DefaultInputStreamHandler. The SELECT packet is missing at Timestamp 1000, + // and an empty packet is the correct representation of that. + EXPECT_DEATH( + { + (void)graph.CloseAllInputStreams(); + (void)graph.WaitUntilDone(); + }, + "Check failed: payload_"); +} + } // namespace } // namespace mediapipe