// Copyright 2019 The MediaPipe Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Declares CalculatorGraph, which links Calculators into a directed acyclic
// graph, and allows its evaluation.
//
// NOTE(review): in this copy of the file the text between angle brackets
// appears to have been stripped (the system #include targets below, template
// argument lists such as those of std::map / std::function / std::unique_ptr,
// and the parameter lists after `template`). Restore them from the
// authoritative source before compiling — TODO confirm.

#ifndef MEDIAPIPE_FRAMEWORK_CALCULATOR_GRAPH_H_
#define MEDIAPIPE_FRAMEWORK_CALCULATOR_GRAPH_H_

// NOTE(review): the targets of the next seven #include directives are missing
// in this copy.
#include
#include
#include
#include
#include
#include
#include

#include "absl/base/macros.h"
#include "absl/container/fixed_array.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/calculator_node.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/graph_output_stream.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
#include "mediapipe/framework/output_side_packet_impl.h"
#include "mediapipe/framework/output_stream.h"
#include "mediapipe/framework/output_stream_manager.h"
#include "mediapipe/framework/output_stream_poller.h"
#include "mediapipe/framework/output_stream_shard.h"
#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/packet_generator.pb.h"
#include "mediapipe/framework/packet_generator_graph.h"
#include "mediapipe/framework/port.h"
#include "mediapipe/framework/port/integral_types.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/scheduler.h"
#include "mediapipe/framework/thread_pool_executor.pb.h"
#include "mediapipe/gpu/gpu_service.h"

namespace mediapipe {

// Return type of CalculatorGraph::AddOutputStreamPoller().
typedef absl::StatusOr StatusOrPoller;

// The class representing a DAG of calculator nodes.
//
// CalculatorGraph is the primary API for the MediaPipe Framework.
// In general, CalculatorGraph should be used if the only thing you need
// to do is run the graph (without pushing data in or extracting it as
// the graph runs).
//
// Example:
//   // Build dependency "//mediapipe/framework:calculator_framework".
//
//   #include "mediapipe/framework/calculator_framework.h"
//
//   mediapipe::CalculatorGraphConfig config;
//   MP_RETURN_IF_ERROR(mediapipe::tool::ParseGraphFromString(kGraphStr,
//                                                            &config));
//   mediapipe::CalculatorGraph graph;
//   MP_RETURN_IF_ERROR(graph.Initialize(config));
//
//   std::map extra_side_packets;
//   extra_side_packets["video_id"] = mediapipe::MakePacket(
//       "3edb9503834e9b42");
//   MP_RETURN_IF_ERROR(graph.Run(extra_side_packets));
//
//   // Run again (demonstrating the more concise initializer list syntax).
//   MP_RETURN_IF_ERROR(graph.Run(
//       {{"video_id", mediapipe::MakePacket("Ex-uGhDzue4")}}));
//
//   // See mediapipe/framework/graph_runner.h for an interface
//   // to insert and extract packets from a graph as it runs.
//   // Once it is done using the graph, close its streams and wait till done.
//   MP_RETURN_IF_ERROR(graph.CloseAllInputStreams());
//   MP_RETURN_IF_ERROR(graph.WaitUntilDone());
class CalculatorGraph {
 public:
  // Defines possible modes for adding a packet to a graph input stream.
  // WAIT_TILL_NOT_FULL can be used to control the memory usage of a graph by
  // avoiding adding a new packet until all dependent input streams fall below
  // the maximum queue size specified in the graph configuration.
  // ADD_IF_NOT_FULL could also be used to control the latency if used in a
  // real-time graph (e.g. drop camera frames if the MediaPipe graph queues are
  // full).
  enum class GraphInputStreamAddMode {
    // Blocks and waits until none of the affected streams
    // are full. Note that if max_queue_size is set to -1, the packet will be
    // added regardless of queue size.
    WAIT_TILL_NOT_FULL,
    // Returns and does not add packet if any affected input
    // stream is full.
    ADD_IF_NOT_FULL
  };

