新增 ola_graph callback Framework的实现

This commit is contained in:
Wang.Renzhu 2022-07-20 11:32:16 +08:00
parent 64ed559d94
commit b3bfcb1a3d
17 changed files with 576 additions and 450 deletions

50
.vscode/settings.json vendored
View File

@ -1,6 +1,54 @@
{
"files.associations": {
"chrono": "cpp",
"ios": "cpp"
"ios": "cpp",
"olagraph.hpp": "c",
"face_mesh_module.h": "c",
"array": "cpp",
"atomic": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"compare": "cpp",
"concepts": "cpp",
"cstddef": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"forward_list": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"functional": "cpp",
"initializer_list": "cpp",
"iosfwd": "cpp",
"istream": "cpp",
"limits": "cpp",
"memory": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"string": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"type_traits": "cpp",
"tuple": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"utility": "cpp",
"variant": "cpp"
}
}

View File

@ -421,5 +421,5 @@ cc_crosstool(name = "crosstool")
# android_ndk_repository(name = "androidndk", api_level=21, path = "/Users/wangrenzhu/Library/Android/sdk/ndk/21.2.6472646")
# android_sdk_repository(name = "androidsdk", path = "/Users/wangrenzhu/Android/sdk")
# android_ndk_repository(name = "androidndk", api_level=21, path = "/Users/wangrenzhu/Android/sdk/ndk/android-ndk-r21")
android_sdk_repository(name = "androidsdk")
android_ndk_repository(name = "androidndk", api_level=22)
# android_sdk_repository(name = "androidsdk")
# android_ndk_repository(name = "androidndk", api_level=22)

View File

@ -417,7 +417,9 @@ libedgetpu_dependencies()
load("@coral_crosstool//:configure.bzl", "cc_crosstool")
cc_crosstool(name = "crosstool")
# android_sdk_repository(name = "androidsdk", path = "/Users/wangrenzhu/Library/Android/sdk")
# android_ndk_repository(name = "androidndk", api_level=21, path = "/Users/wangrenzhu/Library/Android/sdk/ndk/21.2.6472646")
android_sdk_repository(name = "androidsdk", path = "/Users/wangrenzhu/Library/Android/sdk")
android_ndk_repository(name = "androidndk", api_level=21, path = "/Users/wangrenzhu/Library/Android/sdk/ndk/21.2.6472646")
# android_sdk_repository(name = "androidsdk", path = "/Users/wangrenzhu/Android/sdk")
# android_ndk_repository(name = "androidndk", api_level=21, path = "/Users/wangrenzhu/Android/sdk/ndk/android-ndk-r21")
# android_sdk_repository(name = "androidsdk", path = "/Users/wangrenzhu/Android/sdk")
# android_ndk_repository(name = "androidndk", api_level=21, path = "/Users/wangrenzhu/Android/sdk/ndk/android-ndk-r21")

View File

@ -1,11 +1,18 @@
package(default_visibility = ["//visibility:public"])
cc_library(
name = "FaceMeshGPULibrary",
copts = ["-std=c++17"],
srcs = [
"face_mesh_module.h",
"face_mesh_module.cc",
"face_mesh_beauty_render.cc",
"face_mesh_module_imp.cc",
],
hdrs = ["face_mesh_module.cpp"],
hdrs = [
"face_mesh_module.h",
"face_mesh_beauty_render.h",
"face_mesh_module_imp.h",
],
data = [
"//mediapipe/graphs/face_mesh:face_mesh_mobile_gpu.binarypb",
"//mediapipe/modules/face_detection:face_detection_short_range.tflite",
@ -23,4 +30,11 @@ cc_library(
"//mediapipe/framework/formats:landmark_cc_proto",
],
}),
copts = select({
"//mediapipe:apple": [
"-x objective-c++",
"-fobjc-arc", # enable reference-counting
],
"//conditions:default": [],
}),
)

View File

