Merge pull request #4158 from kinaryml:face-detector-python

PiperOrigin-RevId: 516970627
Copybara-Service committed 2023-03-15 17:13:37 -07:00
commit d84ccbadad
8 changed files with 1036 additions and 7 deletions

mediapipe/python/BUILD

@@ -95,6 +95,7 @@ cc_library(
"//mediapipe/tasks/cc/vision/image_embedder:image_embedder_graph",
"//mediapipe/tasks/cc/vision/image_segmenter:image_segmenter_graph",
"//mediapipe/tasks/cc/vision/object_detector:object_detector_graph",
"//mediapipe/tasks/cc/vision/face_detector:face_detector_graph",
] + select({
# TODO: Build text_classifier_graph and text_embedder_graph on Windows.
"//mediapipe:windows": [],

mediapipe/tasks/python/components/containers/BUILD

@@ -73,12 +73,22 @@ py_library(
],
)
py_library(
name = "keypoint",
srcs = ["keypoint.py"],
deps = [
"//mediapipe/framework/formats:location_data_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",
],
)
py_library(
name = "detections",
srcs = ["detections.py"],
deps = [
":bounding_box",
":category",
":keypoint",
"//mediapipe/framework/formats:detection_py_pb2",
"//mediapipe/framework/formats:location_data_py_pb2",
"//mediapipe/tasks/python/core:optional_dependencies",

mediapipe/tasks/python/components/containers/detections.py

@@ -14,12 +14,13 @@
"""Detections data class."""
import dataclasses
from typing import Any, List, Optional
from mediapipe.framework.formats import detection_pb2
from mediapipe.framework.formats import location_data_pb2
from mediapipe.tasks.python.components.containers import bounding_box as bounding_box_module
from mediapipe.tasks.python.components.containers import category as category_module
from mediapipe.tasks.python.components.containers import keypoint as keypoint_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_DetectionListProto = detection_pb2.DetectionList
@@ -34,10 +35,12 @@ class Detection:
Attributes:
bounding_box: A BoundingBox object.
categories: A list of Category objects.
keypoints: A list of NormalizedKeypoint objects.
"""
bounding_box: bounding_box_module.BoundingBox
categories: List[category_module.Category]
keypoints: Optional[List[keypoint_module.NormalizedKeypoint]] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _DetectionProto:
@@ -46,6 +49,8 @@ class Detection:
label_ids = []
scores = []
display_names = []
relative_keypoints = []
for category in self.categories:
scores.append(category.score)
if category.index:
@@ -54,6 +59,20 @@
labels.append(category.category_name)
if category.display_name:
display_names.append(category.display_name)
if self.keypoints:
for keypoint in self.keypoints:
relative_keypoint_proto = _LocationDataProto.RelativeKeypoint()
if keypoint.x:
relative_keypoint_proto.x = keypoint.x
if keypoint.y:
relative_keypoint_proto.y = keypoint.y
if keypoint.label:
relative_keypoint_proto.keypoint_label = keypoint.label
if keypoint.score:
relative_keypoint_proto.score = keypoint.score
relative_keypoints.append(relative_keypoint_proto)
return _DetectionProto(
label=labels,
label_id=label_ids,
@@ -61,28 +80,52 @@
display_name=display_names,
location_data=_LocationDataProto(
format=_LocationDataProto.Format.BOUNDING_BOX,
bounding_box=self.bounding_box.to_pb2(),
relative_keypoints=relative_keypoints,
),
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(cls, pb2_obj: _DetectionProto) -> 'Detection':
"""Creates a `Detection` object from the given protobuf object."""
categories = []
keypoints = []
for idx, score in enumerate(pb2_obj.score):
categories.append(
category_module.Category(
score=score,
index=pb2_obj.label_id[idx]
if idx < len(pb2_obj.label_id)
else None,
category_name=pb2_obj.label[idx]
if idx < len(pb2_obj.label)
else None,
display_name=pb2_obj.display_name[idx]
if idx < len(pb2_obj.display_name)
else None,
)
)
if pb2_obj.location_data.relative_keypoints:
for idx, elem in enumerate(pb2_obj.location_data.relative_keypoints):
keypoints.append(
keypoint_module.NormalizedKeypoint(
x=elem.x,
y=elem.y,
label=elem.keypoint_label,
score=elem.score,
)
)
return Detection(
bounding_box=bounding_box_module.BoundingBox.create_from_pb2(
pb2_obj.location_data.bounding_box
),
categories=categories,
keypoints=keypoints,
)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.

mediapipe/tasks/python/components/containers/keypoint.py

@@ -0,0 +1,77 @@
# Copyright 2023 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Keypoint data class."""
import dataclasses
from typing import Any, Optional
from mediapipe.framework.formats import location_data_pb2
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
_RelativeKeypointProto = location_data_pb2.LocationData.RelativeKeypoint
@dataclasses.dataclass
class NormalizedKeypoint:
"""A normalized keypoint.
A normalized keypoint represents a point in 2D space with x, y coordinates.
x and y are normalized to [0.0, 1.0] by the image width and height
respectively.
Attributes:
x: The x coordinate of the normalized keypoint.
y: The y coordinate of the normalized keypoint.
label: The optional label of the keypoint.
score: The score of the keypoint.
"""
x: Optional[float] = None
y: Optional[float] = None
label: Optional[str] = None
score: Optional[float] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _RelativeKeypointProto:
"""Generates a RelativeKeypoint protobuf object."""
return _RelativeKeypointProto(
x=self.x, y=self.y, keypoint_label=self.label, score=self.score
)
@classmethod
@doc_controls.do_not_generate_docs
def create_from_pb2(
cls, pb2_obj: _RelativeKeypointProto
) -> 'NormalizedKeypoint':
"""Creates a `NormalizedKeypoint` object from the given protobuf object."""
return NormalizedKeypoint(
x=pb2_obj.x,
y=pb2_obj.y,
label=pb2_obj.keypoint_label,
score=pb2_obj.score,
)
def __eq__(self, other: Any) -> bool:
"""Checks if this object is equal to the given object.
Args:
other: The object to be compared with.
Returns:
True if the objects are equal.
"""
if not isinstance(other, NormalizedKeypoint):
return False
return self.to_pb2().__eq__(other.to_pb2())
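A minimal usage sketch for the new container (not part of the commit; the label value is illustrative):

# Hedged sketch: NormalizedKeypoint <-> LocationData.RelativeKeypoint round trip.
from mediapipe.tasks.python.components.containers import keypoint as keypoint_module

kp = keypoint_module.NormalizedKeypoint(x=0.25, y=0.5, label='nose_tip', score=0.9)
proto = kp.to_pb2()  # a location_data_pb2.LocationData.RelativeKeypoint
restored = keypoint_module.NormalizedKeypoint.create_from_pb2(proto)
assert restored == kp  # __eq__ compares the serialized protos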

mediapipe/tasks/python/test/vision/BUILD

@@ -92,6 +92,29 @@ py_test(
],
)
py_test(
name = "face_detector_test",
srcs = ["face_detector_test.py"],
data = [
"//mediapipe/tasks/testdata/vision:test_images",
"//mediapipe/tasks/testdata/vision:test_models",
"//mediapipe/tasks/testdata/vision:test_protos",
],
deps = [
"//mediapipe/framework/formats:detection_py_pb2",
"//mediapipe/python:_framework_bindings",
"//mediapipe/tasks/python/components/containers:bounding_box",
"//mediapipe/tasks/python/components/containers:category",
"//mediapipe/tasks/python/components/containers:detections",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/test:test_utils",
"//mediapipe/tasks/python/vision:face_detector",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
"@com_google_protobuf//:protobuf_python",
],
)
py_test(
name = "hand_landmarker_test",
srcs = ["hand_landmarker_test.py"],

mediapipe/tasks/python/test/vision/face_detector_test.py

@@ -0,0 +1,523 @@
# Copyright 2023 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for face detector."""
import enum
import os
from unittest import mock
from absl.testing import absltest
from absl.testing import parameterized
from google.protobuf import text_format
from mediapipe.framework.formats import detection_pb2
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.python.components.containers import bounding_box as bounding_box_module
from mediapipe.tasks.python.components.containers import category as category_module
from mediapipe.tasks.python.components.containers import detections as detections_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.test import test_utils
from mediapipe.tasks.python.vision import face_detector
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
FaceDetectorResult = detections_module.DetectionResult
_BaseOptions = base_options_module.BaseOptions
_Category = category_module.Category
_BoundingBox = bounding_box_module.BoundingBox
_Detection = detections_module.Detection
_Image = image_module.Image
_FaceDetector = face_detector.FaceDetector
_FaceDetectorOptions = face_detector.FaceDetectorOptions
_RUNNING_MODE = running_mode_module.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_SHORT_RANGE_BLAZE_FACE_MODEL = 'face_detection_short_range.tflite'
_PORTRAIT_IMAGE = 'portrait.jpg'
_PORTRAIT_EXPECTED_DETECTION = 'portrait_expected_detection.pbtxt'
_PORTRAIT_ROTATED_IMAGE = 'portrait_rotated.jpg'
_PORTRAIT_ROTATED_EXPECTED_DETECTION = (
'portrait_rotated_expected_detection.pbtxt'
)
_CAT_IMAGE = 'cat.jpg'
_KEYPOINT_ERROR_THRESHOLD = 1e-2
_TEST_DATA_DIR = 'mediapipe/tasks/testdata/vision'
def _get_expected_face_detector_result(file_name: str) -> FaceDetectorResult:
face_detection_result_file_path = test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, file_name)
)
with open(face_detection_result_file_path, 'rb') as f:
face_detection_proto = detection_pb2.Detection()
text_format.Parse(f.read(), face_detection_proto)
face_detection = detections_module.Detection.create_from_pb2(
face_detection_proto
)
return FaceDetectorResult(detections=[face_detection])
class ModelFileType(enum.Enum):
FILE_CONTENT = 1
FILE_NAME = 2
class FaceDetectorTest(parameterized.TestCase):
def setUp(self):
super().setUp()
self.test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, _PORTRAIT_IMAGE)
)
)
self.model_path = test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, _SHORT_RANGE_BLAZE_FACE_MODEL)
)
def test_create_from_file_succeeds_with_valid_model_path(self):
# Creates with default option and valid model file successfully.
with _FaceDetector.create_from_model_path(self.model_path) as detector:
self.assertIsInstance(detector, _FaceDetector)
def test_create_from_options_succeeds_with_valid_model_path(self):
# Creates with options containing model file successfully.
base_options = _BaseOptions(model_asset_path=self.model_path)
options = _FaceDetectorOptions(base_options=base_options)
with _FaceDetector.create_from_options(options) as detector:
self.assertIsInstance(detector, _FaceDetector)
def test_create_from_options_fails_with_invalid_model_path(self):
with self.assertRaisesRegex(
RuntimeError, 'Unable to open file at /path/to/invalid/model.tflite'
):
base_options = _BaseOptions(
model_asset_path='/path/to/invalid/model.tflite'
)
options = _FaceDetectorOptions(base_options=base_options)
_FaceDetector.create_from_options(options)
def test_create_from_options_succeeds_with_valid_model_content(self):
# Creates with options containing model content successfully.
with open(self.model_path, 'rb') as f:
base_options = _BaseOptions(model_asset_buffer=f.read())
options = _FaceDetectorOptions(base_options=base_options)
detector = _FaceDetector.create_from_options(options)
self.assertIsInstance(detector, _FaceDetector)
def _expect_keypoints_correct(self, actual_keypoints, expected_keypoints):
self.assertLen(actual_keypoints, len(expected_keypoints))
for i in range(len(actual_keypoints)):
self.assertAlmostEqual(
actual_keypoints[i].x,
expected_keypoints[i].x,
delta=_KEYPOINT_ERROR_THRESHOLD,
)
self.assertAlmostEqual(
actual_keypoints[i].y,
expected_keypoints[i].y,
delta=_KEYPOINT_ERROR_THRESHOLD,
)
def _expect_face_detector_results_correct(
self, actual_results, expected_results
):
self.assertLen(actual_results.detections, len(expected_results.detections))
for i in range(len(actual_results.detections)):
actual_bbox = actual_results.detections[i].bounding_box
expected_bbox = expected_results.detections[i].bounding_box
self.assertEqual(actual_bbox, expected_bbox)
self.assertNotEmpty(actual_results.detections[i].keypoints)
self._expect_keypoints_correct(
actual_results.detections[i].keypoints,
expected_results.detections[i].keypoints,
)
@parameterized.parameters(
(ModelFileType.FILE_NAME, _PORTRAIT_EXPECTED_DETECTION),
(ModelFileType.FILE_CONTENT, _PORTRAIT_EXPECTED_DETECTION),
)
def test_detect(self, model_file_type, expected_detection_result_file):
# Creates detector.
if model_file_type is ModelFileType.FILE_NAME:
base_options = _BaseOptions(model_asset_path=self.model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.model_path, 'rb') as f:
model_content = f.read()
base_options = _BaseOptions(model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
options = _FaceDetectorOptions(base_options=base_options)
detector = _FaceDetector.create_from_options(options)
# Performs face detection on the input.
detection_result = detector.detect(self.test_image)
# Comparing results.
expected_detection_result = _get_expected_face_detector_result(
expected_detection_result_file
)
self._expect_face_detector_results_correct(
detection_result, expected_detection_result
)
# Closes the detector explicitly when the detector is not used in
# a context.
detector.close()
@parameterized.parameters(
(ModelFileType.FILE_NAME, _PORTRAIT_EXPECTED_DETECTION),
(ModelFileType.FILE_CONTENT, _PORTRAIT_EXPECTED_DETECTION),
)
def test_detect_in_context(
self, model_file_type, expected_detection_result_file
):
# Creates detector.
if model_file_type is ModelFileType.FILE_NAME:
base_options = _BaseOptions(model_asset_path=self.model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.model_path, 'rb') as f:
model_content = f.read()
base_options = _BaseOptions(model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
options = _FaceDetectorOptions(base_options=base_options)
with _FaceDetector.create_from_options(options) as detector:
# Performs face detection on the input.
detection_result = detector.detect(self.test_image)
# Comparing results.
expected_detection_result = _get_expected_face_detector_result(
expected_detection_result_file
)
self._expect_face_detector_results_correct(
detection_result, expected_detection_result
)
def test_detect_succeeds_with_rotated_image(self):
base_options = _BaseOptions(model_asset_path=self.model_path)
options = _FaceDetectorOptions(base_options=base_options)
with _FaceDetector.create_from_options(options) as detector:
# Load the test image.
test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, _PORTRAIT_ROTATED_IMAGE)
)
)
# Rotated input image.
image_processing_options = _ImageProcessingOptions(rotation_degrees=-90)
# Performs face detection on the input.
detection_result = detector.detect(test_image, image_processing_options)
# Comparing results.
expected_detection_result = _get_expected_face_detector_result(
_PORTRAIT_ROTATED_EXPECTED_DETECTION
)
self._expect_face_detector_results_correct(
detection_result, expected_detection_result
)
def test_empty_detection_outputs(self):
# Load a test image with no faces.
test_image = _Image.create_from_file(
test_utils.get_test_data_path(os.path.join(_TEST_DATA_DIR, _CAT_IMAGE))
)
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path)
)
with _FaceDetector.create_from_options(options) as detector:
# Performs face detection on the input.
detection_result = detector.detect(test_image)
self.assertEmpty(detection_result.detections)
def test_missing_result_callback(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
)
with self.assertRaisesRegex(
ValueError, r'result callback must be provided'
):
with _FaceDetector.create_from_options(options) as unused_detector:
pass
@parameterized.parameters((_RUNNING_MODE.IMAGE), (_RUNNING_MODE.VIDEO))
def test_illegal_result_callback(self, running_mode):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=running_mode,
result_callback=mock.MagicMock(),
)
with self.assertRaisesRegex(
ValueError, r'result callback should not be provided'
):
with _FaceDetector.create_from_options(options) as unused_detector:
pass
def test_calling_detect_for_video_in_image_mode(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.IMAGE,
)
with _FaceDetector.create_from_options(options) as detector:
with self.assertRaisesRegex(
ValueError, r'not initialized with the video mode'
):
detector.detect_for_video(self.test_image, 0)
def test_calling_detect_async_in_image_mode(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.IMAGE,
)
with _FaceDetector.create_from_options(options) as detector:
with self.assertRaisesRegex(
ValueError, r'not initialized with the live stream mode'
):
detector.detect_async(self.test_image, 0)
def test_calling_detect_in_video_mode(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.VIDEO,
)
with _FaceDetector.create_from_options(options) as detector:
with self.assertRaisesRegex(
ValueError, r'not initialized with the image mode'
):
detector.detect(self.test_image)
def test_calling_detect_async_in_video_mode(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.VIDEO,
)
with _FaceDetector.create_from_options(options) as detector:
with self.assertRaisesRegex(
ValueError, r'not initialized with the live stream mode'
):
detector.detect_async(self.test_image, 0)
def test_detect_for_video_with_out_of_order_timestamp(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.VIDEO,
)
with _FaceDetector.create_from_options(options) as detector:
unused_result = detector.detect_for_video(self.test_image, 1)
with self.assertRaisesRegex(
ValueError, r'Input timestamp must be monotonically increasing'
):
detector.detect_for_video(self.test_image, 0)
@parameterized.parameters(
(
ModelFileType.FILE_NAME,
_PORTRAIT_IMAGE,
0,
_get_expected_face_detector_result(_PORTRAIT_EXPECTED_DETECTION),
),
(
ModelFileType.FILE_CONTENT,
_PORTRAIT_IMAGE,
0,
_get_expected_face_detector_result(_PORTRAIT_EXPECTED_DETECTION),
),
(
ModelFileType.FILE_NAME,
_PORTRAIT_ROTATED_IMAGE,
-90,
_get_expected_face_detector_result(
_PORTRAIT_ROTATED_EXPECTED_DETECTION
),
),
(
ModelFileType.FILE_CONTENT,
_PORTRAIT_ROTATED_IMAGE,
-90,
_get_expected_face_detector_result(
_PORTRAIT_ROTATED_EXPECTED_DETECTION
),
),
(ModelFileType.FILE_NAME, _CAT_IMAGE, 0, FaceDetectorResult([])),
(ModelFileType.FILE_CONTENT, _CAT_IMAGE, 0, FaceDetectorResult([])),
)
def test_detect_for_video(
self,
model_file_type,
test_image_file_name,
rotation_degrees,
expected_detection_result,
):
# Creates detector.
if model_file_type is ModelFileType.FILE_NAME:
base_options = _BaseOptions(model_asset_path=self.model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.model_path, 'rb') as f:
model_content = f.read()
base_options = _BaseOptions(model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
options = _FaceDetectorOptions(
base_options=base_options, running_mode=_RUNNING_MODE.VIDEO
)
with _FaceDetector.create_from_options(options) as detector:
for timestamp in range(0, 300, 30):
# Load the test image.
test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, test_image_file_name)
)
)
# Set the image processing options.
image_processing_options = _ImageProcessingOptions(
rotation_degrees=rotation_degrees
)
# Performs face detection on the input.
detection_result = detector.detect_for_video(
test_image, timestamp, image_processing_options
)
# Comparing results.
self._expect_face_detector_results_correct(
detection_result, expected_detection_result
)
def test_calling_detect_in_live_stream_mode(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=mock.MagicMock(),
)
with _FaceDetector.create_from_options(options) as detector:
with self.assertRaisesRegex(
ValueError, r'not initialized with the image mode'
):
detector.detect(self.test_image)
def test_calling_detect_for_video_in_live_stream_mode(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=mock.MagicMock(),
)
with _FaceDetector.create_from_options(options) as detector:
with self.assertRaisesRegex(
ValueError, r'not initialized with the video mode'
):
detector.detect_for_video(self.test_image, 0)
def test_detect_async_calls_with_illegal_timestamp(self):
options = _FaceDetectorOptions(
base_options=_BaseOptions(model_asset_path=self.model_path),
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=mock.MagicMock(),
)
with _FaceDetector.create_from_options(options) as detector:
detector.detect_async(self.test_image, 100)
with self.assertRaisesRegex(
ValueError, r'Input timestamp must be monotonically increasing'
):
detector.detect_async(self.test_image, 0)
@parameterized.parameters(
(
ModelFileType.FILE_NAME,
_PORTRAIT_IMAGE,
0,
_get_expected_face_detector_result(_PORTRAIT_EXPECTED_DETECTION),
),
(
ModelFileType.FILE_CONTENT,
_PORTRAIT_IMAGE,
0,
_get_expected_face_detector_result(_PORTRAIT_EXPECTED_DETECTION),
),
(
ModelFileType.FILE_NAME,
_PORTRAIT_ROTATED_IMAGE,
-90,
_get_expected_face_detector_result(
_PORTRAIT_ROTATED_EXPECTED_DETECTION
),
),
(
ModelFileType.FILE_CONTENT,
_PORTRAIT_ROTATED_IMAGE,
-90,
_get_expected_face_detector_result(
_PORTRAIT_ROTATED_EXPECTED_DETECTION
),
),
(ModelFileType.FILE_NAME, _CAT_IMAGE, 0, FaceDetectorResult([])),
(ModelFileType.FILE_CONTENT, _CAT_IMAGE, 0, FaceDetectorResult([])),
)
def test_detect_async_calls(
self,
model_file_type,
test_image_file_name,
rotation_degrees,
expected_detection_result,
):
# Creates detector.
if model_file_type is ModelFileType.FILE_NAME:
base_options = _BaseOptions(model_asset_path=self.model_path)
elif model_file_type is ModelFileType.FILE_CONTENT:
with open(self.model_path, 'rb') as f:
model_content = f.read()
base_options = _BaseOptions(model_asset_buffer=model_content)
else:
# Should never happen
raise ValueError('model_file_type is invalid.')
observed_timestamp_ms = -1
def check_result(
result: FaceDetectorResult,
unused_output_image: _Image,
timestamp_ms: int,
):
self._expect_face_detector_results_correct(
result, expected_detection_result
)
nonlocal observed_timestamp_ms
self.assertLess(observed_timestamp_ms, timestamp_ms)
observed_timestamp_ms = timestamp_ms
options = _FaceDetectorOptions(
base_options=base_options,
running_mode=_RUNNING_MODE.LIVE_STREAM,
result_callback=check_result,
)
# Load the test image.
test_image = _Image.create_from_file(
test_utils.get_test_data_path(
os.path.join(_TEST_DATA_DIR, test_image_file_name)
)
)
with _FaceDetector.create_from_options(options) as detector:
for timestamp in range(0, 300, 30):
# Set the image processing options.
image_processing_options = _ImageProcessingOptions(
rotation_degrees=rotation_degrees
)
detector.detect_async(test_image, timestamp, image_processing_options)
if __name__ == '__main__':
absltest.main()
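Assuming the test BUILD entry above lives at mediapipe/tasks/python/test/vision (its location in the MediaPipe tree is not shown in this diff), the suite would be run with something like: bazel test //mediapipe/tasks/python/test/vision:face_detector_test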

mediapipe/tasks/python/vision/BUILD

@@ -152,3 +152,23 @@ py_library(
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)
py_library(
name = "face_detector",
srcs = [
"face_detector.py",
],
deps = [
"//mediapipe/python:_framework_bindings",
"//mediapipe/python:packet_creator",
"//mediapipe/python:packet_getter",
"//mediapipe/tasks/cc/vision/face_detector/proto:face_detector_graph_options_py_pb2",
"//mediapipe/tasks/python/components/containers:detections",
"//mediapipe/tasks/python/core:base_options",
"//mediapipe/tasks/python/core:optional_dependencies",
"//mediapipe/tasks/python/core:task_info",
"//mediapipe/tasks/python/vision/core:base_vision_task_api",
"//mediapipe/tasks/python/vision/core:image_processing_options",
"//mediapipe/tasks/python/vision/core:vision_task_running_mode",
],
)

mediapipe/tasks/python/vision/face_detector.py

@@ -0,0 +1,332 @@
# Copyright 2023 The MediaPipe Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MediaPipe face detector task."""
import dataclasses
from typing import Callable, Mapping, Optional
from mediapipe.python import packet_creator
from mediapipe.python import packet_getter
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.python._framework_bindings import packet as packet_module
from mediapipe.tasks.cc.vision.face_detector.proto import face_detector_graph_options_pb2
from mediapipe.tasks.python.components.containers import detections as detections_module
from mediapipe.tasks.python.core import base_options as base_options_module
from mediapipe.tasks.python.core import task_info as task_info_module
from mediapipe.tasks.python.core.optional_dependencies import doc_controls
from mediapipe.tasks.python.vision.core import base_vision_task_api
from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module
from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module
FaceDetectorResult = detections_module.DetectionResult
_BaseOptions = base_options_module.BaseOptions
_FaceDetectorGraphOptionsProto = (
face_detector_graph_options_pb2.FaceDetectorGraphOptions
)
_RunningMode = running_mode_module.VisionTaskRunningMode
_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions
_TaskInfo = task_info_module.TaskInfo
_DETECTIONS_OUT_STREAM_NAME = 'detections'
_DETECTIONS_TAG = 'DETECTIONS'
_NORM_RECT_STREAM_NAME = 'norm_rect_in'
_NORM_RECT_TAG = 'NORM_RECT'
_IMAGE_IN_STREAM_NAME = 'image_in'
_IMAGE_OUT_STREAM_NAME = 'image_out'
_IMAGE_TAG = 'IMAGE'
_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.face_detector.FaceDetectorGraph'
_MICRO_SECONDS_PER_MILLISECOND = 1000
@dataclasses.dataclass
class FaceDetectorOptions:
"""Options for the face detector task.
Attributes:
base_options: Base options for the face detector task.
running_mode: The running mode of the task. Defaults to the image mode. The
face detector task has three running modes: 1) The image mode for detecting
faces on single image inputs. 2) The video mode for detecting faces on the
decoded frames of a video. 3) The live stream mode for detecting faces on
a live stream of input data, such as from a camera.
min_detection_confidence: The minimum confidence score for the face
detection to be considered successful.
min_suppression_threshold: The minimum non-maximum-suppression threshold for
face detection to be considered overlapped.
result_callback: The user-defined result callback for processing live stream
data. The result callback should only be specified when the running mode
is set to the live stream mode.
"""
base_options: _BaseOptions
running_mode: _RunningMode = _RunningMode.IMAGE
min_detection_confidence: Optional[float] = None
min_suppression_threshold: Optional[float] = None
result_callback: Optional[
Callable[
[detections_module.DetectionResult, image_module.Image, int], None
]
] = None
@doc_controls.do_not_generate_docs
def to_pb2(self) -> _FaceDetectorGraphOptionsProto:
"""Generates an FaceDetectorOptions protobuf object."""
base_options_proto = self.base_options.to_pb2()
base_options_proto.use_stream_mode = (
False if self.running_mode == _RunningMode.IMAGE else True
)
return _FaceDetectorGraphOptionsProto(
base_options=base_options_proto,
min_detection_confidence=self.min_detection_confidence,
min_suppression_threshold=self.min_suppression_threshold,
)
class FaceDetector(base_vision_task_api.BaseVisionTaskApi):
"""Class that performs face detection on images."""
@classmethod
def create_from_model_path(cls, model_path: str) -> 'FaceDetector':
"""Creates an `FaceDetector` object from a TensorFlow Lite model and the default `FaceDetectorOptions`.
Note that the created `FaceDetector` instance is in image mode, for
detecting faces on single image inputs.
Args:
model_path: Path to the model.
Returns:
`FaceDetector` object that's created from the model file and the default
`FaceDetectorOptions`.
Raises:
ValueError: If the `FaceDetector` object failed to be created from the
provided file, e.g. due to an invalid file path.
RuntimeError: If other types of errors occurred.
"""
base_options = _BaseOptions(model_asset_path=model_path)
options = FaceDetectorOptions(
base_options=base_options, running_mode=_RunningMode.IMAGE
)
return cls.create_from_options(options)
@classmethod
def create_from_options(cls, options: FaceDetectorOptions) -> 'FaceDetector':
"""Creates the `FaceDetector` object from face detector options.
Args:
options: Options for the face detector task.
Returns:
`FaceDetector` object that's created from `options`.
Raises:
ValueError: If the `FaceDetector` object failed to be created from
`FaceDetectorOptions`, e.g. because the model is missing.
RuntimeError: If other types of errors occurred.
"""
def packets_callback(output_packets: Mapping[str, packet_module.Packet]):
if output_packets[_IMAGE_OUT_STREAM_NAME].is_empty():
return
image = packet_getter.get_image(output_packets[_IMAGE_OUT_STREAM_NAME])
if output_packets[_DETECTIONS_OUT_STREAM_NAME].is_empty():
empty_packet = output_packets[_DETECTIONS_OUT_STREAM_NAME]
options.result_callback(
FaceDetectorResult([]),
image,
empty_packet.timestamp.value // _MICRO_SECONDS_PER_MILLISECOND,
)
return
detection_proto_list = packet_getter.get_proto_list(
output_packets[_DETECTIONS_OUT_STREAM_NAME]
)
detection_result = detections_module.DetectionResult(
[
detections_module.Detection.create_from_pb2(result)
for result in detection_proto_list
]
)
timestamp = output_packets[_IMAGE_OUT_STREAM_NAME].timestamp
options.result_callback(
detection_result,
image,
timestamp.value // _MICRO_SECONDS_PER_MILLISECOND,
)
task_info = _TaskInfo(
task_graph=_TASK_GRAPH_NAME,
input_streams=[
':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]),
':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]),
],
output_streams=[
':'.join([_DETECTIONS_TAG, _DETECTIONS_OUT_STREAM_NAME]),
':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME]),
],
task_options=options,
)
return cls(
task_info.generate_graph_config(
enable_flow_limiting=options.running_mode
== _RunningMode.LIVE_STREAM
),
options.running_mode,
packets_callback if options.result_callback else None,
)
def detect(
self,
image: image_module.Image,
image_processing_options: Optional[_ImageProcessingOptions] = None,
) -> FaceDetectorResult:
"""Performs face detection on the provided MediaPipe Image.
Only use this method when the FaceDetector is created with the image
running mode.
Args:
image: MediaPipe Image.
image_processing_options: Options for image processing.
Returns:
A face detection result object that contains a list of face detections;
each detection has a bounding box that is expressed in the unrotated input
frame of reference coordinate system, i.e. in `[0,image_width) x [0,
image_height)`, which are the dimensions of the underlying image data.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If face detection failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, roi_allowed=False
)
output_packets = self._process_image_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
normalized_rect.to_pb2()
),
})
if output_packets[_DETECTIONS_OUT_STREAM_NAME].is_empty():
return FaceDetectorResult([])
detection_proto_list = packet_getter.get_proto_list(
output_packets[_DETECTIONS_OUT_STREAM_NAME]
)
return detections_module.DetectionResult(
[
detections_module.Detection.create_from_pb2(result)
for result in detection_proto_list
]
)
def detect_for_video(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None,
) -> detections_module.DetectionResult:
"""Performs face detection on the provided video frames.
Only use this method when the FaceDetector is created with the video
running mode. It's required to provide the video frame's timestamp (in
milliseconds) along with the video frame. The input timestamps should be
monotonically increasing for adjacent calls of this method.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input video frame in milliseconds.
image_processing_options: Options for image processing.
Returns:
A face detection result object that contains a list of face detections;
each detection has a bounding box that is expressed in the unrotated input
frame of reference coordinate system, i.e. in `[0,image_width) x [0,
image_height)`, which are the dimensions of the underlying image data.
Raises:
ValueError: If any of the input arguments is invalid.
RuntimeError: If face detection failed to run.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, roi_allowed=False
)
output_packets = self._process_video_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND
),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
normalized_rect.to_pb2()
).at(timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
})
if output_packets[_DETECTIONS_OUT_STREAM_NAME].is_empty():
return FaceDetectorResult([])
detection_proto_list = packet_getter.get_proto_list(
output_packets[_DETECTIONS_OUT_STREAM_NAME]
)
return detections_module.DetectionResult(
[
detections_module.Detection.create_from_pb2(result)
for result in detection_proto_list
]
)
def detect_async(
self,
image: image_module.Image,
timestamp_ms: int,
image_processing_options: Optional[_ImageProcessingOptions] = None,
) -> None:
"""Sends live image data (an Image with a unique timestamp) to perform face detection.
Only use this method when the FaceDetector is created with the live stream
running mode. The input timestamps should be monotonically increasing for
adjacent calls of this method. This method will return immediately after the
input image is accepted. The results will be available via the
`result_callback` provided in the `FaceDetectorOptions`. The
`detect_async` method is designed to process live stream data such as camera
input. To lower the overall latency, face detector may drop the input
images if needed. In other words, it's not guaranteed to have output per
input image.
The `result_callback` provides:
- A face detection result object that contains a list of face detections;
each detection has a bounding box that is expressed in the unrotated
input frame of reference coordinate system,
i.e. in `[0,image_width) x [0,image_height)`, which are the dimensions
of the underlying image data.
- The input image that the face detector runs on.
- The input timestamp in milliseconds.
Args:
image: MediaPipe Image.
timestamp_ms: The timestamp of the input image in milliseconds.
image_processing_options: Options for image processing.
Raises:
ValueError: If the current input timestamp is smaller than what the face
detector has already processed.
"""
normalized_rect = self.convert_to_normalized_rect(
image_processing_options, roi_allowed=False
)
self._send_live_stream_data({
_IMAGE_IN_STREAM_NAME: packet_creator.create_image(image).at(
timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND
),
_NORM_RECT_STREAM_NAME: packet_creator.create_proto(
normalized_rect.to_pb2()
).at(timestamp_ms * _MICRO_SECONDS_PER_MILLISECOND),
})
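Taken together, the wrapper supports the usual MediaPipe Tasks pattern. A minimal sketch for the image mode (not part of the commit; the model and image paths are placeholders, and the import paths follow the test file above):

# Hedged sketch: one-shot face detection in image mode. Paths are placeholders.
from mediapipe.python._framework_bindings import image as image_module
from mediapipe.tasks.python.vision import face_detector

with face_detector.FaceDetector.create_from_model_path(
    '/path/to/face_detection_short_range.tflite'
) as detector:
  mp_image = image_module.Image.create_from_file('/path/to/portrait.jpg')
  result = detector.detect(mp_image)
  for detection in result.detections:
    box = detection.bounding_box
    print(box.origin_x, box.origin_y, box.width, box.height)
    for kp in detection.keypoints or []:
      print(kp.label, kp.x, kp.y)  # normalized to [0.0, 1.0]

For video and live-stream input, `detect_for_video` and `detect_async` follow the same shape, with millisecond timestamps as described in the docstrings above.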