diff --git a/mediapipe/tasks/python/test/vision/BUILD b/mediapipe/tasks/python/test/vision/BUILD index e55e1b572..d555402b8 100644 --- a/mediapipe/tasks/python/test/vision/BUILD +++ b/mediapipe/tasks/python/test/vision/BUILD @@ -185,3 +185,20 @@ py_test( "@com_google_protobuf//:protobuf_python", ], ) + +py_test( + name = "face_aligner_test", + srcs = ["face_aligner_test.py"], + data = [ + "//mediapipe/tasks/testdata/vision:test_images", + "//mediapipe/tasks/testdata/vision:test_models", + ], + deps = [ + "//mediapipe/python:_framework_bindings", + "//mediapipe/tasks/python/components/containers:rect", + "//mediapipe/tasks/python/core:base_options", + "//mediapipe/tasks/python/test:test_utils", + "//mediapipe/tasks/python/vision:face_aligner", + "//mediapipe/tasks/python/vision/core:image_processing_options", + ], +) diff --git a/mediapipe/tasks/python/test/vision/face_aligner_test.py b/mediapipe/tasks/python/test/vision/face_aligner_test.py new file mode 100644 index 000000000..324bd0359 --- /dev/null +++ b/mediapipe/tasks/python/test/vision/face_aligner_test.py @@ -0,0 +1,190 @@ +# Copyright 2023 The MediaPipe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for face aligner.""" + +import enum +import os + +from absl.testing import absltest +from absl.testing import parameterized + +from mediapipe.python._framework_bindings import image as image_module +from mediapipe.tasks.python.components.containers import rect +from mediapipe.tasks.python.core import base_options as base_options_module +from mediapipe.tasks.python.test import test_utils +from mediapipe.tasks.python.vision import face_aligner +from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module + +_BaseOptions = base_options_module.BaseOptions +_Rect = rect.Rect +_Image = image_module.Image +_FaceAligner = face_aligner.FaceAligner +_FaceAlignerOptions = face_aligner.FaceAlignerOptions +_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions + +_MODEL = 'face_landmarker_v2.task' +_LARGE_FACE_IMAGE = 'portrait.jpg' +_MODEL_IMAGE_SIZE = 256 +_TEST_DATA_DIR = 'mediapipe/tasks/testdata/vision' + + +class ModelFileType(enum.Enum): + FILE_CONTENT = 1 + FILE_NAME = 2 + + +class FaceAlignerTest(parameterized.TestCase): + + def setUp(self): + super().setUp() + self.test_image = _Image.create_from_file( + test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, _LARGE_FACE_IMAGE) + ) + ) + self.model_path = test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, _MODEL) + ) + + def test_create_from_file_succeeds_with_valid_model_path(self): + # Creates with default option and valid model file successfully. + with _FaceAligner.create_from_model_path(self.model_path) as aligner: + self.assertIsInstance(aligner, _FaceAligner) + + def test_create_from_options_succeeds_with_valid_model_path(self): + # Creates with options containing model file successfully. + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _FaceAlignerOptions(base_options=base_options) + with _FaceAligner.create_from_options(options) as aligner: + self.assertIsInstance(aligner, _FaceAligner) + + def test_create_from_options_fails_with_invalid_model_path(self): + with self.assertRaisesRegex( + RuntimeError, 'Unable to open file at /path/to/invalid/model.tflite' + ): + base_options = _BaseOptions( + model_asset_path='/path/to/invalid/model.tflite' + ) + options = _FaceAlignerOptions(base_options=base_options) + _FaceAligner.create_from_options(options) + + def test_create_from_options_succeeds_with_valid_model_content(self): + # Creates with options containing model content successfully. + with open(self.model_path, 'rb') as f: + base_options = _BaseOptions(model_asset_buffer=f.read()) + options = _FaceAlignerOptions(base_options=base_options) + aligner = _FaceAligner.create_from_options(options) + self.assertIsInstance(aligner, _FaceAligner) + + @parameterized.parameters( + (ModelFileType.FILE_NAME, _LARGE_FACE_IMAGE), + (ModelFileType.FILE_CONTENT, _LARGE_FACE_IMAGE), + ) + def test_align(self, model_file_type, image_file_name): + # Load the test image. + self.test_image = _Image.create_from_file( + test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, image_file_name) + ) + ) + # Creates aligner. + if model_file_type is ModelFileType.FILE_NAME: + base_options = _BaseOptions(model_asset_path=self.model_path) + elif model_file_type is ModelFileType.FILE_CONTENT: + with open(self.model_path, 'rb') as f: + model_content = f.read() + base_options = _BaseOptions(model_asset_buffer=model_content) + else: + # Should never happen + raise ValueError('model_file_type is invalid.') + + options = _FaceAlignerOptions(base_options=base_options) + aligner = _FaceAligner.create_from_options(options) + + # Performs face alignment on the input. + alignd_image = aligner.align(self.test_image) + self.assertIsInstance(alignd_image, _Image) + # Closes the aligner explicitly when the aligner is not used in + # a context. + aligner.close() + + @parameterized.parameters( + (ModelFileType.FILE_NAME, _LARGE_FACE_IMAGE), + (ModelFileType.FILE_CONTENT, _LARGE_FACE_IMAGE), + ) + def test_align_in_context(self, model_file_type, image_file_name): + # Load the test image. + self.test_image = _Image.create_from_file( + test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, image_file_name) + ) + ) + # Creates aligner. + if model_file_type is ModelFileType.FILE_NAME: + base_options = _BaseOptions(model_asset_path=self.model_path) + elif model_file_type is ModelFileType.FILE_CONTENT: + with open(self.model_path, 'rb') as f: + model_content = f.read() + base_options = _BaseOptions(model_asset_buffer=model_content) + else: + # Should never happen + raise ValueError('model_file_type is invalid.') + + options = _FaceAlignerOptions(base_options=base_options) + with _FaceAligner.create_from_options(options) as aligner: + # Performs face alignment on the input. + alignd_image = aligner.align(self.test_image) + self.assertIsInstance(alignd_image, _Image) + self.assertEqual(alignd_image.width, _MODEL_IMAGE_SIZE) + self.assertEqual(alignd_image.height, _MODEL_IMAGE_SIZE) + + def test_align_succeeds_with_region_of_interest(self): + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _FaceAlignerOptions(base_options=base_options) + with _FaceAligner.create_from_options(options) as aligner: + # Load the test image. + test_image = _Image.create_from_file( + test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, _LARGE_FACE_IMAGE) + ) + ) + # Region-of-interest around the face. + roi = _Rect(left=0.32, top=0.02, right=0.67, bottom=0.32) + image_processing_options = _ImageProcessingOptions(roi) + # Performs face alignment on the input. + alignd_image = aligner.align(test_image, image_processing_options) + self.assertIsInstance(alignd_image, _Image) + self.assertEqual(alignd_image.width, _MODEL_IMAGE_SIZE) + self.assertEqual(alignd_image.height, _MODEL_IMAGE_SIZE) + + def test_align_succeeds_with_no_face_detected(self): + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _FaceAlignerOptions(base_options=base_options) + with _FaceAligner.create_from_options(options) as aligner: + # Load the test image. + test_image = _Image.create_from_file( + test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, _LARGE_FACE_IMAGE) + ) + ) + # Region-of-interest that doesn't contain a human face. + roi = _Rect(left=0.1, top=0.1, right=0.2, bottom=0.2) + image_processing_options = _ImageProcessingOptions(roi) + # Performs face alignment on the input. + alignd_image = aligner.align(test_image, image_processing_options) + self.assertIsNone(alignd_image) + + +if __name__ == '__main__': + absltest.main() diff --git a/mediapipe/tasks/python/vision/BUILD b/mediapipe/tasks/python/vision/BUILD index d0c97434f..dcd28dcf5 100644 --- a/mediapipe/tasks/python/vision/BUILD +++ b/mediapipe/tasks/python/vision/BUILD @@ -264,3 +264,22 @@ py_library( "//mediapipe/tasks/python/vision/core:vision_task_running_mode", ], ) + +py_library( + name = "face_aligner", + srcs = [ + "face_aligner.py", + ], + deps = [ + "//mediapipe/python:_framework_bindings", + "//mediapipe/python:packet_creator", + "//mediapipe/python:packet_getter", + "//mediapipe/tasks/cc/vision/face_stylizer/proto:face_stylizer_graph_options_py_pb2", + "//mediapipe/tasks/python/core:base_options", + "//mediapipe/tasks/python/core:optional_dependencies", + "//mediapipe/tasks/python/core:task_info", + "//mediapipe/tasks/python/vision/core:base_vision_task_api", + "//mediapipe/tasks/python/vision/core:image_processing_options", + "//mediapipe/tasks/python/vision/core:vision_task_running_mode", + ], +) diff --git a/mediapipe/tasks/python/vision/face_aligner.py b/mediapipe/tasks/python/vision/face_aligner.py new file mode 100644 index 000000000..53bf71f35 --- /dev/null +++ b/mediapipe/tasks/python/vision/face_aligner.py @@ -0,0 +1,158 @@ +# Copyright 2023 The MediaPipe Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""MediaPipe face aligner task.""" + +import dataclasses +from typing import Optional + +from mediapipe.python import packet_creator +from mediapipe.python import packet_getter +from mediapipe.python._framework_bindings import image as image_module +from mediapipe.tasks.cc.vision.face_stylizer.proto import face_stylizer_graph_options_pb2 +from mediapipe.tasks.python.core import base_options as base_options_module +from mediapipe.tasks.python.core import task_info as task_info_module +from mediapipe.tasks.python.core.optional_dependencies import doc_controls +from mediapipe.tasks.python.vision.core import base_vision_task_api +from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module +from mediapipe.tasks.python.vision.core import vision_task_running_mode as running_mode_module + +_BaseOptions = base_options_module.BaseOptions +_FaceStylizerGraphOptionsProto = ( + face_stylizer_graph_options_pb2.FaceStylizerGraphOptions +) +_RunningMode = running_mode_module.VisionTaskRunningMode +_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions +_TaskInfo = task_info_module.TaskInfo + +_FACE_ALIGNMENT_IMAGE_NAME = 'face_alignment' +_FACE_ALIGNMENT_IMAGE_TAG = 'FACE_ALIGNMENT' +_NORM_RECT_STREAM_NAME = 'norm_rect_in' +_NORM_RECT_TAG = 'NORM_RECT' +_IMAGE_IN_STREAM_NAME = 'image_in' +_IMAGE_OUT_STREAM_NAME = 'image_out' +_IMAGE_TAG = 'IMAGE' +_TASK_GRAPH_NAME = 'mediapipe.tasks.vision.face_stylizer.FaceStylizerGraph' + + +@dataclasses.dataclass +class FaceAlignerOptions: + """Options for the face aligner task. + + Attributes: + base_options: Base options for the face aligner task. + """ + + base_options: _BaseOptions + + @doc_controls.do_not_generate_docs + def to_pb2(self) -> _FaceStylizerGraphOptionsProto: + """Generates a FaceStylizerOptions protobuf object.""" + base_options_proto = self.base_options.to_pb2() + base_options_proto.use_stream_mode = False + return _FaceStylizerGraphOptionsProto(base_options=base_options_proto) + + +class FaceAligner(base_vision_task_api.BaseVisionTaskApi): + """Class that performs face alignment on images.""" + + @classmethod + def create_from_model_path(cls, model_path: str) -> 'FaceAligner': + """Creates a `FaceAligner` object from a face landmarker task bundle and the default `FaceAlignerOptions`. + + Note that the created `FaceAligner` instance is in image mode, for + aligning one face on a single image input. + + Args: + model_path: Path to the face landmarker task bundle. + + Returns: + `FaceAligner` object that's created from the model file and the default + `FaceAlignerOptions`. + + Raises: + ValueError: If failed to create `FaceAligner` object from the provided + file such as invalid file path. + RuntimeError: If other types of error occurred. + """ + base_options = _BaseOptions(model_asset_path=model_path) + options = FaceAlignerOptions(base_options=base_options) + return cls.create_from_options(options) + + @classmethod + def create_from_options(cls, options: FaceAlignerOptions) -> 'FaceAligner': + """Creates the `FaceAligner` object from face aligner options. + + Args: + options: Options for the face aligner task. + + Returns: + `FaceAligner` object that's created from `options`. + + Raises: + ValueError: If failed to create `FaceAligner` object from + `FaceAlignerOptions` such as missing the model. + RuntimeError: If other types of error occurred. + """ + task_info = _TaskInfo( + task_graph=_TASK_GRAPH_NAME, + input_streams=[ + ':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]), + ':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]), + ], + output_streams=[ + ':'.join([_FACE_ALIGNMENT_IMAGE_TAG, _FACE_ALIGNMENT_IMAGE_NAME]), + ':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME]), + ], + task_options=options, + ) + return cls( + task_info.generate_graph_config(enable_flow_limiting=False), + _RunningMode.IMAGE, + None, + ) + + def align( + self, + image: image_module.Image, + image_processing_options: Optional[_ImageProcessingOptions] = None, + ) -> image_module.Image: + """Performs face alignment on the provided MediaPipe Image. + + Only use this method when the FaceAligner is created with the image + running mode. + + Args: + image: MediaPipe Image. + image_processing_options: Options for image processing. + + Returns: + The aligned face image. The aligned output image size is the same as the + model output size. None if no face is detected on the input image. + + Raises: + ValueError: If any of the input arguments is invalid. + RuntimeError: If face alignment failed to run. + """ + normalized_rect = self.convert_to_normalized_rect( + image_processing_options, image + ) + output_packets = self._process_image_data({ + _IMAGE_IN_STREAM_NAME: packet_creator.create_image(image), + _NORM_RECT_STREAM_NAME: packet_creator.create_proto( + normalized_rect.to_pb2() + ), + }) + if output_packets[_FACE_ALIGNMENT_IMAGE_NAME].is_empty(): + return None + return packet_getter.get_image(output_packets[_FACE_ALIGNMENT_IMAGE_NAME])