Generic

Generic gym environment specifically tailored to work with the Jiminy Simulator as backend physics engine, and the Jiminy Viewer as 3D visualizer. It implements the official OpenAI Gym API and extends it with additional functionality.

class gym_jiminy.common.envs.generic.BaseJiminyEnv(simulator, step_dt, simulation_duration_max=86400.0, debug=False, render_mode=None, **kwargs)[source]

Bases: InterfaceJiminyEnv[ObsT, ActT], Generic[ObsT, ActT]

Base class to train an agent in OpenAI Gym using the Jiminy simulator for physics computations.

It creates a Gym environment wrapping an already instantiated Jiminy simulator and behaves like any standard Gym environment.

The observation space is a dictionary gathering the current simulation time, the real robot state, and the sensor data. The action is a vector gathering the torques of the actuators of the robot.

There is no reward by default. It is up to the user to overload this class to implement one. It has been designed to be highly flexible and easy to customize by overloading it to fit the vast majority of users’ needs.

Parameters:
  • simulator (Simulator) – Jiminy Python simulator used for physics computations. It must be fully initialized.

  • step_dt (float) – Simulation timestep for learning. Note that it is independent of the controller and observation update periods. The latter are configured via engine.set_options.

  • simulation_duration_max (float) – Maximum duration of a simulation. If the current simulation time exceeds this threshold, it triggers truncation (is_truncated=True). It cannot exceed the maximum possible duration before telemetry log time overflow, which is extremely large (about 30 years). Beware that log data are stored in RAM, which may cause an out-of-memory error if an episode lasts too long without reset. Optional: The default corresponds to about 4GB of log data, assuming a 5ms control update period and telemetry disabled for everything but the robot configuration.

  • debug (bool) – Whether debug mode must be enabled. Enabling it turns on telemetry recording.

  • render_mode (str | None) – Desired rendering mode, i.e. “human” or “rgb_array”. If “human” is specified, calling render will open a graphical window for visualization, otherwise an RGB image is returned as a 3D numpy array whose first dimension corresponds to the 3 red, green and blue channels, and whose two subsequent dimensions are the pixel height and width respectively. None to automatically select the most appropriate mode based on the user-specified rendering backend if any, or the machine environment otherwise. Note that “rgb_array” does not require a graphical window manager. Optional: None by default.

  • kwargs (Any) – Extra keyword arguments that may be useful for derived environments with multiple inheritance, and to allow automatic pipeline wrapper generation.
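
For reference, here is a minimal instantiation sketch. The URDF path is hypothetical and Simulator.build is assumed to return a fully initialized simulator (its exact signature may differ across jiminy_py versions); any fully initialized Simulator instance works.

    from jiminy_py.simulator import Simulator
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    urdf_path = "/path/to/robot.urdf"  # hypothetical robot description
    simulator = Simulator.build(urdf_path)  # must be fully initialized
    env = BaseJiminyEnv(simulator, step_dt=0.01, render_mode="rgb_array")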

simulator: Simulator
render_mode: str | None = None
stepper_state: jiminy.StepperState
is_simulation_running: npt.NDArray[np.bool_]
robot: jiminy.Robot
robot_state: jiminy.RobotState
sensor_measurements: SensorMeasurementStackMap
derived: InterfaceJiminyEnv

Top-most block of the pipeline this environment is part of, when leveraging the modular pipeline design capability.

property np_random: Generator

Returns the environment’s internal _np_random generator. If not already set, it will be initialised with a random seed.

Returns:

Instance of np.random.Generator

num_steps: npt.NDArray[np.int64]

Number of simulation steps that have been performed since the last reset of the base environment.

Note

The counter is incremented before updating the observation at the end of the step, and consequently, before evaluating the reward and the termination conditions.

quantities: QuantityManager
observation: ObsT
action: ActT
_get_time_space()[source]

Get time space.

Return type:

Box

_get_agent_state_space(use_theoretical_model=False, ignore_velocity_limit=True)[source]

Get state space.

This method is not meant to be overloaded in general since the definition of the state space is mostly consensual. One must rather overload _initialize_observation_space to customize the observation space as a whole.

Parameters:
  • use_theoretical_model (bool) – Whether to compute the state space associated with the theoretical model instead of the extended simulation model.

  • ignore_velocity_limit (bool) – Whether to ignore the velocity bounds specified in the model.

Return type:

Dict

_get_measurements_space()[source]

Get sensor space.

It gathers the sensor data in a dictionary. It maps each available type of sensor to the associated data matrix. Rows correspond to the sensor type’s fields, and columns correspond to each individual sensor.

Return type:

Dict

_initialize_action_space()[source]

Configure the action space of the environment.

The action is a vector gathering the torques of the actuators of the robot.

Warning

This method is called internally by the reset method. It is not meant to be overloaded since the actual action space of the robot is uniquely defined.

Return type:

None

_initialize_seed(seed=None)[source]

Specify the seed of the environment.

Note

This method is not meant to be called manually.

Warning

It also resets the low-level jiminy Engine. Therefore one must call the reset method afterward.

Parameters:

seed (int | None) – Random seed, as a positive integer. Optional: A strongly random seed will be generated by gym if omitted.

Returns:

Updated seed of the environment

Return type:

None

register_variable(name, value, fieldnames=None, namespace=None)[source]

Register variable to the telemetry.

Warning

Variables are registered by reference. Consequently, the user is responsible for managing the lifetime of the data to prevent it from being garbage collected.

See also

See gym_jiminy.common.utils.register_variables for details.

Parameters:
  • name (str) – Base name of the variable. It will be used to prepend fields, using ‘.’ delimiter.

  • value (Mapping[str, StructNested[ValueT]] | Iterable[StructNested[ValueT]] | ndarray) – Variable to register. It supports any nested data structure whose leaves have type np.ndarray and either dtype np.float64 or np.int64.

  • fieldnames (str | Mapping[str, StructNested[ValueT]] | Iterable[StructNested[ValueT]] | None) – Nested fieldnames with the exact same data structure as the variable to register ‘value’. Individual elements of each leaf array must have their own fieldname, all gathered in a nested tuple with the same shape as the array. Optional: Generic fieldnames will be generated automatically.

  • namespace (str | None) – Namespace used to prepend the base name ‘name’, using ‘.’ delimiter. Empty string to disable. Optional: Disabled by default.

Return type:

None
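
As an illustration, here is a hedged sketch registering a flat numpy array to the telemetry from a derived environment. The attribute name, fieldnames and namespace are arbitrary, and calling it from _setup (i.e. before the simulation is started by reset) is an assumption; the array is kept as an attribute so that it is not garbage collected, since variables are registered by reference.

    import numpy as np
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def _setup(self) -> None:
            super()._setup()
            # Keep a reference to the buffer so it is not garbage collected.
            self._target = np.zeros(3)
            self.register_variable(
                "target", self._target, fieldnames=("x", "y", "z"),
                namespace="controller")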

property step_dt: float

Get timestep of a single ‘step’.

property is_training: bool

Check whether the environment is in ‘train’ or ‘eval’ mode.

property unwrapped: BaseJiminyEnv

Underlying environment at the basis of the pipeline this environment is part of.

train()[source]

Sets the environment in training mode.

Return type:

None

eval()[source]

Sets the environment in evaluation mode.

This only has an effect on certain environments. It can be used for instance to enable clipping or filtering of the action at evaluation time specifically. See documentations of a given environment for details about their behaviors in training and evaluation modes.

Return type:

None

reset(*, seed=None, options=None)[source]

Reset the environment.

In practice, it resets the backend simulator and sets the initial state of the robot. The initial state is obtained by calling ‘_sample_state’. This method is also in charge of setting the initial action (at the beginning) and observation (at the end).

Warning

It starts the simulation immediately. As a result, it is not possible to change the robot (including its options), nor to register log variables.

Parameters:
  • seed (int | None) – Random seed, as a positive integer. Optional: A strongly random seed will be generated by gym if omitted.

  • options (Dict[str, Any] | None) – Additional information to specify how the environment is reset. The field ‘reset_hook’ is reserved for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.

Returns:

Initial observation of the episode and some auxiliary information for debugging or monitoring purpose.

Return type:

Tuple[Mapping[str, StructNested[ValueT]] | Iterable[StructNested[ValueT]] | ndarray, Dict[str, Any]]

close()[source]

Clean up the environment after the user has finished using it.

It terminates the Python Jiminy engine.

Warning

Calling reset or step afterward is undefined behavior.

Return type:

None

step(action)[source]

Integrate the environment’s dynamics under the prescribed agent’s action over a single timestep, i.e. collect one transition step of the underlying Markov Decision Process of the learning problem.

Warning

When reaching the end of an episode (terminated or truncated), it is necessary to reset the environment for starting a new episode before being able to do steps once again.

Parameters:

action (ActT) – Action performed by the agent during the ongoing step.

Returns:

  • observation (ObsType): The next observation of the environment as a result of taking the agent’s action.

  • reward (float): the reward associated with the transition step.

  • terminated (bool): Whether the agent reaches the terminal state, which means success or failure depending on the MDP of the task.

  • truncated (bool): Whether some truncation condition outside the scope of the MDP is satisfied. This can be used to end an episode prematurely before a terminal state is reached, for instance if the agent’s state is going out of bounds.

  • info (dict): Contains auxiliary information that may be helpful for debugging and monitoring. This might, for instance, contain: metrics that describe the agent’s performance state, variables that are hidden from observations, or individual reward terms from which the total reward is derived.

Return type:

Tuple[Mapping[str, StructNested[ValueT]] | Iterable[StructNested[ValueT]] | ndarray, SupportsFloat, bool, bool, Dict[str, Any]]
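
Putting reset and step together, a typical rollout loop following the standard Gym API looks like the following sketch, where env is the instance from the instantiation sketch above and the random action is a stand-in for a trained policy.

    obs, info = env.reset(seed=0)
    while True:
        action = env.action_space.sample()  # stand-in for a trained policy
        obs, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            break
    env.close()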

render()[source]

Render the agent in its environment.

Changed in version: This method does not take input arguments anymore due to changes of the official gym.Wrapper API. A workaround is to set simulator.viewer_kwargs beforehand. Alternatively, it is possible to call the low-level implementation simulator.render directly to avoid API restrictions.

Returns:

RGB array if ‘render_mode’ is ‘rgb_array’, None otherwise.

Return type:

RenderFrame | List[RenderFrame] | None

plot(enable_block_states=False, **kwargs)[source]

Display common simulation data and action over time.

Parameters:
  • enable_block_states (bool) – Whether to display the internal state of all blocks.

  • kwargs (Any) – Extra keyword arguments to forward to simulator.plot.

Return type:

TabbedFigure

replay(**kwargs)[source]

Replay the current episode until now.

Parameters:

kwargs (Any) – Extra keyword arguments for delegation to replay.play_trajectories method.

Return type:

None

play_interactive(enable_travelling=None, start_paused=True, enable_is_done=True, verbose=True, **kwargs)[source]

Activate interactive mode, enabling the user to control the robot using the keyboard.

It stops automatically as soon as terminated or truncated is True. One has to press a key to start the interaction. If no key is pressed, the action is not updated and the previous one keeps being sent to the robot.

Warning

It ignores any external gym.Wrapper that may be used for training but is not considered part of the environment pipeline.

Warning

This method requires the _key_to_action method to be overloaded by the user, otherwise it raises an exception.

Parameters:
  • enable_travelling (bool | None) – Whether to enable travelling, following the motion of the root frame of the model. This parameter is ignored if the model has no freeflyer. Optional: Enabled by default if and only if the ‘panda3d’ viewer backend is used.

  • start_paused (bool) – Whether to start in pause. Optional: Enabled by default.

  • verbose (bool) – Whether to display status messages.

  • kwargs (Any) – Extra keyword arguments to forward to _key_to_action method.

  • enable_is_done (bool)

Return type:

None

evaluate(policy_fn, seed=None, horizon=None, enable_stats=True, enable_replay=None, **kwargs)[source]

Evaluate a policy on the environment over a complete episode.

Warning

It ignores any external gym.Wrapper that may be used for training but is not considered part of the environment pipeline.

Parameters:
  • policy_fn (Callable[[Mapping[str, StructNested[ValueT]] | Iterable[StructNested[ValueT]] | ndarray, float | None, bool, Dict[str, Any]], ActT]) – Policy to evaluate as a callback function. It must have the following signature (reward=None at reset):

    policy_fn(obs: DataNested,
              reward: Optional[float],
              done_or_truncated: bool,
              info: InfoType) -> ActT  # action

  • seed (int | None) – Seed of the environment to be used for the evaluation of the policy. Optional: Random seed if not provided.

  • horizon (int | None) – Horizon of the simulation, namely maximum number of steps before termination. None to disable. Optional: Disabled by default.

  • enable_stats (bool) – Whether to print high-level statistics after the simulation. Optional: Enabled by default.

  • enable_replay (bool | None) – Whether to enable replay of the simulation, and eventually recording if the extra keyword argument record_video_path is provided. Optional: Enabled by default if display is available, disabled otherwise.

  • kwargs (Any) – Extra keyword arguments to forward to the replay method if replay is requested.

Return type:

List[Dict[str, Any]]
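
For illustration, here is a hedged sketch of a compatible policy callback, reusing the env instance from the instantiation sketch above. The zero-torque action and the Box action space are assumptions; replace them with the actual policy to evaluate.

    import numpy as np

    def policy_fn(obs, reward, done_or_truncated, info):
        # Zero-torque policy, assuming a Box action space (hypothetical).
        return np.zeros(env.action_space.shape)

    stats = env.evaluate(policy_fn, seed=0, horizon=500, enable_replay=False)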

_setup()[source]

Configure the environment. It must guarantee that its internal state is valid after calling this method.

By default, it enforces some options of the engine.

Warning

Beware this method is called BEFORE observe_dt and controller_dt are properly set, so one cannot rely on them at this point. Yet, step_dt is available and should always be. One can still access the low-level controller update period through engine_options[‘stepper’][‘controllerUpdatePeriod’].

Note

The user must overload this method to enforce a custom observer update period, otherwise it will be the same as the controller’s.

Note

This method is called internally by reset methods.

Return type:

None
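
As mentioned in the note above, here is a hedged sketch of overloading _setup to enforce a dedicated observer update period. Accessing the low-level engine as self.simulator.engine and the ‘sensorsUpdatePeriod’ option name are assumptions to be checked against the installed jiminy version.

    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def _setup(self) -> None:
            super()._setup()
            # Enforce a sensor update period distinct from the controller one.
            engine_options = self.simulator.engine.get_options()
            engine_options["stepper"]["sensorsUpdatePeriod"] = 0.005  # assumed option name
            self.simulator.engine.set_options(engine_options)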

_initialize_observation_space()[source]

Configure the observation of the environment.

By default, the observation is a dictionary gathering the current simulation time, the real agent state, and the sensor data.

Note

This method is called internally by the reset method at the very end, just before computing and returning the initial observation. This method, alongside refresh_observation, must be overwritten in order to define a custom observation space.

Return type:

None
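
Here is a hedged sketch of the pair of overloads, restricting the observation to the robot configuration. The gymnasium import, the pinocchio_model access and the nested layout of measurement are assumptions to be checked against the installed versions.

    import numpy as np
    import gymnasium as gym
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def _initialize_observation_space(self) -> None:
            # Observe the configuration vector only.
            nq = self.robot.pinocchio_model.nq  # assumed model access
            self.observation_space = gym.spaces.Box(
                low=-np.inf, high=np.inf, shape=(nq,), dtype=np.float64)

        def refresh_observation(self, measurement) -> None:
            # Assumed measurement layout: measurement["states"]["agent"]["q"].
            q = measurement["states"]["agent"]["q"]
            if not self.is_simulation_running:
                # First call happens before any simulation is running:
                # allocate the internal observation buffer.
                self.observation = np.zeros_like(q)
            np.copyto(self.observation, q)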

_neutral()[source]

Returns a neutral valid configuration for the agent.

The default implementation returns the neutral configuration if valid, the “mean” configuration otherwise (right in the middle of the position lower and upper bounds).

Warning

Beware there is no guarantee for this configuration to be statically stable.

Note

This method is called internally by ‘_sample_state’ to generate the initial state. It can be overloaded to ensure static stability of the configuration.

Return type:

ndarray

_sample_state()[source]

Returns a randomized yet valid configuration and velocity for the robot.

The default implementation returns the neutral configuration and zero velocity.

Offsets are applied on the freeflyer to ensure no contact points are going through the ground and up to three are in contact.

Note

This method is called internally by reset to generate the initial state. It can be overloaded to act as a random state generator.

Return type:

Tuple[ndarray, ndarray]
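
Here is a hedged sketch of a random state generator built on top of the neutral configuration; the pinocchio_model access used to get the velocity dimension is an assumption.

    import numpy as np
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def _sample_state(self):
            # Neutral configuration with small Gaussian noise on the velocity.
            q = self._neutral()
            nv = self.robot.pinocchio_model.nv  # assumed model access
            v = 0.01 * self.np_random.standard_normal(nv)
            return q, v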

_initialize_buffers()[source]

Initialize internal buffers for fast access to shared memory or to avoid redundant computations.

Note

This method is called at every reset, right after self.simulator.start. At this point, the simulation is running but refresh_observation has never been called, so that it can be used to initialize buffers involving the engine state but required to refresh the observation.

Note

Buffers requiring manual update must be refreshed using _refresh_buffers method.

Warning

This method is not appropriate for initializing buffers involved in compute_command. At the time being, there is no better way than taking advantage of the flag self.is_simulation_running in the method compute_command itself.

Return type:

None

_refresh_buffers()[source]

Refresh internal buffers that must be updated manually.

Note

This method is called after every internal engine.step and before refreshing the observation one last time. As such, it is the right place to update shared data between has_terminated and compute_reward. However, it is not appropriate for quantities involved in refresh_observation nor compute_command, which may be called more often than once per step.

Note

The _initialize_buffers method can be used to initialize buffers that may require special care.

Warning

Be careful when using this method to update buffers involved in refresh_observation. The latter is called at self.observe_dt update period, while this method is called at self.step_dt update period. self.observe_dt is likely to be different from self.step_dt, unless configured manually when overloading _setup method.

Return type:

None

refresh_observation(measurement)[source]

Compute the observation based on the current state of the robot.

In practice, it updates the internal buffer directly for the sake of efficiency.

By default, it sets the observation to the value of the measurement, which would not work unless ObsT corresponds to EngineObsType.

Note

This method is called at the end of every low-level Engine.step.

Warning

This method may be called without any simulation running, either to perform basic consistency checking or allocate and initialize buffers. There is no way at the time being to distinguish the initialization stage in particular. A workaround consists in checking whether the simulation already started. It is not exactly the same but it does the job regarding preserving efficiency.

Warning

One must only rely on measurement to get the state of the robot, as anything else is not reliable for this. More specifically, self.robot_state would not be valid if an adaptive stepper is being used for physics integration.

Parameters:

measurement (EngineObsType)

Return type:

None

compute_command(action, command)[source]

Compute the motors efforts to apply on the robot.

By default, all it does is forwarding the input action as is, without performing any processing. One is responsible for overloading this method if the action space has been customized, or simply to clip the action to make sure it is never out-of-bounds if necessary.

Warning

There is no good place to initialize buffers that are necessary to compute the command. The only solution for now is to perform the initialization inside this method itself, using the safeguard if not self.is_simulation_running:.

Parameters:
  • action (ActT) – High-level target to achieve by means of the command.

  • command (ndarray)

Return type:

None
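
For instance, here is a hedged sketch clipping the action to the action space bounds before forwarding it as command, assuming a Box action space.

    import numpy as np
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def compute_command(self, action, command) -> None:
            # Clip to the (assumed Box) action space bounds, then write the
            # result into the output buffer by reference.
            np.copyto(command, np.clip(
                action, self.action_space.low, self.action_space.high))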

_controller_handle(t, q, v, sensor_measurements, command)

Thin wrapper around user-specified refresh_observation and compute_command methods.

Note

The internal cache of managed quantities is cleared right away systematically, before anything else.

Warning

This method is not supposed to be called manually nor overloaded. It will be used by the base environment to instantiate a jiminy.FunctionalController responsible for both refreshing the observations and computing the commands all the way through a given pipeline, in the correct block order, before finally sending the command motor torques directly to the robot.

Parameters:
  • t (float) – Current simulation time.

  • q (ndarray) – Current extended configuration vector of the robot.

  • v (ndarray) – Current actual velocity vector of the robot.

  • sensor_measurements (SensorMeasurementTree) – Current sensor measurements.

  • command (ndarray) – Output argument corresponding to motors torques to apply on the robot. It must be updated by reference using [:] or np.copyto.

Returns:

Motor torques to apply on the robot.

Return type:

None

_np_random: np.random.Generator | None = None
_observer_handle(t, q, v, sensor_measurements)

Thin wrapper around user-specified refresh_observation method.

Warning

This method is not supposed to be called manually nor overloaded.

Parameters:
  • t (float) – Current simulation time.

  • q (ndarray) – Current extended configuration vector of the robot.

  • v (ndarray) – Current extended velocity vector of the robot.

  • sensor_measurements (SensorMeasurementTree) – Current sensor data.

Return type:

None

compute_reward(terminated, info)

Compute the reward related to a specific control block, plus extra information that may be helpful for monitoring or debugging purposes.

For the corresponding MDP to be stationary, the computation of the reward is supposed to involve only the transition from previous to current state of the simulation (possibly comprising multiple agents) under the ongoing action.

By default, it returns 0.0 without any extra information, no matter what. The user is expected to provide an appropriate reward on their own, either by overloading this method or by wrapping the environment with ComposedJiminyEnv for modular environment pipeline design.

Parameters:
  • terminated (bool) – Whether the episode has reached the terminal state of the MDP at the current step. This flag can be used to compute a specific terminal reward.

  • info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns:

Aggregated reward for the current step.

Return type:

float
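
As an illustration, here is a hedged sketch of a custom reward based on the documented self.action buffer. The exponential shaping is arbitrary and the action is assumed to be a flat numpy array.

    import numpy as np
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def compute_reward(self, terminated, info) -> float:
            # Penalize termination, otherwise reward small actions.
            if terminated:
                return -1.0
            return float(np.exp(-np.linalg.norm(self.action)))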

control_dt: float = -1
has_terminated(info)[source]

Determine whether the episode is over, because a terminal state of the underlying MDP has been reached or an aborting condition outside the scope of the MDP has been triggered.

By default, it always returns terminated=False, and truncated=True if and only if the observation is out-of-bounds. One can overload this method to implement custom termination conditions for the environment at hand.

Warning

No matter what, truncation will happen when reaching the maximum simulation duration, i.e. ‘self.simulation_duration_max’.

Note

This method is called after refresh_observation, so that the internal buffer ‘observation’ is up-to-date.

Parameters:

info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns:

terminated and truncated flags.

Return type:

Tuple[bool, bool]
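
Here is a hedged sketch of a custom termination condition stacked on top of the default out-of-bounds truncation. The height extraction assumes a robot with a freeflyer whose third configuration coordinate is the vertical position, and the 0.3m threshold is arbitrary.

    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def has_terminated(self, info):
            terminated, truncated = super().has_terminated(info)
            # Terminate if the base height drops below 0.3m (hypothetical).
            if self.robot_state.q[2] < 0.3:
                terminated = True
            return terminated, truncated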

metadata: Dict[str, Any] = {'render_modes': ['rgb_array']}
observe_dt: float = -1
reward_range = (-inf, inf)
spec: EnvSpec | None = None
stop()

Stop the episode immediately without waiting for a termination or truncation condition to be satisfied.

Note

This method is mainly intended for data analysis and debugging. Stopping the episode is necessary to log the final state, otherwise it will be missing from plots and viewer replay. Moreover, sensor data will not be available during replay using the object-oriented method replay. The helper method play_logs_data must be preferred to replay an episode that cannot be stopped for the time being.

Return type:

None

observation_space: gym.Space
action_space: gym.Space
_key_to_action(key, obs, reward, **kwargs)[source]

Mapping from input keyboard keys to actions.

Note

This method is systematically called before the step method, even if no key has been pressed or the reward is not defined. In such cases, the corresponding value is None.

Note

The mapping can be state dependent, and the key can be used for something other than computing the action directly. For instance, one can provide as an extra argument to this method a custom policy taking user parameters mapped to the keyboard as input.

Warning

Overloading this method is required for calling play_interactive method.

Parameters:
  • key (str) – Key pressed by the user as a string. None if no key has been pressed since the last step of the environment.

  • obs (ObsT) – Previous observation from the last step of the environment. It is always available, including right after reset.

  • reward (float | None) – Previous reward from last step of the environment. Not available before first step right after reset.

  • kwargs (Any) – Extra keyword argument provided by the user when calling play_interactive method.

Returns:

Action to forward to the environment.

Return type:

ActT | None
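
Here is a hedged sketch of such a mapping for a hypothetical single-motor robot; the key names depend on the keyboard backend actually in use.

    import numpy as np
    from gym_jiminy.common.envs.generic import BaseJiminyEnv

    class MyEnv(BaseJiminyEnv):
        def _key_to_action(self, key, obs, reward, **kwargs):
            if key == "w":
                return np.array([1.0])
            if key == "s":
                return np.array([-1.0])
            return None  # no key pressed: keep sending the previous action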