From a83d87e157877b998cdbc3a59123f65559aae9c5 Mon Sep 17 00:00:00 2001 From: Hadon Nash Date: Fri, 11 Nov 2022 11:48:22 -0800 Subject: [PATCH] Internal change PiperOrigin-RevId: 487880137 --- .../core/flow_limiter_calculator_test.cc | 244 ++++++++++++------ 1 file changed, 165 insertions(+), 79 deletions(-) diff --git a/mediapipe/calculators/core/flow_limiter_calculator_test.cc b/mediapipe/calculators/core/flow_limiter_calculator_test.cc index 8a8cc9656..45bace271 100644 --- a/mediapipe/calculators/core/flow_limiter_calculator_test.cc +++ b/mediapipe/calculators/core/flow_limiter_calculator_test.cc @@ -79,21 +79,25 @@ std::vector PacketValues(const std::vector& packets) { return result; } -template -std::vector MakePackets(std::vector> contents) { - std::vector result; - for (auto& entry : contents) { - result.push_back(MakePacket(entry.second).At(entry.first)); - } - return result; -} - std::string SourceString(Timestamp t) { return (t.IsSpecialValue()) ? t.DebugString() : absl::StrCat("Timestamp(", t.DebugString(), ")"); } +template +std::string SourceString(Packet packet) { + std::ostringstream oss; + if (packet.IsEmpty()) { + oss << "Packet()"; + } else { + oss << "MakePacket<" << MediaPipeTypeStringOrDemangled() << ">(" + << packet.Get() << ")"; + } + oss << ".At(" << SourceString(packet.Timestamp()) << ")"; + return oss.str(); +} + template class PacketsEqMatcher : public ::testing::MatcherInterface { @@ -123,8 +127,9 @@ class PacketsEqMatcher } for (auto i1 = c1.begin(), i2 = c2.begin(); i1 != c1.end(); ++i1, ++i2) { Packet p1 = *i1, p2 = *i2; - if (p1.Timestamp() != p2.Timestamp() || - p1.Get() != p2.Get()) { + if (p1.Timestamp() != p2.Timestamp() || p1.IsEmpty() != p2.IsEmpty() || + (!p1.IsEmpty() && + p1.Get() != p2.Get())) { return false; } } @@ -133,10 +138,9 @@ class PacketsEqMatcher void Print(const PacketContainer& packets, ::std::ostream* os) const { for (auto it = packets.begin(); it != packets.end(); ++it) { const Packet& packet = *it; - *os << (it == packets.begin() ? "{" : "") << "{" - << SourceString(packet.Timestamp()) << ", " - << packet.Get() << "}" - << (std::next(it) == packets.end() ? "}" : ", "); + *os << (it == packets.begin() ? "{" : ""); + *os << SourceString(packet); + *os << (std::next(it) == packets.end() ? "}" : ", "); } } @@ -144,7 +148,7 @@ class PacketsEqMatcher }; template -::testing::Matcher PackestEq( +::testing::Matcher PacketsEq( const PacketContainer& packets) { return MakeMatcher( new PacketsEqMatcher(packets)); @@ -739,8 +743,8 @@ TEST_F(FlowLimiterCalculatorTest, TwoInputStreams) { // The processing time "sleep_time" is reduced from 22ms to 12ms to create // the same frame rate as FlowLimiterCalculatorTest::TwoInputStreams. TEST_F(FlowLimiterCalculatorTest, ZeroQueue) { - auto BoolPackestEq = PackestEq, bool>; - auto IntPackestEq = PackestEq, int>; + auto BoolPacketsEq = PacketsEq, bool>; + auto IntPacketsEq = PacketsEq, int>; // Configure the test. SetUpInputData(); @@ -835,52 +839,86 @@ TEST_F(FlowLimiterCalculatorTest, ZeroQueue) { input_packets_[0], input_packets_[2], input_packets_[15], input_packets_[17], input_packets_[19], }; - EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output)); + EXPECT_THAT(out_1_packets_, IntPacketsEq(expected_output)); // Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled. std::vector expected_output_2 = { input_packets_[0], input_packets_[2], input_packets_[4], input_packets_[15], input_packets_[17], input_packets_[19], }; - EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2)); + EXPECT_THAT(out_2_packets, IntPacketsEq(expected_output_2)); // Validate the ALLOW stream output. - std::vector expected_allow = MakePackets( // - {{Timestamp(0), true}, {Timestamp(10000), false}, - {Timestamp(20000), true}, {Timestamp(30000), false}, - {Timestamp(40000), true}, {Timestamp(50000), false}, - {Timestamp(60000), false}, {Timestamp(70000), false}, - {Timestamp(80000), false}, {Timestamp(90000), false}, - {Timestamp(100000), false}, {Timestamp(110000), false}, - {Timestamp(120000), false}, {Timestamp(130000), false}, - {Timestamp(140000), false}, {Timestamp(150000), true}, - {Timestamp(160000), false}, {Timestamp(170000), true}, - {Timestamp(180000), false}, {Timestamp(190000), true}, - {Timestamp(200000), false}}); - EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow)); + std::vector expected_allow = { + MakePacket(true).At(Timestamp(0)), + MakePacket(false).At(Timestamp(10000)), + MakePacket(true).At(Timestamp(20000)), + MakePacket(false).At(Timestamp(30000)), + MakePacket(true).At(Timestamp(40000)), + MakePacket(false).At(Timestamp(50000)), + MakePacket(false).At(Timestamp(60000)), + MakePacket(false).At(Timestamp(70000)), + MakePacket(false).At(Timestamp(80000)), + MakePacket(false).At(Timestamp(90000)), + MakePacket(false).At(Timestamp(100000)), + MakePacket(false).At(Timestamp(110000)), + MakePacket(false).At(Timestamp(120000)), + MakePacket(false).At(Timestamp(130000)), + MakePacket(false).At(Timestamp(140000)), + MakePacket(true).At(Timestamp(150000)), + MakePacket(false).At(Timestamp(160000)), + MakePacket(true).At(Timestamp(170000)), + MakePacket(false).At(Timestamp(180000)), + MakePacket(true).At(Timestamp(190000)), + MakePacket(false).At(Timestamp(200000)), + }; + EXPECT_THAT(allow_packets_, BoolPacketsEq(expected_allow)); +} + +std::vector StripBoundsUpdates(const std::vector& packets, + Timestamp begin = Timestamp::Min(), + Timestamp end = Timestamp::Max()) { + std::vector result; + for (const auto& packet : packets) { + Timestamp ts = packet.Timestamp(); + if (packet.IsEmpty() && ts >= begin && ts < end) { + continue; + } + result.push_back(packet); + } + return result; } // Shows how FlowLimiterCalculator releases auxiliary input packets. // In this test, auxiliary input packets arrive at twice the primary rate. TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) { - auto BoolPackestEq = PackestEq, bool>; - auto IntPackestEq = PackestEq, int>; + auto BoolPacketsEq = PacketsEq, bool>; + auto IntPacketsEq = PacketsEq, int>; // Configure the test. SetUpInputData(); SetUpSimulationClock(); CalculatorGraphConfig graph_config = ParseTextProtoOrDie(R"pb( - input_stream: 'in_1' - input_stream: 'in_2' + input_stream: 'input_1' + input_stream: 'auxiliary_input_2' + input_stream: 'auxiliary_input_3' node { calculator: 'FlowLimiterCalculator' - input_side_packet: 'OPTIONS:limiter_options' - input_stream: 'in_1' - input_stream: 'in_2' + options { + [mediapipe.FlowLimiterCalculatorOptions.ext] { + max_in_flight: 1 + max_in_queue: 0 + in_flight_timeout: 1000000 # 1s + } + } + input_stream: 'input_1' + input_stream: 'auxiliary_input_2' + input_stream: 'auxiliary_input_3' input_stream: 'FINISHED:out_1' input_stream_info: { tag_index: 'FINISHED' back_edge: true } - output_stream: 'in_1_sampled' - output_stream: 'in_2_sampled' + output_stream: 'input_1_sampled' + output_stream: 'auxiliary_input_2_sampled' + output_stream: 'auxiliary_input_3_sampled' output_stream: 'ALLOW:allow' } node { @@ -888,49 +926,75 @@ TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) { input_side_packet: 'WARMUP_TIME:warmup_time' input_side_packet: 'SLEEP_TIME:sleep_time' input_side_packet: 'CLOCK:clock' - input_stream: 'PACKET:in_1_sampled' + input_stream: 'PACKET:input_1_sampled' output_stream: 'PACKET:out_1' } )pb"); - auto limiter_options = ParseTextProtoOrDie( - R"pb( - max_in_flight: 1 max_in_queue: 0 in_flight_timeout: 1000000 # 1s - )pb"); std::map side_packets = { - {"limiter_options", - MakePacket(limiter_options)}, + // Fake processing lazy initialization time in microseconds. {"warmup_time", MakePacket(22000)}, + // Fake processing duration in microseconds. {"sleep_time", MakePacket(22000)}, + // The SimulationClock to count virtual elapsed time. {"clock", MakePacket(clock_)}, }; // Start the graph. MP_ASSERT_OK(graph_.Initialize(graph_config)); - MP_EXPECT_OK(graph_.ObserveOutputStream("out_1", [this](Packet p) { - out_1_packets_.push_back(p); - return absl::OkStatus(); - })); - std::vector out_2_packets; - MP_EXPECT_OK(graph_.ObserveOutputStream("in_2_sampled", [&](Packet p) { - out_2_packets.push_back(p); - return absl::OkStatus(); - })); - MP_EXPECT_OK(graph_.ObserveOutputStream("allow", [this](Packet p) { - allow_packets_.push_back(p); - return absl::OkStatus(); - })); + MP_EXPECT_OK(graph_.ObserveOutputStream( + "out_1", + [this](Packet p) { + out_1_packets_.push_back(p); + return absl::OkStatus(); + }, + true)); + std::vector out_2_packets, out_3_packets; + MP_EXPECT_OK(graph_.ObserveOutputStream( + "auxiliary_input_2_sampled", + [&](Packet p) { + out_2_packets.push_back(p); + return absl::OkStatus(); + }, + true)); + MP_EXPECT_OK(graph_.ObserveOutputStream( + "auxiliary_input_3_sampled", + [&](Packet p) { + out_3_packets.push_back(p); + return absl::OkStatus(); + }, + true)); + MP_EXPECT_OK(graph_.ObserveOutputStream( + "allow", + [this](Packet p) { + allow_packets_.push_back(p); + return absl::OkStatus(); + }, + true)); simulation_clock_->ThreadStart(); MP_ASSERT_OK(graph_.StartRun(side_packets)); - // Add packets 2,4,6,8 to stream in_1 and 1..9 to stream in_2. - clock_->Sleep(absl::Microseconds(10000)); + // Add packets 1..9 to auxiliary_input_3, early. + for (int i = 1; i < 10; ++i) { + MP_EXPECT_OK(graph_.AddPacketToInputStream( + "auxiliary_input_3", MakePacket(i).At(Timestamp(i * 10000)))); + } + + // The total count of out_2_packets after each input packet. + // std::vector sizes_2 = {0, 0, 2, 2, 3, 3, 4, 4, 5, 5}; + std::vector sizes_2 = {0, 1, 3, 4, 6, 7, 9, 10, 12, 13}; + + // Add packets 2,4,6,8 to stream input_1. + // Add packets 1..9 to auxiliary_input_2. for (int i = 1; i < 10; ++i) { if (i % 2 == 0) { - MP_EXPECT_OK(graph_.AddPacketToInputStream("in_1", input_packets_[i])); + MP_EXPECT_OK(graph_.AddPacketToInputStream( + "input_1", MakePacket(i).At(Timestamp(i * 10000)))); } - MP_EXPECT_OK(graph_.AddPacketToInputStream("in_2", input_packets_[i])); + MP_EXPECT_OK(graph_.AddPacketToInputStream( + "auxiliary_input_2", MakePacket(i).At(Timestamp(i * 10000)))); clock_->Sleep(absl::Microseconds(10000)); + EXPECT_EQ(out_2_packets.size(), sizes_2[i]); } // Finish the graph run. @@ -942,24 +1006,46 @@ TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) { // Validate the output. // Input packets 4 and 8 are dropped due to max_in_flight. std::vector expected_output = { - input_packets_[2], - input_packets_[6], + MakePacket(2).At(Timestamp(20000)), + Packet().At(Timestamp(40000)), + MakePacket(6).At(Timestamp(60000)), + Packet().At(Timestamp(80000)), }; - EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output)); + EXPECT_THAT(out_1_packets_, IntPacketsEq(expected_output)); + // Packets following input packets 2 and 6, and not input packets 4 and 8. - std::vector expected_output_2 = { - input_packets_[1], input_packets_[2], input_packets_[3], - input_packets_[6], input_packets_[7], + std::vector expected_auxiliary_output = { + Packet().At(Timestamp(9999)), + MakePacket(1).At(Timestamp(10000)), + MakePacket(2).At(Timestamp(20000)), + Packet().At(Timestamp(29999)), + MakePacket(3).At(Timestamp(30000)), + Packet().At(Timestamp(40000)), + Packet().At(Timestamp(49999)), + Packet().At(Timestamp(50000)), + MakePacket(6).At(Timestamp(60000)), + Packet().At(Timestamp(69999)), + MakePacket(7).At(Timestamp(70000)), + Packet().At(Timestamp(80000)), + Packet().At(Timestamp(89999)), }; - EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2)); + std::vector actual_2 = + StripBoundsUpdates(out_2_packets, Timestamp(90000)); + EXPECT_THAT(actual_2, IntPacketsEq(expected_auxiliary_output)); + std::vector expected_3 = + StripBoundsUpdates(expected_auxiliary_output, Timestamp(39999)); + std::vector actual_3 = + StripBoundsUpdates(out_3_packets, Timestamp(39999)); + EXPECT_THAT(actual_3, IntPacketsEq(expected_3)); // Validate the ALLOW stream output. - std::vector expected_allow = - MakePackets({{Timestamp(20000), 1}, - {Timestamp(40000), 0}, - {Timestamp(60000), 1}, - {Timestamp(80000), 0}}); - EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow)); + std::vector expected_allow = { + MakePacket(true).At(Timestamp(20000)), + MakePacket(false).At(Timestamp(40000)), + MakePacket(true).At(Timestamp(60000)), + MakePacket(false).At(Timestamp(80000)), + }; + EXPECT_THAT(allow_packets_, BoolPacketsEq(expected_allow)); } } // anonymous namespace