Adding SwitchContainer option "tick_input_stream" to specify which input streams indicate the output timestamps to expect from the contained graphs.

PiperOrigin-RevId: 484615286
This commit is contained in:
Hadon Nash 2022-10-28 14:02:07 -07:00 committed by Copybara-Service
parent e16be2e8fa
commit 313b387ced
2 changed files with 24 additions and 4 deletions

View File

@ -239,6 +239,15 @@ bool HasTag(const proto_ns::RepeatedPtrField<std::string>& streams,
return tags.count({tag, 0}) > 0;
}
// Returns true if a set of "TAG::index" includes a TagIndex.
bool ContainsTag(const proto_ns::RepeatedPtrField<std::string>& tags,
TagIndex item) {
for (const std::string& t : tags) {
if (ParseTagIndex(t) == item) return true;
}
return false;
}
absl::StatusOr<CalculatorGraphConfig> SwitchContainer::GetConfig(
const Subgraph::SubgraphOptions& options) {
CalculatorGraphConfig config;
@ -263,9 +272,9 @@ absl::StatusOr<CalculatorGraphConfig> SwitchContainer::GetConfig(
std::string enable_stream = "ENABLE:gate_enable";
// Add a PacketSequencerCalculator node for "SELECT" or "ENABLE" streams.
bool async_selection =
Subgraph::GetOptions<mediapipe::SwitchContainerOptions>(options)
.async_selection();
const auto& switch_options =
Subgraph::GetOptions<mediapipe::SwitchContainerOptions>(options);
bool async_selection = switch_options.async_selection();
if (HasTag(container_node.input_stream(), "SELECT")) {
select_node = BuildTimestampNode(&config, async_selection);
select_node->add_input_stream("INPUT:gate_select");
@ -296,7 +305,7 @@ absl::StatusOr<CalculatorGraphConfig> SwitchContainer::GetConfig(
mux->add_input_side_packet("SELECT:gate_select");
mux->add_input_side_packet("ENABLE:gate_enable");
// Add input streams for graph and demux and the timestamper.
// Add input streams for graph and demux.
config.add_input_stream("SELECT:gate_select");
config.add_input_stream("ENABLE:gate_enable");
config.add_input_side_packet("SELECT:gate_select");
@ -306,6 +315,12 @@ absl::StatusOr<CalculatorGraphConfig> SwitchContainer::GetConfig(
std::string stream = CatStream(p.first, p.second);
config.add_input_stream(stream);
demux->add_input_stream(stream);
}
// Add input streams for the timestamper.
auto& tick_streams = switch_options.tick_input_stream();
for (const auto& p : input_tags) {
if (!tick_streams.empty() && !ContainsTag(tick_streams, p.first)) continue;
TagIndex tick_tag{"TICK", tick_index++};
if (select_node) {
select_node->add_input_stream(CatStream(tick_tag, p.second));

View File

@ -30,4 +30,9 @@ message SwitchContainerOptions {
// Use ImmediateInputStreamHandler for channel selection.
optional bool async_selection = 6;
// Specifies an input stream, "TAG:index", that defines the processed
// timestamps. SwitchContainer awaits output at the last processed
// timestamp before advancing from one selected channel to the next.
repeated string tick_input_stream = 7;
}