@ -1,4 +1,5 @@
#include "face_mesh_module.h"
#include "face_mesh_module_imp.h"
namespace Opipe {
FaceMeshModule::FaceMeshModule() {

View File

@ -1,4 +1,4 @@
#include "OlaGraph.hpp"
#include <stdio.h>
namespace Opipe {
class FaceMeshModule {

View File

@ -1,5 +1,5 @@
#include "mediapipe/render/module/common/OlaGraph.hpp"
#include "face_mesh_module.hpp"
#include "mediapipe/render/module/beauty/face_mesh_module.h"
namespace Opipe {
class FaceMeshModuleIMP : public FaceMeshModule {

View File

@ -2,8 +2,8 @@ load("//mediapipe/framework/tool:mediapipe_graph.bzl", "mediapipe_binary_graph")
package(default_visibility = ["//visibility:private"])
cc_library(
name = "olamodule_common_library",
srcs = ["OlaGraph.cpp"],
hdrs = ["OlaGraph.hpp"],
srcs = ["ola_graph.cc"],
hdrs = ["ola_graph.h"],
visibility = ["//mediapipe/framework:mediapipe_internal"],
deps = [
"//mediapipe/framework:calculator_framework",

View File

@ -1,246 +0,0 @@
#include <atomic>
#include "OlaGraph.hpp"
#include "absl/memory/memory.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/image.h"
#include "mediapipe/framework/formats/image_frame.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/gpu/gl_base.h"
#include "mediapipe/gpu/gpu_shared_data_internal.h"
#if defined(__APPLE__)
#include "mediapipe/gpu/MPPGraphGPUData.h"
#include "mediapipe/objc/util.h"
#endif
namespace Opipe {
void CallFrameDelegate(void* wrapperVoid, const std::string& streamName,
MPPPacketType packetType,
const mediapipe::Packet& packet) {
}
OlaGraph::OlaGraph(const mediapipe::CalculatorGraphConfig &config) {
_config = config;
_graph = absl::make_unique<mediapipe::CalculatorGraph>();
}
OlaGraph::~OlaGraph() {
}
mediapipe::ProfilingContext* OlaGraph::getProfiler() {
return _graph->profiler();
}
void OlaGraph::setHeaderPacket(const mediapipe::Packet &packet, std::string streamName) {
_streamHeaders[streamName] = packet;
}
void OlaGraph::setSidePacket(const mediapipe::Packet &packet, std::string name) {
_inputSidePackets[name] = packet;
}
void OlaGraph::setServicePacket(mediapipe::Packet &packet,const mediapipe::GraphServiceBase &service) {
_servicePackets[&service] = std::move(packet);
}
void OlaGraph::addSidePackets(const std::map<std::string, mediapipe::Packet> &extraSidePackets) {
_inputSidePackets.insert(extraSidePackets.begin(), extraSidePackets.end());
}
void OlaGraph::addFrameOutputStream(const std::string &outputStreamName,
MPPPacketType packetType) {
std::string callbackInputName;
mediapipe::tool::AddCallbackCalculator(outputStreamName, &_config, &callbackInputName,
/*use_std_function=*/true);
// No matter what ownership qualifiers are put on the pointer, NewPermanentCallback will
// still end up with a strong pointer to MPPGraph*. That is why we use void* instead.
void* wrapperVoid = this;
_inputSidePackets[callbackInputName] =
mediapipe::MakePacket<std::function<void(const mediapipe::Packet&)>>( [wrapperVoid, outputStreamName, packetType]
(const mediapipe::Packet& packet) {
CallFrameDelegate(wrapperVoid, outputStreamName, packetType, packet);
});
}
bool OlaGraph::start() {
absl::Status status = performStart();
_started = true;
return false;
}
absl::Status OlaGraph::performStart() {
absl::Status status = _graph->Initialize(_config);
if (!status.ok()) {
return status;
}
for (const auto& service_packet : _servicePackets) {
status = _graph->SetServicePacket(*service_packet.first, service_packet.second);
if (!status.ok()) {
return status;
}
}
status = _graph->StartRun(_inputSidePackets, _streamHeaders);
if (!status.ok()) {
return status;
}
return status;
}
bool OlaGraph::sendPacket(const mediapipe::Packet &packet,
const std::string &streamName) {
absl::Status status = _graph->AddPacketToInputStream(streamName, packet);
return status.ok();
}
bool OlaGraph::movePacket(mediapipe::Packet &&packet, const std::string &streamName) {
absl::Status status = _graph->AddPacketToInputStream(streamName, std::move(packet));
return status.ok();
}
/// Sets the maximum queue size for a stream. Experimental feature, currently
/// only supported for graph input streams. Should be called before starting the
/// graph.
bool OlaGraph::setMaxQueueSize(int maxQueueSize,
const std::string &streamName) {
absl::Status status = _graph->SetInputStreamMaxQueueSize(streamName, maxQueueSize);
return status.ok();
}
#if defined(__APPLE__)
/// Creates a MediaPipe packet wrapping the given pixelBuffer;
mediapipe::Packet OlaGraph::packetWithPixelBuffer(CVPixelBufferRef pixelBuffer,
MPPPacketType packetType) {
mediapipe::Packet packet;
if (packetType == MPPPacketTypeImageFrame || packetType == MPPPacketTypeImageFrameBGRANoSwap) {
auto frame = CreateImageFrameForCVPixelBuffer(
pixelBuffer, /* canOverwrite = */ false,
/* bgrAsRgb = */ packetType == MPPPacketTypeImageFrameBGRANoSwap);
packet = mediapipe::Adopt(frame.release());
#if MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
} else if (packetType == MPPPacketTypePixelBuffer) {
packet = mediapipe::MakePacket<mediapipe::GpuBuffer>(pixelBuffer);
#endif // MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
} else if (packetType == MPPPacketTypeImage) {
#if MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
// GPU
packet = mediapipe::MakePacket<mediapipe::Image>(pixelBuffer);
#else
// CPU
auto frame = CreateImageFrameForCVPixelBuffer(imageBuffer, /* canOverwrite = */ false,
/* bgrAsRgb = */ false);
packet = mediapipe::MakePacket<mediapipe::Image>(std::move(frame));
#endif // MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
} else {
assert(false);
}
return packet;
}
/// Creates a MediaPipe packet of type Image, wrapping the given CVPixelBufferRef.
mediapipe::Packet OlaGraph::imagePacketWithPixelBuffer(CVPixelBufferRef pixelBuffer) {
return packetWithPixelBuffer(pixelBuffer, MPPPacketTypeImage);
}
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. If allowOverwrite is set to YES,
/// allows MediaPipe to overwrite the packet contents on successful sending for
/// possibly increased efficiency. Returns YES if the packet was successfully sent.
bool OlaGraph::sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string & inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp,
bool allowOverwrite) {
if (_maxFramesInFlight && _framesInFlight >= _maxFramesInFlight) return false;
mediapipe::Packet packet = packetWithPixelBuffer(imageBuffer, packetType);
bool success;
if (allowOverwrite) {
packet = std::move(packet).At(timestamp);
success = movePacket(std::move(packet), inputName);
} else {
success = sendPacket(packet.At(timestamp), inputName);
}
if (success) _framesInFlight++;
return success;
}
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. Returns YES if the packet was
/// successfully sent.
bool OlaGraph::sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string & inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp) {
return sendPixelBuffer(imageBuffer, inputName, packetType, timestamp, false);
}
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. The timestamp is
/// automatically incremented from the last timestamp used by this method. Drops
/// frames and returns NO if maxFramesInFlight is exceeded. Returns YES if the
/// packet was successfully sent.
bool OlaGraph::sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string & inputName,
MPPPacketType packetType) {
if (_frameTimestamp < mediapipe::Timestamp::Min()) {
_frameTimestamp = mediapipe::Timestamp::Min();
} else {
_frameTimestamp++;
}
return sendPixelBuffer(imageBuffer, inputName, packetType, _frameTimestamp);
}
#endif
/// Cancels a graph run. You must still call waitUntilDoneWithError: after this.
void OlaGraph::cancel() {
_graph->Cancel();
}
/// Check if the graph contains this input stream
bool OlaGraph::hasInputStream(const std::string &inputName) {
return _graph->HasInputStream(inputName);
}
/// Closes an input stream.
/// You must close all graph input streams before stopping the graph.
/// @return YES if successful.
bool OlaGraph::closeInputStream(const std::string &inputName) {
absl::Status status = _graph->CloseInputStream(inputName);
return status.ok();
}
/// Closes all graph input streams.
/// @return YES if successful.
bool OlaGraph::closeAllInputStreams() {
absl::Status status = _graph->CloseAllInputStreams();
return status.ok();
}
/// Stops running the graph.
/// Call this before releasing this object. All input streams must have been
/// closed. This call does not time out, so you should not call it from the main
/// thread.
/// @return YES if successful.
bool OlaGraph::waitUntilDone() {
absl::Status status = _graph->WaitUntilDone();
_started = false;
return status.ok();
}
/// Waits for the graph to become idle.
bool OlaGraph::waitUntilIdle() {
absl::Status status = _graph->WaitUntilIdle();
return status.ok();
}
}

View File

@ -1,192 +0,0 @@
#include "absl/memory/memory.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/formats/image.h"
#include "mediapipe/framework/formats/image_frame.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/gpu/gl_base.h"
#include "mediapipe/gpu/gpu_shared_data_internal.h"
#ifdef __OBJC__
#import <AVFoundation/AVFoundation.h>
#endif // __OBJC__
namespace Opipe {
enum MPPPacketType {
MPPPacketTypeRaw,
MPPPacketTypeImage,
MPPPacketTypeImageFrame,
MPPPacketTypeImageFrameBGRANoSwap,
#if defined(__APPLE__)
MPPPacketTypePixelBuffer,
#endif
};
class OlaGraph;
class MPPGraphDelegate {
public:
#if defined(__APPLE__)
virtual void outputPixelbuffer(OlaGraph *graph, CVPixelBufferRef pixelbuffer,
const std::string &streamName);
virtual void outputPixelbuffer(OlaGraph *graph, CVPixelBufferRef pixelbuffer,
const std::string &streamName,
mediapipe::Timestamp &timstamp);
#endif
virtual void outputPacket(OlaGraph *graph, mediapipe::Packet &packet,
const std::string &streamName);
};
class OlaGraph {
public:
OlaGraph(const mediapipe::CalculatorGraphConfig &config);
~OlaGraph();
absl::Status AddCallbackHandler(std::string output_stream_name,
void *callback);
absl::Status AddMultiStreamCallbackHandler(std::vector<std::string> output_stream_names,
void *callback,
bool observe_timestamp_bounds);
void setDelegate(std::shared_ptr<MPPGraphDelegate> delegate) {
_delegate = delegate;
}
mediapipe::ProfilingContext* getProfiler();
int maxFramesInFlight;
mediapipe::CalculatorGraph::GraphInputStreamAddMode packetAddMode() {
return _graph->GetGraphInputStreamAddMode();
}
void setPacketAddMode(mediapipe::CalculatorGraph::GraphInputStreamAddMode mode) {
_graph->SetGraphInputStreamAddMode(mode);
};
virtual void setHeaderPacket(const mediapipe::Packet &packet, std::string streamName);
virtual void setSidePacket(const mediapipe::Packet &packet, std::string name);
virtual void setServicePacket(mediapipe::Packet &packet,const mediapipe::GraphServiceBase &service);
virtual void addSidePackets(const std::map<std::string, mediapipe::Packet> &extraSidePackets);
virtual void addFrameOutputStream(const std::string &outputStreamName,
MPPPacketType packetType);
virtual bool start();
virtual bool sendPacket(const mediapipe::Packet &packet,
const std::string &streamName);
virtual bool movePacket(mediapipe::Packet &&packet, const std::string &streamName);
/// Sets the maximum queue size for a stream. Experimental feature, currently
/// only supported for graph input streams. Should be called before starting the
/// graph.
virtual bool setMaxQueueSize(int maxQueueSize,
const std::string &streamName);
#if defined(__APPLE__)
/// Creates a MediaPipe packet wrapping the given pixelBuffer;
mediapipe::Packet packetWithPixelBuffer(CVPixelBufferRef pixelBuffer,
MPPPacketType packetType);
/// Creates a MediaPipe packet of type Image, wrapping the given CVPixelBufferRef.
mediapipe::Packet imagePacketWithPixelBuffer(CVPixelBufferRef pixelBuffer);
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. If allowOverwrite is set to YES,
/// allows MediaPipe to overwrite the packet contents on successful sending for
/// possibly increased efficiency. Returns YES if the packet was successfully sent.
bool sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string & inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp,
bool allowOverwrite);
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. Returns YES if the packet was
/// successfully sent.
bool sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string & inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp);
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. The timestamp is
/// automatically incremented from the last timestamp used by this method. Drops
/// frames and returns NO if maxFramesInFlight is exceeded. Returns YES if the
/// packet was successfully sent.
bool sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string & inputName,
MPPPacketType packetType);
#endif
/// Cancels a graph run. You must still call waitUntilDoneWithError: after this.
void cancel();
/// Check if the graph contains this input stream
bool hasInputStream(const std::string &inputName);
/// Closes an input stream.
/// You must close all graph input streams before stopping the graph.
/// @return YES if successful.
bool closeInputStream(const std::string &inputName);
/// Closes all graph input streams.
/// @return YES if successful.
bool closeAllInputStreams();
/// Stops running the graph.
/// Call this before releasing this object. All input streams must have been
/// closed. This call does not time out, so you should not call it from the main
/// thread.
/// @return YES if successful.
bool waitUntilDone();
/// Waits for the graph to become idle.
bool waitUntilIdle();
private:
std::unique_ptr<mediapipe::CalculatorGraph> _graph;
mediapipe::CalculatorGraphConfig _config;
/// Input side packets that will be added to the graph when it is started.
std::map<std::string, mediapipe::Packet> _inputSidePackets;
/// Packet headers that will be added to the graph when it is started.
std::map<std::string, mediapipe::Packet> _streamHeaders;
/// Service packets to be added to the graph when it is started.
std::map<const mediapipe::GraphServiceBase*, mediapipe::Packet> _servicePackets;
/// Number of frames currently being processed by the graph.
std::atomic<int32_t> _framesInFlight;
mediapipe::Timestamp _frameTimestamp;
int64 _frameNumber;
bool _started;
std::weak_ptr<MPPGraphDelegate> _delegate;
absl::Status performStart();
int _maxFramesInFlight = 0;
};
}

View File

@ -0,0 +1,309 @@
#include <atomic>
#include "ola_graph.h"
#include "absl/memory/memory.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/formats/image.h"
#include "mediapipe/framework/formats/image_frame.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/gpu/gl_base.h"
#include "mediapipe/gpu/gpu_shared_data_internal.h"
#if defined(__APPLE__)
#include "mediapipe/gpu/MPPGraphGPUData.h"
#include "mediapipe/objc/util.h"
#endif
using namespace mediapipe;
namespace Opipe
{
void CallFrameDelegate(void *wrapperVoid, const std::string &streamName, MPPPacketType packetType,
const mediapipe::Packet &packet)
{
OlaGraph *graph = (OlaGraph *)wrapperVoid;
if (graph->_delegate.expired())
{
return;
}
if (packetType == MPPPacketTypeRaw)
{
graph->_delegate.lock()->outputPacket(graph, packet, packetType, streamName);
}
#if defined(__APPLE__)
else if (packetType == MPPPacketTypePixelBuffer ||
packetType == MPPPacketTypeImage)
{
graph->_framesInFlight--;
CVPixelBufferRef pixelBuffer;
if (packetType == MPPPacketTypePixelBuffer)
pixelBuffer = mediapipe::GetCVPixelBufferRef(packet.Get<mediapipe::GpuBuffer>());
else
pixelBuffer = packet.Get<mediapipe::Image>().GetCVPixelBufferRef();
graph->_delegate.lock()->outputPixelbuffer(graph, pixelBuffer, streamName, packet.Timestamp().Value());
#endif
}
else
{
}
}
OlaGraph::OlaGraph(const mediapipe::CalculatorGraphConfig &config)
{
_config = config;
_graph = absl::make_unique<mediapipe::CalculatorGraph>();
}
OlaGraph::~OlaGraph()
{
}
mediapipe::ProfilingContext *OlaGraph::getProfiler()
{
return _graph->profiler();
}
void OlaGraph::setHeaderPacket(const mediapipe::Packet &packet, std::string streamName)
{
_streamHeaders[streamName] = packet;
}
void OlaGraph::setSidePacket(const mediapipe::Packet &packet, std::string name)
{
_inputSidePackets[name] = packet;
}
void OlaGraph::setServicePacket(mediapipe::Packet &packet, const mediapipe::GraphServiceBase &service)
{
_servicePackets[&service] = std::move(packet);
}
void OlaGraph::addSidePackets(const std::map<std::string, mediapipe::Packet> &extraSidePackets)
{
_inputSidePackets.insert(extraSidePackets.begin(), extraSidePackets.end());
}
void OlaGraph::addFrameOutputStream(const std::string &outputStreamName,
MPPPacketType packetType)
{
std::string callbackInputName;
mediapipe::tool::AddCallbackCalculator(outputStreamName, &_config, &callbackInputName,
/*use_std_function=*/true);
// No matter what ownership qualifiers are put on the pointer, NewPermanentCallback will
// still end up with a strong pointer to MPPGraph*. That is why we use void* instead.
void *wrapperVoid = this;
_inputSidePackets[callbackInputName] =
mediapipe::MakePacket<std::function<void(const mediapipe::Packet &)>>([wrapperVoid, outputStreamName, packetType](const mediapipe::Packet &packet)
{ CallFrameDelegate(wrapperVoid, outputStreamName, packetType, packet); });
}
bool OlaGraph::start()
{
absl::Status status = performStart();
_started = true;
return false;
}
absl::Status OlaGraph::performStart()
{
absl::Status status = _graph->Initialize(_config);
if (!status.ok())
{
return status;
}
for (const auto &service_packet : _servicePackets)
{
status = _graph->SetServicePacket(*service_packet.first, service_packet.second);
if (!status.ok())
{
return status;
}
}
status = _graph->StartRun(_inputSidePackets, _streamHeaders);
if (!status.ok())
{
return status;
}
return status;
}
bool OlaGraph::sendPacket(const mediapipe::Packet &packet,
const std::string &streamName)
{
absl::Status status = _graph->AddPacketToInputStream(streamName, packet);
return status.ok();
}
bool OlaGraph::movePacket(mediapipe::Packet &&packet, const std::string &streamName)
{
absl::Status status = _graph->AddPacketToInputStream(streamName, std::move(packet));
return status.ok();
}
/// Sets the maximum queue size for a stream. Experimental feature, currently
/// only supported for graph input streams. Should be called before starting the
/// graph.
bool OlaGraph::setMaxQueueSize(int maxQueueSize,
const std::string &streamName)
{
absl::Status status = _graph->SetInputStreamMaxQueueSize(streamName, maxQueueSize);
return status.ok();
}
#if defined(__APPLE__)
/// Creates a MediaPipe packet wrapping the given pixelBuffer;
mediapipe::Packet OlaGraph::packetWithPixelBuffer(CVPixelBufferRef pixelBuffer,
MPPPacketType packetType)
{
mediapipe::Packet packet;
if (packetType == MPPPacketTypeImageFrame || packetType == MPPPacketTypeImageFrameBGRANoSwap)
{
auto frame = CreateImageFrameForCVPixelBuffer(
pixelBuffer, /* canOverwrite = */ false,
/* bgrAsRgb = */ packetType == MPPPacketTypeImageFrameBGRANoSwap);
packet = mediapipe::Adopt(frame.release());
#if MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
}
else if (packetType == MPPPacketTypePixelBuffer)
{
packet = mediapipe::MakePacket<mediapipe::GpuBuffer>(pixelBuffer);
#endif // MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
}
else if (packetType == MPPPacketTypeImage)
{
#if MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
// GPU
packet = mediapipe::MakePacket<mediapipe::Image>(pixelBuffer);
#else
// CPU
auto frame = CreateImageFrameForCVPixelBuffer(imageBuffer, /* canOverwrite = */ false,
/* bgrAsRgb = */ false);
packet = mediapipe::MakePacket<mediapipe::Image>(std::move(frame));
#endif // MEDIAPIPE_GPU_BUFFER_USE_CV_PIXEL_BUFFER
}
else
{
assert(false);
}
return packet;
}
/// Creates a MediaPipe packet of type Image, wrapping the given CVPixelBufferRef.
mediapipe::Packet OlaGraph::imagePacketWithPixelBuffer(CVPixelBufferRef pixelBuffer)
{
return packetWithPixelBuffer(pixelBuffer, MPPPacketTypeImage);
}
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. If allowOverwrite is set to YES,
/// allows MediaPipe to overwrite the packet contents on successful sending for
/// possibly increased efficiency. Returns YES if the packet was successfully sent.
bool OlaGraph::sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string &inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp,
bool allowOverwrite)
{
if (_maxFramesInFlight && _framesInFlight >= _maxFramesInFlight)
return false;
mediapipe::Packet packet = packetWithPixelBuffer(imageBuffer, packetType);
bool success;
if (allowOverwrite)
{
packet = std::move(packet).At(timestamp);
success = movePacket(std::move(packet), inputName);
}
else
{
success = sendPacket(packet.At(timestamp), inputName);
}
if (success)
_framesInFlight++;
return success;
}
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. Returns YES if the packet was
/// successfully sent.
bool OlaGraph::sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string &inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp)
{
return sendPixelBuffer(imageBuffer, inputName, packetType, timestamp, false);
}
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. The timestamp is
/// automatically incremented from the last timestamp used by this method. Drops
/// frames and returns NO if maxFramesInFlight is exceeded. Returns YES if the
/// packet was successfully sent.
bool OlaGraph::sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string &inputName,
MPPPacketType packetType)
{
if (_frameTimestamp < mediapipe::Timestamp::Min())
{
_frameTimestamp = mediapipe::Timestamp::Min();
}
else
{
_frameTimestamp++;
}
return sendPixelBuffer(imageBuffer, inputName, packetType, _frameTimestamp);
}
#endif
/// Cancels a graph run. You must still call waitUntilDoneWithError: after this.
void OlaGraph::cancel()
{
_graph->Cancel();
}
/// Check if the graph contains this input stream
bool OlaGraph::hasInputStream(const std::string &inputName)
{
return _graph->HasInputStream(inputName);
}
/// Closes an input stream.
/// You must close all graph input streams before stopping the graph.
/// @return YES if successful.
bool OlaGraph::closeInputStream(const std::string &inputName)
{
absl::Status status = _graph->CloseInputStream(inputName);
return status.ok();
}
/// Closes all graph input streams.
/// @return YES if successful.
bool OlaGraph::closeAllInputStreams()
{
absl::Status status = _graph->CloseAllInputStreams();
return status.ok();
}
/// Stops running the graph.
/// Call this before releasing this object. All input streams must have been
/// closed. This call does not time out, so you should not call it from the main
/// thread.
/// @return YES if successful.
bool OlaGraph::waitUntilDone()
{
absl::Status status = _graph->WaitUntilDone();
_started = false;
return status.ok();
}
/// Waits for the graph to become idle.
bool OlaGraph::waitUntilIdle()
{
absl::Status status = _graph->WaitUntilIdle();
return status.ok();
}
}

