Discard outdated packets earlier in MuxInputStreamHandler.
In our pipeline, a deadlock is detected because the packets in deselected data streams get piled up. In the current implementation, those packets only get removed in FillInputSet(), but we should also do that in GetNodeReadiness(). PiperOrigin-RevId: 548051369
This commit is contained in:
parent
723e91cec1
commit
2fae07375c
|
@ -48,6 +48,18 @@ class MuxInputStreamHandler : public InputStreamHandler {
|
|||
: InputStreamHandler(std::move(tag_map), cc_manager, options,
|
||||
calculator_run_in_parallel) {}
|
||||
|
||||
private:
|
||||
CollectionItemId GetControlStreamId() const {
|
||||
return input_stream_managers_.EndId() - 1;
|
||||
}
|
||||
void RemoveOutdatedDataPackets(Timestamp timestamp) {
|
||||
const CollectionItemId control_stream_id = GetControlStreamId();
|
||||
for (CollectionItemId id = input_stream_managers_.BeginId();
|
||||
id < control_stream_id; ++id) {
|
||||
input_stream_managers_.Get(id)->ErasePacketsEarlierThan(timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
// In MuxInputStreamHandler, a node is "ready" if:
|
||||
// - the control stream is done (need to call Close() in this case), or
|
||||
|
@ -58,9 +70,15 @@ class MuxInputStreamHandler : public InputStreamHandler {
|
|||
absl::MutexLock lock(&input_streams_mutex_);
|
||||
|
||||
const auto& control_stream =
|
||||
input_stream_managers_.Get(input_stream_managers_.EndId() - 1);
|
||||
input_stream_managers_.Get(GetControlStreamId());
|
||||
bool empty;
|
||||
*min_stream_timestamp = control_stream->MinTimestampOrBound(&empty);
|
||||
|
||||
// Data streams may contain some outdated packets which failed to be popped
|
||||
// out during "FillInputSet". (This handler doesn't sync input streams,
|
||||
// hence "FillInputSet" can be triggerred before every input stream is
|
||||
// filled with packets corresponding to the same timestamp.)
|
||||
RemoveOutdatedDataPackets(*min_stream_timestamp);
|
||||
if (empty) {
|
||||
if (*min_stream_timestamp == Timestamp::Done()) {
|
||||
// Calculator is done if the control input stream is done.
|
||||
|
@ -78,11 +96,6 @@ class MuxInputStreamHandler : public InputStreamHandler {
|
|||
const auto& data_stream = input_stream_managers_.Get(
|
||||
input_stream_managers_.BeginId() + control_value);
|
||||
|
||||
// Data stream may contain some outdated packets which failed to be popped
|
||||
// out during "FillInputSet". (This handler doesn't sync input streams,
|
||||
// hence "FillInputSet" can be triggerred before every input stream is
|
||||
// filled with packets corresponding to the same timestamp.)
|
||||
data_stream->ErasePacketsEarlierThan(*min_stream_timestamp);
|
||||
Timestamp stream_timestamp = data_stream->MinTimestampOrBound(&empty);
|
||||
if (empty) {
|
||||
if (stream_timestamp <= *min_stream_timestamp) {
|
||||
|
@ -111,8 +124,7 @@ class MuxInputStreamHandler : public InputStreamHandler {
|
|||
CHECK(input_set);
|
||||
absl::MutexLock lock(&input_streams_mutex_);
|
||||
|
||||
const CollectionItemId control_stream_id =
|
||||
input_stream_managers_.EndId() - 1;
|
||||
const CollectionItemId control_stream_id = GetControlStreamId();
|
||||
auto& control_stream = input_stream_managers_.Get(control_stream_id);
|
||||
int num_packets_dropped = 0;
|
||||
bool stream_is_done = false;
|
||||
|
@ -140,15 +152,8 @@ class MuxInputStreamHandler : public InputStreamHandler {
|
|||
AddPacketToShard(&input_set->Get(data_stream_id), std::move(data_packet),
|
||||
stream_is_done);
|
||||
|
||||
// Discard old packets on other streams.
|
||||
// Note that control_stream_id is the last valid id.
|
||||
auto next_timestamp = input_timestamp.NextAllowedInStream();
|
||||
for (CollectionItemId id = input_stream_managers_.BeginId();
|
||||
id < control_stream_id; ++id) {
|
||||
if (id == data_stream_id) continue;
|
||||
auto& other_stream = input_stream_managers_.Get(id);
|
||||
other_stream->ErasePacketsEarlierThan(next_timestamp);
|
||||
}
|
||||
// Discard old packets on data streams.
|
||||
RemoveOutdatedDataPackets(input_timestamp.NextAllowedInStream());
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -645,5 +645,41 @@ TEST(MuxInputStreamHandlerTest,
|
|||
MP_ASSERT_OK(graph.WaitUntilDone());
|
||||
}
|
||||
|
||||
TEST(MuxInputStreamHandlerTest, RemovesUnusedDataStreamPackets) {
|
||||
CalculatorGraphConfig config =
|
||||
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
|
||||
input_stream: "input0"
|
||||
input_stream: "input1"
|
||||
input_stream: "select"
|
||||
node {
|
||||
calculator: "MuxCalculator"
|
||||
input_stream: "INPUT:0:input0"
|
||||
input_stream: "INPUT:1:input1"
|
||||
input_stream: "SELECT:select"
|
||||
output_stream: "OUTPUT:output"
|
||||
input_stream_handler { input_stream_handler: "MuxInputStreamHandler" }
|
||||
}
|
||||
)pb");
|
||||
config.set_max_queue_size(1);
|
||||
config.set_report_deadlock(true);
|
||||
|
||||
CalculatorGraph graph;
|
||||
MP_ASSERT_OK(graph.Initialize(config));
|
||||
MP_ASSERT_OK(graph.StartRun({}));
|
||||
MP_ASSERT_OK(graph.AddPacketToInputStream(
|
||||
"select", MakePacket<int>(0).At(Timestamp(2))));
|
||||
MP_ASSERT_OK(graph.AddPacketToInputStream(
|
||||
"input0", MakePacket<int>(1000).At(Timestamp(2))));
|
||||
MP_ASSERT_OK(graph.WaitUntilIdle());
|
||||
|
||||
// Add two delayed packets to the deselected input. They should be discarded
|
||||
// instead of triggering the deadlock detection (max_queue_size = 1).
|
||||
MP_ASSERT_OK(graph.AddPacketToInputStream(
|
||||
"input1", MakePacket<int>(900).At(Timestamp(1))));
|
||||
MP_ASSERT_OK(graph.AddPacketToInputStream(
|
||||
"input1", MakePacket<int>(900).At(Timestamp(2))));
|
||||
MP_ASSERT_OK(graph.WaitUntilIdle());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace mediapipe
|
||||
|
|
Loading…
Reference in New Issue
Block a user