Improving throttling logs by providing a node info corresponding to a throttling stream.

PiperOrigin-RevId: 561396272
This commit is contained in:
MediaPipe Team 2023-08-30 11:17:02 -07:00 committed by Copybara-Service
parent 45b0271ded
commit 5434b840f6
6 changed files with 208 additions and 28 deletions

View File

@ -323,7 +323,6 @@ cc_library(
":input_stream_manager",
":mediapipe_profiling",
":output_side_packet_impl",
":output_stream",
":output_stream_manager",
":output_stream_poller",
":output_stream_shard",
@ -337,6 +336,7 @@ cc_library(
":scheduler_queue",
":status_handler",
":status_handler_cc_proto",
":subgraph",
":thread_pool_executor",
":thread_pool_executor_cc_proto",
":timestamp",
@ -344,6 +344,7 @@ cc_library(
"//mediapipe/framework/port:core_proto",
"//mediapipe/framework/port:integral_types",
"//mediapipe/framework/port:logging",
"//mediapipe/framework/port:map_util",
"//mediapipe/framework/port:ret_check",
"//mediapipe/framework/port:source_location",
"//mediapipe/framework/port:status",
@ -357,12 +358,12 @@ cc_library(
"//mediapipe/gpu:graph_support",
"//mediapipe/util:cpu_util",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/container:fixed_array",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/log",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/synchronization",
@ -1184,7 +1185,6 @@ cc_library(
":calculator_contract",
":graph_service_manager",
":legacy_calculator_support",
":packet",
":packet_generator",
":packet_generator_cc_proto",
":packet_set",
@ -1195,7 +1195,6 @@ cc_library(
":stream_handler_cc_proto",
":subgraph",
":thread_pool_executor_cc_proto",
":timestamp",
"//mediapipe/framework/port:core_proto",
"//mediapipe/framework/port:integral_types",
"//mediapipe/framework/port:logging",
@ -1209,10 +1208,10 @@ cc_library(
"//mediapipe/framework/tool:subgraph_expansion",
"//mediapipe/framework/tool:validate",
"//mediapipe/framework/tool:validate_name",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/strings",
"@com_google_protobuf//:protobuf",
],
)
@ -1470,6 +1469,7 @@ cc_test(
"//mediapipe/gpu:gpu_service",
"@com_google_absl//absl/container:fixed_array",
"@com_google_absl//absl/memory",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",

View File

@ -17,13 +17,14 @@
#include <stdio.h>
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>
#include "absl/container/fixed_array.h"
#include "absl/container/flat_hash_set.h"
#include "absl/log/log.h"
#include "absl/memory/memory.h"
@ -38,9 +39,15 @@
#include "mediapipe/framework/calculator_base.h"
#include "mediapipe/framework/counter_factory.h"
#include "mediapipe/framework/delegating_executor.h"
#include "mediapipe/framework/executor.h"
#include "mediapipe/framework/graph_output_stream.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/input_stream_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
#include "mediapipe/framework/output_side_packet_impl.h"
#include "mediapipe/framework/output_stream_manager.h"
#include "mediapipe/framework/output_stream_poller.h"
#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/packet_generator.h"
#include "mediapipe/framework/packet_generator.pb.h"
#include "mediapipe/framework/packet_set.h"
@ -49,14 +56,18 @@
#include "mediapipe/framework/port/canonical_errors.h"
#include "mediapipe/framework/port/core_proto_inc.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/map_util.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/source_location.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/port/status_builder.h"
#include "mediapipe/framework/port/status_macros.h"
#include "mediapipe/framework/scheduler.h"
#include "mediapipe/framework/status_handler.h"
#include "mediapipe/framework/status_handler.pb.h"
#include "mediapipe/framework/thread_pool_executor.h"
#include "mediapipe/framework/thread_pool_executor.pb.h"
#include "mediapipe/framework/timestamp.h"
#include "mediapipe/framework/tool/fill_packet_set.h"
#include "mediapipe/framework/tool/status_util.h"
#include "mediapipe/framework/tool/tag_map.h"
@ -133,7 +144,7 @@ CalculatorGraph::CalculatorGraph(CalculatorGraphConfig config)
// they only need to be fully visible here, where their destructor is
// instantiated.
CalculatorGraph::~CalculatorGraph() {
// Stop periodic profiler output to ublock Executor destructors.
// Stop periodic profiler output to unblock Executor destructors.
absl::Status status = profiler()->Stop();
if (!status.ok()) {
LOG(ERROR) << "During graph destruction: " << status;
@ -180,6 +191,7 @@ absl::Status CalculatorGraph::InitializeStreams() {
const EdgeInfo& edge_info = validated_graph_->InputStreamInfos()[index];
MP_RETURN_IF_ERROR(input_stream_managers_[index].Initialize(
edge_info.name, edge_info.packet_type, edge_info.back_edge));
input_stream_to_index_[&input_stream_managers_[index]] = index;
}
// Create and initialize the output streams.
@ -1223,7 +1235,7 @@ bool CalculatorGraph::UnthrottleSources() {
// NOTE: We can be sure that this function will grow input streams enough
// to unthrottle at least one source node. The current stream queue sizes
// will remain unchanged until at least one source node becomes unthrottled.
// This is a sufficient because succesfully growing at least one full input
// This is a sufficient because successfully growing at least one full input
// stream during each call to UnthrottleSources will eventually resolve
// each deadlock.
absl::flat_hash_set<InputStreamManager*> full_streams;
@ -1243,7 +1255,8 @@ bool CalculatorGraph::UnthrottleSources() {
for (InputStreamManager* stream : full_streams) {
if (Config().report_deadlock()) {
RecordError(absl::UnavailableError(absl::StrCat(
"Detected a deadlock due to input throttling for: \"", stream->Name(),
"Detected a deadlock due to input throttling for input stream: \"",
stream->Name(), "\" of a node \"", GetParentNodeDebugName(stream),
"\". All calculators are idle while packet sources remain active "
"and throttled. Consider adjusting \"max_queue_size\" or "
"\"report_deadlock\".")));
@ -1251,10 +1264,11 @@ bool CalculatorGraph::UnthrottleSources() {
}
int new_size = stream->QueueSize() + 1;
stream->SetMaxQueueSize(new_size);
LOG_EVERY_N(WARNING, 100)
<< "Resolved a deadlock by increasing max_queue_size of input stream: "
<< stream->Name() << " to: " << new_size
<< ". Consider increasing max_queue_size for better performance.";
LOG_EVERY_N(WARNING, 100) << absl::StrCat(
"Resolved a deadlock by increasing max_queue_size of input stream: \"",
stream->Name(), "\" of a node \"", GetParentNodeDebugName(stream),
"\" to ", new_size,
". Consider increasing max_queue_size for better performance.");
}
return !full_streams.empty();
}
@ -1393,6 +1407,27 @@ std::string CalculatorGraph::ListSourceNodes() const {
return absl::StrJoin(sources, ", ");
}
std::string CalculatorGraph::GetParentNodeDebugName(
InputStreamManager* stream) const {
auto iter = input_stream_to_index_.find(stream);
if (iter == input_stream_to_index_.end()) {
return absl::StrCat("Unknown (node with input stream: ", stream->Name(),
")");
}
const int input_stream_index = iter->second;
const EdgeInfo& edge_info =
validated_graph_->InputStreamInfos()[input_stream_index];
const int node_index = edge_info.parent_node.index;
const CalculatorGraphConfig& config = validated_graph_->Config();
if (node_index < 0 || node_index >= config.node_size()) {
return absl::StrCat("Unknown (node index: ", node_index,
", with input stream: ", stream->Name(), ")");
}
return DebugName(config.node(node_index));
}
namespace {
void PrintTimingToInfo(const std::string& label, int64_t timer_value) {
const int64_t total_seconds = timer_value / 1000000ll;

View File

@ -26,10 +26,12 @@
#include <utility>
#include <vector>
#include "absl/base/macros.h"
#include "absl/container/fixed_array.h"
#include "absl/base/attributes.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/mutex.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_base.h"
@ -41,18 +43,17 @@
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/mediapipe_profiling.h"
#include "mediapipe/framework/output_side_packet_impl.h"
#include "mediapipe/framework/output_stream.h"
#include "mediapipe/framework/output_stream_manager.h"
#include "mediapipe/framework/output_stream_poller.h"
#include "mediapipe/framework/output_stream_shard.h"
#include "mediapipe/framework/packet.h"
#include "mediapipe/framework/packet_generator.pb.h"
#include "mediapipe/framework/packet_generator_graph.h"
#include "mediapipe/framework/port.h"
#include "mediapipe/framework/port/integral_types.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/scheduler.h"
#include "mediapipe/framework/scheduler_shared.h"
#include "mediapipe/framework/subgraph.h"
#include "mediapipe/framework/thread_pool_executor.pb.h"
#include "mediapipe/framework/timestamp.h"
#include "mediapipe/framework/validated_graph_config.h"
namespace mediapipe {
@ -600,6 +601,9 @@ class CalculatorGraph {
// Returns a comma-separated list of source nodes.
std::string ListSourceNodes() const;
// Returns a parent node name for the given input stream.
std::string GetParentNodeDebugName(InputStreamManager* stream) const;
#if !MEDIAPIPE_DISABLE_GPU
// Owns the legacy GpuSharedData if we need to create one for backwards
// compatibility.
@ -655,6 +659,9 @@ class CalculatorGraph {
std::vector<absl::flat_hash_set<InputStreamManager*>> full_input_streams_
ABSL_GUARDED_BY(full_input_streams_mutex_);
// Input stream to index within `input_stream_managers_` mapping.
absl::flat_hash_map<InputStreamManager*, int> input_stream_to_index_;
// Maps stream names to graph input stream objects.
absl::flat_hash_map<std::string, std::unique_ptr<GraphInputStream>>
graph_input_streams_;

View File

@ -17,16 +17,20 @@
#include <pthread.h>
#include <atomic>
#include <cstdint>
#include <ctime>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
#include "absl/container/fixed_array.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
@ -2549,6 +2553,129 @@ TEST(CalculatorGraph, OutputPacketInOpen2) {
EXPECT_EQ(Timestamp(i), packet_dump[i].Timestamp());
}
TEST(CalculatorGraph, DeadlockIsReportedAndSufficientInfoProvided) {
CalculatorGraphConfig config =
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
report_deadlock: true
max_queue_size: 1
input_stream: 'input1'
input_stream: 'input2'
node {
calculator: 'PassThroughCalculator'
input_stream: 'input1'
input_stream: 'input2'
output_stream: 'output1'
output_stream: 'output2'
}
)pb");
CalculatorGraph graph;
MP_ASSERT_OK(graph.Initialize(config));
MP_ASSERT_OK(graph.StartRun({}));
Packet packet = MakePacket<int>(1);
MP_EXPECT_OK(graph.AddPacketToInputStream("input1", packet.At(Timestamp(0))));
absl::Status status =
graph.AddPacketToInputStream("input1", packet.At(Timestamp(1)));
EXPECT_EQ(status.code(), absl::StatusCode::kUnavailable);
EXPECT_THAT(status.message(),
testing::AllOf(testing::HasSubstr("deadlock"),
testing::HasSubstr("input1"),
testing::HasSubstr("PassThroughCalculator")));
graph.Cancel();
}
TEST(CalculatorGraph,
DeadlockIsReportedAndSufficientInfoProvidedMultipleCalculators) {
CalculatorGraphConfig config =
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
report_deadlock: true
max_queue_size: 1
input_stream: 'input1'
input_stream: 'input2'
node {
calculator: 'PassThroughCalculator'
input_stream: 'input1'
input_stream: 'input2'
output_stream: 'output1'
output_stream: 'output2'
}
node {
calculator: 'MergeCalculator'
input_stream: 'output1'
input_stream: 'output2'
output_stream: 'output3'
}
)pb");
CalculatorGraph graph;
MP_ASSERT_OK(graph.Initialize(config));
MP_ASSERT_OK(graph.StartRun({}));
Packet packet = MakePacket<int>(1);
MP_EXPECT_OK(graph.AddPacketToInputStream("input1", packet.At(Timestamp(0))));
absl::Status status =
graph.AddPacketToInputStream("input1", packet.At(Timestamp(1)));
EXPECT_EQ(status.code(), absl::StatusCode::kUnavailable);
EXPECT_THAT(status.message(),
testing::AllOf(testing::HasSubstr("deadlock"),
testing::HasSubstr("input1"),
testing::HasSubstr("PassThroughCalculator")));
graph.Cancel();
}
TEST(CalculatorGraph, TwoDeadlocksAreReportedAndSufficientInfoProvided) {
CalculatorGraphConfig config =
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
report_deadlock: true
max_queue_size: 1
input_stream: 'input1'
input_stream: 'input2'
node {
calculator: 'PassThroughCalculator'
input_stream: 'input1'
input_stream: 'input2'
output_stream: 'output1'
output_stream: 'output2'
}
node {
calculator: 'PassThroughCalculator'
input_stream: 'output1'
input_stream: 'output2'
output_stream: 'output3'
output_stream: 'output4'
}
node {
calculator: 'MergeCalculator'
input_stream: 'input1'
input_stream: 'output1'
input_stream: 'output2'
input_stream: 'output3'
input_stream: 'output4'
output_stream: 'output5'
}
)pb");
CalculatorGraph graph;
MP_ASSERT_OK(graph.Initialize(config));
MP_ASSERT_OK(graph.StartRun({}));
Packet packet = MakePacket<int>(1);
MP_EXPECT_OK(graph.AddPacketToInputStream("input1", packet.At(Timestamp(0))));
absl::Status status =
graph.AddPacketToInputStream("input1", packet.At(Timestamp(1)));
EXPECT_EQ(status.code(), absl::StatusCode::kUnavailable);
EXPECT_THAT(status.message(),
testing::AllOf(testing::HasSubstr("deadlock"),
testing::HasSubstr("input1"),
testing::HasSubstr("PassThroughCalculator"),
testing::HasSubstr("MergeCalculator")));
graph.Cancel();
}
// Tests that no packets are available on input streams in Open(), even if the
// upstream calculator outputs a packet in Open().
TEST(CalculatorGraph, EmptyInputInOpen) {
@ -2619,7 +2746,7 @@ TEST(CalculatorGraph, UnthrottleRespectsLayers) {
std::map<std::string, Packet> input_side_packets;
input_side_packets["global_counter"] = Adopt(new auto(&global_counter));
// TODO: Set this value to true. When the calculator outputs a
// packet in Open, it will trigget b/33568859, and the test will fail. Use
// packet in Open, it will trigger b/33568859, and the test will fail. Use
// this test to verify that b/33568859 is fixed.
constexpr bool kOutputInOpen = true;
input_side_packets["output_in_open"] = MakePacket<bool>(kOutputInOpen);
@ -3339,7 +3466,7 @@ TEST(CalculatorGraph, SetInputStreamMaxQueueSizeWorksSlowCalculator) {
// Verify the scheduler unthrottles the graph input stream to avoid a deadlock,
// and won't enter a busy loop.
TEST(CalculatorGraph, AddPacketNoBusyLoop) {
// The DecimatorCalculator ouputs 1 out of every 101 input packets and drops
// The DecimatorCalculator outputs 1 out of every 101 input packets and drops
// the rest, without setting the next timestamp bound on its output. As a
// result, the MergeCalculator is not runnable in between and packets on its
// "in" input stream will be queued and exceed the max queue size.

View File

@ -15,6 +15,7 @@
#include "mediapipe/framework/validated_graph_config.h"
#include <memory>
#include <string>
#include "absl/container/flat_hash_set.h"
#include "absl/memory/memory.h"
@ -33,6 +34,7 @@
#include "mediapipe/framework/port/core_proto_inc.h"
#include "mediapipe/framework/port/integral_types.h"
#include "mediapipe/framework/port/logging.h"
#include "mediapipe/framework/port/proto_ns.h"
#include "mediapipe/framework/port/ret_check.h"
#include "mediapipe/framework/port/source_location.h"
#include "mediapipe/framework/port/status.h"
@ -49,8 +51,6 @@
namespace mediapipe {
namespace {
// Create a debug string name for a set of edge. An edge can be either
// a stream or a side packet.
std::string DebugEdgeNames(
@ -78,6 +78,8 @@ std::string DebugName(const CalculatorGraphConfig::Node& node_config) {
return name;
}
namespace {
std::string DebugName(const PacketGeneratorConfig& node_config) {
return absl::StrCat(
"[", node_config.packet_generator(), ", ",
@ -98,7 +100,7 @@ std::string DebugName(const CalculatorGraphConfig& config,
NodeTypeInfo::NodeType node_type, int node_index) {
switch (node_type) {
case NodeTypeInfo::NodeType::CALCULATOR:
return DebugName(config.node(node_index));
return mediapipe::DebugName(config.node(node_index));
case NodeTypeInfo::NodeType::PACKET_GENERATOR:
return DebugName(config.packet_generator(node_index));
case NodeTypeInfo::NodeType::GRAPH_INPUT_STREAM:
@ -900,8 +902,8 @@ absl::Status ValidatedGraphConfig::ValidateSidePacketTypes() {
"\"$3\" but the connected output side packet will be of type \"$4\"",
side_packet.name,
NodeTypeInfo::NodeTypeToString(side_packet.parent_node.type),
mediapipe::DebugName(config_, side_packet.parent_node.type,
side_packet.parent_node.index),
DebugName(config_, side_packet.parent_node.type,
side_packet.parent_node.index),
side_packet.packet_type->DebugTypeName(),
output_side_packets_[side_packet.upstream]
.packet_type->DebugTypeName()));

View File

@ -16,15 +16,18 @@
#define MEDIAPIPE_FRAMEWORK_VALIDATED_GRAPH_CONFIG_H_
#include <map>
#include <string>
#include <vector>
#include "absl/container/flat_hash_set.h"
#include "google/protobuf/repeated_ptr_field.h"
#include "mediapipe/framework/calculator.pb.h"
#include "mediapipe/framework/calculator_contract.h"
#include "mediapipe/framework/graph_service_manager.h"
#include "mediapipe/framework/packet_generator.pb.h"
#include "mediapipe/framework/packet_type.h"
#include "mediapipe/framework/port/map_util.h"
#include "mediapipe/framework/port/proto_ns.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/port/status_builder.h"
#include "mediapipe/framework/status_handler.pb.h"
@ -34,6 +37,12 @@ namespace mediapipe {
class ValidatedGraphConfig;
std::string DebugEdgeNames(
const std::string& edge_type,
const proto_ns::RepeatedPtrField<ProtoString>& edges);
std::string DebugName(const CalculatorGraphConfig::Node& node_config);
// Type information for a graph node (Calculator, Generator, etc).
class NodeTypeInfo {
public: