Merge pull request #3801 from kinaryml:gesture-recognizer-python

PiperOrigin-RevId: 485884796
Copybara-Service 2022-11-03 08:52:17 -07:00
commit 716e59f90c
14 changed files with 862 additions and 99 deletions

View File

@@ -87,6 +87,7 @@ cc_library(
cc_library(
name = "builtin_task_graphs",
deps = [
"//mediapipe/tasks/cc/vision/gesture_recognizer:gesture_recognizer_graph",
"//mediapipe/tasks/cc/vision/image_classifier:image_classifier_graph",
"//mediapipe/tasks/cc/vision/image_segmenter:image_segmenter_graph",
"//mediapipe/tasks/cc/vision/object_detector:object_detector_graph",

View File

@@ -36,6 +36,29 @@ py_library(
],
)
py_library(
name = "landmark",
srcs = ["landmark.py"],
deps = [
"//mediapipe/framework/formats:landmark_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "landmark_detection_result",
srcs = ["landmark_detection_result.py"],
deps = [
":landmark",
":rect",
"//mediapipe/framework/formats:classification_py_pb2",
"//mediapipe/framework/formats:landmark_py_pb2",
"//mediapipe/tasks/cc/components/containers/proto:landmarks_detection_result_py_pb2",
"//mediapipe/tasks/python/components/containers:category",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "category",
srcs = ["category.py"],

View File

@@ -14,7 +14,7 @@
"""Category data class."""
import dataclasses
from typing import Any, Optional
from mediapipe.tasks.cc.components.containers.proto import category_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
@@ -39,10 +39,10 @@ class Category:
category_name: The label of this category object.
"""
index: Optional[int] = None
score: Optional[float] = None
display_name: Optional[str] = None
category_name: Optional[str] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _CategoryProto:

View File

@@ -0,0 +1,122 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Landmark data class."""
import dataclasses
from typing import Optional
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_LandmarkProto = landmark_pb2.Landmark
_NormalizedLandmarkProto = landmark_pb2.NormalizedLandmark
@dataclasses.dataclass
class Landmark:
"""A landmark that can have 1 to 3 dimensions.
Use x for 1D points, (x, y) for 2D points and (x, y, z) for 3D points.
Attributes:
x: The x coordinate.
y: The y coordinate.
z: The z coordinate.
visibility: Landmark visibility. Should stay unset if not supported. Float
score of whether the landmark is visible or occluded by other objects. The
landmark is also considered invisible if it is not present on the screen
(out of scene bounds). Depending on the model, the visibility value is
either the output of a sigmoid or the argument to a sigmoid.
presence: Landmark presence. Should stay unset if not supported. Float score
of whether the landmark is present on the scene (located within scene
bounds). Depending on the model, the presence value is either the output of
a sigmoid or the argument to a sigmoid that yields the landmark presence
probability.
"""
x: Optional[float] = None
y: Optional[float] = None
z: Optional[float] = None
visibility: Optional[float] = None
presence: Optional[float] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _LandmarkProto:
"""Generates a Landmark protobuf object."""
return _LandmarkProto(
x=self.x,
y=self.y,
z=self.z,
visibility=self.visibility,
presence=self.presence)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _LandmarkProto) -> 'Landmark':
"""Creates a `Landmark` object from the given protobuf object."""
return Landmark(
x=pb2_obj.x,
y=pb2_obj.y,
z=pb2_obj.z,
visibility=pb2_obj.visibility,
presence=pb2_obj.presence)
@dataclasses.dataclass
class NormalizedLandmark:
"""A normalized version of above Landmark proto.
All coordinates should be within [0, 1].
Attributes:
x: The normalized x coordinate.
y: The normalized y coordinate.
z: The normalized z coordinate.
visibility: Landmark visibility. Should stay unset if not supported. Float
score of whether the landmark is visible or occluded by other objects. The
landmark is also considered invisible if it is not present on the screen
(out of scene bounds). Depending on the model, the visibility value is
either the output of a sigmoid or the argument to a sigmoid.
presence: Landmark presence. Should stay unset if not supported. Float score
of whether the landmark is present on the scene (located within scene
bounds). Depending on the model, the presence value is either the output of
a sigmoid or the argument to a sigmoid that yields the landmark presence
probability.
"""
x: Optional[float] = None
y: Optional[float] = None
z: Optional[float] = None
visibility: Optional[float] = None
presence: Optional[float] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _NormalizedLandmarkProto:
"""Generates a NormalizedLandmark protobuf object."""
return _NormalizedLandmarkProto(
x=self.x,
y=self.y,
z=self.z,
visibility=self.visibility,
presence=self.presence)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls, pb2_obj: _NormalizedLandmarkProto) -> 'NormalizedLandmark':
"""Creates a `NormalizedLandmark` object from the given protobuf object."""
return NormalizedLandmark(
x=pb2_obj.x,
y=pb2_obj.y,
z=pb2_obj.z,
visibility=pb2_obj.visibility,
presence=pb2_obj.presence)
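A brief illustrative round-trip through the proto layer, not part of the diff itself; the values are arbitrary but binary-exact so the float32 proto fields compare equal:

from mediapipe.tasks.python.components.containers import landmark as landmark_module

# Fields left as None at construction are simply not set on the proto.
lm = landmark_module.NormalizedLandmark(x=0.25, y=0.5, z=0.0, visibility=0.75)
proto = lm.to_pb2()  # landmark_pb2.NormalizedLandmark
restored = landmark_module.NormalizedLandmark.create_from_pb2(proto)
assert restored.x == 0.25 and restored.visibility == 0.75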

View File

@@ -0,0 +1,96 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Landmarks Detection Result data class."""
import dataclasses
from typing import Optional, List
from mediapipe.framework.formats import classification_pb2
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks.cc.components.containers.proto import landmarks_detection_result_pb2
from mediapipe.tasks.python.components.containers import category as category_module
from mediapipe.tasks.python.components.containers import landmark as landmark_module
from mediapipe.tasks.python.components.containers import rect as rect_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_LandmarksDetectionResultProto = landmarks_detection_result_pb2.LandmarksDetectionResult
_ClassificationProto = classification_pb2.Classification
_ClassificationListProto = classification_pb2.ClassificationList
_LandmarkListProto = landmark_pb2.LandmarkList
_NormalizedLandmarkListProto = landmark_pb2.NormalizedLandmarkList
_NormalizedRect = rect_module.NormalizedRect
_Category = category_module.Category
_NormalizedLandmark = landmark_module.NormalizedLandmark
_Landmark = landmark_module.Landmark
@dataclasses.dataclass
class LandmarksDetectionResult:
"""Represents the landmarks detection result.
Attributes:
landmarks: A list of `NormalizedLandmark` objects.
categories: A list of `Category` objects.
world_landmarks: A list of `Landmark` objects.
rect: A `NormalizedRect` object.
"""
landmarks: Optional[List[_NormalizedLandmark]]
categories: Optional[List[_Category]]
world_landmarks: Optional[List[_Landmark]]
rect: _NormalizedRect
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _LandmarksDetectionResultProto:
"""Generates a LandmarksDetectionResult protobuf object."""
classifications = _ClassificationListProto()
for category in self.categories:
classifications.classification.append(
_ClassificationProto(
index=category.index,
score=category.score,
label=category.category_name,
display_name=category.display_name))
return _LandmarksDetectionResultProto(
landmarks=_NormalizedLandmarkListProto(
landmark=[landmark.to_pb2() for landmark in self.landmarks]),
classifications=classifications,
world_landmarks=_LandmarkListProto(
landmark=[landmark.to_pb2() for landmark in self.world_landmarks]),
rect=self.rect.to_pb2())
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls,
pb2_obj: _LandmarksDetectionResultProto) -> 'LandmarksDetectionResult':
"""Creates a `LandmarksDetectionResult` object from the given protobuf object.
"""
categories = []
for classification in pb2_obj.classifications.classification:
categories.append(
category_module.Category(
score=classification.score,
index=classification.index,
category_name=classification.label,
display_name=classification.display_name))
return LandmarksDetectionResult(
landmarks=[
_NormalizedLandmark.create_from_pb2(landmark)
for landmark in pb2_obj.landmarks.landmark
],
categories=categories,
world_landmarks=[
_Landmark.create_from_pb2(landmark)
for landmark in pb2_obj.world_landmarks.landmark
],
rect=_NormalizedRect.create_from_pb2(pb2_obj.rect))
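A hypothetical assembly sketch for this container; the category and coordinates are made up:

from mediapipe.tasks.python.components.containers import category
from mediapipe.tasks.python.components.containers import landmark
from mediapipe.tasks.python.components.containers import landmark_detection_result
from mediapipe.tasks.python.components.containers import rect

result = landmark_detection_result.LandmarksDetectionResult(
    landmarks=[landmark.NormalizedLandmark(x=0.1, y=0.2, z=0.0)],
    categories=[category.Category(index=0, score=0.97, category_name='Left')],
    world_landmarks=[landmark.Landmark(x=0.01, y=0.02, z=0.0)],
    rect=rect.NormalizedRect(x_center=0.5, y_center=0.5, width=1.0, height=1.0))
proto = result.to_pb2()  # ready to serialize or compare in tests
restored = landmark_detection_result.LandmarksDetectionResult.create_from_pb2(proto)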

View File

@@ -19,80 +19,49 @@ from typing import Any, Optional
from mediapipe.framework.formats import rect_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_RectProto = rect_pb2.Rect
_NormalizedRectProto = rect_pb2.NormalizedRect
@dataclasses.dataclass
class Rect:
"""A rectangle with rotation in image coordinates.
"""A rectangle, used as part of detection results or as input region-of-interest.
Attributes: x_center : The X coordinate of the top-left corner, in pixels.
y_center : The Y coordinate of the top-left corner, in pixels.
width: The width of the rectangle, in pixels.
height: The height of the rectangle, in pixels.
rotation: Rotation angle is clockwise in radians.
rect_id: Optional unique id to help associate different rectangles to each
other.
The coordinates are normalized wrt the image dimensions, i.e. generally in
[0,1] but they may exceed these bounds if describing a region overlapping the
image. The origin is on the top-left corner of the image.
Attributes:
left: The X coordinate of the left side of the rectangle.
top: The Y coordinate of the top of the rectangle.
right: The X coordinate of the right side of the rectangle.
bottom: The Y coordinate of the bottom of the rectangle.
"""
x_center: int
y_center: int
width: int
height: int
rotation: Optional[float] = 0.0
rect_id: Optional[int] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _RectProto:
"""Generates a Rect protobuf object."""
return _RectProto(
x_center=self.x_center,
y_center=self.y_center,
width=self.width,
height=self.height,
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _RectProto) -> 'Rect':
"""Creates a `Rect` object from the given protobuf object."""
return Rect(
x_center=pb2_obj.x_center,
y_center=pb2_obj.y_center,
width=pb2_obj.width,
height=pb2_obj.height)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, Rect):
return False
return self.to_pb2().__eq__(other.to_pb2())
left: float
top: float
right: float
bottom: float
@dataclasses.dataclass
class NormalizedRect:
"""A rectangle with rotation in normalized coordinates.
Location of the center of the rectangle in image coordinates. The (0.0, 0.0)
point is at the (top, left) corner.
The values of box center location and size are within [0, 1].
Attributes:
x_center: The normalized X coordinate of the rectangle, in image
coordinates.
y_center: The normalized Y coordinate of the rectangle, in image
coordinates.
width: The width of the rectangle.
height: The height of the rectangle.
rotation: Rotation angle is clockwise in radians.
rect_id: Optional unique id to help associate different rectangles to each
other.
"""
x_center: float

View File

@@ -53,6 +53,7 @@ py_test(
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/test:test_utils",
"//mediapipe/tasks/python/vision:image_classifier",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)

View File

@@ -30,9 +30,10 @@ from mediapipe.tasks.python.components.processors import classifier_options
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.test import test_utils
from mediapipe.tasks.python.vision import image_classifier
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode
_Rect = rect.Rect
_BaseOptions = base_options_module.BaseOptions
_ClassifierOptions = classifier_options.ClassifierOptions
_Category = category.Category
@@ -43,6 +44,7 @@ _Image = image.Image
_ImageClassifier = image_classifier.ImageClassifier
_ImageClassifierOptions = image_classifier.ImageClassifierOptions
_RUNNING_MODE = vision_task_running_mode.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_MODEL_FILE = 'mobilenet_v2_1.0_224.tflite'
_IMAGE_FILE = 'burger.jpg'
@@ -227,11 +229,11 @@ class ImageClassifierTest(parameterized.TestCase):
test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, 'multi_objects.jpg')))
# Region-of-interest around the soccer ball.
roi = _Rect(left=0.45, top=0.3075, right=0.614, bottom=0.7345)
image_processing_options = _ImageProcessingOptions(roi)
# Performs image classification on the input.
image_result = classifier.classify(test_image, image_processing_options)
# Comparing results.
test_utils.assert_proto_equals(self, image_result.to_pb2(),
_generate_soccer_ball_results(0).to_pb2())
@@ -417,12 +419,12 @@ class ImageClassifierTest(parameterized.TestCase):
test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, 'multi_objects.jpg')))
# Region-of-interest around the soccer ball.
roi = _Rect(left=0.45, top=0.3075, right=0.614, bottom=0.7345)
image_processing_options = _ImageProcessingOptions(roi)
for timestamp in range(0, 300, 30):
classification_result = classifier.classify_for_video(
test_image, timestamp, image_processing_options)
test_utils.assert_proto_equals(
self, classification_result.to_pb2(),
_generate_soccer_ball_results(timestamp).to_pb2())
@@ -491,9 +493,9 @@ class ImageClassifierTest(parameterized.TestCase):
test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, 'multi_objects.jpg')))
# Region-of-interest around the soccer ball.
roi = _Rect(left=0.45, top=0.3075, right=0.614, bottom=0.7345)
image_processing_options = _ImageProcessingOptions(roi)
observed_timestamp_ms = -1
def check_result(result: _ClassificationResult, output_image: _Image,
@@ -514,7 +516,8 @@ class ImageClassifierTest(parameterized.TestCase):
result_callback=check_result)
with _ImageClassifier.create_from_options(options) as classifier:
for timestamp in range(0, 300, 30):
classifier.classify_async(test_image, timestamp,
image_processing_options)
if __name__ == '__main__':

View File

@@ -55,6 +55,7 @@ py_library(
"//mediapipe/tasks/python/core:optional_dependencies",
"//mediapipe/tasks/python/core:task_info",
"//mediapipe/tasks/python/vision/core:base_vision_task_api",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)
@@ -77,3 +78,27 @@ py_library(
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)
py_library(
name = "gesture_recognizer",
srcs = [
"gesture_recognizer.py",
],
deps = [
"//mediapipe/framework/formats:classification_py_pb2",
"//mediapipe/framework/formats:landmark_py_pb2",
"//mediapipe/python:_framework_bindings",
"//mediapipe/python:packet_creator",
"//mediapipe/python:packet_getter",
"//mediapipe/tasks/cc/vision/gesture_recognizer/proto:gesture_recognizer_graph_options_py_pb2",
"//mediapipe/tasks/python/components/containers:category",
"//mediapipe/tasks/python/components/containers:landmark",
"//mediapipe/tasks/python/components/processors:classifier_options",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/core:optional_dependencies",
"//mediapipe/tasks/python/core:task_info",
"//mediapipe/tasks/python/vision/core:base_vision_task_api",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)

View File

@@ -23,15 +23,25 @@ py_library(
srcs = ["vision_task_running_mode.py"],
)
py_library(
name = "image_processing_options",
srcs = ["image_processing_options.py"],
deps = [
"//mediapipe/tasks/python/components/containers:rect",
],
)
py_library(
name = "base_vision_task_api",
srcs = [
"base_vision_task_api.py",
],
deps = [
":image_processing_options",
":vision_task_running_mode",
"//mediapipe/framework:calculator_py_pb2",
"//mediapipe/python:_framework_bindings",
"//mediapipe/tasks/python/components/containers:rect",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)

View File

@@ -13,17 +13,22 @@
# limitations under the License.
"""MediaPipe vision task base api."""
import math
from typing import Callable, Mapping, Optional
from mediapipe.framework import calculator_pb2
from mediapipe.python._framework_bindings import packet as packet_module
from mediapipe.python._framework_bindings import task_runner as task_runner_module
from mediapipe.tasks.python.components.containers import rect as rect_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
_TaskRunner = task_runner_module.TaskRunner
_Packet = packet_module.Packet
_NormalizedRect = rect_module.NormalizedRect
_RunningMode = running_mode_module.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
class BaseVisionTaskApi(object):
@@ -122,6 +127,49 @@ class BaseVisionTaskApi(object):
+ self._running_mode.name)
self._runner.send(inputs)
def convert_to_normalized_rect(self,
options: _ImageProcessingOptions,
roi_allowed: bool = True) -> _NormalizedRect:
"""Converts from ImageProcessingOptions to NormalizedRect, performing sanity checks on-the-fly.
If the input ImageProcessingOptions is not present, returns a default
NormalizedRect covering the whole image with rotation set to 0. If
'roi_allowed' is False, a ValueError is raised if the input
ImageProcessingOptions has its 'region_of_interest' field set.
Args:
options: Options for image processing.
roi_allowed: Indicates if the `region_of_interest` field is allowed to be
set. By default, it's set to True.
Returns:
A `NormalizedRect` that represents the image processing options.
"""
normalized_rect = _NormalizedRect(
rotation=0, x_center=0.5, y_center=0.5, width=1, height=1)
if options is None:
return normalized_rect
if options.rotation_degrees % 90 != 0:
raise ValueError('Expected rotation to be a multiple of 90°.')
# Convert to radians counter-clockwise.
normalized_rect.rotation = -options.rotation_degrees * math.pi / 180.0
if options.region_of_interest:
if not roi_allowed:
raise ValueError("This task doesn't support region-of-interest.")
roi = options.region_of_interest
if roi.left >= roi.right or roi.top >= roi.bottom:
raise ValueError('Expected Rect with left < right and top < bottom.')
if roi.left < 0 or roi.top < 0 or roi.right > 1 or roi.bottom > 1:
raise ValueError('Expected Rect values to be in [0,1].')
normalized_rect.x_center = (roi.left + roi.right) / 2.0
normalized_rect.y_center = (roi.top + roi.bottom) / 2.0
normalized_rect.width = roi.right - roi.left
normalized_rect.height = roi.bottom - roi.top
return normalized_rect
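As a worked example, the arithmetic above maps the region-of-interest used by the image classifier test elsewhere in this PR back onto the NormalizedRect values that test previously hard-coded:

# Rect ROI (edges) -> NormalizedRect (center + size), per the code above.
left, top, right, bottom = 0.45, 0.3075, 0.614, 0.7345
x_center = (left + right) / 2.0   # 0.532
y_center = (top + bottom) / 2.0   # 0.521
width = right - left              # 0.164 (within float rounding)
height = bottom - top             # 0.427
# A clockwise rotation in degrees becomes counter-clockwise radians:
# rotation_degrees=90 -> rotation = -90 * pi / 180 = -pi / 2.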
def close(self) -> None:
"""Shuts down the mediapipe vision task instance.

View File

@@ -0,0 +1,39 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe vision options for image processing."""
import dataclasses
from typing import Optional
from mediapipe.tasks.python.components.containers import rect as rect_module
@dataclasses.dataclass
class ImageProcessingOptions:
"""Options for image processing.
If both region-of-interest and rotation are specified, the crop around the
region-of-interest is extracted first, then the specified rotation is applied
to the crop.
Attributes:
region_of_interest: The optional region-of-interest to crop from the image.
If not specified, the full image is used. Coordinates must be in [0,1]
with 'left' < 'right' and 'top' < 'bottom'.
rotation_degrees: The rotation to apply to the image (or cropped
region-of-interest), in degrees clockwise. The rotation must be a multiple
(positive or negative) of 90°.
"""
region_of_interest: Optional[rect_module.Rect] = None
rotation_degrees: int = 0
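A minimal usage sketch, assuming only the Rect container and this module:

from mediapipe.tasks.python.components.containers import rect as rect_module
from mediapipe.tasks.python.vision.core import image_processing_options as ipo_module

# Crop to the central 80% of the image, then rotate the crop 90° clockwise.
options = ipo_module.ImageProcessingOptions(
    region_of_interest=rect_module.Rect(
        left=0.1, top=0.1, right=0.9, bottom=0.9),
    rotation_degrees=90)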

View File

@@ -0,0 +1,426 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe gesture recognizer task."""
import dataclasses
from typing import Callable, Mapping, Optional, List
from mediapipe.framework.formats import classification_pb2
from mediapipe.framework.formats import landmark_pb2
from mediapipe.python import packet_creator
from mediapipe.python import packet_getter
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.python._framework_bindings import packet as packet_module
from mediapipe.tasks.cc.vision.gesture_recognizer.proto import gesture_recognizer_graph_options_pb2
from mediapipe.tasks.python.components.containers import category as category_module
from mediapipe.tasks.python.components.containers import landmark as landmark_module
from mediapipe.tasks.python.components.processors import classifier_options
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.core import task_info as task_info_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
from mediapipe.tasks.python.vision.core import base_vision_task_api
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
_BaseOptions = base_options_module.BaseOptions
_GestureRecognizerGraphOptionsProto = gesture_recognizer_graph_options_pb2.GestureRecognizerGraphOptions
_ClassifierOptions = classifier_options.ClassifierOptions
_RunningMode = running_mode_module.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_TaskInfo = task_info_module.TaskInfo
_IMAGE_IN_STREAM_NAME = 'image_in'
_IMAGE_OUT_STREAM_NAME = 'image_out'
_IMAGE_TAG = 'IMAGE'
_NORM_RECT_STREAM_NAME = 'norm_rect_in'
_NORM_RECT_TAG = 'NORM_RECT'
_HAND_GESTURE_STREAM_NAME = 'hand_gestures'
_HAND_GESTURE_TAG = 'HAND_GESTURES'
_HANDEDNESS_STREAM_NAME = 'handedness'
_HANDEDNESS_TAG = 'HANDEDNESS'
_HAND_LANDMARKS_STREAM_NAME = 'landmarks'
_HAND_LANDMARKS_TAG = 'LANDMARKS'
_HAND_WORLD_LANDMARKS_STREAM_NAME = 'world_landmarks'
_HAND_WORLD_LANDMARKS_TAG = 'WORLD_LANDMARKS'
_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.gesture_recognizer.GestureRecognizerGraph'
_MICRO_SECONDS_PER_MILLISECOND = 1000
_GESTURE_DEFAULT_INDEX = -1
@dataclasses.dataclass
class GestureRecognitionResult:
"""The gesture recognition result from GestureRecognizer, where each vector element represents a single hand detected in the image.
Attributes:
gestures: Recognized hand gestures of detected hands. Note that the index of
a gesture is always -1, because the raw indices from multiple gesture
classifiers cannot be consolidated into a meaningful index.
handedness: Classification of handedness.
hand_landmarks: Detected hand landmarks in normalized image coordinates.
hand_world_landmarks: Detected hand landmarks in world coordinates.
"""
gestures: List[List[category_module.Category]]
handedness: List[List[category_module.Category]]
hand_landmarks: List[List[landmark_module.NormalizedLandmark]]
hand_world_landmarks: List[List[landmark_module.Landmark]]
def _build_recognition_result(
output_packets: Mapping[str,
packet_module.Packet]) -> GestureRecognitionResult:
"""Consturcts a `GestureRecognitionResult` from output packets."""
gestures_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_GESTURE_STREAM_NAME])
handedness_proto_list = packet_getter.get_proto_list(
output_packets[_HANDEDNESS_STREAM_NAME])
hand_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_LANDMARKS_STREAM_NAME])
hand_world_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_WORLD_LANDMARKS_STREAM_NAME])
gesture_results = []
for proto in gestures_proto_list:
gesture_categories = []
gesture_classifications = classification_pb2.ClassificationList()
gesture_classifications.MergeFrom(proto)
for gesture in gesture_classifications.classification:
gesture_categories.append(
category_module.Category(
index=_GESTURE_DEFAULT_INDEX,
score=gesture.score,
display_name=gesture.display_name,
category_name=gesture.label))
gesture_results.append(gesture_categories)
handedness_results = []
for proto in handedness_proto_list:
handedness_categories = []
handedness_classifications = classification_pb2.ClassificationList()
handedness_classifications.MergeFrom(proto)
for handedness in handedness_classifications.classification:
handedness_categories.append(
category_module.Category(
index=handedness.index,
score=handedness.score,
display_name=handedness.display_name,
category_name=handedness.label))
handedness_results.append(handedness_categories)
hand_landmarks_results = []
for proto in hand_landmarks_proto_list:
hand_landmarks = landmark_pb2.NormalizedLandmarkList()
hand_landmarks.MergeFrom(proto)
hand_landmarks_results.append([
landmark_module.NormalizedLandmark.create_from_pb2(hand_landmark)
for hand_landmark in hand_landmarks.landmark
])
hand_world_landmarks_results = []
for proto in hand_world_landmarks_proto_list:
hand_world_landmarks = landmark_pb2.LandmarkList()
hand_world_landmarks.MergeFrom(proto)
hand_world_landmarks_results.append([
landmark_module.Landmark.create_from_pb2(hand_world_landmark)
for hand_world_landmark in hand_world_landmarks.landmark
])
return GestureRecognitionResult(gesture_results, handedness_results,
hand_landmarks_results,
hand_world_landmarks_results)
@dataclasses.dataclass
class GestureRecognizerOptions:
"""Options for the gesture recognizer task.
Attributes:
base_options: Base options for the hand gesture recognizer task.
running_mode: The running mode of the task. Defaults to the image mode.
The gesture recognizer task has three running modes: 1) The image mode for
recognizing hand gestures on single image inputs. 2) The video mode for
recognizing hand gestures on the decoded frames of a video. 3) The live
stream mode for recognizing hand gestures on a live stream of input data,
such as camera input.
num_hands: The maximum number of hands that can be detected by the recognizer.
min_hand_detection_confidence: The minimum confidence score for the hand
detection to be considered successful.
min_hand_presence_confidence: The minimum confidence score of hand presence
in the hand landmark detection.
min_tracking_confidence: The minimum confidence score for the hand tracking
to be considered successful.
canned_gesture_classifier_options: Options for configuring the canned
gestures classifier, such as score threshold, allow list and deny list of
gestures. The categories for canned gesture classifiers are: ["None",
"Closed_Fist", "Open_Palm", "Pointing_Up", "Thumb_Down", "Thumb_Up",
"Victory", "ILoveYou"]. Note this option is subject to change.
custom_gesture_classifier_options: Options for configuring the custom
gestures classifier, such as score threshold, allow list and deny list of
gestures. Note this option is subject to change.
result_callback: The user-defined result callback for processing live stream
data. The result callback should only be specified when the running mode
is set to the live stream mode.
"""
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
num_hands: Optional[int] = 1
min_hand_detection_confidence: Optional[float] = 0.5
min_hand_presence_confidence: Optional[float] = 0.5
min_tracking_confidence: Optional[float] = 0.5
canned_gesture_classifier_options: Optional[
_ClassifierOptions] = _ClassifierOptions()
custom_gesture_classifier_options: Optional[
_ClassifierOptions] = _ClassifierOptions()
result_callback: Optional[Callable[
[GestureRecognitionResult, image_module.Image, int], None]] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _GestureRecognizerGraphOptionsProto:
"""Generates an GestureRecognizerOptions protobuf object."""
base_options_proto = self.base_options.to_pb2()
base_options_proto.use_stream_mode = False if self.running_mode == _RunningMode.IMAGE else True
# Initialize gesture recognizer options from base options.
gesture_recognizer_options_proto = _GestureRecognizerGraphOptionsProto(
base_options=base_options_proto)
# Configure hand detector and hand landmarker options.
hand_landmarker_options_proto = gesture_recognizer_options_proto.hand_landmarker_graph_options
hand_landmarker_options_proto.min_tracking_confidence = self.min_tracking_confidence
hand_landmarker_options_proto.hand_detector_graph_options.num_hands = self.num_hands
hand_landmarker_options_proto.hand_detector_graph_options.min_detection_confidence = self.min_hand_detection_confidence
hand_landmarker_options_proto.hand_landmarks_detector_graph_options.min_detection_confidence = self.min_hand_presence_confidence
# Configure hand gesture recognizer options.
hand_gesture_recognizer_options_proto = gesture_recognizer_options_proto.hand_gesture_recognizer_graph_options
hand_gesture_recognizer_options_proto.canned_gesture_classifier_graph_options.classifier_options.CopyFrom(
self.canned_gesture_classifier_options.to_pb2())
hand_gesture_recognizer_options_proto.custom_gesture_classifier_graph_options.classifier_options.CopyFrom(
self.custom_gesture_classifier_options.to_pb2())
return gesture_recognizer_options_proto
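A hypothetical configuration sketch: the model path is a placeholder, and the `score_threshold` and `category_allowlist` fields are assumed to exist on `ClassifierOptions` from the processors module:

options = GestureRecognizerOptions(
    base_options=_BaseOptions(model_asset_path='gesture_recognizer.task'),
    running_mode=_RunningMode.IMAGE,
    num_hands=2,
    min_hand_detection_confidence=0.6,
    # Assumed ClassifierOptions fields; only report two canned gestures.
    canned_gesture_classifier_options=_ClassifierOptions(
        score_threshold=0.5, category_allowlist=['Thumb_Up', 'Victory']))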
class GestureRecognizer(base_vision_task_api.BaseVisionTaskApi):
"""Class that performs gesture recognition on images."""
@classmethod
def create_from_model_path(cls, model_path: str) -> 'GestureRecognizer':
"""Creates an `GestureRecognizer` object from a TensorFlow Lite model and the default `GestureRecognizerOptions`.
Note that the created `GestureRecognizer` instance is in image mode, for
recognizing hand gestures on single image inputs.
Args:
model_path: Path to the model.
Returns:
`GestureRecognizer` object that's created from the model file and the
default `GestureRecognizerOptions`.
Raises:
ValueError: If failed to create `GestureRecognizer` object from the
provided file, such as an invalid file path.
RuntimeError: If other types of error occurred.
"""
base_options = _BaseOptions(model_asset_path=model_path)
options = GestureRecognizerOptions(
base_options=base_options, running_mode=_RunningMode.IMAGE)
return cls.create_from_options(options)
@classmethod
def create_from_options(
cls, options: GestureRecognizerOptions) -> 'GestureRecognizer':
"""Creates the `GestureRecognizer` object from gesture recognizer options.
Args:
options: Options for the gesture recognizer task.
Returns:
`GestureRecognizer` object that's created from `options`.
Raises:
ValueError: If failed to create `GestureRecognizer` object from
`GestureRecognizerOptions`, such as a missing model.
RuntimeError: If other types of error occurred.
"""
def packets_callback(output_packets: Mapping[str, packet_module.Packet]):
if output_packets[_IMAGE_OUT_STREAM_NAME].is_empty():
return
image = packet_getter.get_image(output_packets[_IMAGE_OUT_STREAM_NAME])
if output_packets[_HAND_GESTURE_STREAM_NAME].is_empty():
empty_packet = output_packets[_HAND_GESTURE_STREAM_NAME]
options.result_callback(
GestureRecognitionResult([], [], [], []), image,
empty_packet.timestamp.value // _MICRO_SECONDS_PER_MILLISECOND)
return
gesture_recognition_result = _build_recognition_result(output_packets)
timestamp = output_packets[_HAND_GESTURE_STREAM_NAME].timestamp
options.result_callback(gesture_recognition_result, image,
timestamp.value // _MICRO_SECONDS_PER_MILLISECOND)
task_info = _TaskInfo(
task_graph=_TASK_GRAPH_NAME,
input_streams=[
':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]),
':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]),
],
output_streams=[
':'.join([_HAND_GESTURE_TAG, _HAND_GESTURE_STREAM_NAME]),
':'.join([_HANDEDNESS_TAG, _HANDEDNESS_STREAM_NAME]),
':'.join([_HAND_LANDMARKS_TAG,
_HAND_LANDMARKS_STREAM_NAME]), ':'.join([
_HAND_WORLD_LANDMARKS_TAG,
_HAND_WORLD_LANDMARKS_STREAM_NAME
]), ':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME])
],
task_options=options)
return cls(
task_info.generate_graph_config(
enable_flow_limiting=options.running_mode ==
_RunningMode.LIVE_STREAM), options.running_mode,
packets_callback if options.result_callback else None)
def recognize(
self,
image: image_module.Image,
image_processing_options: Optional[_ImageProcessingOptions] = None
) -> GestureRecognitionResult:
"""Performs hand gesture recognition on the given image.
Only use this method when the GestureRecognizer is created with the image
running mode.
The image can be of any size with format RGB or RGBA.
TODO: Describes how the input image will be preprocessed after the yuv
support is implemented.
Args:
image: MediaPipe Image.
image_processing_options: Options for image processing.
Returns:
The hand gesture recognition results.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If gesture recognition failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, roi_allowed=False)
output_packets = self._process_image_data({
_IMAGE_IN_STREAM_NAME:
packet_creator.create_image(image),
_NORM_RECT_STREAM_NAME:
packet_creator.create_proto(normalized_rect.to_pb2())
})
if output_packets[_HAND_GESTURE_STREAM_NAME].is_empty():
return GestureRecognitionResult([], [], [], [])
return _build_recognition_result(output_packets)
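For illustration, a minimal image-mode flow might look like the following sketch; the model and image paths are placeholders:

from mediapipe.python._framework_bindings import image as image_module

with GestureRecognizer.create_from_model_path(
    'gesture_recognizer.task') as recognizer:
  test_image = image_module.Image.create_from_file('hand.jpg')
  result = recognizer.recognize(test_image)
  for hand_gestures in result.gestures:  # one inner list per detected hand
    for gesture in hand_gestures:
      print(gesture.category_name, gesture.score)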
def recognize_for_video(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None
) -> GestureRecognitionResult:
"""Performs gesture recognition on the provided video frame.
Only use this method when the GestureRecognizer is created with the video
running mode. It's required to provide the video frame's timestamp (in
milliseconds) along with the video frame. The input timestamps should be
monotonically increasing for adjacent calls of this method.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input video frame in milliseconds.
image_processing_options: Options for image processing.
Returns:
The hand gesture recognition results.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If gesture recognition failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, roi_allowed=False)
output_packets = self._process_video_data({
_IMAGE_IN_STREAM_NAME:
packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
_NORM_RECT_STREAM_NAME:
packet_creator.create_proto(normalized_rect.to_pb2()).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
})
if output_packets[_HAND_GESTURE_STREAM_NAME].is_empty():
return GestureRecognitionResult([], [], [], [])
return _build_recognition_result(output_packets)
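A sketch of the video-mode loop; `frames` is a hypothetical iterable of decoded MediaPipe Images, and the synthetic timestamps increase monotonically as required:

options = GestureRecognizerOptions(
    base_options=_BaseOptions(model_asset_path='gesture_recognizer.task'),
    running_mode=_RunningMode.VIDEO)
with GestureRecognizer.create_from_options(options) as recognizer:
  for i, frame in enumerate(frames):  # hypothetical decoded video frames
    result = recognizer.recognize_for_video(frame, i * 33)  # ~30 fps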
def recognize_async(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None
) -> None:
"""Sends live image data to perform gesture recognition.
Only use this method when the GestureRecognizer is created with the live
stream running mode. The input timestamps should be monotonically increasing
for adjacent calls of this method. This method will return immediately after
the input image is accepted. The results will be available via the
`result_callback` provided in the `GestureRecognizerOptions`. The
`recognize_async` method is designed to process live stream data such as
camera input. To lower the overall latency, gesture recognizer may drop the
input images if needed. In other words, it's not guaranteed to have output
per input image.
The `result_callback` provides:
- The hand gesture recognition results.
- The input image that the gesture recognizer runs on.
- The input timestamp in milliseconds.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input image in milliseconds.
image_processing_options: Options for image processing.
Raises:
ValueError: If the current input timestamp is smaller than what the
gesture recognizer has already processed.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, roi_allowed=False)
self._send_live_stream_data({
_IMAGE_IN_STREAM_NAME:
packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
_NORM_RECT_STREAM_NAME:
packet_creator.create_proto(normalized_rect.to_pb2()).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
})
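A live-stream sketch under the same assumptions; `camera_frames` is a hypothetical source yielding (image, timestamp_ms) pairs with increasing timestamps:

def on_result(result: GestureRecognitionResult,
              output_image: image_module.Image, timestamp_ms: int) -> None:
  if result.gestures:
    print(timestamp_ms, result.gestures[0][0].category_name)

options = GestureRecognizerOptions(
    base_options=_BaseOptions(model_asset_path='gesture_recognizer.task'),
    running_mode=_RunningMode.LIVE_STREAM,
    result_callback=on_result)
with GestureRecognizer.create_from_options(options) as recognizer:
  for image, timestamp_ms in camera_frames:  # hypothetical camera feed
    recognizer.recognize_async(image, timestamp_ms)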

View File

@@ -30,6 +30,7 @@ from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.core import task_info as task_info_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
from mediapipe.tasks.python.vision.core import base_vision_task_api
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode
_NormalizedRect = rect.NormalizedRect
@@ -37,6 +38,7 @@ _BaseOptions = base_options_module.BaseOptions
_ImageClassifierGraphOptionsProto = image_classifier_graph_options_pb2.ImageClassifierGraphOptions
_ClassifierOptions = classifier_options.ClassifierOptions
_RunningMode = vision_task_running_mode.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_TaskInfo = task_info_module.TaskInfo
_CLASSIFICATION_RESULT_OUT_STREAM_NAME = 'classification_result_out'
@@ -44,17 +46,12 @@ _CLASSIFICATION_RESULT_TAG = 'CLASSIFICATION_RESULT'
_IMAGE_IN_STREAM_NAME = 'image_in'
_IMAGE_OUT_STREAM_NAME = 'image_out'
_IMAGE_TAG = 'IMAGE'
_NORM_RECT_STREAM_NAME = 'norm_rect_in'
_NORM_RECT_TAG = 'NORM_RECT'
_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.image_classifier.ImageClassifierGraph'
_MICRO_SECONDS_PER_MILLISECOND = 1000
@dataclasses.dataclass
class ImageClassifierOptions:
"""Options for the image classifier task.
@@ -156,7 +153,7 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
task_graph=_TASK_GRAPH_NAME,
input_streams=[
':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]),
':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]),
],
output_streams=[
':'.join([
@@ -171,17 +168,16 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
_RunningMode.LIVE_STREAM), options.running_mode,
packets_callback if options.result_callback else None)
def classify(
self,
image: image_module.Image,
image_processing_options: Optional[_ImageProcessingOptions] = None
) -> classifications.ClassificationResult:
"""Performs image classification on the provided MediaPipe Image.
Args:
image: MediaPipe Image.
image_processing_options: Options for image processing.
Returns:
A classification result object that contains a list of classifications.
@@ -190,10 +186,12 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
ValueError: If any of the input arguments is invalid.
RuntimeError: If image classification failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(image_processing_options)
output_packets = self._process_image_data({
_IMAGE_IN_STREAM_NAME:
packet_creator.create_image(image),
_NORM_RECT_STREAM_NAME:
packet_creator.create_proto(normalized_rect.to_pb2())
})
classification_result_proto = classifications_pb2.ClassificationResult()
@@ -210,7 +208,7 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None
) -> classifications.ClassificationResult:
"""Performs image classification on the provided video frames.
@@ -222,7 +220,7 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input video frame in milliseconds.
image_processing_options: Options for image processing.
Returns:
A classification result object that contains a list of classifications.
@@ -231,13 +229,13 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
ValueError: If any of the input arguments is invalid.
RuntimeError: If image classification failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(image_processing_options)
output_packets = self._process_video_data({
_IMAGE_IN_STREAM_NAME:
packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
_NORM_RECT_STREAM_NAME:
packet_creator.create_proto(normalized_rect.to_pb2()).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
})
@@ -251,10 +249,12 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
for classification in classification_result_proto.classifications
])
def classify_async(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None
) -> None:
"""Sends live image data (an Image with a unique timestamp) to perform image classification.
Only use this method when the ImageClassifier is created with the live
@@ -275,18 +275,18 @@ class ImageClassifier(base_vision_task_api.BaseVisionTaskApi):
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input image in milliseconds.
image_processing_options: Options for image processing.
Raises:
ValueError: If the current input timestamp is smaller than what the image
classifier has already processed.
"""
normalized_rect = self.convert_to_normalized_rect(image_processing_options)
self._send_live_stream_data({
_IMAGE_IN_STREAM_NAME:
packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
_NORM_RECT_STREAM_NAME:
packet_creator.create_proto(normalized_rect.to_pb2()).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
})
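Putting the migrated classifier API together, a hypothetical end-to-end sketch; the file names mirror the test constants above, and the ROI is the soccer-ball region from the test:

from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.python.components.containers import rect
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.vision import image_classifier
from mediapipe.tasks.python.vision.core import image_processing_options as ipo_module

options = image_classifier.ImageClassifierOptions(
    base_options=base_options_module.BaseOptions(
        model_asset_path='mobilenet_v2_1.0_224.tflite'))
with image_classifier.ImageClassifier.create_from_options(options) as classifier:
  img = image_module.Image.create_from_file('multi_objects.jpg')
  roi = ipo_module.ImageProcessingOptions(
      region_of_interest=rect.Rect(
          left=0.45, top=0.3075, right=0.614, bottom=0.7345))
  result = classifier.classify(img, roi)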