# Copyright 2017 The dm_control Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ import copy import pickle from absl.testing import absltest from absl.testing import parameterized from dm_control.mujoco import engine from dm_control.mujoco import wrapper from dm_control.mujoco.testing import assets from dm_control.mujoco.wrapper.mjbindings import enums from dm_control.rl import control import mock import mujoco import numpy as np MODEL_PATH = assets.get_path('cartpole.xml') MODEL_WITH_ASSETS = assets.get_contents('model_with_assets.xml') ASSETS = { 'texture.png': assets.get_contents('deepmind.png'), 'mesh.stl': assets.get_contents('cube.stl'), 'included.xml': assets.get_contents('sphere.xml') } class MujocoEngineTest(parameterized.TestCase): def setUp(self): super().setUp() self._physics = engine.Physics.from_xml_path(MODEL_PATH) def _assert_attributes_equal(self, actual_obj, expected_obj, attr_to_compare): for name in attr_to_compare: actual_value = getattr(actual_obj, name) expected_value = getattr(expected_obj, name) try: if isinstance(expected_value, np.ndarray): np.testing.assert_array_equal(actual_value, expected_value) else: self.assertEqual(actual_value, expected_value) except AssertionError as e: raise AssertionError( f"Attribute '{name}' differs from expected value.") from e @parameterized.parameters(0, 'cart', u'cart') def testCameraIndexing(self, camera_id): height, width = 480, 640 _ = engine.Camera( self._physics, height, width, camera_id=camera_id) def testDepthRender(self): plane_and_box = """ """ physics = engine.Physics.from_xml_string(plane_and_box) pixels = physics.render(height=200, width=200, camera_id='top', depth=True) # Nearest pixels should be 2.8m away np.testing.assert_approx_equal(pixels.min(), 2.8, 3) # Furthest pixels should be 3m away (depth is orthographic) np.testing.assert_approx_equal(pixels.max(), 3.0, 3) @parameterized.parameters([True, False]) def testSegmentationRender(self, enable_geom_frame_rendering): box_four_corners = """ """ physics = engine.Physics.from_xml_string(box_four_corners) obj_type_geom = enums.mjtObj.mjOBJ_GEOM # Geom object type obj_type_site = enums.mjtObj.mjOBJ_SITE # Site object type obj_type_decor = enums.mjtObj.mjOBJ_UNKNOWN # Decor object type scene_options = wrapper.MjvOption() if enable_geom_frame_rendering: scene_options.frame = mujoco.mjtFrame.mjFRAME_GEOM pixels = physics.render(height=200, width=200, camera_id='top', segmentation=True, scene_option=scene_options) # The pixel indices below were chosen so that toggling the frame decors do # not affect the segmentation results. with self.subTest('Center pixels should have background label'): np.testing.assert_equal(pixels[95:105, 95:105, 0], -1) np.testing.assert_equal(pixels[95:105, 95:105, 1], -1) with self.subTest('Geoms have correct object type'): np.testing.assert_equal(pixels[15:25, 0:10, 1], obj_type_geom) np.testing.assert_equal(pixels[15:25, 190:200, 1], obj_type_geom) with self.subTest('Sites have correct object type'): np.testing.assert_equal(pixels[190:200, 190:200, 1], obj_type_site) np.testing.assert_equal(pixels[190:200, 0:10, 1], obj_type_site) with self.subTest('Geoms have correct object IDs'): np.testing.assert_equal(pixels[15:25, 0:10, 0], physics.model.name2id('box0', obj_type_geom)) np.testing.assert_equal(pixels[15:25, 190:200, 0], physics.model.name2id('box1', obj_type_geom)) with self.subTest('Sites have correct object IDs'): np.testing.assert_equal(pixels[190:200, 190:200, 0], physics.model.name2id('box2', obj_type_site)) np.testing.assert_equal(pixels[190:200, 0:10, 0], physics.model.name2id('box3', obj_type_site)) with self.subTest('Decor elements present if and only if geom frames are ' 'enabled'): contains_decor = np.any(pixels[:, :, 1] == obj_type_decor) self.assertEqual(contains_decor, enable_geom_frame_rendering) def testSceneCallback(self): empty_world = """ """ def callback(_, scn: mujoco.MjvScene): # Add a red box to the scene scn.ngeom += 1 mujoco.mjv_initGeom( scn.geoms[scn.ngeom - 1], mujoco.mjtGeom.mjGEOM_BOX.value, size=np.array([0.2, 0.2, 0.2]), pos=np.zeros(3), mat=np.eye(3).flatten(), rgba=np.array([1, 0, 0, 1], dtype=np.float32)) physics = engine.Physics.from_xml_string(empty_world) # Without the callback, render should return a black image. empty_image = physics.render( height=8, width=8, camera_id='cam', scene_callback=None) np.testing.assert_array_equal( np.zeros((8, 8, 3), dtype=np.uint8), empty_image) # With the callback, there should be a red box. pixels = physics.render( height=8, width=8, camera_id='cam', scene_callback=callback) # Are there any pixels where red component is bigger than green and blue? any_red_pixels = np.any(pixels[:, :, 0] > np.max(pixels[:, :, 1:3], axis=2)) self.assertTrue(any_red_pixels, 'Expecting some red pixels.') def testTextOverlay(self): height, width = 480, 640 overlay = engine.TextOverlay(title='Title', body='Body', style='big', position='bottom right') no_overlay = self._physics.render(height, width, camera_id=0) with_overlay = self._physics.render(height, width, camera_id=0, overlays=[overlay]) self.assertFalse(np.all(no_overlay == with_overlay), msg='Images are identical with and without text overlay.') def testSceneOption(self): height, width = 480, 640 scene_option = wrapper.MjvOption() # Render geoms as semi-transparent. scene_option.flags[enums.mjtVisFlag.mjVIS_TRANSPARENT] = 1 no_scene_option = self._physics.render(height, width, camera_id=0) with_scene_option = self._physics.render(height, width, camera_id=0, scene_option=scene_option) self.assertFalse(np.all(no_scene_option == with_scene_option), msg='Images are identical with and without scene option.') def testRenderFlags(self): height, width = 480, 640 cam = engine.Camera(self._physics, height, width, camera_id=0) cam.scene.flags[enums.mjtRndFlag.mjRND_WIREFRAME] = 1 # Enable wireframe enabled = cam.render().copy() cam.scene.flags[enums.mjtRndFlag.mjRND_WIREFRAME] = 0 # Disable wireframe disabled = cam.render().copy() self.assertFalse( np.all(disabled == enabled), msg='Images are identical regardless of whether wireframe is enabled.') def testFreeIdempotent(self): self._physics.free() self._physics.free() @parameterized.parameters(((0.5, 0.5), (1, 3)), # pole ((0.5, 0.1), (0, 0)), # ground ((0.9, 0.9), (None, None)), # sky ) def testCameraSelection(self, coordinates, expected_selection): height, width = 480, 640 camera = engine.Camera(self._physics, height, width, camera_id=0) # Test for b/63380170: Enabling visualization of body frames adds # "non-model" geoms to the scene. This means that the indices of geoms # within `camera._scene.geoms` don't match the rows of `model.geom_bodyid`. camera.option.frame = enums.mjtFrame.mjFRAME_BODY selected = camera.select(coordinates) self.assertEqual(expected_selection, selected[:2]) @parameterized.parameters( dict(camera_id='cam0', height=200, width=300), dict(camera_id=1, height=300, width=200), dict(camera_id=-1, height=400, width=400), ) def testCameraMatrix(self, camera_id, height, width): """Tests the camera_matrix() method. Creates a model with two cameras and two small geoms. We render the scene with one of the cameras and check that the geom locations, projected into pixel space, are correct, using segmenation rendering. xyz2pixels() shows how the transformation is used. For a description of the camera matrix see https://en.wikipedia.org/wiki/Camera_matrix. Args: camera_id: One of the two cameras. Can be either integer or String. height: The height of the image (pixels). width: The width of the image (pixels). """ def xyz2pixels(x, y, z, camera_matrix): """Transforms from world coordinates to pixel coordinates.""" xs, ys, s = camera_matrix.dot(np.array([x, y, z, 1.0])) return xs/s, ys/s two_geoms_and_two_cameras = """ """ physics = engine.Physics.from_xml_string(two_geoms_and_two_cameras) camera = engine.Camera(physics, width=width, height=height, camera_id=camera_id) camera_matrix = camera.matrix # Get camera matrix. pixels = camera.render(segmentation=True) # Render a segmentation frame. for geom_id in [0, 1]: # Compute the location of the geom in pixel space using the camera matrix. x, y = xyz2pixels(*physics.data.geom_xpos[geom_id], camera_matrix) row = int(round(y)) column = int(round(x)) # Compare segmentation values of nearest pixel to corresponding geom. [obj_id, obj_type] = pixels[row, column, :] self.assertEqual(obj_type, enums.mjtObj.mjOBJ_GEOM) self.assertEqual(obj_id, geom_id) def testMovableCameraSetGetPose(self): height, width = 240, 320 camera = engine.MovableCamera(self._physics, height, width) image = camera.render().copy() pose = camera.get_pose() lookat_offset = np.array([0.01, 0.02, -0.03]) # Would normally pass the new values directly to camera.set_pose instead of # using the namedtuple _replace method, but this makes the asserts at the # end of the test a little cleaner. new_pose = pose._replace(distance=pose.distance * 1.5, lookat=pose.lookat + lookat_offset, azimuth=pose.azimuth + -15, elevation=pose.elevation - 10) camera.set_pose(*new_pose) self.assertEqual(new_pose.distance, camera.get_pose().distance) self.assertEqual(new_pose.azimuth, camera.get_pose().azimuth) self.assertEqual(new_pose.elevation, camera.get_pose().elevation) np.testing.assert_allclose(new_pose.lookat, camera.get_pose().lookat) self.assertFalse(np.all(image == camera.render())) def testRenderExceptions(self): max_width = self._physics.model.vis.global_.offwidth max_height = self._physics.model.vis.global_.offheight max_camid = self._physics.model.ncam - 1 with self.assertRaisesRegex(ValueError, 'width'): self._physics.render(max_height, max_width + 1, camera_id=max_camid) with self.assertRaisesRegex(ValueError, 'height'): self._physics.render(max_height + 1, max_width, camera_id=max_camid) with self.assertRaisesRegex(ValueError, 'camera_id'): self._physics.render(max_height, max_width, camera_id=max_camid + 1) with self.assertRaisesRegex(ValueError, 'camera_id'): self._physics.render(max_height, max_width, camera_id=-2) def testPhysicsRenderMethod(self): height, width = 240, 320 image = self._physics.render(height=height, width=width) self.assertEqual(image.shape, (height, width, 3)) depth = self._physics.render(height=height, width=width, depth=True) self.assertEqual(depth.shape, (height, width)) segmentation = self._physics.render(height=height, width=width, segmentation=True) self.assertEqual(segmentation.shape, (height, width, 2)) def testExceptionIfBothDepthAndSegmentation(self): with self.assertRaisesWithLiteralMatch( ValueError, engine._BOTH_SEGMENTATION_AND_DEPTH_ENABLED): self._physics.render(depth=True, segmentation=True) def testRenderFlagOverridesAreNotPersistent(self): camera = engine.Camera(self._physics) first_rgb = camera.render().copy() camera.render(segmentation=True) second_rgb = camera.render().copy() np.testing.assert_array_equal(first_rgb, second_rgb) def testCustomRenderFlags(self): default = self._physics.render() wireframe_string_key = self._physics.render( render_flag_overrides=dict(wireframe=True)) self.assertFalse((default == wireframe_string_key).all()) wireframe_enum_key = self._physics.render( render_flag_overrides={enums.mjtRndFlag.mjRND_WIREFRAME: True}) np.testing.assert_array_equal(wireframe_string_key, wireframe_enum_key) @parameterized.parameters(dict(depth=True), dict(segmentation=True)) def testExceptionIfRenderFlagOverridesAndDepthOrSegmentation(self, **kwargs): with self.assertRaisesWithLiteralMatch( ValueError, engine._RENDER_FLAG_OVERRIDES_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION): self._physics.render(render_flag_overrides=dict(wireframe=True), **kwargs) def testExceptionIfOverlaysAndDepthOrSegmentation(self): overlay = engine.TextOverlay() with self.assertRaisesWithLiteralMatch( ValueError, engine._OVERLAYS_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION): self._physics.render(depth=True, overlays=[overlay]) with self.assertRaisesWithLiteralMatch( ValueError, engine._OVERLAYS_NOT_SUPPORTED_FOR_DEPTH_OR_SEGMENTATION): self._physics.render(segmentation=True, overlays=[overlay]) def testNamedViews(self): self.assertEqual((1,), self._physics.control().shape) self.assertEqual((2,), self._physics.position().shape) self.assertEqual((2,), self._physics.velocity().shape) self.assertEqual((0,), self._physics.activation().shape) self.assertEqual((4,), self._physics.state().shape) self.assertEqual(0., self._physics.time()) self.assertEqual(0.01, self._physics.timestep()) def testSetGetPhysicsStateLegacy(self): physics_state = self._physics.get_state() # qpos, qvel, act self.assertLen(self._physics._physics_state_items(), 3) self._physics.set_state(physics_state) new_physics_state = np.random.random_sample(physics_state.shape) self._physics.set_state(new_physics_state) np.testing.assert_allclose(new_physics_state, self._physics.get_state()) def testSetGetPhysicsState(self): actual_physics_state = self._physics.get_state( sig=mujoco.mjtState.mjSTATE_FULLPHYSICS.value ) expected_physics_state = np.zeros(mujoco.mj_stateSize( self._physics.model.ptr, mujoco.mjtState.mjSTATE_FULLPHYSICS.value )) mujoco.mj_getState(self._physics.model.ptr, self._physics.data.ptr, expected_physics_state, mujoco.mjtState.mjSTATE_FULLPHYSICS.value) np.testing.assert_array_equal(expected_physics_state, actual_physics_state) new_physics_state = np.random.random_sample(expected_physics_state.shape) self._physics.set_state( new_physics_state, sig=mujoco.mjtState.mjSTATE_FULLPHYSICS.value ) np.testing.assert_array_equal( new_physics_state, self._physics.get_state(sig=mujoco.mjtState.mjSTATE_FULLPHYSICS.value), ) def testSetGetPhysicsStateWithPlugin(self): # Model copied from mujoco/test/plugin/elasticity/elasticity_test.cc model_with_cable_plugin = """ """ physics = engine.Physics.from_xml_string(model_with_cable_plugin) physics_state = physics.get_state() # qpos, qvel, act, plugin_state self.assertLen(physics._physics_state_items(), 4) physics.set_state(physics_state) new_physics_state = np.random.random_sample(physics_state.shape) physics.set_state(new_physics_state) np.testing.assert_allclose(new_physics_state, physics.get_state()) def testSetInvalidPhysicsState(self): badly_shaped_state = np.repeat(self._physics.get_state(), repeats=2) with self.assertRaises(ValueError): self._physics.set_state(badly_shaped_state) def testNamedIndexing(self): self.assertEqual((3,), self._physics.named.data.xpos['cart'].shape) self.assertEqual((2, 3), self._physics.named.data.xpos[['cart', 'pole']].shape) def testReload(self): self._physics.reload_from_xml_path(MODEL_PATH) def testReset(self): self._physics.reset() self.assertEqual(self._physics.data.qpos[1], 0) keyframe_id = 0 self._physics.reset(keyframe_id=keyframe_id) self.assertEqual(self._physics.data.qpos[1], self._physics.model.key_qpos[keyframe_id, 1]) out_of_range = [-1, 3] max_valid = self._physics.model.nkey - 1 for actual in out_of_range: with self.assertRaisesWithLiteralMatch( ValueError, engine._KEYFRAME_ID_OUT_OF_RANGE.format( max_valid=max_valid, actual=actual)): self._physics.reset(keyframe_id=actual) def testLoadAndReloadFromStringWithAssets(self): physics = engine.Physics.from_xml_string( MODEL_WITH_ASSETS, assets=ASSETS) physics.reload_from_xml_string(MODEL_WITH_ASSETS, assets=ASSETS) @parameterized.parameters(*enums.mjtWarning._fields[:-1]) def testDivergenceException(self, warning_name): warning_enum = getattr(enums.mjtWarning, warning_name) with self.assertRaisesWithLiteralMatch( control.PhysicsError, engine._INVALID_PHYSICS_STATE.format(warning_names=warning_name)): with self._physics.check_invalid_state(): self._physics.data.warning[warning_enum].number = 1 # Existing warnings should not raise an exception. with self._physics.check_invalid_state(): pass self._physics.reset() with self._physics.check_invalid_state(): pass @parameterized.parameters(float('inf'), float('nan'), 1e15) def testBadQpos(self, bad_value): with self._physics.reset_context(): self._physics.data.qpos[0] = bad_value with self.assertRaises(control.PhysicsError): with self._physics.check_invalid_state(): mujoco.mj_checkPos(self._physics.model.ptr, self._physics.data.ptr) self._physics.reset() with self._physics.check_invalid_state(): mujoco.mj_checkPos(self._physics.model.ptr, self._physics.data.ptr) def testNanControl(self): with self._physics.reset_context(): pass # Apply the controls. with self.assertRaisesWithLiteralMatch( control.PhysicsError, engine._INVALID_PHYSICS_STATE.format(warning_names='mjWARN_BADCTRL')): with self._physics.check_invalid_state(): self._physics.data.ctrl[0] = float('nan') self._physics.step() def testSuppressPhysicsError(self): bad_value = float('nan') message = engine._INVALID_PHYSICS_STATE.format( warning_names='mjWARN_BADCTRL') def assert_physics_error(): self._physics.data.ctrl[0] = bad_value with self.assertRaisesWithLiteralMatch(control.PhysicsError, message): self._physics.forward() def assert_warning(): self._physics.data.ctrl[0] = bad_value with mock.patch.object(engine.logging, 'warn') as mock_warn: self._physics.forward() mock_warn.assert_called_once_with(message) assert_physics_error() with self._physics.suppress_physics_errors(): assert_warning() with self._physics.suppress_physics_errors(): assert_warning() assert_warning() assert_physics_error() @parameterized.named_parameters( ('_copy', lambda x: x.copy()), ('_deepcopy', copy.deepcopy), ('_pickle_and_unpickle', lambda x: pickle.loads(pickle.dumps(x))), ) def testCopyOrPicklePhysics(self, func): for _ in range(10): self._physics.step() physics2 = func(self._physics) self.assertNotEqual(physics2.model.ptr, self._physics.model.ptr) self.assertNotEqual(physics2.data.ptr, self._physics.data.ptr) model_attr_to_compare = ('nnames', 'njmax', 'body_pos', 'geom_quat') self._assert_attributes_equal( physics2.model, self._physics.model, model_attr_to_compare) data_attr_to_compare = ('time', 'energy', 'qpos', 'xpos') self._assert_attributes_equal( physics2.data, self._physics.data, data_attr_to_compare) for _ in range(10): self._physics.step() physics2.step() self._assert_attributes_equal( physics2.model, self._physics.model, model_attr_to_compare) self._assert_attributes_equal( physics2.data, self._physics.data, data_attr_to_compare) @parameterized.named_parameters( ('_copy', lambda x: x.copy()), ('_pickle_and_unpickle', lambda x: pickle.loads(pickle.dumps(x))), ) def testSuppressErrorsAfterCopyOrPicklePhysics(self, func): # Regression test for a problem that used to exist where # suppress_physics_errors couldn't be used on Physics objects that were # unpickled. physics2 = func(self._physics) with physics2.suppress_physics_errors(): pass def testCopyDataOnly(self): physics2 = self._physics.copy(share_model=True) self.assertEqual(physics2.model.ptr, self._physics.model.ptr) self.assertNotEqual(physics2.data.ptr, self._physics.data.ptr) def testForwardDynamicsUpdatedAfterReset(self): gravity = -9.81 self._physics.model.opt.gravity[2] = gravity with self._physics.reset_context(): pass self.assertAlmostEqual( self._physics.named.data.sensordata['accelerometer'][2], -gravity) def testActuationNotAppliedInAfterReset(self): self._physics.data.ctrl[0] = 1. self._physics.after_reset() # Calls `forward()` with actuation disabled. self.assertEqual(self._physics.data.actuator_force[0], 0.) self._physics.forward() # Call `forward` directly with actuation enabled. self.assertEqual(self._physics.data.actuator_force[0], 1.) def testActionSpec(self): xml = """ """ physics = engine.Physics.from_xml_string(xml) spec = engine.action_spec(physics) self.assertEqual(float, spec.dtype) np.testing.assert_array_equal(spec.minimum, [-mujoco.mjMAXVAL, -1.0]) np.testing.assert_array_equal(spec.maximum, [mujoco.mjMAXVAL, 2.0]) def testNstep(self): # Make initial state. with self._physics.reset_context(): self._physics.data.qvel[0] = 1 self._physics.data.qvel[1] = 1 initial_state = self._physics.get_state() # step() 4 times. for _ in range(4): self._physics.step() for_loop_state = self._physics.get_state() # Reset state, call step(4). with self._physics.reset_context(): self._physics.set_state(initial_state) self._physics.step(4) nstep_state = self._physics.get_state() np.testing.assert_array_equal(for_loop_state, nstep_state) # Repeat test with with RK4 integrator: self._physics.model.opt.integrator = enums.mjtIntegrator.mjINT_RK4 # step() 4 times. with self._physics.reset_context(): self._physics.set_state(initial_state) for _ in range(4): self._physics.step() for_loop_state_rk4 = self._physics.get_state() # Reset state, call step(4). with self._physics.reset_context(): self._physics.set_state(initial_state) self._physics.step(4) nstep_state_rk4 = self._physics.get_state() np.testing.assert_array_equal(for_loop_state_rk4, nstep_state_rk4) if __name__ == '__main__': absltest.main()