From 91782a27725d78efdaa84dc6fc1aabb06279af88 Mon Sep 17 00:00:00 2001 From: Hadon Nash Date: Fri, 4 Nov 2022 19:45:46 -0700 Subject: [PATCH] Internal change PiperOrigin-RevId: 486283316 --- mediapipe/calculators/core/BUILD | 1 + .../core/flow_limiter_calculator.cc | 142 +++++++----- .../core/flow_limiter_calculator_test.cc | 212 +++++++++++++++++- 3 files changed, 290 insertions(+), 65 deletions(-) diff --git a/mediapipe/calculators/core/BUILD b/mediapipe/calculators/core/BUILD index 74398be42..ecd878115 100644 --- a/mediapipe/calculators/core/BUILD +++ b/mediapipe/calculators/core/BUILD @@ -936,6 +936,7 @@ cc_test( "//mediapipe/framework/tool:simulation_clock", "//mediapipe/framework/tool:simulation_clock_executor", "//mediapipe/framework/tool:sink", + "//mediapipe/util:packet_test_util", "@com_google_absl//absl/time", ], ) diff --git a/mediapipe/calculators/core/flow_limiter_calculator.cc b/mediapipe/calculators/core/flow_limiter_calculator.cc index d209b1dbb..5b08f3af5 100644 --- a/mediapipe/calculators/core/flow_limiter_calculator.cc +++ b/mediapipe/calculators/core/flow_limiter_calculator.cc @@ -18,7 +18,6 @@ #include "mediapipe/calculators/core/flow_limiter_calculator.pb.h" #include "mediapipe/framework/calculator_framework.h" -#include "mediapipe/framework/port/ret_check.h" #include "mediapipe/framework/port/status.h" #include "mediapipe/util/header_util.h" @@ -68,7 +67,7 @@ constexpr char kOptionsTag[] = "OPTIONS"; // FlowLimiterCalculator provides limited support for multiple input streams. // The first input stream is treated as the main input stream and successive // input streams are treated as auxiliary input streams. The auxiliary input -// streams are limited to timestamps passed on the main input stream. +// streams are limited to timestamps allowed by the "ALLOW" stream. // class FlowLimiterCalculator : public CalculatorBase { public: @@ -100,64 +99,11 @@ class FlowLimiterCalculator : public CalculatorBase { cc->InputSidePackets().Tag(kMaxInFlightTag).Get()); } input_queues_.resize(cc->Inputs().NumEntries("")); + allowed_[Timestamp::Unset()] = true; RET_CHECK_OK(CopyInputHeadersToOutputs(cc->Inputs(), &(cc->Outputs()))); return absl::OkStatus(); } - // Returns true if an additional frame can be released for processing. - // The "ALLOW" output stream indicates this condition at each input frame. - bool ProcessingAllowed() { - return frames_in_flight_.size() < options_.max_in_flight(); - } - - // Outputs a packet indicating whether a frame was sent or dropped. - void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) { - if (cc->Outputs().HasTag(kAllowTag)) { - cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket(allow).At(ts)); - } - } - - // Sets the timestamp bound or closes an output stream. - void SetNextTimestampBound(Timestamp bound, OutputStream* stream) { - if (bound > Timestamp::Max()) { - stream->Close(); - } else { - stream->SetNextTimestampBound(bound); - } - } - - // Returns true if a certain timestamp is being processed. - bool IsInFlight(Timestamp timestamp) { - return std::find(frames_in_flight_.begin(), frames_in_flight_.end(), - timestamp) != frames_in_flight_.end(); - } - - // Releases input packets up to the latest settled input timestamp. - void ProcessAuxiliaryInputs(CalculatorContext* cc) { - Timestamp settled_bound = cc->Outputs().Get("", 0).NextTimestampBound(); - for (int i = 1; i < cc->Inputs().NumEntries(""); ++i) { - // Release settled frames from each input queue. - while (!input_queues_[i].empty() && - input_queues_[i].front().Timestamp() < settled_bound) { - Packet packet = input_queues_[i].front(); - input_queues_[i].pop_front(); - if (IsInFlight(packet.Timestamp())) { - cc->Outputs().Get("", i).AddPacket(packet); - } - } - - // Propagate each input timestamp bound. - if (!input_queues_[i].empty()) { - Timestamp bound = input_queues_[i].front().Timestamp(); - SetNextTimestampBound(bound, &cc->Outputs().Get("", i)); - } else { - Timestamp bound = - cc->Inputs().Get("", i).Value().Timestamp().NextAllowedInStream(); - SetNextTimestampBound(bound, &cc->Outputs().Get("", i)); - } - } - } - // Releases input packets allowed by the max_in_flight constraint. absl::Status Process(CalculatorContext* cc) final { options_ = tool::RetrieveOptions(options_, cc->Inputs()); @@ -224,13 +170,97 @@ class FlowLimiterCalculator : public CalculatorBase { } ProcessAuxiliaryInputs(cc); + + // Discard old ALLOW ranges. + Timestamp input_bound = InputTimestampBound(cc); + auto first_range = std::prev(allowed_.upper_bound(input_bound)); + allowed_.erase(allowed_.begin(), first_range); return absl::OkStatus(); } + int LedgerSize() { + int result = frames_in_flight_.size() + allowed_.size(); + for (const auto& queue : input_queues_) { + result += queue.size(); + } + return result; + } + + private: + // Returns true if an additional frame can be released for processing. + // The "ALLOW" output stream indicates this condition at each input frame. + bool ProcessingAllowed() { + return frames_in_flight_.size() < options_.max_in_flight(); + } + + // Outputs a packet indicating whether a frame was sent or dropped. + void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) { + if (cc->Outputs().HasTag(kAllowTag)) { + cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket(allow).At(ts)); + } + allowed_[ts] = allow; + } + + // Returns true if a timestamp falls within a range of allowed timestamps. + bool IsAllowed(Timestamp timestamp) { + auto it = allowed_.upper_bound(timestamp); + return std::prev(it)->second; + } + + // Sets the timestamp bound or closes an output stream. + void SetNextTimestampBound(Timestamp bound, OutputStream* stream) { + if (bound > Timestamp::Max()) { + stream->Close(); + } else { + stream->SetNextTimestampBound(bound); + } + } + + // Returns the lowest unprocessed input Timestamp. + Timestamp InputTimestampBound(CalculatorContext* cc) { + Timestamp result = Timestamp::Done(); + for (int i = 0; i < input_queues_.size(); ++i) { + auto& queue = input_queues_[i]; + auto& stream = cc->Inputs().Get("", i); + Timestamp bound = queue.empty() + ? stream.Value().Timestamp().NextAllowedInStream() + : queue.front().Timestamp(); + result = std::min(result, bound); + } + return result; + } + + // Releases input packets up to the latest settled input timestamp. + void ProcessAuxiliaryInputs(CalculatorContext* cc) { + Timestamp settled_bound = cc->Outputs().Get("", 0).NextTimestampBound(); + for (int i = 1; i < cc->Inputs().NumEntries(""); ++i) { + // Release settled frames from each input queue. + while (!input_queues_[i].empty() && + input_queues_[i].front().Timestamp() < settled_bound) { + Packet packet = input_queues_[i].front(); + input_queues_[i].pop_front(); + if (IsAllowed(packet.Timestamp())) { + cc->Outputs().Get("", i).AddPacket(packet); + } + } + + // Propagate each input timestamp bound. + if (!input_queues_[i].empty()) { + Timestamp bound = input_queues_[i].front().Timestamp(); + SetNextTimestampBound(bound, &cc->Outputs().Get("", i)); + } else { + Timestamp bound = + cc->Inputs().Get("", i).Value().Timestamp().NextAllowedInStream(); + SetNextTimestampBound(bound, &cc->Outputs().Get("", i)); + } + } + } + private: FlowLimiterCalculatorOptions options_; std::vector> input_queues_; std::deque frames_in_flight_; + std::map allowed_; }; REGISTER_CALCULATOR(FlowLimiterCalculator); diff --git a/mediapipe/calculators/core/flow_limiter_calculator_test.cc b/mediapipe/calculators/core/flow_limiter_calculator_test.cc index 962b1c81a..8a8cc9656 100644 --- a/mediapipe/calculators/core/flow_limiter_calculator_test.cc +++ b/mediapipe/calculators/core/flow_limiter_calculator_test.cc @@ -15,6 +15,7 @@ #include #include #include +#include #include #include "absl/time/clock.h" @@ -32,6 +33,7 @@ #include "mediapipe/framework/tool/simulation_clock.h" #include "mediapipe/framework/tool/simulation_clock_executor.h" #include "mediapipe/framework/tool/sink.h" +#include "mediapipe/util/packet_test_util.h" namespace mediapipe { @@ -77,6 +79,77 @@ std::vector PacketValues(const std::vector& packets) { return result; } +template +std::vector MakePackets(std::vector> contents) { + std::vector result; + for (auto& entry : contents) { + result.push_back(MakePacket(entry.second).At(entry.first)); + } + return result; +} + +std::string SourceString(Timestamp t) { + return (t.IsSpecialValue()) + ? t.DebugString() + : absl::StrCat("Timestamp(", t.DebugString(), ")"); +} + +template +class PacketsEqMatcher + : public ::testing::MatcherInterface { + public: + PacketsEqMatcher(PacketContainer packets) : packets_(packets) {} + void DescribeTo(::std::ostream* os) const override { + *os << "The expected packet contents: \n"; + Print(packets_, os); + } + bool MatchAndExplain( + const PacketContainer& value, + ::testing::MatchResultListener* listener) const override { + if (!Equals(packets_, value)) { + if (listener->IsInterested()) { + *listener << "The actual packet contents: \n"; + Print(value, listener->stream()); + } + return false; + } + return true; + } + + private: + bool Equals(const PacketContainer& c1, const PacketContainer& c2) const { + if (c1.size() != c2.size()) { + return false; + } + for (auto i1 = c1.begin(), i2 = c2.begin(); i1 != c1.end(); ++i1, ++i2) { + Packet p1 = *i1, p2 = *i2; + if (p1.Timestamp() != p2.Timestamp() || + p1.Get() != p2.Get()) { + return false; + } + } + return true; + } + void Print(const PacketContainer& packets, ::std::ostream* os) const { + for (auto it = packets.begin(); it != packets.end(); ++it) { + const Packet& packet = *it; + *os << (it == packets.begin() ? "{" : "") << "{" + << SourceString(packet.Timestamp()) << ", " + << packet.Get() << "}" + << (std::next(it) == packets.end() ? "}" : ", "); + } + } + + const PacketContainer packets_; +}; + +template +::testing::Matcher PackestEq( + const PacketContainer& packets) { + return MakeMatcher( + new PacketsEqMatcher(packets)); +} + // A Calculator::Process callback function. typedef std::function @@ -651,11 +724,12 @@ TEST_F(FlowLimiterCalculatorTest, TwoInputStreams) { input_packets_[17], input_packets_[19], input_packets_[20], }; EXPECT_EQ(out_1_packets_, expected_output); - // Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled. + // The timestamps released by FlowLimiterCalculator for in_1_sampled, + // plus input_packets_[21]. std::vector expected_output_2 = { input_packets_[0], input_packets_[2], input_packets_[4], input_packets_[14], input_packets_[17], input_packets_[19], - input_packets_[20], + input_packets_[20], input_packets_[21], }; EXPECT_EQ(out_2_packets, expected_output_2); } @@ -665,6 +739,9 @@ TEST_F(FlowLimiterCalculatorTest, TwoInputStreams) { // The processing time "sleep_time" is reduced from 22ms to 12ms to create // the same frame rate as FlowLimiterCalculatorTest::TwoInputStreams. TEST_F(FlowLimiterCalculatorTest, ZeroQueue) { + auto BoolPackestEq = PackestEq, bool>; + auto IntPackestEq = PackestEq, int>; + // Configure the test. SetUpInputData(); SetUpSimulationClock(); @@ -699,11 +776,10 @@ TEST_F(FlowLimiterCalculatorTest, ZeroQueue) { } )pb"); - auto limiter_options = ParseTextProtoOrDie(R"pb( - max_in_flight: 1 - max_in_queue: 0 - in_flight_timeout: 100000 # 100 ms - )pb"); + auto limiter_options = ParseTextProtoOrDie( + R"pb( + max_in_flight: 1 max_in_queue: 0 in_flight_timeout: 100000 # 100 ms + )pb"); std::map side_packets = { {"limiter_options", MakePacket(limiter_options)}, @@ -759,13 +835,131 @@ TEST_F(FlowLimiterCalculatorTest, ZeroQueue) { input_packets_[0], input_packets_[2], input_packets_[15], input_packets_[17], input_packets_[19], }; - EXPECT_EQ(out_1_packets_, expected_output); + EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output)); // Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled. std::vector expected_output_2 = { input_packets_[0], input_packets_[2], input_packets_[4], input_packets_[15], input_packets_[17], input_packets_[19], }; - EXPECT_EQ(out_2_packets, expected_output_2); + EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2)); + + // Validate the ALLOW stream output. + std::vector expected_allow = MakePackets( // + {{Timestamp(0), true}, {Timestamp(10000), false}, + {Timestamp(20000), true}, {Timestamp(30000), false}, + {Timestamp(40000), true}, {Timestamp(50000), false}, + {Timestamp(60000), false}, {Timestamp(70000), false}, + {Timestamp(80000), false}, {Timestamp(90000), false}, + {Timestamp(100000), false}, {Timestamp(110000), false}, + {Timestamp(120000), false}, {Timestamp(130000), false}, + {Timestamp(140000), false}, {Timestamp(150000), true}, + {Timestamp(160000), false}, {Timestamp(170000), true}, + {Timestamp(180000), false}, {Timestamp(190000), true}, + {Timestamp(200000), false}}); + EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow)); +} + +// Shows how FlowLimiterCalculator releases auxiliary input packets. +// In this test, auxiliary input packets arrive at twice the primary rate. +TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) { + auto BoolPackestEq = PackestEq, bool>; + auto IntPackestEq = PackestEq, int>; + + // Configure the test. + SetUpInputData(); + SetUpSimulationClock(); + CalculatorGraphConfig graph_config = + ParseTextProtoOrDie(R"pb( + input_stream: 'in_1' + input_stream: 'in_2' + node { + calculator: 'FlowLimiterCalculator' + input_side_packet: 'OPTIONS:limiter_options' + input_stream: 'in_1' + input_stream: 'in_2' + input_stream: 'FINISHED:out_1' + input_stream_info: { tag_index: 'FINISHED' back_edge: true } + output_stream: 'in_1_sampled' + output_stream: 'in_2_sampled' + output_stream: 'ALLOW:allow' + } + node { + calculator: 'SleepCalculator' + input_side_packet: 'WARMUP_TIME:warmup_time' + input_side_packet: 'SLEEP_TIME:sleep_time' + input_side_packet: 'CLOCK:clock' + input_stream: 'PACKET:in_1_sampled' + output_stream: 'PACKET:out_1' + } + )pb"); + + auto limiter_options = ParseTextProtoOrDie( + R"pb( + max_in_flight: 1 max_in_queue: 0 in_flight_timeout: 1000000 # 1s + )pb"); + std::map side_packets = { + {"limiter_options", + MakePacket(limiter_options)}, + {"warmup_time", MakePacket(22000)}, + {"sleep_time", MakePacket(22000)}, + {"clock", MakePacket(clock_)}, + }; + + // Start the graph. + MP_ASSERT_OK(graph_.Initialize(graph_config)); + MP_EXPECT_OK(graph_.ObserveOutputStream("out_1", [this](Packet p) { + out_1_packets_.push_back(p); + return absl::OkStatus(); + })); + std::vector out_2_packets; + MP_EXPECT_OK(graph_.ObserveOutputStream("in_2_sampled", [&](Packet p) { + out_2_packets.push_back(p); + return absl::OkStatus(); + })); + MP_EXPECT_OK(graph_.ObserveOutputStream("allow", [this](Packet p) { + allow_packets_.push_back(p); + return absl::OkStatus(); + })); + simulation_clock_->ThreadStart(); + MP_ASSERT_OK(graph_.StartRun(side_packets)); + + // Add packets 2,4,6,8 to stream in_1 and 1..9 to stream in_2. + clock_->Sleep(absl::Microseconds(10000)); + for (int i = 1; i < 10; ++i) { + if (i % 2 == 0) { + MP_EXPECT_OK(graph_.AddPacketToInputStream("in_1", input_packets_[i])); + } + MP_EXPECT_OK(graph_.AddPacketToInputStream("in_2", input_packets_[i])); + clock_->Sleep(absl::Microseconds(10000)); + } + + // Finish the graph run. + MP_EXPECT_OK(graph_.CloseAllPacketSources()); + clock_->Sleep(absl::Microseconds(40000)); + MP_EXPECT_OK(graph_.WaitUntilDone()); + simulation_clock_->ThreadFinish(); + + // Validate the output. + // Input packets 4 and 8 are dropped due to max_in_flight. + std::vector expected_output = { + input_packets_[2], + input_packets_[6], + }; + EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output)); + // Packets following input packets 2 and 6, and not input packets 4 and 8. + std::vector expected_output_2 = { + input_packets_[1], input_packets_[2], input_packets_[3], + input_packets_[6], input_packets_[7], + }; + EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2)); + + // Validate the ALLOW stream output. + std::vector expected_allow = + MakePackets({{Timestamp(20000), 1}, + {Timestamp(40000), 0}, + {Timestamp(60000), 1}, + {Timestamp(80000), 0}}); + EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow)); } } // anonymous namespace