Add MuxCalculator test case where graph is being closed while SELECT has not been received.
PiperOrigin-RevId: 489061902
This commit is contained in:
parent
3cdf0f6536
commit
b6b72d5e4e
|
@ -398,6 +398,99 @@ TEST(MuxCalculatorTest, HandleTimestampBoundUpdates) {
|
||||||
MP_ASSERT_OK(graph.WaitUntilDone());
|
MP_ASSERT_OK(graph.WaitUntilDone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(MuxCalculatorTest, HandlesCloseGracefully) {
|
||||||
|
CalculatorGraphConfig config =
|
||||||
|
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(
|
||||||
|
R"pb(
|
||||||
|
input_stream: "select"
|
||||||
|
input_stream: "value_0"
|
||||||
|
input_stream: "value_1"
|
||||||
|
node {
|
||||||
|
calculator: "MuxCalculator"
|
||||||
|
input_stream: "SELECT:select"
|
||||||
|
input_stream: "INPUT:0:value_0"
|
||||||
|
input_stream: "INPUT:1:value_1"
|
||||||
|
output_stream: "OUTPUT:output"
|
||||||
|
}
|
||||||
|
)pb");
|
||||||
|
CalculatorGraph graph;
|
||||||
|
MP_ASSERT_OK(graph.Initialize(config));
|
||||||
|
|
||||||
|
// Observe packets.
|
||||||
|
std::vector<Packet> output_packets;
|
||||||
|
MP_ASSERT_OK(graph.ObserveOutputStream(
|
||||||
|
"output",
|
||||||
|
[&output_packets](const Packet& p) -> absl::Status {
|
||||||
|
output_packets.push_back(p);
|
||||||
|
return absl::OkStatus();
|
||||||
|
},
|
||||||
|
/*observe_timestamp_bounds=*/true));
|
||||||
|
|
||||||
|
// Start graph.
|
||||||
|
MP_ASSERT_OK(graph.StartRun({}));
|
||||||
|
|
||||||
|
// Add single packet wait for completion and close.
|
||||||
|
MP_ASSERT_OK(graph.AddPacketToInputStream(
|
||||||
|
"value_0", MakePacket<int>(0).At(Timestamp(1000))));
|
||||||
|
MP_ASSERT_OK(graph.WaitUntilIdle());
|
||||||
|
MP_ASSERT_OK(graph.CloseAllInputStreams());
|
||||||
|
MP_ASSERT_OK(graph.WaitUntilDone());
|
||||||
|
|
||||||
|
EXPECT_TRUE(output_packets.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(MuxCalculatorTest, CrashesOnCloseWithDeafultInputStreamHandler) {
|
||||||
|
CalculatorGraphConfig config =
|
||||||
|
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(
|
||||||
|
R"pb(
|
||||||
|
# This is required in order for EXPECT_DEATH to work everywhere
|
||||||
|
executor { name: "" type: "ApplicationThreadExecutor" }
|
||||||
|
|
||||||
|
input_stream: "select"
|
||||||
|
input_stream: "value_0"
|
||||||
|
input_stream: "value_1"
|
||||||
|
node {
|
||||||
|
calculator: "MuxCalculator"
|
||||||
|
input_stream: "SELECT:select"
|
||||||
|
input_stream: "INPUT:0:value_0"
|
||||||
|
input_stream: "INPUT:1:value_1"
|
||||||
|
output_stream: "OUTPUT:output"
|
||||||
|
input_stream_handler {
|
||||||
|
input_stream_handler: "DefaultInputStreamHandler"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)pb");
|
||||||
|
CalculatorGraph graph;
|
||||||
|
MP_ASSERT_OK(graph.Initialize(config));
|
||||||
|
|
||||||
|
// Observe packets.
|
||||||
|
std::vector<Packet> output_packets;
|
||||||
|
MP_ASSERT_OK(graph.ObserveOutputStream(
|
||||||
|
"output",
|
||||||
|
[&output_packets](const Packet& p) -> absl::Status {
|
||||||
|
output_packets.push_back(p);
|
||||||
|
return absl::OkStatus();
|
||||||
|
},
|
||||||
|
/*observe_timestamp_bounds=*/true));
|
||||||
|
|
||||||
|
// Start graph.
|
||||||
|
MP_ASSERT_OK(graph.StartRun({}));
|
||||||
|
|
||||||
|
// Add single packet wait for completion and close.
|
||||||
|
MP_ASSERT_OK(graph.AddPacketToInputStream(
|
||||||
|
"value_0", MakePacket<int>(0).At(Timestamp(1000))));
|
||||||
|
MP_ASSERT_OK(graph.WaitUntilIdle());
|
||||||
|
// Currently MuxCalculator crashes with a correct packet set from
|
||||||
|
// DefaultInputStreamHandler. The SELECT packet is missing at Timestamp 1000,
|
||||||
|
// and an empty packet is the correct representation of that.
|
||||||
|
EXPECT_DEATH(
|
||||||
|
{
|
||||||
|
(void)graph.CloseAllInputStreams();
|
||||||
|
(void)graph.WaitUntilDone();
|
||||||
|
},
|
||||||
|
"Check failed: payload_");
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
} // namespace mediapipe
|
} // namespace mediapipe
|
||||||
|
|
Loading…
Reference in New Issue
Block a user