From 5434b840f60b98e3a6b5487236bcfd2714447f53 Mon Sep 17 00:00:00 2001 From: MediaPipe Team Date: Wed, 30 Aug 2023 11:17:02 -0700 Subject: [PATCH] Improving throttling logs by providing a node info corresponding to a throttling stream. PiperOrigin-RevId: 561396272 --- mediapipe/framework/BUILD | 10 +- mediapipe/framework/calculator_graph.cc | 53 +++++-- mediapipe/framework/calculator_graph.h | 21 ++- mediapipe/framework/calculator_graph_test.cc | 131 +++++++++++++++++- mediapipe/framework/validated_graph_config.cc | 12 +- mediapipe/framework/validated_graph_config.h | 9 ++ 6 files changed, 208 insertions(+), 28 deletions(-) diff --git a/mediapipe/framework/BUILD b/mediapipe/framework/BUILD index 8a22d2348..3587d5dad 100644 --- a/mediapipe/framework/BUILD +++ b/mediapipe/framework/BUILD @@ -323,7 +323,6 @@ cc_library( ":input_stream_manager", ":mediapipe_profiling", ":output_side_packet_impl", - ":output_stream", ":output_stream_manager", ":output_stream_poller", ":output_stream_shard", @@ -337,6 +336,7 @@ cc_library( ":scheduler_queue", ":status_handler", ":status_handler_cc_proto", + ":subgraph", ":thread_pool_executor", ":thread_pool_executor_cc_proto", ":timestamp", @@ -344,6 +344,7 @@ cc_library( "//mediapipe/framework/port:core_proto", "//mediapipe/framework/port:integral_types", "//mediapipe/framework/port:logging", + "//mediapipe/framework/port:map_util", "//mediapipe/framework/port:ret_check", "//mediapipe/framework/port:source_location", "//mediapipe/framework/port:status", @@ -357,12 +358,12 @@ cc_library( "//mediapipe/gpu:graph_support", "//mediapipe/util:cpu_util", "@com_google_absl//absl/base:core_headers", - "@com_google_absl//absl/container:fixed_array", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_set", "@com_google_absl//absl/log", "@com_google_absl//absl/memory", "@com_google_absl//absl/status", + "@com_google_absl//absl/status:statusor", "@com_google_absl//absl/strings", 
"@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/synchronization", @@ -1184,7 +1185,6 @@ cc_library( ":calculator_contract", ":graph_service_manager", ":legacy_calculator_support", - ":packet", ":packet_generator", ":packet_generator_cc_proto", ":packet_set", @@ -1195,7 +1195,6 @@ cc_library( ":stream_handler_cc_proto", ":subgraph", ":thread_pool_executor_cc_proto", - ":timestamp", "//mediapipe/framework/port:core_proto", "//mediapipe/framework/port:integral_types", "//mediapipe/framework/port:logging", @@ -1209,10 +1208,10 @@ cc_library( "//mediapipe/framework/tool:subgraph_expansion", "//mediapipe/framework/tool:validate", "//mediapipe/framework/tool:validate_name", - "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/container:flat_hash_set", "@com_google_absl//absl/memory", "@com_google_absl//absl/strings", + "@com_google_protobuf//:protobuf", ], ) @@ -1470,6 +1469,7 @@ cc_test( "//mediapipe/gpu:gpu_service", "@com_google_absl//absl/container:fixed_array", "@com_google_absl//absl/memory", + "@com_google_absl//absl/status", "@com_google_absl//absl/strings", "@com_google_absl//absl/strings:str_format", "@com_google_absl//absl/time", diff --git a/mediapipe/framework/calculator_graph.cc b/mediapipe/framework/calculator_graph.cc index afc25e07d..0811fcb7c 100644 --- a/mediapipe/framework/calculator_graph.cc +++ b/mediapipe/framework/calculator_graph.cc @@ -17,13 +17,14 @@ #include #include +#include +#include #include #include -#include +#include #include #include -#include "absl/container/fixed_array.h" #include "absl/container/flat_hash_set.h" #include "absl/log/log.h" #include "absl/memory/memory.h" @@ -38,9 +39,15 @@ #include "mediapipe/framework/calculator_base.h" #include "mediapipe/framework/counter_factory.h" #include "mediapipe/framework/delegating_executor.h" +#include "mediapipe/framework/executor.h" +#include "mediapipe/framework/graph_output_stream.h" #include "mediapipe/framework/graph_service_manager.h" #include 
"mediapipe/framework/input_stream_manager.h" #include "mediapipe/framework/mediapipe_profiling.h" +#include "mediapipe/framework/output_side_packet_impl.h" +#include "mediapipe/framework/output_stream_manager.h" +#include "mediapipe/framework/output_stream_poller.h" +#include "mediapipe/framework/packet.h" #include "mediapipe/framework/packet_generator.h" #include "mediapipe/framework/packet_generator.pb.h" #include "mediapipe/framework/packet_set.h" @@ -49,14 +56,18 @@ #include "mediapipe/framework/port/canonical_errors.h" #include "mediapipe/framework/port/core_proto_inc.h" #include "mediapipe/framework/port/logging.h" +#include "mediapipe/framework/port/map_util.h" #include "mediapipe/framework/port/ret_check.h" #include "mediapipe/framework/port/source_location.h" #include "mediapipe/framework/port/status.h" #include "mediapipe/framework/port/status_builder.h" +#include "mediapipe/framework/port/status_macros.h" +#include "mediapipe/framework/scheduler.h" #include "mediapipe/framework/status_handler.h" #include "mediapipe/framework/status_handler.pb.h" #include "mediapipe/framework/thread_pool_executor.h" #include "mediapipe/framework/thread_pool_executor.pb.h" +#include "mediapipe/framework/timestamp.h" #include "mediapipe/framework/tool/fill_packet_set.h" #include "mediapipe/framework/tool/status_util.h" #include "mediapipe/framework/tool/tag_map.h" @@ -133,7 +144,7 @@ CalculatorGraph::CalculatorGraph(CalculatorGraphConfig config) // they only need to be fully visible here, where their destructor is // instantiated. CalculatorGraph::~CalculatorGraph() { - // Stop periodic profiler output to ublock Executor destructors. + // Stop periodic profiler output to unblock Executor destructors. 
absl::Status status = profiler()->Stop(); if (!status.ok()) { LOG(ERROR) << "During graph destruction: " << status; @@ -180,6 +191,7 @@ absl::Status CalculatorGraph::InitializeStreams() { const EdgeInfo& edge_info = validated_graph_->InputStreamInfos()[index]; MP_RETURN_IF_ERROR(input_stream_managers_[index].Initialize( edge_info.name, edge_info.packet_type, edge_info.back_edge)); + input_stream_to_index_[&input_stream_managers_[index]] = index; } // Create and initialize the output streams. @@ -1223,7 +1235,7 @@ bool CalculatorGraph::UnthrottleSources() { // NOTE: We can be sure that this function will grow input streams enough // to unthrottle at least one source node. The current stream queue sizes // will remain unchanged until at least one source node becomes unthrottled. - // This is a sufficient because succesfully growing at least one full input + // This is sufficient because successfully growing at least one full input // stream during each call to UnthrottleSources will eventually resolve // each deadlock. absl::flat_hash_set full_streams; @@ -1243,7 +1255,8 @@ bool CalculatorGraph::UnthrottleSources() { for (InputStreamManager* stream : full_streams) { if (Config().report_deadlock()) { RecordError(absl::UnavailableError(absl::StrCat( - "Detected a deadlock due to input throttling for: \"", stream->Name(), + "Detected a deadlock due to input throttling for input stream: \"", + stream->Name(), "\" of a node \"", GetParentNodeDebugName(stream), "\". All calculators are idle while packet sources remain active " "and throttled. Consider adjusting \"max_queue_size\" or " "\"report_deadlock\"."))); @@ -1251,10 +1264,11 @@ bool CalculatorGraph::UnthrottleSources() { } int new_size = stream->QueueSize() + 1; stream->SetMaxQueueSize(new_size); - LOG_EVERY_N(WARNING, 100) - << "Resolved a deadlock by increasing max_queue_size of input stream: " - << stream->Name() << " to: " << new_size - << ". 
Consider increasing max_queue_size for better performance."; + LOG_EVERY_N(WARNING, 100) << absl::StrCat( + "Resolved a deadlock by increasing max_queue_size of input stream: \"", + stream->Name(), "\" of a node \"", GetParentNodeDebugName(stream), + "\" to ", new_size, + ". Consider increasing max_queue_size for better performance."); } return !full_streams.empty(); } @@ -1393,6 +1407,27 @@ std::string CalculatorGraph::ListSourceNodes() const { return absl::StrJoin(sources, ", "); } +std::string CalculatorGraph::GetParentNodeDebugName( + InputStreamManager* stream) const { + auto iter = input_stream_to_index_.find(stream); + if (iter == input_stream_to_index_.end()) { + return absl::StrCat("Unknown (node with input stream: ", stream->Name(), + ")"); + } + + const int input_stream_index = iter->second; + const EdgeInfo& edge_info = + validated_graph_->InputStreamInfos()[input_stream_index]; + const int node_index = edge_info.parent_node.index; + const CalculatorGraphConfig& config = validated_graph_->Config(); + if (node_index < 0 || node_index >= config.node_size()) { + return absl::StrCat("Unknown (node index: ", node_index, + ", with input stream: ", stream->Name(), ")"); + } + + return DebugName(config.node(node_index)); +} + namespace { void PrintTimingToInfo(const std::string& label, int64_t timer_value) { const int64_t total_seconds = timer_value / 1000000ll; diff --git a/mediapipe/framework/calculator_graph.h b/mediapipe/framework/calculator_graph.h index 00c922a3b..4284beb7c 100644 --- a/mediapipe/framework/calculator_graph.h +++ b/mediapipe/framework/calculator_graph.h @@ -26,10 +26,12 @@ #include #include -#include "absl/base/macros.h" -#include "absl/container/fixed_array.h" +#include "absl/base/attributes.h" +#include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" #include "absl/synchronization/mutex.h" #include 
"mediapipe/framework/calculator.pb.h" #include "mediapipe/framework/calculator_base.h" @@ -41,18 +43,17 @@ #include "mediapipe/framework/graph_service_manager.h" #include "mediapipe/framework/mediapipe_profiling.h" #include "mediapipe/framework/output_side_packet_impl.h" -#include "mediapipe/framework/output_stream.h" #include "mediapipe/framework/output_stream_manager.h" #include "mediapipe/framework/output_stream_poller.h" #include "mediapipe/framework/output_stream_shard.h" #include "mediapipe/framework/packet.h" -#include "mediapipe/framework/packet_generator.pb.h" #include "mediapipe/framework/packet_generator_graph.h" -#include "mediapipe/framework/port.h" -#include "mediapipe/framework/port/integral_types.h" -#include "mediapipe/framework/port/status.h" #include "mediapipe/framework/scheduler.h" +#include "mediapipe/framework/scheduler_shared.h" +#include "mediapipe/framework/subgraph.h" #include "mediapipe/framework/thread_pool_executor.pb.h" +#include "mediapipe/framework/timestamp.h" +#include "mediapipe/framework/validated_graph_config.h" namespace mediapipe { @@ -600,6 +601,9 @@ class CalculatorGraph { // Returns a comma-separated list of source nodes. std::string ListSourceNodes() const; + // Returns a parent node name for the given input stream. + std::string GetParentNodeDebugName(InputStreamManager* stream) const; + #if !MEDIAPIPE_DISABLE_GPU // Owns the legacy GpuSharedData if we need to create one for backwards // compatibility. @@ -655,6 +659,9 @@ class CalculatorGraph { std::vector> full_input_streams_ ABSL_GUARDED_BY(full_input_streams_mutex_); + // Input stream to index within `input_stream_managers_` mapping. + absl::flat_hash_map input_stream_to_index_; + // Maps stream names to graph input stream objects. 
absl::flat_hash_map> graph_input_streams_; diff --git a/mediapipe/framework/calculator_graph_test.cc b/mediapipe/framework/calculator_graph_test.cc index 2e7d99ef6..45522cab4 100644 --- a/mediapipe/framework/calculator_graph_test.cc +++ b/mediapipe/framework/calculator_graph_test.cc @@ -17,16 +17,20 @@ #include #include +#include #include #include +#include #include #include +#include #include #include #include #include "absl/container/fixed_array.h" #include "absl/memory/memory.h" +#include "absl/status/status.h" #include "absl/strings/escaping.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" @@ -2549,6 +2553,129 @@ TEST(CalculatorGraph, OutputPacketInOpen2) { EXPECT_EQ(Timestamp(i), packet_dump[i].Timestamp()); } +TEST(CalculatorGraph, DeadlockIsReportedAndSufficientInfoProvided) { + CalculatorGraphConfig config = + mediapipe::ParseTextProtoOrDie(R"pb( + report_deadlock: true + max_queue_size: 1 + input_stream: 'input1' + input_stream: 'input2' + node { + calculator: 'PassThroughCalculator' + input_stream: 'input1' + input_stream: 'input2' + output_stream: 'output1' + output_stream: 'output2' + } + )pb"); + + CalculatorGraph graph; + MP_ASSERT_OK(graph.Initialize(config)); + MP_ASSERT_OK(graph.StartRun({})); + + Packet packet = MakePacket(1); + MP_EXPECT_OK(graph.AddPacketToInputStream("input1", packet.At(Timestamp(0)))); + absl::Status status = + graph.AddPacketToInputStream("input1", packet.At(Timestamp(1))); + + EXPECT_EQ(status.code(), absl::StatusCode::kUnavailable); + EXPECT_THAT(status.message(), + testing::AllOf(testing::HasSubstr("deadlock"), + testing::HasSubstr("input1"), + testing::HasSubstr("PassThroughCalculator"))); + graph.Cancel(); +} + +TEST(CalculatorGraph, + DeadlockIsReportedAndSufficientInfoProvidedMultipleCalculators) { + CalculatorGraphConfig config = + mediapipe::ParseTextProtoOrDie(R"pb( + report_deadlock: true + max_queue_size: 1 + input_stream: 'input1' + input_stream: 'input2' + node { + calculator: 
'PassThroughCalculator' + input_stream: 'input1' + input_stream: 'input2' + output_stream: 'output1' + output_stream: 'output2' + } + node { + calculator: 'MergeCalculator' + input_stream: 'output1' + input_stream: 'output2' + output_stream: 'output3' + } + )pb"); + + CalculatorGraph graph; + MP_ASSERT_OK(graph.Initialize(config)); + MP_ASSERT_OK(graph.StartRun({})); + + Packet packet = MakePacket(1); + MP_EXPECT_OK(graph.AddPacketToInputStream("input1", packet.At(Timestamp(0)))); + absl::Status status = + graph.AddPacketToInputStream("input1", packet.At(Timestamp(1))); + + EXPECT_EQ(status.code(), absl::StatusCode::kUnavailable); + EXPECT_THAT(status.message(), + testing::AllOf(testing::HasSubstr("deadlock"), + testing::HasSubstr("input1"), + testing::HasSubstr("PassThroughCalculator"))); + graph.Cancel(); +} + +TEST(CalculatorGraph, TwoDeadlocksAreReportedAndSufficientInfoProvided) { + CalculatorGraphConfig config = + mediapipe::ParseTextProtoOrDie(R"pb( + report_deadlock: true + max_queue_size: 1 + input_stream: 'input1' + input_stream: 'input2' + node { + calculator: 'PassThroughCalculator' + input_stream: 'input1' + input_stream: 'input2' + output_stream: 'output1' + output_stream: 'output2' + } + node { + calculator: 'PassThroughCalculator' + input_stream: 'output1' + input_stream: 'output2' + output_stream: 'output3' + output_stream: 'output4' + } + node { + calculator: 'MergeCalculator' + input_stream: 'input1' + input_stream: 'output1' + input_stream: 'output2' + input_stream: 'output3' + input_stream: 'output4' + output_stream: 'output5' + } + )pb"); + + CalculatorGraph graph; + MP_ASSERT_OK(graph.Initialize(config)); + MP_ASSERT_OK(graph.StartRun({})); + + Packet packet = MakePacket(1); + MP_EXPECT_OK(graph.AddPacketToInputStream("input1", packet.At(Timestamp(0)))); + absl::Status status = + graph.AddPacketToInputStream("input1", packet.At(Timestamp(1))); + + EXPECT_EQ(status.code(), absl::StatusCode::kUnavailable); + EXPECT_THAT(status.message(), + 
testing::AllOf(testing::HasSubstr("deadlock"), + testing::HasSubstr("input1"), + testing::HasSubstr("PassThroughCalculator"), + testing::HasSubstr("MergeCalculator"))); + graph.Cancel(); +} + // Tests that no packets are available on input streams in Open(), even if the // upstream calculator outputs a packet in Open(). TEST(CalculatorGraph, EmptyInputInOpen) { @@ -2619,7 +2746,7 @@ TEST(CalculatorGraph, UnthrottleRespectsLayers) { std::map input_side_packets; input_side_packets["global_counter"] = Adopt(new auto(&global_counter)); // TODO: Set this value to true. When the calculator outputs a - // packet in Open, it will trigget b/33568859, and the test will fail. Use + // packet in Open, it will trigger b/33568859, and the test will fail. Use // this test to verify that b/33568859 is fixed. constexpr bool kOutputInOpen = true; input_side_packets["output_in_open"] = MakePacket(kOutputInOpen); @@ -3339,7 +3466,7 @@ TEST(CalculatorGraph, SetInputStreamMaxQueueSizeWorksSlowCalculator) { // Verify the scheduler unthrottles the graph input stream to avoid a deadlock, // and won't enter a busy loop. TEST(CalculatorGraph, AddPacketNoBusyLoop) { - // The DecimatorCalculator ouputs 1 out of every 101 input packets and drops + // The DecimatorCalculator outputs 1 out of every 101 input packets and drops // the rest, without setting the next timestamp bound on its output. As a // result, the MergeCalculator is not runnable in between and packets on its // "in" input stream will be queued and exceed the max queue size. 
diff --git a/mediapipe/framework/validated_graph_config.cc b/mediapipe/framework/validated_graph_config.cc index 15eac3209..10d47d874 100644 --- a/mediapipe/framework/validated_graph_config.cc +++ b/mediapipe/framework/validated_graph_config.cc @@ -15,6 +15,7 @@ #include "mediapipe/framework/validated_graph_config.h" #include +#include #include "absl/container/flat_hash_set.h" #include "absl/memory/memory.h" @@ -33,6 +34,7 @@ #include "mediapipe/framework/port/core_proto_inc.h" #include "mediapipe/framework/port/integral_types.h" #include "mediapipe/framework/port/logging.h" +#include "mediapipe/framework/port/proto_ns.h" #include "mediapipe/framework/port/ret_check.h" #include "mediapipe/framework/port/source_location.h" #include "mediapipe/framework/port/status.h" @@ -49,8 +51,6 @@ namespace mediapipe { -namespace { - // Create a debug string name for a set of edge. An edge can be either // a stream or a side packet. std::string DebugEdgeNames( @@ -78,6 +78,8 @@ std::string DebugName(const CalculatorGraphConfig::Node& node_config) { return name; } +namespace { + std::string DebugName(const PacketGeneratorConfig& node_config) { return absl::StrCat( "[", node_config.packet_generator(), ", ", @@ -98,7 +100,7 @@ std::string DebugName(const CalculatorGraphConfig& config, NodeTypeInfo::NodeType node_type, int node_index) { switch (node_type) { case NodeTypeInfo::NodeType::CALCULATOR: - return DebugName(config.node(node_index)); + return mediapipe::DebugName(config.node(node_index)); case NodeTypeInfo::NodeType::PACKET_GENERATOR: return DebugName(config.packet_generator(node_index)); case NodeTypeInfo::NodeType::GRAPH_INPUT_STREAM: @@ -900,8 +902,8 @@ absl::Status ValidatedGraphConfig::ValidateSidePacketTypes() { "\"$3\" but the connected output side packet will be of type \"$4\"", side_packet.name, NodeTypeInfo::NodeTypeToString(side_packet.parent_node.type), - mediapipe::DebugName(config_, side_packet.parent_node.type, - side_packet.parent_node.index), + 
DebugName(config_, side_packet.parent_node.type, + side_packet.parent_node.index), side_packet.packet_type->DebugTypeName(), output_side_packets_[side_packet.upstream] .packet_type->DebugTypeName())); diff --git a/mediapipe/framework/validated_graph_config.h b/mediapipe/framework/validated_graph_config.h index 95ecccbb4..ec46b62b4 100644 --- a/mediapipe/framework/validated_graph_config.h +++ b/mediapipe/framework/validated_graph_config.h @@ -16,15 +16,18 @@ #define MEDIAPIPE_FRAMEWORK_VALIDATED_GRAPH_CONFIG_H_ #include +#include #include #include "absl/container/flat_hash_set.h" +#include "google/protobuf/repeated_ptr_field.h" #include "mediapipe/framework/calculator.pb.h" #include "mediapipe/framework/calculator_contract.h" #include "mediapipe/framework/graph_service_manager.h" #include "mediapipe/framework/packet_generator.pb.h" #include "mediapipe/framework/packet_type.h" #include "mediapipe/framework/port/map_util.h" +#include "mediapipe/framework/port/proto_ns.h" #include "mediapipe/framework/port/status.h" #include "mediapipe/framework/port/status_builder.h" #include "mediapipe/framework/status_handler.pb.h" @@ -34,6 +37,12 @@ namespace mediapipe { class ValidatedGraphConfig; +std::string DebugEdgeNames( + const std::string& edge_type, + const proto_ns::RepeatedPtrField& edges); + +std::string DebugName(const CalculatorGraphConfig::Node& node_config); + // Type information for a graph node (Calculator, Generator, etc). class NodeTypeInfo { public: