# mypy: disable-error-code="attr-defined, name-defined"
"""This module implements a basic wrapper on top of the lower-level Jiminy
Engine that simplifies the user API for the very common single-robot scenario
while extending its capability by integrating native support of 3D scene
rendering and figure plotting of telemetry log data.
"""
import os
import re
import atexit
import logging
import pathlib
import tempfile
import warnings
from copy import deepcopy
from functools import partial
from typing import Any, List, Dict, Optional, Union, Sequence, Callable
import toml
import numpy as np
from . import core as jiminy
from .robot import BaseJiminyRobot, generate_default_hardware_description_file
from .dynamics import Trajectory
from .log import read_log, build_robot_from_log
from .viewer import (CameraPoseType,
interactive_mode,
get_default_backend,
extract_replay_data_from_log,
play_trajectories,
Viewer)
if interactive_mode() >= 2:
from tqdm.notebook import tqdm
else:
from tqdm import tqdm # type: ignore[assignment]
try:
from .plot import TabbedFigure
except ImportError:
TabbedFigure = type(None) # type: ignore[misc,assignment]
LOGGER = logging.getLogger(__name__)
DEFAULT_UPDATE_PERIOD = 1.0e-3 # 0.0 for time continuous update
DEFAULT_GROUND_STIFFNESS = 4.0e6
DEFAULT_GROUND_DAMPING = 2.0e3
ProfileForceFunc = Callable[[float, np.ndarray, np.ndarray, np.ndarray], None]
def _build_robot_from_urdf(name: str,
urdf_path: str,
hardware_path: Optional[str] = None,
mesh_path_dir: Optional[str] = None,
has_freeflyer: bool = True,
avoid_instable_collisions: bool = True,
debug: bool = False) -> jiminy.Robot:
r"""Create and initialize a new robot from scratch, based on configuration
files only. It creates a default hardware file if none is provided. See
`generate_default_hardware_description_file` for details.
:param name: Name of the robot to build from URDF.
:param urdf_path: Path of the URDF of the robot.
:param hardware_path: Path of Jiminy hardware description toml file.
Optional: Looking for '\*_hardware.toml' file in
the same folder and with the same name.
:param mesh_path_dir: Path to the folder containing all the meshes.
Optional: Env variable 'JIMINY_DATA_PATH' will be
used if available.
:param has_freeflyer: Whether the robot is fixed-based wrt its root
link, or can move freely in the world.
Optional: True by default.
:param avoid_instable_collisions: Prevent numerical instabilities by
replacing collision mesh by vertices
of associated minimal volume bounding
box, and replacing primitive box by
its vertices.
:param debug: Whether the debug mode must be activated. Doing it
enables temporary files automatic deletion.
"""
# Check if robot name is valid
if re.match('[^A-Za-z0-9_]', name):
raise ValueError("The name of the robot should be case-insensitive "
"ASCII alphanumeric characters plus underscore.")
# Generate a temporary Hardware Description File if necessary
if hardware_path is None:
hardware_path = str(pathlib.Path(
urdf_path).with_suffix('')) + '_hardware.toml'
if not os.path.exists(hardware_path):
# Create a file that will be closed (thus deleted) at exit
urdf_name = os.path.splitext(os.path.basename(urdf_path))[0]
fd, hardware_path = tempfile.mkstemp(
prefix=f"{urdf_name}_", suffix="_hardware.toml")
os.close(fd)
if not debug:
def remove_file_at_exit(file_path: str) -> None:
try:
os.remove(file_path)
except (PermissionError, FileNotFoundError):
pass
atexit.register(partial(
remove_file_at_exit, hardware_path))
# Generate default Hardware Description File
generate_default_hardware_description_file(
urdf_path, hardware_path, verbose=debug)
# Build the robot
robot = BaseJiminyRobot(name)
robot.initialize(
urdf_path, hardware_path, mesh_path_dir, (), has_freeflyer,
avoid_instable_collisions, load_visual_meshes=debug, verbose=debug)
return robot
[docs]
class Simulator:
"""Simulation wrapper providing a unified user-API on top of the low-level
jiminy C++ core library and Python-native modules for 3D rendering and log
data visualization.
The simulation can now be multi-robot but it has been design to remain as
easy of use as possible for single-robot simulation which are just
multi-robot simulations with only one robot.
* Single-robot simulations: The name of the robot is an empty string by
default but can be specified. It will then appear in the log if specified.
* Multi-robots simulations: The name of the first robot is an empty string
by default but it is advised to specify one. You can add robots to the
simulation with the method `add_robot`, robot names have to be specified.
Some proxy and methods are not compatible with multi-robot simulations:
Single-robot simulations | Multi-robot simulations
---------------------------------------------------------------------------
Simulator.viewer ---> Simulator.viewers
Simulator.robot ---> Simulator.engine.robots
Simulator.robot_state ---> Simulator.engine.robot_states
Simulator.register_profile_force -> Simulator.engine.register_profile_force
Simulator.register_impulse_force -> Simulator.engine.register_impulse_force
The methods `replay` and `plot` are not supported for multi-robot
simulations at the time being.
In case of multi-robot simulations, single-robot proxies either return
information associated with the first robot or raise an exception.
"""
def __init__(self, # pylint: disable=unused-argument
robot: jiminy.Robot,
viewer_kwargs: Optional[Dict[str, Any]] = None,
**kwargs: Any) -> None:
"""
:param robot: Jiminy robot already initialized.
:param viewer_kwargs: Keyword arguments to override default arguments
whenever a viewer must be instantiated, eg when
`render` method is first called. Specifically,
`backend` is ignored if one is already available.
Optional: None by default.
:param kwargs: Used arguments to allow automatic pipeline wrapper
generation.
"""
# Backup the user arguments
self.viewer_kwargs = deepcopy(viewer_kwargs or {})
# Check if robot name is valid
if re.match('[^A-Za-z0-9_]', robot.name):
raise ValueError("The name of the robot should be case-insensitive"
" ASCII alphanumeric characters plus underscore.")
# Instantiate the low-level Jiminy engine, then initialize it
self.engine = jiminy.Engine()
self.engine.add_robot(robot)
# Create shared memories and python-native attribute for fast access
self.stepper_state = self.engine.stepper_state
self.is_simulation_running = self.engine.is_simulation_running
# Viewer management
self._viewers: List[Viewer] = []
# Internal buffer for progress bar management
self._pbar: Optional[tqdm] = None
# Figure holder
self._figures: List[TabbedFigure] = []
# Reset the low-level jiminy engine
self.reset()
[docs]
@classmethod
def build(cls,
urdf_path: str,
hardware_path: Optional[str] = None,
mesh_path_dir: Optional[str] = None,
has_freeflyer: bool = True,
config_path: Optional[str] = None,
avoid_instable_collisions: bool = True,
debug: bool = False,
*, name: str = "",
**kwargs: Any) -> 'Simulator':
r"""Create a new single-robot simulator instance from scratch based on
configuration files only.
:param urdf_path: Path of the urdf model to be used for the simulation.
:param hardware_path: Path of Jiminy hardware description toml file.
Optional: Looking for '\*_hardware.toml' file in
the same folder and with the same name.
:param mesh_path_dir: Path to the folder containing all the meshes.
Optional: Env variable 'JIMINY_DATA_PATH' will be
used if available.
:param has_freeflyer: Whether the robot is fixed-based wrt its root
link, or can move freely in the world.
Optional: True by default.
:param config_path: Configuration toml file to import. It will be
imported AFTER loading the hardware description
file. It can be automatically generated from an
instance by calling `export_config_file` method.
One can specify `None` for loading for the file
having the same full path as the URDF file but
suffix '_options.toml' if any. Passing an empty
string "" will force skipping import completely.
Optional: Empty by default
:param avoid_instable_collisions: Prevent numerical instabilities by
replacing collision mesh by vertices
of associated minimal volume bounding
box, and replacing primitive box by
its vertices.
:param debug: Whether the debug mode must be activated. Doing it
enables temporary files automatic deletion.
:param name: Desired name of the robot.
Optional: Empty string by default.
:param kwargs: Keyword arguments to forward to class constructor.
"""
# Handling of default argument(s)
if config_path is None:
config_path = str(
pathlib.Path(urdf_path).with_suffix('')) + '_options.toml'
if not os.path.exists(config_path):
config_path = ""
# Instantiate and initialize the robot
robot = _build_robot_from_urdf(
name, urdf_path, hardware_path, mesh_path_dir, has_freeflyer,
avoid_instable_collisions, debug)
# Instantiate and initialize the engine
simulator = Simulator.__new__(cls)
Simulator.__init__(
simulator, robot, engine_class=jiminy.Engine, **kwargs)
# Get engine options
engine_options = simulator.engine.get_options()
# Update controller/sensors update period, based on extra toml info
control_period = robot.extra_info.pop('controllerUpdatePeriod', None)
sensors_period = robot.extra_info.pop('sensorsUpdatePeriod', None)
if control_period is None and sensors_period is None:
control_period = DEFAULT_UPDATE_PERIOD
sensors_period = DEFAULT_UPDATE_PERIOD
elif control_period is None:
control_period = sensors_period
else:
sensors_period = control_period
engine_options['stepper']['controllerUpdatePeriod'] = control_period
engine_options['stepper']['sensorsUpdatePeriod'] = sensors_period
# Handling of ground model parameters, based on extra toml info
engine_options['contacts']['stiffness'] = \
robot.extra_info.pop('groundStiffness', DEFAULT_GROUND_STIFFNESS)
engine_options['contacts']['damping'] = \
robot.extra_info.pop('groundDamping', DEFAULT_GROUND_DAMPING)
# Set engine options
simulator.engine.set_options(engine_options)
# Override the default options by the one in the configuration file
if config_path != "":
simulator.import_options(config_path)
return simulator
[docs]
def add_robot(self,
name: str,
urdf_path: str,
hardware_path: Optional[str] = None,
mesh_path_dir: Optional[str] = None,
has_freeflyer: bool = True,
avoid_instable_collisions: bool = True,
debug: bool = False) -> None:
r"""Create a new robot from scratch based on configuration files only
and add it to the simulator.
:param urdf_path: Path of the urdf model to be used for the simulation.
:param hardware_path: Path of Jiminy hardware description toml file.
Optional: Looking for '\*_hardware.toml' file in
the same folder and with the same name.
:param mesh_path_dir: Path to the folder containing all the meshes.
Optional: Env variable 'JIMINY_DATA_PATH' will be
used if available.
:param has_freeflyer: Whether the robot is fixed-based wrt its root
link, or can move freely in the world.
Optional: True by default.
:param avoid_instable_collisions: Prevent numerical instabilities by
replacing collision mesh by vertices
of associated minimal volume bounding
box, and replacing primitive box by
its vertices.
:param debug: Whether the debug mode must be activated. Doing it
enables temporary files automatic deletion.
"""
# Instantiate the robot
robot = _build_robot_from_urdf(
name, urdf_path, hardware_path, mesh_path_dir, has_freeflyer,
avoid_instable_collisions, debug)
# Check if some unsupported objects have been specified
unsupported_nested_paths = (
('groundStiffness', ('contacts', 'stiffness')),
('groundDamping', ('contacts', 'damping')),
('controllerUpdatePeriod', ('stepper', 'controllerUpdatePeriod')),
('sensorsUpdatePeriod', ('stepper', 'sensorsUpdatePeriod')))
engine_options = self.engine.get_options()
for extra_info_key, option_nested_path in unsupported_nested_paths:
if extra_info_key in robot.extra_info.keys():
option = engine_options
for option_path in option_nested_path:
option = option[option_path]
if robot.extra_info[extra_info_key] != option:
warnings.warn(
f"You have specified a different {extra_info_key} "
"than the one of the engine, the simulation will run "
f"with {option}.")
# Add the new robot to the engine
self.engine.add_robot(robot)
def __del__(self) -> None:
"""Custom deleter to make sure the close is properly closed at exit.
"""
self.close()
def __getattr__(self, name: str) -> Any:
"""Convenient fallback attribute getter.
It enables to get access to the attribute and methods of the low-level
Jiminy engine directly, without having to do it through `engine`.
.. note::
This method is not meant to be called manually.
"""
return getattr(self.__getattribute__('engine'), name)
def __dir__(self) -> List[str]:
"""Attribute lookup.
It is mainly used by autocomplete feature of Ipython. It is overloaded
to get consistent autocompletion wrt `getattr`.
"""
return [*super().__dir__(), *dir(self.engine)]
@property
def viewer(self) -> Optional[Viewer]:
"""Convenience proxy to get the viewer associated with the robot that
was first added if any.
"""
if self.is_viewer_available:
return self._viewers[0]
return None
@property
def viewers(self) -> Sequence[Viewer]:
"""Convenience proxy to get all the viewers associated with the ongoing
simulation.
"""
return self._viewers[:len(self.engine.robots)]
@property
def robot(self) -> jiminy.Robot:
"""Convenience proxy to get the robot in single-robot simulations.
Internally, all it does is returning `self.engine.robots[0]` without
any additional processing.
.. warning::
Method only supported for single-robot simulations. Call
`self.engine.robots` in multi-robot simulations.
"""
# A property is used in place of an instance attribute to keep pointing
# to the right robot if the latter has been replaced by the user. Doing
# so is dodgy and rarely necessary. For this reason, this capability is
# not advertised but supported nonetheless.
robot, = self.engine.robots
return robot
@property
def robot_state(self) -> jiminy.RobotState:
"""Convenience proxy to get the state of the robot in single-robot
simulations.
Internally, all it does is returning `self.engine.robot_states[0]`
without any additional processing.
.. warning::
Method only supported for single-robot simulations. Call
`self.engine.robot_states` in multi-robot simulations.
"""
# property is used in place of attribute for extra safety
robot_state, = self.engine.robot_states
return robot_state
@property
def is_viewer_available(self) -> bool:
"""Returns whether some viewer instances associated with the ongoing
simulation is currently opened.
.. warning::
Method only supported for single-robot simulations.
"""
return (len(self._viewers) > 0 and
self._viewers[0].is_open()) # type: ignore[misc]
[docs]
def register_profile_force(self,
frame_name: str,
force_func: ProfileForceFunc,
update_period: float = 0.0) -> None:
r"""Apply an external force profile on a given frame.
The force can be time- and state-dependent, and may be time-continuous
or updated periodically (Zero-Order Hold).
:param frame_name: Name of the frame at which to apply the force.
:param force_func:
.. raw:: html
Force profile as a callable with signature:
| force_func\(
| **t**: float,
| **q**: np.ndarray,
| **v**: np.ndarray,
| **force**: np.ndarray
| \) -> None
where `force` corresponds the spatial force in local world aligned
frame, ie its origin is located at application frame but its basis
is aligned with world frame. It is represented as a `np.ndarray`
(Fx, Fy, Fz, Mx, My, Mz) that must be updated in-place.
:param update_period: Update period of the force. It must be set to 0.0
for time-continuous. Discrete update is strongly
recommended for native Python callables because
evaluating them is so slow that it would slowdown
the whole simulation. There is no issue for C++
bindings such as `jiminy.RandomPerlinProcess`.
"""
if len(self.engine.robots) > 1:
raise NotImplementedError(
"To register a force in multirobot simulation, you should use "
"`simulation.engine.register_profile_force` and specify the "
"name of the robot you want the force applied to.")
return self.engine.register_profile_force(
"", frame_name, force_func, update_period)
[docs]
def register_impulse_force(self,
frame_name: str,
t: float,
dt: float,
force: np.ndarray) -> None:
r"""Apply an external impulse force on a given frame.
The force starts at the fixed point in time and lasts a given duration.
In the meantime, its profile is square-shaped, ie the force remains
constant.
:param frame_name: Name of the frame at which to apply the force.
:param t: Time at which to start applying the external force.
:param dt: Duration of the force.
:param force_func: Spatial force in local world aligned frame, ie its
origin is located at application frame but its basis
is aligned with world frame. It is represented as a
`np.ndarray` (Fx, Fy, Fz, Mx, My, Mz).
"""
if len(self.engine.robots) > 1:
raise NotImplementedError(
"To register a force in multirobot simulation, you should use "
"`simulation.engine.register_profile_force` and specify the "
"name of the robot you want the force applied to.")
return self.engine.register_impulse_force("", frame_name, t, dt, force)
[docs]
def seed(self, seed: Union[np.uint32, np.ndarray]) -> None:
"""Set the seed of the simulation and reset the simulation.
.. warning::
It also resets the low-level jiminy Engine. Therefore one must call
the `reset` method manually afterward.
:param seed: Desired seed as a sequence of unsigned integers 32 bits.
"""
assert seed.dtype == np.uint32, "'seed' must have dtype np.uint32."
# Make sure no simulation is running before setting the seed
self.engine.stop()
# Set the seed in engine options
engine_options = self.engine.get_options()
engine_options["stepper"]["randomSeedSeq"] = np.asarray(seed)
self.engine.set_options(engine_options)
# It is expected by OpenAI Gym API to reset env after setting the seed
self.reset()
[docs]
def reset(self, remove_all_forces: bool = False) -> None:
"""Reset the simulator.
It resets the simulation time to zero, and generate a new random model
of the robot. If one wants to get exactly the same result as before,
either set the randomness of the model and sensors to zero, or set the
seed once again to reinitialize the random number generator.
:param remove_all_forces:
Whether to remove already registered external forces. Note that it
can also be done separately by calling `remove_all_forces` method.
Optional: Do not remove by default.
"""
# Reset the backend engine
self.engine.reset(False, remove_all_forces)
[docs]
def start(
self,
q_init: Union[np.ndarray, Dict[str, np.ndarray]],
v_init: Union[np.ndarray, Dict[str, np.ndarray]],
a_init: Optional[Union[np.ndarray, Dict[str, np.ndarray]]] = None,
is_state_theoretical: Optional[bool] = None
) -> None:
"""Initialize a simulation, starting from (q_init, v_init) at t=0.
:param q_init: Initial configuration (by robot if it is a dictionary).
:param v_init: Initial velocity (by robot if it is a dictionary).
:param a_init: Initial acceleration (by robot if it is a dictionary).
It is only used by acceleration dependent sensors and
controllers, such as IMU and force sensors.
:param is_state_theoretical: Whether the initial state is associated
with the actual or theoretical model of
the robot. This option is only supported
when passing `np.ndarray` for starting a
single-robot simulation.
"""
# Call base implementation
if isinstance(q_init, np.ndarray):
assert isinstance(v_init, np.ndarray)
assert a_init is None or isinstance(a_init, np.ndarray)
if is_state_theoretical is None:
is_state_theoretical = False
self.engine.start(q_init, v_init, a_init, is_state_theoretical)
else:
assert isinstance(v_init, dict)
assert a_init is None or isinstance(a_init, dict)
if is_state_theoretical is not None:
raise NotImplementedError(
"Optional argument 'is_state_theoretical' is only "
"supported for single-robot simulations.")
self.engine.start(q_init, v_init, a_init)
# Share the external force buffer of the viewer with the engine.
# Note that the force vector must be converted to pain list to avoid
# copy with external sub-vector.
for robot_state, viewer in zip(self.engine.robot_states, self.viewers):
viewer.f_external = [*robot_state.f_external][1:]
[docs]
def simulate(self,
t_end: float,
q_init: Union[np.ndarray, Dict[str, np.ndarray]],
v_init: Union[np.ndarray, Dict[str, np.ndarray]],
a_init: Optional[
Union[np.ndarray, Dict[str, np.ndarray]]] = None,
is_state_theoretical: Optional[bool] = None,
callback: Optional[Callable[[], bool]] = None,
log_path: Optional[str] = None,
show_progress_bar: bool = True) -> None:
"""Run a simulation, starting from x0=(q0,v0) at t=0 up to tf.
.. note::
Optionally, log the result of the simulation.
:param t_end: Simulation duration.
:param q_init: Initial configuration (by robot if it is a dictionnary).
:param v_init: Initial velocity (by robot if it is a dictionnary).
:param a_init: Initial acceleration (by robot if it is a dictionnary).
It is only used by acceleration dependent sensors and
controllers, such as IMU and force sensors.
:param is_state_theoretical: In single robot simulations, whether the
initial state is associated with the
actual or theoretical model of the robot.
:param callback: Callable that can be specified to abort simulation. It
will be evaluated after every simulation step. Abort
if false is returned.
Optional: None by default.
:param log_path: Save log data to this location. Disable if None.
Note that the format extension '.data' is enforced.
Optional, disable by default.
:param show_progress_bar: Whether to display a progress bar during the
simulation. None to enable only if available.
Optional: None by default.
"""
# Handling of progress bar if requested
if show_progress_bar:
# Initialize the progress bar
self._pbar = tqdm(total=t_end, bar_format=(
"{percentage:3.0f}%|{bar}| {n:.2f}/{total_fmt} "
"[{elapsed}<{remaining}]"))
# Define callable responsible for updating the progress bar
def update_progress_bar() -> bool:
"""Update progress bar based on current simulation time.
"""
nonlocal self
if self._pbar is not None:
t = self.engine.stepper_state.t
self._pbar.update(t - self._pbar.n)
return True
# Hijack simulation callback to also update the progress bar
if callback is None:
callback = update_progress_bar
else:
def callback_wrapper(callback: Callable[[], bool]) -> bool:
"""Update progress bar based on current simulation time,
then call a given callback function.
"""
nonlocal update_progress_bar
return update_progress_bar() and callback()
callback = partial(callback_wrapper, self, callback)
# Run the simulation
err = None
try:
# Call base implementation
# Single-robot simulations with `np.ndarray`,
# `is_state_theoretical` is supported.
if isinstance(q_init, np.ndarray):
assert isinstance(v_init, np.ndarray)
assert a_init is None or isinstance(a_init, np.ndarray)
if is_state_theoretical is None:
is_state_theoretical = False
self.engine.simulate(
t_end, q_init, v_init, a_init, is_state_theoretical,
callback)
# Multi-robot simulations or single-robot simulations with
# dictionaries, `is_state_theoretical` is not supported.
else:
assert isinstance(v_init, dict)
assert a_init is None or isinstance(a_init, dict)
if is_state_theoretical is not None:
raise NotImplementedError(
"Optional argument 'is_state_theoretical' is only "
"supported for single-robot simulations.")
self.engine.simulate(t_end, q_init, v_init, a_init, callback)
except Exception as e: # pylint: disable=broad-exception-caught
err = e
# Make sure that the progress bar is properly closed
if show_progress_bar:
assert self._pbar is not None
self._pbar.close()
self._pbar = None
# Re-throw exception if not successful
if err is not None:
raise err
# Write log
if log_path is not None and self.engine.stepper_state.q:
# Log data would be available as long as the stepper state is
# initialized. It may be the case no matter if a simulation is
# actually running, since data are cleared at reset not at stop.
log_suffix = pathlib.Path(log_path).suffix[1:]
if log_suffix not in ("data", "hdf5"):
raise ValueError(
"Log format must be either '.data' or '.hdf5'.")
log_format = log_suffix if log_suffix != 'data' else 'binary'
self.engine.write_log(log_path, format=log_format)
[docs]
def render(self,
return_rgb_array: bool = False,
width: Optional[int] = None,
height: Optional[int] = None,
camera_pose: Optional[CameraPoseType] = None,
update_ground_profile: Optional[bool] = None,
**kwargs: Any) -> Optional[np.ndarray]:
"""Render the current state of the simulation. One can display it or
return an RGB array instead.
:param return_rgb_array: Whether to return the current frame as an rgb
array or render it directly.
:param width: Width of the returned RGB frame, if enabled.
:param height: Height of the returned RGB frame, if enabled.
:param camera_pose: Tuple position [X, Y, Z], rotation [Roll, Pitch,
Yaw], frame name/index specifying the absolute or
relative pose of the camera. `None` to disable.
Optional: `None` by default.
:param update_ground_profile: Whether to update the ground profile. It
must be called manually only if necessary
because it is costly.
Optional: True by default if no viewer
available, False otherwise.
:param kwargs: Extra keyword arguments to forward at `Viewer`
initialization.
:returns: Rendering as an RGB array (3D numpy array), if enabled, None
otherwise.
"""
# Handle default arguments
if update_ground_profile is None:
update_ground_profile = not self.is_viewer_available
# Close the current viewer backend if not suitable
if kwargs.get("backend", Viewer.backend) != Viewer.backend:
Viewer.close()
# Update viewer_kwargs with provided kwargs
viewer_kwargs: Dict[str, Any] = {**dict(
backend=(self.viewer or Viewer).backend,
delete_robot_on_close=True),
**self.viewer_kwargs,
**kwargs}
# Instantiate the robot and viewer client if necessary.
# A new dedicated scene and window will be created.
if not self.is_viewer_available:
# Make sure that the current viewers are properly closed if any
for viewer in self._viewers:
viewer.close()
self._viewers.clear()
# Create new viewer instances
for robot, robot_state in zip(
self.engine.robots, self.engine.robot_states):
# Create a single viewer instance
robot_name = (
robot.name or robot.pinocchio_model.name or "robot")
viewer = Viewer(
robot,
use_theoretical_model=False,
open_gui_if_parent=False,
**{"robot_name": robot_name, **viewer_kwargs})
assert viewer.backend is not None
self._viewers.append(viewer)
# Share the external force buffer of the viewer with the engine
if self.engine.is_simulation_running:
viewer.f_external = [*robot_state.f_external][1:]
if viewer.backend.startswith('panda3d'):
# Enable display of COM, DCM and contact markers by default
# if the robot has freeflyer.
if robot.has_freeflyer:
if "display_com" not in viewer_kwargs:
viewer.display_center_of_mass(True)
if "display_dcm" not in viewer_kwargs:
viewer.display_capture_point(True)
if "display_contacts" not in viewer_kwargs:
viewer.display_contact_forces(True)
# Enable display of external forces by default only for
# the joints having an external force registered to it.
if "display_f_external" not in viewer_kwargs:
profile_forces = self.engine.profile_forces[robot.name]
force_frames = set(
robot.pinocchio_model.frames[f.frame_index].parent
for f in profile_forces)
impulse_forces = self.engine.impulse_forces[robot.name]
force_frames |= set(
robot.pinocchio_model.frames[f.frame_index].parent
for f in impulse_forces)
visibility = viewer._display_f_external
assert isinstance(visibility, list)
for i in force_frames:
visibility[i - 1] = True
viewer.display_external_forces(visibility)
# Initialize camera pose
assert self.viewer is not None
if viewer.is_backend_parent and camera_pose is None:
camera_pose = viewer_kwargs.get("camera_pose", (
(9.0, 0.0, 2e-5), (np.pi/2, 0.0, np.pi/2), None))
# Enable the ground profile is requested and available
assert self.viewer is not None and self.viewer.backend is not None
if update_ground_profile:
engine_options = self.engine.get_options()
ground_profile = engine_options["world"]["groundProfile"]
Viewer.update_floor(
ground_profile, simplify_mesh=True, show_vertices=False)
# Set the camera pose if requested
if camera_pose is not None:
self.viewer.set_camera_transform(*camera_pose)
# Make sure the graphical window is open if required
if not return_rgb_array and self.viewer.backend != "panda3d-sync":
Viewer.open_gui()
# Try refreshing the viewer
for viewer in self._viewers:
viewer.refresh()
# Compute and return rgb array if needed
if return_rgb_array:
return Viewer.capture_frame(
width or viewer_kwargs.get("width"),
height or viewer_kwargs.get("height"))
return None
[docs]
def replay(self,
extra_logs_files: Sequence[str] = (),
extra_trajectories: Sequence[Trajectory] = (),
**kwargs: Any) -> None:
"""Replay the current episode until now.
.. warning::
Method only supported for single-robot simulations.
:param kwargs: Extra keyword arguments for delegation to
`replay.play_trajectories` method.
"""
# Make sure that the simulation is single-robot
if len(self.engine.robots) > 1:
raise NotImplementedError(
"This method is only supported for single-robot simulations.")
# Close extra viewer instances if any
for viewer in self._viewers[1:]:
viewer.delete_robot_on_close = True
viewer.close()
# Extract log data and robot from extra log files
robots = [self.robot]
logs_data = [self.engine.log_data]
for log_file in extra_logs_files:
log_data = read_log(log_file)
robot = build_robot_from_log(
log_data, mesh_package_dirs=self.robot.mesh_package_dirs)
robots.append(robot)
logs_data.append(log_data)
# Extract trajectory data from pairs (robot, log)
trajectories: List[Trajectory] = []
update_hooks: List[
Optional[Callable[[float, np.ndarray, np.ndarray], None]]] = []
extra_kwargs: Dict[str, Any] = {}
for robot, log_data in zip(robots, logs_data):
if log_data:
traj, update_hook, _kwargs = \
extract_replay_data_from_log(log_data, robot)
trajectories.append(traj)
update_hooks.append(update_hook)
extra_kwargs.update(_kwargs)
trajectories += list(extra_trajectories)
update_hooks += [None for _ in extra_trajectories]
# Make sure there is something to replay
if not trajectories:
raise RuntimeError(
"Nothing to replay. Please run a simulation before calling "
"`replay` method, or provided data manually.")
# Make sure the viewer is instantiated before replaying
backend = (kwargs.get('backend', (self.viewer or Viewer).backend) or
get_default_backend())
must_not_open_gui = (
backend == "panda3d-sync" or
kwargs.get('record_video_path') is not None)
self.render(**{
'return_rgb_array': must_not_open_gui,
'update_floor': True,
**kwargs})
# Define sequence of viewer instances
viewers = [self.viewer, *[None for _ in trajectories[:-1]]]
# Replay the trajectories
self._viewers = play_trajectories(
trajectories,
update_hooks,
viewers=viewers,
**{'verbose': True,
**self.viewer_kwargs,
**extra_kwargs,
'display_f_external': None,
**kwargs})
[docs]
def close(self) -> None:
"""Close the connection with the renderer.
"""
if hasattr(self, "_viewers"):
for viewer in self._viewers:
viewer.close()
self._viewers.clear()
if hasattr(self, "figures"):
for figure in self._figures:
figure.close()
self._figures.clear()
[docs]
def plot(self,
enable_flexiblity_data: bool = False,
block: Optional[bool] = None,
**kwargs: Any) -> Union[TabbedFigure, Sequence[TabbedFigure]]:
"""Display common simulation data over time.
The figure features several tabs:
- Subplots with robot configuration
- Subplots with robot velocity
- Subplots with robot acceleration
- Subplots with motors torques
- Subplots with raw sensor data (one tab for each type of sensor)
:param enable_flexiblity_data:
Enable display of flexibility joints in robot's configuration,
velocity and acceleration subplots.
Optional: False by default.
:param block: Whether to wait for the figure to be closed before
returning.
Optional: False in interactive mode, True otherwise.
:param kwargs: Extra keyword arguments to forward to `TabbedFigure`.
"""
# Make sure plot submodule is available
try:
# pylint: disable=import-outside-toplevel
from .plot import plot_log
except ImportError as e:
raise ImportError(
"Method not available. Please install 'jiminy_py[plot]'."
) from e
# Create figure for each robot, without closing the existing one
figures = []
for robot in self.robots:
window_title = ".".join(filter(
None, (kwargs.get("window_title", "jiminy"), robot.name)))
figure = plot_log(
self.log_data, robot, enable_flexiblity_data, block, **{
**kwargs, "window_title": window_title})
figures.append(figure)
# Keep track of all figures that has been created so far
self._figures += figures
# Return only figures that has just been created
if len(self.robots) > 1:
return figures
return figures[0]
[docs]
def export_options(self, config_path: Union[str, os.PathLike]) -> None:
"""Export in a single configuration file all the options of the
simulator, ie the engine and all the robots.
.. note::
The generated configuration file can be imported thereafter using
`import_options` method.
:param config_path: Full path of the location where to store the
generated file. The extension '.toml' will be
enforced.
"""
config_path = pathlib.Path(config_path).with_suffix('.toml')
with open(config_path, 'w') as f:
toml.dump(
self.get_simulation_options(), f, toml.TomlNumpyEncoder())
[docs]
def import_options(self, config_path: Union[str, os.PathLike]) -> None:
"""Import all the options of the simulator at once, ie the engine
and all the robots.
.. note::
A full configuration file can be exported beforehand using
`export_options` method.
:param config_path: Full path of the configuration file to load.
"""
def deep_update(original: Dict[str, Any],
new_dict: Dict[str, Any],
*, _key_root: str = "") -> Dict[str, Any]:
"""Updates `original` dict with values from `new_dict` recursively.
If a new key should be introduced, then an error is thrown instead.
.. warning::
Modify `original` in place.
:param original: Dictionary with default values.
:param new_dict: Dictionary with values to be updated.
:param _key_root: Internal variable keeping track of current depth
within nested dict hierarchy.
:returns: Update dictionary.
"""
for key, value in new_dict.items():
key_root = "/".join((_key_root, key))
if key not in original:
raise ValueError(f"Key '{key_root}' not found")
if isinstance(value, dict):
deep_update(original[key], value, _key_root=key_root)
else:
original[key] = new_dict[key]
return original
options = deep_update(
self.get_simulation_options(), toml.load(str(config_path)))
self.set_simulation_options(options)