From 0402ee383f37c6f8d7bc0cfeb8be48485e393f9e Mon Sep 17 00:00:00 2001 From: kinaryml Date: Tue, 8 Nov 2022 01:05:36 -0800 Subject: [PATCH] Added hand landmarker Python API and tests --- mediapipe/tasks/python/test/vision/BUILD | 23 + .../test/vision/hand_landmarker_test.py | 436 ++++++++++++++++++ mediapipe/tasks/python/vision/BUILD | 23 + .../tasks/python/vision/hand_landmarker.py | 381 +++++++++++++++ 4 files changed, 863 insertions(+) create mode 100644 mediapipe/tasks/python/test/vision/hand_landmarker_test.py create mode 100644 mediapipe/tasks/python/vision/hand_landmarker.py diff --git a/mediapipe/tasks/python/test/vision/BUILD b/mediapipe/tasks/python/test/vision/BUILD index c29648160..f99eb25a2 100644 --- a/mediapipe/tasks/python/test/vision/BUILD +++ b/mediapipe/tasks/python/test/vision/BUILD @@ -73,3 +73,26 @@ py_test( "//mediapipe/tasks/python/vision/core:vision_task_running_mode", ], ) + +py_test( + name = "hand_landmarker_test", + srcs = ["hand_landmarker_test.py"], + data = [ + "//mediapipe/tasks/testdata/vision:test_images", + "//mediapipe/tasks/testdata/vision:test_models", + "//mediapipe/tasks/testdata/vision:test_protos", + ], + deps = [ + "//mediapipe/python:_framework_bindings", + "//mediapipe/tasks/cc/components/containers/proto:landmarks_detection_result_py_pb2", + "//mediapipe/tasks/python/components/containers:rect", + "//mediapipe/tasks/python/components/containers:landmark", + "//mediapipe/tasks/python/components/containers:landmark_detection_result", + "//mediapipe/tasks/python/core:base_options", + "//mediapipe/tasks/python/test:test_utils", + "//mediapipe/tasks/python/vision:hand_landmarker", + "//mediapipe/tasks/python/vision/core:vision_task_running_mode", + "//mediapipe/tasks/python/vision/core:image_processing_options", + "@com_google_protobuf//:protobuf_python" + ], +) diff --git a/mediapipe/tasks/python/test/vision/hand_landmarker_test.py b/mediapipe/tasks/python/test/vision/hand_landmarker_test.py new file mode 100644 index 000000000..9d311e210 --- /dev/null +++ b/mediapipe/tasks/python/test/vision/hand_landmarker_test.py @@ -0,0 +1,436 @@ +# Copyright 2022 The MediaPipe Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Tests for hand landmarker.""" + +import enum +from unittest import mock + +import numpy as np +from google.protobuf import text_format +from absl.testing import absltest +from absl.testing import parameterized + +from mediapipe.python._framework_bindings import image as image_module +from mediapipe.tasks.cc.components.containers.proto import landmarks_detection_result_pb2 +from mediapipe.tasks.python.components.containers import rect as rect_module +from mediapipe.tasks.python.components.containers import landmark as landmark_module +from mediapipe.tasks.python.components.containers import landmark_detection_result as landmark_detection_result_module +from mediapipe.tasks.python.core import base_options as base_options_module +from mediapipe.tasks.python.test import test_utils +from mediapipe.tasks.python.vision import hand_landmarker +from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module +from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module + +_LandmarksDetectionResultProto = landmarks_detection_result_pb2.LandmarksDetectionResult +_BaseOptions = base_options_module.BaseOptions +_Rect = rect_module.Rect +_Landmark = landmark_module.Landmark +_NormalizedLandmark = landmark_module.NormalizedLandmark +_LandmarksDetectionResult = landmark_detection_result_module.LandmarksDetectionResult +_Image = image_module.Image +_HandLandmarker = hand_landmarker.HandLandmarker +_HandLandmarkerOptions = hand_landmarker.HandLandmarkerOptions +_HandLandmarksDetectionResult = hand_landmarker.HandLandmarksDetectionResult +_RUNNING_MODE = running_mode_module.VisionTaskRunningMode +_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions + +_HAND_LANDMARKER_BUNDLE_ASSET_FILE = 'hand_landmarker.task' +_NO_HANDS_IMAGE = 'cats_and_dogs.jpg' +_TWO_HANDS_IMAGE = 'right_hands.jpg' +_THUMB_UP_IMAGE = 'thumb_up.jpg' +_THUMB_UP_LANDMARKS = 'thumb_up_landmarks.pbtxt' +_POINTING_UP_IMAGE = 'pointing_up.jpg' +_POINTING_UP_LANDMARKS = 'pointing_up_landmarks.pbtxt' +_POINTING_UP_ROTATED_IMAGE = 'pointing_up_rotated.jpg' +_POINTING_UP_ROTATED_LANDMARKS = 'pointing_up_rotated_landmarks.pbtxt' +_LANDMARKS_ERROR_TOLERANCE = 0.03 +_HANDEDNESS_MARGIN = 0.05 + + +def _get_expected_hand_landmarks_detection_result( + file_path: str) -> _HandLandmarksDetectionResult: + landmarks_detection_result_file_path = test_utils.get_test_data_path( + file_path) + with open(landmarks_detection_result_file_path, "rb") as f: + landmarks_detection_result_proto = _LandmarksDetectionResultProto() + # Use this if a .pb file is available. 
+    # landmarks_detection_result_proto.ParseFromString(f.read())
+    text_format.Parse(f.read(), landmarks_detection_result_proto)
+    landmarks_detection_result = _LandmarksDetectionResult.create_from_pb2(
+        landmarks_detection_result_proto)
+  return _HandLandmarksDetectionResult(
+      handedness=[landmarks_detection_result.categories],
+      hand_landmarks=[landmarks_detection_result.landmarks],
+      hand_world_landmarks=[landmarks_detection_result.world_landmarks])
+
+
+class ModelFileType(enum.Enum):
+  FILE_CONTENT = 1
+  FILE_NAME = 2
+
+
+class HandLandmarkerTest(parameterized.TestCase):
+
+  def setUp(self):
+    super().setUp()
+    self.test_image = _Image.create_from_file(
+        test_utils.get_test_data_path(_THUMB_UP_IMAGE))
+    self.model_path = test_utils.get_test_data_path(
+        _HAND_LANDMARKER_BUNDLE_ASSET_FILE)
+
+  def _assert_actual_result_approximately_matches_expected_result(
+      self,
+      actual_result: _HandLandmarksDetectionResult,
+      expected_result: _HandLandmarksDetectionResult
+  ):
+    # Expects to have the same number of hands detected.
+    self.assertLen(actual_result.hand_landmarks,
+                   len(expected_result.hand_landmarks))
+    self.assertLen(actual_result.hand_world_landmarks,
+                   len(expected_result.hand_world_landmarks))
+    self.assertLen(actual_result.handedness, len(expected_result.handedness))
+    # Actual landmarks match expected landmarks.
+    self.assertLen(actual_result.hand_landmarks[0],
+                   len(expected_result.hand_landmarks[0]))
+    actual_landmarks = actual_result.hand_landmarks[0]
+    expected_landmarks = expected_result.hand_landmarks[0]
+    for i in range(len(actual_landmarks)):
+      self.assertAlmostEqual(actual_landmarks[i].x, expected_landmarks[i].x,
+                             delta=_LANDMARKS_ERROR_TOLERANCE)
+      self.assertAlmostEqual(actual_landmarks[i].y, expected_landmarks[i].y,
+                             delta=_LANDMARKS_ERROR_TOLERANCE)
+    # Actual handedness matches expected handedness.
+    actual_top_handedness = actual_result.handedness[0][0]
+    expected_top_handedness = expected_result.handedness[0][0]
+    self.assertEqual(actual_top_handedness.index, expected_top_handedness.index)
+    self.assertEqual(actual_top_handedness.category_name,
+                     expected_top_handedness.category_name)
+    self.assertAlmostEqual(actual_top_handedness.score,
+                           expected_top_handedness.score,
+                           delta=_HANDEDNESS_MARGIN)
+
+  def test_create_from_file_succeeds_with_valid_model_path(self):
+    # Creates with default options and a valid model file successfully.
+    with _HandLandmarker.create_from_model_path(self.model_path) as landmarker:
+      self.assertIsInstance(landmarker, _HandLandmarker)
+
+  def test_create_from_options_succeeds_with_valid_model_path(self):
+    # Creates with options containing the model file successfully.
+    base_options = _BaseOptions(model_asset_path=self.model_path)
+    options = _HandLandmarkerOptions(base_options=base_options)
+    with _HandLandmarker.create_from_options(options) as landmarker:
+      self.assertIsInstance(landmarker, _HandLandmarker)
+
+  def test_create_from_options_fails_with_invalid_model_path(self):
+    # Invalid empty model path.
+    with self.assertRaisesRegex(
+        ValueError,
+        r"ExternalFile must specify at least one of 'file_content', "
+        r"'file_name', 'file_pointer_meta' or 'file_descriptor_meta'."):
+      base_options = _BaseOptions(model_asset_path='')
+      options = _HandLandmarkerOptions(base_options=base_options)
+      _HandLandmarker.create_from_options(options)
+
+  def test_create_from_options_succeeds_with_valid_model_content(self):
+    # Creates with options containing model content successfully.
+ with open(self.model_path, 'rb') as f: + base_options = _BaseOptions(model_asset_buffer=f.read()) + options = _HandLandmarkerOptions(base_options=base_options) + landmarker = _HandLandmarker.create_from_options(options) + self.assertIsInstance(landmarker, _HandLandmarker) + + @parameterized.parameters( + (ModelFileType.FILE_NAME, _get_expected_hand_landmarks_detection_result( + _THUMB_UP_LANDMARKS + )), + (ModelFileType.FILE_CONTENT, _get_expected_hand_landmarks_detection_result( + _THUMB_UP_LANDMARKS + ))) + def test_detect(self, model_file_type, expected_detection_result): + # Creates hand landmarker. + if model_file_type is ModelFileType.FILE_NAME: + base_options = _BaseOptions(model_asset_path=self.model_path) + elif model_file_type is ModelFileType.FILE_CONTENT: + with open(self.model_path, 'rb') as f: + model_content = f.read() + base_options = _BaseOptions(model_asset_buffer=model_content) + else: + # Should never happen + raise ValueError('model_file_type is invalid.') + + options = _HandLandmarkerOptions(base_options=base_options) + landmarker = _HandLandmarker.create_from_options(options) + + # Performs hand landmarks detection on the input. + detection_result = landmarker.detect(self.test_image) + # Comparing results. + self._assert_actual_result_approximately_matches_expected_result( + detection_result, expected_detection_result) + # Closes the hand landmarker explicitly when the hand landmarker is not used + # in a context. + landmarker.close() + + @parameterized.parameters( + (ModelFileType.FILE_NAME, _get_expected_hand_landmarks_detection_result( + _THUMB_UP_LANDMARKS + )), + (ModelFileType.FILE_CONTENT, _get_expected_hand_landmarks_detection_result( + _THUMB_UP_LANDMARKS + ))) + def test_detect_in_context(self, model_file_type, expected_detection_result): + # Creates hand landmarker. + if model_file_type is ModelFileType.FILE_NAME: + base_options = _BaseOptions(model_asset_path=self.model_path) + elif model_file_type is ModelFileType.FILE_CONTENT: + with open(self.model_path, 'rb') as f: + model_content = f.read() + base_options = _BaseOptions(model_asset_buffer=model_content) + else: + # Should never happen + raise ValueError('model_file_type is invalid.') + + options = _HandLandmarkerOptions(base_options=base_options) + with _HandLandmarker.create_from_options(options) as landmarker: + # Performs hand landmarks detection on the input. + detection_result = landmarker.detect(self.test_image) + # Comparing results. + self._assert_actual_result_approximately_matches_expected_result( + detection_result, expected_detection_result) + + def test_detect_succeeds_with_num_hands(self): + # Creates hand landmarker. + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _HandLandmarkerOptions(base_options=base_options, num_hands=2) + with _HandLandmarker.create_from_options(options) as landmarker: + # Load the two hands image. + test_image = _Image.create_from_file( + test_utils.get_test_data_path(_TWO_HANDS_IMAGE)) + # Performs hand landmarks detection on the input. + detection_result = landmarker.detect(test_image) + # Comparing results. + self.assertLen(detection_result.handedness, 2) + + def test_detect_succeeds_with_rotation(self): + # Creates hand landmarker. + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _HandLandmarkerOptions(base_options=base_options) + with _HandLandmarker.create_from_options(options) as landmarker: + # Load the pointing up rotated image. 
+ test_image = _Image.create_from_file( + test_utils.get_test_data_path(_POINTING_UP_ROTATED_IMAGE)) + # Set rotation parameters using ImageProcessingOptions. + image_processing_options = _ImageProcessingOptions(rotation_degrees=-90) + # Performs hand landmarks detection on the input. + detection_result = landmarker.detect(test_image, + image_processing_options) + expected_detection_result = _get_expected_hand_landmarks_detection_result( + _POINTING_UP_ROTATED_LANDMARKS) + # Comparing results. + self._assert_actual_result_approximately_matches_expected_result( + detection_result, expected_detection_result) + + def test_detect_fails_with_region_of_interest(self): + # Creates hand landmarker. + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _HandLandmarkerOptions(base_options=base_options) + with self.assertRaisesRegex( + ValueError, "This task doesn't support region-of-interest."): + with _HandLandmarker.create_from_options(options) as landmarker: + # Set the `region_of_interest` parameter using `ImageProcessingOptions`. + image_processing_options = _ImageProcessingOptions( + region_of_interest=_Rect(0, 0, 1, 1)) + # Attempt to perform hand landmarks detection on the cropped input. + landmarker.detect(self.test_image, image_processing_options) + + def test_empty_detection_outputs(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path)) + with _HandLandmarker.create_from_options(options) as landmarker: + # Load the image with no hands. + no_hands_test_image = _Image.create_from_file( + test_utils.get_test_data_path(_NO_HANDS_IMAGE)) + # Performs hand landmarks detection on the input. + detection_result = landmarker.detect(no_hands_test_image) + self.assertEmpty(detection_result.hand_landmarks) + self.assertEmpty(detection_result.hand_world_landmarks) + self.assertEmpty(detection_result.handedness) + + def test_missing_result_callback(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.LIVE_STREAM) + with self.assertRaisesRegex(ValueError, + r'result callback must be provided'): + with _HandLandmarker.create_from_options(options) as unused_landmarker: + pass + + @parameterized.parameters((_RUNNING_MODE.IMAGE), (_RUNNING_MODE.VIDEO)) + def test_illegal_result_callback(self, running_mode): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=running_mode, + result_callback=mock.MagicMock()) + with self.assertRaisesRegex(ValueError, + r'result callback should not be provided'): + with _HandLandmarker.create_from_options(options) as unused_landmarker: + pass + + def test_calling_detect_for_video_in_image_mode(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.IMAGE) + with _HandLandmarker.create_from_options(options) as landmarker: + with self.assertRaisesRegex(ValueError, + r'not initialized with the video mode'): + landmarker.detect_for_video(self.test_image, 0) + + def test_calling_detect_async_in_image_mode(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.IMAGE) + with _HandLandmarker.create_from_options(options) as landmarker: + with self.assertRaisesRegex(ValueError, + r'not initialized with the live stream mode'): + landmarker.detect_async(self.test_image, 0) + + def test_calling_detect_in_video_mode(self): 
+ options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.VIDEO) + with _HandLandmarker.create_from_options(options) as landmarker: + with self.assertRaisesRegex(ValueError, + r'not initialized with the image mode'): + landmarker.detect(self.test_image) + + def test_calling_detect_async_in_video_mode(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.VIDEO) + with _HandLandmarker.create_from_options(options) as landmarker: + with self.assertRaisesRegex(ValueError, + r'not initialized with the live stream mode'): + landmarker.detect_async(self.test_image, 0) + + def test_detect_for_video_with_out_of_order_timestamp(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.VIDEO) + with _HandLandmarker.create_from_options(options) as landmarker: + unused_result = landmarker.detect_for_video(self.test_image, 1) + with self.assertRaisesRegex( + ValueError, r'Input timestamp must be monotonically increasing'): + landmarker.detect_for_video(self.test_image, 0) + + @parameterized.parameters( + (_THUMB_UP_IMAGE, 0, _get_expected_hand_landmarks_detection_result( + _THUMB_UP_LANDMARKS)), + (_POINTING_UP_IMAGE, 0, _get_expected_hand_landmarks_detection_result( + _POINTING_UP_LANDMARKS)), + (_POINTING_UP_ROTATED_IMAGE, -90, + _get_expected_hand_landmarks_detection_result( + _POINTING_UP_ROTATED_LANDMARKS)), + (_NO_HANDS_IMAGE, 0, _HandLandmarksDetectionResult([], [], []))) + def test_detect_for_video(self, image_path, rotation, expected_result): + test_image = _Image.create_from_file( + test_utils.get_test_data_path(image_path)) + # Set rotation parameters using ImageProcessingOptions. 
+ image_processing_options = _ImageProcessingOptions(rotation_degrees=rotation) + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.VIDEO) + with _HandLandmarker.create_from_options(options) as landmarker: + for timestamp in range(0, 300, 30): + result = landmarker.detect_for_video(test_image, + timestamp, + image_processing_options) + if result.hand_landmarks and result.hand_world_landmarks and \ + result.handedness: + self._assert_actual_result_approximately_matches_expected_result( + result, expected_result) + else: + self.assertEqual(result, expected_result) + + def test_calling_detect_in_live_stream_mode(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.LIVE_STREAM, + result_callback=mock.MagicMock()) + with _HandLandmarker.create_from_options(options) as landmarker: + with self.assertRaisesRegex(ValueError, + r'not initialized with the image mode'): + landmarker.detect(self.test_image) + + def test_calling_detect_for_video_in_live_stream_mode(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.LIVE_STREAM, + result_callback=mock.MagicMock()) + with _HandLandmarker.create_from_options(options) as landmarker: + with self.assertRaisesRegex(ValueError, + r'not initialized with the video mode'): + landmarker.detect_for_video(self.test_image, 0) + + def test_detect_async_calls_with_illegal_timestamp(self): + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.LIVE_STREAM, + result_callback=mock.MagicMock()) + with _HandLandmarker.create_from_options(options) as landmarker: + landmarker.detect_async(self.test_image, 100) + with self.assertRaisesRegex( + ValueError, r'Input timestamp must be monotonically increasing'): + landmarker.detect_async(self.test_image, 0) + + @parameterized.parameters( + (_THUMB_UP_IMAGE, 0, _get_expected_hand_landmarks_detection_result( + _THUMB_UP_LANDMARKS)), + (_POINTING_UP_IMAGE, 0, _get_expected_hand_landmarks_detection_result( + _POINTING_UP_LANDMARKS)), + (_POINTING_UP_ROTATED_IMAGE, -90, + _get_expected_hand_landmarks_detection_result( + _POINTING_UP_ROTATED_LANDMARKS)), + (_NO_HANDS_IMAGE, 0, _HandLandmarksDetectionResult([], [], []))) + def test_detect_async_calls(self, image_path, rotation, expected_result): + test_image = _Image.create_from_file( + test_utils.get_test_data_path(image_path)) + # Set rotation parameters using ImageProcessingOptions. 
+ image_processing_options = _ImageProcessingOptions(rotation_degrees=rotation) + observed_timestamp_ms = -1 + + def check_result(result: _HandLandmarksDetectionResult, + output_image: _Image, + timestamp_ms: int): + if result.hand_landmarks and result.hand_world_landmarks and \ + result.handedness: + self._assert_actual_result_approximately_matches_expected_result( + result, expected_result) + else: + self.assertEqual(result, expected_result) + self.assertTrue( + np.array_equal(output_image.numpy_view(), + test_image.numpy_view())) + self.assertLess(observed_timestamp_ms, timestamp_ms) + self.observed_timestamp_ms = timestamp_ms + + options = _HandLandmarkerOptions( + base_options=_BaseOptions(model_asset_path=self.model_path), + running_mode=_RUNNING_MODE.LIVE_STREAM, + result_callback=check_result) + with _HandLandmarker.create_from_options(options) as landmarker: + for timestamp in range(0, 300, 30): + landmarker.detect_async(test_image, timestamp, image_processing_options) + + +if __name__ == '__main__': + absltest.main() diff --git a/mediapipe/tasks/python/vision/BUILD b/mediapipe/tasks/python/vision/BUILD index 527c6d883..79d01c6d0 100644 --- a/mediapipe/tasks/python/vision/BUILD +++ b/mediapipe/tasks/python/vision/BUILD @@ -102,3 +102,26 @@ py_library( "//mediapipe/tasks/python/vision/core:vision_task_running_mode", ], ) + +py_library( + name = "hand_landmarker", + srcs = [ + "hand_landmarker.py", + ], + deps = [ + "//mediapipe/framework/formats:classification_py_pb2", + "//mediapipe/framework/formats:landmark_py_pb2", + "//mediapipe/python:_framework_bindings", + "//mediapipe/python:packet_creator", + "//mediapipe/python:packet_getter", + "//mediapipe/tasks/cc/vision/hand_landmarker/proto:hand_landmarker_graph_options_py_pb2", + "//mediapipe/tasks/python/components/containers:category", + "//mediapipe/tasks/python/components/containers:landmark", + "//mediapipe/tasks/python/core:base_options", + "//mediapipe/tasks/python/core:optional_dependencies", + "//mediapipe/tasks/python/core:task_info", + "//mediapipe/tasks/python/vision/core:base_vision_task_api", + "//mediapipe/tasks/python/vision/core:image_processing_options", + "//mediapipe/tasks/python/vision/core:vision_task_running_mode", + ], +) diff --git a/mediapipe/tasks/python/vision/hand_landmarker.py b/mediapipe/tasks/python/vision/hand_landmarker.py new file mode 100644 index 000000000..e063bdafb --- /dev/null +++ b/mediapipe/tasks/python/vision/hand_landmarker.py @@ -0,0 +1,381 @@ +# Copyright 2022 The MediaPipe Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""MediaPipe hand landmarker task.""" + +import dataclasses +from typing import Callable, Mapping, Optional, List + +from mediapipe.framework.formats import classification_pb2 +from mediapipe.framework.formats import landmark_pb2 +from mediapipe.python import packet_creator +from mediapipe.python import packet_getter +from mediapipe.python._framework_bindings import image as image_module +from mediapipe.python._framework_bindings import packet as packet_module +from mediapipe.tasks.cc.vision.hand_landmarker.proto import hand_landmarker_graph_options_pb2 +from mediapipe.tasks.python.components.containers import category as category_module +from mediapipe.tasks.python.components.containers import landmark as landmark_module +from mediapipe.tasks.python.core import base_options as base_options_module +from mediapipe.tasks.python.core import task_info as task_info_module +from mediapipe.tasks.python.core.optional_dependencies import doc_controls +from mediapipe.tasks.python.vision.core import base_vision_task_api +from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module +from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module + +_BaseOptions = base_options_module.BaseOptions +_HandLandmarkerGraphOptionsProto = hand_landmarker_graph_options_pb2.HandLandmarkerGraphOptions +_RunningMode = running_mode_module.VisionTaskRunningMode +_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions +_TaskInfo = task_info_module.TaskInfo + +_IMAGE_IN_STREAM_NAME = 'image_in' +_IMAGE_OUT_STREAM_NAME = 'image_out' +_IMAGE_TAG = 'IMAGE' +_NORM_RECT_STREAM_NAME = 'norm_rect_in' +_NORM_RECT_TAG = 'NORM_RECT' +_HANDEDNESS_STREAM_NAME = 'handedness' +_HANDEDNESS_TAG = 'HANDEDNESS' +_HAND_LANDMARKS_STREAM_NAME = 'landmarks' +_HAND_LANDMARKS_TAG = 'LANDMARKS' +_HAND_WORLD_LANDMARKS_STREAM_NAME = 'world_landmarks' +_HAND_WORLD_LANDMARKS_TAG = 'WORLD_LANDMARKS' +_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.hand_landmarker.HandLandmarkerGraph' +_MICRO_SECONDS_PER_MILLISECOND = 1000 + + +@dataclasses.dataclass +class HandLandmarksDetectionResult: + """The hand landmarks detection result from HandLandmarker, where each vector + element represents a single hand detected in the image. + + Attributes: + handedness: Classification of handedness. + hand_landmarks: Detected hand landmarks in normalized image coordinates. + hand_world_landmarks: Detected hand landmarks in world coordinates. 
+ """ + + handedness: List[List[category_module.Category]] + hand_landmarks: List[List[landmark_module.NormalizedLandmark]] + hand_world_landmarks: List[List[landmark_module.Landmark]] + + +def _build_detection_result( + output_packets: Mapping[str,packet_module.Packet] +) -> HandLandmarksDetectionResult: + """Constructs a `HandLandmarksDetectionResult` from output packets.""" + handedness_proto_list = packet_getter.get_proto_list( + output_packets[_HANDEDNESS_STREAM_NAME]) + hand_landmarks_proto_list = packet_getter.get_proto_list( + output_packets[_HAND_LANDMARKS_STREAM_NAME]) + hand_world_landmarks_proto_list = packet_getter.get_proto_list( + output_packets[_HAND_WORLD_LANDMARKS_STREAM_NAME]) + + handedness_results = [] + for proto in handedness_proto_list: + handedness_categories = [] + handedness_classifications = classification_pb2.ClassificationList() + handedness_classifications.MergeFrom(proto) + for handedness in handedness_classifications.classification: + handedness_categories.append( + category_module.Category( + index=handedness.index, + score=handedness.score, + display_name=handedness.display_name, + category_name=handedness.label)) + handedness_results.append(handedness_categories) + + hand_landmarks_results = [] + for proto in hand_landmarks_proto_list: + hand_landmarks = landmark_pb2.NormalizedLandmarkList() + hand_landmarks.MergeFrom(proto) + hand_landmarks_results.append([ + landmark_module.NormalizedLandmark.create_from_pb2(hand_landmark) + for hand_landmark in hand_landmarks.landmark + ]) + + hand_world_landmarks_results = [] + for proto in hand_world_landmarks_proto_list: + hand_world_landmarks = landmark_pb2.LandmarkList() + hand_world_landmarks.MergeFrom(proto) + hand_world_landmarks_results.append([ + landmark_module.Landmark.create_from_pb2(hand_world_landmark) + for hand_world_landmark in hand_world_landmarks.landmark + ]) + + return HandLandmarksDetectionResult(handedness_results, + hand_landmarks_results, + hand_world_landmarks_results) + + +@dataclasses.dataclass +class HandLandmarkerOptions: + """Options for the hand landmarker task. + + Attributes: + base_options: Base options for the hand landmarker task. + running_mode: The running mode of the task. Default to the image mode. + HandLandmarker has three running modes: 1) The image mode for detecting + hand landmarks on single image inputs. 2) The video mode for detecting + hand landmarks on the decoded frames of a video. 3) The live stream mode + for detecting hand landmarks on the live stream of input data, such as + from camera. In this mode, the "result_callback" below must be specified + to receive the detection results asynchronously. + num_hands: The maximum number of hands can be detected by the hand + landmarker. + min_hand_detection_confidence: The minimum confidence score for the hand + detection to be considered successful. + min_hand_presence_confidence: The minimum confidence score of hand presence + score in the hand landmark detection. + min_tracking_confidence: The minimum confidence score for the hand tracking + to be considered successful. + result_callback: The user-defined result callback for processing live stream + data. The result callback should only be specified when the running mode + is set to the live stream mode. 
+ """ + base_options: _BaseOptions + running_mode: _RunningMode = _RunningMode.IMAGE + num_hands: Optional[int] = 1 + min_hand_detection_confidence: Optional[float] = 0.5 + min_hand_presence_confidence: Optional[float] = 0.5 + min_tracking_confidence: Optional[float] = 0.5 + result_callback: Optional[Callable[ + [HandLandmarksDetectionResult, image_module.Image, int], None]] = None + + @doc_controls.do_not_generate_docs + def to_pb2(self) -> _HandLandmarkerGraphOptionsProto: + """Generates an HandLandmarkerGraphOptions protobuf object.""" + base_options_proto = self.base_options.to_pb2() + base_options_proto.use_stream_mode = False if self.running_mode == _RunningMode.IMAGE else True + + # Initialize the hand landmarker options from base options. + hand_landmarker_options_proto = _HandLandmarkerGraphOptionsProto( + base_options=base_options_proto) + hand_landmarker_options_proto.min_tracking_confidence = self.min_tracking_confidence + hand_landmarker_options_proto.hand_detector_graph_options.num_hands = self.num_hands + hand_landmarker_options_proto.hand_detector_graph_options.min_detection_confidence = self.min_hand_detection_confidence + hand_landmarker_options_proto.hand_landmarks_detector_graph_options.min_detection_confidence = self.min_hand_presence_confidence + return hand_landmarker_options_proto + + +class HandLandmarker(base_vision_task_api.BaseVisionTaskApi): + """Class that performs hand landmarks detection on images.""" + + @classmethod + def create_from_model_path(cls, model_path: str) -> 'HandLandmarker': + """Creates an `HandLandmarker` object from a TensorFlow Lite model and the default `HandLandmarkerOptions`. + + Note that the created `HandLandmarker` instance is in image mode, for + detecting hand landmarks on single image inputs. + + Args: + model_path: Path to the model. + + Returns: + `HandLandmarker` object that's created from the model file and the + default `HandLandmarkerOptions`. + + Raises: + ValueError: If failed to create `HandLandmarker` object from the + provided file such as invalid file path. + RuntimeError: If other types of error occurred. + """ + base_options = _BaseOptions(model_asset_path=model_path) + options = HandLandmarkerOptions( + base_options=base_options, running_mode=_RunningMode.IMAGE) + return cls.create_from_options(options) + + @classmethod + def create_from_options( + cls, options: HandLandmarkerOptions) -> 'HandLandmarker': + """Creates the `HandLandmarker` object from hand landmarker options. + + Args: + options: Options for the hand landmarker task. + + Returns: + `HandLandmarker` object that's created from `options`. + + Raises: + ValueError: If failed to create `HandLandmarker` object from + `HandLandmarkerOptions` such as missing the model. + RuntimeError: If other types of error occurred. 
+ """ + + def packets_callback(output_packets: Mapping[str, packet_module.Packet]): + if output_packets[_IMAGE_OUT_STREAM_NAME].is_empty(): + return + + image = packet_getter.get_image(output_packets[_IMAGE_OUT_STREAM_NAME]) + + if output_packets[_HAND_LANDMARKS_STREAM_NAME].is_empty(): + empty_packet = output_packets[_HAND_LANDMARKS_STREAM_NAME] + options.result_callback( + HandLandmarksDetectionResult([], [], []), image, + empty_packet.timestamp.value // _MICRO_SECONDS_PER_MILLISECOND) + return + + hand_landmarks_detection_result = _build_detection_result(output_packets) + timestamp = output_packets[_HAND_LANDMARKS_STREAM_NAME].timestamp + options.result_callback(hand_landmarks_detection_result, image, + timestamp.value // _MICRO_SECONDS_PER_MILLISECOND) + + task_info = _TaskInfo( + task_graph=_TASK_GRAPH_NAME, + input_streams=[ + ':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]), + ':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]), + ], + output_streams=[ + ':'.join([_HANDEDNESS_TAG, _HANDEDNESS_STREAM_NAME]), + ':'.join([_HAND_LANDMARKS_TAG, + _HAND_LANDMARKS_STREAM_NAME]), ':'.join([ + _HAND_WORLD_LANDMARKS_TAG, + _HAND_WORLD_LANDMARKS_STREAM_NAME + ]), ':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME]) + ], + task_options=options) + return cls( + task_info.generate_graph_config( + enable_flow_limiting=options.running_mode == + _RunningMode.LIVE_STREAM), options.running_mode, + packets_callback if options.result_callback else None) + + def detect( + self, + image: image_module.Image, + image_processing_options: Optional[_ImageProcessingOptions] = None + ) -> HandLandmarksDetectionResult: + """Performs hand landmarks detection on the given image. + + Only use this method when the HandLandmarker is created with the image + running mode. + + The image can be of any size with format RGB or RGBA. + TODO: Describes how the input image will be preprocessed after the yuv + support is implemented. + + Args: + image: MediaPipe Image. + image_processing_options: Options for image processing. + + Returns: + The hand landmarks detection results. + + Raises: + ValueError: If any of the input arguments is invalid. + RuntimeError: If hand landmarker detection failed to run. + """ + normalized_rect = self.convert_to_normalized_rect( + image_processing_options, roi_allowed=False) + output_packets = self._process_image_data({ + _IMAGE_IN_STREAM_NAME: + packet_creator.create_image(image), + _NORM_RECT_STREAM_NAME: + packet_creator.create_proto(normalized_rect.to_pb2()) + }) + + if output_packets[_HAND_LANDMARKS_STREAM_NAME].is_empty(): + return HandLandmarksDetectionResult([], [], []) + + return _build_detection_result(output_packets) + + def detect_for_video( + self, + image: image_module.Image, + timestamp_ms: int, + image_processing_options: Optional[_ImageProcessingOptions] = None + ) -> HandLandmarksDetectionResult: + """Performs hand landmarks detection on the provided video frame. + + Only use this method when the HandLandmarker is created with the video + running mode. + + Only use this method when the HandLandmarker is created with the video + running mode. It's required to provide the video frame's timestamp (in + milliseconds) along with the video frame. The input timestamps should be + monotonically increasing for adjacent calls of this method. + + Args: + image: MediaPipe Image. + timestamp_ms: The timestamp of the input video frame in milliseconds. + image_processing_options: Options for image processing. + + Returns: + The hand landmarks detection results. 
+
+    Raises:
+      ValueError: If any of the input arguments is invalid.
+      RuntimeError: If hand landmarker detection failed to run.
+    """
+    normalized_rect = self.convert_to_normalized_rect(
+        image_processing_options, roi_allowed=False)
+    output_packets = self._process_video_data({
+        _IMAGE_IN_STREAM_NAME:
+            packet_creator.create_image(image).at(
+                timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
+        _NORM_RECT_STREAM_NAME:
+            packet_creator.create_proto(normalized_rect.to_pb2()).at(
+                timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
+    })
+
+    if output_packets[_HAND_LANDMARKS_STREAM_NAME].is_empty():
+      return HandLandmarksDetectionResult([], [], [])
+
+    return _build_detection_result(output_packets)
+
+  def detect_async(
+      self,
+      image: image_module.Image,
+      timestamp_ms: int,
+      image_processing_options: Optional[_ImageProcessingOptions] = None
+  ) -> None:
+    """Sends live image data to perform hand landmarks detection.
+
+    Only use this method when the HandLandmarker is created with the live
+    stream running mode. The input timestamps should be monotonically
+    increasing for adjacent calls of this method. This method will return
+    immediately after the input image is accepted. The results will be
+    available via the `result_callback` provided in the
+    `HandLandmarkerOptions`. The `detect_async` method is designed to process
+    live stream data such as camera input. To lower the overall latency, the
+    hand landmarker may drop input images if needed. In other words, it is not
+    guaranteed to produce an output for every input image.
+
+    The `result_callback` provides:
+      - The hand landmarks detection results.
+      - The input image that the hand landmarker runs on.
+      - The input timestamp in milliseconds.
+
+    Args:
+      image: MediaPipe Image.
+      timestamp_ms: The timestamp of the input image in milliseconds.
+      image_processing_options: Options for image processing.
+
+    Raises:
+      ValueError: If the current input timestamp is smaller than what the
+        hand landmarker has already processed.
+    """
+    normalized_rect = self.convert_to_normalized_rect(
+        image_processing_options, roi_allowed=False)
+    self._send_live_stream_data({
+        _IMAGE_IN_STREAM_NAME:
+            packet_creator.create_image(image).at(
+                timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
+        _NORM_RECT_STREAM_NAME:
+            packet_creator.create_proto(normalized_rect.to_pb2()).at(
+                timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND)
+    })
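
Example usage (reviewer note, not part of the patch): a minimal sketch of the API
introduced above, in the image running mode. The model bundle path
'hand_landmarker.task' and the input file 'image.jpg' are hypothetical
placeholders; the classes, options, and result fields are the ones defined in
mediapipe/tasks/python/vision/hand_landmarker.py in this change.

  from mediapipe.python._framework_bindings import image as image_module
  from mediapipe.tasks.python.core import base_options as base_options_module
  from mediapipe.tasks.python.vision import hand_landmarker
  from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module

  # Configure the task: image running mode, up to two hands.
  options = hand_landmarker.HandLandmarkerOptions(
      base_options=base_options_module.BaseOptions(
          model_asset_path='hand_landmarker.task'),  # hypothetical local path
      running_mode=running_mode_module.VisionTaskRunningMode.IMAGE,
      num_hands=2)

  # Detect on a single image and read the per-hand result containers.
  with hand_landmarker.HandLandmarker.create_from_options(options) as landmarker:
    image = image_module.Image.create_from_file('image.jpg')  # hypothetical input
    result = landmarker.detect(image)
    for hand_index, landmarks in enumerate(result.hand_landmarks):
      top_handedness = result.handedness[hand_index][0]
      print(top_handedness.category_name, top_handedness.score)
      for landmark in landmarks:
        print(landmark.x, landmark.y)

In the video and live-stream modes, detect_for_video and detect_async take the
frame timestamp in milliseconds, and live-stream results arrive through the
result_callback, as exercised in hand_landmarker_test.py above.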