Make MuxCalculator with DefaultInputStreamHandler to handle graph closure gracefully

PiperOrigin-RevId: 489336722
This commit is contained in:
MediaPipe Team 2022-11-17 16:28:08 -08:00 committed by Copybara-Service
parent a7bd725e65
commit ab3a5f0fbf
2 changed files with 10 additions and 10 deletions

View File

@ -41,6 +41,10 @@ class MuxCalculator : public Node {
StreamHandler("MuxInputStreamHandler")); StreamHandler("MuxInputStreamHandler"));
absl::Status Process(CalculatorContext* cc) final { absl::Status Process(CalculatorContext* cc) final {
if (kSelect(cc).IsStream() && kSelect(cc).IsEmpty()) {
return absl::OkStatus();
}
int select = *kSelect(cc); int select = *kSelect(cc);
RET_CHECK(0 <= select && select < kIn(cc).Count()); RET_CHECK(0 <= select && select < kIn(cc).Count());
if (!kIn(cc)[select].IsEmpty()) { if (!kIn(cc)[select].IsEmpty()) {

View File

@ -439,7 +439,7 @@ TEST(MuxCalculatorTest, HandlesCloseGracefully) {
EXPECT_TRUE(output_packets.empty()); EXPECT_TRUE(output_packets.empty());
} }
TEST(MuxCalculatorTest, CrashesOnCloseWithDeafultInputStreamHandler) { TEST(MuxCalculatorTest, HandlesCloseGracefullyWithDeafultInputStreamHandler) {
CalculatorGraphConfig config = CalculatorGraphConfig config =
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>( mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(
R"pb( R"pb(
@ -480,15 +480,11 @@ TEST(MuxCalculatorTest, CrashesOnCloseWithDeafultInputStreamHandler) {
MP_ASSERT_OK(graph.AddPacketToInputStream( MP_ASSERT_OK(graph.AddPacketToInputStream(
"value_0", MakePacket<int>(0).At(Timestamp(1000)))); "value_0", MakePacket<int>(0).At(Timestamp(1000))));
MP_ASSERT_OK(graph.WaitUntilIdle()); MP_ASSERT_OK(graph.WaitUntilIdle());
// Currently MuxCalculator crashes with a correct packet set from MP_ASSERT_OK(graph.CloseAllInputStreams());
// DefaultInputStreamHandler. The SELECT packet is missing at Timestamp 1000, MP_ASSERT_OK(graph.WaitUntilDone());
// and an empty packet is the correct representation of that.
EXPECT_DEATH( ASSERT_EQ(output_packets.size(), 1);
{ EXPECT_TRUE(output_packets[0].IsEmpty());
(void)graph.CloseAllInputStreams();
(void)graph.WaitUntilDone();
},
"Check failed: payload_");
} }
} // namespace } // namespace