  // Creates an uninitialized graph.
  CalculatorGraph();
  // CalculatorGraph is neither copy-constructible nor copy-assignable.
  CalculatorGraph(const CalculatorGraph&) = delete;
  CalculatorGraph& operator=(const CalculatorGraph&) = delete;

  // Initializes the graph from its proto description (using Initialize())
  // and crashes if something goes wrong.
  explicit CalculatorGraph(CalculatorGraphConfig config);
  virtual ~CalculatorGraph();

  // Initializes the graph from its proto description.
  // side_packets that are provided at this stage are common across all Run()
  // invocations and could be used to execute PacketGenerators immediately.
  absl::Status Initialize(CalculatorGraphConfig config,
                          const std::map& side_packets);

  // Convenience version which does not take side packets.
  absl::Status Initialize(CalculatorGraphConfig config);

  // Initializes the CalculatorGraph from the specified graph and subgraph
  // configs.  Template graph and subgraph configs can be specified through
  // |input_templates|.  Every subgraph must have its graph type specified in
  // CalculatorGraphConfig.type.  A subgraph can be instantiated directly by
  // specifying its type in |graph_type|.  A template graph can be instantiated
  // directly by specifying its template arguments in |options|.
  absl::Status Initialize(
      const std::vector& configs,
      const std::vector& templates,
      const std::map& side_packets = {},
      const std::string& graph_type = "",
      const Subgraph::SubgraphOptions* options = nullptr);

  // Returns the canonicalized CalculatorGraphConfig for this graph.
  // Dereferences validated_graph_ without a null check.
  const CalculatorGraphConfig& Config() const {
    return validated_graph_->Config();
  }

  // Observes the named output stream. packet_callback will be invoked on every
  // packet emitted by the output stream. Can only be called before Run() or
  // StartRun().
  // TODO: Rename to AddOutputStreamCallback.
  absl::Status ObserveOutputStream(
      const std::string& stream_name,
      std::function packet_callback,
      bool observe_timestamp_bounds = false);

  // Adds an OutputStreamPoller for a stream. This provides a synchronous,
  // polling API for accessing a stream's output. Should only be called before
  // Run() or StartRun(). For asynchronous output, use ObserveOutputStream. See
  // also the helpers in tool/sink.h.
  StatusOrPoller AddOutputStreamPoller(const std::string& stream_name,
                                       bool observe_timestamp_bounds = false);

  // Gets output side packet by name. The output side packet can be successfully
  // retrieved in one of the following situations:
  //   - The graph is done.
  //   - The output side packet has been generated by a calculator and the graph
  //     is currently idle.
  //   - The side packet is a base packet generated by a PacketGenerator.
  // Returns error if the output side packet is not found or empty.
  absl::StatusOr GetOutputSidePacket(const std::string& packet_name);

  // Runs the graph after adding the given extra input side packets.  All
  // arguments are forgotten after Run() returns.
  // Run() is a blocking call and will return when all calculators are done.
  virtual absl::Status Run(
      const std::map& extra_side_packets);

  // Run the graph without adding any input side packets.
  absl::Status Run() { return Run({}); }

  // Start a run of the graph.  StartRun, WaitUntilDone, HasError,
  // AddPacketToInputStream, and CloseInputStream allow more control over
  // the execution of the graph run.  You can insert packets directly into
  // a stream while the graph is running.  Once StartRun has been called,
  // the graph will continue to run until WaitUntilDone() is called.
  // If StartRun returns an error, then the graph is not started and a
  // subsequent call to StartRun can be attempted.
  //
  // Example:
  //   MP_RETURN_IF_ERROR(graph.StartRun(...));
  //   while (true) {
  //     if (graph.HasError() || want_to_stop) break;
  //     MP_RETURN_IF_ERROR(graph.AddPacketToInputStream(...));
  //   }
  //   for (const std::string& stream : streams) {
  //     MP_RETURN_IF_ERROR(graph.CloseInputStream(stream));
  //   }
  //   MP_RETURN_IF_ERROR(graph.WaitUntilDone());
  absl::Status StartRun(
      const std::map& extra_side_packets) {
    // Delegates to the two-argument overload with no stream headers.
    return StartRun(extra_side_packets, {});
  }

