// Copyright 2019 The MediaPipe Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef MEDIAPIPE_FRAMEWORK_INPUT_STREAM_HANDLER_H_ #define MEDIAPIPE_FRAMEWORK_INPUT_STREAM_HANDLER_H_ #include #include #include #include #include #include #include // TODO: Move protos in another CL after the C++ code migration. #include "mediapipe/framework/calculator_context.h" #include "mediapipe/framework/calculator_context_manager.h" #include "mediapipe/framework/collection.h" #include "mediapipe/framework/collection_item_id.h" #include "mediapipe/framework/deps/registration.h" #include "mediapipe/framework/input_stream_manager.h" #include "mediapipe/framework/input_stream_shard.h" #include "mediapipe/framework/mediapipe_options.pb.h" #include "mediapipe/framework/packet.h" #include "mediapipe/framework/packet_set.h" #include "mediapipe/framework/packet_type.h" #include "mediapipe/framework/port/status.h" #include "mediapipe/framework/tool/tag_map.h" namespace mediapipe { // Indicates the operation the node is ready for. enum class NodeReadiness { // The node is not ready. kNotReady, // The node is ready and we should run Process(). kReadyForProcess, // The node is ready and we should run Close(). kReadyForClose, }; // Abstract base class for input stream handlers. // // The input stream handler is invoked every time a Packet is enqueued or the // next timestamp bound is set in any of the input streams and actually decides // whether a calculator node's Process() should be called. The input stream // handler owns and manages the InputStreamManager objects. The operations // performed by OutputStreamHandler on an InputStreamManager object need to go // through its input stream handler. // // Derived classes are required to implement the following member functions: // // NodeReadiness GetNodeReadiness(Timestamp* min_stream_timestamp); // void FillInputSet(Timestamp input_timestamp, // InputStreamShardSet* input_set); // class InputStreamHandler { public: InputStreamHandler(std::shared_ptr tag_map, CalculatorContextManager* calculator_context_manager, const MediaPipeOptions& options, bool calculator_run_in_parallel) : input_stream_managers_(std::move(tag_map)), calculator_context_manager_(calculator_context_manager), options_(options), calculator_run_in_parallel_(calculator_run_in_parallel) {} virtual ~InputStreamHandler() = default; // Initializes the InputStreamManagerSet object. // flat_input_stream_managers is expected to point to a contiguous // flat array with InputStreamManagers corresponding to the id's in // InputStreamHandler::input_stream_managers_ (meaning it should point // to somewhere in the middle of the master flat array of all input // stream managers). absl::Status InitializeInputStreamManagers( InputStreamManager* flat_input_stream_managers); InputStreamManager* GetInputStreamManager(CollectionItemId id); // Sets up the InputStreamShardSet by propagating data from the managers. absl::Status SetupInputShards(InputStreamShardSet* input_shards); // Returns a vector of pairs of stream name and queue size for monitoring // purpose. std::vector> GetMonitoringInfo(); // Resets the input stream handler and its underlying input streams for // another run of the graph. // InputStreamHandler subclass can override this function to meet some special // needs while preparing the calculator node for run. // TODO: Makes PrepareForRun() non-virtual and makes // InitializeInputStreamManagers() virtual instead. virtual void PrepareForRun( std::function headers_ready_callback, std::function notification_callback, std::function schedule_callback, std::function error_callback); int NumInputStreams() const { return input_stream_managers_.NumEntries(); } // Returns the tag map of the input streams. const std::shared_ptr& InputTagMap() const { return input_stream_managers_.TagMap(); } // Sets header into a particular stream. void SetHeader(CollectionItemId id, const Packet& header); // Updates the header packets in the input shards. void UpdateInputShardHeaders(InputStreamShardSet* input_shards); // Sets max queue size of every stream. void SetMaxQueueSize(int max_queue_size); // Sets max queue size of a particular stream. void SetMaxQueueSize(CollectionItemId id, int max_queue_size); void SetQueueSizeCallbacks( InputStreamManager::QueueSizeCallback becomes_full_callback, InputStreamManager::QueueSizeCallback becomes_not_full_callback); // Add packets into a particular stream. virtual void AddPackets(CollectionItemId id, const std::list& packets); // Moves packets into a particular stream. virtual void MovePackets(CollectionItemId id, std::list* packets); // Sets next timestamp bound in a particular stream. void SetNextTimestampBound(CollectionItemId id, Timestamp bound); // Clears the current packet of every stream shard and removes the current // timestamp from the calculator context. void ClearCurrentInputs(CalculatorContext* calculator_context); void Close(); // Returns a std::string that concatenates the stream names of all managed // streams. std::string DebugStreamNames() const; // Keeps scheduling new invocations until 1) the node is not ready or 2) the // max number of invocations that are allowed to be scheduled is reached. // Returns true if at least one invocation has been scheduled. // The latest minimum timestamp bound of the input streams is returned in // *input_bound if the latest readiness of the node is kNotReady when the // function returns. During batching, this value will be equal to the // timestamp of the first set of inputs in the batch. In other cases, // Timestamp::Unset() is returned. // This method can only be invoked in the schedule phase. bool ScheduleInvocations(int max_allowance, Timestamp* input_bound); // Finalizes the input set before the calculator node's Process() is // called at the given input timestamp. void FinalizeInputSet(Timestamp timestamp, InputStreamShardSet* input_set); // Returns the number of input stream headers (excluding headers of back // edges) that are not set. int UnsetHeaderCount() const { return unset_header_count_.load(std::memory_order_relaxed); } // When true, Calculator::Process is called for any increase in the // timestamp bound, whether or not any packets are available. // Calculator::Process is called when the minimum timestamp bound // increases for any synchronized set of input streams. // DefaultInputStreamHandler groups all input streams into a single set. // ImmediateInputStreamHandler treats each input stream as a separate set. void SetProcessTimestampBounds(bool process_ts) { process_timestamps_ = process_ts; } // When true, Calculator::Process is called for every input timestamp bound. bool ProcessTimestampBounds() { return process_timestamps_; } // Returns the number of sync-sets populated by this input stream handler. virtual int SyncSetCount() { return 1; } // A helper class to build input packet sets for a certain set of streams. // // ReadyForProcess requires all of the streams to be fully determined // at the same input-timestamp. // This is the readiness policy for all streams in DefaultInputStreamHandler. // It is also the policy for each sync-set in SyncSetInputStreamHandler. // It is also the policy for each input-stream in ImmediateInputStreamHandler. // // If ProcessTimestampBounds() is set, then a fully determined input timestamp // with only empty input packets will qualify as ReadyForProcess. class SyncSet { public: // Creates a SyncSet for a certain set of streams, |stream_ids|. SyncSet(InputStreamHandler* input_stream_handler, std::vector stream_ids); // Reinitializes this SyncSet before each CalculatorGraph run. void PrepareForRun(); // Answers whether this stream is ready for Process or Close. NodeReadiness GetReadiness(Timestamp* min_stream_timestamp); // Returns the latest timestamp returned for processing. Timestamp LastProcessed() const; // The earliest available packet timestamp, or Timestamp::Done. Timestamp MinPacketTimestamp() const; // Moves packets from all input streams to the input_set. void FillInputSet(Timestamp input_timestamp, InputStreamShardSet* input_set); // Copies timestamp bounds from all input streams to the input_set. void FillInputBounds(InputStreamShardSet* input_set); private: InputStreamHandler* input_stream_handler_; std::vector stream_ids_; Timestamp last_processed_ts_ = Timestamp::Unset(); }; protected: typedef internal::Collection InputStreamManagerSet; const MediaPipeOptions& options() const { return options_; } // Subclasses must set batch size to greater than 1 to enable batching. // Batching cannot be combined with late_preparation_ behavior. void SetBatchSize(int batch_size); // Subclasses can enable late preparation; however it cannot be used along // with batching. void SetLatePreparation(bool late_preparation); // Accesses InputStreamShard's private method to add packets. All // InputStreamHandler subclasses are only able to add packets to shards // through this method. // TODO: Renames this method to "SetStreamContents" when // InputStreamShard is renamed. void AddPacketToShard(InputStreamShard* shard, Packet&& value, bool is_done) { shard->AddPacket(std::move(value), is_done); } // Returns the operation the calculator node is ready for. // Specifically: // - NodeReadiness::kNotReady if the node's Process() or Close() cannot be // called yet. The minimum timestamp bound of the input streams is // returned in *min_stream_timestamp. // - NodeReadiness::kReadyForProcess if the node is not waiting for a packet // on any input stream and is thus "ready" for its Process() to be // called. For a source node, Timestamp::Done() is returned in // *min_stream_timestamp. For a non-source node, the input timestamp for // the Process() call is returned in *min_stream_timestamp. // - NodeReadiness::kReadyForClose if the node's Close() can be called. // Timestamp::Done() is returned in *min_stream_timestamp. virtual NodeReadiness GetNodeReadiness(Timestamp* min_stream_timestamp) = 0; // Moves input packets from the input streams into the input set for the given // input timestamp. virtual void FillInputSet(Timestamp input_timestamp, InputStreamShardSet* input_set) = 0; // Collection of InputStreamManager objects. InputStreamManagerSet input_stream_managers_; // A pointer to the calculator context manager of the calculator node. CalculatorContextManager* const calculator_context_manager_; MediaPipeOptions options_; const bool calculator_run_in_parallel_; // Indicates if a calculator context has been prepared for Close(). bool prepared_context_for_close_; // Note: You must not hold any locks while invoking any of these callbacks. // // A callback to notify the observer of the changes on the packet queue or // the timestamp bound. std::function notification_; // A callback to schedule the node with the prepared calculator context. std::function schedule_callback_; std::function error_callback_; private: // Indicates when to fill the input set. If true, every input set will be // prepared in FinalizeInputSet(). Otherwise, the input sets will be filled // in ScheduleInvocations() in the scheduling phase. // The variable is set to false by default. A subclass should set it to true // with SetLatePreparation(true) in the constructor if the input sets need to // be filled in ProcessNode(). bool late_preparation_ = false; // Determines how many sets of input packets are collected before a // CalculatorNode is scheduled. int batch_size_ = 1; // When true, any increase in timestamp bound invokes Calculator::Process. bool process_timestamps_ = false; // A callback to notify the observer when all the input stream headers // (excluding headers of back edges) become available. std::function headers_ready_callback_; std::atomic unset_header_count_{0}; }; using InputStreamHandlerRegistry = GlobalFactoryRegistry< std::unique_ptr, std::shared_ptr, CalculatorContextManager*, const MediaPipeOptions&, bool>; } // namespace mediapipe // Macro for registering the input stream handler. #define REGISTER_INPUT_STREAM_HANDLER(name) \ REGISTER_FACTORY_FUNCTION_QUALIFIED( \ mediapipe::InputStreamHandlerRegistry, input_handler_registration, name, \ absl::make_unique, \ CalculatorContextManager*, const MediaPipeOptions&, \ bool>) #endif // MEDIAPIPE_FRAMEWORK_INPUT_STREAM_HANDLER_H_