Internal change

PiperOrigin-RevId: 487880137
This commit is contained in:
Hadon Nash 2022-11-11 11:48:22 -08:00 committed by Copybara-Service
parent ce292c2a49
commit a83d87e157

View File

@ -79,21 +79,25 @@ std::vector<T> PacketValues(const std::vector<Packet>& packets) {
return result; return result;
} }
template <typename T>
std::vector<Packet> MakePackets(std::vector<std::pair<Timestamp, T>> contents) {
std::vector<Packet> result;
for (auto& entry : contents) {
result.push_back(MakePacket<T>(entry.second).At(entry.first));
}
return result;
}
std::string SourceString(Timestamp t) { std::string SourceString(Timestamp t) {
return (t.IsSpecialValue()) return (t.IsSpecialValue())
? t.DebugString() ? t.DebugString()
: absl::StrCat("Timestamp(", t.DebugString(), ")"); : absl::StrCat("Timestamp(", t.DebugString(), ")");
} }
template <typename T>
std::string SourceString(Packet packet) {
std::ostringstream oss;
if (packet.IsEmpty()) {
oss << "Packet()";
} else {
oss << "MakePacket<" << MediaPipeTypeStringOrDemangled<T>() << ">("
<< packet.Get<T>() << ")";
}
oss << ".At(" << SourceString(packet.Timestamp()) << ")";
return oss.str();
}
template <typename PacketContainer, typename PacketContent> template <typename PacketContainer, typename PacketContent>
class PacketsEqMatcher class PacketsEqMatcher
: public ::testing::MatcherInterface<const PacketContainer&> { : public ::testing::MatcherInterface<const PacketContainer&> {
@ -123,8 +127,9 @@ class PacketsEqMatcher
} }
for (auto i1 = c1.begin(), i2 = c2.begin(); i1 != c1.end(); ++i1, ++i2) { for (auto i1 = c1.begin(), i2 = c2.begin(); i1 != c1.end(); ++i1, ++i2) {
Packet p1 = *i1, p2 = *i2; Packet p1 = *i1, p2 = *i2;
if (p1.Timestamp() != p2.Timestamp() || if (p1.Timestamp() != p2.Timestamp() || p1.IsEmpty() != p2.IsEmpty() ||
p1.Get<PacketContent>() != p2.Get<PacketContent>()) { (!p1.IsEmpty() &&
p1.Get<PacketContent>() != p2.Get<PacketContent>())) {
return false; return false;
} }
} }
@ -133,10 +138,9 @@ class PacketsEqMatcher
void Print(const PacketContainer& packets, ::std::ostream* os) const { void Print(const PacketContainer& packets, ::std::ostream* os) const {
for (auto it = packets.begin(); it != packets.end(); ++it) { for (auto it = packets.begin(); it != packets.end(); ++it) {
const Packet& packet = *it; const Packet& packet = *it;
*os << (it == packets.begin() ? "{" : "") << "{" *os << (it == packets.begin() ? "{" : "");
<< SourceString(packet.Timestamp()) << ", " *os << SourceString<PacketContent>(packet);
<< packet.Get<PacketContent>() << "}" *os << (std::next(it) == packets.end() ? "}" : ", ");
<< (std::next(it) == packets.end() ? "}" : ", ");
} }
} }
@ -144,7 +148,7 @@ class PacketsEqMatcher
}; };
template <typename PacketContainer, typename PacketContent> template <typename PacketContainer, typename PacketContent>
::testing::Matcher<const PacketContainer&> PackestEq( ::testing::Matcher<const PacketContainer&> PacketsEq(
const PacketContainer& packets) { const PacketContainer& packets) {
return MakeMatcher( return MakeMatcher(
new PacketsEqMatcher<PacketContainer, PacketContent>(packets)); new PacketsEqMatcher<PacketContainer, PacketContent>(packets));
@ -739,8 +743,8 @@ TEST_F(FlowLimiterCalculatorTest, TwoInputStreams) {
// The processing time "sleep_time" is reduced from 22ms to 12ms to create // The processing time "sleep_time" is reduced from 22ms to 12ms to create
// the same frame rate as FlowLimiterCalculatorTest::TwoInputStreams. // the same frame rate as FlowLimiterCalculatorTest::TwoInputStreams.
TEST_F(FlowLimiterCalculatorTest, ZeroQueue) { TEST_F(FlowLimiterCalculatorTest, ZeroQueue) {
auto BoolPackestEq = PackestEq<std::vector<Packet>, bool>; auto BoolPacketsEq = PacketsEq<std::vector<Packet>, bool>;
auto IntPackestEq = PackestEq<std::vector<Packet>, int>; auto IntPacketsEq = PacketsEq<std::vector<Packet>, int>;
// Configure the test. // Configure the test.
SetUpInputData(); SetUpInputData();
@ -835,52 +839,86 @@ TEST_F(FlowLimiterCalculatorTest, ZeroQueue) {
input_packets_[0], input_packets_[2], input_packets_[15], input_packets_[0], input_packets_[2], input_packets_[15],
input_packets_[17], input_packets_[19], input_packets_[17], input_packets_[19],
}; };
EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output)); EXPECT_THAT(out_1_packets_, IntPacketsEq(expected_output));
// Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled. // Exactly the timestamps released by FlowLimiterCalculator for in_1_sampled.
std::vector<Packet> expected_output_2 = { std::vector<Packet> expected_output_2 = {
input_packets_[0], input_packets_[2], input_packets_[4], input_packets_[0], input_packets_[2], input_packets_[4],
input_packets_[15], input_packets_[17], input_packets_[19], input_packets_[15], input_packets_[17], input_packets_[19],
}; };
EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2)); EXPECT_THAT(out_2_packets, IntPacketsEq(expected_output_2));
// Validate the ALLOW stream output. // Validate the ALLOW stream output.
std::vector<Packet> expected_allow = MakePackets<bool>( // std::vector<Packet> expected_allow = {
{{Timestamp(0), true}, {Timestamp(10000), false}, MakePacket<bool>(true).At(Timestamp(0)),
{Timestamp(20000), true}, {Timestamp(30000), false}, MakePacket<bool>(false).At(Timestamp(10000)),
{Timestamp(40000), true}, {Timestamp(50000), false}, MakePacket<bool>(true).At(Timestamp(20000)),
{Timestamp(60000), false}, {Timestamp(70000), false}, MakePacket<bool>(false).At(Timestamp(30000)),
{Timestamp(80000), false}, {Timestamp(90000), false}, MakePacket<bool>(true).At(Timestamp(40000)),
{Timestamp(100000), false}, {Timestamp(110000), false}, MakePacket<bool>(false).At(Timestamp(50000)),
{Timestamp(120000), false}, {Timestamp(130000), false}, MakePacket<bool>(false).At(Timestamp(60000)),
{Timestamp(140000), false}, {Timestamp(150000), true}, MakePacket<bool>(false).At(Timestamp(70000)),
{Timestamp(160000), false}, {Timestamp(170000), true}, MakePacket<bool>(false).At(Timestamp(80000)),
{Timestamp(180000), false}, {Timestamp(190000), true}, MakePacket<bool>(false).At(Timestamp(90000)),
{Timestamp(200000), false}}); MakePacket<bool>(false).At(Timestamp(100000)),
EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow)); MakePacket<bool>(false).At(Timestamp(110000)),
MakePacket<bool>(false).At(Timestamp(120000)),
MakePacket<bool>(false).At(Timestamp(130000)),
MakePacket<bool>(false).At(Timestamp(140000)),
MakePacket<bool>(true).At(Timestamp(150000)),
MakePacket<bool>(false).At(Timestamp(160000)),
MakePacket<bool>(true).At(Timestamp(170000)),
MakePacket<bool>(false).At(Timestamp(180000)),
MakePacket<bool>(true).At(Timestamp(190000)),
MakePacket<bool>(false).At(Timestamp(200000)),
};
EXPECT_THAT(allow_packets_, BoolPacketsEq(expected_allow));
}
std::vector<Packet> StripBoundsUpdates(const std::vector<Packet>& packets,
Timestamp begin = Timestamp::Min(),
Timestamp end = Timestamp::Max()) {
std::vector<Packet> result;
for (const auto& packet : packets) {
Timestamp ts = packet.Timestamp();
if (packet.IsEmpty() && ts >= begin && ts < end) {
continue;
}
result.push_back(packet);
}
return result;
} }
// Shows how FlowLimiterCalculator releases auxiliary input packets. // Shows how FlowLimiterCalculator releases auxiliary input packets.
// In this test, auxiliary input packets arrive at twice the primary rate. // In this test, auxiliary input packets arrive at twice the primary rate.
TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) { TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) {
auto BoolPackestEq = PackestEq<std::vector<Packet>, bool>; auto BoolPacketsEq = PacketsEq<std::vector<Packet>, bool>;
auto IntPackestEq = PackestEq<std::vector<Packet>, int>; auto IntPacketsEq = PacketsEq<std::vector<Packet>, int>;
// Configure the test. // Configure the test.
SetUpInputData(); SetUpInputData();
SetUpSimulationClock(); SetUpSimulationClock();
CalculatorGraphConfig graph_config = CalculatorGraphConfig graph_config =
ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb( ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
input_stream: 'in_1' input_stream: 'input_1'
input_stream: 'in_2' input_stream: 'auxiliary_input_2'
input_stream: 'auxiliary_input_3'
node { node {
calculator: 'FlowLimiterCalculator' calculator: 'FlowLimiterCalculator'
input_side_packet: 'OPTIONS:limiter_options' options {
input_stream: 'in_1' [mediapipe.FlowLimiterCalculatorOptions.ext] {
input_stream: 'in_2' max_in_flight: 1
max_in_queue: 0
in_flight_timeout: 1000000 # 1s
}
}
input_stream: 'input_1'
input_stream: 'auxiliary_input_2'
input_stream: 'auxiliary_input_3'
input_stream: 'FINISHED:out_1' input_stream: 'FINISHED:out_1'
input_stream_info: { tag_index: 'FINISHED' back_edge: true } input_stream_info: { tag_index: 'FINISHED' back_edge: true }
output_stream: 'in_1_sampled' output_stream: 'input_1_sampled'
output_stream: 'in_2_sampled' output_stream: 'auxiliary_input_2_sampled'
output_stream: 'auxiliary_input_3_sampled'
output_stream: 'ALLOW:allow' output_stream: 'ALLOW:allow'
} }
node { node {
@ -888,49 +926,75 @@ TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) {
input_side_packet: 'WARMUP_TIME:warmup_time' input_side_packet: 'WARMUP_TIME:warmup_time'
input_side_packet: 'SLEEP_TIME:sleep_time' input_side_packet: 'SLEEP_TIME:sleep_time'
input_side_packet: 'CLOCK:clock' input_side_packet: 'CLOCK:clock'
input_stream: 'PACKET:in_1_sampled' input_stream: 'PACKET:input_1_sampled'
output_stream: 'PACKET:out_1' output_stream: 'PACKET:out_1'
} }
)pb"); )pb");
auto limiter_options = ParseTextProtoOrDie<FlowLimiterCalculatorOptions>(
R"pb(
max_in_flight: 1 max_in_queue: 0 in_flight_timeout: 1000000 # 1s
)pb");
std::map<std::string, Packet> side_packets = { std::map<std::string, Packet> side_packets = {
{"limiter_options", // Fake processing lazy initialization time in microseconds.
MakePacket<FlowLimiterCalculatorOptions>(limiter_options)},
{"warmup_time", MakePacket<int64>(22000)}, {"warmup_time", MakePacket<int64>(22000)},
// Fake processing duration in microseconds.
{"sleep_time", MakePacket<int64>(22000)}, {"sleep_time", MakePacket<int64>(22000)},
// The SimulationClock to count virtual elapsed time.
{"clock", MakePacket<mediapipe::Clock*>(clock_)}, {"clock", MakePacket<mediapipe::Clock*>(clock_)},
}; };
// Start the graph. // Start the graph.
MP_ASSERT_OK(graph_.Initialize(graph_config)); MP_ASSERT_OK(graph_.Initialize(graph_config));
MP_EXPECT_OK(graph_.ObserveOutputStream("out_1", [this](Packet p) { MP_EXPECT_OK(graph_.ObserveOutputStream(
"out_1",
[this](Packet p) {
out_1_packets_.push_back(p); out_1_packets_.push_back(p);
return absl::OkStatus(); return absl::OkStatus();
})); },
std::vector<Packet> out_2_packets; true));
MP_EXPECT_OK(graph_.ObserveOutputStream("in_2_sampled", [&](Packet p) { std::vector<Packet> out_2_packets, out_3_packets;
MP_EXPECT_OK(graph_.ObserveOutputStream(
"auxiliary_input_2_sampled",
[&](Packet p) {
out_2_packets.push_back(p); out_2_packets.push_back(p);
return absl::OkStatus(); return absl::OkStatus();
})); },
MP_EXPECT_OK(graph_.ObserveOutputStream("allow", [this](Packet p) { true));
MP_EXPECT_OK(graph_.ObserveOutputStream(
"auxiliary_input_3_sampled",
[&](Packet p) {
out_3_packets.push_back(p);
return absl::OkStatus();
},
true));
MP_EXPECT_OK(graph_.ObserveOutputStream(
"allow",
[this](Packet p) {
allow_packets_.push_back(p); allow_packets_.push_back(p);
return absl::OkStatus(); return absl::OkStatus();
})); },
true));
simulation_clock_->ThreadStart(); simulation_clock_->ThreadStart();
MP_ASSERT_OK(graph_.StartRun(side_packets)); MP_ASSERT_OK(graph_.StartRun(side_packets));
// Add packets 2,4,6,8 to stream in_1 and 1..9 to stream in_2. // Add packets 1..9 to auxiliary_input_3, early.
clock_->Sleep(absl::Microseconds(10000)); for (int i = 1; i < 10; ++i) {
MP_EXPECT_OK(graph_.AddPacketToInputStream(
"auxiliary_input_3", MakePacket<int>(i).At(Timestamp(i * 10000))));
}
// The total count of out_2_packets after each input packet.
// std::vector<int> sizes_2 = {0, 0, 2, 2, 3, 3, 4, 4, 5, 5};
std::vector<int> sizes_2 = {0, 1, 3, 4, 6, 7, 9, 10, 12, 13};
// Add packets 2,4,6,8 to stream input_1.
// Add packets 1..9 to auxiliary_input_2.
for (int i = 1; i < 10; ++i) { for (int i = 1; i < 10; ++i) {
if (i % 2 == 0) { if (i % 2 == 0) {
MP_EXPECT_OK(graph_.AddPacketToInputStream("in_1", input_packets_[i])); MP_EXPECT_OK(graph_.AddPacketToInputStream(
"input_1", MakePacket<int>(i).At(Timestamp(i * 10000))));
} }
MP_EXPECT_OK(graph_.AddPacketToInputStream("in_2", input_packets_[i])); MP_EXPECT_OK(graph_.AddPacketToInputStream(
"auxiliary_input_2", MakePacket<int>(i).At(Timestamp(i * 10000))));
clock_->Sleep(absl::Microseconds(10000)); clock_->Sleep(absl::Microseconds(10000));
EXPECT_EQ(out_2_packets.size(), sizes_2[i]);
} }
// Finish the graph run. // Finish the graph run.
@ -942,24 +1006,46 @@ TEST_F(FlowLimiterCalculatorTest, AuxiliaryInputs) {
// Validate the output. // Validate the output.
// Input packets 4 and 8 are dropped due to max_in_flight. // Input packets 4 and 8 are dropped due to max_in_flight.
std::vector<Packet> expected_output = { std::vector<Packet> expected_output = {
input_packets_[2], MakePacket<int>(2).At(Timestamp(20000)),
input_packets_[6], Packet().At(Timestamp(40000)),
MakePacket<int>(6).At(Timestamp(60000)),
Packet().At(Timestamp(80000)),
}; };
EXPECT_THAT(out_1_packets_, IntPackestEq(expected_output)); EXPECT_THAT(out_1_packets_, IntPacketsEq(expected_output));
// Packets following input packets 2 and 6, and not input packets 4 and 8. // Packets following input packets 2 and 6, and not input packets 4 and 8.
std::vector<Packet> expected_output_2 = { std::vector<Packet> expected_auxiliary_output = {
input_packets_[1], input_packets_[2], input_packets_[3], Packet().At(Timestamp(9999)),
input_packets_[6], input_packets_[7], MakePacket<int>(1).At(Timestamp(10000)),
MakePacket<int>(2).At(Timestamp(20000)),
Packet().At(Timestamp(29999)),
MakePacket<int>(3).At(Timestamp(30000)),
Packet().At(Timestamp(40000)),
Packet().At(Timestamp(49999)),
Packet().At(Timestamp(50000)),
MakePacket<int>(6).At(Timestamp(60000)),
Packet().At(Timestamp(69999)),
MakePacket<int>(7).At(Timestamp(70000)),
Packet().At(Timestamp(80000)),
Packet().At(Timestamp(89999)),
}; };
EXPECT_THAT(out_2_packets, IntPackestEq(expected_output_2)); std::vector<Packet> actual_2 =
StripBoundsUpdates(out_2_packets, Timestamp(90000));
EXPECT_THAT(actual_2, IntPacketsEq(expected_auxiliary_output));
std::vector<Packet> expected_3 =
StripBoundsUpdates(expected_auxiliary_output, Timestamp(39999));
std::vector<Packet> actual_3 =
StripBoundsUpdates(out_3_packets, Timestamp(39999));
EXPECT_THAT(actual_3, IntPacketsEq(expected_3));
// Validate the ALLOW stream output. // Validate the ALLOW stream output.
std::vector<Packet> expected_allow = std::vector<Packet> expected_allow = {
MakePackets<bool>({{Timestamp(20000), 1}, MakePacket<bool>(true).At(Timestamp(20000)),
{Timestamp(40000), 0}, MakePacket<bool>(false).At(Timestamp(40000)),
{Timestamp(60000), 1}, MakePacket<bool>(true).At(Timestamp(60000)),
{Timestamp(80000), 0}}); MakePacket<bool>(false).At(Timestamp(80000)),
EXPECT_THAT(allow_packets_, BoolPackestEq(expected_allow)); };
EXPECT_THAT(allow_packets_, BoolPacketsEq(expected_allow));
} }
} // anonymous namespace } // anonymous namespace