-
Notifications
You must be signed in to change notification settings - Fork 745
Expand file tree
/
Copy pathengine.py
More file actions
1103 lines (927 loc) · 40.4 KB
/
engine.py
File metadata and controls
1103 lines (927 loc) · 40.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2017 The dm_control Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Mujoco `Physics` implementation and helper classes.
The `Physics` class provides the main Python interface to MuJoCo.
MuJoCo models are defined using the MJCF XML format. The `Physics` class
can load a model from a path to an XML file, an XML string, or from a serialized
MJB binary format. See the named constructors for each of these cases.
Each `Physics` instance defines a simulated world. To step forward the
simulation, use the `step` method. To set a control or actuation signal, use the
`set_control` method, which will apply the provided signal to the actuators in
subsequent calls to `step`.
Use the `Camera` class to create RGB or depth images. A `Camera` can render its
viewport to an array using the `render` method, and can query for objects
visible at specific positions using the `select` method. The `Physics` class
also provides a `render` method that returns a pixel array directly.
"""
import collections
import contextlib
import threading
from typing import Callable, NamedTuple, Optional, Union
from absl import logging
from dm_control import _render
from dm_control.mujoco import index
from dm_control.mujoco import wrapper
from dm_control.mujoco.wrapper import util
from dm_control.rl import control as _control
from dm_env import specs
import mujoco
import numpy as np
_FONT_STYLES = {
'normal': mujoco.mjtFont.mjFONT_NORMAL,
'shadow': mujoco.mjtFont.mjFONT_SHADOW,
'big': mujoco.mjtFont.mjFONT_BIG,
}
_GRID_POSITIONS = {
'top left': mujoco.mjtGridPos.mjGRID_TOPLEFT,
'top right': mujoco.mjtGridPos.mjGRID_TOPRIGHT,
'bottom left': mujoco.mjtGridPos.mjGRID_BOTTOMLEFT,
'bottom right': mujoco.mjtGridPos.mjGRID_BOTTOMRIGHT,
}
Contexts = collections.namedtuple('Contexts', ['gl', 'mujoco'])
Selected = collections.namedtuple(
'Selected', ['body', 'geom', 'flex', 'skin', 'world_position'])
NamedIndexStructs = collections.namedtuple(
'NamedIndexStructs', ['model', 'data'])
Pose = collections.namedtuple(
'Pose', ['lookat', 'distance', 'azimuth', 'elevation'])
_BOTH_SEGMENTATION_AND_DEPTH_ENABLED = (
'`segmentation` and `depth` cannot both be `True`.')
_INVALID_PHYSICS_STATE = (
'Physics state is invalid. Warning(s) raised: {warning_names}')
_OVERLAYS_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION = (
'Overlays are not supported with depth or segmentation rendering.')
_RENDER_FLAG_OVERRIDES_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION = (
'`render_flag_overrides` are not supported for depth or segmentation '
'rendering.')
_KEYFRAME_ID_OUT_OF_RANGE = (
'`keyframe_id` must be between 0 and {max_valid} inclusive, got: {actual}.')
class Physics(_control.Physics):
"""Encapsulates a MuJoCo model.
A MuJoCo model is typically defined by an MJCF XML file [0]
```python
physics = Physics.from_xml_path('/path/to/model.xml')
with physics.reset_context():
physics.named.data.qpos['hinge'] = np.random.rand()
# Apply controls and advance the simulation state.
physics.set_control(np.random.random_sample(size=N_ACTUATORS))
physics.step()
# Render a camera defined in the XML file to a NumPy array.
rgb = physics.render(height=240, width=320, id=0)
```
[0] http://www.mujoco.org/book/modeling.html
"""
_contexts = None
def __new__(cls, *args, **kwargs):
# TODO(b/174603485): Re-enable once lint stops spuriously firing here.
obj = super(Physics, cls).__new__(cls) # pylint: disable=no-value-for-parameter
# The lock is created in `__new__` rather than `__init__` because there are
# a number of existing subclasses that override `__init__` without calling
# the `__init__` method of the superclass.
obj._contexts_lock = threading.Lock() # pylint: disable=protected-access
return obj
def __init__(self, data):
"""Initializes a new `Physics` instance.
Args:
data: Instance of `wrapper.MjData`.
"""
self._warnings_cause_exception = True
self._reload_from_data(data)
@contextlib.contextmanager
def suppress_physics_errors(self):
"""Physics warnings will be logged rather than raise exceptions."""
prev_state = self._warnings_cause_exception
self._warnings_cause_exception = False
try:
yield
finally:
self._warnings_cause_exception = prev_state
def enable_profiling(self):
"""Enables Mujoco timing profiling."""
wrapper.enable_timer(True)
def set_control(self, control):
"""Sets the control signal for the actuators.
Args:
control: NumPy array or array-like actuation values.
"""
np.copyto(self.data.ctrl, control)
def _step_with_up_to_date_position_velocity(self, nstep: int = 1) -> None:
"""Physics step with up-to-date position and velocity dependent fields."""
# In the case of Euler integration we assume mj_step1 has already been
# called for this state, finish the step with mj_step2 and then update all
# position and velocity related fields with mj_step1. This ensures that
# (most of) mjData is in sync with qpos and qvel. In the case of non-Euler
# integrators (e.g. RK4) an additional mj_step1 must be called after the
# last mj_step to ensure mjData syncing.
if self.model.opt.integrator != mujoco.mjtIntegrator.mjINT_RK4.value:
mujoco.mj_step2(self.model.ptr, self.data.ptr)
if nstep > 1:
mujoco.mj_step(self.model.ptr, self.data.ptr, nstep-1)
else:
mujoco.mj_step(self.model.ptr, self.data.ptr, nstep)
mujoco.mj_step1(self.model.ptr, self.data.ptr)
def step(self, nstep: int = 1) -> None:
"""Advances the physics state by `nstep`s.
Args:
nstep: Optional integer, number of steps to take.
The actuation can be updated by calling the `set_control` function first.
"""
with self.check_invalid_state():
if self.legacy_step:
self._step_with_up_to_date_position_velocity(nstep)
else:
mujoco.mj_step(self.model.ptr, self.data.ptr, nstep)
def render(
self,
height=240,
width=320,
camera_id=-1,
overlays=(),
depth=False,
segmentation=False,
scene_option=None,
render_flag_overrides=None,
scene_callback: Optional[Callable[['Physics', mujoco.MjvScene],
None]] = None,
):
"""Returns a camera view as a NumPy array of pixel values.
Args:
height: Viewport height (number of pixels). Optional, defaults to 240.
width: Viewport width (number of pixels). Optional, defaults to 320.
camera_id: Optional camera name or index. Defaults to -1, the free
camera, which is always defined. A nonnegative integer or string
corresponds to a fixed camera, which must be defined in the model XML.
If `camera_id` is a string then the camera must also be named.
overlays: An optional sequence of `TextOverlay` instances to draw. Only
supported if `depth` is False.
depth: If `True`, this method returns a NumPy float array of depth values
(in meters). Defaults to `False`, which results in an RGB image.
segmentation: If `True`, this method returns a 2-channel NumPy int32 array
of label values where the pixels of each object are labeled with the
pair (mjModel ID, mjtObj enum object type). Background pixels are
labeled (-1, -1). Defaults to `False`, which returns an RGB image.
scene_option: An optional `wrapper.MjvOption` instance that can be used to
render the scene with custom visualization options. If None then the
default options will be used.
render_flag_overrides: Optional mapping specifying rendering flags to
override. The keys can be either lowercase strings or `mjtRndFlag` enum
values, and the values are the overridden flag values, e.g.
`{'wireframe': True}` or `{mujoco.mjtRndFlag.mjRND_WIREFRAME: True}`.
See `mujoco.mjtRndFlag` for the set of valid flags. Must be None if
either `depth` or `segmentation` is True.
scene_callback: Called after the scene has been created and before
it is rendered. Can be used to add more geoms to the scene.
Returns:
The rendered RGB, depth or segmentation image.
"""
camera = Camera(
physics=self,
height=height,
width=width,
camera_id=camera_id,
scene_callback=scene_callback)
image = camera.render(
overlays=overlays, depth=depth, segmentation=segmentation,
scene_option=scene_option, render_flag_overrides=render_flag_overrides)
camera._scene.free() # pylint: disable=protected-access
return image
def get_state(self, sig=None):
"""Returns the physics state.
Args:
sig: Optional integer, if specified then the returned array corresponds to
the state obtained by calling `mj_getState` with `sig`.
Returns:
NumPy array containing full physics simulation state.
"""
if sig is None:
return np.concatenate(self._physics_state_items())
else:
retval = np.empty(mujoco.mj_stateSize(self.model.ptr, sig), np.float64)
mujoco.mj_getState(self.model.ptr, self.data.ptr, retval, sig)
return retval
def set_state(self, physics_state, sig=None):
"""Sets the physics state.
Args:
physics_state: NumPy array containing the full physics simulation state.
sig: Optional integer, if specified then physics_state is passed directly
to `mj_setState` with `sig`.
Raises:
ValueError: If `physics_state` has invalid size.
"""
if sig is None:
state_items = self._physics_state_items()
expected_shape = (sum(item.size for item in state_items),)
if expected_shape != physics_state.shape:
raise ValueError(
'Input physics state has shape {}. Expected {}.'.format(
physics_state.shape, expected_shape
)
)
start = 0
for state_item in state_items:
size = state_item.size
np.copyto(state_item, physics_state[start:start + size])
start += size
else:
mujoco.mj_setState(
self.model.ptr,
self.data.ptr,
np.asarray(physics_state, np.float64),
sig,
)
def copy(self, share_model=False):
"""Creates a copy of this `Physics` instance.
Args:
share_model: If True, the copy and the original will share a common
MjModel instance. By default, both model and data will both be copied.
Returns:
A `Physics` instance.
"""
new_data = self.data._make_copy(share_model=share_model) # pylint: disable=protected-access
cls = self.__class__
new_obj = cls.__new__(cls)
# pylint: disable=protected-access
new_obj._warnings_cause_exception = True
new_obj._reload_from_data(new_data)
# pylint: enable=protected-access
return new_obj
def reset(self, keyframe_id=None):
"""Resets internal variables of the simulation, possibly to a keyframe.
Args:
keyframe_id: Optional integer specifying the index of a keyframe defined
in the model XML to which the simulation state should be initialized.
Must be between 0 and `self.model.nkey - 1` (inclusive).
Raises:
ValueError: If `keyframe_id` is out of range.
"""
if keyframe_id is None:
mujoco.mj_resetData(self.model.ptr, self.data.ptr)
else:
if not 0 <= keyframe_id < self.model.nkey:
raise ValueError(_KEYFRAME_ID_OUT_OF_RANGE.format(
max_valid=self.model.nkey-1, actual=keyframe_id))
mujoco.mj_resetDataKeyframe(self.model.ptr, self.data.ptr, keyframe_id)
# Disable actuation since we don't yet have meaningful control inputs.
with self.model.disable('actuation'):
self.forward()
def after_reset(self):
"""Runs after resetting internal variables of the physics simulation."""
# Disable actuation since we don't yet have meaningful control inputs.
with self.model.disable('actuation'):
self.forward()
def forward(self):
"""Recomputes the forward dynamics without advancing the simulation."""
# Note: `mj_forward` differs from `mj_step1` in that it also recomputes
# quantities that depend on acceleration (and therefore on the state of the
# controls). For example `mj_forward` updates accelerometer and gyro
# readings, whereas `mj_step1` does not.
# http://www.mujoco.org/book/programming.html#siForward
with self.check_invalid_state():
mujoco.mj_forward(self.model.ptr, self.data.ptr)
@contextlib.contextmanager
def check_invalid_state(self):
"""Checks whether the physics state is invalid at exit.
Yields:
None
Raises:
PhysicsError: if the simulation state is invalid at exit, unless this
context is nested inside a `suppress_physics_errors` context, in which
case a warning will be logged instead.
"""
np.copyto(self._warnings_before, self._warnings)
yield
np.greater(self._warnings, self._warnings_before, out=self._new_warnings)
if any(self._new_warnings):
warning_names = np.compress(self._new_warnings,
list(mujoco.mjtWarning.__members__))
message = _INVALID_PHYSICS_STATE.format(
warning_names=', '.join(warning_names))
if self._warnings_cause_exception:
raise _control.PhysicsError(message)
else:
logging.warn(message)
def __getstate__(self):
return self.data # All state is assumed to reside within `self.data`.
def __setstate__(self, data):
# Note: `_contexts_lock` is normally created in `__new__`, but `__new__` is
# not invoked during unpickling.
self._contexts_lock = threading.Lock()
self._warnings_cause_exception = True
self._reload_from_data(data)
def _reload_from_model(self, model):
"""Initializes a new or existing `Physics` from a `wrapper.MjModel`.
Creates a new `wrapper.MjData` instance, then delegates to
`_reload_from_data`.
Args:
model: Instance of `wrapper.MjModel`.
"""
data = wrapper.MjData(model)
self._reload_from_data(data)
def _reload_from_data(self, data):
"""Initializes a new or existing `Physics` instance from a `wrapper.MjData`.
Assigns all attributes, sets up named indexing, and creates rendering
contexts if rendering is enabled.
The default constructor as well as the other `reload_from` methods should
delegate to this method.
Args:
data: Instance of `wrapper.MjData`.
"""
if not isinstance(data, wrapper.MjData):
raise TypeError(f'Expected wrapper.MjData. Got: {type(data)}.')
self._data = data
# Performance optimization: pre-allocate numpy arrays used when checking for
# MuJoCo warnings on each step.
self._warnings = self.data.warning.number
self._warnings_before = np.empty_like(self._warnings)
self._new_warnings = np.empty(dtype=bool, shape=(len(self._warnings),))
# Forcibly free any previous GL context in order to avoid problems with GL
# implementations that do not support multiple contexts on a given device.
with self._contexts_lock:
if self._contexts:
self._free_rendering_contexts()
# Call kinematics update to enable rendering.
try:
self.after_reset()
except _control.PhysicsError as e:
logging.warning(e)
# Set up named indexing.
axis_indexers = index.make_axis_indexers(self.model)
self._named = NamedIndexStructs(
model=index.struct_indexer(self.model, 'mjmodel', axis_indexers),
data=index.struct_indexer(self.data, 'mjdata', axis_indexers),)
def free(self):
"""Frees the native MuJoCo data structures held by this `Physics` instance.
This is an advanced feature for use when manual memory management is
necessary. This `Physics` object MUST NOT be used after this function has
been called.
"""
with self._contexts_lock:
if self._contexts:
self._free_rendering_contexts()
if hasattr(self, '_data'):
del self._data
@classmethod
def from_model(cls, model):
"""A named constructor from a `wrapper.MjModel` instance."""
data = wrapper.MjData(model)
return cls(data)
@classmethod
def from_xml_string(cls, xml_string, assets=None):
"""A named constructor from a string containing an MJCF XML file.
Args:
xml_string: XML string containing an MJCF model description.
assets: Optional dict containing external assets referenced by the model
(such as additional XML files, textures, meshes etc.), in the form of
`{filename: contents_string}` pairs. The keys should correspond to the
filenames specified in the model XML.
Returns:
A new `Physics` instance.
"""
model = wrapper.MjModel.from_xml_string(xml_string, assets=assets)
return cls.from_model(model)
@classmethod
def from_byte_string(cls, byte_string):
"""A named constructor from a model binary as a byte string."""
model = wrapper.MjModel.from_byte_string(byte_string)
return cls.from_model(model)
@classmethod
def from_xml_path(cls, file_path):
"""A named constructor from a path to an MJCF XML file.
Args:
file_path: String containing path to model definition file.
Returns:
A new `Physics` instance.
"""
model = wrapper.MjModel.from_xml_path(file_path)
return cls.from_model(model)
@classmethod
def from_binary_path(cls, file_path):
"""A named constructor from a path to an MJB model binary file.
Args:
file_path: String containing path to model definition file.
Returns:
A new `Physics` instance.
"""
model = wrapper.MjModel.from_binary_path(file_path)
return cls.from_model(model)
def reload_from_xml_string(self, xml_string, assets=None):
"""Reloads the `Physics` instance from a string containing an MJCF XML file.
After calling this method, the state of the `Physics` instance is the same
as a new `Physics` instance created with the `from_xml_string` named
constructor.
Args:
xml_string: XML string containing an MJCF model description.
assets: Optional dict containing external assets referenced by the model
(such as additional XML files, textures, meshes etc.), in the form of
`{filename: contents_string}` pairs. The keys should correspond to the
filenames specified in the model XML.
"""
new_model = wrapper.MjModel.from_xml_string(xml_string, assets=assets)
self._reload_from_model(new_model)
def reload_from_xml_path(self, file_path):
"""Reloads the `Physics` instance from a path to an MJCF XML file.
After calling this method, the state of the `Physics` instance is the same
as a new `Physics` instance created with the `from_xml_path`
named constructor.
Args:
file_path: String containing path to model definition file.
"""
self._reload_from_model(wrapper.MjModel.from_xml_path(file_path))
@property
def named(self):
return self._named
def _make_rendering_contexts(self):
"""Creates the OpenGL and MuJoCo rendering contexts."""
# Get the offscreen framebuffer size, as specified in the model XML.
max_width = self.model.vis.global_.offwidth
max_height = self.model.vis.global_.offheight
# Create the OpenGL context.
render_context = _render.Renderer(
max_width=max_width, max_height=max_height)
# Create the MuJoCo context.
mujoco_context = wrapper.MjrContext(self.model, render_context)
self._contexts = Contexts(gl=render_context, mujoco=mujoco_context)
def _free_rendering_contexts(self):
"""Frees existing OpenGL and MuJoCo rendering contexts."""
self._contexts.mujoco.free()
self._contexts.gl.free()
self._contexts = None
@property
def contexts(self):
"""Returns a `Contexts` namedtuple, used in `Camera`s and rendering code."""
with self._contexts_lock:
if not self._contexts:
self._make_rendering_contexts()
return self._contexts
@property
def model(self):
return self._data.model
@property
def data(self):
return self._data
def _physics_state_items(self):
"""Returns list of arrays making up internal physics simulation state.
The physics state consists of the state variables, their derivatives and
actuation activations. If the model contains plugins, then the state will
also contain any plugin state.
Returns:
List of NumPy arrays containing full physics simulation state.
"""
if self.model.nplugin > 0:
return [
self.data.qpos,
self.data.qvel,
self.data.act,
self.data.plugin_state,
]
else:
return [self.data.qpos, self.data.qvel, self.data.act]
# Named views of simulation data.
def control(self):
"""Returns a copy of the control signals for the actuators."""
return self.data.ctrl.copy()
def activation(self):
"""Returns a copy of the internal states of actuators.
For details, please refer to
http://www.mujoco.org/book/computation.html#geActuation
Returns:
Activations in a numpy array.
"""
return self.data.act.copy()
def state(self):
"""Returns the full physics state. Alias for `get_physics_state`."""
return np.concatenate(self._physics_state_items())
def position(self):
"""Returns a copy of the generalized positions (system configuration)."""
return self.data.qpos.copy()
def velocity(self):
"""Returns a copy of the generalized velocities."""
return self.data.qvel.copy()
def timestep(self):
"""Returns the simulation timestep."""
return self.model.opt.timestep
def time(self):
"""Returns episode time in seconds."""
return self.data.time
class CameraMatrices(NamedTuple):
"""Component matrices used to construct the camera matrix.
The matrix product over these components yields the camera matrix.
Attributes:
image: (3, 3) image matrix.
focal: (3, 4) focal matrix.
rotation: (4, 4) rotation matrix.
translation: (4, 4) translation matrix.
"""
image: np.ndarray
focal: np.ndarray
rotation: np.ndarray
translation: np.ndarray
class Camera:
"""Mujoco scene camera.
Holds rendering properties such as the width and height of the viewport. The
camera position and rotation is defined by the Mujoco camera corresponding to
the `camera_id`. Multiple `Camera` instances may exist for a single
`camera_id`, for example to render the same view at different resolutions.
"""
def __init__(
self,
physics: Physics,
height: int = 240,
width: int = 320,
camera_id: Union[int, str] = -1,
max_geom: Optional[int] = None,
scene_callback: Optional[Callable[[Physics, mujoco.MjvScene],
None]] = None,
):
"""Initializes a new `Camera`.
Args:
physics: Instance of `Physics`.
height: Optional image height. Defaults to 240.
width: Optional image width. Defaults to 320.
camera_id: Optional camera name or index. Defaults to -1, the free
camera, which is always defined. A nonnegative integer or string
corresponds to a fixed camera, which must be defined in the model XML.
If `camera_id` is a string then the camera must also be named.
max_geom: Optional integer specifying the maximum number of geoms that can
be rendered in the same scene. If None this will be chosen automatically
based on the estimated maximum number of renderable geoms in the model.
scene_callback: Called after the scene has been created and before
it is rendered. Can be used to add more geoms to the scene.
Raises:
ValueError: If `camera_id` is outside the valid range, or if `width` or
`height` exceed the dimensions of MuJoCo's offscreen framebuffer.
"""
buffer_width = physics.model.vis.global_.offwidth
buffer_height = physics.model.vis.global_.offheight
if width > buffer_width:
raise ValueError('Image width {} > framebuffer width {}. Either reduce '
'the image width or specify a larger offscreen '
'framebuffer in the model XML using the clause\n'
'<visual>\n'
' <global offwidth="my_width"/>\n'
'</visual>'.format(width, buffer_width))
if height > buffer_height:
raise ValueError('Image height {} > framebuffer height {}. Either reduce '
'the image height or specify a larger offscreen '
'framebuffer in the model XML using the clause\n'
'<visual>\n'
' <global offheight="my_height"/>\n'
'</visual>'.format(height, buffer_height))
if isinstance(camera_id, str):
camera_id = physics.model.name2id(camera_id, 'camera')
if camera_id < -1:
raise ValueError('camera_id cannot be smaller than -1.')
if camera_id >= physics.model.ncam:
raise ValueError('model has {} fixed cameras. camera_id={} is invalid.'.
format(physics.model.ncam, camera_id))
self._width = width
self._height = height
self._physics = physics
self._scene_callback = scene_callback
# Variables corresponding to structs needed by Mujoco's rendering functions.
self._scene = wrapper.MjvScene(model=physics.model, max_geom=max_geom)
self._scene_option = wrapper.MjvOption()
self._perturb = wrapper.MjvPerturb()
self._perturb.active = 0
self._perturb.select = 0
self._rect = mujoco.MjrRect(0, 0, self._width, self._height)
self._render_camera = wrapper.MjvCamera()
self._render_camera.fixedcamid = camera_id
if camera_id == -1:
self._render_camera.type = mujoco.mjtCamera.mjCAMERA_FREE
mujoco.mjv_defaultFreeCamera(physics.model._model, self._render_camera)
else:
# As defined in the Mujoco documentation, mjCAMERA_FIXED refers to a
# camera explicitly defined in the model.
self._render_camera.type = mujoco.mjtCamera.mjCAMERA_FIXED
# Internal buffers.
self._rgb_buffer = np.empty((self._height, self._width, 3), dtype=np.uint8)
self._depth_buffer = np.empty((self._height, self._width), dtype=np.float32)
if self._physics.contexts.mujoco is not None:
with self._physics.contexts.gl.make_current() as ctx:
ctx.call(mujoco.mjr_setBuffer, mujoco.mjtFramebuffer.mjFB_OFFSCREEN,
self._physics.contexts.mujoco.ptr)
@property
def width(self):
"""Returns the image width (number of pixels)."""
return self._width
@property
def height(self):
"""Returns the image height (number of pixels)."""
return self._height
@property
def option(self):
"""Returns the camera's visualization options."""
return self._scene_option
@property
def scene(self):
"""Returns the `mujoco.MjvScene` instance used by the camera."""
return self._scene
def matrices(self) -> CameraMatrices:
"""Computes the component matrices used to compute the camera matrix.
Returns:
An instance of `CameraMatrices` containing the image, focal, rotation, and
translation matrices of the camera.
"""
camera_id = self._render_camera.fixedcamid
if camera_id == -1:
# If the camera is a 'free' camera, we get its position and orientation
# from the scene data structure. It is a stereo camera, so we average over
# the left and right channels. Note: we call `self.update()` in order to
# ensure that the contents of `scene.camera` are correct.
self.update()
pos = np.mean([camera.pos for camera in self.scene.camera], axis=0)
z = -np.mean([camera.forward for camera in self.scene.camera], axis=0)
y = np.mean([camera.up for camera in self.scene.camera], axis=0)
rot = np.vstack((np.cross(y, z), y, z))
fov = self._physics.model.vis.global_.fovy
else:
pos = self._physics.data.cam_xpos[camera_id]
rot = self._physics.data.cam_xmat[camera_id].reshape(3, 3).T
fov = self._physics.model.cam_fovy[camera_id]
# Translation matrix (4x4).
translation = np.eye(4)
translation[0:3, 3] = -pos
# Rotation matrix (4x4).
rotation = np.eye(4)
rotation[0:3, 0:3] = rot
# Focal transformation matrix (3x4).
focal_scaling = (1./np.tan(np.deg2rad(fov)/2)) * self.height / 2.0
focal = np.diag([-focal_scaling, focal_scaling, 1.0, 0])[0:3, :]
# Image matrix (3x3).
image = np.eye(3)
image[0, 2] = (self.width - 1) / 2.0
image[1, 2] = (self.height - 1) / 2.0
return CameraMatrices(
image=image, focal=focal, rotation=rotation, translation=translation)
@property
def matrix(self):
"""Returns the 3x4 camera matrix.
For a description of the camera matrix see, e.g.,
https://en.wikipedia.org/wiki/Camera_matrix.
For a usage example, see the associated test.
"""
image, focal, rotation, translation = self.matrices()
return image @ focal @ rotation @ translation
def update(self, scene_option=None):
"""Updates geometry used for rendering.
Args:
scene_option: A custom `wrapper.MjvOption` instance to use to render
the scene instead of the default. If None, will use the default.
"""
scene_option = scene_option or self._scene_option
mujoco.mjv_updateScene(self._physics.model.ptr, self._physics.data.ptr,
scene_option.ptr, self._perturb.ptr,
self._render_camera.ptr, mujoco.mjtCatBit.mjCAT_ALL,
self._scene.ptr)
def _render_on_gl_thread(self, depth, overlays):
"""Performs only those rendering calls that require an OpenGL context."""
# Render the scene.
mujoco.mjr_render(self._rect, self._scene.ptr,
self._physics.contexts.mujoco.ptr)
if not depth:
# If rendering RGB, draw any text overlays on top of the image.
for overlay in overlays:
overlay.draw(self._physics.contexts.mujoco.ptr, self._rect)
# Read the contents of either the RGB or depth buffer.
mujoco.mjr_readPixels(self._rgb_buffer if not depth else None,
self._depth_buffer if depth else None, self._rect,
self._physics.contexts.mujoco.ptr)
def render(
self,
overlays=(),
depth=False,
segmentation=False,
scene_option=None,
render_flag_overrides=None,
):
"""Renders the camera view as a numpy array of pixel values.
Args:
overlays: An optional sequence of `TextOverlay` instances to draw. Only
supported if `depth` and `segmentation` are both False.
depth: An optional boolean. If True, makes the camera return depth
measurements. Cannot be enabled if `segmentation` is True.
segmentation: An optional boolean. If True, make the camera return a
pixel-wise segmentation of the scene. Cannot be enabled if `depth` is
True.
scene_option: A custom `wrapper.MjvOption` instance to use to render
the scene instead of the default. If None, will use the default.
render_flag_overrides: Optional mapping containing rendering flags to
override. The keys can be either lowercase strings or `mjtRndFlag` enum
values, and the values are the overridden flag values, e.g.
`{'wireframe': True}` or `{mujoco.mjtRndFlag.mjRND_WIREFRAME: True}`.
See `mujoco.mjtRndFlag` for the set of valid flags. Must be empty if
either `depth` or `segmentation` is True.
Returns:
The rendered scene.
* If `depth` and `segmentation` are both False (default), this is a
(height, width, 3) uint8 numpy array containing RGB values.
* If `depth` is True, this is a (height, width) float32 numpy array
containing depth values (in meters).
* If `segmentation` is True, this is a (height, width, 2) int32 numpy
array where the first channel contains the integer ID of the object at
each pixel, and the second channel contains the corresponding object
type (a value in the `mjtObj` enum). Background pixels are labeled
(-1, -1).
Raises:
ValueError: If either `overlays` or `render_flag_overrides` is requested
when `depth` or `segmentation` rendering is enabled.
ValueError: If both depth and segmentation flags are set together.
"""
if overlays and (depth or segmentation):
raise ValueError(_OVERLAYS_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION)
if render_flag_overrides and (depth or segmentation):
raise ValueError(
_RENDER_FLAG_OVERRIDES_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION)
if depth and segmentation:
raise ValueError(_BOTH_SEGMENTATION_AND_DEPTH_ENABLED)
if render_flag_overrides is None:
render_flag_overrides = {}
# Update scene geometry.
self.update(scene_option=scene_option)
if self._scene_callback:
self._scene_callback(self._physics, self._scene)
# Enable flags to compute segmentation labels
if segmentation:
render_flag_overrides.update({
mujoco.mjtRndFlag.mjRND_SEGMENT: True,
mujoco.mjtRndFlag.mjRND_IDCOLOR: True,
})
# Render scene and text overlays, read contents of RGB or depth buffer.
with self.scene.override_flags(render_flag_overrides):
with self._physics.contexts.gl.make_current() as ctx:
ctx.call(self._render_on_gl_thread, depth=depth, overlays=overlays)
if depth:
# Get the distances to the near and far clipping planes.
extent = self._physics.model.stat.extent
near = self._physics.model.vis.map.znear * extent
far = self._physics.model.vis.map.zfar * extent
# Convert from [0 1] to depth in meters, see links below:
# http://stackoverflow.com/a/6657284/1461210
# https://www.khronos.org/opengl/wiki/Depth_Buffer_Precision
image = near / (1 - self._depth_buffer * (1 - near / far))
elif segmentation:
# Convert 3-channel uint8 to 1-channel uint32.
image3 = self._rgb_buffer.astype(np.uint32)
segimage = (image3[:, :, 0] +
image3[:, :, 1] * (2**8) +
image3[:, :, 2] * (2**16))
# Remap segid to 2-channel (object ID, object type) pair.
# Seg ID 0 is background -- will be remapped to (-1, -1).
segid2output = np.full((self._scene.ngeom + 1, 2), fill_value=-1,
dtype=np.int32) # Seg id cannot be > ngeom + 1.
visible_geoms = [g for g in self._scene.geoms if g.segid != -1]
visible_segids = np.array([g.segid + 1 for g in visible_geoms], np.int32)
visible_objid = np.array([g.objid for g in visible_geoms], np.int32)
visible_objtype = np.array([g.objtype for g in visible_geoms], np.int32)
segid2output[visible_segids, 0] = visible_objid
segid2output[visible_segids, 1] = visible_objtype
image = segid2output[segimage]
else:
image = self._rgb_buffer
# The first row in the buffer is the bottom row of pixels in the image.
return self._physics.contexts.gl.to_pixels(image)
def select(self, cursor_position):
"""Returns bodies and geoms visible at given coordinates in the frame.
Args:
cursor_position: A `tuple` containing x and y coordinates, normalized to
between 0 and 1, and where (0, 0) is bottom-left.
Returns:
A `Selected` namedtuple. Fields are None if nothing is selected.
"""
self.update()
aspect_ratio = self._width / self._height
cursor_x, cursor_y = cursor_position
pos = np.empty(3, np.double)
geom_id_arr = np.intc([-1])
flex_id_arr = np.intc([-1])
skin_id_arr = np.intc([-1])
body_id = mujoco.mjv_select(self._physics.model.ptr, self._physics.data.ptr,
self._scene_option.ptr, aspect_ratio, cursor_x,
cursor_y, self._scene.ptr, pos, geom_id_arr,
flex_id_arr, skin_id_arr)
[geom_id] = geom_id_arr
[flex_id] = flex_id_arr
[skin_id] = skin_id_arr
# Validate IDs
if body_id != -1:
assert 0 <= body_id < self._physics.model.nbody
else:
body_id = None
if geom_id != -1:
assert 0 <= geom_id < self._physics.model.ngeom
else:
geom_id = None
if flex_id != -1:
assert 0 <= flex_id < self._physics.model.nflex
else:
flex_id = None
if skin_id != -1:
assert 0 <= skin_id < self._physics.model.nskin
else:
skin_id = None
if all(id_ is None for id_ in (body_id, geom_id, skin_id)):
pos = None
return Selected(
body=body_id,
geom=geom_id,
flex=flex_id,
skin=skin_id,
world_position=pos,
)