forked from anki/vector-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvision.py
More file actions
182 lines (142 loc) · 7.22 KB
/
vision.py
File metadata and controls
182 lines (142 loc) · 7.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright (c) 2018 Anki, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the file LICENSE.txt or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility methods for Vector's vision
Vector can detect various types of objects through his camera feed.
The :class:`VisionComponent` class defined in this module is made available as
:attr:`anki_vector.robot.Robot.vision` and can be used to enable/disable vision
processing on the robot.
"""
# __all__ should order by constants, event classes, other classes, functions.
__all__ = ['VisionComponent']
from concurrent import futures
from . import util, connection, events
from .messaging import protocol
class VisionComponent(util.Component):  # pylint: disable=too-few-public-methods
    """VisionComponent exposes controls for the robot's internal image processing.

    The :class:`anki_vector.robot.Robot` or :class:`anki_vector.robot.AsyncRobot` instance owns this vision component.

    Every ``enable_*`` method records the most recently requested state on this
    object. That cached state is cleared again when the robot emits a
    ``mirror_mode_disabled`` or ``vision_modes_auto_disabled`` event.

    :param robot: A reference to the owner Robot object.
    """

    def __init__(self, robot):
        super().__init__(robot)

        # Cached record of which vision modes were last requested through this component.
        self._detect_faces = False
        self._detect_custom_objects = False
        # TODO implement
        # self._detect_motion = False
        self._display_camera_feed_on_face = False

        # Keep the cached flags in step with modes the robot switches off by itself.
        subscriptions = (
            (self._handle_mirror_mode_disabled_event, events.Events.mirror_mode_disabled),
            (self._handle_vision_modes_auto_disabled_event, events.Events.vision_modes_auto_disabled),
        )
        for handler, event_type in subscriptions:
            robot.events.subscribe(handler, event_type)

    def close(self):
        """Close all the running vision modes and wait for a response."""
        pending = self.disable_all_vision_modes()  # pylint: disable=assignment-from-no-return
        if not isinstance(pending, futures.Future):
            return
        # Only a future needs to be waited on explicitly here.
        pending.result()

    def _handle_mirror_mode_disabled_event(self, _robot, _event_type, _msg):
        """Clear the cached mirror-mode flag when the ``mirror_mode_disabled`` event fires."""
        self._display_camera_feed_on_face = False

    def _handle_vision_modes_auto_disabled_event(self, _robot, _event_type, _msg):
        """Clear every cached vision-mode flag when the ``vision_modes_auto_disabled`` event fires."""
        self._detect_faces = self._detect_custom_objects = False
        # self._detect_motion = False
        self._display_camera_feed_on_face = False

    @property
    def detect_faces(self):
        """The cached face detection flag, as last set by :meth:`enable_face_detection` or cleared by an event."""
        return self._detect_faces

    @property
    def detect_custom_objects(self):
        """The cached custom object detection flag, as last set by :meth:`enable_custom_object_detection` or cleared by an event."""
        return self._detect_custom_objects

    # TODO implement
    # @property
    # def detect_motion(self):
    #     return self._detect_motion

    @property
    def display_camera_feed_on_face(self):
        """The cached mirror-mode flag, as last set by :meth:`enable_display_camera_feed_on_face` or cleared by an event."""
        return self._display_camera_feed_on_face

    @connection.on_connection_thread()
    async def disable_all_vision_modes(self):
        """Turn off each vision mode that this component currently records as enabled.

        Modes whose cached flag is already ``False`` are left untouched, so no
        request is sent for them.
        """
        if self.detect_faces:
            await self.enable_face_detection(False, False)

        if self.detect_custom_objects:
            await self.enable_custom_object_detection(False)

        # if self.detect_motion:
        #     await self.enable_motion_detection(False)

        if self.display_camera_feed_on_face:
            await self.enable_display_camera_feed_on_face(False)

    @connection.on_connection_thread()
    async def enable_custom_object_detection(self, detect_custom_objects: bool = True):
        """Enable custom object detection on the robot's camera.

        If custom object detection is being turned off, the robot may still choose to keep it on
        if another subscriber (including one internal to the robot) requests this vision mode be active.

        See :class:`objects.CustomObjectMarkers`.

        :param detect_custom_objects: Specify whether we want the robot to detect custom objects.

        :return: The result of awaiting the ``EnableMarkerDetection`` call on the gRPC interface.

        .. testcode::

            import anki_vector
            with anki_vector.Robot() as robot:
                robot.vision.enable_custom_object_detection()
        """
        # The cached flag is updated before the request is built and sent.
        self._detect_custom_objects = detect_custom_objects
        request = protocol.EnableMarkerDetectionRequest(enable=detect_custom_objects)
        response = await self.grpc_interface.EnableMarkerDetection(request)
        return response

    @connection.on_connection_thread()
    async def enable_face_detection(
            self,
            detect_faces: bool = True,
            # detect_smile: bool = False,
            estimate_expression: bool = False,
            # detect_blink: bool = False,
            # detect_gaze: bool = False
    ):
        """Enable face detection on the robot's camera

        Smile, blink and gaze detection are not exposed as parameters yet; the
        request always sends them as ``False``.

        :param detect_faces: Specify whether we want the robot to detect faces.
        :param estimate_expression: Specify whether we want the robot to estimate what expression detected faces are showing.

        :return: The result of awaiting the ``EnableFaceDetection`` call on the gRPC interface.
        """
        # The cached flag is updated before the request is built and sent.
        self._detect_faces = detect_faces
        request = protocol.EnableFaceDetectionRequest(
            enable=detect_faces,
            enable_smile_detection=False,
            enable_expression_estimation=estimate_expression,
            enable_blink_detection=False,
            enable_gaze_detection=False,
        )
        response = await self.grpc_interface.EnableFaceDetection(request)
        return response

    # TODO implement
    # @connection.on_connection_thread()
    # async def enable_motion_detection(self, detect_motion: bool = True):
    #     """Enable motion detection on the robot's camera
    #
    #     :param detect_motion: Specify whether we want the robot to detect motion.
    #
    #     .. testcode::
    #
    #         import anki_vector
    #         with anki_vector.Robot() as robot:
    #             robot.vision.enable_motion_detection(detect_motion=True)
    #     """
    #     self._detect_motion = detect_motion
    #     enable_motion_detection_request = protocol.EnableMotionDetectionRequest(enable=detect_motion)
    #     return await self.grpc_interface.EnableMotionDetection(enable_motion_detection_request)

    @connection.on_connection_thread()
    async def enable_display_camera_feed_on_face(self, display_camera_feed_on_face: bool = True):
        """Display the robot's camera feed on its face along with any detections (if enabled)

        :param display_camera_feed_on_face: Specify whether we want to display the robot's camera feed on its face.

        :return: The result of awaiting the ``EnableMirrorMode`` call on the gRPC interface.

        .. testcode::

            import anki_vector
            import time

            with anki_vector.Robot() as robot:
                robot.vision.enable_display_camera_feed_on_face()
                time.sleep(10.0)
        """
        # The cached flag is updated before the request is built and sent.
        self._display_camera_feed_on_face = display_camera_feed_on_face
        request = protocol.EnableMirrorModeRequest(enable=display_camera_feed_on_face)
        response = await self.grpc_interface.EnableMirrorMode(request)
        return response