diff --git a/mediapipe/python/BUILD b/mediapipe/python/BUILD index a7e777039..5877e7205 100644 --- a/mediapipe/python/BUILD +++ b/mediapipe/python/BUILD @@ -99,6 +99,7 @@ cc_library( "//mediapipe/tasks/cc/vision/image_classifier:image_classifier_graph", "//mediapipe/tasks/cc/vision/image_embedder:image_embedder_graph", "//mediapipe/tasks/cc/vision/image_segmenter:image_segmenter_graph", + "//mediapipe/tasks/cc/vision/interactive_segmenter:interactive_segmenter_graph", "//mediapipe/tasks/cc/vision/object_detector:object_detector_graph", ], ) diff --git a/mediapipe/tasks/python/test/vision/interactive_segmenter_test.py b/mediapipe/tasks/python/test/vision/interactive_segmenter_test.py new file mode 100644 index 000000000..e8c52ae3e --- /dev/null +++ b/mediapipe/tasks/python/test/vision/interactive_segmenter_test.py @@ -0,0 +1,332 @@ +# Copyright 2023 The MediaPipe Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for interactive segmenter.""" + +import enum +import os + +from absl.testing import absltest +from absl.testing import parameterized +import cv2 +import numpy as np + +from mediapipe.python._framework_bindings import image as image_module +from mediapipe.python._framework_bindings import image_frame +from mediapipe.tasks.python.components.containers import keypoint as keypoint_module +from mediapipe.tasks.python.components.containers import rect +from mediapipe.tasks.python.core import base_options as base_options_module +from mediapipe.tasks.python.test import test_utils +from mediapipe.tasks.python.vision import interactive_segmenter +from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module + +_BaseOptions = base_options_module.BaseOptions +_Image = image_module.Image +_ImageFormat = image_frame.ImageFormat +_NormalizedKeypoint = keypoint_module.NormalizedKeypoint +_Rect = rect.Rect +_OutputType = interactive_segmenter.InteractiveSegmenterOptions.OutputType +_InteractiveSegmenter = interactive_segmenter.InteractiveSegmenter +_InteractiveSegmenterOptions = interactive_segmenter.InteractiveSegmenterOptions +_RegionOfInterest = interactive_segmenter.RegionOfInterest +_Format = interactive_segmenter.RegionOfInterest.Format +_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions + +_MODEL_FILE = 'ptm_512_hdt_ptm_woid.tflite' +_CATS_AND_DOGS = 'cats_and_dogs.jpg' +_CATS_AND_DOGS_MASK_DOG_1 = 'cats_and_dogs_mask_dog1.png' +_CATS_AND_DOGS_MASK_DOG_2 = 'cats_and_dogs_mask_dog2.png' +_MASK_MAGNIFICATION_FACTOR = 255 +_MASK_SIMILARITY_THRESHOLD = 0.97 +_TEST_DATA_DIR = 'mediapipe/tasks/testdata/vision' + + +def _calculate_soft_iou(m1, m2): + intersection_sum = np.sum(m1 * m2) + union_sum = np.sum(m1 * m1) + np.sum(m2 * m2) - intersection_sum + + if union_sum > 0: + return intersection_sum / union_sum + else: + return 0 + + +def _similar_to_float_mask(actual_mask, expected_mask, similarity_threshold): + actual_mask = actual_mask.numpy_view() + expected_mask = expected_mask.numpy_view() / 255.0 + + return ( + actual_mask.shape == expected_mask.shape + and _calculate_soft_iou(actual_mask, expected_mask) > similarity_threshold + ) + + +def _similar_to_uint8_mask(actual_mask, expected_mask, similarity_threshold): + actual_mask_pixels = actual_mask.numpy_view().flatten() + expected_mask_pixels = expected_mask.numpy_view().flatten() + + consistent_pixels = 0 + num_pixels = len(expected_mask_pixels) + + for index in range(num_pixels): + consistent_pixels += ( + actual_mask_pixels[index] * _MASK_MAGNIFICATION_FACTOR + == expected_mask_pixels[index] + ) + + return consistent_pixels / num_pixels >= similarity_threshold + + +class ModelFileType(enum.Enum): + FILE_CONTENT = 1 + FILE_NAME = 2 + + +class InteractiveSegmenterTest(parameterized.TestCase): + + def setUp(self): + super().setUp() + # Load the test input image. + self.test_image = _Image.create_from_file( + test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, _CATS_AND_DOGS) + ) + ) + # Loads ground truth segmentation file. + self.test_seg_image = self._load_segmentation_mask( + _CATS_AND_DOGS_MASK_DOG_1 + ) + self.model_path = test_utils.get_test_data_path( + os.path.join(_TEST_DATA_DIR, _MODEL_FILE) + ) + + def _load_segmentation_mask(self, file_path: str): + # Loads ground truth segmentation file. + gt_segmentation_data = cv2.imread( + test_utils.get_test_data_path(os.path.join(_TEST_DATA_DIR, file_path)), + cv2.IMREAD_GRAYSCALE, + ) + return _Image(_ImageFormat.GRAY8, gt_segmentation_data) + + def test_create_from_file_succeeds_with_valid_model_path(self): + # Creates with default option and valid model file successfully. + with _InteractiveSegmenter.create_from_model_path( + self.model_path + ) as segmenter: + self.assertIsInstance(segmenter, _InteractiveSegmenter) + + def test_create_from_options_succeeds_with_valid_model_path(self): + # Creates with options containing model file successfully. + base_options = _BaseOptions(model_asset_path=self.model_path) + options = _InteractiveSegmenterOptions(base_options=base_options) + with _InteractiveSegmenter.create_from_options(options) as segmenter: + self.assertIsInstance(segmenter, _InteractiveSegmenter) + + def test_create_from_options_fails_with_invalid_model_path(self): + with self.assertRaisesRegex( + RuntimeError, 'Unable to open file at /path/to/invalid/model.tflite' + ): + base_options = _BaseOptions( + model_asset_path='/path/to/invalid/model.tflite' + ) + options = _InteractiveSegmenterOptions(base_options=base_options) + _InteractiveSegmenter.create_from_options(options) + + def test_create_from_options_succeeds_with_valid_model_content(self): + # Creates with options containing model content successfully. + with open(self.model_path, 'rb') as f: + base_options = _BaseOptions(model_asset_buffer=f.read()) + options = _InteractiveSegmenterOptions(base_options=base_options) + segmenter = _InteractiveSegmenter.create_from_options(options) + self.assertIsInstance(segmenter, _InteractiveSegmenter) + + @parameterized.parameters( + ( + ModelFileType.FILE_NAME, + _RegionOfInterest.Format.KEYPOINT, + _NormalizedKeypoint(0.44, 0.7), + _CATS_AND_DOGS_MASK_DOG_1, + 0.84, + ), + ( + ModelFileType.FILE_CONTENT, + _RegionOfInterest.Format.KEYPOINT, + _NormalizedKeypoint(0.44, 0.7), + _CATS_AND_DOGS_MASK_DOG_1, + 0.84, + ), + ( + ModelFileType.FILE_NAME, + _RegionOfInterest.Format.KEYPOINT, + _NormalizedKeypoint(0.66, 0.66), + _CATS_AND_DOGS_MASK_DOG_2, + _MASK_SIMILARITY_THRESHOLD, + ), + ( + ModelFileType.FILE_CONTENT, + _RegionOfInterest.Format.KEYPOINT, + _NormalizedKeypoint(0.66, 0.66), + _CATS_AND_DOGS_MASK_DOG_2, + _MASK_SIMILARITY_THRESHOLD, + ), + ) + def test_segment_succeeds_with_category_mask( + self, + model_file_type, + roi_format, + keypoint, + output_mask, + similarity_threshold, + ): + # Creates segmenter. + if model_file_type is ModelFileType.FILE_NAME: + base_options = _BaseOptions(model_asset_path=self.model_path) + elif model_file_type is ModelFileType.FILE_CONTENT: + with open(self.model_path, 'rb') as f: + model_content = f.read() + base_options = _BaseOptions(model_asset_buffer=model_content) + else: + # Should never happen + raise ValueError('model_file_type is invalid.') + + options = _InteractiveSegmenterOptions( + base_options=base_options, output_type=_OutputType.CATEGORY_MASK + ) + segmenter = _InteractiveSegmenter.create_from_options(options) + + # Performs image segmentation on the input. + roi = _RegionOfInterest(format=roi_format, keypoint=keypoint) + category_masks = segmenter.segment(self.test_image, roi) + self.assertLen(category_masks, 1) + category_mask = category_masks[0] + result_pixels = category_mask.numpy_view().flatten() + + # Check if data type of `category_mask` is correct. + self.assertEqual(result_pixels.dtype, np.uint8) + + # Loads ground truth segmentation file. + test_seg_image = self._load_segmentation_mask(output_mask) + + self.assertTrue( + _similar_to_uint8_mask( + category_masks[0], test_seg_image, similarity_threshold + ), + ( + 'Number of pixels in the candidate mask differing from that of the' + f' ground truth mask exceeds {similarity_threshold}.' + ), + ) + + # Closes the segmenter explicitly when the segmenter is not used in + # a context. + segmenter.close() + + @parameterized.parameters( + ( + _RegionOfInterest.Format.KEYPOINT, + _NormalizedKeypoint(0.44, 0.7), + _CATS_AND_DOGS_MASK_DOG_1, + 0.84, + ), + ( + _RegionOfInterest.Format.KEYPOINT, + _NormalizedKeypoint(0.66, 0.66), + _CATS_AND_DOGS_MASK_DOG_2, + _MASK_SIMILARITY_THRESHOLD, + ), + ) + def test_segment_succeeds_with_confidence_mask( + self, roi_format, keypoint, output_mask, similarity_threshold + ): + # Creates segmenter. + base_options = _BaseOptions(model_asset_path=self.model_path) + roi = _RegionOfInterest(format=roi_format, keypoint=keypoint) + + # Run segmentation on the model in CONFIDENCE_MASK mode. + options = _InteractiveSegmenterOptions( + base_options=base_options, output_type=_OutputType.CONFIDENCE_MASK + ) + + with _InteractiveSegmenter.create_from_options(options) as segmenter: + # Perform segmentation + confidence_masks = segmenter.segment(self.test_image, roi) + + # Check if confidence mask shape is correct. + self.assertLen( + confidence_masks, + 2, + 'Number of confidence masks must match with number of categories.', + ) + + # Loads ground truth segmentation file. + expected_mask = self._load_segmentation_mask(output_mask) + + self.assertTrue( + _similar_to_float_mask( + confidence_masks[1], expected_mask, similarity_threshold + ) + ) + + def test_segment_succeeds_with_rotation(self): + # Creates segmenter. + base_options = _BaseOptions(model_asset_path=self.model_path) + roi = _RegionOfInterest( + format=_RegionOfInterest.Format.KEYPOINT, + keypoint=_NormalizedKeypoint(0.66, 0.66), + ) + + # Run segmentation on the model in CONFIDENCE_MASK mode. + options = _InteractiveSegmenterOptions( + base_options=base_options, output_type=_OutputType.CONFIDENCE_MASK + ) + + with _InteractiveSegmenter.create_from_options(options) as segmenter: + # Perform segmentation + image_processing_options = _ImageProcessingOptions(rotation_degrees=-90) + confidence_masks = segmenter.segment( + self.test_image, roi, image_processing_options + ) + + # Check if confidence mask shape is correct. + self.assertLen( + confidence_masks, + 2, + 'Number of confidence masks must match with number of categories.', + ) + + def test_segment_fails_with_roi_in_image_processing_options(self): + # Creates segmenter. + base_options = _BaseOptions(model_asset_path=self.model_path) + roi = _RegionOfInterest( + format=_RegionOfInterest.Format.KEYPOINT, + keypoint=_NormalizedKeypoint(0.66, 0.66), + ) + + # Run segmentation on the model in CONFIDENCE_MASK mode. + options = _InteractiveSegmenterOptions( + base_options=base_options, output_type=_OutputType.CONFIDENCE_MASK + ) + + with self.assertRaisesRegex( + ValueError, "This task doesn't support region-of-interest." + ): + with _InteractiveSegmenter.create_from_options(options) as segmenter: + # Perform segmentation + image_processing_options = _ImageProcessingOptions( + _Rect(left=0.1, top=0, right=0.9, bottom=1) + ) + segmenter.segment(self.test_image, roi, image_processing_options) + + +if __name__ == '__main__': + absltest.main() diff --git a/mediapipe/tasks/python/vision/BUILD b/mediapipe/tasks/python/vision/BUILD index 2c0053b11..046ce2dc8 100644 --- a/mediapipe/tasks/python/vision/BUILD +++ b/mediapipe/tasks/python/vision/BUILD @@ -106,6 +106,28 @@ py_library( ], ) +py_library( + name = "interactive_segmenter", + srcs = [ + "interactive_segmenter.py", + ], + deps = [ + "//mediapipe/python:_framework_bindings", + "//mediapipe/python:packet_creator", + "//mediapipe/python:packet_getter", + "//mediapipe/tasks/cc/vision/image_segmenter/proto:image_segmenter_graph_options_py_pb2", + "//mediapipe/tasks/cc/vision/image_segmenter/proto:segmenter_options_py_pb2", + "//mediapipe/tasks/python/components/containers:keypoint", + "//mediapipe/tasks/python/core:base_options", + "//mediapipe/tasks/python/core:optional_dependencies", + "//mediapipe/tasks/python/core:task_info", + "//mediapipe/tasks/python/vision/core:base_vision_task_api", + "//mediapipe/tasks/python/vision/core:image_processing_options", + "//mediapipe/tasks/python/vision/core:vision_task_running_mode", + "//mediapipe/util:render_data_py_pb2", + ], +) + py_library( name = "gesture_recognizer", srcs = [ diff --git a/mediapipe/tasks/python/vision/interactive_segmenter.py b/mediapipe/tasks/python/vision/interactive_segmenter.py new file mode 100644 index 000000000..12a30b6ef --- /dev/null +++ b/mediapipe/tasks/python/vision/interactive_segmenter.py @@ -0,0 +1,254 @@ +# Copyright 2023 The MediaPipe Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""MediaPipe interactive segmenter task.""" + +import dataclasses +import enum +from typing import List, Optional + +from mediapipe.python import packet_creator +from mediapipe.python import packet_getter +from mediapipe.python._framework_bindings import image as image_module +from mediapipe.tasks.cc.vision.image_segmenter.proto import image_segmenter_graph_options_pb2 +from mediapipe.tasks.cc.vision.image_segmenter.proto import segmenter_options_pb2 +from mediapipe.tasks.python.components.containers import keypoint as keypoint_module +from mediapipe.tasks.python.core import base_options as base_options_module +from mediapipe.tasks.python.core import task_info as task_info_module +from mediapipe.tasks.python.core.optional_dependencies import doc_controls +from mediapipe.tasks.python.vision.core import base_vision_task_api +from mediapipe.tasks.python.vision.core import image_processing_options as image_processing_options_module +from mediapipe.tasks.python.vision.core import vision_task_running_mode +from mediapipe.util import render_data_pb2 + +_BaseOptions = base_options_module.BaseOptions +_RenderDataProto = render_data_pb2.RenderData +_SegmenterOptionsProto = segmenter_options_pb2.SegmenterOptions +_ImageSegmenterGraphOptionsProto = ( + image_segmenter_graph_options_pb2.ImageSegmenterGraphOptions +) +_RunningMode = vision_task_running_mode.VisionTaskRunningMode +_ImageProcessingOptions = image_processing_options_module.ImageProcessingOptions +_TaskInfo = task_info_module.TaskInfo + +_SEGMENTATION_OUT_STREAM_NAME = 'segmented_mask_out' +_SEGMENTATION_TAG = 'GROUPED_SEGMENTATION' +_IMAGE_IN_STREAM_NAME = 'image_in' +_IMAGE_OUT_STREAM_NAME = 'image_out' +_ROI_STREAM_NAME = 'roi_in' +_ROI_TAG = 'ROI' +_NORM_RECT_STREAM_NAME = 'norm_rect_in' +_NORM_RECT_TAG = 'NORM_RECT' +_IMAGE_TAG = 'IMAGE' +_TASK_GRAPH_NAME = ( + 'mediapipe.tasks.vision.interactive_segmenter.InteractiveSegmenterGraph' +) + + +@dataclasses.dataclass +class InteractiveSegmenterOptions: + """Options for the interactive segmenter task. + + Attributes: + base_options: Base options for the interactive segmenter task. + output_type: The output mask type allows specifying the type of + post-processing to perform on the raw model results. + """ + + class OutputType(enum.Enum): + UNSPECIFIED = 0 + CATEGORY_MASK = 1 + CONFIDENCE_MASK = 2 + + base_options: _BaseOptions + output_type: Optional[OutputType] = OutputType.CATEGORY_MASK + + @doc_controls.do_not_generate_docs + def to_pb2(self) -> _ImageSegmenterGraphOptionsProto: + """Generates an InteractiveSegmenterOptions protobuf object.""" + base_options_proto = self.base_options.to_pb2() + base_options_proto.use_stream_mode = False + segmenter_options_proto = _SegmenterOptionsProto( + output_type=self.output_type.value + ) + return _ImageSegmenterGraphOptionsProto( + base_options=base_options_proto, + segmenter_options=segmenter_options_proto, + ) + + +@dataclasses.dataclass +class RegionOfInterest: + """The Region-Of-Interest (ROI) to interact with.""" + + class Format(enum.Enum): + UNSPECIFIED = 0 + KEYPOINT = 1 + + format: Format + keypoint: Optional[keypoint_module.NormalizedKeypoint] = None + + +def _convert_roi_to_render_data(roi: RegionOfInterest) -> _RenderDataProto: + """Converts region of interest to render data proto.""" + result = _RenderDataProto() + + if roi is not None: + if roi.format == RegionOfInterest.Format.UNSPECIFIED: + raise ValueError('RegionOfInterest format not specified.') + + elif roi.format == RegionOfInterest.Format.KEYPOINT: + if roi.keypoint is not None: + annotation = result.render_annotations.add() + annotation.color.r = 255 + point = annotation.point + point.normalized = True + point.x = roi.keypoint.x + point.y = roi.keypoint.y + return result + else: + raise ValueError('Please specify the Region-of-interest for segmentation.') + + raise ValueError('Unrecognized format.') + + +class InteractiveSegmenter(base_vision_task_api.BaseVisionTaskApi): + """Class that performs interactive segmentation on images. + + Users can represent user interaction through `RegionOfInterest`, which gives + a hint to InteractiveSegmenter to perform segmentation focusing on the given + region of interest. + + The API expects a TFLite model with mandatory TFLite Model Metadata. + + Input tensor: + (kTfLiteUInt8/kTfLiteFloat32) + - image input of size `[batch x height x width x channels]`. + - batch inference is not supported (`batch` is required to be 1). + - RGB and greyscale inputs are supported (`channels` is required to be + 1 or 3). + - if type is kTfLiteFloat32, NormalizationOptions are required to be + attached to the metadata for input normalization. + Output tensors: + (kTfLiteUInt8/kTfLiteFloat32) + - list of segmented masks. + - if `output_type` is CATEGORY_MASK, uint8 Image, Image vector of size 1. + - if `output_type` is CONFIDENCE_MASK, float32 Image list of size + `channels`. + - batch is always 1 + + An example of such model can be found at: + https://tfhub.dev/tensorflow/lite-model/deeplabv3/1/metadata/2 + """ + + @classmethod + def create_from_model_path(cls, model_path: str) -> 'InteractiveSegmenter': + """Creates an `InteractiveSegmenter` object from a TensorFlow Lite model and the default `InteractiveSegmenterOptions`. + + Note that the created `InteractiveSegmenter` instance is in image mode, for + performing image segmentation on single image inputs. + + Args: + model_path: Path to the model. + + Returns: + `InteractiveSegmenter` object that's created from the model file and the + default `InteractiveSegmenterOptions`. + + Raises: + ValueError: If failed to create `InteractiveSegmenter` object from the + provided file such as invalid file path. + RuntimeError: If other types of error occurred. + """ + base_options = _BaseOptions(model_asset_path=model_path) + options = InteractiveSegmenterOptions(base_options=base_options) + return cls.create_from_options(options) + + @classmethod + def create_from_options( + cls, options: InteractiveSegmenterOptions + ) -> 'InteractiveSegmenter': + """Creates the `InteractiveSegmenter` object from interactive segmenter options. + + Args: + options: Options for the interactive segmenter task. + + Returns: + `InteractiveSegmenter` object that's created from `options`. + + Raises: + ValueError: If failed to create `InteractiveSegmenter` object from + `InteractiveSegmenterOptions` such as missing the model. + RuntimeError: If other types of error occurred. + """ + + task_info = _TaskInfo( + task_graph=_TASK_GRAPH_NAME, + input_streams=[ + ':'.join([_IMAGE_TAG, _IMAGE_IN_STREAM_NAME]), + ':'.join([_ROI_TAG, _ROI_STREAM_NAME]), + ':'.join([_NORM_RECT_TAG, _NORM_RECT_STREAM_NAME]), + ], + output_streams=[ + ':'.join([_SEGMENTATION_TAG, _SEGMENTATION_OUT_STREAM_NAME]), + ':'.join([_IMAGE_TAG, _IMAGE_OUT_STREAM_NAME]), + ], + task_options=options, + ) + return cls( + task_info.generate_graph_config(enable_flow_limiting=False), + _RunningMode.IMAGE, + None, + ) + + def segment( + self, + image: image_module.Image, + roi: RegionOfInterest, + image_processing_options: Optional[_ImageProcessingOptions] = None, + ) -> List[image_module.Image]: + """Performs the actual segmentation task on the provided MediaPipe Image. + + The image can be of any size with format RGB. + + Args: + image: MediaPipe Image. + roi: Optional user-specified region of interest for segmentation. + image_processing_options: Options for image processing. + + Returns: + If the output_type is CATEGORY_MASK, the returned vector of images is + per-category segmented image mask. + If the output_type is CONFIDENCE_MASK, the returned vector of images + contains only one confidence image mask. A segmentation result object that + contains a list of segmentation masks as images. + + Raises: + ValueError: If any of the input arguments is invalid. + RuntimeError: If image segmentation failed to run. + """ + normalized_rect = self.convert_to_normalized_rect( + image_processing_options, image, roi_allowed=False + ) + render_data_proto = _convert_roi_to_render_data(roi) + output_packets = self._process_image_data({ + _IMAGE_IN_STREAM_NAME: packet_creator.create_image(image), + _ROI_STREAM_NAME: packet_creator.create_proto(render_data_proto), + _NORM_RECT_STREAM_NAME: packet_creator.create_proto( + normalized_rect.to_pb2() + ), + }) + segmentation_result = packet_getter.get_image_list( + output_packets[_SEGMENTATION_OUT_STREAM_NAME] + ) + return segmentation_result