Merge pull request #4303 from kinaryml:pose-landmarker-python

PiperOrigin-RevId: 527948047
Copybara-Service 2023-04-28 11:59:28 -07:00
commit 5cffb3973f
11 changed files with 1102 additions and 91 deletions

View File

@ -99,6 +99,7 @@ cc_library(
"//mediapipe/tasks/cc/vision/image_segmenter:image_segmenter_graph",
"//mediapipe/tasks/cc/vision/interactive_segmenter:interactive_segmenter_graph",
"//mediapipe/tasks/cc/vision/object_detector:object_detector_graph",
"//mediapipe/tasks/cc/vision/pose_landmarker:pose_landmarker_graph",
] + select({
# TODO: Build text_classifier_graph and text_embedder_graph on Windows.
"//mediapipe:windows": [],

View File

@ -162,3 +162,26 @@ py_test(
"@com_google_protobuf//:protobuf_python",
],
)
py_test(
name = "pose_landmarker_test",
srcs = ["pose_landmarker_test.py"],
data = [
"//mediapipe/tasks/testdata/vision:test_images",
"//mediapipe/tasks/testdata/vision:test_models",
"//mediapipe/tasks/testdata/vision:test_protos",
],
deps = [
"//mediapipe/python:_framework_bindings",
"//mediapipe/tasks/cc/components/containers/proto:landmarks_detection_result_py_pb2",
"//mediapipe/tasks/python/components/containers:landmark",
"//mediapipe/tasks/python/components/containers:landmark_detection_result",
"//mediapipe/tasks/python/components/containers:rect",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/test:test_utils",
"//mediapipe/tasks/python/vision:pose_landmarker",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
"@com_google_protobuf//:protobuf_python",
],
)
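The data and proto dependencies declared above are what let the new test resolve bundled assets at runtime. A minimal sketch of that pattern, using only helpers and file names that already appear in these tests (the exact loading code lives in the test file further down):

from google.protobuf import text_format
from mediapipe.tasks.cc.components.containers.proto import landmarks_detection_result_pb2
from mediapipe.tasks.python.test import test_utils

# Resolve a data dependency declared in the BUILD target, then parse the pbtxt.
proto_path = test_utils.get_test_data_path('pose_landmarks.pbtxt')
expected_proto = landmarks_detection_result_pb2.LandmarksDetectionResult()
with open(proto_path, 'rb') as f:
  text_format.Parse(f.read(), expected_proto)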

View File

@ -51,24 +51,27 @@ _PORTRAIT_IMAGE = 'portrait.jpg'
_CAT_IMAGE = 'cat.jpg'
_PORTRAIT_EXPECTED_FACE_LANDMARKS = 'portrait_expected_face_landmarks.pbtxt'
_PORTRAIT_EXPECTED_BLENDSHAPES = 'portrait_expected_blendshapes.pbtxt'
_LANDMARKS_DIFF_MARGIN = 0.03
_BLENDSHAPES_DIFF_MARGIN = 0.13
_FACIAL_TRANSFORMATION_MATRIX_DIFF_MARGIN = 0.02
_LANDMARKS_MARGIN = 0.03
_BLENDSHAPES_MARGIN = 0.13
_FACIAL_TRANSFORMATION_MATRIX_MARGIN = 0.02
def _get_expected_face_landmarks(file_path: str):
proto_file_path = test_utils.get_test_data_path(file_path)
face_landmarks_results = []
with open(proto_file_path, 'rb') as f:
proto = landmark_pb2.NormalizedLandmarkList()
text_format.Parse(f.read(), proto)
face_landmarks = []
for landmark in proto.landmark:
face_landmarks.append(_NormalizedLandmark.create_from_pb2(landmark))
return face_landmarks
face_landmarks_results.append(face_landmarks)
return face_landmarks_results
def _get_expected_face_blendshapes(file_path: str):
proto_file_path = test_utils.get_test_data_path(file_path)
face_blendshapes_results = []
with open(proto_file_path, 'rb') as f:
proto = classification_pb2.ClassificationList()
text_format.Parse(f.read(), proto)
@ -84,7 +87,8 @@ def _get_expected_face_blendshapes(file_path: str):
category_name=face_blendshapes.label,
)
)
return face_blendshapes_categories
face_blendshapes_results.append(face_blendshapes_categories)
return face_blendshapes_results
def _get_expected_facial_transformation_matrixes():
@ -119,12 +123,13 @@ class FaceLandmarkerTest(parameterized.TestCase):
# Expects to have the same number of faces detected.
self.assertLen(actual_landmarks, len(expected_landmarks))
for i, elem in enumerate(actual_landmarks):
for i, _ in enumerate(actual_landmarks):
for j, elem in enumerate(actual_landmarks[i]):
self.assertAlmostEqual(
elem.x, expected_landmarks[i].x, delta=_LANDMARKS_DIFF_MARGIN
elem.x, expected_landmarks[i][j].x, delta=_LANDMARKS_MARGIN
)
self.assertAlmostEqual(
elem.y, expected_landmarks[i].y, delta=_LANDMARKS_DIFF_MARGIN
elem.y, expected_landmarks[i][j].y, delta=_LANDMARKS_MARGIN
)
def _expect_blendshapes_correct(
@ -133,12 +138,13 @@ class FaceLandmarkerTest(parameterized.TestCase):
# Expects to have the same number of blendshapes.
self.assertLen(actual_blendshapes, len(expected_blendshapes))
for i, elem in enumerate(actual_blendshapes):
self.assertEqual(elem.index, expected_blendshapes[i].index)
for i, _ in enumerate(actual_blendshapes):
for j, elem in enumerate(actual_blendshapes[i]):
self.assertEqual(elem.index, expected_blendshapes[i][j].index)
self.assertAlmostEqual(
elem.score,
expected_blendshapes[i].score,
delta=_BLENDSHAPES_DIFF_MARGIN,
expected_blendshapes[i][j].score,
delta=_BLENDSHAPES_MARGIN,
)
def _expect_facial_transformation_matrixes_correct(
@ -152,7 +158,7 @@ class FaceLandmarkerTest(parameterized.TestCase):
self.assertSequenceAlmostEqual(
elem.flatten(),
expected_matrix_list[i].flatten(),
delta=_FACIAL_TRANSFORMATION_MATRIX_DIFF_MARGIN,
delta=_FACIAL_TRANSFORMATION_MATRIX_MARGIN,
)
def test_create_from_file_succeeds_with_valid_model_path(self):
@ -236,11 +242,11 @@ class FaceLandmarkerTest(parameterized.TestCase):
# Comparing results.
if expected_face_landmarks is not None:
self._expect_landmarks_correct(
detection_result.face_landmarks[0], expected_face_landmarks
detection_result.face_landmarks, expected_face_landmarks
)
if expected_face_blendshapes is not None:
self._expect_blendshapes_correct(
detection_result.face_blendshapes[0], expected_face_blendshapes
detection_result.face_blendshapes, expected_face_blendshapes
)
if expected_facial_transformation_matrixes is not None:
self._expect_facial_transformation_matrixes_correct(
@ -302,11 +308,11 @@ class FaceLandmarkerTest(parameterized.TestCase):
# Comparing results.
if expected_face_landmarks is not None:
self._expect_landmarks_correct(
detection_result.face_landmarks[0], expected_face_landmarks
detection_result.face_landmarks, expected_face_landmarks
)
if expected_face_blendshapes is not None:
self._expect_blendshapes_correct(
detection_result.face_blendshapes[0], expected_face_blendshapes
detection_result.face_blendshapes, expected_face_blendshapes
)
if expected_facial_transformation_matrixes is not None:
self._expect_facial_transformation_matrixes_correct(
@ -446,11 +452,11 @@ class FaceLandmarkerTest(parameterized.TestCase):
# Comparing results.
if expected_face_landmarks is not None:
self._expect_landmarks_correct(
detection_result.face_landmarks[0], expected_face_landmarks
detection_result.face_landmarks, expected_face_landmarks
)
if expected_face_blendshapes is not None:
self._expect_blendshapes_correct(
detection_result.face_blendshapes[0], expected_face_blendshapes
detection_result.face_blendshapes, expected_face_blendshapes
)
if expected_facial_transformation_matrixes is not None:
self._expect_facial_transformation_matrixes_correct(
@ -523,11 +529,11 @@ class FaceLandmarkerTest(parameterized.TestCase):
# Comparing results.
if expected_face_landmarks is not None:
self._expect_landmarks_correct(
result.face_landmarks[0], expected_face_landmarks
result.face_landmarks, expected_face_landmarks
)
if expected_face_blendshapes is not None:
self._expect_blendshapes_correct(
result.face_blendshapes[0], expected_face_blendshapes
result.face_blendshapes, expected_face_blendshapes
)
if expected_facial_transformation_matrixes is not None:
self._expect_facial_transformation_matrixes_correct(
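The refactor in this file changes the shape of the expected data: the loaders now return one list of landmarks (or blendshapes) per detected face instead of a single flat list, which is why the assertions index [i][j], face i and landmark j. A hedged sketch of that comparison pattern in isolation (the helper name and arguments are illustrative, not part of the diff):

def expect_nested_landmarks_close(test, actual, expected, margin):
  # actual/expected: List[List[NormalizedLandmark]] -- one inner list per face.
  test.assertLen(actual, len(expected))
  for i, face in enumerate(actual):
    for j, landmark in enumerate(face):
      test.assertAlmostEqual(landmark.x, expected[i][j].x, delta=margin)
      test.assertAlmostEqual(landmark.y, expected[i][j].y, delta=margin)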

View File

@ -32,12 +32,14 @@ from mediapipe.tasks.python.vision import hand_landmarker
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
_LandmarksDetectionResultProto = landmarks_detection_result_pb2.LandmarksDetectionResult
_LandmarksDetectionResultProto = (
landmarks_detection_result_pb2.LandmarksDetectionResult)
_BaseOptions = base_options_module.BaseOptions
_Rect = rect_module.Rect
_Landmark = landmark_module.Landmark
_NormalizedLandmark = landmark_module.NormalizedLandmark
_LandmarksDetectionResult = landmark_detection_result_module.LandmarksDetectionResult
_LandmarksDetectionResult = (
landmark_detection_result_module.LandmarksDetectionResult)
_Image = image_module.Image
_HandLandmarker = hand_landmarker.HandLandmarker
_HandLandmarkerOptions = hand_landmarker.HandLandmarkerOptions
@ -54,7 +56,7 @@ _POINTING_UP_IMAGE = 'pointing_up.jpg'
_POINTING_UP_LANDMARKS = 'pointing_up_landmarks.pbtxt'
_POINTING_UP_ROTATED_IMAGE = 'pointing_up_rotated.jpg'
_POINTING_UP_ROTATED_LANDMARKS = 'pointing_up_rotated_landmarks.pbtxt'
_LANDMARKS_ERROR_TOLERANCE = 0.03
_LANDMARKS_MARGIN = 0.03
_HANDEDNESS_MARGIN = 0.05
@ -89,39 +91,43 @@ class HandLandmarkerTest(parameterized.TestCase):
self.model_path = test_utils.get_test_data_path(
_HAND_LANDMARKER_BUNDLE_ASSET_FILE)
def _assert_actual_result_approximately_matches_expected_result(
self, actual_result: _HandLandmarkerResult,
expected_result: _HandLandmarkerResult):
def _expect_hand_landmarks_correct(
self, actual_landmarks, expected_landmarks, margin
):
# Expects to have the same number of hands detected.
self.assertLen(actual_result.hand_landmarks,
len(expected_result.hand_landmarks))
self.assertLen(actual_result.hand_world_landmarks,
len(expected_result.hand_world_landmarks))
self.assertLen(actual_result.handedness, len(expected_result.handedness))
# Actual landmarks match expected landmarks.
self.assertLen(actual_result.hand_landmarks[0],
len(expected_result.hand_landmarks[0]))
actual_landmarks = actual_result.hand_landmarks[0]
expected_landmarks = expected_result.hand_landmarks[0]
for i, rename_me in enumerate(actual_landmarks):
self.assertAlmostEqual(
rename_me.x,
expected_landmarks[i].x,
delta=_LANDMARKS_ERROR_TOLERANCE)
self.assertAlmostEqual(
rename_me.y,
expected_landmarks[i].y,
delta=_LANDMARKS_ERROR_TOLERANCE)
# Actual handedness matches expected handedness.
actual_top_handedness = actual_result.handedness[0][0]
expected_top_handedness = expected_result.handedness[0][0]
self.assertLen(actual_landmarks, len(expected_landmarks))
for i, _ in enumerate(actual_landmarks):
for j, elem in enumerate(actual_landmarks[i]):
self.assertAlmostEqual(elem.x, expected_landmarks[i][j].x, delta=margin)
self.assertAlmostEqual(elem.y, expected_landmarks[i][j].y, delta=margin)
def _expect_handedness_correct(
self, actual_handedness, expected_handedness, margin
):
# Actual top handedness matches expected top handedness.
actual_top_handedness = actual_handedness[0][0]
expected_top_handedness = expected_handedness[0][0]
self.assertEqual(actual_top_handedness.index, expected_top_handedness.index)
self.assertEqual(actual_top_handedness.category_name,
expected_top_handedness.category_name)
self.assertAlmostEqual(
actual_top_handedness.score,
expected_top_handedness.score,
delta=_HANDEDNESS_MARGIN)
actual_top_handedness.score, expected_top_handedness.score, delta=margin
)
def _expect_hand_landmarker_results_correct(
self,
actual_result: _HandLandmarkerResult,
expected_result: _HandLandmarkerResult,
):
self._expect_hand_landmarks_correct(
actual_result.hand_landmarks,
expected_result.hand_landmarks,
_LANDMARKS_MARGIN,
)
self._expect_handedness_correct(
actual_result.handedness, expected_result.handedness, _HANDEDNESS_MARGIN
)
def test_create_from_file_succeeds_with_valid_model_path(self):
# Creates with default option and valid model file successfully.
@ -175,8 +181,9 @@ class HandLandmarkerTest(parameterized.TestCase):
# Performs hand landmarks detection on the input.
detection_result = landmarker.detect(self.test_image)
# Comparing results.
self._assert_actual_result_approximately_matches_expected_result(
detection_result, expected_detection_result)
self._expect_hand_landmarker_results_correct(
detection_result, expected_detection_result
)
# Closes the hand landmarker explicitly when the hand landmarker is not used
# in a context.
landmarker.close()
@ -203,8 +210,9 @@ class HandLandmarkerTest(parameterized.TestCase):
# Performs hand landmarks detection on the input.
detection_result = landmarker.detect(self.test_image)
# Comparing results.
self._assert_actual_result_approximately_matches_expected_result(
detection_result, expected_detection_result)
self._expect_hand_landmarker_results_correct(
detection_result, expected_detection_result
)
def test_detect_succeeds_with_num_hands(self):
# Creates hand landmarker.
@ -234,8 +242,9 @@ class HandLandmarkerTest(parameterized.TestCase):
expected_detection_result = _get_expected_hand_landmarker_result(
_POINTING_UP_ROTATED_LANDMARKS)
# Comparing results.
self._assert_actual_result_approximately_matches_expected_result(
detection_result, expected_detection_result)
self._expect_hand_landmarker_results_correct(
detection_result, expected_detection_result
)
def test_detect_fails_with_region_of_interest(self):
# Creates hand landmarker.
@ -350,9 +359,9 @@ class HandLandmarkerTest(parameterized.TestCase):
for timestamp in range(0, 300, 30):
result = landmarker.detect_for_video(test_image, timestamp,
image_processing_options)
if result.hand_landmarks and result.hand_world_landmarks and result.handedness:
self._assert_actual_result_approximately_matches_expected_result(
result, expected_result)
if (result.hand_landmarks and result.hand_world_landmarks and
result.handedness):
self._expect_hand_landmarker_results_correct(result, expected_result)
else:
self.assertEqual(result, expected_result)
@ -405,9 +414,9 @@ class HandLandmarkerTest(parameterized.TestCase):
def check_result(result: _HandLandmarkerResult, output_image: _Image,
timestamp_ms: int):
if result.hand_landmarks and result.hand_world_landmarks and result.handedness:
self._assert_actual_result_approximately_matches_expected_result(
result, expected_result)
if (result.hand_landmarks and result.hand_world_landmarks and
result.handedness):
self._expect_hand_landmarker_results_correct(result, expected_result)
else:
self.assertEqual(result, expected_result)
self.assertTrue(

View File

@ -0,0 +1,520 @@
# Copyright 2023 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for pose landmarker."""
import enum
from typing import List
from unittest import mock
from absl.testing import absltest
from absl.testing import parameterized
import numpy as np
from google.protobuf import text_format
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.cc.components.containers.proto import landmarks_detection_result_pb2
from mediapipe.tasks.python.components.containers import landmark as landmark_module
from mediapipe.tasks.python.components.containers import landmark_detection_result as landmark_detection_result_module
from mediapipe.tasks.python.components.containers import rect as rect_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.test import test_utils
from mediapipe.tasks.python.vision import pose_landmarker
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
PoseLandmarkerResult = pose_landmarker.PoseLandmarkerResult
_LandmarksDetectionResultProto = (
landmarks_detection_result_pb2.LandmarksDetectionResult
)
_BaseOptions = base_options_module.BaseOptions
_Rect = rect_module.Rect
_Landmark = landmark_module.Landmark
_NormalizedLandmark = landmark_module.NormalizedLandmark
_LandmarksDetectionResult = (
landmark_detection_result_module.LandmarksDetectionResult
)
_Image = image_module.Image
_PoseLandmarker = pose_landmarker.PoseLandmarker
_PoseLandmarkerOptions = pose_landmarker.PoseLandmarkerOptions
_RUNNING_MODE = running_mode_module.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_POSE_LANDMARKER_BUNDLE_ASSET_FILE = 'pose_landmarker.task'
_BURGER_IMAGE = 'burger.jpg'
_POSE_IMAGE = 'pose.jpg'
_POSE_LANDMARKS = 'pose_landmarks.pbtxt'
_LANDMARKS_MARGIN = 0.03
def _get_expected_pose_landmarker_result(
file_path: str,
) -> PoseLandmarkerResult:
landmarks_detection_result_file_path = test_utils.get_test_data_path(
file_path
)
with open(landmarks_detection_result_file_path, 'rb') as f:
landmarks_detection_result_proto = _LandmarksDetectionResultProto()
# Use this if a .pb file is available.
# landmarks_detection_result_proto.ParseFromString(f.read())
text_format.Parse(f.read(), landmarks_detection_result_proto)
landmarks_detection_result = _LandmarksDetectionResult.create_from_pb2(
landmarks_detection_result_proto
)
return PoseLandmarkerResult(
pose_landmarks=[landmarks_detection_result.landmarks],
pose_world_landmarks=[],
pose_auxiliary_landmarks=[],
)
class ModelFileType(enum.Enum):
FILE_CONTENT = 1
FILE_NAME = 2
class PoseLandmarkerTest(parameterized.TestCase):
def setUp(self):
super().setUp()
self.test_image = _Image.create_from_file(
test_utils.get_test_data_path(_POSE_IMAGE)
)
self.model_path = test_utils.get_test_data_path(
_POSE_LANDMARKER_BUNDLE_ASSET_FILE
)
def _expect_pose_landmarks_correct(
self, actual_landmarks, expected_landmarks, margin
):
# Expects to have the same number of poses detected.
self.assertLen(actual_landmarks, len(expected_landmarks))
for i, _ in enumerate(actual_landmarks):
for j, elem in enumerate(actual_landmarks[i]):
self.assertAlmostEqual(elem.x, expected_landmarks[i][j].x, delta=margin)
self.assertAlmostEqual(elem.y, expected_landmarks[i][j].y, delta=margin)
def _expect_pose_landmarker_results_correct(
self,
actual_result: PoseLandmarkerResult,
expected_result: PoseLandmarkerResult,
output_segmentation_masks: bool,
margin: float,
):
self._expect_pose_landmarks_correct(
actual_result.pose_landmarks, expected_result.pose_landmarks, margin
)
if output_segmentation_masks:
self.assertIsInstance(actual_result.segmentation_masks, List)
for _, mask in enumerate(actual_result.segmentation_masks):
self.assertIsInstance(mask, _Image)
else:
self.assertIsNone(actual_result.segmentation_masks)
def test_create_from_file_succeeds_with_valid_model_path(self):
# Creates with default option and valid model file successfully.
with _PoseLandmarker.create_from_model_path(self.model_path) as landmarker:
self.assertIsInstance(landmarker, _PoseLandmarker)
def test_create_from_options_succeeds_with_valid_model_path(self):
# Creates with options containing model file successfully.
base_options = _BaseOptions(model_asset_path=self.model_path)
options = _PoseLandmarkerOptions(base_options=base_options)
with _PoseLandmarker.create_from_options(options) as landmarker:
self.assertIsInstance(landmarker, _PoseLandmarker)
def test_create_from_options_fails_with_invalid_model_path(self):
# Invalid empty model path.
with self.assertRaisesRegex(
RuntimeError, 'Unable to open file at /path/to/invalid/model.tflite'
):
base_options = _BaseOptions(
model_asset_path='/path/to/invalid/model.tflite'
)
options = _PoseLandmarkerOptions(base_options=base_options)
_PoseLandmarker.create_from_options(options)
def test_create_from_options_succeeds_with_valid_model_content(self):
# Creates with options containing model content successfully.
with open(self.model_path, 'rb') as f:
base_options = _BaseOptions(model_asset_buffer=f.read())
options = _PoseLandmarkerOptions(base_options=base_options)
landmarker = _PoseLandmarker.create_from_options(options)
self.assertIsInstance(landmarker, _PoseLandmarker)
@parameterized.parameters(
(
ModelFileType.FILE_NAME,
False,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
ModelFileType.FILE_CONTENT,
False,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
ModelFileType.FILE_NAME,
True,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
ModelFileType.FILE_CONTENT,
True,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
)
def test_detect(
self,
model_file_type,
output_segmentation_masks,
expected_detection_result,
):
# Creates pose landmarker.
if model_file_type is ModelFileType.FILE_NAME:
base_options = _BaseOptions(model_asset_path=self.model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.model_path, 'rb') as f:
model_content = f.read()
base_options = _BaseOptions(model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
options = _PoseLandmarkerOptions(
base_options=base_options,
output_segmentation_masks=output_segmentation_masks,
)
landmarker = _PoseLandmarker.create_from_options(options)
# Performs pose landmarks detection on the input.
detection_result = landmarker.detect(self.test_image)
# Comparing results.
self._expect_pose_landmarker_results_correct(
detection_result,
expected_detection_result,
output_segmentation_masks,
_LANDMARKS_MARGIN,
)
# Closes the pose landmarker explicitly when the pose landmarker is not used
# in a context.
landmarker.close()
@parameterized.parameters(
(
ModelFileType.FILE_NAME,
False,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
ModelFileType.FILE_CONTENT,
False,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
ModelFileType.FILE_NAME,
True,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
ModelFileType.FILE_CONTENT,
True,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
)
def test_detect_in_context(
self,
model_file_type,
output_segmentation_masks,
expected_detection_result,
):
# Creates pose landmarker.
if model_file_type is ModelFileType.FILE_NAME:
base_options = _BaseOptions(model_asset_path=self.model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.model_path, 'rb') as f:
model_content = f.read()
base_options = _BaseOptions(model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
options = _PoseLandmarkerOptions(
base_options=base_options,
output_segmentation_masks=output_segmentation_masks,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
# Performs pose landmarks detection on the input.
detection_result = landmarker.detect(self.test_image)
# Comparing results.
self._expect_pose_landmarker_results_correct(
detection_result,
expected_detection_result,
output_segmentation_masks,
_LANDMARKS_MARGIN,
)
def test_detect_fails_with_region_of_interest(self):
# Creates pose landmarker.
base_options = _BaseOptions(model_asset_path=self.model_path)
options = _PoseLandmarkerOptions(base_options=base_options)
with self.assertRaisesRegex(
ValueError, "This task doesn't support region-of-interest."
):
with _PoseLandmarker.create_from_options(options) as landmarker:
# Set the `region_of_interest` parameter using `ImageProcessingOptions`.
image_processing_options = _ImageProcessingOptions(
region_of_interest=_Rect(0, 0, 1, 1)
)
# Attempt to perform pose landmarks detection on the cropped input.
landmarker.detect(self.test_image, image_processing_options)
def test_empty_detection_outputs(self):
# Creates pose landmarker.
base_options = _BaseOptions(model_asset_path=self.model_path)
options = _PoseLandmarkerOptions(base_options=base_options)
with _PoseLandmarker.create_from_options(options) as landmarker:
# Load an image with no poses.
test_image = _Image.create_from_file(
test_utils.get_test_data_path(_BURGER_IMAGE)
)
# Performs pose landmarks detection on the input.
detection_result = landmarker.detect(test_image)
# Comparing results.
self.assertEmpty(detection_result.pose_landmarks)
self.assertEmpty(detection_result.pose_world_landmarks)
self.assertEmpty(detection_result.pose_auxiliary_landmarks)
def test_missing_result_callback(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
)
with self.assertRaisesRegex(
ValueError, r'result callback must be provided'
):
with _PoseLandmarker.create_from_options(options) as unused_landmarker:
pass
@parameterized.parameters((_RUNNING_MODE.IMAGE), (_RUNNING_MODE.VIDEO))
def test_illegal_result_callback(self, running_mode):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=running_mode,
result_callback=mock.MagicMock(),
)
with self.assertRaisesRegex(
ValueError, r'result callback should not be provided'
):
with _PoseLandmarker.create_from_options(options) as unused_landmarker:
pass
def test_calling_detect_for_video_in_image_mode(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.IMAGE,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
with self.assertRaisesRegex(
ValueError, r'not initialized with the video mode'
):
landmarker.detect_for_video(self.test_image, 0)
def test_calling_detect_async_in_image_mode(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.IMAGE,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
with self.assertRaisesRegex(
ValueError, r'not initialized with the live stream mode'
):
landmarker.detect_async(self.test_image, 0)
def test_calling_detect_in_video_mode(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.VIDEO,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
with self.assertRaisesRegex(
ValueError, r'not initialized with the image mode'
):
landmarker.detect(self.test_image)
def test_calling_detect_async_in_video_mode(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.VIDEO,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
with self.assertRaisesRegex(
ValueError, r'not initialized with the live stream mode'
):
landmarker.detect_async(self.test_image, 0)
def test_detect_for_video_with_out_of_order_timestamp(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.VIDEO,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
unused_result = landmarker.detect_for_video(self.test_image, 1)
with self.assertRaisesRegex(
ValueError, r'Input timestamp must be monotonically increasing'
):
landmarker.detect_for_video(self.test_image, 0)
@parameterized.parameters(
(
_POSE_IMAGE,
0,
False,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
_POSE_IMAGE,
0,
True,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(_BURGER_IMAGE, 0, False, PoseLandmarkerResult([], [], [])),
)
def test_detect_for_video(
self, image_path, rotation, output_segmentation_masks, expected_result
):
test_image = _Image.create_from_file(
test_utils.get_test_data_path(image_path)
)
# Set rotation parameters using ImageProcessingOptions.
image_processing_options = _ImageProcessingOptions(
rotation_degrees=rotation
)
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
output_segmentation_masks=output_segmentation_masks,
running_mode=_RUNNING_MODE.VIDEO,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
for timestamp in range(0, 300, 30):
result = landmarker.detect_for_video(
test_image, timestamp, image_processing_options
)
if result.pose_landmarks:
self._expect_pose_landmarker_results_correct(
result,
expected_result,
output_segmentation_masks,
_LANDMARKS_MARGIN,
)
else:
self.assertEqual(result, expected_result)
def test_calling_detect_in_live_stream_mode(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=mock.MagicMock(),
)
with _PoseLandmarker.create_from_options(options) as landmarker:
with self.assertRaisesRegex(
ValueError, r'not initialized with the image mode'
):
landmarker.detect(self.test_image)
def test_calling_detect_for_video_in_live_stream_mode(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=mock.MagicMock(),
)
with _PoseLandmarker.create_from_options(options) as landmarker:
with self.assertRaisesRegex(
ValueError, r'not initialized with the video mode'
):
landmarker.detect_for_video(self.test_image, 0)
def test_detect_async_calls_with_illegal_timestamp(self):
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=mock.MagicMock(),
)
with _PoseLandmarker.create_from_options(options) as landmarker:
landmarker.detect_async(self.test_image, 100)
with self.assertRaisesRegex(
ValueError, r'Input timestamp must be monotonically increasing'
):
landmarker.detect_async(self.test_image, 0)
@parameterized.parameters(
(
_POSE_IMAGE,
0,
False,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(
_POSE_IMAGE,
0,
True,
_get_expected_pose_landmarker_result(_POSE_LANDMARKS),
),
(_BURGER_IMAGE, 0, False, PoseLandmarkerResult([], [], [])),
)
def test_detect_async_calls(
self, image_path, rotation, output_segmentation_masks, expected_result
):
test_image = _Image.create_from_file(
test_utils.get_test_data_path(image_path)
)
# Set rotation parameters using ImageProcessingOptions.
image_processing_options = _ImageProcessingOptions(
rotation_degrees=rotation
)
observed_timestamp_ms = -1
def check_result(
result: PoseLandmarkerResult, output_image: _Image, timestamp_ms: int
):
if result.pose_landmarks:
self._expect_pose_landmarker_results_correct(
result,
expected_result,
output_segmentation_masks,
_LANDMARKS_MARGIN,
)
else:
self.assertEqual(result, expected_result)
self.assertTrue(
np.array_equal(output_image.numpy_view(), test_image.numpy_view())
)
self.assertLess(observed_timestamp_ms, timestamp_ms)
self.observed_timestamp_ms = timestamp_ms
options = _PoseLandmarkerOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
output_segmentation_masks=output_segmentation_masks,
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=check_result,
)
with _PoseLandmarker.create_from_options(options) as landmarker:
for timestamp in range(0, 300, 30):
landmarker.detect_async(test_image, timestamp, image_processing_options)
if __name__ == '__main__':
absltest.main()
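Outside the test harness, the image-mode flow these tests exercise looks roughly like the sketch below. The API names come straight from the new module and the asset names appear in the test above; treat the paths as placeholders for your own files:

from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.vision import pose_landmarker

# Placeholder paths for a real model bundle and input image.
options = pose_landmarker.PoseLandmarkerOptions(
    base_options=base_options_module.BaseOptions(
        model_asset_path='pose_landmarker.task'),
    output_segmentation_masks=True,
)
with pose_landmarker.PoseLandmarker.create_from_options(options) as landmarker:
  image = image_module.Image.create_from_file('pose.jpg')
  result = landmarker.detect(image)
  # One inner list of NormalizedLandmark per detected pose.
  for pose in result.pose_landmarks:
    print(len(pose), pose[0].x, pose[0].y)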

View File

@ -180,6 +180,27 @@ py_library(
],
)
py_library(
name = "pose_landmarker",
srcs = [
"pose_landmarker.py",
],
deps = [
"//mediapipe/framework/formats:landmark_py_pb2",
"//mediapipe/python:_framework_bindings",
"//mediapipe/python:packet_creator",
"//mediapipe/python:packet_getter",
"//mediapipe/tasks/cc/vision/pose_landmarker/proto:pose_landmarker_graph_options_py_pb2",
"//mediapipe/tasks/python/components/containers:landmark",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/core:optional_dependencies",
"//mediapipe/tasks/python/core:task_info",
"//mediapipe/tasks/python/vision/core:base_vision_task_api",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)
py_library(
name = "face_detector",
srcs = [

View File

@ -71,8 +71,8 @@ class FaceDetectorOptions:
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
min_detection_confidence: Optional[float] = None
min_suppression_threshold: Optional[float] = None
min_detection_confidence: float = 0.5
min_suppression_threshold: float = 0.3
result_callback: Optional[
Callable[
[detections_module.DetectionResult, image_module.Image, int], None

View File

@ -2966,12 +2966,12 @@ class FaceLandmarkerOptions:
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
num_faces: Optional[int] = 1
min_face_detection_confidence: Optional[float] = 0.5
min_face_presence_confidence: Optional[float] = 0.5
min_tracking_confidence: Optional[float] = 0.5
output_face_blendshapes: Optional[bool] = False
output_facial_transformation_matrixes: Optional[bool] = False
num_faces: int = 1
min_face_detection_confidence: float = 0.5
min_face_presence_confidence: float = 0.5
min_tracking_confidence: float = 0.5
output_face_blendshapes: bool = False
output_facial_transformation_matrixes: bool = False
result_callback: Optional[
Callable[[FaceLandmarkerResult, image_module.Image, int], None]
] = None

View File

@ -194,15 +194,15 @@ class GestureRecognizerOptions:
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
num_hands: Optional[int] = 1
min_hand_detection_confidence: Optional[float] = 0.5
min_hand_presence_confidence: Optional[float] = 0.5
min_tracking_confidence: Optional[float] = 0.5
canned_gesture_classifier_options: Optional[_ClassifierOptions] = (
dataclasses.field(default_factory=_ClassifierOptions)
num_hands: int = 1
min_hand_detection_confidence: float = 0.5
min_hand_presence_confidence: float = 0.5
min_tracking_confidence: float = 0.5
canned_gesture_classifier_options: _ClassifierOptions = dataclasses.field(
default_factory=_ClassifierOptions
)
custom_gesture_classifier_options: Optional[_ClassifierOptions] = (
dataclasses.field(default_factory=_ClassifierOptions)
custom_gesture_classifier_options: _ClassifierOptions = dataclasses.field(
default_factory=_ClassifierOptions
)
result_callback: Optional[
Callable[[GestureRecognizerResult, image_module.Image, int], None]
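The classifier-options fields above keep dataclasses.field(default_factory=...) because their default is itself a mutable dataclass: a plain class-level default would be shared across every options instance, and newer Python versions reject unhashable defaults outright. A self-contained sketch of the same idiom with stand-in types (not the MediaPipe classes):

import dataclasses

@dataclasses.dataclass
class ClassifierOptions:
  score_threshold: float = 0.0

@dataclasses.dataclass
class RecognizerOptions:
  # Each RecognizerOptions gets its own fresh ClassifierOptions instance.
  canned_classifier_options: ClassifierOptions = dataclasses.field(
      default_factory=ClassifierOptions
  )

a, b = RecognizerOptions(), RecognizerOptions()
assert a.canned_classifier_options is not b.canned_classifier_options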

View File

@ -182,10 +182,10 @@ class HandLandmarkerOptions:
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
num_hands: Optional[int] = 1
min_hand_detection_confidence: Optional[float] = 0.5
min_hand_presence_confidence: Optional[float] = 0.5
min_tracking_confidence: Optional[float] = 0.5
num_hands: int = 1
min_hand_detection_confidence: float = 0.5
min_hand_presence_confidence: float = 0.5
min_tracking_confidence: float = 0.5
result_callback: Optional[
Callable[[HandLandmarkerResult, image_module.Image, int], None]
] = None
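Across these option dataclasses the scalar thresholds drop their Optional annotations in favor of concrete defaults, so downstream conversion code can copy the fields into the graph-options protos without None checks. Roughly, with a stand-in class rather than the actual options:

import dataclasses

@dataclasses.dataclass
class LandmarkerOptions:
  # Previously Optional[...] with the same values; now plain, always-populated fields.
  num_hands: int = 1
  min_hand_detection_confidence: float = 0.5

opts = LandmarkerOptions()
# Proto conversion can read the fields directly, no None guard needed.
assert opts.min_hand_detection_confidence == 0.5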

View File

@ -0,0 +1,431 @@
# Copyright 2023 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe pose landmarker task."""
import dataclasses
from typing import Callable, Mapping, Optional, List
from mediapipe.framework.formats import landmark_pb2
from mediapipe.python import packet_creator
from mediapipe.python import packet_getter
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.python._framework_bindings import packet as packet_module
from mediapipe.tasks.cc.vision.pose_landmarker.proto import pose_landmarker_graph_options_pb2
from mediapipe.tasks.python.components.containers import landmark as landmark_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.core import task_info as task_info_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
from mediapipe.tasks.python.vision.core import base_vision_task_api
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
_BaseOptions = base_options_module.BaseOptions
_PoseLandmarkerGraphOptionsProto = (
pose_landmarker_graph_options_pb2.PoseLandmarkerGraphOptions
)
_RunningMode = running_mode_module.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_TaskInfo = task_info_module.TaskInfo
_IMAGE_IN_STREAM_NAME = 'image_in'
_IMAGE_OUT_STREAM_NAME = 'image_out'
_IMAGE_TAG = 'IMAGE'
_NORM_RECT_STREAM_NAME = 'norm_rect_in'
_NORM_RECT_TAG = 'NORM_RECT'
_SEGMENTATION_MASK_STREAM_NAME = 'segmentation_mask'
_SEGMENTATION_MASK_TAG = 'SEGMENTATION_MASK'
_NORM_LANDMARKS_STREAM_NAME = 'norm_landmarks'
_NORM_LANDMARKS_TAG = 'NORM_LANDMARKS'
_POSE_WORLD_LANDMARKS_STREAM_NAME = 'world_landmarks'
_POSE_WORLD_LANDMARKS_TAG = 'WORLD_LANDMARKS'
_POSE_AUXILIARY_LANDMARKS_STREAM_NAME = 'auxiliary_landmarks'
_POSE_AUXILIARY_LANDMARKS_TAG = 'AUXILIARY_LANDMARKS'
_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.pose_landmarker.PoseLandmarkerGraph'
_MICRO_SECONDS_PER_MILLISECOND = 1000
@dataclasses.dataclass
class PoseLandmarkerResult:
"""The pose landmarks detection result from PoseLandmarker, where each vector element represents a single pose detected in the image.
Attributes:
pose_landmarks: Detected pose landmarks in normalized image coordinates.
pose_world_landmarks: Detected pose landmarks in world coordinates.
pose_auxiliary_landmarks: Detected auxiliary landmarks, used for deriving
ROI for next frame.
segmentation_masks: Optional segmentation masks for pose.
"""
pose_landmarks: List[List[landmark_module.NormalizedLandmark]]
pose_world_landmarks: List[List[landmark_module.Landmark]]
pose_auxiliary_landmarks: List[List[landmark_module.NormalizedLandmark]]
segmentation_masks: Optional[List[image_module.Image]] = None
def _build_landmarker_result(
output_packets: Mapping[str, packet_module.Packet]
) -> PoseLandmarkerResult:
"""Constructs a `PoseLandmarkerResult` from output packets."""
pose_landmarker_result = PoseLandmarkerResult([], [], [])
if _SEGMENTATION_MASK_STREAM_NAME in output_packets:
pose_landmarker_result.segmentation_masks = packet_getter.get_image_list(
output_packets[_SEGMENTATION_MASK_STREAM_NAME]
)
pose_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_NORM_LANDMARKS_STREAM_NAME]
)
pose_world_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_POSE_WORLD_LANDMARKS_STREAM_NAME]
)
pose_auxiliary_landmarks_proto_list = packet_getter.get_proto_list(
output_packets[_POSE_AUXILIARY_LANDMARKS_STREAM_NAME]
)
for proto in pose_landmarks_proto_list:
pose_landmarks = landmark_pb2.NormalizedLandmarkList()
pose_landmarks.MergeFrom(proto)
pose_landmarks_list = []
for pose_landmark in pose_landmarks.landmark:
pose_landmarks_list.append(
landmark_module.NormalizedLandmark.create_from_pb2(pose_landmark)
)
pose_landmarker_result.pose_landmarks.append(pose_landmarks_list)
for proto in pose_world_landmarks_proto_list:
pose_world_landmarks = landmark_pb2.LandmarkList()
pose_world_landmarks.MergeFrom(proto)
pose_world_landmarks_list = []
for pose_world_landmark in pose_world_landmarks.landmark:
pose_world_landmarks_list.append(
landmark_module.Landmark.create_from_pb2(pose_world_landmark)
)
pose_landmarker_result.pose_world_landmarks.append(
pose_world_landmarks_list
)
for proto in pose_auxiliary_landmarks_proto_list:
pose_auxiliary_landmarks = landmark_pb2.NormalizedLandmarkList()
pose_auxiliary_landmarks.MergeFrom(proto)
pose_auxiliary_landmarks_list = []
for pose_auxiliary_landmark in pose_auxiliary_landmarks.landmark:
pose_auxiliary_landmarks_list.append(
landmark_module.NormalizedLandmark.create_from_pb2(
pose_auxiliary_landmark
)
)
pose_landmarker_result.pose_auxiliary_landmarks.append(
pose_auxiliary_landmarks_list
)
return pose_landmarker_result
@dataclasses.dataclass
class PoseLandmarkerOptions:
"""Options for the pose landmarker task.
Attributes:
base_options: Base options for the pose landmarker task.
running_mode: The running mode of the task. Default to the image mode.
PoseLandmarker has three running modes: 1) The image mode for detecting
pose landmarks on single image inputs. 2) The video mode for detecting
pose landmarks on the decoded frames of a video. 3) The live stream mode
for detecting pose landmarks on the live stream of input data, such as
from camera. In this mode, the "result_callback" below must be specified
to receive the detection results asynchronously.
num_poses: The maximum number of poses that can be detected by the
PoseLandmarker.
min_pose_detection_confidence: The minimum confidence score for the pose
detection to be considered successful.
min_pose_presence_confidence: The minimum confidence score of pose presence
in the pose landmark detection.
min_tracking_confidence: The minimum confidence score for the pose tracking
to be considered successful.
output_segmentation_masks: Whether to output segmentation masks.
result_callback: The user-defined result callback for processing live stream
data. The result callback should only be specified when the running mode
is set to the live stream mode.
"""
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
num_poses: int = 1
min_pose_detection_confidence: float = 0.5
min_pose_presence_confidence: float = 0.5
min_tracking_confidence: float = 0.5
output_segmentation_masks: bool = False
result_callback: Optional[
Callable[[PoseLandmarkerResult, image_module.Image, int], None]
] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _PoseLandmarkerGraphOptionsProto:
"""Generates an PoseLandmarkerGraphOptions protobuf object."""
base_options_proto = self.base_options.to_pb2()
base_options_proto.use_stream_mode = (
False if self.running_mode == _RunningMode.IMAGE else True
)
# Initialize the pose landmarker options from base options.
pose_landmarker_options_proto = _PoseLandmarkerGraphOptionsProto(
base_options=base_options_proto
)
pose_landmarker_options_proto.min_tracking_confidence = (
self.min_tracking_confidence
)
pose_landmarker_options_proto.pose_detector_graph_options.num_poses = (
self.num_poses
)
pose_landmarker_options_proto.pose_detector_graph_options.min_detection_confidence = (
self.min_pose_detection_confidence
)
pose_landmarker_options_proto.pose_landmarks_detector_graph_options.min_detection_confidence = (
self.min_pose_presence_confidence
)
return pose_landmarker_options_proto
class PoseLandmarker(base_vision_task_api.BaseVisionTaskApi):
"""Class that performs pose landmarks detection on images."""
@classmethod
def create_from_model_path(cls, model_path: str) -> 'PoseLandmarker':
"""Creates a `PoseLandmarker` object from a model bundle file and the default `PoseLandmarkerOptions`.
Note that the created `PoseLandmarker` instance is in image mode, for
detecting pose landmarks on single image inputs.
Args:
model_path: Path to the model.
Returns:
`PoseLandmarker` object that's created from the model file and the
default `PoseLandmarkerOptions`.
Raises:
ValueError: If failed to create `PoseLandmarker` object from the
provided file such as invalid file path.
RuntimeError: If other types of error occurred.
"""
base_options = _BaseOptions(model_asset_path=model_path)
options = PoseLandmarkerOptions(
base_options=base_options, running_mode=_RunningMode.IMAGE
)
return cls.create_from_options(options)
@classmethod
def create_from_options(
cls, options: PoseLandmarkerOptions
) -> 'PoseLandmarker':
"""Creates the `PoseLandmarker` object from pose landmarker options.
Args:
options: Options for the pose landmarker task.
Returns:
`PoseLandmarker` object that's created from `options`.
Raises:
ValueError: If failed to create `PoseLandmarker` object from
`PoseLandmarkerOptions` such as missing the model.
RuntimeError: If other types of error occurred.
"""
def packets_callback(output_packets: Mapping[str, packet_module.Packet]):
if output_packets[_IMAGE_OUT_STREAM_NAME].is_empty():
return
image = packet_getter.get_image(output_packets[_IMAGE_OUT_STREAM_NAME])
if output_packets[_NORM_LANDMARKS_STREAM_NAME].is_empty():
empty_packet = output_packets[_NORM_LANDMARKS_STREAM_NAME]
options.result_callback(
PoseLandmarkerResult([], [], []),
image,
empty_packet.timestamp.value // _MICRO_SECONDS_PER_MILLISECOND,
)
return
pose_landmarker_result = _build_landmarker_result(output_packets)
timestamp = output_packets[_NORM_LANDMARKS_STREAM_NAME].timestamp
options.result_callback(
pose_landmarker_result,
image,
timestamp.value // _MICRO_SECONDS_PER_MILLISECOND,
)
output_streams = [
':'.join([_NORM_LANDMARKS_TAG, _NORM_LANDMARKS_STREAM_NAME]),
':'.join(
[_POSE_WORLD_LANDMARKS_TAG, _POSE_WORLD_LANDMARKS_STREAM_NAME]
),
':'.join([
_POSE_AUXILIARY_LANDMARKS_TAG,
_POSE_AUXILIARY_LANDMARKS_STREAM_NAME,
]),
':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME]),
]
if options.output_segmentation_masks:
output_streams.append(
':'.join([_SEGMENTATION_MASK_TAG, _SEGMENTATION_MASK_STREAM_NAME])
)
task_info = _TaskInfo(
task_graph=_TASK_GRAPH_NAME,
input_streams=[
':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]),
':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]),
],
output_streams=output_streams,
task_options=options,
)
return cls(
task_info.generate_graph_config(
enable_flow_limiting=options.running_mode
== _RunningMode.LIVE_STREAM
),
options.running_mode,
packets_callback if options.result_callback else None,
)
def detect(
self,
image: image_module.Image,
image_processing_options: Optional[_ImageProcessingOptions] = None,
) -> PoseLandmarkerResult:
"""Performs pose landmarks detection on the given image.
Only use this method when the PoseLandmarker is created with the image
running mode.
Args:
image: MediaPipe Image.
image_processing_options: Options for image processing.
Returns:
The pose landmarker detection results.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If pose landmarker detection failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, image, roi_allowed=False
)
output_packets = self._process_image_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
normalized_rect.to_pb2()
),
})
if output_packets[_NORM_LANDMARKS_STREAM_NAME].is_empty():
return PoseLandmarkerResult([], [], [])
return _build_landmarker_result(output_packets)
def detect_for_video(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None,
) -> PoseLandmarkerResult:
"""Performs pose landmarks detection on the provided video frame.
Only use this method when the PoseLandmarker is created with the video
running mode. It's required to provide the video frame's timestamp (in
milliseconds) along with the video frame. The input timestamps should be
monotonically increasing for adjacent calls of this method.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input video frame in milliseconds.
image_processing_options: Options for image processing.
Returns:
The pose landmarker detection results.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If pose landmarker detection failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, image, roi_allowed=False
)
output_packets = self._process_video_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND
),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
normalized_rect.to_pb2()
).at(timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
})
if output_packets[_NORM_LANDMARKS_STREAM_NAME].is_empty():
return PoseLandmarkerResult([], [], [])
return _build_landmarker_result(output_packets)
def detect_async(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None,
) -> None:
"""Sends live image data to perform pose landmarks detection.
The results will be available via the "result_callback" provided in the
PoseLandmarkerOptions. Only use this method when the PoseLandmarker is
created with the live stream running mode.
Only use this method when the PoseLandmarker is created with the live
stream running mode. The input timestamps should be monotonically increasing
for adjacent calls of this method. This method will return immediately after
the input image is accepted. The results will be available via the
`result_callback` provided in the `PoseLandmarkerOptions`. The
`detect_async` method is designed to process live stream data such as
camera input. To lower the overall latency, pose landmarker may drop the
input images if needed. In other words, it's not guaranteed to have output
per input image.
The `result_callback` provides:
- The pose landmarker detection results.
- The input image that the pose landmarker runs on.
- The input timestamp in milliseconds.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input image in milliseconds.
image_processing_options: Options for image processing.
Raises:
ValueError: If the current input timestamp is smaller than what the
pose landmarker has already processed.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, image, roi_allowed=False
)
self._send_live_stream_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND
),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
normalized_rect.to_pb2()
).at(timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
})
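To make the live-stream contract described in the detect_async docstring concrete, here is a rough end-to-end sketch. The option fields, callback signature, and running mode come from this file; the model path and the frame source are placeholders rather than anything defined in the commit:

from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.vision import pose_landmarker
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module

def on_result(result: pose_landmarker.PoseLandmarkerResult,
              output_image: image_module.Image,
              timestamp_ms: int):
  # Called asynchronously; frames may be dropped under load, so there is no
  # guarantee of one callback per detect_async call.
  print(timestamp_ms, len(result.pose_landmarks))

options = pose_landmarker.PoseLandmarkerOptions(
    base_options=base_options_module.BaseOptions(
        model_asset_path='pose_landmarker.task'),  # placeholder path
    running_mode=running_mode_module.VisionTaskRunningMode.LIVE_STREAM,
    result_callback=on_result,
)
with pose_landmarker.PoseLandmarker.create_from_options(options) as landmarker:
  frame = image_module.Image.create_from_file('pose.jpg')  # stand-in for a camera frame
  # Timestamps must be monotonically increasing across calls.
  for timestamp_ms in range(0, 300, 30):
    landmarker.detect_async(frame, timestamp_ms)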