# Copyright 2017 The dm_control Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Mujoco `Physics` implementation and helper classes.
The `Physics` class provides the main Python interface to MuJoCo.
MuJoCo models are defined using the MJCF XML format. The `Physics` class
can load a model from a path to an XML file, an XML string, or from a serialized
MJB binary format. See the named constructors for each of these cases.
Each `Physics` instance defines a simulated world. To step forward the
simulation, use the `step` method. To set a control or actuation signal, use the
`set_control` method, which will apply the provided signal to the actuators in
subsequent calls to `step`.
Use the `Camera` class to create RGB or depth images. A `Camera` can render its
viewport to an array using the `render` method, and can query for objects
visible at specific positions using the `select` method. The `Physics` class
also provides a `render` method that returns a pixel array directly.
"""
import collections
import contextlib
import threading
from typing import Callable, NamedTuple, Optional, Union
from absl import logging
from dm_control import _render
from dm_control.mujoco import index
from dm_control.mujoco import wrapper
from dm_control.mujoco.wrapper import util
from dm_control.rl import control as _control
from dm_env import specs
import mujoco
import numpy as np
_FONT_STYLES = {
'normal': mujoco.mjtFont.mjFONT_NORMAL,
'shadow': mujoco.mjtFont.mjFONT_SHADOW,
'big': mujoco.mjtFont.mjFONT_BIG,
}
_GRID_POSITIONS = {
'top left': mujoco.mjtGridPos.mjGRID_TOPLEFT,
'top right': mujoco.mjtGridPos.mjGRID_TOPRIGHT,
'bottom left': mujoco.mjtGridPos.mjGRID_BOTTOMLEFT,
'bottom right': mujoco.mjtGridPos.mjGRID_BOTTOMRIGHT,
}
Contexts = collections.namedtuple('Contexts', ['gl', 'mujoco'])
Selected = collections.namedtuple(
'Selected', ['body', 'geom', 'flex', 'skin', 'world_position'])
NamedIndexStructs = collections.namedtuple(
'NamedIndexStructs', ['model', 'data'])
Pose = collections.namedtuple(
'Pose', ['lookat', 'distance', 'azimuth', 'elevation'])
_BOTH_SEGMENTATION_AND_DEPTH_ENABLED = (
'`segmentation` and `depth` cannot both be `True`.')
_INVALID_PHYSICS_STATE = (
'Physics state is invalid. Warning(s) raised: {warning_names}')
_OVERLAYS_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION = (
'Overlays are not supported with depth or segmentation rendering.')
_RENDER_FLAG_OVERRIDES_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION = (
'`render_flag_overrides` are not supported for depth or segmentation '
'rendering.')
_KEYFRAME_ID_OUT_OF_RANGE = (
'`keyframe_id` must be between 0 and {max_valid} inclusive, got: {actual}.')
class Physics(_control.Physics):
"""Encapsulates a MuJoCo model.
A MuJoCo model is typically defined by an MJCF XML file [0]
```python
physics = Physics.from_xml_path('/path/to/model.xml')
with physics.reset_context():
physics.named.data.qpos['hinge'] = np.random.rand()
# Apply controls and advance the simulation state.
physics.set_control(np.random.random_sample(size=N_ACTUATORS))
physics.step()
# Render a camera defined in the XML file to a NumPy array.
rgb = physics.render(height=240, width=320, id=0)
```
[0] http://www.mujoco.org/book/modeling.html
"""
_contexts = None
def __new__(cls, *args, **kwargs):
# TODO(b/174603485): Re-enable once lint stops spuriously firing here.
obj = super(Physics, cls).__new__(cls) # pylint: disable=no-value-for-parameter
# The lock is created in `__new__` rather than `__init__` because there are
# a number of existing subclasses that override `__init__` without calling
# the `__init__` method of the superclass.
obj._contexts_lock = threading.Lock() # pylint: disable=protected-access
return obj
def __init__(self, data):
"""Initializes a new `Physics` instance.
Args:
data: Instance of `wrapper.MjData`.
"""
self._warnings_cause_exception = True
self._reload_from_data(data)
@contextlib.contextmanager
def suppress_physics_errors(self):
"""Physics warnings will be logged rather than raise exceptions."""
prev_state = self._warnings_cause_exception
self._warnings_cause_exception = False
try:
yield
finally:
self._warnings_cause_exception = prev_state
def enable_profiling(self):
"""Enables Mujoco timing profiling."""
wrapper.enable_timer(True)
def set_control(self, control):
"""Sets the control signal for the actuators.
Args:
control: NumPy array or array-like actuation values.
"""
np.copyto(self.data.ctrl, control)
def _step_with_up_to_date_position_velocity(self, nstep: int = 1) -> None:
"""Physics step with up-to-date position and velocity dependent fields."""
# In the case of Euler integration we assume mj_step1 has already been
# called for this state, finish the step with mj_step2 and then update all
# position and velocity related fields with mj_step1. This ensures that
# (most of) mjData is in sync with qpos and qvel. In the case of non-Euler
# integrators (e.g. RK4) an additional mj_step1 must be called after the
# last mj_step to ensure mjData syncing.
if self.model.opt.integrator != mujoco.mjtIntegrator.mjINT_RK4.value:
mujoco.mj_step2(self.model.ptr, self.data.ptr)
if nstep > 1:
mujoco.mj_step(self.model.ptr, self.data.ptr, nstep-1)
else:
mujoco.mj_step(self.model.ptr, self.data.ptr, nstep)
mujoco.mj_step1(self.model.ptr, self.data.ptr)
def step(self, nstep: int = 1) -> None:
"""Advances the physics state by `nstep`s.
Args:
nstep: Optional integer, number of steps to take.
The actuation can be updated by calling the `set_control` function first.
"""
with self.check_invalid_state():
if self.legacy_step:
self._step_with_up_to_date_position_velocity(nstep)
else:
mujoco.mj_step(self.model.ptr, self.data.ptr, nstep)
def render(
self,
height=240,
width=320,
camera_id=-1,
overlays=(),
depth=False,
segmentation=False,
scene_option=None,
render_flag_overrides=None,
scene_callback: Optional[Callable[['Physics', mujoco.MjvScene],
None]] = None,
):
"""Returns a camera view as a NumPy array of pixel values.
Args:
height: Viewport height (number of pixels). Optional, defaults to 240.
width: Viewport width (number of pixels). Optional, defaults to 320.
camera_id: Optional camera name or index. Defaults to -1, the free
camera, which is always defined. A nonnegative integer or string
corresponds to a fixed camera, which must be defined in the model XML.
If `camera_id` is a string then the camera must also be named.
overlays: An optional sequence of `TextOverlay` instances to draw. Only
supported if `depth` is False.
depth: If `True`, this method returns a NumPy float array of depth values
(in meters). Defaults to `False`, which results in an RGB image.
segmentation: If `True`, this method returns a 2-channel NumPy int32 array
of label values where the pixels of each object are labeled with the
pair (mjModel ID, mjtObj enum object type). Background pixels are
labeled (-1, -1). Defaults to `False`, which returns an RGB image.
scene_option: An optional `wrapper.MjvOption` instance that can be used to
render the scene with custom visualization options. If None then the
default options will be used.
render_flag_overrides: Optional mapping specifying rendering flags to
override. The keys can be either lowercase strings or `mjtRndFlag` enum
values, and the values are the overridden flag values, e.g.
`{'wireframe': True}` or `{mujoco.mjtRndFlag.mjRND_WIREFRAME: True}`.
See `mujoco.mjtRndFlag` for the set of valid flags. Must be None if
either `depth` or `segmentation` is True.
scene_callback: Called after the scene has been created and before
it is rendered. Can be used to add more geoms to the scene.
Returns:
The rendered RGB, depth or segmentation image.
"""
camera = Camera(
physics=self,
height=height,
width=width,
camera_id=camera_id,
scene_callback=scene_callback)
image = camera.render(
overlays=overlays, depth=depth, segmentation=segmentation,
scene_option=scene_option, render_flag_overrides=render_flag_overrides)
camera._scene.free() # pylint: disable=protected-access
return image
def get_state(self, sig=None):
"""Returns the physics state.
Args:
sig: Optional integer, if specified then the returned array corresponds to
the state obtained by calling `mj_getState` with `sig`.
Returns:
NumPy array containing full physics simulation state.
"""
if sig is None:
return np.concatenate(self._physics_state_items())
else:
retval = np.empty(mujoco.mj_stateSize(self.model.ptr, sig), np.float64)
mujoco.mj_getState(self.model.ptr, self.data.ptr, retval, sig)
return retval
def set_state(self, physics_state, sig=None):
"""Sets the physics state.
Args:
physics_state: NumPy array containing the full physics simulation state.
sig: Optional integer, if specified then physics_state is passed directly
to `mj_setState` with `sig`.
Raises:
ValueError: If `physics_state` has invalid size.
"""
if sig is None:
state_items = self._physics_state_items()
expected_shape = (sum(item.size for item in state_items),)
if expected_shape != physics_state.shape:
raise ValueError(
'Input physics state has shape {}. Expected {}.'.format(
physics_state.shape, expected_shape
)
)
start = 0
for state_item in state_items:
size = state_item.size
np.copyto(state_item, physics_state[start:start + size])
start += size
else:
mujoco.mj_setState(
self.model.ptr,
self.data.ptr,
np.asarray(physics_state, np.float64),
sig,
)
def copy(self, share_model=False):
"""Creates a copy of this `Physics` instance.
Args:
share_model: If True, the copy and the original will share a common
MjModel instance. By default, both model and data will both be copied.
Returns:
A `Physics` instance.
"""
new_data = self.data._make_copy(share_model=share_model) # pylint: disable=protected-access
cls = self.__class__
new_obj = cls.__new__(cls)
# pylint: disable=protected-access
new_obj._warnings_cause_exception = True
new_obj._reload_from_data(new_data)
# pylint: enable=protected-access
return new_obj
def reset(self, keyframe_id=None):
"""Resets internal variables of the simulation, possibly to a keyframe.
Args:
keyframe_id: Optional integer specifying the index of a keyframe defined
in the model XML to which the simulation state should be initialized.
Must be between 0 and `self.model.nkey - 1` (inclusive).
Raises:
ValueError: If `keyframe_id` is out of range.
"""
if keyframe_id is None:
mujoco.mj_resetData(self.model.ptr, self.data.ptr)
else:
if not 0 <= keyframe_id < self.model.nkey:
raise ValueError(_KEYFRAME_ID_OUT_OF_RANGE.format(
max_valid=self.model.nkey-1, actual=keyframe_id))
mujoco.mj_resetDataKeyframe(self.model.ptr, self.data.ptr, keyframe_id)
# Disable actuation since we don't yet have meaningful control inputs.
with self.model.disable('actuation'):
self.forward()
def after_reset(self):
"""Runs after resetting internal variables of the physics simulation."""
# Disable actuation since we don't yet have meaningful control inputs.
with self.model.disable('actuation'):
self.forward()
def forward(self):
"""Recomputes the forward dynamics without advancing the simulation."""
# Note: `mj_forward` differs from `mj_step1` in that it also recomputes
# quantities that depend on acceleration (and therefore on the state of the
# controls). For example `mj_forward` updates accelerometer and gyro
# readings, whereas `mj_step1` does not.
# http://www.mujoco.org/book/programming.html#siForward
with self.check_invalid_state():
mujoco.mj_forward(self.model.ptr, self.data.ptr)
@contextlib.contextmanager
def check_invalid_state(self):
"""Checks whether the physics state is invalid at exit.
Yields:
None
Raises:
PhysicsError: if the simulation state is invalid at exit, unless this
context is nested inside a `suppress_physics_errors` context, in which
case a warning will be logged instead.
"""
np.copyto(self._warnings_before, self._warnings)
yield
np.greater(self._warnings, self._warnings_before, out=self._new_warnings)
if any(self._new_warnings):
warning_names = np.compress(self._new_warnings,
list(mujoco.mjtWarning.__members__))
message = _INVALID_PHYSICS_STATE.format(
warning_names=', '.join(warning_names))
if self._warnings_cause_exception:
raise _control.PhysicsError(message)
else:
logging.warn(message)
def __getstate__(self):
return self.data # All state is assumed to reside within `self.data`.
def __setstate__(self, data):
# Note: `_contexts_lock` is normally created in `__new__`, but `__new__` is
# not invoked during unpickling.
self._contexts_lock = threading.Lock()
self._warnings_cause_exception = True
self._reload_from_data(data)
def _reload_from_model(self, model):
"""Initializes a new or existing `Physics` from a `wrapper.MjModel`.
Creates a new `wrapper.MjData` instance, then delegates to
`_reload_from_data`.
Args:
model: Instance of `wrapper.MjModel`.
"""
data = wrapper.MjData(model)
self._reload_from_data(data)
def _reload_from_data(self, data):
"""Initializes a new or existing `Physics` instance from a `wrapper.MjData`.
Assigns all attributes, sets up named indexing, and creates rendering
contexts if rendering is enabled.
The default constructor as well as the other `reload_from` methods should
delegate to this method.
Args:
data: Instance of `wrapper.MjData`.
"""
if not isinstance(data, wrapper.MjData):
raise TypeError(f'Expected wrapper.MjData. Got: {type(data)}.')
self._data = data
# Performance optimization: pre-allocate numpy arrays used when checking for
# MuJoCo warnings on each step.
self._warnings = self.data.warning.number
self._warnings_before = np.empty_like(self._warnings)
self._new_warnings = np.empty(dtype=bool, shape=(len(self._warnings),))
# Forcibly free any previous GL context in order to avoid problems with GL
# implementations that do not support multiple contexts on a given device.
with self._contexts_lock:
if self._contexts:
self._free_rendering_contexts()
# Call kinematics update to enable rendering.
try:
self.after_reset()
except _control.PhysicsError as e:
logging.warning(e)
# Set up named indexing.
axis_indexers = index.make_axis_indexers(self.model)
self._named = NamedIndexStructs(
model=index.struct_indexer(self.model, 'mjmodel', axis_indexers),
data=index.struct_indexer(self.data, 'mjdata', axis_indexers),)
def free(self):
"""Frees the native MuJoCo data structures held by this `Physics` instance.
This is an advanced feature for use when manual memory management is
necessary. This `Physics` object MUST NOT be used after this function has
been called.
"""
with self._contexts_lock:
if self._contexts:
self._free_rendering_contexts()
if hasattr(self, '_data'):
del self._data
@classmethod
def from_model(cls, model):
"""A named constructor from a `wrapper.MjModel` instance."""
data = wrapper.MjData(model)
return cls(data)
@classmethod
def from_xml_string(cls, xml_string, assets=None):
"""A named constructor from a string containing an MJCF XML file.
Args:
xml_string: XML string containing an MJCF model description.
assets: Optional dict containing external assets referenced by the model
(such as additional XML files, textures, meshes etc.), in the form of
`{filename: contents_string}` pairs. The keys should correspond to the
filenames specified in the model XML.
Returns:
A new `Physics` instance.
"""
model = wrapper.MjModel.from_xml_string(xml_string, assets=assets)
return cls.from_model(model)
@classmethod
def from_byte_string(cls, byte_string):
"""A named constructor from a model binary as a byte string."""
model = wrapper.MjModel.from_byte_string(byte_string)
return cls.from_model(model)
@classmethod
def from_xml_path(cls, file_path):
"""A named constructor from a path to an MJCF XML file.
Args:
file_path: String containing path to model definition file.
Returns:
A new `Physics` instance.
"""
model = wrapper.MjModel.from_xml_path(file_path)
return cls.from_model(model)
@classmethod
def from_binary_path(cls, file_path):
"""A named constructor from a path to an MJB model binary file.
Args:
file_path: String containing path to model definition file.
Returns:
A new `Physics` instance.
"""
model = wrapper.MjModel.from_binary_path(file_path)
return cls.from_model(model)
def reload_from_xml_string(self, xml_string, assets=None):
"""Reloads the `Physics` instance from a string containing an MJCF XML file.
After calling this method, the state of the `Physics` instance is the same
as a new `Physics` instance created with the `from_xml_string` named
constructor.
Args:
xml_string: XML string containing an MJCF model description.
assets: Optional dict containing external assets referenced by the model
(such as additional XML files, textures, meshes etc.), in the form of
`{filename: contents_string}` pairs. The keys should correspond to the
filenames specified in the model XML.
"""
new_model = wrapper.MjModel.from_xml_string(xml_string, assets=assets)
self._reload_from_model(new_model)
def reload_from_xml_path(self, file_path):
"""Reloads the `Physics` instance from a path to an MJCF XML file.
After calling this method, the state of the `Physics` instance is the same
as a new `Physics` instance created with the `from_xml_path`
named constructor.
Args:
file_path: String containing path to model definition file.
"""
self._reload_from_model(wrapper.MjModel.from_xml_path(file_path))
@property
def named(self):
return self._named
def _make_rendering_contexts(self):
"""Creates the OpenGL and MuJoCo rendering contexts."""
# Get the offscreen framebuffer size, as specified in the model XML.
max_width = self.model.vis.global_.offwidth
max_height = self.model.vis.global_.offheight
# Create the OpenGL context.
render_context = _render.Renderer(
max_width=max_width, max_height=max_height)
# Create the MuJoCo context.
mujoco_context = wrapper.MjrContext(self.model, render_context)
self._contexts = Contexts(gl=render_context, mujoco=mujoco_context)
def _free_rendering_contexts(self):
"""Frees existing OpenGL and MuJoCo rendering contexts."""
self._contexts.mujoco.free()
self._contexts.gl.free()
self._contexts = None
@property
def contexts(self):
"""Returns a `Contexts` namedtuple, used in `Camera`s and rendering code."""
with self._contexts_lock:
if not self._contexts:
self._make_rendering_contexts()
return self._contexts
@property
def model(self):
return self._data.model
@property
def data(self):
return self._data
def _physics_state_items(self):
"""Returns list of arrays making up internal physics simulation state.
The physics state consists of the state variables, their derivatives and
actuation activations. If the model contains plugins, then the state will
also contain any plugin state.
Returns:
List of NumPy arrays containing full physics simulation state.
"""
if self.model.nplugin > 0:
return [
self.data.qpos,
self.data.qvel,
self.data.act,
self.data.plugin_state,
]
else:
return [self.data.qpos, self.data.qvel, self.data.act]
# Named views of simulation data.
def control(self):
"""Returns a copy of the control signals for the actuators."""
return self.data.ctrl.copy()
def activation(self):
"""Returns a copy of the internal states of actuators.
For details, please refer to
http://www.mujoco.org/book/computation.html#geActuation
Returns:
Activations in a numpy array.
"""
return self.data.act.copy()
def state(self):
"""Returns the full physics state. Alias for `get_physics_state`."""
return np.concatenate(self._physics_state_items())
def position(self):
"""Returns a copy of the generalized positions (system configuration)."""
return self.data.qpos.copy()
def velocity(self):
"""Returns a copy of the generalized velocities."""
return self.data.qvel.copy()
def timestep(self):
"""Returns the simulation timestep."""
return self.model.opt.timestep
def time(self):
"""Returns episode time in seconds."""
return self.data.time
class CameraMatrices(NamedTuple):
"""Component matrices used to construct the camera matrix.
The matrix product over these components yields the camera matrix.
Attributes:
image: (3, 3) image matrix.
focal: (3, 4) focal matrix.
rotation: (4, 4) rotation matrix.
translation: (4, 4) translation matrix.
"""
image: np.ndarray
focal: np.ndarray
rotation: np.ndarray
translation: np.ndarray
class Camera:
"""Mujoco scene camera.
Holds rendering properties such as the width and height of the viewport. The
camera position and rotation is defined by the Mujoco camera corresponding to
the `camera_id`. Multiple `Camera` instances may exist for a single
`camera_id`, for example to render the same view at different resolutions.
"""
def __init__(
self,
physics: Physics,
height: int = 240,
width: int = 320,
camera_id: Union[int, str] = -1,
max_geom: Optional[int] = None,
scene_callback: Optional[Callable[[Physics, mujoco.MjvScene],
None]] = None,
):
"""Initializes a new `Camera`.
Args:
physics: Instance of `Physics`.
height: Optional image height. Defaults to 240.
width: Optional image width. Defaults to 320.
camera_id: Optional camera name or index. Defaults to -1, the free
camera, which is always defined. A nonnegative integer or string
corresponds to a fixed camera, which must be defined in the model XML.
If `camera_id` is a string then the camera must also be named.
max_geom: Optional integer specifying the maximum number of geoms that can
be rendered in the same scene. If None this will be chosen automatically
based on the estimated maximum number of renderable geoms in the model.
scene_callback: Called after the scene has been created and before
it is rendered. Can be used to add more geoms to the scene.
Raises:
ValueError: If `camera_id` is outside the valid range, or if `width` or
`height` exceed the dimensions of MuJoCo's offscreen framebuffer.
"""
buffer_width = physics.model.vis.global_.offwidth
buffer_height = physics.model.vis.global_.offheight
if width > buffer_width:
raise ValueError('Image width {} > framebuffer width {}. Either reduce '
'the image width or specify a larger offscreen '
'framebuffer in the model XML using the clause\n'
'\n'
' \n'
''.format(width, buffer_width))
if height > buffer_height:
raise ValueError('Image height {} > framebuffer height {}. Either reduce '
'the image height or specify a larger offscreen '
'framebuffer in the model XML using the clause\n'
'\n'
' \n'
''.format(height, buffer_height))
if isinstance(camera_id, str):
camera_id = physics.model.name2id(camera_id, 'camera')
if camera_id < -1:
raise ValueError('camera_id cannot be smaller than -1.')
if camera_id >= physics.model.ncam:
raise ValueError('model has {} fixed cameras. camera_id={} is invalid.'.
format(physics.model.ncam, camera_id))
self._width = width
self._height = height
self._physics = physics
self._scene_callback = scene_callback
# Variables corresponding to structs needed by Mujoco's rendering functions.
self._scene = wrapper.MjvScene(model=physics.model, max_geom=max_geom)
self._scene_option = wrapper.MjvOption()
self._perturb = wrapper.MjvPerturb()
self._perturb.active = 0
self._perturb.select = 0
self._rect = mujoco.MjrRect(0, 0, self._width, self._height)
self._render_camera = wrapper.MjvCamera()
self._render_camera.fixedcamid = camera_id
if camera_id == -1:
self._render_camera.type = mujoco.mjtCamera.mjCAMERA_FREE
mujoco.mjv_defaultFreeCamera(physics.model._model, self._render_camera)
else:
# As defined in the Mujoco documentation, mjCAMERA_FIXED refers to a
# camera explicitly defined in the model.
self._render_camera.type = mujoco.mjtCamera.mjCAMERA_FIXED
# Internal buffers.
self._rgb_buffer = np.empty((self._height, self._width, 3), dtype=np.uint8)
self._depth_buffer = np.empty((self._height, self._width), dtype=np.float32)
if self._physics.contexts.mujoco is not None:
with self._physics.contexts.gl.make_current() as ctx:
ctx.call(mujoco.mjr_setBuffer, mujoco.mjtFramebuffer.mjFB_OFFSCREEN,
self._physics.contexts.mujoco.ptr)
@property
def width(self):
"""Returns the image width (number of pixels)."""
return self._width
@property
def height(self):
"""Returns the image height (number of pixels)."""
return self._height
@property
def option(self):
"""Returns the camera's visualization options."""
return self._scene_option
@property
def scene(self):
"""Returns the `mujoco.MjvScene` instance used by the camera."""
return self._scene
def matrices(self) -> CameraMatrices:
"""Computes the component matrices used to compute the camera matrix.
Returns:
An instance of `CameraMatrices` containing the image, focal, rotation, and
translation matrices of the camera.
"""
camera_id = self._render_camera.fixedcamid
if camera_id == -1:
# If the camera is a 'free' camera, we get its position and orientation
# from the scene data structure. It is a stereo camera, so we average over
# the left and right channels. Note: we call `self.update()` in order to
# ensure that the contents of `scene.camera` are correct.
self.update()
pos = np.mean([camera.pos for camera in self.scene.camera], axis=0)
z = -np.mean([camera.forward for camera in self.scene.camera], axis=0)
y = np.mean([camera.up for camera in self.scene.camera], axis=0)
rot = np.vstack((np.cross(y, z), y, z))
fov = self._physics.model.vis.global_.fovy
else:
pos = self._physics.data.cam_xpos[camera_id]
rot = self._physics.data.cam_xmat[camera_id].reshape(3, 3).T
fov = self._physics.model.cam_fovy[camera_id]
# Translation matrix (4x4).
translation = np.eye(4)
translation[0:3, 3] = -pos
# Rotation matrix (4x4).
rotation = np.eye(4)
rotation[0:3, 0:3] = rot
# Focal transformation matrix (3x4).
focal_scaling = (1./np.tan(np.deg2rad(fov)/2)) * self.height / 2.0
focal = np.diag([-focal_scaling, focal_scaling, 1.0, 0])[0:3, :]
# Image matrix (3x3).
image = np.eye(3)
image[0, 2] = (self.width - 1) / 2.0
image[1, 2] = (self.height - 1) / 2.0
return CameraMatrices(
image=image, focal=focal, rotation=rotation, translation=translation)
@property
def matrix(self):
"""Returns the 3x4 camera matrix.
For a description of the camera matrix see, e.g.,
https://en.wikipedia.org/wiki/Camera_matrix.
For a usage example, see the associated test.
"""
image, focal, rotation, translation = self.matrices()
return image @ focal @ rotation @ translation
def update(self, scene_option=None):
"""Updates geometry used for rendering.
Args:
scene_option: A custom `wrapper.MjvOption` instance to use to render
the scene instead of the default. If None, will use the default.
"""
scene_option = scene_option or self._scene_option
mujoco.mjv_updateScene(self._physics.model.ptr, self._physics.data.ptr,
scene_option.ptr, self._perturb.ptr,
self._render_camera.ptr, mujoco.mjtCatBit.mjCAT_ALL,
self._scene.ptr)
def _render_on_gl_thread(self, depth, overlays):
"""Performs only those rendering calls that require an OpenGL context."""
# Render the scene.
mujoco.mjr_render(self._rect, self._scene.ptr,
self._physics.contexts.mujoco.ptr)
if not depth:
# If rendering RGB, draw any text overlays on top of the image.
for overlay in overlays:
overlay.draw(self._physics.contexts.mujoco.ptr, self._rect)
# Read the contents of either the RGB or depth buffer.
mujoco.mjr_readPixels(self._rgb_buffer if not depth else None,
self._depth_buffer if depth else None, self._rect,
self._physics.contexts.mujoco.ptr)
def render(
self,
overlays=(),
depth=False,
segmentation=False,
scene_option=None,
render_flag_overrides=None,
):
"""Renders the camera view as a numpy array of pixel values.
Args:
overlays: An optional sequence of `TextOverlay` instances to draw. Only
supported if `depth` and `segmentation` are both False.
depth: An optional boolean. If True, makes the camera return depth
measurements. Cannot be enabled if `segmentation` is True.
segmentation: An optional boolean. If True, make the camera return a
pixel-wise segmentation of the scene. Cannot be enabled if `depth` is
True.
scene_option: A custom `wrapper.MjvOption` instance to use to render
the scene instead of the default. If None, will use the default.
render_flag_overrides: Optional mapping containing rendering flags to
override. The keys can be either lowercase strings or `mjtRndFlag` enum
values, and the values are the overridden flag values, e.g.
`{'wireframe': True}` or `{mujoco.mjtRndFlag.mjRND_WIREFRAME: True}`.
See `mujoco.mjtRndFlag` for the set of valid flags. Must be empty if
either `depth` or `segmentation` is True.
Returns:
The rendered scene.
* If `depth` and `segmentation` are both False (default), this is a
(height, width, 3) uint8 numpy array containing RGB values.
* If `depth` is True, this is a (height, width) float32 numpy array
containing depth values (in meters).
* If `segmentation` is True, this is a (height, width, 2) int32 numpy
array where the first channel contains the integer ID of the object at
each pixel, and the second channel contains the corresponding object
type (a value in the `mjtObj` enum). Background pixels are labeled
(-1, -1).
Raises:
ValueError: If either `overlays` or `render_flag_overrides` is requested
when `depth` or `segmentation` rendering is enabled.
ValueError: If both depth and segmentation flags are set together.
"""
if overlays and (depth or segmentation):
raise ValueError(_OVERLAYS_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION)
if render_flag_overrides and (depth or segmentation):
raise ValueError(
_RENDER_FLAG_OVERRIDES_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION)
if depth and segmentation:
raise ValueError(_BOTH_SEGMENTATION_AND_DEPTH_ENABLED)
if render_flag_overrides is None:
render_flag_overrides = {}
# Update scene geometry.
self.update(scene_option=scene_option)
if self._scene_callback:
self._scene_callback(self._physics, self._scene)
# Enable flags to compute segmentation labels
if segmentation:
render_flag_overrides.update({
mujoco.mjtRndFlag.mjRND_SEGMENT: True,
mujoco.mjtRndFlag.mjRND_IDCOLOR: True,
})
# Render scene and text overlays, read contents of RGB or depth buffer.
with self.scene.override_flags(render_flag_overrides):
with self._physics.contexts.gl.make_current() as ctx:
ctx.call(self._render_on_gl_thread, depth=depth, overlays=overlays)
if depth:
# Get the distances to the near and far clipping planes.
extent = self._physics.model.stat.extent
near = self._physics.model.vis.map.znear * extent
far = self._physics.model.vis.map.zfar * extent
# Convert from [0 1] to depth in meters, see links below:
# http://stackoverflow.com/a/6657284/1461210
# https://www.khronos.org/opengl/wiki/Depth_Buffer_Precision
image = near / (1 - self._depth_buffer * (1 - near / far))
elif segmentation:
# Convert 3-channel uint8 to 1-channel uint32.
image3 = self._rgb_buffer.astype(np.uint32)
segimage = (image3[:, :, 0] +
image3[:, :, 1] * (2**8) +
image3[:, :, 2] * (2**16))
# Remap segid to 2-channel (object ID, object type) pair.
# Seg ID 0 is background -- will be remapped to (-1, -1).
segid2output = np.full((self._scene.ngeom + 1, 2), fill_value=-1,
dtype=np.int32) # Seg id cannot be > ngeom + 1.
visible_geoms = [g for g in self._scene.geoms if g.segid != -1]
visible_segids = np.array([g.segid + 1 for g in visible_geoms], np.int32)
visible_objid = np.array([g.objid for g in visible_geoms], np.int32)
visible_objtype = np.array([g.objtype for g in visible_geoms], np.int32)
segid2output[visible_segids, 0] = visible_objid
segid2output[visible_segids, 1] = visible_objtype
image = segid2output[segimage]
else:
image = self._rgb_buffer
# The first row in the buffer is the bottom row of pixels in the image.
return self._physics.contexts.gl.to_pixels(image)
def select(self, cursor_position):
"""Returns bodies and geoms visible at given coordinates in the frame.
Args:
cursor_position: A `tuple` containing x and y coordinates, normalized to
between 0 and 1, and where (0, 0) is bottom-left.
Returns:
A `Selected` namedtuple. Fields are None if nothing is selected.
"""
self.update()
aspect_ratio = self._width / self._height
cursor_x, cursor_y = cursor_position
pos = np.empty(3, np.double)
geom_id_arr = np.intc([-1])
flex_id_arr = np.intc([-1])
skin_id_arr = np.intc([-1])
body_id = mujoco.mjv_select(self._physics.model.ptr, self._physics.data.ptr,
self._scene_option.ptr, aspect_ratio, cursor_x,
cursor_y, self._scene.ptr, pos, geom_id_arr,
flex_id_arr, skin_id_arr)
[geom_id] = geom_id_arr
[flex_id] = flex_id_arr
[skin_id] = skin_id_arr
# Validate IDs
if body_id != -1:
assert 0 <= body_id < self._physics.model.nbody
else:
body_id = None
if geom_id != -1:
assert 0 <= geom_id < self._physics.model.ngeom
else:
geom_id = None
if flex_id != -1:
assert 0 <= flex_id < self._physics.model.nflex
else:
flex_id = None
if skin_id != -1:
assert 0 <= skin_id < self._physics.model.nskin
else:
skin_id = None
if all(id_ is None for id_ in (body_id, geom_id, skin_id)):
pos = None
return Selected(
body=body_id,
geom=geom_id,
flex=flex_id,
skin=skin_id,
world_position=pos,
)
class MovableCamera(Camera):
"""Subclass of `Camera` that can be moved by changing its pose.
A `MovableCamera` always corresponds to a MuJoCo free camera with id -1.
"""
def __init__(
self,
physics: Physics,
height: int = 240,
width: int = 320,
max_geom: Optional[int] = None,
scene_callback: Optional[Callable[[Physics, mujoco.MjvScene],
None]] = None,
):
"""Initializes a new `MovableCamera`.
Args:
physics: Instance of `Physics`.
height: Optional image height. Defaults to 240.
width: Optional image width. Defaults to 320.
max_geom: Optional integer specifying the maximum number of geoms that can
be rendered in the same scene. If None this will be chosen automatically
based on the estimated maximum number of renderable geoms in the model.
scene_callback: Called after the scene has been created and before
it is rendered. Can be used to add more geoms to the scene.
"""
super().__init__(physics=physics, height=height, width=width, camera_id=-1,
max_geom=max_geom, scene_callback=scene_callback)
def get_pose(self):
"""Returns the pose of the camera.
Returns:
A `Pose` named tuple with fields:
lookat: NumPy array specifying lookat point.
distance: Float specifying distance to `lookat`.
azimuth: Azimuth in degrees.
elevation: Elevation in degrees.
"""
return Pose(self._render_camera.lookat, self._render_camera.distance,
self._render_camera.azimuth, self._render_camera.elevation)
def set_pose(self, lookat, distance, azimuth, elevation):
"""Sets the pose of the camera.
Args:
lookat: NumPy array or list specifying lookat point.
distance: Float specifying distance to `lookat`.
azimuth: Azimuth in degrees.
elevation: Elevation in degrees.
"""
np.copyto(self._render_camera.lookat, lookat)
self._render_camera.distance = distance
self._render_camera.azimuth = azimuth
self._render_camera.elevation = elevation
class TextOverlay:
"""A text overlay that can be drawn on top of a camera view."""
__slots__ = ('title', 'body', 'style', 'position')
def __init__(self, title='', body='', style='normal', position='top left'):
"""Initializes a new TextOverlay instance.
Args:
title: Title text.
body: Body text.
style: The font style. Can be either "normal", "shadow", or "big".
position: The grid position of the overlay. Can be either "top left",
"top right", "bottom left", or "bottom right".
"""
self.title = title
self.body = body
self.style = _FONT_STYLES[style]
self.position = _GRID_POSITIONS[position]
def draw(self, context, rect):
"""Draws the overlay.
Args:
context: A `mujoco.MjrContext` pointer.
rect: A `mujoco.MjrRect`.
"""
mujoco.mjr_overlay(self.style, self.position, rect,
util.to_binary_string(self.title),
util.to_binary_string(self.body), context)
def action_spec(physics):
"""Returns a `BoundedArraySpec` matching the `physics` actuators."""
num_actions = physics.model.nu
is_limited = physics.model.actuator_ctrllimited.ravel().astype(bool)
control_range = physics.model.actuator_ctrlrange
minima = np.full(num_actions, fill_value=-mujoco.mjMAXVAL, dtype=float)
maxima = np.full(num_actions, fill_value=mujoco.mjMAXVAL, dtype=float)
minima[is_limited], maxima[is_limited] = control_range[is_limited].T
return specs.BoundedArray(
shape=(num_actions,), dtype=float, minimum=minima, maximum=maxima)