  // In addition to the above StartRun, add additional parameter to set the
  // stream header before running.
  // Note: We highly discourage the use of stream headers, this is added for the
  // compatibility of existing calculators that use headers during Open().
  absl::Status StartRun(const std::map& extra_side_packets,
                        const std::map& stream_headers);

  // Wait for the current run to finish (block the current thread
  // until all source calculators have returned StatusStop(), all
  // graph_input_streams_ have been closed, and no more calculators can
  // be run).  This function can be called only after StartRun().
  absl::Status WaitUntilDone();

  // Wait until the running graph is in the idle mode, which is when nothing can
  // be scheduled and nothing is running in the worker threads.  This function
  // can be called only after StartRun().
  // NOTE: The graph must not have any source nodes because source nodes prevent
  // the running graph from becoming idle until the source nodes are done.
  absl::Status WaitUntilIdle();

  // Wait until a packet is emitted on one of the observed output streams.
  // Returns immediately if a packet has already been emitted since the last
  // call to this function.
  // Returns OutOfRangeError if the graph terminated while waiting.
  absl::Status WaitForObservedOutput();

  // Quick non-locking means of checking if the graph has encountered an error.
  bool HasError() const { return has_error_; }

  // Add a Packet to a graph input stream based on the graph input stream add
  // mode.  If the mode is ADD_IF_NOT_FULL, the packet will not be added if any
  // queue exceeds max_queue_size specified by the graph config and will return
  // StatusUnavailable.  The WAIT_TILL_NOT_FULL mode (default) will block until
  // the queues fall below the max_queue_size before adding the packet.  If
  // max_queue_size is -1, then the packet is added regardless of the
  // sizes of the queues in the graph.  The input stream must have been
  // specified in the configuration as a graph level input_stream.  On error,
  // nothing is added.
  absl::Status AddPacketToInputStream(const std::string& stream_name,
                                      const Packet& packet);

  // Same as the l-value version of this function by the same name, but moves
  // the r-value referenced packet into the stream instead of copying it over.
  // This allows the graph to take exclusive ownership of the packet, which may
  // allow more memory optimizations.  Note that, if an error is returned, the
  // packet may remain valid.  In particular, when using the ADD_IF_NOT_FULL
  // mode with a full queue, this will return StatusUnavailable and the caller
  // may try adding the packet again later.
  absl::Status AddPacketToInputStream(const std::string& stream_name,
                                      Packet&& packet);

  // Indicates that input will arrive no earlier than a certain timestamp.
  absl::Status SetInputStreamTimestampBound(const std::string& stream_name,
                                            Timestamp timestamp);

  // Sets the queue size of a graph input stream, overriding the graph default.
  absl::Status SetInputStreamMaxQueueSize(const std::string& stream_name,
                                          int max_queue_size);

  // Check if an input stream exists in the graph.
  bool HasInputStream(const std::string& name);

  // Close a graph input stream.  If the graph has any graph input streams
  // then Run() will not return until all the graph input streams have
  // been closed (and all packets propagate through the graph).
  // Note that multiple threads cannot call CloseInputStream() on the same
  // stream_name at the same time.
  absl::Status CloseInputStream(const std::string& stream_name);

  // Closes all the graph input streams.
  absl::Status CloseAllInputStreams();

  // Closes all the graph input streams and source calculator nodes.
  absl::Status CloseAllPacketSources();

  // Returns the pointer to the stream with the given name, or dies if none
  // exists. The result remains owned by the CalculatorGraph.
  ABSL_DEPRECATED(
      "Prefer using a Calculator to get information of all sorts out of the "
      "graph.")
  const OutputStreamManager* FindOutputStreamManager(const std::string& name);

