Added files needed for the GestureRecognizer API implementation

This commit is contained in:
kinaryml 2022-10-24 06:08:27 -07:00
parent af051dcb62
commit 9a1a9d4c13
12 changed files with 1386 additions and 0 deletions

View File

@ -88,6 +88,7 @@ cc_library(
name = "builtin_task_graphs",
deps = [
"//mediapipe/tasks/cc/vision/object_detector:object_detector_graph",
"//mediapipe/tasks/cc/vision/gesture_recognizer:gesture_recognizer_graph",
],
)

View File

@ -27,6 +27,43 @@ py_library(
],
)
py_library(
name = "rect",
srcs = ["rect.py"],
deps = [
"//mediapipe/framework/formats:rect_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "classification",
srcs = ["classification.py"],
deps = [
"//mediapipe/framework/formats:classification_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "landmark",
srcs = ["landmark.py"],
deps = [
"//mediapipe/framework/formats:landmark_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "gesture",
srcs = ["gesture.py"],
deps = [
":classification",
":landmark",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "category",
srcs = ["category.py"],

View File

@ -0,0 +1,128 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classification data class."""
import dataclasses
from typing import Any, List
from mediapipe.framework.formats import classification_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_ClassificationProto = classification_pb2.Classification
_ClassificationListProto = classification_pb2.ClassificationList
_ClassificationListCollectionProto = classification_pb2.ClassificationListCollection
@dataclasses.dataclass
class Classification:
"""A classification.
Attributes:
index: The index of the class in the corresponding label map.
score: The probability score for this class.
label_name: Label or name of the class.
display_name: Optional human-readable string for display purposes.
"""
index: int
score: float
label_name: str
display_name: str
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _ClassificationProto:
"""Generates a Classification protobuf object."""
return _ClassificationProto(
index=self.index,
score=self.score,
label_name=self.label_name,
display_name=self.display_name)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _ClassificationProto) -> 'Classification':
"""Creates a `Classification` object from the given protobuf object."""
return Classification(
index=pb2_obj.index,
score=pb2_obj.score,
label_name=pb2_obj.label_name,
display_name=pb2_obj.display_name)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, Classification):
return False
return self.to_pb2().__eq__(other.to_pb2())
@dataclasses.dataclass
class ClassificationList:
"""Represents the classifications for a given classifier.
Attributes:
classification : A list of `Classification` objects.
tensor_index: Optional index of the tensor that produced these
classifications.
tensor_name: Optional name of the tensor that produced these
classifications tensor metadata name.
"""
classifications: List[Classification]
tensor_index: int
tensor_name: str
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _ClassificationListProto:
"""Generates a ClassificationList protobuf object."""
return _ClassificationListProto(
classification=[
classification.to_pb2()
for classification in self.classifications
],
tensor_index=self.tensor_index,
tensor_name=self.tensor_name)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls,
pb2_obj: _ClassificationListProto
) -> 'ClassificationList':
"""Creates a `ClassificationList` object from the given protobuf object."""
return ClassificationList(
classifications=[
Classification.create_from_pb2(classification)
for classification in pb2_obj.classification
],
tensor_index=pb2_obj.tensor_index,
tensor_name=pb2_obj.tensor_name)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, ClassificationList):
return False
return self.to_pb2().__eq__(other.to_pb2())

View File

@ -0,0 +1,138 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Gesture data class."""
import dataclasses
from typing import Any, List
from mediapipe.tasks.python.components.containers import classification
from mediapipe.tasks.python.components.containers import landmark
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
@dataclasses.dataclass
class GestureRecognitionResult:
""" The gesture recognition result from GestureRecognizer, where each vector
element represents a single hand detected in the image.
Attributes:
gestures: Recognized hand gestures with sorted order such that the
winning label is the first item in the list.
handedness: Classification of handedness.
hand_landmarks: Detected hand landmarks in normalized image coordinates.
hand_world_landmarks: Detected hand landmarks in world coordinates.
"""
gestures: List[classification.ClassificationList]
handedness: List[classification.ClassificationList]
hand_landmarks: List[landmark.NormalizedLandmarkList]
hand_world_landmarks: List[landmark.LandmarkList]
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _DetectionProto:
"""Generates a Detection protobuf object."""
labels = []
label_ids = []
scores = []
display_names = []
for category in self.categories:
scores.append(category.score)
if category.index:
label_ids.append(category.index)
if category.category_name:
labels.append(category.category_name)
if category.display_name:
display_names.append(category.display_name)
return _DetectionProto(
label=labels,
label_id=label_ids,
score=scores,
display_name=display_names,
location_data=_LocationDataProto(
format=_LocationDataProto.Format.BOUNDING_BOX,
bounding_box=self.bounding_box.to_pb2()))
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _DetectionProto) -> 'Detection':
"""Creates a `Detection` object from the given protobuf object."""
categories = []
for idx, score in enumerate(pb2_obj.score):
categories.append(
category_module.Category(
score=score,
index=pb2_obj.label_id[idx]
if idx < len(pb2_obj.label_id) else None,
category_name=pb2_obj.label[idx]
if idx < len(pb2_obj.label) else None,
display_name=pb2_obj.display_name[idx]
if idx < len(pb2_obj.display_name) else None))
return Detection(
bounding_box=bounding_box_module.BoundingBox.create_from_pb2(
pb2_obj.location_data.bounding_box),
categories=categories)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, Detection):
return False
return self.to_pb2().__eq__(other.to_pb2())
@dataclasses.dataclass
class DetectionResult:
"""Represents the list of detected objects.
Attributes:
detections: A list of `Detection` objects.
"""
detections: List[Detection]
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _DetectionListProto:
"""Generates a DetectionList protobuf object."""
return _DetectionListProto(
detection=[detection.to_pb2() for detection in self.detections])
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _DetectionListProto) -> 'DetectionResult':
"""Creates a `DetectionResult` object from the given protobuf object."""
return DetectionResult(detections=[
Detection.create_from_pb2(detection) for detection in pb2_obj.detection
])
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, DetectionResult):
return False
return self.to_pb2().__eq__(other.to_pb2())

View File

@ -0,0 +1,250 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Landmark data class."""
import dataclasses
from typing import Any, Optional, List
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_LandmarkProto = landmark_pb2.Landmark
_LandmarkListProto = landmark_pb2.LandmarkList
_NormalizedLandmarkProto = landmark_pb2.NormalizedLandmark
_NormalizedLandmarkListProto = landmark_pb2.NormalizedLandmarkList
@dataclasses.dataclass
class Landmark:
"""A landmark that can have 1 to 3 dimensions.
Use x for 1D points, (x, y) for 2D points and (x, y, z) for 3D points.
Attributes:
x: The x coordinate of the 3D point.
y: The y coordinate of the 3D point.
z: The z coordinate of the 3D point.
visibility: Landmark visibility. Should stay unset if not supported.
Float score of whether landmark is visible or occluded by other objects.
Landmark considered as invisible also if it is not present on the screen
(out of scene bounds). Depending on the model, visibility value is either
a sigmoid or an argument of sigmoid.
presence: Landmark presence. Should stay unset if not supported.
Float score of whether landmark is present on the scene (located within
scene bounds). Depending on the model, presence value is either a result
of sigmoid or an argument of sigmoid function to get landmark presence
probability.
"""
x: Optional[float] = None
y: Optional[float] = None
z: Optional[float] = None
visibility: Optional[float] = None
presence: Optional[float] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _LandmarkProto:
"""Generates a Landmark protobuf object."""
return _LandmarkProto(
x=self.x,
y=self.y,
z=self.z,
visibility=self.visibility,
presence=self.presence)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _LandmarkProto) -> 'Landmark':
"""Creates a `Landmark` object from the given protobuf object."""
return Landmark(
x=pb2_obj.x,
y=pb2_obj.y,
z=pb2_obj.z,
visibility=pb2_obj.visibility,
presence=pb2_obj.presence)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, Landmark):
return False
return self.to_pb2().__eq__(other.to_pb2())
@dataclasses.dataclass
class LandmarkList:
"""Represents the list of landmarks.
Attributes:
landmarks : A list of `Landmark` objects.
"""
landmarks: List[Landmark]
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _LandmarkListProto:
"""Generates a LandmarkList protobuf object."""
return _LandmarkListProto(
landmark=[
landmark.to_pb2()
for landmark in self.landmarks
]
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls,
pb2_obj: _LandmarkListProto
) -> 'LandmarkList':
"""Creates a `LandmarkList` object from the given protobuf object."""
return LandmarkList(
landmarks=[
Landmark.create_from_pb2(landmark)
for landmark in pb2_obj.landmark
]
)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, LandmarkList):
return False
return self.to_pb2().__eq__(other.to_pb2())
@dataclasses.dataclass
class NormalizedLandmark:
"""A normalized version of above Landmark proto.
All coordinates should be within [0, 1].
Attributes:
x: The normalized x coordinate of the 3D point.
y: The normalized y coordinate of the 3D point.
z: The normalized z coordinate of the 3D point.
visibility: Landmark visibility. Should stay unset if not supported.
Float score of whether landmark is visible or occluded by other objects.
Landmark considered as invisible also if it is not present on the screen
(out of scene bounds). Depending on the model, visibility value is either
a sigmoid or an argument of sigmoid.
presence: Landmark presence. Should stay unset if not supported.
Float score of whether landmark is present on the scene (located within
scene bounds). Depending on the model, presence value is either a result
of sigmoid or an argument of sigmoid function to get landmark presence
probability.
"""
x: Optional[float] = None
y: Optional[float] = None
z: Optional[float] = None
visibility: Optional[float] = None
presence: Optional[float] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _NormalizedLandmarkProto:
"""Generates a NormalizedLandmark protobuf object."""
return _NormalizedLandmarkProto(
x=self.x,
y=self.y,
z=self.z,
visibility=self.visibility,
presence=self.presence)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls,
pb2_obj: _NormalizedLandmarkProto
) -> 'NormalizedLandmark':
"""Creates a `NormalizedLandmark` object from the given protobuf object."""
return NormalizedLandmark(
x=pb2_obj.x,
y=pb2_obj.y,
z=pb2_obj.z,
visibility=pb2_obj.visibility,
presence=pb2_obj.presence)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, NormalizedLandmark):
return False
return self.to_pb2().__eq__(other.to_pb2())
@dataclasses.dataclass
class NormalizedLandmarkList:
"""Represents the list of normalized landmarks.
Attributes:
landmarks : A list of `Landmark` objects.
"""
landmarks: List[NormalizedLandmark]
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _NormalizedLandmarkListProto:
"""Generates a NormalizedLandmarkList protobuf object."""
return _NormalizedLandmarkListProto(
landmark=[
landmark.to_pb2()
for landmark in self.landmarks
]
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls,
pb2_obj: _NormalizedLandmarkListProto
) -> 'NormalizedLandmarkList':
"""Creates a `NormalizedLandmarkList` object from the given protobuf object."""
return NormalizedLandmarkList(
landmarks=[
NormalizedLandmark.create_from_pb2(landmark)
for landmark in pb2_obj.landmark
]
)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, NormalizedLandmarkList):
return False
return self.to_pb2().__eq__(other.to_pb2())

View File

@ -0,0 +1,141 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Rect data class."""
import dataclasses
from typing import Any, Optional
from mediapipe.framework.formats import rect_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_RectProto = rect_pb2.Rect
_NormalizedRectProto = rect_pb2.NormalizedRect
@dataclasses.dataclass
class Rect:
"""A rectangle with rotation in image coordinates.
Attributes:
x_center : The X coordinate of the top-left corner, in pixels.
y_center : The Y coordinate of the top-left corner, in pixels.
width: The width of the rectangle, in pixels.
height: The height of the rectangle, in pixels.
rotation: Rotation angle is clockwise in radians.
rect_id: Optional unique id to help associate different rectangles to each
other.
"""
x_center: int
y_center: int
width: int
height: int
rotation: Optional[float] = 0.0
rect_id: Optional[int] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _RectProto:
"""Generates a Rect protobuf object."""
return _RectProto(
x_center=self.x_center,
y_center=self.y_center,
width=self.width,
height=self.height,
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _RectProto) -> 'Rect':
"""Creates a `Rect` object from the given protobuf object."""
return Rect(
x_center=pb2_obj.x_center,
y_center=pb2_obj.y_center,
width=pb2_obj.width,
height=pb2_obj.height)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, Rect):
return False
return self.to_pb2().__eq__(other.to_pb2())
@dataclasses.dataclass
class NormalizedRect:
"""A rectangle with rotation in normalized coordinates. The values of box
center location and size are within [0, 1].
Attributes:
x_center : The X normalized coordinate of the top-left corner.
y_center : The Y normalized coordinate of the top-left corner.
width: The width of the rectangle.
height: The height of the rectangle.
rotation: Rotation angle is clockwise in radians.
rect_id: Optional unique id to help associate different rectangles to each
other.
"""
x_center: float
y_center: float
width: float
height: float
rotation: Optional[float] = 0.0
rect_id: Optional[int] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _NormalizedRectProto:
"""Generates a NormalizedRect protobuf object."""
return _NormalizedRectProto(
x_center=self.x_center,
y_center=self.y_center,
width=self.width,
height=self.height,
rotation=self.rotation,
rect_id=self.rect_id
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _NormalizedRectProto) -> 'NormalizedRect':
"""Creates a `NormalizedRect` object from the given protobuf object."""
return NormalizedRect(
x_center=pb2_obj.x_center,
y_center=pb2_obj.y_center,
width=pb2_obj.width,
height=pb2_obj.height,
rotation=pb2_obj.rotation,
rect_id=pb2_obj.rect_id
)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, NormalizedRect):
return False
return self.to_pb2().__eq__(other.to_pb2())

View File

@ -0,0 +1,28 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Placeholder for internal Python strict library compatibility macro.
package(default_visibility = ["//mediapipe/tasks:internal"])
licenses(["notice"])
py_library(
name = "classifier_options",
srcs = ["classifier_options.py"],
deps = [
"//mediapipe/tasks/cc/components/processors/proto:classifier_options_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)

View File

@ -0,0 +1,92 @@
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classifier options data class."""
import dataclasses
from typing import Any, List, Optional
from mediapipe.tasks.cc.components.processors.proto import classifier_options_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_ClassifierOptionsProto = classifier_options_pb2.ClassifierOptions
@dataclasses.dataclass
class ClassifierOptions:
"""Options for classification processor.
Attributes:
display_names_locale: The locale to use for display names specified through
the TFLite Model Metadata.
max_results: The maximum number of top-scored classification results to
return.
score_threshold: Overrides the ones provided in the model metadata. Results
below this value are rejected.
category_allowlist: Allowlist of category names. If non-empty, detection
results whose category name is not in this set will be filtered out.
Duplicate or unknown category names are ignored. Mutually exclusive with
`category_denylist`.
category_denylist: Denylist of category names. If non-empty, detection
results whose category name is in this set will be filtered out. Duplicate
or unknown category names are ignored. Mutually exclusive with
`category_allowlist`.
"""
display_names_locale: Optional[str] = None
max_results: Optional[int] = None
score_threshold: Optional[float] = None
category_allowlist: Optional[List[str]] = None
category_denylist: Optional[List[str]] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _ClassifierOptionsProto:
"""Generates a ClassifierOptions protobuf object."""
return _ClassifierOptionsProto(
score_threshold=self.score_threshold,
category_allowlist=self.category_allowlist,
category_denylist=self.category_denylist,
display_names_locale=self.display_names_locale,
max_results=self.max_results)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls,
pb2_obj: _ClassifierOptionsProto
) -> 'ClassifierOptions':
"""Creates a `ClassifierOptions` object from the given protobuf object."""
return ClassifierOptions(
score_threshold=pb2_obj.score_threshold,
category_allowlist=[
str(name) for name in pb2_obj.class_name_allowlist
],
category_denylist=[
str(name) for name in pb2_obj.class_name_denylist
],
display_names_locale=pb2_obj.display_names_locale,
max_results=pb2_obj.max_results)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, ClassifierOptions):
return False
return self.to_pb2().__eq__(other.to_pb2())

View File

@ -36,3 +36,22 @@ py_test(
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)
py_test(
name = "gesture_recognizer_test",
srcs = ["gesture_recognizer_test.py"],
data = [
"//mediapipe/tasks/testdata/vision:test_images",
"//mediapipe/tasks/testdata/vision:test_models",
],
deps = [
"//mediapipe/python:_framework_bindings",
"//mediapipe/tasks/python/components/containers:classification",
"//mediapipe/tasks/python/components/containers:landmark",
"//mediapipe/tasks/python/components/containers:rect",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/test:test_utils",
"//mediapipe/tasks/python/vision:gesture_recognizer",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)

View File

@ -0,0 +1,91 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for gesture recognizer."""
import enum
from absl.testing import absltest
from absl.testing import parameterized
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.python.components.containers import rect as rect_module
from mediapipe.tasks.python.components.containers import classification as classification_module
from mediapipe.tasks.python.components.containers import landmark as landmark_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.test import test_utils
from mediapipe.tasks.python.vision import gesture_recognizer
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
_BaseOptions = base_options_module.BaseOptions
_NormalizedRect = rect_module.NormalizedRect
_ClassificationList = classification_module.ClassificationList
_LandmarkList = landmark_module.LandmarkList
_NormalizedLandmarkList = landmark_module.NormalizedLandmarkList
_Image = image_module.Image
_GestureRecognizer = gesture_recognizer.GestureRecognizer
_GestureRecognizerOptions = gesture_recognizer.GestureRecognizerOptions
_GestureRecognitionResult = gesture_recognizer.GestureRecognitionResult
_RUNNING_MODE = running_mode_module.VisionTaskRunningMode
_GESTURE_RECOGNIZER_MODEL_FILE = 'gesture_recognizer.task'
_IMAGE_FILE = 'right_hands.jpg'
_EXPECTED_DETECTION_RESULT = _GestureRecognitionResult([], [], [], [])
class ModelFileType(enum.Enum):
FILE_CONTENT = 1
FILE_NAME = 2
class GestureRecognizerTest(parameterized.TestCase):
def setUp(self):
super().setUp()
self.test_image = _Image.create_from_file(
test_utils.get_test_data_path(_IMAGE_FILE))
self.gesture_recognizer_model_path = test_utils.get_test_data_path(
_GESTURE_RECOGNIZER_MODEL_FILE)
@parameterized.parameters(
(ModelFileType.FILE_NAME, _EXPECTED_DETECTION_RESULT),
(ModelFileType.FILE_CONTENT, _EXPECTED_DETECTION_RESULT))
def test_recognize(self, model_file_type, expected_recognition_result):
# Creates gesture recognizer.
if model_file_type is ModelFileType.FILE_NAME:
gesture_recognizer_base_options = _BaseOptions(
model_asset_path=self.gesture_recognizer_model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.gesture_recognizer_model_path, 'rb') as f:
model_content = f.read()
gesture_recognizer_base_options = _BaseOptions(
model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
options = _GestureRecognizerOptions(
base_options=gesture_recognizer_base_options)
recognizer = _GestureRecognizer.create_from_options(options)
# Performs hand gesture recognition on the input.
recognition_result = recognizer.recognize(self.test_image)
# Comparing results.
self.assertEqual(recognition_result, expected_recognition_result)
# Closes the gesture recognizer explicitly when the detector is not used in
# a context.
recognizer.close()
if __name__ == '__main__':
absltest.main()

View File

@ -36,3 +36,30 @@ py_library(
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)
py_library(
name = "gesture_recognizer",
srcs = [
"gesture_recognizer.py",
],
deps = [
"//mediapipe/python:_framework_bindings",
"//mediapipe/python:packet_creator",
"//mediapipe/python:packet_getter",
"//mediapipe/tasks/cc/vision/gesture_recognizer/proto:gesture_classifier_graph_options_py_pb2",
"//mediapipe/tasks/cc/vision/gesture_recognizer/proto:gesture_recognizer_graph_options_py_pb2",
"//mediapipe/tasks/cc/vision/gesture_recognizer/proto:hand_gesture_recognizer_graph_options_py_pb2",
"//mediapipe/tasks/cc/vision/hand_detector/proto:hand_detector_graph_options_py_pb2",
"//mediapipe/tasks/cc/vision/hand_landmarker/proto:hand_landmarker_graph_options_py_pb2",
"//mediapipe/tasks/cc/vision/hand_landmarker/proto:hand_landmarks_detector_graph_options_py_pb2",
"//mediapipe/tasks/python/components/containers:rect",
"//mediapipe/tasks/python/components/containers:classification",
"//mediapipe/tasks/python/components/containers:landmark",
"//mediapipe/tasks/python/components/processors:classifier_options",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/core:optional_dependencies",
"//mediapipe/tasks/python/core:task_info",
"//mediapipe/tasks/python/vision/core:base_vision_task_api",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)

View File

@ -0,0 +1,434 @@
# Copyright 2022 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe gesture recognizer task."""
import dataclasses
from typing import Callable, Mapping, Optional, List
from mediapipe.python import packet_creator
from mediapipe.python import packet_getter
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.python._framework_bindings import packet as packet_module
from mediapipe.python._framework_bindings import task_runner as task_runner_module
from mediapipe.tasks.cc.vision.gesture_recognizer.proto import gesture_classifier_graph_options_pb2
from mediapipe.tasks.cc.vision.gesture_recognizer.proto import gesture_recognizer_graph_options_pb2
from mediapipe.tasks.cc.vision.gesture_recognizer.proto import hand_gesture_recognizer_graph_options_pb2
from mediapipe.tasks.cc.vision.hand_detector.proto import hand_detector_graph_options_pb2
from mediapipe.tasks.cc.vision.hand_landmarker.proto import hand_landmarker_graph_options_pb2
from mediapipe.tasks.cc.vision.hand_landmarker.proto import hand_landmarks_detector_graph_options_pb2
from mediapipe.tasks.python.components.containers import rect as rect_module
from mediapipe.tasks.python.components.containers import classification as classification_module
from mediapipe.tasks.python.components.containers import landmark as landmark_module
from mediapipe.tasks.python.components.processors import classifier_options
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.core import task_info as task_info_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
from mediapipe.tasks.python.vision.core import base_vision_task_api
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
_NormalizedRect = rect_module.NormalizedRect
_BaseOptions = base_options_module.BaseOptions
_GestureClassifierGraphOptionsProto = gesture_classifier_graph_options_pb2.GestureClassifierGraphOptions
_GestureRecognizerGraphOptionsProto = gesture_recognizer_graph_options_pb2.GestureRecognizerGraphOptions
_HandGestureRecognizerGraphOptionsProto = hand_gesture_recognizer_graph_options_pb2.HandGestureRecognizerGraphOptions
_HandDetectorGraphOptionsProto = hand_detector_graph_options_pb2.HandDetectorGraphOptions
_HandLandmarkerGraphOptionsProto = hand_landmarker_graph_options_pb2.HandLandmarkerGraphOptions
_HandLandmarksDetectorGraphOptionsProto = hand_landmarks_detector_graph_options_pb2.HandLandmarksDetectorGraphOptions
_ClassifierOptions = classifier_options.ClassifierOptions
_RunningMode = running_mode_module.VisionTaskRunningMode
_TaskInfo = task_info_module.TaskInfo
_TaskRunner = task_runner_module.TaskRunner
_IMAGE_IN_STREAM_NAME = 'image_in'
_IMAGE_OUT_STREAM_NAME = 'image_out'
_IMAGE_TAG = 'IMAGE'
_NORM_RECT_STREAM_NAME = 'norm_rect_in'
_NORM_RECT_TAG = 'NORM_RECT'
_HAND_GESTURE_STREAM_NAME = 'hand_gestures'
_HAND_GESTURE_TAG = 'HAND_GESTURES'
_HANDEDNESS_STREAM_NAME = 'handedness'
_HANDEDNESS_TAG = 'HANDEDNESS'
_HAND_LANDMARKS_STREAM_NAME = 'landmarks'
_HAND_LANDMARKS_TAG = 'LANDMARKS'
_HAND_WORLD_LANDMARKS_STREAM_NAME = 'world_landmarks'
_HAND_WORLD_LANDMARKS_TAG = 'WORLD_LANDMARKS'
_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.gesture_recognizer.GestureRecognizerGraph'
_MICRO_SECONDS_PER_MILLISECOND = 1000
def _build_full_image_norm_rect() -> _NormalizedRect:
# Builds a NormalizedRect covering the entire image.
return _NormalizedRect(x_center=0.5, y_center=0.5, width=1, height=1)
@dataclasses.dataclass
class GestureRecognitionResult:
"""The gesture recognition result from GestureRecognizer, where each vector
element represents a single hand detected in the image.
Attributes:
gestures: Recognized hand gestures with sorted order such that the
winning label is the first item in the list.
handedness: Classification of handedness.
hand_landmarks: Detected hand landmarks in normalized image coordinates.
hand_world_landmarks: Detected hand landmarks in world coordinates.
"""
gestures: List[classification_module.ClassificationList]
handedness: List[classification_module.ClassificationList]
hand_landmarks: List[landmark_module.NormalizedLandmarkList]
hand_world_landmarks: List[landmark_module.LandmarkList]
@dataclasses.dataclass
class GestureRecognizerOptions:
"""Options for the gesture recognizer task.
Attributes:
base_options: Base options for the hand gesture recognizer task.
running_mode: The running mode of the task. Default to the image mode.
Gesture recognizer task has three running modes:
1) The image mode for recognizing hand gestures on single image inputs.
2) The video mode for recognizing hand gestures on the decoded frames of a
video.
3) The live stream mode for recognizing hand gestures on a live stream of
input data, such as from camera.
num_hands: The maximum number of hands can be detected by the recognizer.
min_hand_detection_confidence: The minimum confidence score for the hand
detection to be considered successful.
min_hand_presence_confidence: The minimum confidence score of hand presence
score in the hand landmark detection.
min_tracking_confidence: The minimum confidence score for the hand tracking
to be considered successful.
min_gesture_confidence: The minimum confidence score for the gestures to be
considered successful. If < 0, the gesture confidence thresholds in the
model metadata are used.
TODO: Note this option is subject to change, after scoring merging
calculator is implemented.
result_callback: The user-defined result callback for processing live stream
data. The result callback should only be specified when the running mode
is set to the live stream mode.
"""
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
num_hands: Optional[int] = 1
min_hand_detection_confidence: Optional[int] = 0.5
min_hand_presence_confidence: Optional[int] = 0.5
min_tracking_confidence: Optional[int] = 0.5
min_gesture_confidence: Optional[int] = -1
result_callback: Optional[
Callable[[GestureRecognitionResult, image_module.Image,
int], None]] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _GestureRecognizerGraphOptionsProto:
"""Generates an GestureRecognizerOptions protobuf object."""
base_options_proto = self.base_options.to_pb2()
base_options_proto.use_stream_mode = False if self.running_mode == _RunningMode.IMAGE else True
# hand_landmark_detector_base_options_proto = self.hand_landmark_detector_base_options.to_pb2()
# hand_landmark_detector_base_options_proto.use_stream_mode = False if self.running_mode == _RunningMode.IMAGE else True
# Configure hand detector options.
hand_detector_options_proto = _HandDetectorGraphOptionsProto(
num_hands=self.num_hands,
min_detection_confidence=self.min_hand_detection_confidence)
# Configure hand landmarker options.
hand_landmarks_detector_options_proto = _HandLandmarksDetectorGraphOptionsProto(
min_detection_confidence=self.min_hand_presence_confidence)
hand_landmarker_options_proto = _HandLandmarkerGraphOptionsProto(
hand_detector_graph_options=hand_detector_options_proto,
hand_landmarks_detector_graph_options=hand_landmarks_detector_options_proto,
min_tracking_confidence=self.min_tracking_confidence)
# Configure hand gesture recognizer options.
hand_gesture_recognizer_options_proto = _HandGestureRecognizerGraphOptionsProto()
if self.min_gesture_confidence >= 0:
classifier_options = _ClassifierOptions(
score_threshold=self.min_gesture_confidence)
hand_gesture_recognizer_options_proto.canned_gesture_classifier_graph_options = \
_GestureClassifierGraphOptionsProto(
classifier_options=classifier_options.to_pb2())
return _GestureRecognizerGraphOptionsProto(
base_options=base_options_proto,
hand_landmarker_graph_options=hand_landmarker_options_proto,
hand_gesture_recognizer_graph_options=hand_gesture_recognizer_options_proto
)
class GestureRecognizer(base_vision_task_api.BaseVisionTaskApi):
"""Class that performs gesture recognition on images."""
@classmethod
def create_from_model_path(cls, model_path: str) -> 'GestureRecognizer':
"""Creates an `GestureRecognizer` object from a TensorFlow Lite model and
the default `GestureRecognizerOptions`.
Note that the created `GestureRecognizer` instance is in image mode, for
recognizing hand gestures on single image inputs.
Args:
model_path: Path to the model.
Returns:
`GestureRecognizer` object that's created from the model file and the
default `GestureRecognizerOptions`.
Raises:
ValueError: If failed to create `GestureRecognizer` object from the
provided file such as invalid file path.
RuntimeError: If other types of error occurred.
"""
base_options = _BaseOptions(model_asset_path=model_path)
options = GestureRecognizerOptions(
base_options=base_options, running_mode=_RunningMode.IMAGE)
return cls.create_from_options(options)
@classmethod
def create_from_options(
cls,
options: GestureRecognizerOptions
) -> 'GestureRecognizer':
"""Creates the `GestureRecognizer` object from gesture recognizer options.
Args:
options: Options for the gesture recognizer task.
Returns:
`GestureRecognizer` object that's created from `options`.
Raises:
ValueError: If failed to create `GestureRecognizer` object from
`GestureRecognizerOptions` such as missing the model.
RuntimeError: If other types of error occurred.
"""
def packets_callback(output_packets: Mapping[str, packet_module.Packet]):
if output_packets[_IMAGE_OUT_STREAM_NAME].is_empty():
return
image = packet_getter.get_image(output_packets[_IMAGE_OUT_STREAM_NAME])
if output_packets[_HAND_GESTURE_STREAM_NAME].is_empty():
empty_packet = output_packets[_HAND_GESTURE_STREAM_NAME]
options.result_callback(
GestureRecognitionResult([], [], [], []), image,
empty_packet.timestamp.value // _MICRO_SECONDS_PER_MILLISECOND)
return
gestures_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_GESTURE_STREAM_NAME])
handedness_proto_list = packet_getter.get_proto_list(
output_packets[_HANDEDNESS_STREAM_NAME])
hand_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_LANDMARKS_STREAM_NAME])
hand_world_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_WORLD_LANDMARKS_STREAM_NAME])
gesture_recognition_result = GestureRecognitionResult(
[
classification_module.ClassificationList.create_from_pb2(gestures)
for gestures in gestures_proto_list
], [
classification_module.ClassificationList.create_from_pb2(handedness)
for handedness in handedness_proto_list
], [
landmark_module.NormalizedLandmarkList.create_from_pb2(hand_landmarks)
for hand_landmarks in hand_landmarks_proto_list
], [
landmark_module.LandmarkList.create_from_pb2(hand_world_landmarks)
for hand_world_landmarks in hand_world_landmarks_proto_list
]
)
timestamp = output_packets[_HAND_GESTURE_STREAM_NAME].timestamp
options.result_callback(
gesture_recognition_result, image,
timestamp.value // _MICRO_SECONDS_PER_MILLISECOND)
task_info = _TaskInfo(
task_graph=_TASK_GRAPH_NAME,
input_streams=[
':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]),
':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]),
],
output_streams=[
':'.join([_HAND_GESTURE_TAG, _HAND_GESTURE_STREAM_NAME]),
':'.join([_HANDEDNESS_TAG, _HANDEDNESS_STREAM_NAME]),
':'.join([_HAND_LANDMARKS_TAG, _HAND_LANDMARKS_STREAM_NAME]),
':'.join([_HAND_WORLD_LANDMARKS_TAG,
_HAND_WORLD_LANDMARKS_STREAM_NAME]),
':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME])
],
task_options=options)
return cls(
task_info.generate_graph_config(
enable_flow_limiting=options.running_mode ==
_RunningMode.LIVE_STREAM), options.running_mode,
packets_callback if options.result_callback else None)
def recognize(
self,
image: image_module.Image,
roi: Optional[_NormalizedRect] = None
) -> GestureRecognitionResult:
"""Performs hand gesture recognition on the given image. Only use this
method when the GestureRecognizer is created with the image running mode.
The image can be of any size with format RGB or RGBA.
TODO: Describes how the input image will be preprocessed after the yuv
support is implemented.
Args:
image: MediaPipe Image.
roi: The region of interest.
Returns:
The hand gesture recognition results.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If gesture recognition failed to run.
"""
norm_rect = roi if roi is not None else _build_full_image_norm_rect()
output_packets = self._process_image_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
norm_rect.to_pb2())})
gestures_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_GESTURE_STREAM_NAME])
handedness_proto_list = packet_getter.get_proto_list(
output_packets[_HANDEDNESS_STREAM_NAME])
hand_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_LANDMARKS_STREAM_NAME])
hand_world_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_WORLD_LANDMARKS_STREAM_NAME])
return GestureRecognitionResult(
[
classification_module.ClassificationList.create_from_pb2(gestures)
for gestures in gestures_proto_list
], [
classification_module.ClassificationList.create_from_pb2(handedness)
for handedness in handedness_proto_list
], [
landmark_module.NormalizedLandmarkList.create_from_pb2(hand_landmarks)
for hand_landmarks in hand_landmarks_proto_list
], [
landmark_module.LandmarkList.create_from_pb2(hand_world_landmarks)
for hand_world_landmarks in hand_world_landmarks_proto_list
]
)
def recognize_for_video(
self, image: image_module.Image,
timestamp_ms: int,
roi: Optional[_NormalizedRect] = None
) -> GestureRecognitionResult:
"""Performs gesture recognition on the provided video frame. Only use this
method when the GestureRecognizer is created with the video running mode.
Only use this method when the GestureRecognizer is created with the video
running mode. It's required to provide the video frame's timestamp (in
milliseconds) along with the video frame. The input timestamps should be
monotonically increasing for adjacent calls of this method.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input video frame in milliseconds.
roi: The region of interest.
Returns:
The hand gesture recognition results.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If gesture recognition failed to run.
"""
norm_rect = roi if roi is not None else _build_full_image_norm_rect()
output_packets = self._process_video_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
norm_rect.to_pb2()).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
})
gestures_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_GESTURE_STREAM_NAME])
handedness_proto_list = packet_getter.get_proto_list(
output_packets[_HANDEDNESS_STREAM_NAME])
hand_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_LANDMARKS_STREAM_NAME])
hand_world_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_HAND_WORLD_LANDMARKS_STREAM_NAME])
return GestureRecognitionResult(
[
classification_module.ClassificationList.create_from_pb2(gestures)
for gestures in gestures_proto_list
], [
classification_module.ClassificationList.create_from_pb2(handedness)
for handedness in handedness_proto_list
], [
landmark_module.NormalizedLandmarkList.create_from_pb2(hand_landmarks)
for hand_landmarks in hand_landmarks_proto_list
], [
landmark_module.LandmarkList.create_from_pb2(hand_world_landmarks)
for hand_world_landmarks in hand_world_landmarks_proto_list
]
)
def recognize_async(
self,
image: image_module.Image,
timestamp_ms: int,
roi: Optional[_NormalizedRect] = None
) -> None:
"""Sends live image data to perform gesture recognition, and the results
will be available via the "result_callback" provided in the
GestureRecognizerOptions. Only use this method when the GestureRecognizer
is created with the live stream running mode.
Only use this method when the GestureRecognizer is created with the live
stream running mode. The input timestamps should be monotonically increasing
for adjacent calls of this method. This method will return immediately after
the input image is accepted. The results will be available via the
`result_callback` provided in the `GestureRecognizerOptions`. The
`recognize_async` method is designed to process live stream data such as
camera input. To lower the overall latency, gesture recognizer may drop the
input images if needed. In other words, it's not guaranteed to have output
per input image.
The `result_callback` provides:
- The hand gesture recognition results.
- The input image that the image classifier runs on.
- The input timestamp in milliseconds.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input image in milliseconds.
roi: The region of interest.
Raises:
ValueError: If the current input timestamp is smaller than what the
gesture recognizer has already processed.
"""
norm_rect = roi if roi is not None else _build_full_image_norm_rect()
self._send_live_stream_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
norm_rect.to_pb2()).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
})