Add Empty Packet support to GraphRunner

PiperOrigin-RevId: 506963970
This commit is contained in:
Sebastian Schmidt 2023-02-03 12:14:48 -08:00 committed by Copybara-Service
parent 632a3602dd
commit 046efddf8b
2 changed files with 75 additions and 58 deletions

View File

@ -15,6 +15,25 @@ export declare interface FileLocator {
locateFile: (filename: string) => string; locateFile: (filename: string) => string;
} }
/**
* A listener that receives the contents of a non-empty MediaPipe packet and
* its timestamp.
*/
export type SimpleListener<T> = (data: T, timestamp: number) => void;
/**
* A listener that receives the current MediaPipe packet timestamp. This is
* invoked even for empty packet.
*/
export type EmptyPacketListener = (timestamp: number) => void;
/**
* A listener that receives a single element of vector-returning output packet.
* Intended for internal usage.
*/
export type VectorListener<T> =
(data: T, index: number, length: number, timestamp: number) => void;
/** /**
* Declarations for Emscripten's WebAssembly Module behavior, so TS compiler * Declarations for Emscripten's WebAssembly Module behavior, so TS compiler
* doesn't break our JS/C++ bridge. * doesn't break our JS/C++ bridge.
@ -71,14 +90,12 @@ export declare interface WasmModule {
(dataPtr: number, dataSize: number, protoNamePtr: number, (dataPtr: number, dataSize: number, protoNamePtr: number,
streamNamePtr: number) => void; streamNamePtr: number) => void;
// Wasm Module output listener entrypoints. Also built as part of // Map of output streams to packet listeners. Also built as part of
// gl_graph_runner_internal_multi_input. // gl_graph_runner_internal_multi_input.
simpleListeners?: simpleListeners?:
{[outputStreamName: string]: (data: unknown, timestamp: number) => void}; Record<string, SimpleListener<unknown>|VectorListener<unknown>>;
vectorListeners?: { // Map of output streams to empty packet listeners.
[outputStreamName: string]: ( emptyPacketListeners?: Record<string, EmptyPacketListener>;
data: unknown, index: number, length: number, timestamp: number) => void
};
_attachBoolListener: (streamNamePtr: number) => void; _attachBoolListener: (streamNamePtr: number) => void;
_attachBoolVectorListener: (streamNamePtr: number) => void; _attachBoolVectorListener: (streamNamePtr: number) => void;
_attachDoubleListener: (streamNamePtr: number) => void; _attachDoubleListener: (streamNamePtr: number) => void;
@ -147,7 +164,7 @@ export type WasmMediaPipeConstructor<LibType> =
* Simple class to run an arbitrary image-in/image-out MediaPipe graph (i.e. * Simple class to run an arbitrary image-in/image-out MediaPipe graph (i.e.
* as created by wasm_mediapipe_demo BUILD macro), and either render results * as created by wasm_mediapipe_demo BUILD macro), and either render results
* into canvas, or else return the output WebGLTexture. Takes a WebAssembly * into canvas, or else return the output WebGLTexture. Takes a WebAssembly
* Module (must be instantiated to self.Module). * Module.
*/ */
export class GraphRunner { export class GraphRunner {
// TODO: These should be protected/private, but are left exposed for // TODO: These should be protected/private, but are left exposed for
@ -419,12 +436,10 @@ export class GraphRunner {
* Ensures existence of the simple listeners table and registers the callback. * Ensures existence of the simple listeners table and registers the callback.
* Intended for internal usage. * Intended for internal usage.
*/ */
setListener<T>( setListener<T>(outputStreamName: string, callbackFcn: SimpleListener<T>) {
outputStreamName: string,
callbackFcn: (data: T, timestamp: number) => void) {
this.wasmModule.simpleListeners = this.wasmModule.simpleListeners || {}; this.wasmModule.simpleListeners = this.wasmModule.simpleListeners || {};
this.wasmModule.simpleListeners[outputStreamName] = this.wasmModule.simpleListeners[outputStreamName] =
callbackFcn as (data: unknown, timestamp: number) => void; callbackFcn as SimpleListener<unknown>;
} }
/** /**
@ -432,20 +447,19 @@ export class GraphRunner {
* Intended for internal usage. * Intended for internal usage.
*/ */
setVectorListener<T>( setVectorListener<T>(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<T[]>) {
callbackFcn: (data: T[], timestamp: number) => void) {
let buffer: T[] = []; let buffer: T[] = [];
this.wasmModule.vectorListeners = this.wasmModule.vectorListeners || {}; this.wasmModule.simpleListeners = this.wasmModule.simpleListeners || {};
this.wasmModule.vectorListeners[outputStreamName] = this.wasmModule.simpleListeners[outputStreamName] =
(data: unknown, index: number, length: number, timestamp: number) => { (data: unknown, index: number, length: number, timestamp: number) => {
// The Wasm listener gets invoked once for each element. Once we // The Wasm listener gets invoked once for each element. Once we
// receive all elements, we invoke the registered callback with the // receive all elements, we invoke the registered callback with
// full array. // the full array.
buffer[index] = data as T; buffer[index] = data as T;
if (index === length - 1) { if (index === length - 1) {
// Invoke the user callback directly, as the Wasm layer may clean up // Invoke the user callback directly, as the Wasm layer may
// the underlying data elements once we leave the scope of the // clean up the underlying data elements once we leave the scope
// listener. // of the listener.
callbackFcn(buffer, timestamp); callbackFcn(buffer, timestamp);
buffer = []; buffer = [];
} }
@ -460,6 +474,25 @@ export class GraphRunner {
this.wasmModule.errorListener = callbackFcn; this.wasmModule.errorListener = callbackFcn;
} }
/**
* Attaches a listener that will be invoked when the MediaPipe framework
* receives an empty packet on the provided output stream. This can be used
* to receive the latest output timestamp.
*
* Empty packet listeners are only active if there is a corresponding packet
* listener.
*
* @param outputStreamName The name of the graph output stream to receive
* empty packets from.
* @param callbackFcn The callback to receive the timestamp.
*/
attachEmptyPacketListener(
outputStreamName: string, callbackFcn: EmptyPacketListener) {
this.wasmModule.emptyPacketListeners =
this.wasmModule.emptyPacketListeners || {};
this.wasmModule.emptyPacketListeners[outputStreamName] = callbackFcn;
}
/** /**
* Takes the raw data from a JS audio capture array, and sends it to C++ to be * Takes the raw data from a JS audio capture array, and sends it to C++ to be
* processed. * processed.
@ -744,8 +777,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachBoolListener( attachBoolListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<boolean>): void {
callbackFcn: (data: boolean, timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -765,8 +797,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachBoolVectorListener( attachBoolVectorListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<boolean[]>): void {
callbackFcn: (data: boolean[], timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);
@ -786,8 +817,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachIntListener( attachIntListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<number>): void {
callbackFcn: (data: number, timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -807,8 +837,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachIntVectorListener( attachIntVectorListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<number[]>): void {
callbackFcn: (data: number[], timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);
@ -828,8 +857,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachDoubleListener( attachDoubleListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<number>): void {
callbackFcn: (data: number, timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -849,8 +877,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachDoubleVectorListener( attachDoubleVectorListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<number[]>): void {
callbackFcn: (data: number[], timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);
@ -870,8 +897,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachFloatListener( attachFloatListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<number>): void {
callbackFcn: (data: number, timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -891,8 +917,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachFloatVectorListener( attachFloatVectorListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<number[]>): void {
callbackFcn: (data: number[], timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);
@ -912,8 +937,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachStringListener( attachStringListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<string>): void {
callbackFcn: (data: string, timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -933,8 +957,7 @@ export class GraphRunner {
* should not perform overly complicated (or any async) behavior. * should not perform overly complicated (or any async) behavior.
*/ */
attachStringVectorListener( attachStringVectorListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<string[]>): void {
callbackFcn: (data: string[], timestamp: number) => void): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);
@ -964,8 +987,7 @@ export class GraphRunner {
* with it). * with it).
*/ */
attachProtoListener( attachProtoListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<Uint8Array>,
callbackFcn: (data: Uint8Array, timestamp: number) => void,
makeDeepCopy?: boolean): void { makeDeepCopy?: boolean): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -999,8 +1021,7 @@ export class GraphRunner {
* with it). * with it).
*/ */
attachProtoVectorListener( attachProtoVectorListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<Uint8Array[]>,
callbackFcn: (data: Uint8Array[], timestamp: number) => void,
makeDeepCopy?: boolean): void { makeDeepCopy?: boolean): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);
@ -1034,8 +1055,7 @@ export class GraphRunner {
* with it). * with it).
*/ */
attachAudioListener( attachAudioListener(
outputStreamName: string, outputStreamName: string, callbackFcn: SimpleListener<Float32Array>,
callbackFcn: (data: Float32Array, timestamp: number) => void,
makeDeepCopy?: boolean): void { makeDeepCopy?: boolean): void {
if (!this.wasmModule._attachAudioListener) { if (!this.wasmModule._attachAudioListener) {
console.warn( console.warn(
@ -1045,13 +1065,12 @@ export class GraphRunner {
// Set up our TS listener to receive any packets for this stream, and // Set up our TS listener to receive any packets for this stream, and
// additionally reformat our Uint8Array into a Float32Array for the user. // additionally reformat our Uint8Array into a Float32Array for the user.
this.setListener( this.setListener<Uint8Array>(outputStreamName, (data, timestamp) => {
outputStreamName, (data: Uint8Array, timestamp: number) => { // Should be very fast
// Should be very fast const floatArray =
const floatArray = new Float32Array(data.buffer, data.byteOffset, data.length / 4);
new Float32Array(data.buffer, data.byteOffset, data.length / 4); callbackFcn(floatArray, timestamp);
callbackFcn(floatArray, timestamp); });
});
// Tell our graph to listen for string packets on this stream. // Tell our graph to listen for string packets on this stream.
this.wrapStringPtr(outputStreamName, (outputStreamNamePtr: number) => { this.wrapStringPtr(outputStreamName, (outputStreamNamePtr: number) => {

View File

@ -1,6 +1,4 @@
import {GraphRunner, ImageSource} from './graph_runner'; import {GraphRunner, ImageSource, SimpleListener} from './graph_runner';
/** /**
* We extend from a GraphRunner constructor. This ensures our mixin has * We extend from a GraphRunner constructor. This ensures our mixin has
@ -76,7 +74,7 @@ export function SupportImage<TBase extends LibConstructor>(Base: TBase) {
*/ */
attachImageListener( attachImageListener(
outputStreamName: string, outputStreamName: string,
callbackFcn: (data: WasmImage, timestamp: number) => void): void { callbackFcn: SimpleListener<WasmImage>): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setListener(outputStreamName, callbackFcn); this.setListener(outputStreamName, callbackFcn);
@ -99,7 +97,7 @@ export function SupportImage<TBase extends LibConstructor>(Base: TBase) {
*/ */
attachImageVectorListener( attachImageVectorListener(
outputStreamName: string, outputStreamName: string,
callbackFcn: (data: WasmImage[], timestamp: number) => void): void { callbackFcn: SimpleListener<WasmImage[]>): void {
// Set up our TS listener to receive any packets for this stream. // Set up our TS listener to receive any packets for this stream.
this.setVectorListener(outputStreamName, callbackFcn); this.setVectorListener(outputStreamName, callbackFcn);