Rollback of "Enable defining and using internal executors in subgraphs."
PiperOrigin-RevId: 561921927
This commit is contained in:
parent
de0c7f2a30
commit
007824594b
|
@ -427,6 +427,7 @@ cc_library(
|
|||
":tag_map",
|
||||
"//mediapipe/framework:calculator_cc_proto",
|
||||
"//mediapipe/framework:graph_service_manager",
|
||||
"//mediapipe/framework:packet_generator",
|
||||
"//mediapipe/framework:packet_generator_cc_proto",
|
||||
"//mediapipe/framework:port",
|
||||
"//mediapipe/framework:status_handler_cc_proto",
|
||||
|
@ -436,12 +437,8 @@ cc_library(
|
|||
"//mediapipe/framework/port:map_util",
|
||||
"//mediapipe/framework/port:ret_check",
|
||||
"//mediapipe/framework/port:status",
|
||||
"@com_google_absl//absl/container:flat_hash_set",
|
||||
"@com_google_absl//absl/log:absl_log",
|
||||
"@com_google_absl//absl/memory",
|
||||
"@com_google_absl//absl/status",
|
||||
"@com_google_absl//absl/strings",
|
||||
"@com_google_absl//absl/strings:str_format",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -785,7 +782,6 @@ cc_test(
|
|||
"//mediapipe/framework/port:ret_check",
|
||||
"//mediapipe/framework/tool/testdata:dub_quad_test_subgraph",
|
||||
"//mediapipe/framework/tool/testdata:nested_test_subgraph",
|
||||
"@com_google_absl//absl/status:statusor",
|
||||
"@com_google_absl//absl/strings",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -23,13 +23,8 @@
|
|||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/container/flat_hash_set.h"
|
||||
#include "absl/log/absl_log.h"
|
||||
#include "absl/memory/memory.h"
|
||||
#include "absl/status/status.h"
|
||||
#include "absl/strings/str_cat.h"
|
||||
#include "absl/strings/str_format.h"
|
||||
#include "mediapipe/framework/calculator.pb.h"
|
||||
#include "mediapipe/framework/graph_service_manager.h"
|
||||
#include "mediapipe/framework/packet_generator.pb.h"
|
||||
#include "mediapipe/framework/port.h"
|
||||
|
@ -128,19 +123,6 @@ absl::Status TransformNames(
|
|||
MP_RETURN_IF_ERROR(TransformStreamNames(
|
||||
status_handler.mutable_input_side_packet(), transform));
|
||||
}
|
||||
// Prefix executor names, but only those defined in the current graph.
|
||||
absl::flat_hash_set<std::string> local_executor_names;
|
||||
for (auto& executor : *config->mutable_executor()) {
|
||||
if (!executor.name().empty()) {
|
||||
local_executor_names.insert(executor.name());
|
||||
*executor.mutable_name() = transform(executor.name());
|
||||
}
|
||||
}
|
||||
for (auto& node : *config->mutable_node()) {
|
||||
if (local_executor_names.contains(node.executor())) {
|
||||
*node.mutable_executor() = transform(node.executor());
|
||||
}
|
||||
}
|
||||
return absl::OkStatus();
|
||||
}
|
||||
|
||||
|
@ -291,41 +273,6 @@ absl::Status ConnectSubgraphStreams(
|
|||
return absl::OkStatus();
|
||||
}
|
||||
|
||||
absl::Status RemoveDuplicateExecutors(
|
||||
const absl::flat_hash_set<std::string>& seen_executors,
|
||||
CalculatorGraphConfig* config) {
|
||||
auto* mutable_executors = config->mutable_executor();
|
||||
auto unique_executors_it = std::remove_if(
|
||||
mutable_executors->begin(), mutable_executors->end(),
|
||||
[&seen_executors](const mediapipe::ExecutorConfig& executor_config) {
|
||||
bool is_duplicate = seen_executors.contains(executor_config.name());
|
||||
// This can happen in the following situation: you define an
|
||||
// executor at the top-level-graph and one or more of your
|
||||
// subgraphs declare executors with the same name as well.
|
||||
//
|
||||
// Historically, executors defined in subgraphs were ignored
|
||||
// (unless you use your subgraph as a top-level-graph).
|
||||
//
|
||||
// Now executors can be defined in subgraphs (their names are
|
||||
// automatically updated to be prefixed with subgraph name). To be
|
||||
// backward compatible, MediaPipe will ignore (remove) executors
|
||||
// defined in subgraphs if they have the same names as one of
|
||||
// top-level-graph defined executors.
|
||||
//
|
||||
// NOTE: If you see this warning, you may want to verify if you
|
||||
// actually use the same executors and consider removing one or
|
||||
// another.
|
||||
if (is_duplicate) {
|
||||
ABSL_LOG(WARNING) << absl::StrFormat(
|
||||
"Removing a duplicate of top-level-graph executor: %s",
|
||||
executor_config.name());
|
||||
}
|
||||
return is_duplicate;
|
||||
});
|
||||
mutable_executors->erase(unique_executors_it, mutable_executors->end());
|
||||
return absl::OkStatus();
|
||||
}
|
||||
|
||||
absl::Status ExpandSubgraphs(CalculatorGraphConfig* config,
|
||||
const GraphRegistry* graph_registry,
|
||||
const Subgraph::SubgraphOptions* graph_options,
|
||||
|
@ -336,12 +283,6 @@ absl::Status ExpandSubgraphs(CalculatorGraphConfig* config,
|
|||
|
||||
MP_RETURN_IF_ERROR(mediapipe::tool::DefineGraphOptions(
|
||||
graph_options ? *graph_options : CalculatorGraphConfig::Node(), config));
|
||||
|
||||
absl::flat_hash_set<std::string> seen_executors;
|
||||
for (int i = 0; i < config->executor_size(); ++i) {
|
||||
seen_executors.insert(config->executor(i).name());
|
||||
}
|
||||
|
||||
auto* nodes = config->mutable_node();
|
||||
while (1) {
|
||||
auto subgraph_nodes_start = std::stable_partition(
|
||||
|
@ -362,7 +303,6 @@ absl::Status ExpandSubgraphs(CalculatorGraphConfig* config,
|
|||
config->package(), node.calculator(),
|
||||
&subgraph_context));
|
||||
MP_RETURN_IF_ERROR(mediapipe::tool::DefineGraphOptions(node, &subgraph));
|
||||
MP_RETURN_IF_ERROR(RemoveDuplicateExecutors(seen_executors, &subgraph));
|
||||
MP_RETURN_IF_ERROR(PrefixNames(node_name, &subgraph));
|
||||
MP_RETURN_IF_ERROR(ConnectSubgraphStreams(node, &subgraph));
|
||||
subgraphs.push_back(subgraph);
|
||||
|
@ -379,9 +319,6 @@ absl::Status ExpandSubgraphs(CalculatorGraphConfig* config,
|
|||
subgraph.status_handler().end(),
|
||||
proto_ns::RepeatedPtrFieldBackInserter(
|
||||
config->mutable_status_handler()));
|
||||
std::copy(
|
||||
subgraph.executor().begin(), subgraph.executor().end(),
|
||||
proto_ns::RepeatedPtrFieldBackInserter(config->mutable_executor()));
|
||||
}
|
||||
}
|
||||
return absl::OkStatus();
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
#include <memory>
|
||||
|
||||
#include "absl/status/statusor.h"
|
||||
#include "absl/strings/str_cat.h"
|
||||
#include "mediapipe/framework/calculator.pb.h"
|
||||
#include "mediapipe/framework/calculator_framework.h"
|
||||
|
@ -772,7 +771,9 @@ class InternalExecutorSubgraph : public Subgraph {
|
|||
};
|
||||
REGISTER_MEDIAPIPE_GRAPH(InternalExecutorSubgraph);
|
||||
|
||||
TEST(SubgraphExpansionTest, SubgraphExecutorWorks) {
|
||||
// This test confirms that none of existing subgraphs can actually create an
|
||||
// executor when used as subgraphs and not like a final graph.
|
||||
TEST(SubgraphExpansionTest, SubgraphExecutorIsIgnored) {
|
||||
CalculatorGraphConfig supergraph =
|
||||
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
|
||||
input_stream: "input"
|
||||
|
@ -784,27 +785,23 @@ TEST(SubgraphExpansionTest, SubgraphExecutorWorks) {
|
|||
)pb");
|
||||
CalculatorGraphConfig expected_graph =
|
||||
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
|
||||
input_stream: "input"
|
||||
node {
|
||||
name: "internalexecutorsubgraph__PassThroughCalculator"
|
||||
calculator: "PassThroughCalculator"
|
||||
input_stream: "input"
|
||||
output_stream: "output"
|
||||
executor: "internalexecutorsubgraph__xyz"
|
||||
}
|
||||
input_stream: "input"
|
||||
executor {
|
||||
name: "internalexecutorsubgraph__xyz"
|
||||
type: "ThreadPoolExecutor"
|
||||
options {
|
||||
[mediapipe.ThreadPoolExecutorOptions.ext] { num_threads: 1 }
|
||||
}
|
||||
executor: "xyz"
|
||||
}
|
||||
)pb");
|
||||
MP_EXPECT_OK(tool::ExpandSubgraphs(&supergraph));
|
||||
EXPECT_THAT(supergraph, mediapipe::EqualsProto(expected_graph));
|
||||
|
||||
CalculatorGraph calculator_graph;
|
||||
MP_EXPECT_OK(calculator_graph.Initialize(supergraph));
|
||||
EXPECT_THAT(calculator_graph.Initialize(supergraph),
|
||||
StatusIs(absl::StatusCode::kInvalidArgument,
|
||||
HasSubstr("The executor \"xyz\" is "
|
||||
"not declared in an ExecutorConfig.")));
|
||||
}
|
||||
|
||||
class NestedInternalExecutorsSubgraph : public Subgraph {
|
||||
|
@ -850,7 +847,7 @@ class NestedInternalExecutorsSubgraph : public Subgraph {
|
|||
};
|
||||
REGISTER_MEDIAPIPE_GRAPH(NestedInternalExecutorsSubgraph);
|
||||
|
||||
TEST(SubgraphExpansionTest, NestedSubgraphExecutorsWork) {
|
||||
TEST(SubgraphExpansionTest, NestedSubgraphExecutorsAreIgnored) {
|
||||
CalculatorGraphConfig supergraph =
|
||||
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
|
||||
input_stream: "input"
|
||||
|
@ -867,55 +864,35 @@ TEST(SubgraphExpansionTest, NestedSubgraphExecutorsWork) {
|
|||
calculator: "PassThroughCalculator"
|
||||
input_stream: "nestedinternalexecutorssubgraph__bar_0"
|
||||
output_stream: "nestedinternalexecutorssubgraph__bar_1"
|
||||
executor: "nestedinternalexecutorssubgraph__xyz"
|
||||
executor: "xyz"
|
||||
}
|
||||
node {
|
||||
name: "nestedinternalexecutorssubgraph__PassThroughCalculator_2"
|
||||
calculator: "PassThroughCalculator"
|
||||
input_stream: "nestedinternalexecutorssubgraph__bar_1"
|
||||
output_stream: "output"
|
||||
executor: "nestedinternalexecutorssubgraph__abc"
|
||||
executor: "abc"
|
||||
}
|
||||
node {
|
||||
name: "nestedinternalexecutorssubgraph__internalexecutorsubgraph__PassThroughCalculator"
|
||||
calculator: "PassThroughCalculator"
|
||||
input_stream: "input"
|
||||
output_stream: "nestedinternalexecutorssubgraph__bar_0"
|
||||
executor: "nestedinternalexecutorssubgraph__internalexecutorsubgraph__xyz"
|
||||
executor: "xyz"
|
||||
}
|
||||
input_stream: "input"
|
||||
executor {
|
||||
name: "nestedinternalexecutorssubgraph__xyz"
|
||||
type: "ThreadPoolExecutor"
|
||||
options {
|
||||
[mediapipe.ThreadPoolExecutorOptions.ext] { num_threads: 1 }
|
||||
}
|
||||
}
|
||||
executor {
|
||||
name: "nestedinternalexecutorssubgraph__abc"
|
||||
type: "ThreadPoolExecutor"
|
||||
options {
|
||||
[mediapipe.ThreadPoolExecutorOptions.ext] { num_threads: 1 }
|
||||
}
|
||||
}
|
||||
executor {
|
||||
name: "nestedinternalexecutorssubgraph__internalexecutorsubgraph__xyz"
|
||||
type: "ThreadPoolExecutor"
|
||||
options {
|
||||
[mediapipe.ThreadPoolExecutorOptions.ext] { num_threads: 1 }
|
||||
}
|
||||
}
|
||||
)pb");
|
||||
MP_EXPECT_OK(tool::ExpandSubgraphs(&supergraph));
|
||||
EXPECT_THAT(supergraph, mediapipe::EqualsProto(expected_graph));
|
||||
|
||||
CalculatorGraph calculator_graph;
|
||||
MP_EXPECT_OK(calculator_graph.Initialize(supergraph));
|
||||
EXPECT_THAT(calculator_graph.Initialize(supergraph),
|
||||
StatusIs(absl::StatusCode::kInvalidArgument,
|
||||
HasSubstr("The executor \"xyz\" is "
|
||||
"not declared in an ExecutorConfig.")));
|
||||
}
|
||||
|
||||
// For backward compatibility.
|
||||
TEST(SubgraphExpansionTest,
|
||||
TopLevelGraphExecutorsCauseSameNamedSubgraphExecutorsToBeRemoved) {
|
||||
TEST(SubgraphExpansionTest, GraphExecutorsSubstituteSubgraphExecutors) {
|
||||
CalculatorGraphConfig supergraph =
|
||||
mediapipe::ParseTextProtoOrDie<CalculatorGraphConfig>(R"pb(
|
||||
input_stream: "input"
|
||||
|
|
Loading…
Reference in New Issue
Block a user