View File

@ -0,0 +1,190 @@
#include "absl/memory/memory.h"
#include "mediapipe/framework/calculator_framework.h"
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/formats/image.h"
#include "mediapipe/framework/formats/image_frame.h"
#include "mediapipe/framework/graph_service.h"
#include "mediapipe/gpu/gl_base.h"
#include "mediapipe/gpu/gpu_shared_data_internal.h"
#ifdef __OBJC__
#import <AVFoundation/AVFoundation.h>
#endif // __OBJC__
using namespace mediapipe;
namespace Opipe
{
enum MPPPacketType
{
MPPPacketTypeRaw,
MPPPacketTypeImage,
MPPPacketTypeImageFrame,
MPPPacketTypeImageFrameBGRANoSwap,
#if defined(__APPLE__)
MPPPacketTypePixelBuffer,
#endif
};
class OlaGraph;
class MPPGraphDelegate
{
public:
#if defined(__APPLE__)
virtual void outputPixelbuffer(OlaGraph *graph, CVPixelBufferRef pixelbuffer,
const std::string &streamName,
int64_t timstamp);
#endif
virtual void outputPacket(OlaGraph *graph,
const mediapipe::Packet &packet,
MPPPacketType packetType,
const std::string &streamName);
};
class OlaGraph
{
public:
OlaGraph(const mediapipe::CalculatorGraphConfig &config);
~OlaGraph();
absl::Status AddCallbackHandler(std::string output_stream_name,
void *callback);
absl::Status AddMultiStreamCallbackHandler(std::vector<std::string> output_stream_names,
void *callback,
bool observe_timestamp_bounds);
void setDelegate(std::shared_ptr<MPPGraphDelegate> delegate)
{
_delegate = delegate;
}
mediapipe::ProfilingContext *getProfiler();
int maxFramesInFlight;
mediapipe::CalculatorGraph::GraphInputStreamAddMode packetAddMode()
{
return _graph->GetGraphInputStreamAddMode();
}
void setPacketAddMode(mediapipe::CalculatorGraph::GraphInputStreamAddMode mode)
{
_graph->SetGraphInputStreamAddMode(mode);
};
virtual void setHeaderPacket(const mediapipe::Packet &packet, std::string streamName);
virtual void setSidePacket(const mediapipe::Packet &packet, std::string name);
virtual void setServicePacket(mediapipe::Packet &packet, const mediapipe::GraphServiceBase &service);
virtual void addSidePackets(const std::map<std::string, mediapipe::Packet> &extraSidePackets);
virtual void addFrameOutputStream(const std::string &outputStreamName,
MPPPacketType packetType);
virtual bool start();
virtual bool sendPacket(const mediapipe::Packet &packet,
const std::string &streamName);
virtual bool movePacket(mediapipe::Packet &&packet, const std::string &streamName);
/// Sets the maximum queue size for a stream. Experimental feature, currently
/// only supported for graph input streams. Should be called before starting the
/// graph.
virtual bool setMaxQueueSize(int maxQueueSize,
const std::string &streamName);
#if defined(__APPLE__)
/// Creates a MediaPipe packet wrapping the given pixelBuffer;
mediapipe::Packet packetWithPixelBuffer(CVPixelBufferRef pixelBuffer,
MPPPacketType packetType);
/// Creates a MediaPipe packet of type Image, wrapping the given CVPixelBufferRef.
mediapipe::Packet imagePacketWithPixelBuffer(CVPixelBufferRef pixelBuffer);
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. If allowOverwrite is set to YES,
/// allows MediaPipe to overwrite the packet contents on successful sending for
/// possibly increased efficiency. Returns YES if the packet was successfully sent.
bool sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string &inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp,
bool allowOverwrite);
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. Drops frames and
/// returns NO if maxFramesInFlight is exceeded. Returns YES if the packet was
/// successfully sent.
bool sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string &inputName,
MPPPacketType packetType,
const mediapipe::Timestamp &timestamp);
/// Sends a pixel buffer into a graph input stream, using the specified packet
/// type. The graph must have been started before calling this. The timestamp is
/// automatically incremented from the last timestamp used by this method. Drops
/// frames and returns NO if maxFramesInFlight is exceeded. Returns YES if the
/// packet was successfully sent.
bool sendPixelBuffer(CVPixelBufferRef imageBuffer,
const std::string &inputName,
MPPPacketType packetType);
#endif
/// Cancels a graph run. You must still call waitUntilDoneWithError: after this.
void cancel();
/// Check if the graph contains this input stream
bool hasInputStream(const std::string &inputName);
/// Closes an input stream.
/// You must close all graph input streams before stopping the graph.
/// @return YES if successful.
bool closeInputStream(const std::string &inputName);
/// Closes all graph input streams.
/// @return YES if successful.
bool closeAllInputStreams();
/// Stops running the graph.
/// Call this before releasing this object. All input streams must have been
/// closed. This call does not time out, so you should not call it from the main
/// thread.
/// @return YES if successful.
bool waitUntilDone();
/// Waits for the graph to become idle.
bool waitUntilIdle();
std::weak_ptr<MPPGraphDelegate> _delegate;
std::atomic<int32_t> _framesInFlight = 2;
private:
std::unique_ptr<mediapipe::CalculatorGraph> _graph;
mediapipe::CalculatorGraphConfig _config;
/// Input side packets that will be added to the graph when it is started.
std::map<std::string, mediapipe::Packet> _inputSidePackets;
/// Packet headers that will be added to the graph when it is started.
std::map<std::string, mediapipe::Packet> _streamHeaders;
/// Service packets to be added to the graph when it is started.
std::map<const mediapipe::GraphServiceBase *, mediapipe::Packet> _servicePackets;
/// Number of frames currently being processed by the graph.
mediapipe::Timestamp _frameTimestamp;
int64 _frameNumber;
bool _started;
absl::Status performStart();
int _maxFramesInFlight = 0;
};
}