  // Returns the ProfilingContext associated with the CalculatorGraph.
  ProfilingContext* profiler() { return profiler_.get(); }

  // Collects the runtime profile for Open(), Process(), and Close() of each
  // calculator in the graph.  May be called at any time after the graph has
  // been initialized.
  ABSL_DEPRECATED("Use profiler()->GetCalculatorProfiles() instead")
  absl::Status GetCalculatorProfiles(std::vector*) const;

  // Set the type of counter used in this graph.
  // |factory| is handed to counter_factory_.reset(), so the graph takes
  // ownership of it and releases any previously held factory.
  void SetCounterFactory(CounterFactory* factory) {
    counter_factory_.reset(factory);
  }
  // Returns the counter factory currently held by the graph; the graph keeps
  // ownership of the returned object.
  CounterFactory* GetCounterFactory() { return counter_factory_.get(); }

  // Callback when an error is encountered.
  // Adds the error to the vector of errors.
  // Must be called without error_mutex_ held (see the lock annotation).
  void RecordError(const absl::Status& error) ABSL_LOCKS_EXCLUDED(error_mutex_);

  // Combines errors into a status. Returns true if the vector of errors is
  // non-empty.
  bool GetCombinedErrors(const std::string& error_prefix,
                         absl::Status* error_status);
  // Convenience overload which specifies a default error prefix.
  bool GetCombinedErrors(absl::Status* error_status);

  // Returns the maximum input stream queue size.
  int GetMaxInputStreamQueueSize();

  // Get the mode for adding packets to an input stream.
  GraphInputStreamAddMode GetGraphInputStreamAddMode() const;

  // Set the mode for adding packets to an input stream.
  void SetGraphInputStreamAddMode(GraphInputStreamAddMode mode);

  // Aborts the scheduler if the graph is not terminated; no-op otherwise.
  void Cancel();

  // Pauses the scheduler.  Only used by calculator graph testing.
  ABSL_DEPRECATED(
      "CalculatorGraph will not allow external callers to explictly pause and "
      "resume a graph.")
  void Pause();

  // Resumes the scheduler.  Only used by calculator graph testing.
  ABSL_DEPRECATED(
      "CalculatorGraph will not allow external callers to explictly pause and "
      "resume a graph.")
  void Resume();

  // Sets the executor that will run the nodes assigned to the executor
  // named |name|.  If |name| is empty, this sets the default executor.  Must
  // be called before the graph is initialized.
  absl::Status SetExecutor(const std::string& name,
                           std::shared_ptr executor);

  // WARNING: the following public methods are exposed to Scheduler only.

  // Return true if all the graph input streams have been closed.
  // Compares the closed-stream counter against the number of entries in
  // graph_input_streams_.
  bool GraphInputStreamsClosed() {
    return num_closed_graph_input_streams_ == graph_input_streams_.size();
  }

  // Returns true if this node or graph input stream is connected to
  // any input stream whose queue has hit maximum capacity.
  bool IsNodeThrottled(int node_id)
      ABSL_LOCKS_EXCLUDED(full_input_streams_mutex_);

  // If any active source node or graph input stream is throttled and not yet
  // closed, increases the max_queue_size for each full input stream in the
  // graph.
  // Returns true if at least one max_queue_size has been grown.
  bool UnthrottleSources() ABSL_LOCKS_EXCLUDED(full_input_streams_mutex_);

  // Returns the scheduler's runtime measures for overhead measurement.
  // Only meant for test purposes.
  internal::SchedulerTimes GetSchedulerTimes() {
    return scheduler_.GetSchedulerTimes();
  }

#if !MEDIAPIPE_DISABLE_GPU
  // Returns a pointer to the GpuResources in use, if any.
  // Only meant for internal use.
  std::shared_ptr GetGpuResources() const;

