forked from anki/vector-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcamera.py
More file actions
432 lines (332 loc) · 16 KB
/
camera.py
File metadata and controls
432 lines (332 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# Copyright (c) 2018 Anki, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License in the file LICENSE.txt or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support for Vector's camera.
Vector has a built-in camera which he uses to observe the world around him.
The :class:`CameraComponent` class defined in this module is made available as
:attr:`anki_vector.robot.Robot.camera` and can be used to enable/disable image
sending and observe images being sent by the robot. It emits :class:`EvtNewRawCameraImage`
and :class:`EvtNewCameraImage` objects whenever a new camera image is available.
The camera resolution is 1280 x 720 with a field of view of 90 deg (H) x 50 deg (V).
"""
# __all__ should order by constants, event classes, other classes, functions.
__all__ = ["EvtNewRawCameraImage", "EvtNewCameraImage",
"CameraComponent", "CameraImage"]
import asyncio
from concurrent.futures import CancelledError
import io
import time
import sys
from . import annotate, connection, util
from .events import Events
from .exceptions import VectorCameraFeedException
from .messaging import protocol
try:
import numpy as np
except ImportError:
sys.exit("Cannot import numpy: Do `pip3 install numpy` to install")
try:
from PIL import Image
except ImportError:
sys.exit("Cannot import from PIL: Do `pip3 install --user Pillow` to install")
def _convert_to_pillow_image(image_data: bytes) -> Image.Image:
"""Convert raw image bytes to a Pillow Image."""
size = len(image_data)
# Constuct numpy array out of source data
array = np.empty(size, dtype=np.uint8)
array[0:size] = list(image_data)
# Decode compressed source data into uncompressed image data
image = Image.open(io.BytesIO(array))
return image
class CameraImage:
"""A single image from the robot's camera.
This wraps a raw image and provides an :meth:`annotate_image` method
that can resize and add dynamic annotations to the image, such as
marking up the location of objects and faces.
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
image = robot.camera.capture_single_image()
print(f"Displaying image with id {image.image_id}, received at {image.image_recv_time}")
image.raw_image.show()
:param raw_image: The raw unprocessed image from the camera.
:param image_annotator: The image annotation object.
:param image_id: An image number that increments on every new image received.
"""
def __init__(self, raw_image: Image.Image, image_annotator: annotate.ImageAnnotator, image_id: int):
self._raw_image = raw_image
self._image_annotator = image_annotator
self._image_id = image_id
self._image_recv_time = time.time()
@property
def raw_image(self) -> Image.Image:
"""The raw unprocessed image from the camera."""
return self._raw_image
@property
def image_id(self) -> int:
"""An image number that increments on every new image received."""
return self._image_id
@property
def image_recv_time(self) -> float:
"""The time the image was received and processed by the SDK."""
return self._image_recv_time
def annotate_image(self, scale: float = None, fit_size: tuple = None, resample_mode: int = annotate.RESAMPLE_MODE_NEAREST) -> Image.Image:
"""Adds any enabled annotations to the image.
Optionally resizes the image prior to annotations being applied. The
aspect ratio of the resulting image always matches that of the raw image.
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
image = robot.camera.capture_single_image()
annotated_image = image.annotate_image()
annotated_image.show()
:param scale: If set then the base image will be scaled by the
supplied multiplier. Cannot be combined with fit_size
:param fit_size: If set, then scale the image to fit inside
the supplied (width, height) dimensions. The original aspect
ratio will be preserved. Cannot be combined with scale.
:param resample_mode: The resampling mode to use when scaling the
image. Should be either :attr:`~anki_vector.annotate.RESAMPLE_MODE_NEAREST`
(fast) or :attr:`~anki_vector.annotate.RESAMPLE_MODE_BILINEAR` (slower,
but smoother).
"""
return self._image_annotator.annotate_image(self._raw_image,
scale=scale,
fit_size=fit_size,
resample_mode=resample_mode)
class CameraComponent(util.Component):
"""Represents Vector's camera.
The CameraComponent object receives images from Vector's camera, unpacks the data,
composes it and makes it available as latest_image.
The :class:`anki_vector.robot.Robot` or :class:`anki_vector.robot.AsyncRobot` instance observes the camera.
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
robot.camera.init_camera_feed()
image = robot.camera.latest_image
image.raw_image.show()
:param robot: A reference to the owner Robot object.
"""
#: callable: The factory function that returns an
#: :class:`annotate.ImageAnnotator` class or subclass instance.
annotator_factory = annotate.ImageAnnotator
def __init__(self, robot):
super().__init__(robot)
self._image_annotator: annotate.ImageAnnotator = self.annotator_factory(self.robot.world)
self._latest_image: CameraImage = None
self._latest_image_id: int = None
self._camera_feed_task: asyncio.Task = None
self._enabled = False
@property
@util.block_while_none()
def latest_image(self) -> CameraImage:
""":class:`Image.Image`: The most recently processed image received from the robot.
The resolution of latest_image is 640x360.
:getter: Returns the Pillow Image representing the latest image
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
robot.camera.init_camera_feed()
image = robot.camera.latest_image
image.raw_image.show()
"""
if not self._camera_feed_task:
raise VectorCameraFeedException()
return self._latest_image
@property
@util.block_while_none()
def latest_image_id(self) -> int:
"""The most recently processed image's id received from the robot.
Used only to track chunks of the same image.
:getter: Returns the id for the latest image
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
robot.camera.init_camera_feed()
image = robot.camera.latest_image
image.raw_image.show()
print(f"latest_image_id: {robot.camera.latest_image_id}")
"""
if not self._camera_feed_task:
raise VectorCameraFeedException()
return self._latest_image_id
@property
def image_annotator(self) -> annotate.ImageAnnotator:
"""The image annotator used to add annotations to the raw camera images.
.. testcode::
import time
import anki_vector
with anki_vector.Robot(show_viewer=True) as robot:
# Annotations (enabled by default) are displayed on the camera feed
time.sleep(5)
# Disable all annotations
robot.camera.image_annotator.annotation_enabled = False
time.sleep(5)
"""
return self._image_annotator
def init_camera_feed(self) -> None:
"""Begin camera feed task.
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
robot.camera.init_camera_feed()
image = robot.camera.latest_image
image.raw_image.show()
"""
if not self._camera_feed_task or self._camera_feed_task.done():
self._enabled = True
self._camera_feed_task = self.conn.loop.create_task(self._request_and_handle_images())
def close_camera_feed(self) -> None:
"""Cancel camera feed task."""
if self._camera_feed_task:
self._enabled = False
self._camera_feed_task.cancel()
future = self.conn.run_coroutine(self._camera_feed_task)
try:
future.result()
except CancelledError:
self.logger.debug('Camera feed task was cancelled. This is expected during disconnection.')
# wait for streaming to end, up to 10 seconds
iterations = 0
max_iterations = 100
while self.image_streaming_enabled():
time.sleep(0.1)
iterations += 1
if iterations > max_iterations:
# leave loop, even if streaming is still enabled
# because other SDK functions will still work and
# the RPC should have had enough time to finish
# which means we _should_ be in a good state.
self.logger.info('Camera Feed closed, but streaming on'
' robot remained enabled. This is unexpected.')
break
self._camera_feed_task = None
async def _image_streaming_enabled(self) -> bool:
"""request streaming enabled status from the robot"""
request = protocol.IsImageStreamingEnabledRequest()
response = await self.conn.grpc_interface.IsImageStreamingEnabled(request)
enabled = False
if response:
enabled = response.is_image_streaming_enabled
return enabled
def image_streaming_enabled(self) -> bool:
"""True if image streaming is enabled on the robot
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
image_streaming_enabled = robot.camera.image_streaming_enabled()
if image_streaming_enabled:
print("Robot is streaming video")
else:
print("Robot is not streaming video")
"""
future = self.conn.run_coroutine(self._image_streaming_enabled())
return future.result()
def _unpack_image(self, msg: protocol.CameraFeedResponse) -> None:
"""Processes raw data from the robot into a more useful image structure."""
image = _convert_to_pillow_image(msg.data)
self._latest_image = CameraImage(image, self._image_annotator, msg.image_id)
self._latest_image_id = msg.image_id
self.conn.run_soon(self.robot.events.dispatch_event(EvtNewRawCameraImage(image),
Events.new_raw_camera_image))
self.conn.run_soon(self.robot.events.dispatch_event(EvtNewCameraImage(self._latest_image),
Events.new_camera_image))
if self._image_annotator.annotation_enabled:
image = self._image_annotator.annotate_image(image)
self.robot.viewer.enqueue_frame(image)
async def _request_and_handle_images(self) -> None:
"""Queries and listens for camera feed events from the robot.
Received events are parsed by a helper function."""
try:
req = protocol.CameraFeedRequest()
async for evt in self.grpc_interface.CameraFeed(req):
# If the camera feed is disabled after stream is setup, exit the stream
# (the camera feed on the robot is disabled internally on stream exit)
if not self._enabled:
self.logger.warning('Camera feed has been disabled. Enable the feed to start/continue receiving camera feed data')
return
self._unpack_image(evt)
except CancelledError:
self.logger.debug('Camera feed task was cancelled. This is expected during disconnection.')
@connection.on_connection_thread()
async def capture_single_image(self) -> CameraImage:
"""Request to capture a single image from the robot's camera.
This call requests the robot to capture an image and returns the
received image, formatted as a Pillow image. This differs from `latest_image`,
which maintains the last image received from the camera feed (if enabled).
Note that when the camera feed is enabled this call returns the `latest_image`.
.. testcode::
import anki_vector
with anki_vector.Robot() as robot:
image = robot.camera.capture_single_image()
image.raw_image.show()
"""
if self._enabled:
return self._latest_image
req = protocol.CaptureSingleImageRequest()
res = await self.grpc_interface.CaptureSingleImage(req)
if res and res.data:
image = _convert_to_pillow_image(res.data)
return CameraImage(image, self._image_annotator, res.image_id)
self.logger.error('Failed to capture a single image')
class EvtNewRawCameraImage: # pylint: disable=too-few-public-methods
"""Dispatched when a new raw image is received from the robot's camera.
See also :class:`~anki_vector.camera.EvtNewCameraImage` which provides access
to both the raw image and a scaled and annotated version.
.. testcode::
import threading
import anki_vector
from anki_vector import events
def on_new_raw_camera_image(robot, event_type, event, done):
print("Display new camera image")
event.image.show()
done.set()
with anki_vector.Robot() as robot:
robot.camera.init_camera_feed()
done = threading.Event()
robot.events.subscribe(on_new_raw_camera_image, events.Events.new_raw_camera_image, done)
print("------ waiting for camera events, press ctrl+c to exit early ------")
try:
if not done.wait(timeout=5):
print("------ Did not receive a new camera image! ------")
except KeyboardInterrupt:
pass
:param image: A raw camera image.
"""
def __init__(self, image: Image.Image):
self.image = image
class EvtNewCameraImage: # pylint: disable=too-few-public-methods
"""Dispatched when a new camera image is received and processed from the robot's camera.
.. testcode::
import threading
import anki_vector
from anki_vector import events
def on_new_camera_image(robot, event_type, event, done):
print(f"Display new annotated camera image with id {event.image.image_id}")
annotated_image = event.image.annotate_image()
annotated_image.show()
done.set()
with anki_vector.Robot(enable_face_detection=True, enable_custom_object_detection=True) as robot:
robot.camera.init_camera_feed()
done = threading.Event()
robot.events.subscribe(on_new_camera_image, events.Events.new_camera_image, done)
print("------ waiting for camera events, press ctrl+c to exit early ------")
try:
if not done.wait(timeout=5):
print("------ Did not receive a new camera image! ------")
except KeyboardInterrupt:
pass
:param: A wrapped camera image object that contains the raw image.
"""
def __init__(self, image: CameraImage):
self.image = image