From e4d7d8605c9c6ec8a973a269c4cba4b3ef1896d3 Mon Sep 17 00:00:00 2001 From: "Stepan Dyatkovskiy (kaomoneus)" Date: Thu, 25 Aug 2022 17:19:21 +0400 Subject: [PATCH] Introduced PacketRateCalculator. PacketRateCalculator allows to calculate rate of incoming packages. E.g. when you want to extract FPS, or any other processing rate. * As input it accepts any packets stream. * As output it emits rate (floating point scalar), namely amount of packets per second based calculation of current packet and previous packet. * For very first packet it emits empty packet. Usage example: node { calculator: "PacketRateCalculator" input_stream: "image" output_stream: "image_fps" } --- mediapipe/calculators/core/BUILD | 34 ++++++++ .../core/packet_rate_calculator.cc | 80 +++++++++++++++++++ .../core/packet_rate_calculator_test.cc | 64 +++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 mediapipe/calculators/core/packet_rate_calculator.cc create mode 100644 mediapipe/calculators/core/packet_rate_calculator_test.cc diff --git a/mediapipe/calculators/core/BUILD b/mediapipe/calculators/core/BUILD index e741ebad4..29ffa77c8 100644 --- a/mediapipe/calculators/core/BUILD +++ b/mediapipe/calculators/core/BUILD @@ -49,6 +49,40 @@ mediapipe_proto_library( ], ) +cc_library( + name = "packet_rate_calculator", + srcs = ["packet_rate_calculator.cc"], + visibility = [ + "//visibility:public", + ], + deps = [ + "//mediapipe/framework:calculator_framework", + "//mediapipe/framework:collection_item_id", + "//mediapipe/framework/port:integral_types", + "//mediapipe/framework/port:logging", + "//mediapipe/framework/port:ret_check", + "//mediapipe/framework/port:status", + "@com_google_absl//absl/strings", + ], + alwayslink = 1, +) + +cc_test( + name = "packet_rate_calculator_test", + timeout = "short", + srcs = [ + "packet_rate_calculator_test.cc", + ], + deps = [ + ":packet_rate_calculator", + "//mediapipe/framework:calculator_framework", + "//mediapipe/framework:calculator_runner", + "//mediapipe/framework/port:gtest_main", + "//mediapipe/framework/port:parse_text_proto", + "@com_google_absl//absl/strings", + ], +) + mediapipe_proto_library( name = "packet_resampler_calculator_proto", srcs = ["packet_resampler_calculator.proto"], diff --git a/mediapipe/calculators/core/packet_rate_calculator.cc b/mediapipe/calculators/core/packet_rate_calculator.cc new file mode 100644 index 000000000..8d06c0aff --- /dev/null +++ b/mediapipe/calculators/core/packet_rate_calculator.cc @@ -0,0 +1,80 @@ +// Copyright 2022 The MediaPipe Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "mediapipe/framework/calculator_framework.h" +#include "mediapipe/framework/port/ret_check.h" +#include "mediapipe/framework/port/status.h" + +namespace { +using namespace mediapipe; +/*** + * PacketRateCalculator allows to calculate rate of incoming packages. + * E.g. when you want to extract FPS, or any other processing rate. + * As input it accepts any packets stream. + * As output it emits rate (floating point scalar), namely amount of packets per second based + * calculation of current packet and previous packet. + * + * For very first packet it emits empty packet. + * + * Usage example: + * node { + * calculator: "PacketRateCalculator" + * input_stream: "image" + * output_stream: "image_fps" + * } + * + */ +class PacketRateCalculator : public CalculatorBase { +public: + static absl::Status GetContract(CalculatorContract *cc) { + auto &side_inputs = cc->InputSidePackets(); + RET_CHECK_EQ(cc->Inputs().NumEntries(), 1); + RET_CHECK_EQ(cc->Outputs().NumEntries(), 1); + + cc->Inputs().Index(0).SetAny(); + + cc->Outputs().Index(0).Set(); + return absl::OkStatus(); + } + + absl::Status Open(CalculatorContext *cc) final { return absl::OkStatus(); } + + // Releases input packets allowed by the max_in_flight constraint. + absl::Status Process(CalculatorContext *cc) final { + Timestamp latest_ts = cc->Inputs().Index(0).Value().Timestamp(); + + if (prev_timestamp_.Value() != kint64min) { + float seconds_since_last_packet = (latest_ts - prev_timestamp_).Seconds(); + auto rate = std::make_unique(1. / seconds_since_last_packet); + cc->Outputs().Index(0).Add(rate.release(), latest_ts); + } else { + cc->Outputs().Index(0).AddPacket(Packet().At(latest_ts)); + } + + prev_timestamp_ = latest_ts; + + return absl::OkStatus(); + } + +private: + Timestamp prev_timestamp_; +}; +} // namespace + +namespace mediapipe { +REGISTER_CALCULATOR(PacketRateCalculator); +} // namespace mediapipe diff --git a/mediapipe/calculators/core/packet_rate_calculator_test.cc b/mediapipe/calculators/core/packet_rate_calculator_test.cc new file mode 100644 index 000000000..601367e1a --- /dev/null +++ b/mediapipe/calculators/core/packet_rate_calculator_test.cc @@ -0,0 +1,64 @@ +// Copyright 2022 The MediaPipe Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "mediapipe/framework/calculator_framework.h" +#include "mediapipe/framework/calculator_runner.h" +#include "mediapipe/framework/port/gmock.h" +#include "mediapipe/framework/port/gtest.h" +#include "mediapipe/framework/port/parse_text_proto.h" +#include "mediapipe/framework/port/status_matchers.h" // NOLINT + +namespace mediapipe { + +void AddInputVector(const std::vector &input, int64 timestamp, + CalculatorRunner *runner) { + runner->MutableInputs()->Index(0).packets.push_back( + MakePacket>(input).At(Timestamp(timestamp))); +} + +TEST(PacketRateCalculatorTest, EmptyVectorInput) { + + CalculatorGraphConfig::Node node_config = + ParseTextProtoOrDie(R"pb( + calculator: "PacketRateCalculator" + input_stream: "input_packet" + output_stream: "packet_rate" + )pb"); + + CalculatorRunner runner(node_config); + + AddInputVector({0}, /*timestamp (microseconds)=*/1, &runner); + MP_ASSERT_OK(runner.Run()); + + EXPECT_EQ(0, runner.Outputs().Index(0).packets.size()); + + AddInputVector({1}, /*timestamp=*/1001, &runner); + MP_ASSERT_OK(runner.Run()); + + const std::vector &outputs = runner.Outputs().Index(0).packets; + EXPECT_EQ(1, outputs.size()); + + // * first packet came with timestamp 1us + // * second packet has timestamp 1001us + // So period is 1ms, which equal to rate 1000 packets per second. + + auto &v = outputs[0].Get(); + EXPECT_FLOAT_EQ(1e+3, v); +} + +} // namespace mediapipe \ No newline at end of file