mediapipe/mediapipe/framework/calculator_graph.h
MediaPipe Team 05b505c8e2 Introduce api to disable service default initialization.
PiperOrigin-RevId: 515501608
2023-03-09 18:57:39 -08:00

719 lines
31 KiB
C++

// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Declares CalculatorGraph, which links Calculators into a directed acyclic
// graph, and allows its evaluation.
#ifndef MEDIAPIPE_FRAMEWORK_CALCULATOR_GRAPH_H_
#define MEDIAPIPE_FRAMEWORK_CALCULATOR_GRAPH_H_
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/macros.h"
#include "absl/container/fixed_array.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/calculator_node.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/graph_output_stream.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
#include "mediapipe/framework/output_side_packet_impl.h"
#include "mediapipe/framework/output_stream.h"
#include "mediapipe/framework/output_stream_manager.h"
#include "mediapipe/framework/output_stream_poller.h"
#include "mediapipe/framework/output_stream_shard.h"
#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/packet_generator.pb.h"
#include "mediapipe/framework/packet_generator_graph.h"
#include "mediapipe/framework/port.h"
#include "mediapipe/framework/port/integral_types.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/scheduler.h"
#include "mediapipe/framework/thread_pool_executor.pb.h"
namespace mediapipe {
#if !MEDIAPIPE_DISABLE_GPU
class GpuResources;
struct GpuSharedData;
#endif // !MEDIAPIPE_DISABLE_GPU
typedef absl::StatusOr<OutputStreamPoller> StatusOrPoller;
// The class representing a DAG of calculator nodes.
//
// CalculatorGraph is the primary API for the MediaPipe Framework.
// In general, CalculatorGraph should be used if the only thing you need
// to do is run the graph (without pushing data in or extracting it as
// the graph runs).
//
// Example:
// // Build dependency "//mediapipe/framework:calculator_framework".
//
// #include "mediapipe/framework/calculator_framework.h"
//
// mediapipe::CalculatorGraphConfig config;
// MP_RETURN_IF_ERROR(mediapipe::tool::ParseGraphFromString(kGraphStr,
// &config)); mediapipe::CalculatorGraph graph;
// MP_RETURN_IF_ERROR(graph.Initialize(config));
//
// std::map<std::string, mediapipe::Packet> extra_side_packets;
// extra_side_packets["video_id"] = mediapipe::MakePacket<std::string>(
// "3edb9503834e9b42");
// MP_RETURN_IF_ERROR(graph.Run(extra_side_packets));
//
// // Run again (demonstrating the more concise initializer list syntax).
// MP_RETURN_IF_ERROR(graph.Run(
// {{"video_id", mediapipe::MakePacket<std::string>("Ex-uGhDzue4")}}));
// // See mediapipe/framework/graph_runner.h for an interface
// // to insert and extract packets from a graph as it runs.
// // Once it is done using the graph, close its streams and wait till done.
// MP_RETURN_IF_ERROR(graph->CloseAllInputStreams());
// MP_RETURN_IF_ERROR(graph->WaitUntilDone());
class CalculatorGraph {
public:
// Defines possible modes for adding a packet to a graph input stream.
// WAIT_TILL_NOT_FULL can be used to control the memory usage of a graph by
// avoiding adding a new packet until all dependent input streams fall below
// the maximum queue size specified in the graph configuration.
// ADD_IF_NOT_FULL could also be used to control the latency if used in a
// real-time graph (e.g. drop camera frames if the MediaPipe graph queues are
// full).
enum class GraphInputStreamAddMode {
// Blocks and waits until none of the affected streams
// are full. Note that if max_queue_size is set to -1, the packet will be
// added regardless of queue size.
WAIT_TILL_NOT_FULL,
// Returns and does not add packet if any affected input
// stream is full.
ADD_IF_NOT_FULL
};
// Creates an uninitialized graph.
CalculatorGraph();
CalculatorGraph(const CalculatorGraph&) = delete;
CalculatorGraph& operator=(const CalculatorGraph&) = delete;
// Initializes the graph from its proto description (using Initialize())
// and crashes if something goes wrong.
explicit CalculatorGraph(CalculatorGraphConfig config);
virtual ~CalculatorGraph();
// Initializes the graph from a its proto description.
// side_packets that are provided at this stage are common across all Run()
// invocations and could be used to execute PacketGenerators immediately.
absl::Status Initialize(CalculatorGraphConfig config,
const std::map<std::string, Packet>& side_packets);
// Convenience version which does not take side packets.
absl::Status Initialize(CalculatorGraphConfig config);
// Initializes the CalculatorGraph from the specified graph and subgraph
// configs. Template graph and subgraph configs can be specified through
// |input_templates|. Every subgraph must have its graph type specified in
// CalclatorGraphConfig.type. A subgraph can be instantiated directly by
// specifying its type in |graph_type|. A template graph can be instantiated
// directly by specifying its template arguments in |options|.
absl::Status Initialize(
const std::vector<CalculatorGraphConfig>& configs,
const std::vector<CalculatorGraphTemplate>& templates,
const std::map<std::string, Packet>& side_packets = {},
const std::string& graph_type = "",
const Subgraph::SubgraphOptions* options = nullptr);
// Returns the canonicalized CalculatorGraphConfig for this graph.
const CalculatorGraphConfig& Config() const {
return validated_graph_->Config();
}
// Observes the named output stream. packet_callback will be invoked on every
// packet emitted by the output stream. Can only be called before Run() or
// StartRun(). It is possible for packet_callback to be called until the
// object is destroyed, even if e.g. Cancel() or WaitUntilDone() have already
// been called. After this object is destroyed so is packet_callback.
// TODO: Rename to AddOutputStreamCallback.
absl::Status ObserveOutputStream(
const std::string& stream_name,
std::function<absl::Status(const Packet&)> packet_callback,
bool observe_timestamp_bounds = false);
// Adds an OutputStreamPoller for a stream. This provides a synchronous,
// polling API for accessing a stream's output. Should only be called before
// Run() or StartRun(). For asynchronous output, use ObserveOutputStream. See
// also the helpers in tool/sink.h.
StatusOrPoller AddOutputStreamPoller(const std::string& stream_name,
bool observe_timestamp_bounds = false);
// Gets output side packet by name. The output side packet can be successfully
// retrevied in one of the following situations:
// - The graph is done.
// - The output side packet has been generated by a calculator and the graph
// is currently idle.
// - The side packet is a base packet generated by a PacketGenerator.
// Returns error if the the output side packet is not found or empty.
absl::StatusOr<Packet> GetOutputSidePacket(const std::string& packet_name);
// Runs the graph after adding the given extra input side packets. All
// arguments are forgotten after Run() returns.
// Run() is a blocking call and will return when all calculators are done.
virtual absl::Status Run(
const std::map<std::string, Packet>& extra_side_packets);
// Run the graph without adding any input side packets.
absl::Status Run() { return Run({}); }
// Start a run of the graph. StartRun, WaitUntilDone, Cancel, HasError,
// AddPacketToInputStream, and CloseInputStream allow more control over
// the execution of the graph run. You can insert packets directly into
// a stream while the graph is running. Once StartRun has been called,
// the graph will continue to run until all work is either done or canceled,
// meaning that either WaitUntilDone() or Cancel() has been called and has
// completed. If StartRun returns an error, then the graph is not started and
// a subsequent call to StartRun can be attempted.
//
// Example:
// MP_RETURN_IF_ERROR(graph.StartRun(...));
// while (true) {
// if (graph.HasError() || want_to_stop) break;
// MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(...));
// }
// for (const std::string& stream : streams) {
// MP_RETURN_IF_ERROR(graph.CloseInputStream(stream));
// }
// MP_RETURN_IF_ERROR(graph.WaitUntilDone());
absl::Status StartRun(
const std::map<std::string, Packet>& extra_side_packets) {
return StartRun(extra_side_packets, {});
}
// In addition to the above StartRun, add additional parameter to set the
// stream header before running.
// Note: We highly discourage the use of stream headers, this is added for the
// compatibility of existing calculators that use headers during Open().
absl::Status StartRun(const std::map<std::string, Packet>& extra_side_packets,
const std::map<std::string, Packet>& stream_headers);
// Wait for the current run to finish (block the current thread
// until all source calculators have returned StatusStop(), all
// graph_input_streams_ have been closed, and no more calculators can
// be run). This function can be called only after StartRun(). If you want to
// stop the run quickly, without waiting for all the work in progress to
// finish, see Cancel(). The graph cannot be destroyed until all work is
// either done or canceled, meaning that either WaitUntilDone() or Cancel()
// has been called and completed.
absl::Status WaitUntilDone();
// Wait until the running graph is in the idle mode, which is when nothing can
// be scheduled and nothing is running in the worker threads. This function
// can be called only after StartRun().
// NOTE: The graph must not have any source nodes because source nodes prevent
// the running graph from becoming idle until the source nodes are done.
absl::Status WaitUntilIdle();
// Wait until a packet is emitted on one of the observed output streams.
// Returns immediately if a packet has already been emitted since the last
// call to this function.
// Returns OutOfRangeError if the graph terminated while waiting.
absl::Status WaitForObservedOutput();
// Quick non-locking means of checking if the graph has encountered an error.
bool HasError() const { return has_error_; }
// Add a Packet to a graph input stream based on the graph input stream add
// mode. If the mode is ADD_IF_NOT_FULL, the packet will not be added if any
// queue exceeds max_queue_size specified by the graph config and will return
// StatusUnavailable. The WAIT_TILL_NOT_FULL mode (default) will block until
// the queues fall below the max_queue_size before adding the packet. If the
// mode is max_queue_size is -1, then the packet is added regardless of the
// sizes of the queues in the graph. The input stream must have been specified
// in the configuration as a graph level input_stream. On error, nothing is
// added.
absl::Status AddPacketToInputStream(const std::string& stream_name,
const Packet& packet);
// Same as the l-value version of this function by the same name, but moves
// the r-value referenced packet into the stream instead of copying it over.
// This allows the graph to take exclusive ownership of the packet, which may
// allow more memory optimizations. Note that, if an error is returned, the
// packet may remain valid. In particular, when using the ADD_IF_NOT_FULL
// mode with a full queue, this will return StatusUnavailable and the caller
// may try adding the packet again later.
absl::Status AddPacketToInputStream(const std::string& stream_name,
Packet&& packet);
// Indicates that input will arrive no earlier than a certain timestamp.
absl::Status SetInputStreamTimestampBound(const std::string& stream_name,
Timestamp timestamp);
// Sets the queue size of a graph input stream, overriding the graph default.
absl::Status SetInputStreamMaxQueueSize(const std::string& stream_name,
int max_queue_size);
// Check if an input stream exists in the graph
bool HasInputStream(const std::string& name);
// Close a graph input stream. If the graph has any graph input streams
// then Run() will not return until all the graph input streams have
// been closed (and all packets propagate through the graph).
// Note that multiple threads cannot call CloseInputStream() on the same
// stream_name at the same time.
absl::Status CloseInputStream(const std::string& stream_name);
// Closes all the graph input streams.
absl::Status CloseAllInputStreams();
// Closes all the graph input streams and source calculator nodes.
absl::Status CloseAllPacketSources();
// Returns the pointer to the stream with the given name, or dies if none
// exists. The result remains owned by the CalculatorGraph.
ABSL_DEPRECATED(
"Prefer using a Calculator to get information of all sorts out of the "
"graph.")
const OutputStreamManager* FindOutputStreamManager(const std::string& name);
// Returns the ProfilingContext assocoaited with the CalculatorGraph.
ProfilingContext* profiler() { return profiler_.get(); }
// Collects the runtime profile for Open(), Process(), and Close() of each
// calculator in the graph. May be called at any time after the graph has been
// initialized.
ABSL_DEPRECATED("Use profiler()->GetCalculatorProfiles() instead")
absl::Status GetCalculatorProfiles(std::vector<CalculatorProfile>*) const;
// Set the type of counter used in this graph.
void SetCounterFactory(CounterFactory* factory) {
counter_factory_.reset(factory);
}
CounterFactory* GetCounterFactory() { return counter_factory_.get(); }
// Callback when an error is encountered.
// Adds the error to the vector of errors.
void RecordError(const absl::Status& error) ABSL_LOCKS_EXCLUDED(error_mutex_);
// Combines errors into a status. Returns true if the vector of errors is
// non-empty.
bool GetCombinedErrors(const std::string& error_prefix,
absl::Status* error_status);
// Convenience overload which specifies a default error prefix.
bool GetCombinedErrors(absl::Status* error_status);
// Returns the maximum input stream queue size.
int GetMaxInputStreamQueueSize();
// Get the mode for adding packets to an input stream.
GraphInputStreamAddMode GetGraphInputStreamAddMode() const;
// Set the mode for adding packets to an input stream.
void SetGraphInputStreamAddMode(GraphInputStreamAddMode mode);
// Aborts the scheduler if the graph is not terminated; no-op otherwise. Does
// not wait for all work in progress to finish. To stop the run and wait for
// work in progress to finish, see CloseAllInputStreams() and WaitUntilDone().
// The graph cannot be destroyed until all work is either done or canceled,
// meaning that either WaitUntilDone() or Cancel() has been called and
// completed.
void Cancel();
// Pauses the scheduler. Only used by calculator graph testing.
ABSL_DEPRECATED(
"CalculatorGraph will not allow external callers to explictly pause and "
"resume a graph.")
void Pause();
// Resumes the scheduler. Only used by calculator graph testing.
ABSL_DEPRECATED(
"CalculatorGraph will not allow external callers to explictly pause and "
"resume a graph.")
void Resume();
// Sets the executor that will run the nodes assigned to the executor
// named |name|. If |name| is empty, this sets the default executor. Must
// be called before the graph is initialized.
absl::Status SetExecutor(const std::string& name,
std::shared_ptr<Executor> executor);
// WARNING: the following public methods are exposed to Scheduler only.
// Return true if all the graph input streams have been closed.
bool GraphInputStreamsClosed() {
return num_closed_graph_input_streams_ == graph_input_streams_.size();
}
// Returns true if this node or graph input stream is connected to
// any input stream whose queue has hit maximum capacity.
bool IsNodeThrottled(int node_id)
ABSL_LOCKS_EXCLUDED(full_input_streams_mutex_);
// If any active source node or graph input stream is throttled and not yet
// closed, increases the max_queue_size for each full input stream in the
// graph.
// Returns true if at least one max_queue_size has been grown.
bool UnthrottleSources() ABSL_LOCKS_EXCLUDED(full_input_streams_mutex_);
// Returns the scheduler's runtime measures for overhead measurement.
// Only meant for test purposes.
internal::SchedulerTimes GetSchedulerTimes() {
return scheduler_.GetSchedulerTimes();
}
#if !MEDIAPIPE_DISABLE_GPU
// Returns a pointer to the GpuResources in use, if any.
// Only meant for internal use.
std::shared_ptr<GpuResources> GetGpuResources() const;
absl::Status SetGpuResources(std::shared_ptr<GpuResources> resources);
#endif // !MEDIAPIPE_DISABLE_GPU
// Sets a service object, essentially a graph-level singleton, which can be
// accessed by calculators and subgraphs without requiring an explicit
// connection.
//
// NOTE: must be called before `Initialize`, so subgraphs can access services
// as well, as graph expansion happens during initialization.
template <typename T>
absl::Status SetServiceObject(const GraphService<T>& service,
std::shared_ptr<T> object) {
// TODO: check that the graph has not been started!
return service_manager_.SetServiceObject(service, object);
}
template <typename T>
std::shared_ptr<T> GetServiceObject(const GraphService<T>& service) {
return service_manager_.GetServiceObject(service);
}
// Disallows/disables default initialization of MediaPipe graph services.
//
// IMPORTANT: MediaPipe graph serices, essentially a graph-level singletons,
// are designed in the way, so they may provide default initialization. For
// example, this allows to run OpenGL processing wihtin the graph without
// provinging a praticular OpenGL context as it can be provided by
// default-initializable `kGpuService`. (One caveat here, you may still need
// to initialize it manually to share graph context with external context.)
//
// Even if calculators require some service optionally
// (`calculator_contract->UseService(kSomeService).Optional()`), it will be
// still initialized if it allows default initialization.
//
// So far, in rare cases, this may be unwanted and strict control of what
// services are allowed in the graph can be achieved by calling this method,
// following `SetServiceObject` call for services which are allowed in the
// graph.
//
// Recommendation: do not use unless you have to (for example, default
// initialization has side effects)
//
// NOTE: must be called before `StartRun`/`Run`, where services are checked
// and can be default-initialized.
absl::Status DisallowServiceDefaultInitialization() {
allow_service_default_initialization_ = false;
return absl::OkStatus();
}
// Sets a service object, essentially a graph-level singleton, which can be
// accessed by calculators and subgraphs without requiring an explicit
// connection.
//
// NOTE: must be called before `Initialize`, so subgraphs can access services
// as well, as graph expansion happens during initialization.
//
// Only the Java API should call this directly.
absl::Status SetServicePacket(const GraphServiceBase& service, Packet p) {
// TODO: check that the graph has not been started!
return service_manager_.SetServicePacket(service, p);
}
private:
// GraphRunState is used as a parameter in the function CallStatusHandlers.
enum class GraphRunState {
// State of the graph before the run; see status_handler.h for details.
PRE_RUN,
// State of the graph after after the run; set by CleanUpAfterRun.
POST_RUN,
};
// The graph input streams (which have packets added to them from
// outside the graph). Since these will be connected directly to a
// node's input streams they are implemented as "output" streams.
// Based on the assumption that all the graph input packets must be added to a
// graph input stream sequentially, a GraphInputStream object only contains
// one reusable output stream shard.
class GraphInputStream {
public:
explicit GraphInputStream(OutputStreamManager* manager)
: manager_(manager) {
shard_.SetSpec(manager_->Spec());
}
void PrepareForRun(std::function<void(absl::Status)> error_callback) {
manager_->PrepareForRun(std::move(error_callback));
}
void SetMaxQueueSize(int max_queue_size) {
manager_->SetMaxQueueSize(max_queue_size);
}
void SetHeader(const Packet& header);
void AddPacket(const Packet& packet) { shard_.AddPacket(packet); }
void AddPacket(Packet&& packet) { shard_.AddPacket(std::move(packet)); }
void SetNextTimestampBound(Timestamp timestamp);
void PropagateUpdatesToMirrors();
void Close();
bool IsClosed() const { return manager_->IsClosed(); }
OutputStreamManager* GetManager() { return manager_; }
private:
OutputStreamManager* manager_ = nullptr;
OutputStreamShard shard_;
};
// Initializes the graph from a ValidatedGraphConfig object.
absl::Status Initialize(std::unique_ptr<ValidatedGraphConfig> validated_graph,
const std::map<std::string, Packet>& side_packets);
// AddPacketToInputStreamInternal template is called by either
// AddPacketToInputStream(Packet&& packet) or
// AddPacketToInputStream(const Packet& packet).
template <typename T>
absl::Status AddPacketToInputStreamInternal(const std::string& stream_name,
T&& packet);
// Sets the executor that will run the nodes assigned to the executor
// named |name|. If |name| is empty, this sets the default executor.
// Does not check that the graph is uninitialized and |name| is not a
// reserved executor name.
absl::Status SetExecutorInternal(const std::string& name,
std::shared_ptr<Executor> executor);
// If the num_threads field in default_executor_options is not specified,
// assigns a reasonable value based on system configuration and the graph.
// Then, creates the default thread pool if appropriate.
//
// Only called by InitializeExecutors().
absl::Status InitializeDefaultExecutor(
const ThreadPoolExecutorOptions* default_executor_options,
bool use_application_thread);
// Creates a thread pool as the default executor. The num_threads argument
// overrides the num_threads field in default_executor_options.
absl::Status CreateDefaultThreadPool(
const ThreadPoolExecutorOptions* default_executor_options,
int num_threads);
// Returns true if |name| is a reserved executor name.
static bool IsReservedExecutorName(const std::string& name);
// Helper functions for Initialize().
absl::Status InitializeExecutors();
absl::Status InitializePacketGeneratorGraph(
const std::map<std::string, Packet>& side_packets);
absl::Status InitializeStreams();
absl::Status InitializeProfiler();
absl::Status InitializeCalculatorNodes();
absl::Status InitializePacketGeneratorNodes(
const std::vector<int>& non_scheduled_generators);
// Iterates through all nodes and schedules any that can be opened.
void ScheduleAllOpenableNodes();
// Does the bulk of the work for StartRun but does not start the scheduler.
absl::Status PrepareForRun(
const std::map<std::string, Packet>& extra_side_packets,
const std::map<std::string, Packet>& stream_headers);
absl::Status PrepareServices();
#if !MEDIAPIPE_DISABLE_GPU
absl::Status MaybeSetUpGpuServiceFromLegacySidePacket(Packet legacy_sp);
// Helper for PrepareForRun. If it returns a non-empty map, those packets
// must be added to the existing side packets, replacing existing values
// that have the same key.
std::map<std::string, Packet> MaybeCreateLegacyGpuSidePacket(
Packet legacy_sp);
absl::Status PrepareGpu();
#endif // !MEDIAPIPE_DISABLE_GPU
// Cleans up any remaining state after the run and returns any errors that may
// have occurred during the run. Called after the scheduler has terminated.
absl::Status FinishRun();
// Cleans up any remaining state after the run. All status handlers run here
// if their requested input side packets exist.
// The original |*status| is passed to all the status handlers. If any status
// handler fails, it appends its error to errors_, and CleanupAfterRun sets
// |*status| to the new combined errors on return.
void CleanupAfterRun(absl::Status* status) ABSL_LOCKS_EXCLUDED(error_mutex_);
// Calls HandlePreRunStatus or HandleStatus on the StatusHandlers. Which one
// is called depends on the GraphRunState parameter (PRE_RUN or POST_RUN).
// current_run_side_packets_ must be set before this function is called.
// On error, has_error_ will be set.
void CallStatusHandlers(GraphRunState graph_run_state,
const absl::Status& status);
// Callback function to throttle or unthrottle source nodes when a stream
// becomes full or non-full. A node is throttled (i.e. prevented being
// scheduled) if it has caused a downstream input queue to become full. Note
// that all sources (including graph input streams) that affect this stream
// will be throttled. A node is unthrottled (i.e. added to the scheduler
// queue) if all downstream input queues have become non-full.
//
// This method is invoked from an input stream when its queue becomes full or
// non-full. However, since streams are not allowed to hold any locks while
// invoking a callback, this method must re-lock the stream and query its
// status before taking any action.
void UpdateThrottledNodes(InputStreamManager* stream, bool* stream_was_full);
#if !MEDIAPIPE_DISABLE_GPU
// Owns the legacy GpuSharedData if we need to create one for backwards
// compatibility.
std::unique_ptr<GpuSharedData> legacy_gpu_shared_;
#endif // !MEDIAPIPE_DISABLE_GPU
// True if the graph was initialized.
bool initialized_ = false;
// A packet type that has SetAny() called on it.
PacketType any_packet_type_;
// The ValidatedGraphConfig object defining this CalculatorGraph.
std::unique_ptr<ValidatedGraphConfig> validated_graph_;
// The PacketGeneratorGraph to use to generate all the input side packets.
PacketGeneratorGraph packet_generator_graph_;
// True if the graph has source nodes.
bool has_sources_ = false;
// A flat array of InputStreamManager/OutputStreamManager/
// OutputSidePacketImpl/CalculatorNode corresponding to the input/output
// stream indexes, output side packet indexes, and calculator indexes
// respectively in validated_graph_.
// Once allocated these structures must not be reallocated since
// internal structures may point to individual entries in the array.
std::unique_ptr<InputStreamManager[]> input_stream_managers_;
std::unique_ptr<OutputStreamManager[]> output_stream_managers_;
std::unique_ptr<OutputSidePacketImpl[]> output_side_packets_;
std::vector<std::unique_ptr<CalculatorNode>> nodes_;
bool packet_generator_nodes_added_ = false;
// The graph output streams.
std::vector<std::shared_ptr<internal::GraphOutputStream>>
graph_output_streams_;
// Maximum queue size for an input stream. This is used by the scheduler to
// restrict memory usage.
int max_queue_size_ = -1;
// Mode for adding packets to a graph input stream. Set to block until all
// affected input streams are not full by default.
GraphInputStreamAddMode graph_input_stream_add_mode_
ABSL_GUARDED_BY(full_input_streams_mutex_);
// For a source node or graph input stream (specified using id),
// this stores the set of dependent input streams that have hit their
// maximum capacity. Graph input streams are also treated as nodes.
// A node is scheduled only if this set is empty. Similarly, a packet
// is added to a graph input stream only if this set is empty.
// Note that this vector contains an unused entry for each non-source node.
std::vector<absl::flat_hash_set<InputStreamManager*>> full_input_streams_
ABSL_GUARDED_BY(full_input_streams_mutex_);
// Maps stream names to graph input stream objects.
absl::flat_hash_map<std::string, std::unique_ptr<GraphInputStream>>
graph_input_streams_;
// Maps graph input streams to their virtual node ids.
absl::flat_hash_map<std::string, int> graph_input_stream_node_ids_;
// Maps graph input streams to their max queue size.
absl::flat_hash_map<std::string, int> graph_input_stream_max_queue_size_;
// The factory for making counters associated with this graph.
std::unique_ptr<CounterFactory> counter_factory_;
// Executors for the scheduler, keyed by the executor's name. The default
// executor's name is the empty string.
std::map<std::string, std::shared_ptr<Executor>> executors_;
// The processed input side packet map for this run.
std::map<std::string, Packet> current_run_side_packets_;
// Object to manage graph services.
GraphServiceManager service_manager_;
// Indicates whether service default initialization is allowed.
bool allow_service_default_initialization_ = true;
// Vector of errors encountered while running graph. Always use RecordError()
// to add an error to this vector.
std::vector<absl::Status> errors_ ABSL_GUARDED_BY(error_mutex_);
// True if the default executor uses the application thread.
bool use_application_thread_ = false;
// Condition variable that waits until all input streams that depend on a
// graph input stream are below the maximum queue size.
absl::CondVar wait_to_add_packet_cond_var_
ABSL_GUARDED_BY(full_input_streams_mutex_);
// Mutex for the vector of errors.
absl::Mutex error_mutex_;
// Status variable to indicate if the graph has encountered an error.
std::atomic<bool> has_error_;
// Mutex for full_input_streams_.
mutable absl::Mutex full_input_streams_mutex_;
// Number of closed graph input streams. This is a separate variable because
// it is not safe to hold a lock on the scheduler while calling Close() on an
// input stream. Hence, we decouple the closing of the stream and checking its
// status.
// TODO: update this comment.
std::atomic<unsigned int> num_closed_graph_input_streams_;
// The graph tracing and profiling interface. It is owned by the
// CalculatorGraph using a shared_ptr in order to allow threadsafe access
// to the ProfilingContext from clients that may outlive the CalculatorGraph
// such as GlContext. It is declared here before the Scheduler so that it
// remains available during the Scheduler destructor.
std::shared_ptr<ProfilingContext> profiler_;
internal::Scheduler scheduler_;
};
} // namespace mediapipe
#endif // MEDIAPIPE_FRAMEWORK_CALCULATOR_GRAPH_H_