Internal change
PiperOrigin-RevId: 486283316
This commit is contained in:
parent
416f91180b
commit
91782a2772
|
@ -936,6 +936,7 @@ cc_test(
|
||||||
"//mediapipe/framework/tool:simulation_clock",
|
"//mediapipe/framework/tool:simulation_clock",
|
||||||
"//mediapipe/framework/tool:simulation_clock_executor",
|
"//mediapipe/framework/tool:simulation_clock_executor",
|
||||||
"//mediapipe/framework/tool:sink",
|
"//mediapipe/framework/tool:sink",
|
||||||
|
"//mediapipe/util:packet_test_util",
|
||||||
"@com_google_absl//absl/time",
|
"@com_google_absl//absl/time",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
#include "mediapipe/calculators/core/flow_limiter_calculator.pb.h"
|
#include "mediapipe/calculators/core/flow_limiter_calculator.pb.h"
|
||||||
#include "mediapipe/framework/calculator_framework.h"
|
#include "mediapipe/framework/calculator_framework.h"
|
||||||
#include "mediapipe/framework/port/ret_check.h"
|
|
||||||
#include "mediapipe/framework/port/status.h"
|
#include "mediapipe/framework/port/status.h"
|
||||||
#include "mediapipe/util/header_util.h"
|
#include "mediapipe/util/header_util.h"
|
||||||
|
|
||||||
|
@ -68,7 +67,7 @@ constexpr char kOptionsTag[] = "OPTIONS";
|
||||||
// FlowLimiterCalculator provides limited support for multiple input streams.
|
// FlowLimiterCalculator provides limited support for multiple input streams.
|
||||||
// The first input stream is treated as the main input stream and successive
|
// The first input stream is treated as the main input stream and successive
|
||||||
// input streams are treated as auxiliary input streams. The auxiliary input
|
// input streams are treated as auxiliary input streams. The auxiliary input
|
||||||
// streams are limited to timestamps passed on the main input stream.
|
// streams are limited to timestamps allowed by the "ALLOW" stream.
|
||||||
//
|
//
|
||||||
class FlowLimiterCalculator : public CalculatorBase {
|
class FlowLimiterCalculator : public CalculatorBase {
|
||||||
public:
|
public:
|
||||||
|
@ -100,64 +99,11 @@ class FlowLimiterCalculator : public CalculatorBase {
|
||||||
cc->InputSidePackets().Tag(kMaxInFlightTag).Get<int>());
|
cc->InputSidePackets().Tag(kMaxInFlightTag).Get<int>());
|
||||||
}
|
}
|
||||||
input_queues_.resize(cc->Inputs().NumEntries(""));
|
input_queues_.resize(cc->Inputs().NumEntries(""));
|
||||||
|
allowed_[Timestamp::Unset()] = true;
|
||||||
RET_CHECK_OK(CopyInputHeadersToOutputs(cc->Inputs(), &(cc->Outputs())));
|
RET_CHECK_OK(CopyInputHeadersToOutputs(cc->Inputs(), &(cc->Outputs())));
|
||||||
return absl::OkStatus();
|
return absl::OkStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if an additional frame can be released for processing.
|
|
||||||
// The "ALLOW" output stream indicates this condition at each input frame.
|
|
||||||
bool ProcessingAllowed() {
|
|
||||||
return frames_in_flight_.size() < options_.max_in_flight();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Outputs a packet indicating whether a frame was sent or dropped.
|
|
||||||
void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) {
|
|
||||||
if (cc->Outputs().HasTag(kAllowTag)) {
|
|
||||||
cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket<bool>(allow).At(ts));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sets the timestamp bound or closes an output stream.
|
|
||||||
void SetNextTimestampBound(Timestamp bound, OutputStream* stream) {
|
|
||||||
if (bound > Timestamp::Max()) {
|
|
||||||
stream->Close();
|
|
||||||
} else {
|
|
||||||
stream->SetNextTimestampBound(bound);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns true if a certain timestamp is being processed.
|
|
||||||
bool IsInFlight(Timestamp timestamp) {
|
|
||||||
return std::find(frames_in_flight_.begin(), frames_in_flight_.end(),
|
|
||||||
timestamp) != frames_in_flight_.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Releases input packets up to the latest settled input timestamp.
|
|
||||||
void ProcessAuxiliaryInputs(CalculatorContext* cc) {
|
|
||||||
Timestamp settled_bound = cc->Outputs().Get("", 0).NextTimestampBound();
|
|
||||||
for (int i = 1; i < cc->Inputs().NumEntries(""); ++i) {
|
|
||||||
// Release settled frames from each input queue.
|
|
||||||
while (!input_queues_[i].empty() &&
|
|
||||||
input_queues_[i].front().Timestamp() < settled_bound) {
|
|
||||||
Packet packet = input_queues_[i].front();
|
|
||||||
input_queues_[i].pop_front();
|
|
||||||
if (IsInFlight(packet.Timestamp())) {
|
|
||||||
cc->Outputs().Get("", i).AddPacket(packet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Propagate each input timestamp bound.
|
|
||||||
if (!input_queues_[i].empty()) {
|
|
||||||
Timestamp bound = input_queues_[i].front().Timestamp();
|
|
||||||
SetNextTimestampBound(bound, &cc->Outputs().Get("", i));
|
|
||||||
} else {
|
|
||||||
Timestamp bound =
|
|
||||||
cc->Inputs().Get("", i).Value().Timestamp().NextAllowedInStream();
|
|
||||||
SetNextTimestampBound(bound, &cc->Outputs().Get("", i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Releases input packets allowed by the max_in_flight constraint.
|
// Releases input packets allowed by the max_in_flight constraint.
|
||||||
absl::Status Process(CalculatorContext* cc) final {
|
absl::Status Process(CalculatorContext* cc) final {
|
||||||
options_ = tool::RetrieveOptions(options_, cc->Inputs());
|
options_ = tool::RetrieveOptions(options_, cc->Inputs());
|
||||||
|
@ -224,13 +170,97 @@ class FlowLimiterCalculator : public CalculatorBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
ProcessAuxiliaryInputs(cc);
|
ProcessAuxiliaryInputs(cc);
|
||||||
|
|
||||||
|
// Discard old ALLOW ranges.
|
||||||
|
Timestamp input_bound = InputTimestampBound(cc);
|
||||||
|
auto first_range = std::prev(allowed_.upper_bound(input_bound));
|
||||||
|
allowed_.erase(allowed_.begin(), first_range);
|
||||||
return absl::OkStatus();
|
return absl::OkStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int LedgerSize() {
|
||||||
|
int result = frames_in_flight_.size() + allowed_.size();
|
||||||
|
for (const auto& queue : input_queues_) {
|
||||||
|
result += queue.size();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Returns true if an additional frame can be released for processing.
|
||||||
|
// The "ALLOW" output stream indicates this condition at each input frame.
|
||||||
|
bool ProcessingAllowed() {
|
||||||
|
return frames_in_flight_.size() < options_.max_in_flight();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Outputs a packet indicating whether a frame was sent or dropped.
|
||||||
|
void SendAllow(bool allow, Timestamp ts, CalculatorContext* cc) {
|
||||||
|
if (cc->Outputs().HasTag(kAllowTag)) {
|
||||||
|
cc->Outputs().Tag(kAllowTag).AddPacket(MakePacket<bool>(allow).At(ts));
|
||||||
|
}
|
||||||
|
allowed_[ts] = allow;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true if a timestamp falls within a range of allowed timestamps.
|
||||||
|
bool IsAllowed(Timestamp timestamp) {
|
||||||
|
auto it = allowed_.upper_bound(timestamp);
|
||||||
|
return std::prev(it)->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sets the timestamp bound or closes an output stream.
|
||||||
|
void SetNextTimestampBound(Timestamp bound, OutputStream* stream) {
|
||||||
|
if (bound > Timestamp::Max()) {
|
||||||
|
stream->Close();
|
||||||
|
} else {
|
||||||
|
stream->SetNextTimestampBound(bound);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the lowest unprocessed input Timestamp.
|
||||||
|
Timestamp InputTimestampBound(CalculatorContext* cc) {
|
||||||
|
Timestamp result = Timestamp::Done();
|
||||||
|
for (int i = 0; i < input_queues_.size(); ++i) {
|
||||||
|
auto& queue = input_queues_[i];
|
||||||
|
auto& stream = cc->Inputs().Get("", i);
|
||||||
|
Timestamp bound = queue.empty()
|
||||||
|
? stream.Value().Timestamp().NextAllowedInStream()
|
||||||
|
: queue.front().Timestamp();
|
||||||
|
result = std::min(result, bound);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Releases input packets up to the latest settled input timestamp.
|
||||||
|
void ProcessAuxiliaryInputs(CalculatorContext* cc) {
|
||||||
|
Timestamp settled_bound = cc->Outputs().Get("", 0).NextTimestampBound();
|
||||||
|
for (int i = 1; i < cc->Inputs().NumEntries(""); ++i) {
|
||||||
|
// Release settled frames from each input queue.
|
||||||
|
while (!input_queues_[i].empty() &&
|
||||||
|
input_queues_[i].front().Timestamp() < settled_bound) {
|
||||||
|
Packet packet = input_queues_[i].front();
|
||||||
|
input_queues_[i].pop_front();
|
||||||
|
if (IsAllowed(packet.Timestamp())) {
|
||||||
|
cc->Outputs().Get("", i).AddPacket(packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Propagate each input timestamp bound.
|
||||||
|
if (!input_queues_[i].empty()) {
|
||||||
|
Timestamp bound = input_queues_[i].front().Timestamp();
|
||||||
|
SetNextTimestampBound(bound, &cc->Outputs().Get("", i));
|
||||||
|
} else {
|
||||||
|
Timestamp bound =
|
||||||
|
cc->Inputs().Get("", i).Value().Timestamp().NextAllowedInStream();
|
||||||
|
SetNextTimestampBound(bound, &cc->Outputs().Get("", i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
FlowLimiterCalculatorOptions options_;
|
FlowLimiterCalculatorOptions options_;
|
||||||
std::vector<std::deque<Packet>> input_queues_;
|
std::vector<std::deque<Packet>> input_queues_;
|
||||||
std::deque<Timestamp> frames_in_flight_;
|
std::deque<Timestamp> frames_in_flight_;
|
||||||
|
std::map<Timestamp, bool> allowed_;
|
||||||
};
|
};
|
||||||
REGISTER_CALCULATOR(FlowLimiterCalculator);
|
REGISTER_CALCULATOR(FlowLimiterCalculator);
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include "absl/time/clock.h"
|
#include "absl/time/clock.h"
|
||||||
|
@ -32,6 +33,7 @@
|
||||||
#include "mediapipe/framework/tool/simulation_clock.h"
|
#include "mediapipe/framework/tool/simulation_clock.h"
|
||||||
#include "mediapipe/framework/tool/simulation_clock_executor.h"
|
#include "mediapipe/framework/tool/simulation_clock_executor.h"
|
||||||
#include "mediapipe/framework/tool/sink.h"
|
#include "mediapipe/framework/tool/sink.h"
|
||||||
|
#include "mediapipe/util/packet_test_util.h"
|
||||||
|
|
||||||
namespace mediapipe {
|
namespace mediapipe {
|
||||||
|
|
||||||
|
@ -77,6 +79,77 @@ std::vector<T> PacketValues(const std::vector<Packet>& packets) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
std::vector<Packet> MakePackets(std::vector<std::pair<Timestamp, T>> contents) {
|
||||||
|
std::vector<Packet> result;
|
||||||
|
for (auto& entry : contents) {
|
||||||
|
result.push_back(MakePacket<T>(entry.second).At(entry.first));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string SourceString(Timestamp t) {
|
||||||
|
return (t.IsSpecialValue())
|
||||||
|
? t.DebugString()
|
||||||
|
: absl::StrCat("Timestamp(", t.DebugString(), ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename PacketContainer, typename PacketContent>
|
||||||
|
class PacketsEqMatcher
|
||||||
|
: public ::testing::MatcherInterface<const PacketContainer&> {
|
||||||
|
public:
|
||||||
|
PacketsEqMatcher(PacketContainer packets) : packets_(packets) {}
|
||||||
|
void DescribeTo(::std::ostream* os) const override {
|
||||||
|
*os << "The expected packet contents: \n";
|
||||||
|
Print(packets_, os);
|
||||||
|
}
|
||||||
|
bool MatchAndExplain(
|
||||||
|
const PacketContainer& value,
|
||||||
|
::testing::MatchResultListener* listener) const override {
|
||||||
|
if (!Equals(packets_, value)) {
|
||||||
|
if (listener->IsInterested()) {
|
||||||
|
*listener << "The actual packet contents: \n";
|
||||||
|
Print(value, listener->stream());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool Equals(const PacketContainer& c1, const PacketContainer& c2) const {
|
||||||
|
if (c1.size() != c2.size()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (auto i1 = c1.begin(), i2 = c2.begin(); i1 != c1.end(); ++i1, ++i2) {
|
||||||
|
Packet p1 = *i1, p2 = *i2;
|
||||||
|
if (p1.Timestamp() != p2.Timestamp() ||
|
||||||
|
p1.Get<PacketContent>() != p2.Get<PacketContent>()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
void Print(const PacketContainer& packets, ::std::ostream* os) const {
|
||||||
|
for (auto it = packets.begin(); it != packets.end(); ++it) {
|
||||||
|
const Packet& packet = *it;
|
||||||
|
*os << (it == packets.begin() ? "{" : "") << "{"
|
||||||
|
<< SourceString(packet.Timestamp()) << ", "
|
||||||
|
<< packet.Get<PacketContent>() << "}"
|
||||||
|
<< (std::next(it) == packets.end() ? "}" : ", ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const PacketContainer packets_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename PacketContainer, typename PacketContent>
|
||||||
|
::testing::Matcher<const PacketContainer&> PackestEq(
|
||||||
|
const PacketContainer& packets) {
|
||||||
|
return MakeMatcher(
|
||||||
|
new PacketsEqMatcher<PacketContainer, PacketContent>(packets));
|
||||||
|
}
|
||||||
|
|
||||||
// A Calculator::Process callback function.
|
// A Calculator::Process callback function.
|
||||||
typedef std::function<absl::Status(const InputStreamShardSet&,
|
typedef std::function<absl::Status(const InputStreamShardSet&,
|
||||||
OutputStreamShardSet*)>
|
OutputStreamShardSet*)>
|
||||||
|
@ -651,11 +724,12 @@ TEST_F(FlowLimiterCalculatorTest, TwoInputStreams) {
|
||||||
input_packets_[17], input_packets_[19], input_packets_[20],
|
input_packets_[17], input_packets_[19], input_packets_[20],
|
||||||
};
|
};
|
||||||
EXPECT_EQ(out_1_packets_, expected_output);
|
EXPECT_EQ(out_1_packets_, expected_output);
|
||||||
// Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled.
|
// The timestamps released by FlowLimiterCalculator for in_1_sampled,
|
||||||
|
// plus input_packets_[21].
|
||||||
std::vector<Packet> expected_output_2 = {
|
std::vector<Packet> expected_output_2 = {
|
||||||
input_packets_[0], input_packets_[2], input_packets_[4],
|
input_packets_[0], input_packets_[2], input_packets_[4],
|
||||||
input_packets_[14], input_packets_[17], input_packets_[19],
|
input_packets_[14], input_packets_[17], input_packets_[19],
|
||||||
input_packets_[20],
|
input_packets_[20], input_packets_[21],
|
||||||
};
|
};
|
||||||
EXPECT_EQ(out_2_packets, expected_output_2);
|
EXPECT_EQ(out_2_packets, expected_output_2);
|
||||||
}
|
}
|
||||||
|
@ -665,6 +739,9 @@ TEST_F(FlowLimiterCalculatorTest, TwoInputStreams) {
|
||||||
// The processing time "sleep_time" is reduced from 22ms to 12ms to create
|
// The processing time "sleep_time" is reduced from 22ms to 12ms to create
|
||||||
// the same frame rate as FlowLimiterCalculatorTest::TwoInputStreams.
|
// the same frame rate as FlowLimiterCalculatorTest::TwoInputStreams.
|
||||||
TEST_F(FlowLimiterCalculatorTest, ZeroQueue) {
|
TEST_F(FlowLimiterCalculatorTest, ZeroQueue) {
|
||||||
|
auto BoolPackestEq = PackestEq<std::vector<Packet>, bool>;
|
||||||
|
auto IntPackestEq = PackestEq<std::vector<Packet>, int>;
|
||||||
|
|
||||||
// Configure the test.
|
// Configure the test.
|
||||||
SetUpInputData();
|
SetUpInputData();
|
||||||
SetUpSimulationClock();
|
SetUpSimulationClock();
|
||||||
|
@ -699,10 +776,9 @@ TEST_F(FlowLimiterCalculatorTest, ZeroQueue) {
|
||||||
}
|
}
|
||||||
)pb");
|
)pb");
|
||||||
|
|
||||||
auto limiter_options = ParseTextProtoOrDie<FlowLimiterCalculatorOptions>(R"pb(
|
auto limiter_options = ParseTextProtoOrDie<FlowLimiterCalculatorOptions>(
|
||||||
max_in_flight: 1
|
R"pb(
|
||||||
max_in_queue: 0
|
max_in_flight: 1 max_in_queue: 0 in_flight_timeout: 100000 # 100 ms
|
||||||
in_flight_timeout: 100000 # 100 ms
|
|
||||||
)pb");
|
)pb");
|
||||||
std::map<std::string, Packet> side_packets = {
|
std::map<std::string, Packet> side_packets = {
|
||||||
{"limiter_options",
|
{"limiter_options",
|
||||||
|
@ -759,13 +835,131 @@ TEST_F(FlowLimiterCalculatorTest, ZeroQueue) {
|
||||||
input_packets_[0], input_packets_[2], input_packets_[15],
|
input_packets_[0], input_packets_[2], input_packets_[15],
|
||||||
input_packets_[17], input_packets_[19],
|
input_packets_[17], input_packets_[19],
|
||||||
};
|
};
|
||||||
EXPECT_EQ(out_1_packets_, expected_output);
|
EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output));
|
||||||
// Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled.
|
// Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled.
|
||||||
std::vector<Packet> expected_output_2 = {
|
std::vector<Packet> expected_output_2 = {
|
||||||
input_packets_[0], input_packets_[2], input_packets_[4],
|
input_packets_[0], input_packets_[2], input_packets_[4],
|
||||||
input_packets_[15], input_packets_[17], input_packets_[19],
|
input_packets_[15], input_packets_[17], input_packets_[19],
|
||||||
};
|
};
|
||||||
EXPECT_EQ(out_2_packets, expected_output_2);
|
EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2));
|
||||||
|
|
||||||
|
// Validate the ALLOW stream output.
|
||||||
|
std::vector<Packet> expected_allow = MakePackets<bool>( //
|
||||||
|
{{Timestamp(0), true}, {Timestamp(10000), false},
|
||||||
|
{Timestamp(20000), true}, {Timestamp(30000), false},
|
||||||
|
{Timestamp(40000), true}, {Timestamp(50000), false},
|
||||||
|
{Timestamp(60000), false}, {Timestamp(70000), false},
|
||||||
|
{Timestamp(80000), false}, {Timestamp(90000), false},
|
||||||
|
{Timestamp(100000), false}, {Timestamp(110000), false},
|
||||||
|
{Timestamp(120000), false}, {Timestamp(130000), false},
|
||||||
|
{Timestamp(140000), false}, {Timestamp(150000), true},
|
||||||
|
{Timestamp(160000), false}, {Timestamp(170000), true},
|
||||||
|
{Timestamp(180000), false}, {Timestamp(190000), true},
|
||||||
|
{Timestamp(200000), false}});
|
||||||
|
EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shows how FlowLimiterCalculator releases auxiliary input packets.
|
||||||
|
// In this test, auxiliary input packets arrive at twice the primary rate.
|
||||||
|
TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) {
|
||||||
|
auto BoolPackestEq = PackestEq<std::vector<Packet>, bool>;
|
||||||
|
auto IntPackestEq = PackestEq<std::vector<Packet>, int>;
|
||||||
|
|
||||||
|
// Configure the test.
|
||||||
|
SetUpInputData();
|
||||||
|
SetUpSimulationClock();
|
||||||
|
CalculatorGraphConfig graph_config =
|
||||||
|
ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
|
||||||
|
input_stream: 'in_1'
|
||||||
|
input_stream: 'in_2'
|
||||||
|
node {
|
||||||
|
calculator: 'FlowLimiterCalculator'
|
||||||
|
input_side_packet: 'OPTIONS:limiter_options'
|
||||||
|
input_stream: 'in_1'
|
||||||
|
input_stream: 'in_2'
|
||||||
|
input_stream: 'FINISHED:out_1'
|
||||||
|
input_stream_info: { tag_index: 'FINISHED' back_edge: true }
|
||||||
|
output_stream: 'in_1_sampled'
|
||||||
|
output_stream: 'in_2_sampled'
|
||||||
|
output_stream: 'ALLOW:allow'
|
||||||
|
}
|
||||||
|
node {
|
||||||
|
calculator: 'SleepCalculator'
|
||||||
|
input_side_packet: 'WARMUP_TIME:warmup_time'
|
||||||
|
input_side_packet: 'SLEEP_TIME:sleep_time'
|
||||||
|
input_side_packet: 'CLOCK:clock'
|
||||||
|
input_stream: 'PACKET:in_1_sampled'
|
||||||
|
output_stream: 'PACKET:out_1'
|
||||||
|
}
|
||||||
|
)pb");
|
||||||
|
|
||||||
|
auto limiter_options = ParseTextProtoOrDie<FlowLimiterCalculatorOptions>(
|
||||||
|
R"pb(
|
||||||
|
max_in_flight: 1 max_in_queue: 0 in_flight_timeout: 1000000 # 1s
|
||||||
|
)pb");
|
||||||
|
std::map<std::string, Packet> side_packets = {
|
||||||
|
{"limiter_options",
|
||||||
|
MakePacket<FlowLimiterCalculatorOptions>(limiter_options)},
|
||||||
|
{"warmup_time", MakePacket<int64>(22000)},
|
||||||
|
{"sleep_time", MakePacket<int64>(22000)},
|
||||||
|
{"clock", MakePacket<mediapipe::Clock*>(clock_)},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start the graph.
|
||||||
|
MP_ASSERT_OK(graph_.Initialize(graph_config));
|
||||||
|
MP_EXPECT_OK(graph_.ObserveOutputStream("out_1", [this](Packet p) {
|
||||||
|
out_1_packets_.push_back(p);
|
||||||
|
return absl::OkStatus();
|
||||||
|
}));
|
||||||
|
std::vector<Packet> out_2_packets;
|
||||||
|
MP_EXPECT_OK(graph_.ObserveOutputStream("in_2_sampled", [&](Packet p) {
|
||||||
|
out_2_packets.push_back(p);
|
||||||
|
return absl::OkStatus();
|
||||||
|
}));
|
||||||
|
MP_EXPECT_OK(graph_.ObserveOutputStream("allow", [this](Packet p) {
|
||||||
|
allow_packets_.push_back(p);
|
||||||
|
return absl::OkStatus();
|
||||||
|
}));
|
||||||
|
simulation_clock_->ThreadStart();
|
||||||
|
MP_ASSERT_OK(graph_.StartRun(side_packets));
|
||||||
|
|
||||||
|
// Add packets 2,4,6,8 to stream in_1 and 1..9 to stream in_2.
|
||||||
|
clock_->Sleep(absl::Microseconds(10000));
|
||||||
|
for (int i = 1; i < 10; ++i) {
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
MP_EXPECT_OK(graph_.AddPacketToInputStream("in_1", input_packets_[i]));
|
||||||
|
}
|
||||||
|
MP_EXPECT_OK(graph_.AddPacketToInputStream("in_2", input_packets_[i]));
|
||||||
|
clock_->Sleep(absl::Microseconds(10000));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finish the graph run.
|
||||||
|
MP_EXPECT_OK(graph_.CloseAllPacketSources());
|
||||||
|
clock_->Sleep(absl::Microseconds(40000));
|
||||||
|
MP_EXPECT_OK(graph_.WaitUntilDone());
|
||||||
|
simulation_clock_->ThreadFinish();
|
||||||
|
|
||||||
|
// Validate the output.
|
||||||
|
// Input packets 4 and 8 are dropped due to max_in_flight.
|
||||||
|
std::vector<Packet> expected_output = {
|
||||||
|
input_packets_[2],
|
||||||
|
input_packets_[6],
|
||||||
|
};
|
||||||
|
EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output));
|
||||||
|
// Packets following input packets 2 and 6, and not input packets 4 and 8.
|
||||||
|
std::vector<Packet> expected_output_2 = {
|
||||||
|
input_packets_[1], input_packets_[2], input_packets_[3],
|
||||||
|
input_packets_[6], input_packets_[7],
|
||||||
|
};
|
||||||
|
EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2));
|
||||||
|
|
||||||
|
// Validate the ALLOW stream output.
|
||||||
|
std::vector<Packet> expected_allow =
|
||||||
|
MakePackets<bool>({{Timestamp(20000), 1},
|
||||||
|
{Timestamp(40000), 0},
|
||||||
|
{Timestamp(60000), 1},
|
||||||
|
{Timestamp(80000), 0}});
|
||||||
|
EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow));
|
||||||
}
|
}
|
||||||
|
|
||||||
} // anonymous namespace
|
} // anonymous namespace
|
||||||
|
|
Loading…
Reference in New Issue
Block a user