GitOrigin-RevId: 2146b10f0a498f665f246e16033b686c7947b92d
7.3 KiB
layout | title | nav_order | has_children | has_toc |
---|---|---|---|---|
default | Processing real-time data streams | 6 | true | false |
Processing real-time data streams
{: .no_toc }
- TOC {:toc}
Realtime timestamps
MediaPipe calculator graphs are often used to process streams of video or audio
frames for interactive applications. The MediaPipe framework requires only that
successive packets be assigned monotonically increasing timestamps. By
convention, realtime calculators and graphs use the recording time or the
presentation time of each frame as its timestamp, with each timestamp indicating
the microseconds since Jan/1/1970:00:00:00
. This allows packets from various
sources to be processed in a globally consistent sequence.
Realtime scheduling
Normally, each Calculator runs as soon as all of its input packets for a given timestamp become available. Normally, this happens when the calculator has finished processing the previous frame, and each of the calculators producing its inputs have finished processing the current frame. The MediaPipe scheduler invokes each calculator as soon as these conditions are met. See Synchronization for more details.
Timestamp bounds
When a calculator does not produce any output packets for a given timestamp, it can instead output a "timestamp bound" indicating that no packet will be produced for that timestamp. This indication is necessary to allow downstream calculators to run at that timestamp, even though no packet has arrived for certain streams for that timestamp. This is especially important for realtime graphs in interactive applications, where it is crucial that each calculator begin processing as soon as possible.
Consider a graph like the following:
node {
calculator: "A"
input_stream: "alpha_in"
output_stream: "alpha"
}
node {
calculator: "B"
input_stream: "alpha"
input_stream: "foo"
output_stream: "beta"
}
Suppose: at timestamp T
, node A
doesn't send a packet in its output stream
alpha
. Node B
gets a packet in foo
at timestamp T
and is waiting for a
packet in alpha
at timestamp T
. If A
doesn't send B
a timestamp bound
update for alpha
, B
will keep waiting for a packet to arrive in alpha
.
Meanwhile, the packet queue of foo
will accumulate packets at T
, T+1
and
so on.
To output a packet on a stream, a calculator uses the API functions
CalculatorContext::Outputs
and OutputStream::Add
. To instead output a
timestamp bound on a stream, a calculator can use the API functions
CalculatorContext::Outputs
and CalculatorContext::SetNextTimestampBound
. The
specified bound is the lowest allowable timestamp for the next packet on the
specified output stream. When no packet is output, a calculator will typically
do something like:
cc->Outputs().Tag("output_frame").SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
The function Timestamp::NextAllowedInStream
returns the successive timestamp.
For example, Timestamp(1).NextAllowedInStream() == Timestamp(2)
.
Propagating timestamp bounds
Calculators that will be used in realtime graphs need to define output timestamp
bounds based on input timestamp bounds in order to allow downstream calculators
to be scheduled promptly. A common pattern is for calculators to output packets
with the same timestamps as their input packets. In this case, simply outputting
a packet on every call to Calculator::Process
is sufficient to define output
timestamp bounds.
However, calculators are not required to follow this common pattern for output timestamps, they are only required to choose monotonically increasing output timestamps. As a result, certain calculators must calculate timestamp bounds explicitly. MediaPipe provides several tools for computing appropriate timestamp bound for each calculator.
1. SetNextTimestampBound() can be used to specify the timestamp bound, t + 1
, for an output stream.
cc->Outputs.Tag("OUT").SetNextTimestampBound(t.NextAllowedInStream());
Alternatively, an empty packet with timestamp t
can be produced to specify the
timestamp bound t + 1
.
cc->Outputs.Tag("OUT").Add(Packet(), t);
The timestamp bound of an input stream is indicated by the packet or the empty packet on the input stream.
Timestamp bound = cc->Inputs().Tag("IN").Value().Timestamp();
2. TimestampOffset() can be specified in order to automatically copy the timestamp bound from input streams to output streams.
cc->SetTimestampOffset(0);
This setting has the advantage of propagating timestamp bounds automatically, even when only timestamp bounds arrive and Calculator::Process is not invoked.
3. ProcessTimestampBounds() can be specified in order to invoke
Calculator::Process
for each new "settled timestamp", where the "settled
timestamp" is the new highest timestamp below the current timestamp bounds.
Without ProcessTimestampBounds()
, Calculator::Process
is invoked only with
one or more arriving packets.
cc->SetProcessTimestampBounds(true);
This setting allows a calculator to perform its own timestamp bounds calculation
and propagation, even when only input timestamps are updated. It can be used to
replicate the effect of TimestampOffset()
, but it can also be used to
calculate a timestamp bound that takes into account additional factors.
For example, in order to replicate SetTimestampOffset(0)
, a calculator could
do the following:
absl::Status Open(CalculatorContext* cc) {
cc->SetProcessTimestampBounds(true);
}
absl::Status Process(CalculatorContext* cc) {
cc->Outputs.Tag("OUT").SetNextTimestampBound(
cc->InputTimestamp().NextAllowedInStream());
}
Scheduling of Calculator::Open and Calculator::Close
Calculator::Open
is invoked when all required input side-packets have been
produced. Input side-packets can be provided by the enclosing application or by
"side-packet calculators" inside the graph. Side-packets can be specified from
outside the graph using the API's CalculatorGraph::Initialize
and
CalculatorGraph::StartRun
. Side packets can be specified by calculators within
the graph using CalculatorGraphConfig::OutputSidePackets
and
OutputSidePacket::Set
.
Calculator::Close is invoked when all of the input streams have become Done
by
being closed or reaching timestamp bound Timestamp::Done
.
Note: If the graph finishes all pending calculator execution and becomes
Done
, before some streams become Done
, then MediaPipe will invoke the
remaining calls to Calculator::Close
, so that every calculator can produce its
final outputs.
The use of TimestampOffset
has some implications for Calculator::Close
. A
calculator specifying SetTimestampOffset(0)
will by design signal that all of
its output streams have reached Timestamp::Done
when all of its input streams
have reached Timestamp::Done
, and therefore no further outputs are possible.
This prevents such a calculator from emitting any packets during
Calculator::Close
. If a calculator needs to produce a summary packet during
Calculator::Close
, Calculator::Process
must specify timestamp bounds such
that at least one timestamp (such as Timestamp::Max
) remains available during
Calculator::Close
. This means that such a calculator normally cannot rely upon
SetTimestampOffset(0)
and must instead specify timestamp bounds explicitly
using SetNextTimestampBounds()
.