  absl::Status SetGpuResources(std::shared_ptr resources);
#endif  // !MEDIAPIPE_DISABLE_GPU

  // Registers |object| for |service| by forwarding to the service manager.
  template
  absl::Status SetServiceObject(const GraphService& service,
                                std::shared_ptr object) {
    // TODO: check that the graph has not been started!
    return service_manager_.SetServiceObject(service, object);
  }

  // Returns whatever the service manager holds for |service|.
  template
  std::shared_ptr GetServiceObject(const GraphService& service) {
    return service_manager_.GetServiceObject(service);
  }

  // Only the Java API should call this directly.
  absl::Status SetServicePacket(const GraphServiceBase& service, Packet p) {
    // TODO: check that the graph has not been started!
    return service_manager_.SetServicePacket(service, p);
  }

 private:
  // GraphRunState is used as a parameter in the function CallStatusHandlers.
  enum class GraphRunState {
    // State of the graph before the run; see status_handler.h for details.
    PRE_RUN,
    // State of the graph after the run; set by CleanupAfterRun.
    POST_RUN,
  };

  // The graph input streams (which have packets added to them from
  // outside the graph).  Since these will be connected directly to a
  // node's input streams they are implemented as "output" streams.
  // Based on the assumption that all the graph input packets must be added to a
  // graph input stream sequentially, a GraphInputStream object only contains
  // one reusable output stream shard.
  class GraphInputStream {
   public:
    // Keeps a raw pointer to |manager| and configures the shard with the
    // manager's spec. |manager| is dereferenced here, so it must be non-null.
    explicit GraphInputStream(OutputStreamManager* manager)
        : manager_(manager) {
      shard_.SetSpec(manager_->Spec());
    }

    // Forwards |error_callback| to the manager's PrepareForRun().
    void PrepareForRun(std::function error_callback) {
      manager_->PrepareForRun(std::move(error_callback));
    }

    // Forwards |max_queue_size| to the manager.
    void SetMaxQueueSize(int max_queue_size) {
      manager_->SetMaxQueueSize(max_queue_size);
    }

    void SetHeader(const Packet& header);

    // Adds |packet| to the shard (copying overload).
    void AddPacket(const Packet& packet) { shard_.AddPacket(packet); }

    // Adds |packet| to the shard (moving overload).
    void AddPacket(Packet&& packet) { shard_.AddPacket(std::move(packet)); }

    void SetNextTimestampBound(Timestamp timestamp);

    void PropagateUpdatesToMirrors();

    void Close();

    // Reports the manager's closed state.
    bool IsClosed() const { return manager_->IsClosed(); }

    // Returns the manager pointer passed at construction.
    OutputStreamManager* GetManager() { return manager_; }

   private:
    OutputStreamManager* manager_ = nullptr;
    OutputStreamShard shard_;
  };

  // Initializes the graph from a ValidatedGraphConfig object.
  absl::Status Initialize(std::unique_ptr validated_graph,
                          const std::map& side_packets);

  // AddPacketToInputStreamInternal template is called by either
  // AddPacketToInputStream(Packet&& packet) or
  // AddPacketToInputStream(const Packet& packet).
  template
  absl::Status AddPacketToInputStreamInternal(const std::string& stream_name,
                                              T&& packet);

  // Sets the executor that will run the nodes assigned to the executor
  // named |name|.  If |name| is empty, this sets the default executor.
  // Does not check that the graph is uninitialized and |name| is not a
  // reserved executor name.
  absl::Status SetExecutorInternal(const std::string& name,
                                   std::shared_ptr executor);

  // If the num_threads field in default_executor_options is not specified,
  // assigns a reasonable value based on system configuration and the graph.
  // Then, creates the default thread pool if appropriate.
  //
  // Only called by InitializeExecutors().
  absl::Status InitializeDefaultExecutor(
      const ThreadPoolExecutorOptions* default_executor_options,
      bool use_application_thread);

