"""Generic gym environment specifically tailored to work with Jiminy Simulator
as backend physics engine, and Jiminy Viewer as 3D visualizer. It implements
the official OpenAI Gym API and extends it with additional functionalities.
"""
import os
import math
import weakref
import logging
import tempfile
from collections import OrderedDict
from collections.abc import Mapping, Sequence
from traceback import TracebackException
from functools import partial
from typing import (
Dict, Any, List, cast, no_type_check, Optional, Tuple, Callable, Union,
SupportsFloat, Iterator, Generic, Sequence as SequenceT,
Mapping as MappingT, MutableMapping as MutableMappingT)
import numpy as np
from gymnasium import spaces
from gymnasium.core import RenderFrame
import jiminy_py.core as jiminy
from jiminy_py import tree
from jiminy_py.core import array_copyto # pylint: disable=no-name-in-module
from jiminy_py.dynamics import compute_freeflyer_state_from_fixed_body
from jiminy_py.log import extract_variables_from_log
from jiminy_py.simulator import Simulator, TabbedFigure
from jiminy_py.viewer.viewer import (DEFAULT_CAMERA_XYZRPY_REL,
interactive_mode,
get_default_backend,
Viewer)
from jiminy_py.viewer.replay import viewer_lock
import pinocchio as pin
from ..bases import (DT_EPS,
Obs,
Act,
InfoType,
EngineObsType,
PolicyCallbackFun,
InterfaceJiminyEnv)
from ..quantities import QuantityManager
from ..utils import (FieldNested,
DataNested,
zeros,
is_nan,
fill,
build_clip,
build_copyto,
build_contains,
get_fieldnames,
get_robot_state_space,
get_robot_measurements_space,
register_variables)
from .internal import loop_interactive
# Maximum realtime slowdown of simulation steps before triggering timeout error
TIMEOUT_RATIO = 25
# Absolute tolerance when checking that observations are valid.
# Note that the joint positions are out-of-bounds when hitting the mechanical
# stops. Because of this, some tolerance must be added to avoid triggering
# termination too easily.
OBS_CONTAINS_TOL = 0.01
LOGGER = logging.getLogger(__name__)
class _LazyDictItemFilter(Mapping):
def __init__(self,
dict_packed: MappingT[str, SequenceT[Any]],
item_index: int) -> None:
self.dict_packed = dict_packed
self.item_index = item_index
def __getitem__(self, name: str) -> Any:
return self.dict_packed[name][self.item_index]
def __iter__(self) -> Iterator[str]:
return iter(self.dict_packed)
def __len__(self) -> int:
return len(self.dict_packed)
class BaseJiminyEnv(InterfaceJiminyEnv[Obs, Act],
Generic[Obs, Act]):
"""Base class to train an agent in Gym OpenAI using Jiminy simulator for
physics computations.
It creates an Gym environment wrapping an already instantiated Jiminy
simulator and behaves like any standard Gym environment.
The observation space is a dictionary gathering the current simulation
time, the real robot state, and the sensors data. The action is a vector
gathering the torques of the actuator of the robot.
There is no reward by default. It is up to the user to overload this class
to implement one. It has been designed to be highly flexible and easy to
customize by overloading it to fit the vast majority of users' needs.
.. note::
In evaluation or debug modes, log files of the simulations are
automatically written in the default temporary file directory of the
system. Writing is triggered by calling `stop` manually or upon reset,
right before starting the next episode. The path of the log file
associated with a given episode is stored under the key `log_path` of
the extra `info` output when calling `reset`. The user is responsible
for manually deleting old log files if necessary.
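
The following is a minimal usage sketch. The URDF path, `step_dt` value,
the placeholder reward, and the import paths are assumptions for the sake
of illustration, not requirements of this class.

.. code-block:: python

    from jiminy_py.simulator import Simulator
    from gym_jiminy.common.envs import BaseJiminyEnv


    class MyEnv(BaseJiminyEnv):
        def compute_reward(self, terminated, info):
            return 0.0  # Placeholder reward


    # Placeholder robot description. Any fully-initialized single-robot
    # simulator works.
    simulator = Simulator.build("my_robot.urdf", has_freeflyer=False)
    env = MyEnv(simulator, step_dt=0.01)

    obs, info = env.reset(seed=0)
    for _ in range(100):
        action = env.action_space.sample()  # Placeholder random policy
        obs, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            break
    env.stop()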
"""
derived: InterfaceJiminyEnv
"""Top-most block from which this environment is part of when leveraging
modular pipeline design capability.
"""
def __init__(self,
simulator: Simulator,
step_dt: float,
simulation_duration_max: float = 86400.0,
debug: bool = False,
render_mode: Optional[str] = None,
**kwargs: Any) -> None:
r"""
:param simulator: Jiminy Python simulator used for physics
computations. It must be fully initialized.
:param step_dt: Environment timestep for learning. Note that it is
independent from the controller and observation update
periods. The latter are configured via
`engine.set_options`.
:param simulation_duration_max:
Maximum duration of a simulation. If the current simulation time
exceeds this threshold, then it will trigger `is_truncated=True`.
It cannot exceed the maximum possible duration before telemetry
log time overflow, which is extremely large (about 30 years). Beware
that log data are stored in RAM, which may cause an out-of-memory
error if the episode lasts too long without reset.
Optional: One day (86400.0s) by default, which corresponds to about
4GB of log data assuming a 5ms control update period and telemetry
disabled for everything but the robot configuration.
:param render_mode: Desired rendering mode, i.e. "human" or "rgb_array".
If "human" is specified, calling `render` will open
a graphical window for visualization, otherwise an
RGB image is returned as a 3D numpy array whose
first dimension corresponds to the 3 red, green, blue
channels, and the two subsequent dimensions are the
pixel height and width respectively. `None` to select
automatically the most appropriate mode based on
the user-specified rendering backend if any, or the
machine environment. Note that "rgb_array" does not
require a graphical window manager.
Optional: None by default.
:param debug: Whether debug mode must be enabled. Doing so enables
telemetry recording.
:param kwargs: Extra keyword arguments that may be useful for derived
environments with multiple inheritance, and to allow
automatic pipeline wrapper generation.
"""
# Make sure that the simulator is single-robot
if len(simulator.robots) > 1:
raise NotImplementedError(
"Multi-robot simulation is not supported for now.")
# Handling of default rendering mode
viewer_backend = (
(simulator.viewer or Viewer).backend or
simulator.viewer_kwargs.get('backend'))
if render_mode is None:
# 'rgb_array' by default if the backend is or will be
# 'panda3d-sync', otherwise 'human' if available.
backend = viewer_backend or get_default_backend()
if backend == "panda3d-sync":
render_mode = 'rgb_array'
elif 'human' in self.metadata['render_modes']:
render_mode = 'human'
else:
render_mode = 'rgb_array'
# Force backend if none is specified and rendering mode is RGB array
if ("backend" not in simulator.viewer_kwargs and
render_mode == 'rgb_array'):
simulator.viewer_kwargs['backend'] = "panda3d-sync"
# Make sure that the robot name is unique
simulator.viewer_kwargs['robot_name'] = None
# Make sure that rendering mode is valid
assert render_mode in self.metadata['render_modes']
# Backup some user arguments
self.simulator = simulator
self.simulation_duration_max = simulation_duration_max
self._step_dt = step_dt
self.render_mode = render_mode
self.debug = debug
# Define some proxies for fast access.
# Note that some of them will be initialized in `_setup` method.
self.engine: jiminy.Engine = self.simulator.engine
self.stepper_state = self.simulator.stepper_state
self.is_simulation_running = self.simulator.is_simulation_running
self.robot = self.simulator.robot
self.robot_state = self.simulator.robot_state
self._robot_state_q = np.array([])
self._robot_state_v = np.array([])
self._robot_state_a = np.array([])
self._sensor_measurements = self.robot.sensor_measurements
# Top-most block of the pipeline is the environment itself by default
self.derived = self
# Store references to the variables to register to the telemetry
self._registered_variables: MutableMappingT[
str, Tuple[FieldNested, DataNested, bool]] = {}
self.log_fieldnames = _LazyDictItemFilter(
self._registered_variables, 0)
# Random number generator.
# This is used for generating random observations and actions, sampling
# the initial state of the robot, and domain randomization.
self.np_random = np.random.Generator(np.random.SFC64())
# Log of the "ongoing" simulation in debug and evaluation mode
self.log_path: Optional[str] = None
# Original simulation options of the ongoing episode before partially
# overwriting it.
self._simu_options_orig: Optional[Dict[str, Any]] = None
# Whether training mode is active, as opposed to evaluation mode
self._is_training = True
# Whether play interactive mode is active
self._is_interactive = False
# Information about the learning process
self._info: InfoType = {}
# Number of simulation steps performed
self.num_steps = np.array(-1, dtype=np.int64)
self._num_steps_beyond_terminate: Optional[int] = None
# Initialize a quantity manager for later use
self.quantities = QuantityManager(self)
# Initialize the interfaces through multiple inheritance
super().__init__() # Do not forward extra arguments, if any
# Initialize the seed of the environment
self._initialize_seed()
# Check that the action and observation spaces are consistent with
# 'compute_command' and 'refresh_observation' respectively.
cls = type(self)
is_action_space_custom = (
BaseJiminyEnv._initialize_action_space is not
cls._initialize_action_space)
if (BaseJiminyEnv.compute_command is cls.compute_command and
is_action_space_custom):
raise NotImplementedError(
"`BaseJiminyEnv.compute_command` must be overloaded when "
"defining a custom action space.")
is_observation_space_custom = (
BaseJiminyEnv._initialize_observation_space is not
cls._initialize_observation_space)
if (BaseJiminyEnv.refresh_observation is cls.refresh_observation and
is_observation_space_custom):
raise NotImplementedError(
"`BaseJiminyEnv.refresh_observation` must be overloaded when "
"defining a custom observation space.")
# Initialize the observation and action buffers if necessary
if not is_observation_space_custom:
# Bind the observation to the engine measurements by default
self.observation = cast(Obs, self.measurement)
else:
self.observation: Obs = zeros(self.observation_space)
if not hasattr(self, "action"):
self.action: Act = zeros(self.action_space)
# Define specialized operators for efficiency.
# Note that a partial view of observation corresponding to measurement
# must be extracted since only this one must be updated during refresh.
self._copyto_action = build_copyto(self.action)
self._contains_action = build_contains(self.action, self.action_space)
reduced_obs_space: spaces.Space[DataNested] = self.observation_space
if not is_observation_space_custom:
# Note that time is removed from the observation space because it
# will be checked independently.
assert isinstance(reduced_obs_space, spaces.Dict)
reduced_obs_space = spaces.Dict([
(key, value)
for key, value in reduced_obs_space.items()
if key != 't'])
self._contains_observation = build_contains(
self.observation, reduced_obs_space, tol_rel=OBS_CONTAINS_TOL)
self._get_clipped_env_observation: Callable[[], DataNested] = (
OrderedDict)
# Set robot in neutral configuration
q = self._neutral()
pin.framesForwardKinematics(
self.robot.pinocchio_model, self.robot.pinocchio_data, q)
# Configure the default camera pose if not already done
if "camera_pose" not in self.simulator.viewer_kwargs:
if self.robot.has_freeflyer:
# Get root frame name.
# The first entry of 'pinocchio_model.names' is always
# "universe", no matter whether the robot has a freeflyer,
# while the second one is the freeflyer joint "root_joint"
# if any.
root_name = self.robot.pinocchio_model.names[1]
# Relative camera pose wrt the root frame by default
self.simulator.viewer_kwargs["camera_pose"] = (
*DEFAULT_CAMERA_XYZRPY_REL, root_name)
else:
# Absolute camera pose by default
self.simulator.viewer_kwargs["camera_pose"] = (
(0.0, 7.0, 0.0), (np.pi/2, 0.0, np.pi), None)
# Register the action to the telemetry automatically if and only if there is
# exactly one scalar action per motor.
if isinstance(self.action, np.ndarray):
action_size = self.action.size
if action_size > 0 and action_size == self.robot.nmotors:
action_fieldnames = [
".".join(('action', motor.name))
for motor in self.robot.motors]
self.register_variable(
'action', self.action, action_fieldnames)
def __del__(self) -> None:
try:
self.close()
except Exception: # pylint: disable=broad-except
# This method must not fail under any circumstances
pass
def _initialize_action_space(self) -> None:
"""Configure the action space of the environment.
The action is a vector gathering the torques of the actuators of the
robot.
.. warning::
This method is called internally by `reset` method. It is not
meant to be overloaded since the actual action space of the
robot is uniquely defined.
"""
# Get effort limit
command_limit = np.array([
motor.effort_limit for motor in self.robot.motors])
# Set the action space
self.action_space = cast(spaces.Space[Act], spaces.Box(
low=-command_limit, high=command_limit, dtype=np.float64))
def _initialize_seed(self, seed: Optional[int] = None) -> None:
"""Specify the seed of the environment.
.. note::
This method is not meant to be called manually.
.. warning::
It also resets the low-level jiminy Engine. Therefore one must call
the `reset` method afterward.
:param seed: Random seed, as a positive integer.
Optional: A strongly random seed will be generated by gym
if omitted.
"""
# Generate distinct sequences of 3 uint32 seeds for the engine
# and environment.
engine_seed = np.random.SeedSequence(seed).generate_state(3)
np_seed = np.random.SeedSequence(engine_seed).generate_state(3)
# Re-initialize the low-level bit generator based on the provided seed
self.np_random.bit_generator.state = np.random.SFC64(np_seed).state
# Reset the seed of the action and observation spaces
obs_seed, act_seed = map(int, self.np_random.integers(
np.iinfo(int).max, size=(2,), dtype=int))
self.observation_space.seed(obs_seed)
self.action_space.seed(act_seed)
# Reset the seed of Jiminy Engine
self.simulator.seed(engine_seed)
def register_variable(self,
name: str,
value: DataNested,
fieldnames: Optional[
Union[str, FieldNested]] = None,
namespace: Optional[str] = None,
*,
is_eval_only: bool = True) -> None:
"""Register variable to the telemetry.
.. warning::
Variables are registered by reference. Consequently, the user is
responsible to manage the lifetime of the data to prevent it from
being garbage collected.
.. seealso::
See `gym_jiminy.common.utils.register_variables` for details.
:param name: Base name of the variable. It will be used to prepend
fields, using '.' delimiter.
:param value: Variable to register. It supports any nested data
structure whose leaves have type `np.ndarray` and either
dtype `np.float64` or `np.int64`.
:param fieldnames: Nested fieldnames with the exact same data structure
as the variable to register 'value'. Individual
elements of each leaf array must have their own
fieldname, all gathered in a nested tuple with the
same shape as the array.
Optional: Generic fieldnames will be generated
automatically.
:param namespace: Namespace used to prepend the base name 'name', using
'.' delimiter. Empty string to disable.
Optional: Disabled by default.
:param is_eval_only: Whether to register the variable to the telemetry
only in evaluation mode.
Optional: True by default.
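
Example: registering a custom buffer so that it gets logged during
evaluation episodes. This is only a sketch; the attribute name,
fieldnames and namespace below are arbitrary placeholders.

.. code-block:: python

    # Typically called from the constructor or `_setup` of a derived
    # environment, before any simulation is started.
    self._com_position = np.zeros(3)
    self.register_variable("com_position",
                           self._com_position,
                           fieldnames=["x", "y", "z"],
                           namespace="debug")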
"""
# Create default fieldnames if not specified
if fieldnames is None:
fieldnames = get_fieldnames(value, name)
# Store string in a list
if isinstance(fieldnames, str):
fieldnames = [fieldnames]
# Prepend with namespace if requested
if namespace:
fieldnames = tree.map_structure(
lambda key: ".".join(filter(None, (namespace, key))),
fieldnames)
# Early return with a warning if fieldnames is empty
if not fieldnames:
LOGGER.warning("'value' or 'fieldnames' cannot be empty.")
return
# Check if variable can be registered successfully to the telemetry.
# Note that a dummy controller must be created to avoid using the
# actual one to keep control of when registration will take place.
register_variables(jiminy.BaseController(), fieldnames, value)
# Combine namespace and variable name if provided
name = ".".join(filter(None, (namespace, name)))
# Store the header and a reference to the variable if successful
self._registered_variables[name] = (fieldnames, value, is_eval_only)
def set_wrapper_attr(self,
name: str,
value: Any,
*,
force: bool = True) -> None:
if force or hasattr(self, name):
setattr(self, name, value)
return
raise AttributeError(
f"'{type(self).__qualname__}' object has no attribute '{name}'")
@property
def viewer_kwargs(self) -> Dict[str, Any]:
"""Default keyword arguments for the instantiation of the viewer, e.g.
when `render` method is first called.
.. warning::
The default argument `backend` is ignored if a backend is already
up and running, even if no viewer has been instantiated for the
environment at hand in particular.
"""
return self.simulator.viewer_kwargs
@property
def step_dt(self) -> float:
return self._step_dt
@property
def unwrapped(self) -> "BaseJiminyEnv":
return self
# The idiom `@InterfaceJiminyEnv.training.getter` to overwrite only the
# property getter without altering the original setter is not properly
# supported by `pylint` nor `mypy`. As a workaround, the whole property is
# overridden and the setter is redefined.
@property
def training(self) -> bool:
return self._is_training
@training.setter
def training(self, mode: bool) -> None:
self.train(mode)
def train(self, mode: bool = True) -> None:
if self.is_simulation_running:
raise RuntimeError(
"Switching between training and evaluation modes is forbidden "
"if a simulation is already running. Please call `stop` "
"method beforehand.")
self._is_training = mode
def _update_pipeline(self, derived: Optional[InterfaceJiminyEnv]) -> None:
if self.derived is not self:
derived_old = self.derived
self.derived = self
derived_old._update_pipeline(None)
self.derived = derived or self
def reset(self, # type: ignore[override]
*,
seed: Optional[int] = None,
options: Optional[Dict[str, Any]] = None,
) -> Tuple[DataNested, InfoType]:
"""Reset the environment.
In practice, it resets the backend simulator and sets the initial state
of the robot. The initial state is obtained by calling '_sample_state'.
This method is also in charge of setting the initial action (at the
beginning) and observation (at the end).
.. warning::
It starts the simulation immediately. As a result, it is not
possible to change the robot (including its options), nor to register
log variables.
:param seed: Random seed, as a positive integer.
Optional: `None` by default. If `None`, then the internal
random generator of the environment will be kept as-is,
without updating its seed.
:param options: Additional information to specify how the environment
is reset. The field 'reset_hook' is reserved for
chaining multiple `BasePipelineWrapper`. It is not
meant to be defined manually.
Optional: None by default.
:returns: Initial observation of the episode and some auxiliary
information for debugging or monitoring purpose.
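
Example: resetting with a fixed seed and retrieving the path of the new
log file, which is only available in debug or evaluation mode. `env` is
assumed to be an instance of this class or a derived pipeline.

.. code-block:: python

    obs, info = env.reset(seed=42)
    log_path = info.get("log_path")  # `None` in training mode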
"""
# Stop the episode if one is still running
self.stop()
# Create right away a new temporary log file if necessary
self.log_path = None
if self.debug or not self.training:
fd, self.log_path = tempfile.mkstemp(suffix=".data")
os.close(fd)
# Reset the seed if requested
if seed is not None:
self._initialize_seed(seed)
# Remove external forces, if any
self.simulator.remove_all_forces()
# Make sure the environment is properly setup.
# Note that the robot is not allowed to change after this point.
self._setup()
# Make sure the low-level engine has not changed,
# otherwise some proxies would be corrupted.
if self.engine is not self.simulator.engine:
raise RuntimeError(
"Changing the memory address of the low-level jiminy engine "
"is an undefined behavior.")
# Re-initialize some shared memories.
# It is necessary because the robot may have changed.
self.robot = self.simulator.robot
self.robot_state = self.simulator.robot_state
self._sensor_measurements = self.robot.sensor_measurements
# Reset action
fill(self.action, 0)
# Enforce the low-level controller.
# The robot may have changed, for example it could be randomly
# generated, which would corrupt the old controller. As a result, it is
# necessary to either instantiate a new low-level controller or to
# re-initialize the existing one by calling its `initialize` method,
# and to do so BEFORE calling `reset`, because doing otherwise would
# cause a segfault.
self.robot.controller = None
# Reset the simulator.
# Do NOT remove all forces since it has already been done before, and
# because it would make it impossible to register forces in `_setup`.
self.simulator.reset(remove_all_forces=False)
# Reset some internal buffers
self.num_steps[()] = 0
self._num_steps_beyond_terminate = None
# Make sure that both the observer and the controller are running
# faster than the environment to which they are attached, for the action
# to take effect and be observable. Moreover, their respective update
# periods must be divisors of the environment step, both for
# computational efficiency and to avoid breaking the Markov assumption
# due to the previous action having a direct effect on the next step.
control_update_ratio, observe_update_ratio = 0.0, 0.0
if self.observe_dt > 0.0:
observe_update_ratio = round(self.step_dt / self.observe_dt, 10)
assert round(observe_update_ratio) == observe_update_ratio, (
"Observer update period must be a divisor of environment "
"simulation timestep")
if self.control_dt > 0.0:
control_update_ratio = round(self.step_dt / self.control_dt, 10)
assert round(control_update_ratio) == control_update_ratio, (
"Controller update period must be a divisor of environment "
"simulation timestep")
# Initialize sensor measurements that are zero-ed at this point. This
# may be necessary for pre-compiling blocks before actually starting
# the simulation to avoid triggering timeout error. Indeed, some
# computations may require valid sensor data, such as normalized
# quaternion or non-zero linear acceleration.
q_init = self._neutral()
v_init, a_init, u_motor = (np.zeros(self.robot.nv),) * 3
f_external = [pin.Force.Zero(),] * self.robot.pinocchio_model.njoints
pin.framesForwardKinematics(
self.robot.pinocchio_model, self.robot.pinocchio_data, q_init)
self.robot.compute_sensor_measurements(
0.0, q_init, v_init, a_init, u_motor, f_external)
# Run the reset hook if any.
# Note that the reset hook must be called after `_setup` because it
# expects that the robot is not going to change anymore at this point.
# Similarly, the observer and controller update periods must be set.
reset_hook: Optional[Callable[[], InterfaceJiminyEnv]] = (
options or {}).get("reset_hook")
env: InterfaceJiminyEnv = self
if reset_hook is not None:
assert callable(reset_hook)
env_derived = reset_hook() or env
assert env_derived.unwrapped is self
env = env_derived
# Update the environment pipeline if necessary
if env is not self.derived:
env._update_pipeline(env)
# Sample the initial state
# Note that it is important to postpone initial state sampling to after
# calling `reset` for all the layers of the pipeline, as some of them
# may affect the base environment, notably by selecting the reference
# trajectory, which, in turn, may be involved in the initial condition.
q_init, v_init = self._sample_state()
if not jiminy.is_position_valid(self.robot.pinocchio_model, q_init):
raise RuntimeError(
"The initial state provided by `_sample_state` is "
"inconsistent with the dimension or types of joints of the "
"model.")
# Re-initialize the quantity manager.
# Note that computation graph tracking is never reset automatically.
# It is the responsibility of the practitioner implementing a derived
# environment whenever it makes sense for its specific use-case.
self.quantities.reset(reset_tracking=False)
# Instantiate the actual controller.
# Note that a weak reference must be used to avoid circular reference.
self.robot.controller = jiminy.FunctionalController(
partial(type(env)._controller_handle, weakref.proxy(env)))
# Register user-specified variables to the telemetry in evaluation mode
is_eval = self.debug or not self.training
for header, value, is_eval_only in self._registered_variables.values():
if is_eval or not is_eval_only:
register_variables(self.robot.controller, header, value)
# Start the simulation
self.simulator.start(q_init, v_init)
# Refresh robot_state proxies.
# Note that it must be done here because memory is only allocated by
# the engine when starting a simulation.
self._robot_state_q = self.robot_state.q
self._robot_state_v = self.robot_state.v
self._robot_state_a = self.robot_state.a
# Initialize shared buffers
self._initialize_buffers()
# Update shared buffers
self._refresh_buffers()
# Clear cache and auto-refresh managed quantities
self.quantities.clear()
# Initialize the observation
env._observer_handle(
self.stepper_state.t,
self._robot_state_q,
self._robot_state_v,
self._sensor_measurements)
# Initialize specialized most-derived observation clipping operator
self._get_clipped_env_observation = build_clip(
env.observation, env.observation_space)
# Make sure the state is valid, otherwise `refresh_observation` and
# `_initialize_observation_space` are probably inconsistent.
try:
obs = self._get_clipped_env_observation()
except (TypeError, ValueError) as e:
raise RuntimeError(
"The observation computed by `refresh_observation` is "
"inconsistent with the observation space defined by "
"`_initialize_observation_space` at initialization.") from e
# Make sure there is no 'nan' value in observation
for value in tree.flatten(obs):
if is_nan(value):
raise RuntimeError(
f"'nan' value found in observation ({obs}). Something "
"went wrong with `refresh_observation` method.")
# Reset the extra information buffer and store current log path in it
self._info.clear()
if self.log_path is not None:
self._info['log_path'] = self.log_path
# The episode cannot be over before doing a single step.
if any(self.derived.has_terminated(self._info)):
raise RuntimeError(
"The simulation has already terminated at `reset`. Check the "
"implementation of `has_terminated` if overloaded.")
# Note that the viewer must be reset if available, otherwise it would
# keep using the old robot model for display, which must be avoided.
if self.simulator.is_viewer_available:
viewer = self.simulator.viewer
assert viewer is not None
viewer._setup(self.robot) # type: ignore[attr-defined]
if viewer.has_gui():
viewer.refresh()
return obs, tree.deepcopy(self._info)
def close(self) -> None:
"""Clean up the environment after the user has finished using it.
It terminates the Python Jiminy engine.
.. warning::
Calling `reset` or `step` afterward is an undefined behavior.
"""
self.simulator.close()
def step(self, # type: ignore[override]
action: Act
) -> Tuple[DataNested, SupportsFloat, bool, bool, InfoType]:
"""Integration timestep of the environment's dynamics under prescribed
agent's action over a single timestep, i.e. collect one transition step
of the underlying Markov Decision Process of the learning problem.
.. warning::
When reaching the end of an episode (``terminated or truncated``),
it is necessary to reset the environment for starting a new episode
before being able to do steps once again.
:param action: Action performed by the agent during the ongoing step.
:returns:
* observation (ObsType) - The next observation of the environment
as a result of taking the agent's action.
* reward (float): the reward associated with the transition step.
* terminated (bool): Whether the agent reaches the terminal state,
which means success or failure depending on the MDP of the task.
* truncated (bool): Whether some truncation condition outside the
scope of the MDP is satisfied. This can be used to end an episode
prematurely before a terminal state is reached, for instance if
the agent's state is going out of bounds.
* info (dict): Contains auxiliary information that may be helpful
for debugging and monitoring. This might, for instance, contain:
metrics that describe the agent's performance state, variables
that are hidden from observations, or individual reward terms
from which the total reward is derived.
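
Example: a single transition, starting a new episode when the previous
one is over. `env` and the random placeholder policy are assumptions.

.. code-block:: python

    action = env.action_space.sample()  # Placeholder policy
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        obs, info = env.reset()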
"""
# Make sure a simulation is already running
if not self.is_simulation_running:
raise RuntimeError(
"No simulation running. Please call `reset` before `step`.")
# Update of the action to perform if relevant
if action is not self.action:
# Make sure the action is valid in debug mode
if self.debug:
for value in tree.flatten(action):
if is_nan(value):
raise RuntimeError(
f"'nan' value found in action ({action}).")
# Update the action
self._copyto_action(action)
# Try performing a single environment step
try:
self.simulator.step(self.step_dt)
except Exception:
# Stop the simulation before raising the exception
self.stop()
raise
# Make sure there is no 'nan' value in the system acceleration
if is_nan(self._robot_state_a):
raise RuntimeError(
"The acceleration of the system is 'nan'. Something went "
"wrong with jiminy engine.")
# Update number of (successful) steps
self.num_steps += 1
# Update shared buffers
self._refresh_buffers()
# Clear cache and auto-refresh managed quantities
self.quantities.clear()
# Update the observer at the end of the step.
# This is necessary because, internally, it is called at the beginning
# of every integration step, during the controller update.
self.derived._observer_handle(
self.stepper_state.t,
self._robot_state_q,
self._robot_state_v,
self._sensor_measurements)
# Reset the extra information buffer
self._info.clear()
# Check if the simulation is over.
# Note that 'truncated' is forced to True if the integration failed or
# if the maximum simulation duration would be exceeded at the next step.
terminated, truncated = self.derived.has_terminated(self._info)
truncated |= (
not self.is_simulation_running or
self.stepper_state.t + DT_EPS > self.simulation_duration_max)
# Check whether stepping after termination, which is an undefined behavior
if self._num_steps_beyond_terminate is None:
if terminated:
self._num_steps_beyond_terminate = 0
else:
if self.training and self._num_steps_beyond_terminate == 0:
LOGGER.error(
"Calling `step` after termination is an undefined "
"behavior, and as such, is strongly discouraged in "
"training mode. The reward will be forced to 'nan' from "
"now on. Please call `reset`.")
self._num_steps_beyond_terminate += 1
# Compute reward if not beyond termination
if self._num_steps_beyond_terminate:
reward = float('nan')
else:
# Compute reward and update extra information
reward = self.compute_reward(terminated, self._info)
# Make sure the reward is not 'nan'
if math.isnan(reward):
raise RuntimeError(
"The reward is 'nan'. Something went wrong with "
"`compute_reward` implementation.")
# Clip (and copy) the most derived observation before returning it
obs = self._get_clipped_env_observation()
return obs, reward, terminated, truncated, tree.deepcopy(self._info)
def stop(self) -> None:
# Backup whether a simulation is still running at this point
was_simulation_running = bool(self.is_simulation_running)
# Stop the engine.
# This must be done BEFORE writing log, otherwise the final simulation
# state will be missing as it gets flushed after stopping.
self.simulator.stop()
# Write log of simulation if worth it and not already done
if was_simulation_running and (self.debug or not self.training):
if self.num_steps > 0:
self.simulator.write_log(self.log_path, format="binary")
else:
self.log_path = None
def render(self) -> Optional[Union[RenderFrame, List[RenderFrame]]]:
"""Render the agent in its environment.
.. versionchanged::
This method no longer takes input arguments due to changes in
the official `gym.Wrapper` API. A workaround is to set
`simulator.viewer_kwargs` beforehand. Alternatively, it is possible
to call the low-level implementation `simulator.render` directly to
avoid API restrictions.
:returns: RGB array if 'render_mode' is 'rgb_array', None otherwise.
"""
# Set the available rendering modes
viewer_backend = (self.simulator.viewer or Viewer).backend
if self.render_mode == 'human' and viewer_backend == "panda3d-sync":
Viewer.close()
# Call base implementation
return self.simulator.render( # type: ignore[return-value]
return_rgb_array=self.render_mode == 'rgb_array')
def plot(self,
enable_block_states: bool = False,
**kwargs: Any) -> TabbedFigure:
"""Plot figures of simulation data over time associated with the
ongoing episode until now if any, the previous one otherwise.
.. note::
It adds tabs for the base environment action plus all blocks
((state, action) for controllers and (state, features) for
observers) on top of the original `Simulator.plot`.
:param enable_block_states: Whether to display the internal state of
all blocks.
:param kwargs: Extra keyword arguments to forward to `simulator.plot`.
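
Example: running a short evaluation episode before plotting the logged
data. The zero-torque policy and the horizon are placeholders, and `env`
is assumed to implement a reward.

.. code-block:: python

    def policy(obs, action, reward, terminated, truncated, info):
        return np.zeros(env.robot.nmotors)  # Placeholder policy

    env.evaluate(policy, seed=0, horizon=2.0, enable_replay=False)
    figure = env.plot(enable_block_states=False)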
"""
# Call base implementation
figure = self.simulator.plot(**kwargs)
assert not isinstance(figure, Sequence)
# Extract log data
log_vars = self.simulator.log_data.get("variables", {})
if not log_vars:
raise RuntimeError(
"Nothing to plot. Please run a simulation before calling "
"`plot` method.")
# Plot all registered variables from high-level to low-level blocks
for key, fieldnames in reversed(list(self.log_fieldnames.items())):
# Filter state if requested
if not enable_block_states and key.endswith(".state"):
continue
# Store fieldnames in dict systematically to avoid code duplication
if not isinstance(fieldnames, dict):
fieldnames = {"": fieldnames}
# Extract hierarchical time series.
# Fieldnames stored in a dictionary cannot be nested. In such a
# case, keys correspond to subplots, and values are individual
# scalar data over time to be displayed in the same subplot.
t = log_vars["Global.Time"]
base_name = key.replace(".", " ")
for group, subfieldnames in fieldnames.items():
if not isinstance(subfieldnames, (list, tuple)):
LOGGER.error(
"Action space not supported by this method.")
return figure
tab_name = " ".join(filter(None, (base_name, group)))
value_map = extract_variables_from_log(
log_vars, subfieldnames, "controller", as_dict=True)
tab_data = {key.split(".", 2)[-1]: value
for key, value in value_map.items()}
grid_spec: Tuple[Optional[int], Optional[int]] = (None, None)
nrows = len(subfieldnames)
if nrows and isinstance(subfieldnames[0], (list, tuple)):
ncols_all = set(map(len, subfieldnames))
if len(ncols_all) == 1:
grid_spec = (nrows, next(iter(ncols_all)))
try:
figure.add_tab(tab_name,
t,
tab_data, # type: ignore[arg-type]
nrows=grid_spec[0],
ncols=grid_spec[1])
except ValueError:
LOGGER.error("Invalid plot spec for variable %s. Moving "
"to the next one", key)
# Return figure for convenience and consistency with Matplotlib
return figure
def replay(self, **kwargs: Any) -> None:
"""Replay the ongoing episode until now if any, the previous one
otherwise.
:param kwargs: Extra keyword arguments to forward to
`jiminy_py.viewer.replay.play_trajectories`.
"""
# Do not open a graphical window automatically if recording is requested.
# Note that the backend is closed automatically if there is no viewer
# backend available at this point, to reduce memory pressure, but it
# will take time to restart it systematically for every recording.
if kwargs.get('record_video_path') is not None:
kwargs['close_backend'] = not self.simulator.is_viewer_available
# Stop any running simulation before replay if `has_terminated` is True
if self.is_simulation_running and any(self.derived.has_terminated({})):
self.stop()
with viewer_lock:
# Call render before replay in order to take into account custom
# backend viewer instantiation options, e.g. the initial camera pose,
# and to update the ground profile.
self.simulator.render(
update_ground_profile=True,
return_rgb_array="record_video_path" in kwargs.keys(),
**kwargs)
viewer_kwargs: Dict[str, Any] = {
'verbose': False,
'enable_travelling': self.robot.has_freeflyer,
**kwargs}
self.simulator.replay(**viewer_kwargs)
def evaluate(self,
policy_fn: PolicyCallbackFun,
seed: Optional[int] = None,
horizon: Optional[float] = None,
enable_stats: bool = True,
enable_replay: Optional[bool] = None,
**kwargs: Any) -> Tuple[List[SupportsFloat], List[InfoType]]:
# Handling of default arguments
if enable_replay is None:
enable_replay = (
(Viewer.backend or get_default_backend()) != "panda3d-sync" or
interactive_mode() >= 2)
# Stop the episode if one is still running
env = self.derived
env.stop()
# Make sure evaluation mode is enabled
is_training = self.training
if is_training:
self.eval()
# Initialize the simulation
reward: Optional[SupportsFloat]
obs, info = env.reset(seed=seed)
action, reward, terminated, truncated = None, None, False, False
# Run the simulation
reward_episode: List[SupportsFloat] = []
info_episode = [info]
try:
while horizon is None or self.stepper_state.t < horizon:
action = policy_fn(
obs, action, reward, terminated, truncated, info)
if terminated or truncated:
# Break AFTER calling the policy callback if the episode is
# terminated or truncated, which gives the policy the
# opportunity to observe and record the final state.
break
obs, reward, terminated, truncated, info = env.step(action)
info_episode.append(info)
reward_episode.append(reward)
except KeyboardInterrupt:
pass
# Stop the simulation
env.stop()
# Restore training mode if it was enabled
if is_training:
self.train()
# Display some statistics if requested
if enable_stats:
print("env.num_steps:", self.num_steps)
print("cumulative reward:", sum(map(float, reward_episode)))
# Replay the result if requested
if enable_replay:
try:
self.replay(**kwargs)
except Exception as e: # pylint: disable=broad-except
# Do not fail because of replay/recording exception
traceback = TracebackException.from_exception(e)
LOGGER.warning(''.join(traceback.format()))
return reward_episode, info_episode
def play_interactive(self,
enable_travelling: Optional[bool] = None,
start_paused: bool = True,
enable_is_done: bool = True,
verbose: bool = True,
**kwargs: Any) -> None:
# Stop the episode if one is still running
env = self.derived
env.stop()
# Enable play interactive flag and make sure training flag is disabled
is_training = self.training
self._is_interactive = True
if is_training:
self.eval()
# Make sure the viewer GUI is open, so that the viewer will share
# external forces with the robot automatically.
viewer = self.simulator.viewer
if viewer is None or not viewer.has_gui():
self.simulator.render(update_ground_profile=False)
# Initialize the simulation
obs, _ = env.reset()
reward = None
# Refresh the ground profile
self.simulator.render(update_ground_profile=True)
viewer = self.simulator.viewer
assert viewer is not None # Assert(s) for type checker
# Enable travelling
if enable_travelling is None:
backend = viewer.backend
assert backend is not None # Assert(s) for type checker
enable_travelling = backend.startswith('panda3d')
enable_travelling = enable_travelling and self.robot.has_freeflyer
if enable_travelling:
tracked_frame = self.robot.pinocchio_model.frames[2].name
viewer.attach_camera(tracked_frame)
# Refresh the scene once again to update camera placement
self.render()
# Define interactive loop
def _interact(key: Optional[str] = None) -> bool:
nonlocal obs, reward, enable_is_done
action = self._key_to_action(
key, obs, reward, **{"verbose": verbose, **kwargs})
if action is None:
action = env.action
obs, reward, terminated, truncated, _ = env.step(action)
self.render()
if not enable_is_done:
if self.robot.has_freeflyer:
return self._robot_state_q[2] < 0.0
return False
return terminated or truncated
# Run interactive loop
loop_interactive(max_rate=self.step_dt,
start_paused=start_paused,
verbose=verbose)(_interact)()
# Disable travelling if it was enabled
if enable_travelling:
viewer.detach_camera()
# Stop the simulation to unlock the robot.
# This makes it possible to display contact forces during replay.
env.stop()
# Disable play interactive mode flag and restore training flag
self._is_interactive = False
if is_training:
self.train()
# methods to override:
# ----------------------------
def _setup(self) -> None:
"""Configure the environment. It must guarantee that its internal state
is valid after calling this method.
By default, it enforces some options of the engine.
.. warning::
Beware this method is called BEFORE `observe_dt` and
`control_dt` are properly set, so one cannot rely on them at this
point. Yet, `step_dt` is available and should always be. One can
still access the low-level controller update period through
`engine_options['stepper']['controllerUpdatePeriod']`.
.. note::
The user must overload this method to enforce a custom observer
update period, otherwise it will be the same as the controller's.
.. note::
This method is called internally by `reset` methods.
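
Example: a sketch of a derived environment enforcing a custom observer
update period, assuming that overwriting `self.observe_dt` after calling
the base implementation is sufficient, as suggested by the note above.
The factor 2 is arbitrary.

.. code-block:: python

    # Assuming the imports available at the top of this module
    class MyEnv(BaseJiminyEnv):
        def _setup(self) -> None:
            super()._setup()
            # Refresh the observation twice per controller update
            self.observe_dt = self.control_dt / 2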
"""
# Restore the original simulation options
if self._simu_options_orig is not None:
self.simulator.set_simulation_options(self._simu_options_orig)
# Call base implementation
super()._setup()
# Backup simulation options
self._simu_options_orig = self.simulator.get_simulation_options()
# Extract the observer/controller update period.
# The controller update period is used by default for the observer if
# it was not specified by the user in `_setup`.
engine_options = self.simulator.get_options()
self.control_dt = float(
engine_options['stepper']['controllerUpdatePeriod'])
self.observe_dt = self.control_dt
# Configure the low-level integrator
engine_options["stepper"]["iterMax"] = 0
if self.debug:
engine_options["stepper"]["verbose"] = True
# Set maximum computation time for single internal integration steps
engine_options["stepper"]["timeout"] = self.step_dt * TIMEOUT_RATIO
if self.debug:
engine_options["stepper"]["timeout"] = 0.0
# Enable full logging in debug and evaluation mode
if self.debug or not self.training:
# Enable all telemetry data at engine-level
telemetry_options = engine_options["telemetry"]
for key in telemetry_options.keys():
if key.startswith("enable"):
telemetry_options[key] = True
# Enable telemetry persistence.
# The visual and collision meshes will be stored in log file, so
# that the robot can be loaded on any machine with access to the
# original URDF and mesh files.
engine_options["telemetry"]["isPersistent"] = True
engine_options["telemetry"]["logInternalStepperSteps"] = True
# Enable all telemetry data at robot-level
robot_options = self.robot.get_options()
robot_telemetry_options = robot_options["telemetry"]
for key in robot_telemetry_options.keys():
robot_telemetry_options[key] = True
self.robot.set_options(robot_options)
# Update engine options
self.simulator.set_options(engine_options)
def _initialize_observation_space(self) -> None:
"""Configure the observation of the environment.
By default, the observation is a dictionary gathering the current
simulation time, the real agent state, and the sensor data.
.. note::
This method is called internally by `reset` method at the very end,
just before computing and returning the initial observation. This
method, alongside `refresh_observation`, must be overwritten in
order to define a custom observation space.
"""
observation_spaces: Dict[str, spaces.Space] = OrderedDict()
observation_spaces['t'] = spaces.Box(
low=0.0,
high=self.simulation_duration_max,
shape=(),
dtype=np.float64)
observation_spaces['states'] = (
spaces.Dict(agent=get_robot_state_space(self.robot)))
observation_spaces['measurements'] = (
get_robot_measurements_space(self.robot, is_finite=False))
self.observation_space = cast(spaces.Space[Obs], spaces.Dict(
**observation_spaces)) # type: ignore[arg-type]
def _neutral(self) -> np.ndarray:
"""Returns a neutral valid configuration for the agent.
The default implementation returns the neutral configuration if valid,
the "mean" configuration otherwise (right in the middle of the position
lower and upper bounds).
.. warning::
Beware there is no guarantee for this configuration to be
statically stable.
.. note::
This method is called internally by '_sample_state' to generate the
initial state. It can be overloaded to ensure static stability of
the configuration.
"""
# Get the neutral configuration of the actual model
q = pin.neutral(self.robot.pinocchio_model)
# Make sure it is not out-of-bounds before returning
position_limit_lower = self.robot.pinocchio_model.lowerPositionLimit
position_limit_upper = self.robot.pinocchio_model.upperPositionLimit
for idx, val in enumerate(q):
lo, hi = position_limit_lower[idx], position_limit_upper[idx]
if hi < val or val < lo:
q[idx] = 0.5 * (lo + hi)
return q
def _sample_state(self) -> Tuple[np.ndarray, np.ndarray]:
"""Returns a randomized yet valid configuration and velocity for the
robot.
The default implementation returns the neutral configuration and zero
velocity.
Offsets are applied to the freeflyer to ensure that no contact point
goes through the ground and up to three of them are in contact.
.. note::
This method is called internally by `reset` to generate the initial
state. It can be overloaded to act as a random state generator.
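
Example: a sketch of a derived environment adding a small random
perturbation on top of the default initial state. The noise scale is
arbitrary.

.. code-block:: python

    # Assuming the imports available at the top of this module
    class MyEnv(BaseJiminyEnv):
        def _sample_state(self) -> Tuple[np.ndarray, np.ndarray]:
            q, v = super()._sample_state()
            # Arbitrary perturbation of the initial configuration
            q += 0.01 * self.np_random.standard_normal(q.shape)
            q = pin.normalize(self.robot.pinocchio_model, q)
            return q, v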
"""
# Get the neutral configuration
q = self._neutral()
# Make sure the configuration is not out-of-bound
q.clip(self.robot.pinocchio_model.lowerPositionLimit,
self.robot.pinocchio_model.upperPositionLimit,
out=q)
# Make sure the configuration is normalized
q = pin.normalize(self.robot.pinocchio_model, q)
# Make sure the robot impacts the ground
if self.robot.has_freeflyer:
engine_options = self.simulator.get_options()
ground_fun = engine_options['world']['groundProfile']
compute_freeflyer_state_from_fixed_body(
self.robot, q, ground_profile=ground_fun)
# Zero velocity
v = np.zeros(self.robot.pinocchio_model.nv)
return q, v
def _initialize_buffers(self) -> None:
"""Initialize internal buffers for fast access to shared memory or to
avoid redundant computations.
.. note::
This method is called at every `reset`, right after
`self.simulator.start`. At this point, the simulation is running
but `refresh_observation` has never been called, so that it can be
used to initialize buffers involving the engine state but required
to refresh the observation.
.. note::
Buffers requiring manual update must be refreshed using
`_refresh_buffers` method.
.. warning::
This method is not appropriate for initializing buffers involved in
`compute_command`. For the time being, there is no better way than
taking advantage of the flag `self.is_simulation_running` in the
method `compute_command` itself.
"""
def _refresh_buffers(self) -> None:
"""Refresh internal buffers that must be updated manually.
.. note::
This method is called after every internal `engine.step` and before
refreshing the observation one last time. As such, it is the right
place to update shared data between `has_terminated` and
`compute_reward`. However, it is not appropriate for quantities
involved in `refresh_observation` nor `compute_command`, which may
be called more often than once per step.
.. note::
`_initialize_buffers` method can be used to initialize buffers that
may require special care.
.. warning::
Be careful when using this method to update buffers involved in
`refresh_observation`. The latter is called at `self.observe_dt`
update period, while this method is called at `self.step_dt` update
period. `self.observe_dt` is likely to be different from
`self.step_dt`, unless configured manually when overloading
`_setup` method.
"""
@no_type_check
def refresh_observation(self, measurement: EngineObsType) -> None:
"""Compute the observation based on the current state of the robot.
In practice, it updates the internal buffer directly for the sake of
efficiency.
By default, it sets the observation to the value of the measurement,
which would not work unless `Obs` corresponds to `EngineObsType`.
.. note::
This method is called at the end of every low-level `Engine.step`.
.. warning::
This method may be called without any simulation running, either
to perform basic consistency checking or to allocate and initialize
buffers. There is no way, for the time being, to distinguish the
initialization stage in particular. A workaround consists in checking
whether the simulation has already started. It is not exactly the
same, but it does the job as far as preserving efficiency is concerned.
.. warning::
One must only rely on `measurement` to get the state of the robot,
as anything else is not reliable for this. More specifically,
`self.robot_state` would not be valid if an adaptive stepper is
being used for physics integration.
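
Example: a sketch of a derived environment exposing a flat observation
gathering the configuration and velocity of the robot. It assumes that
the engine measurement exposes the agent state under
`measurement['states']['agent']` with keys 'q' and 'v', mirroring the
default observation space.

.. code-block:: python

    # Assuming the imports available at the top of this module
    class MyEnv(BaseJiminyEnv[np.ndarray, np.ndarray]):
        def _initialize_observation_space(self) -> None:
            nq = self.robot.pinocchio_model.nq
            nv = self.robot.pinocchio_model.nv
            self.observation_space = spaces.Box(
                low=-np.inf, high=np.inf, shape=(nq + nv,),
                dtype=np.float64)

        def refresh_observation(self, measurement: EngineObsType) -> None:
            # Update the pre-allocated observation buffer in-place
            nq = self.robot.pinocchio_model.nq
            agent_state = measurement['states']['agent']
            self.observation[:nq] = agent_state['q']
            self.observation[nq:] = agent_state['v']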
"""
def compute_command(self, action: Act, command: np.ndarray) -> None:
"""Compute the motors efforts to apply on the robot.
By default, all it does is forwarding the input action as-is, without
performing any processing. One is responsible for overloading this
method if the action space has been customized, or simply to clip the
action to make sure it is never out-of-bounds if necessary.
.. warning::
There is no good place to initialize buffers that are necessary to
compute the command. The only solution for now is to define
initialization inside this method itself, using the safeguard
`if not self.is_simulation_running:`.
:param action: High-level target to achieve by means of the command.
:param command: Pre-allocated buffer of motor efforts to update in-place.
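
Example: a sketch of a derived environment with a normalized action
space in [-1.0, 1.0], rescaled to the motor effort limits when computing
the command. The class name and scaling are placeholders.

.. code-block:: python

    # Assuming the imports available at the top of this module
    class NormalizedActionEnv(BaseJiminyEnv[np.ndarray, np.ndarray]):
        def _initialize_action_space(self) -> None:
            self.action_space = spaces.Box(
                low=-1.0, high=1.0, shape=(self.robot.nmotors,),
                dtype=np.float64)

        def compute_command(self,
                            action: np.ndarray,
                            command: np.ndarray) -> None:
            # Rescale the normalized action by the motor effort limits
            effort_limit = np.array(
                [motor.effort_limit for motor in self.robot.motors])
            command[:] = effort_limit * np.clip(action, -1.0, 1.0)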
"""
# Check if the action is out-of-bounds, in debug mode only
if self.debug and not self._contains_action():
LOGGER.warning("The action is out-of-bounds.")
assert isinstance(action, np.ndarray)
array_copyto(command, action)
def has_terminated(self, info: InfoType) -> Tuple[bool, bool]:
"""Determine whether the episode is over, because a terminal state of
the underlying MDP has been reached or an aborting condition outside
the scope of the MDP has been triggered.
By default, it always returns `terminated=False`, and `truncated=True`
if and only if the observation is out-of-bounds. One can overload this method to
implement custom termination conditions for the environment at hand.
.. warning::
No matter what, truncation will happen when reaching the maximum
simulation duration, i.e. 'self.simulation_duration_max'.
.. note::
This method is called after `refresh_observation`, so that the
internal buffer 'observation' is up-to-date.
:param info: Dictionary of extra information for monitoring.
:returns: terminated and truncated flags.
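
Example: a sketch of a custom termination condition on top of the
default out-of-bounds truncation. The height threshold is arbitrary.

.. code-block:: python

    # Assuming the imports available at the top of this module
    class MyEnv(BaseJiminyEnv):
        def has_terminated(self, info: InfoType) -> Tuple[bool, bool]:
            terminated, truncated = super().has_terminated(info)
            # Arbitrary condition: the floating base fell below 0.3m
            if self.robot.has_freeflyer and self._robot_state_q[2] < 0.3:
                terminated = True
            return terminated, truncated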
"""
# Make sure that a simulation is running
if not self.is_simulation_running:
raise RuntimeError(
"No simulation running. Please start one before calling this "
"method.")
# Check if the observation is out-of-bounds
truncated = not self._contains_observation()
return False, truncated
def _key_to_action(self,
key: Optional[str],
obs: Obs,
reward: Optional[float],
**kwargs: Any) -> Optional[Act]:
"""Mapping from input keyboard keys to actions.
.. note::
This method is called systematically before the `step` method, even if
no key has been pressed, or the reward is not defined. In such cases,
the value is `None`.
.. note::
The mapping can be state dependent, and the key can be used for
something other than computing the action directly. For
instance, one can provide as an extra argument to this method a
custom policy taking user parameters mapped to the keyboard as input.
.. warning::
Overloading this method is required for calling `play_interactive`
method.
:param key: Key pressed by the user as a string. `None` if no key has
been pressed since the last step of the environment.
:param obs: Previous observation from last step of the environment.
It is always available, including right after `reset`.
:param reward: Previous reward from last step of the environment.
Not available before first step right after `reset`.
:param kwargs: Extra keyword argument provided by the user when calling
`play_interactive` method.
:returns: Action to forward to the environment. None to hold the
previous action without updating it.
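
Example: a minimal keyboard mapping sketch for a single-motor robot.
The key names and torque values are placeholders, and the actual key
strings depend on the rendering backend.

.. code-block:: python

    # Assuming the imports available at the top of this module
    class MyEnv(BaseJiminyEnv[np.ndarray, np.ndarray]):
        def _key_to_action(self,
                           key: Optional[str],
                           obs: np.ndarray,
                           reward: Optional[float],
                           **kwargs: Any) -> Optional[np.ndarray]:
            # Placeholder mapping: push the single motor left or right
            if key == "Left":
                return np.array([-1.0])
            if key == "Right":
                return np.array([1.0])
            return None  # Hold the previous action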
"""
raise NotImplementedError