413 lines
17 KiB
Markdown
413 lines
17 KiB
Markdown
|
---
|
||
|
layout: default
|
||
|
title: Calculators
|
||
|
parent: Framework Concepts
|
||
|
nav_order: 1
|
||
|
---
|
||
|
|
||
|
# Calculators
|
||
|
{: .no_toc }
|
||
|
|
||
|
1. TOC
|
||
|
{:toc}
|
||
|
---
|
||
|
|
||
|
Each calculator is a node of a graph. We describe how to create a new
|
||
|
calculator, how to initialize a calculator, how to perform its calculations,
|
||
|
input and output streams, timestamps, and options. Each node in the graph is
|
||
|
implemented as a `Calculator`. The bulk of graph execution happens inside its
|
||
|
calculators. A calculator may receive zero or more input streams and/or side
|
||
|
packets and produces zero or more output streams and/or side packets.
|
||
|
|
||
|
## CalculatorBase
|
||
|
|
||
|
A calculator is created by defining a new sub-class of the
|
||
|
[`CalculatorBase`](https://github.com/google/mediapipe/tree/master/mediapipe/framework/calculator_base.cc)
|
||
|
class, implementing a number of methods, and registering the new sub-class with
|
||
|
Mediapipe. At a minimum, a new calculator must implement the below four methods
|
||
|
|
||
|
* `GetContract()`
|
||
|
* Calculator authors can specify the expected types of inputs and outputs
|
||
|
of a calculator in GetContract(). When a graph is initialized, the
|
||
|
framework calls a static method to verify if the packet types of the
|
||
|
connected inputs and outputs match the information in this
|
||
|
specification.
|
||
|
* `Open()`
|
||
|
* After a graph starts, the framework calls `Open()`. The input side
|
||
|
packets are available to the calculator at this point. `Open()`
|
||
|
interprets the node configuration operations (see [Graphs](graphs.md))
|
||
|
and prepares the calculator's per-graph-run state. This function may
|
||
|
also write packets to calculator outputs. An error during `Open()` can
|
||
|
terminate the graph run.
|
||
|
* `Process()`
|
||
|
* For a calculator with inputs, the framework calls `Process()` repeatedly
|
||
|
whenever at least one input stream has a packet available. The framework
|
||
|
by default guarantees that all inputs have the same timestamp (see
|
||
|
[Synchronization](synchronization.md) for more information). Multiple
|
||
|
`Process()` calls can be invoked simultaneously when parallel execution
|
||
|
is enabled. If an error occurs during `Process()`, the framework calls
|
||
|
`Close()` and the graph run terminates.
|
||
|
* `Close()`
|
||
|
* After all calls to `Process()` finish or when all input streams close,
|
||
|
the framework calls `Close()`. This function is always called if
|
||
|
`Open()` was called and succeeded and even if the graph run terminated
|
||
|
because of an error. No inputs are available via any input streams
|
||
|
during `Close()`, but it still has access to input side packets and
|
||
|
therefore may write outputs. After `Close()` returns, the calculator
|
||
|
should be considered a dead node. The calculator object is destroyed as
|
||
|
soon as the graph finishes running.
|
||
|
|
||
|
The following are code snippets from
|
||
|
[CalculatorBase.h](https://github.com/google/mediapipe/tree/master/mediapipe/framework/calculator_base.h).
|
||
|
|
||
|
```c++
|
||
|
class CalculatorBase {
|
||
|
public:
|
||
|
...
|
||
|
|
||
|
// The subclasses of CalculatorBase must implement GetContract.
|
||
|
// ...
|
||
|
static ::MediaPipe::Status GetContract(CalculatorContract* cc);
|
||
|
|
||
|
// Open is called before any Process() calls, on a freshly constructed
|
||
|
// calculator. Subclasses may override this method to perform necessary
|
||
|
// setup, and possibly output Packets and/or set output streams' headers.
|
||
|
// ...
|
||
|
virtual ::MediaPipe::Status Open(CalculatorContext* cc) {
|
||
|
return ::MediaPipe::OkStatus();
|
||
|
}
|
||
|
|
||
|
// Processes the incoming inputs. May call the methods on cc to access
|
||
|
// inputs and produce outputs.
|
||
|
// ...
|
||
|
virtual ::MediaPipe::Status Process(CalculatorContext* cc) = 0;
|
||
|
|
||
|
// Is called if Open() was called and succeeded. Is called either
|
||
|
// immediately after processing is complete or after a graph run has ended
|
||
|
// (if an error occurred in the graph). ...
|
||
|
virtual ::MediaPipe::Status Close(CalculatorContext* cc) {
|
||
|
return ::MediaPipe::OkStatus();
|
||
|
}
|
||
|
|
||
|
...
|
||
|
};
|
||
|
```
|
||
|
|
||
|
## Life of a calculator
|
||
|
|
||
|
During initialization of a MediaPipe graph, the framework calls a
|
||
|
`GetContract()` static method to determine what kinds of packets are expected.
|
||
|
|
||
|
The framework constructs and destroys the entire calculator for each graph run
|
||
|
(e.g. once per video or once per image). Expensive or large objects that remain
|
||
|
constant across graph runs should be supplied as input side packets so the
|
||
|
calculations are not repeated on subsequent runs.
|
||
|
|
||
|
After initialization, for each run of the graph, the following sequence occurs:
|
||
|
|
||
|
* `Open()`
|
||
|
* `Process()` (repeatedly)
|
||
|
* `Close()`
|
||
|
|
||
|
The framework calls `Open()` to initialize the calculator. `Open()` should
|
||
|
interpret any options and set up the calculator's per-graph-run state. `Open()`
|
||
|
may obtain input side packets and write packets to calculator outputs. If
|
||
|
appropriate, it should call `SetOffset()` to reduce potential packet buffering
|
||
|
of input streams.
|
||
|
|
||
|
If an error occurs during `Open()` or `Process()` (as indicated by one of them
|
||
|
returning a non-`Ok` status), the graph run is terminated with no further calls
|
||
|
to the calculator's methods, and the calculator is destroyed.
|
||
|
|
||
|
For a calculator with inputs, the framework calls `Process()` whenever at least
|
||
|
one input has a packet available. The framework guarantees that inputs all have
|
||
|
the same timestamp, that timestamps increase with each call to `Process()` and
|
||
|
that all packets are delivered. As a consequence, some inputs may not have any
|
||
|
packets when `Process()` is called. An input whose packet is missing appears to
|
||
|
produce an empty packet (with no timestamp).
|
||
|
|
||
|
The framework calls `Close()` after all calls to `Process()`. All inputs will
|
||
|
have been exhausted, but `Close()` has access to input side packets and may
|
||
|
write outputs. After Close returns, the calculator is destroyed.
|
||
|
|
||
|
Calculators with no inputs are referred to as sources. A source calculator
|
||
|
continues to have `Process()` called as long as it returns an `Ok` status. A
|
||
|
source calculator indicates that it is exhausted by returning a stop status
|
||
|
(i.e. MediaPipe::tool::StatusStop).
|
||
|
|
||
|
## Identifying inputs and outputs
|
||
|
|
||
|
The public interface to a calculator consists of a set of input streams and
|
||
|
output streams. In a CalculatorGraphConfiguration, the outputs from some
|
||
|
calculators are connected to the inputs of other calculators using named
|
||
|
streams. Stream names are normally lowercase, while input and output tags are
|
||
|
normally UPPERCASE. In the example below, the output with tag name `VIDEO` is
|
||
|
connected to the input with tag name `VIDEO_IN` using the stream named
|
||
|
`video_stream`.
|
||
|
|
||
|
```proto
|
||
|
# Graph describing calculator SomeAudioVideoCalculator
|
||
|
node {
|
||
|
calculator: "SomeAudioVideoCalculator"
|
||
|
input_stream: "INPUT:combined_input"
|
||
|
output_stream: "VIDEO:video_stream"
|
||
|
}
|
||
|
node {
|
||
|
calculator: "SomeVideoCalculator"
|
||
|
input_stream: "VIDEO_IN:video_stream"
|
||
|
output_stream: "VIDEO_OUT:processed_video"
|
||
|
}
|
||
|
```
|
||
|
|
||
|
Input and output streams can be identified by index number, by tag name, or by a
|
||
|
combination of tag name and index number. You can see some examples of input and
|
||
|
output identifiers in the example below. `SomeAudioVideoCalculator` identifies
|
||
|
its video output by tag and its audio outputs by the combination of tag and
|
||
|
index. The input with tag `VIDEO` is connected to the stream named
|
||
|
`video_stream`. The outputs with tag `AUDIO` and indices `0` and `1` are
|
||
|
connected to the streams named `audio_left` and `audio_right`.
|
||
|
`SomeAudioCalculator` identifies its audio inputs by index only (no tag needed).
|
||
|
|
||
|
```proto
|
||
|
# Graph describing calculator SomeAudioVideoCalculator
|
||
|
node {
|
||
|
calculator: "SomeAudioVideoCalculator"
|
||
|
input_stream: "combined_input"
|
||
|
output_stream: "VIDEO:video_stream"
|
||
|
output_stream: "AUDIO:0:audio_left"
|
||
|
output_stream: "AUDIO:1:audio_right"
|
||
|
}
|
||
|
|
||
|
node {
|
||
|
calculator: "SomeAudioCalculator"
|
||
|
input_stream: "audio_left"
|
||
|
input_stream: "audio_right"
|
||
|
output_stream: "audio_energy"
|
||
|
}
|
||
|
```
|
||
|
|
||
|
In the calculator implementation, inputs and outputs are also identified by tag
|
||
|
name and index number. In the function below input are output are identified:
|
||
|
|
||
|
* By index number: The combined input stream is identified simply by index
|
||
|
`0`.
|
||
|
* By tag name: The video output stream is identified by tag name "VIDEO".
|
||
|
* By tag name and index number: The output audio streams are identified by the
|
||
|
combination of the tag name `AUDIO` and the index numbers `0` and `1`.
|
||
|
|
||
|
```c++
|
||
|
// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
|
||
|
class SomeAudioVideoCalculator : public CalculatorBase {
|
||
|
public:
|
||
|
static ::mediapipe::Status GetContract(CalculatorContract* cc) {
|
||
|
cc->Inputs().Index(0).SetAny();
|
||
|
// SetAny() is used to specify that whatever the type of the
|
||
|
// stream is, it's acceptable. This does not mean that any
|
||
|
// packet is acceptable. Packets in the stream still have a
|
||
|
// particular type. SetAny() has the same effect as explicitly
|
||
|
// setting the type to be the stream's type.
|
||
|
cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
|
||
|
cc->Outputs().Get("AUDIO", 0).Set<Matrix>;
|
||
|
cc->Outputs().Get("AUDIO", 1).Set<Matrix>;
|
||
|
return ::mediapipe::OkStatus();
|
||
|
}
|
||
|
```
|
||
|
|
||
|
## Processing
|
||
|
|
||
|
`Process()` called on a non-source node must return `::mediapipe::OkStatus()` to
|
||
|
indicate that all went well, or any other status code to signal an error
|
||
|
|
||
|
If a non-source calculator returns `tool::StatusStop()`, then this signals the
|
||
|
graph is being cancelled early. In this case, all source calculators and graph
|
||
|
input streams will be closed (and remaining Packets will propagate through the
|
||
|
graph).
|
||
|
|
||
|
A source node in a graph will continue to have `Process()` called on it as long
|
||
|
as it returns `::mediapipe::OkStatus(`). To indicate that there is no more data
|
||
|
to be generated return `tool::StatusStop()`. Any other status indicates an error
|
||
|
has occurred.
|
||
|
|
||
|
`Close()` returns `::mediapipe::OkStatus()` to indicate success. Any other
|
||
|
status indicates a failure.
|
||
|
|
||
|
Here is the basic `Process()` function. It uses the `Input()` method (which can
|
||
|
be used only if the calculator has a single input) to request its input data. It
|
||
|
then uses `std::unique_ptr` to allocate the memory needed for the output packet,
|
||
|
and does the calculations. When done it releases the pointer when adding it to
|
||
|
the output stream.
|
||
|
|
||
|
```c++
|
||
|
::util::Status MyCalculator::Process() {
|
||
|
const Matrix& input = Input()->Get<Matrix>();
|
||
|
std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
|
||
|
// do your magic here....
|
||
|
// output->row(n) = ...
|
||
|
Output()->Add(output.release(), InputTimestamp());
|
||
|
return ::mediapipe::OkStatus();
|
||
|
}
|
||
|
```
|
||
|
|
||
|
## Example calculator
|
||
|
|
||
|
This section discusses the implementation of `PacketClonerCalculator`, which
|
||
|
does a relatively simple job, and is used in many calculator graphs.
|
||
|
`PacketClonerCalculator` simply produces a copy of its most recent input
|
||
|
packets on demand.
|
||
|
|
||
|
`PacketClonerCalculator` is useful when the timestamps of arriving data packets
|
||
|
are not aligned perfectly. Suppose we have a room with a microphone, light
|
||
|
sensor and a video camera that is collecting sensory data. Each of the sensors
|
||
|
operates independently and collects data intermittently. Suppose that the output
|
||
|
of each sensor is:
|
||
|
|
||
|
* microphone = loudness in decibels of sound in the room (Integer)
|
||
|
* light sensor = brightness of room (Integer)
|
||
|
* video camera = RGB image frame of room (ImageFrame)
|
||
|
|
||
|
Our simple perception pipeline is designed to process sensory data from these 3
|
||
|
sensors such that at any time when we have image frame data from the camera that
|
||
|
is synchronized with the last collected microphone loudness data and light
|
||
|
sensor brightness data. To do this with MediaPipe, our perception pipeline has 3
|
||
|
input streams:
|
||
|
|
||
|
* room_mic_signal - Each packet of data in this input stream is integer data
|
||
|
representing how loud audio is in a room with timestamp.
|
||
|
* room_lightening_sensor - Each packet of data in this input stream is integer
|
||
|
data representing how bright is the room illuminated with timestamp.
|
||
|
* room_video_tick_signal - Each packet of data in this input stream is
|
||
|
imageframe of video data representing video collected from camera in the
|
||
|
room with timestamp.
|
||
|
|
||
|
Below is the implementation of the `PacketClonerCalculator`. You can see
|
||
|
the `GetContract()`, `Open()`, and `Process()` methods as well as the instance
|
||
|
variable `current_` which holds the most recent input packets.
|
||
|
|
||
|
```c++
|
||
|
// This takes packets from N+1 streams, A_1, A_2, ..., A_N, B.
|
||
|
// For every packet that appears in B, outputs the most recent packet from each
|
||
|
// of the A_i on a separate stream.
|
||
|
|
||
|
#include <vector>
|
||
|
|
||
|
#include "absl/strings/str_cat.h"
|
||
|
#include "mediapipe/framework/calculator_framework.h"
|
||
|
|
||
|
namespace mediapipe {
|
||
|
|
||
|
// For every packet received on the last stream, output the latest packet
|
||
|
// obtained on all other streams. Therefore, if the last stream outputs at a
|
||
|
// higher rate than the others, this effectively clones the packets from the
|
||
|
// other streams to match the last.
|
||
|
//
|
||
|
// Example config:
|
||
|
// node {
|
||
|
// calculator: "PacketClonerCalculator"
|
||
|
// input_stream: "first_base_signal"
|
||
|
// input_stream: "second_base_signal"
|
||
|
// input_stream: "tick_signal"
|
||
|
// output_stream: "cloned_first_base_signal"
|
||
|
// output_stream: "cloned_second_base_signal"
|
||
|
// }
|
||
|
//
|
||
|
class PacketClonerCalculator : public CalculatorBase {
|
||
|
public:
|
||
|
static ::mediapipe::Status GetContract(CalculatorContract* cc) {
|
||
|
const int tick_signal_index = cc->Inputs().NumEntries() - 1;
|
||
|
// cc->Inputs().NumEntries() returns the number of input streams
|
||
|
// for the PacketClonerCalculator
|
||
|
for (int i = 0; i < tick_signal_index; ++i) {
|
||
|
cc->Inputs().Index(i).SetAny();
|
||
|
// cc->Inputs().Index(i) returns the input stream pointer by index
|
||
|
cc->Outputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
|
||
|
}
|
||
|
cc->Inputs().Index(tick_signal_index).SetAny();
|
||
|
return ::mediapipe::OkStatus();
|
||
|
}
|
||
|
|
||
|
::mediapipe::Status Open(CalculatorContext* cc) final {
|
||
|
tick_signal_index_ = cc->Inputs().NumEntries() - 1;
|
||
|
current_.resize(tick_signal_index_);
|
||
|
// Pass along the header for each stream if present.
|
||
|
for (int i = 0; i < tick_signal_index_; ++i) {
|
||
|
if (!cc->Inputs().Index(i).Header().IsEmpty()) {
|
||
|
cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
|
||
|
// Sets the output stream of index i header to be the same as
|
||
|
// the header for the input stream of index i
|
||
|
}
|
||
|
}
|
||
|
return ::mediapipe::OkStatus();
|
||
|
}
|
||
|
|
||
|
::mediapipe::Status Process(CalculatorContext* cc) final {
|
||
|
// Store input signals.
|
||
|
for (int i = 0; i < tick_signal_index_; ++i) {
|
||
|
if (!cc->Inputs().Index(i).Value().IsEmpty()) {
|
||
|
current_[i] = cc->Inputs().Index(i).Value();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Output if the tick signal is non-empty.
|
||
|
if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
|
||
|
for (int i = 0; i < tick_signal_index_; ++i) {
|
||
|
if (!current_[i].IsEmpty()) {
|
||
|
cc->Outputs().Index(i).AddPacket(
|
||
|
current_[i].At(cc->InputTimestamp()));
|
||
|
// Add a packet to output stream of index i a packet from inputstream i
|
||
|
// with timestamp common to all present inputs
|
||
|
//
|
||
|
} else {
|
||
|
cc->Outputs().Index(i).SetNextTimestampBound(
|
||
|
cc->InputTimestamp().NextAllowedInStream());
|
||
|
// if current_[i], 1 packet buffer for input stream i is empty, we will set
|
||
|
// next allowed timestamp for input stream i to be current timestamp + 1
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return ::mediapipe::OkStatus();
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
std::vector<Packet> current_;
|
||
|
int tick_signal_index_;
|
||
|
};
|
||
|
|
||
|
REGISTER_CALCULATOR(PacketClonerCalculator);
|
||
|
} // namespace mediapipe
|
||
|
```
|
||
|
|
||
|
Typically, a calculator has only a .cc file. No .h is required, because
|
||
|
mediapipe uses registration to make calculators known to it. After you have
|
||
|
defined your calculator class, register it with a macro invocation
|
||
|
REGISTER_CALCULATOR(calculator_class_name).
|
||
|
|
||
|
Below is a trivial MediaPipe graph that has 3 input streams, 1 node
|
||
|
(PacketClonerCalculator) and 3 output streams.
|
||
|
|
||
|
```proto
|
||
|
input_stream: "room_mic_signal"
|
||
|
input_stream: "room_lighting_sensor"
|
||
|
input_stream: "room_video_tick_signal"
|
||
|
|
||
|
node {
|
||
|
calculator: "PacketClonerCalculator"
|
||
|
input_stream: "room_mic_signal"
|
||
|
input_stream: "room_lighting_sensor"
|
||
|
input_stream: "room_video_tick_signal"
|
||
|
output_stream: "cloned_room_mic_signal"
|
||
|
output_stream: "cloned_lighting_sensor"
|
||
|
}
|
||
|
```
|
||
|
|
||
|
The diagram below shows how the `PacketClonerCalculator` defines its output
|
||
|
packets based on its series of input packets.
|
||
|
|
||
|
| ![Graph using |
|
||
|
: PacketClonerCalculator](../images/packet_cloner_calculator.png) :
|
||
|
| :--------------------------------------------------------------------------: |
|
||
|
| *Each time it receives a packet on its TICK input stream, the |
|
||
|
: PacketClonerCalculator outputs the most recent packet from each of its input :
|
||
|
: streams. The sequence of output packets is determined by the sequene of :
|
||
|
: input packets and their timestamps. The timestamps are shows along the right :
|
||
|
: side of the diagram.* :
|