  // Creates a thread pool as the default executor. The num_threads argument
  // overrides the num_threads field in default_executor_options.
  absl::Status CreateDefaultThreadPool(
      const ThreadPoolExecutorOptions* default_executor_options,
      int num_threads);

  // Returns true if |name| is a reserved executor name.
  static bool IsReservedExecutorName(const std::string& name);

  // Helper functions for Initialize().
  absl::Status InitializeExecutors();
  absl::Status InitializePacketGeneratorGraph(
      const std::map& side_packets);
  absl::Status InitializeStreams();
  absl::Status InitializeProfiler();
  absl::Status InitializeCalculatorNodes();
  absl::Status InitializePacketGeneratorNodes(
      const std::vector& non_scheduled_generators);

  // Iterates through all nodes and schedules any that can be opened.
  void ScheduleAllOpenableNodes();

  // Does the bulk of the work for StartRun but does not start the scheduler.
  absl::Status PrepareForRun(
      const std::map& extra_side_packets,
      const std::map& stream_headers);

  // NOTE(review): undocumented here; presumably readies graph services ahead
  // of a run — confirm against the implementation file.
  absl::Status PrepareServices();

#if !MEDIAPIPE_DISABLE_GPU
  absl::Status MaybeSetUpGpuServiceFromLegacySidePacket(Packet legacy_sp);
  // Helper for PrepareForRun. If it returns a non-empty map, those packets
  // must be added to the existing side packets, replacing existing values
  // that have the same key.
  std::map MaybeCreateLegacyGpuSidePacket(
      Packet legacy_sp);
  absl::Status PrepareGpu();
#endif  // !MEDIAPIPE_DISABLE_GPU

  // Cleans up any remaining state after the run and returns any errors that may
  // have occurred during the run. Called after the scheduler has terminated.
  absl::Status FinishRun();

  // Cleans up any remaining state after the run. All status handlers run here
  // if their requested input side packets exist.
  // The original |*status| is passed to all the status handlers. If any status
  // handler fails, it appends its error to errors_, and CleanupAfterRun sets
  // |*status| to the new combined errors on return.
  void CleanupAfterRun(absl::Status* status) ABSL_LOCKS_EXCLUDED(error_mutex_);

  // Calls HandlePreRunStatus or HandleStatus on the StatusHandlers. Which one
  // is called depends on the GraphRunState parameter (PRE_RUN or POST_RUN).
  // current_run_side_packets_ must be set before this function is called.
  // On error, has_error_ will be set.
  void CallStatusHandlers(GraphRunState graph_run_state,
                          const absl::Status& status);

  // Callback function to throttle or unthrottle source nodes when a stream
  // becomes full or non-full. A node is throttled (i.e. prevented from being
  // scheduled) if it has caused a downstream input queue to become full. Note
  // that all sources (including graph input streams) that affect this stream
  // will be throttled. A node is unthrottled (i.e. added to the scheduler
  // queue) if all downstream input queues have become non-full.
  //
  // This method is invoked from an input stream when its queue becomes full or
  // non-full. However, since streams are not allowed to hold any locks while
  // invoking a callback, this method must re-lock the stream and query its
  // status before taking any action.
  void UpdateThrottledNodes(InputStreamManager* stream, bool* stream_was_full);

#if !MEDIAPIPE_DISABLE_GPU
  // Owns the legacy GpuSharedData if we need to create one for backwards
  // compatibility.
  std::unique_ptr legacy_gpu_shared_;
#endif  // !MEDIAPIPE_DISABLE_GPU

  // True if the graph was initialized.
  bool initialized_ = false;

  // A packet type that has SetAny() called on it.
  PacketType any_packet_type_;

  // The ValidatedGraphConfig object defining this CalculatorGraph.
  std::unique_ptr validated_graph_;

  // The PacketGeneratorGraph to use to generate all the input side packets.
  PacketGeneratorGraph packet_generator_graph_;

  // True if the graph has source nodes.
  bool has_sources_ = false;

  // A flat array of InputStreamManager/OutputStreamManager/
  // OutputSidePacketImpl/CalculatorNode corresponding to the input/output
  // stream indexes, output side packet indexes, and calculator indexes
  // respectively in validated_graph_.
  // Once allocated these structures must not be reallocated since
  // internal structures may point to individual entries in the array.
  std::unique_ptr input_stream_managers_;
  std::unique_ptr output_stream_managers_;
  std::unique_ptr output_side_packets_;
  std::vector> nodes_;
  // NOTE(review): undocumented; presumably records whether
  // InitializePacketGeneratorNodes() has already added its nodes — confirm.
  bool packet_generator_nodes_added_ = false;

  // The graph output streams.
  std::vector> graph_output_streams_;

  // Maximum queue size for an input stream. This is used by the scheduler to
  // restrict memory usage.
  int max_queue_size_ = -1;

  // Mode for adding packets to a graph input stream. Set to block until all
  // affected input streams are not full by default.
  // NOTE(review): no in-class initializer is visible here, so the default
  // described above must be established elsewhere — confirm.
  GraphInputStreamAddMode graph_input_stream_add_mode_
      ABSL_GUARDED_BY(full_input_streams_mutex_);

  // For a source node or graph input stream (specified using id),
  // this stores the set of dependent input streams that have hit their
  // maximum capacity. Graph input streams are also treated as nodes.
  // A node is scheduled only if this set is empty.  Similarly, a packet
  // is added to a graph input stream only if this set is empty.
  // Note that this vector contains an unused entry for each non-source node.
  std::vector> full_input_streams_
      ABSL_GUARDED_BY(full_input_streams_mutex_);

  // Maps stream names to graph input stream objects.
  absl::flat_hash_map> graph_input_streams_;

  // Maps graph input streams to their virtual node ids.
  absl::flat_hash_map graph_input_stream_node_ids_;

  // Maps graph input streams to their max queue size.
  absl::flat_hash_map graph_input_stream_max_queue_size_;

  // The factory for making counters associated with this graph.
  std::unique_ptr counter_factory_;

  // Executors for the scheduler, keyed by the executor's name. The default
  // executor's name is the empty string.
  std::map> executors_;

  // The processed input side packet map for this run.
  std::map current_run_side_packets_;

  // Object to manage graph services.
  GraphServiceManager service_manager_;

  // Vector of errors encountered while running graph. Always use RecordError()
  // to add an error to this vector.
  std::vector errors_ ABSL_GUARDED_BY(error_mutex_);

  // True if the default executor uses the application thread.
  bool use_application_thread_ = false;

  // Condition variable that waits until all input streams that depend on a
  // graph input stream are below the maximum queue size.
  absl::CondVar wait_to_add_packet_cond_var_
      ABSL_GUARDED_BY(full_input_streams_mutex_);

  // Mutex for the vector of errors.
  absl::Mutex error_mutex_;

  // Status variable to indicate if the graph has encountered an error.
  std::atomic has_error_;

  // Mutex for full_input_streams_.
  mutable absl::Mutex full_input_streams_mutex_;

  // Number of closed graph input streams. This is a separate variable because
  // it is not safe to hold a lock on the scheduler while calling Close() on an
  // input stream. Hence, we decouple the closing of the stream and checking its
  // status.
  // TODO: update this comment.
  std::atomic num_closed_graph_input_streams_;

  // The graph tracing and profiling interface.  It is owned by the
  // CalculatorGraph using a shared_ptr in order to allow threadsafe access
  // to the ProfilingContext from clients that may outlive the CalculatorGraph
  // such as GlContext.  It is declared here before the Scheduler so that it
  // remains available during the Scheduler destructor.
  std::shared_ptr profiler_;

  internal::Scheduler scheduler_;
};

}  // namespace mediapipe

#endif  // MEDIAPIPE_FRAMEWORK_CALCULATOR_GRAPH_H_