Pipeline

This method gathers base implementations for blocks to be used in pipeline control design.

It implements:

  • the concept of block thats can be connected to a BaseJiminyEnv environment through multiple InterfaceJiminyEnv indirections

  • a base controller block, along with a concrete PD controller

  • a wrapper to combine a controller block and a BaseJiminyEnv environment, eventually already wrapped, so that it appears as a black-box environment.

class gym_jiminy.common.bases.pipeline.BasePipelineWrapper(env, **kwargs)[source]

Bases: InterfaceJiminyEnv[Obs, Act], Generic[Obs, Act, BaseObs, BaseAct]

Base class for wrapping a BaseJiminyEnv Gym environment so that it appears as a single, unified, environment. The environment may have been wrapped multiple times already.

If several successive blocks must be composed, just wrap each of them successively one by one.

Warning

Hot-plug of additional blocks is supported, but the environment need to be reset after changing the pipeline.

Warning

This architecture is not designed for trainable blocks, but rather for robotic-oriented controllers and observers, such as PID controllers, inverse kinematics, Model Predictive Control (MPC), sensor fusion… It is recommended to add the controllers and observers into the policy itself if they have to be trainable.

Parameters:
  • env (InterfaceJiminyEnv[BaseObs, BaseAct]) – Base or already wrapped jiminy environment.

  • kwargs (Any) – Extra keyword arguments for multiple inheritance.

env: InterfaceJiminyEnv[BaseObs, BaseAct]
get_wrapper_attr(name)[source]

Return the value of an attribute in the first layer of the pipeline environment for which it exists, from this wrapper to the base environment.

If the attribute does not exist in any layer, then an exception AttributeError is raised.

Parameters:
  • name (str) – Name of the attribute.

  • value – Desired value of the attribute.

Return type:

Any

set_wrapper_attr(name, value, *, force=True)[source]

Assign an attribute to a specified value in the first layer of the pipeline environment for which it already exists, from this wrapper to the base environment. If the attribute does not exist in any layer, it is directly added to this wrapper.

Parameters:
  • name (str) – Name of the attribute.

  • value (Any) – Desired value of the attribute.

  • force (bool)

Return type:

None

property render_mode: str | None

Rendering mode of the base environment.

property spec: EnvSpec | None

Random number generator of the base environment.

property np_random: Generator

Random number generator of the base environment.

property unwrapped: BaseJiminyEnv

The “underlying environment at the basis of the pipeline from which this environment is part of.

property step_dt: float

Get timestep of a single ‘step’.

property training: bool

Check whether the environment is in training or evaluation mode.

train(mode=True)[source]

Sets the environment in training or evaluation mode.

Parameters:

mode (bool) – Whether to set training (True) or evaluation mode (False). Optional: True by default.

Return type:

None

reset(*, seed=None, options=None)[source]

Reset the unified environment.

In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.

Parameters:
  • seed (int | None) – Random seed, as a positive integer. Optional: None by default. If None, then the internal random generator of the environment will be kept as-is, without updating its seed.

  • options (Dict[str, Any] | None) – Additional information to specify how the environment is reset. The field ‘reset_hook’ is reserved for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.

Return type:

Tuple[Mapping[str, StructNested[ValueT]] | Sequence[StructNested[ValueT]] | ndarray, Dict[str, Any]]

step(action)[source]

Run a simulation step for a given action.

Parameters:

action (Act) – Next action to perform. None to not update it.

Returns:

Next observation, reward, status of the episode (done or not), and a dictionary of extra information.

Return type:

Tuple[Mapping[str, StructNested[ValueT]] | Sequence[StructNested[ValueT]] | ndarray, SupportsFloat, bool, bool, Dict[str, Any]]

stop()[source]

Stop the underlying simulation completely.

Note

This method has nothing to do with termination and/or truncation of episodes. Calling it manually should never be necessary for collecting samples during training.

Note

This method is mainly intended for evaluation analysis and debugging. Stopping the episode is only necessary for switching between training and evaluation mode, and to log the final state, otherwise it will be missing from plots and viewer replay (see InterfaceJiminyEnv.log_data for details). Moreover, sensor data will not be available when calling replay. The helper method jiminy_py.viewer.replay.play_logs_data must be preferred to replay an episode that cannot be stopped.

Note

Resuming a simulation is not supported, which means that calling reset to start a new simulation is necessary prior to calling step once again. Falling to do so will trigger an exception.

Return type:

None

plot(enable_block_states=False, **kwargs)[source]

Plot figures of simulation data over time associated with the ongoing episode until now if any, the previous one otherwise.

Parameters:
  • kwargs (Any) – Implementation-specific extra keyword arguments if any.

  • enable_block_states (bool)

Return type:

TabbedFigure

replay(**kwargs)[source]

Replay the ongoing episode until now if any, the previous one otherwise.

Parameters:

kwargs (Any) – Implementation-specific extra keyword arguments if any.

Return type:

None

evaluate(policy_fn, seed=None, horizon=None, enable_stats=True, enable_replay=None, **kwargs)[source]

Evaluate a policy on the environment over a complete episode.

Warning

It ignores any top-level gym.Wrapper that may be used for training but are not considered part of the environment pipeline.

Parameters:
  • policy_fn (Callable[[Obs, Act | None, SupportsFloat | None, bool, bool, Dict[str, Any]], Act]) – Policy to evaluate as a callback function. It must have the following signature (**rew** = None at reset):

    policy_fn(obs: Obs,
    action_prev: Optional[Act],
    reward: Optional[float],
    terminated: bool,
    truncated: bool,
    info: InfoType
    ) -> Act # action

  • seed (int | None) – Seed of the environment to be used for the evaluation of the policy. Optional: None by default. If not specified, then a strongly random seed will be generated by gym.

  • horizon (float | None) – Horizon of the simulation before early termination. None to disable. Optional: None by default.

  • enable_stats (bool) – Whether to print high-level statistics after the simulation. Optional: Enabled by default.

  • enable_replay (bool | None) – Whether to enable replay of the simulation, and eventually recording if the extra keyword argument record_video_path is provided. Optional: Enabled by default if display is available, disabled otherwise.

  • kwargs (Any) – Extra keyword arguments to forward to the replay method if replay is requested.

Return type:

Tuple[List[SupportsFloat], List[Dict[str, Any]]]

play_interactive(enable_travelling=None, start_paused=True, enable_is_done=True, verbose=True, **kwargs)[source]

Interact evaluation mode where the robot or the world itself are actively “controlled” via keyboard inputs, with real-time rendering.

This method is not available for all pipeline environments. When it does, the available interactions and keyboard mapping is completely implementation-specific. Please refer to the documentation of the base environment being considered for details.

Warning

It ignores any top-level gym.Wrapper that may be used for training but are not considered part of the pipeline environment.

Parameters:
  • enable_travelling (bool | None) – Whether enable travelling, following the motion of the root frame of the model. This parameter is ignored if the model has no freeflyer. Optional: Enabled by default iif ‘panda3d’ viewer backend is used.

  • start_paused (bool) – Whether to start in pause. Optional: Enabled by default.

  • verbose (bool) – Whether to display status messages.

  • kwargs (Any) – Implementation-specific extra keyword arguments if any.

  • enable_is_done (bool)

Return type:

None

has_terminated(info)[source]

Determine whether the episode is over, because a terminal state of the underlying MDP has been reached or an aborting condition outside the scope of the MDP has been triggered.

By default, it does nothing but forwarding the request to the base environment. This behavior can be overwritten by the user.

Note

This method is called after refresh_observation, so that the internal buffer ‘observation’ is up-to-date.

Parameters:

info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns:

terminated and truncated flags.

Return type:

Tuple[bool, bool]

render()[source]

Render the unified environment.

By default, it does nothing but forwarding the request to the base environment. This behavior can be overwritten by the user.

Return type:

RenderFrame | List[RenderFrame] | None

close()[source]

Closes the wrapper and its base environment.

By default, it does nothing but forwarding the request to the base environment. This behavior can be overwritten by the user.

Return type:

None

class gym_jiminy.common.bases.pipeline.ComposedJiminyEnv(env, *, reward=None, terminations=(), trajectories=None, augment_observation=False)[source]

Bases: BasePipelineWrapper[Obs, Act, BaseObs, Act], Generic[Obs, Act, BaseObs]

Extend an environment, eventually already wrapped, by plugging ad-hoc reward components and termination conditions, including their accompanying trajectory database if any.

This wrappers optionally adds the current state of the reference trajectory to the observation space under nested key (‘states’, ‘reference’), while leaving its action space unchanged. Transformation of the observation and action space is done via additional observation and/or control blocks.

Note

This wrapper derives from BasePipelineWrapper, and such as, it is considered as part of the environment pipeline unlike gym.Wrapper. This means that it will be taken into account when calling evaluate or play_interactive on the wrapped environment.

Warning

Setting ‘augment_observation=True’ enforces several restriction on the trajectory database to make sure that the observation space remains invariant. First, the database is locked, so that no trajectory can be added nor removed anymore. Then, the robot model must be the same for all the trajectories.

Warning

This class is final, ie not meant to be derived.

Parameters:
  • env (InterfaceJiminyEnv[BaseObs, Act]) – Environment to extend, eventually already wrapped.

  • reward (AbstractReward | None) – Reward object deriving from AbstractReward. It will be evaluated at each step of the environment and summed up with the one returned by the wrapped environment. This reward must be already instantiated and associated with the provided environment. None for not considering any reward. Optional: None by default.

  • terminations (Sequence[AbstractTerminationCondition]) – Sequence of termination condition objects deriving from AbstractTerminationCondition. They will be checked along with the one enforced by the wrapped environment. If provided, these termination conditions must be already instantiated and associated with the environment at hand. Optional: Empty sequence by default.

  • trajectories (Dict[str, Tuple[Trajectory, Literal['raise', 'wrap', 'clip']]] | None) – Ordered set of named tuples (trajectory, mode) as a dictionary, where ‘mode’ corresponds to the time wrapping mode. See Trajectory.get for details. The first trajectory being specified, in any, will be selected as reference by default. None to skip the whole process. Optional: None by default.

  • augment_observation (bool) – Whether to add the current state of the reference trajectory to the observation of the environment if any. Optional: False by default.

refresh_observation(measurement)[source]

Compute high-level features based on the current wrapped environment’s observation.

It simply forwards the observation computed by the wrapped environment without any processing.

Parameters:

measurement (EngineObsType) – Low-level measure from the environment to process to get higher-level observation.

Return type:

None

has_terminated(info)[source]

Determine whether the practitioner is instructed to stop the ongoing episode on the spot because a termination condition has been triggered, either coming from the based environment or from the ad-hoc termination conditions that has been plugged on top of it.

At each step of the wrapped environment, all its termination conditions will be evaluated sequentially until one of them eventually gets triggered. If this happens, evaluation is skipped for the remaining ones and the reward is evaluated straight away. Ultimately, the practitioner is instructed to stop the ongoing episode, but it is his own responsibility to honor this request. The first condition being evaluated is the one of the underlying environment, then comes the ones of this composition layer, following the original sequence ordering.

Note

This method is called after refresh_observation, so that the internal buffer ‘observation’ is up-to-date.

See also

See InterfaceJiminyEnv.has_terminated documentation for details.

Parameters:

info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns:

terminated and truncated flags.

Return type:

Tuple[bool, bool]

compute_command(action, command)[source]

Compute the motors efforts to apply on the robot.

It simply forwards the command computed by the wrapped environment without any processing.

Parameters:
  • action (Act) – High-level target to achieve by means of the command.

  • command (ndarray) – Lower-level command to updated in-place.

Return type:

None

compute_reward(terminated, info)[source]

Compute the sum of the ad-hoc reward components that have been plugged on top of the wrapped environment.

See also

See InterfaceController.compute_reward documentation for details.

Parameters:
  • terminated (bool) – Whether the episode has reached the terminal state of the MDP at the current step. This flag can be used to compute a specific terminal reward.

  • info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns:

Aggregated reward for the current step.

Return type:

float

step(action)[source]

Run a simulation step for a given action.

Parameters:

action (Act) – Next action to perform. None to not update it.

Returns:

Next observation, reward, status of the episode (done or not), and a dictionary of extra information.

Return type:

Tuple[Mapping[str, StructNested[ValueT]] | Sequence[StructNested[ValueT]] | ndarray, SupportsFloat, bool, bool, Dict[str, Any]]

class gym_jiminy.common.bases.pipeline.ObservedJiminyEnv(env, observer, **kwargs)[source]

Bases: BasePipelineWrapper[NestedObs, Act, BaseObs, Act], Generic[NestedObs, Act, BaseObs]

Wrap a BaseJiminyEnv Gym environment and a single observer.

../../../../_images/aafig-7565b0d7eea7517aeff19901809927af5f1ca2ad.svg

The input ‘mes_1’ of the ‘observer_1’ must be consistent with the observation ‘obs_1’ of the environment. The observation space of the resulting unified environment will be the observation space of the highest-level observer, while its action space will be the one of the unwrapped environment ‘env’.

Note

The observation space gathers the original observation of the environment with the high-level features computed by the observer in gym.spaces.Dict instance. If the original observation is already a gym.spaces.Dict instance, then the features computed by the observer are added to if under (nested) key [‘features’, observer.name] along with its internal state [‘states’, observer.name] if any. Otherwise, the original observation is stored under key ‘measurement’.

Note

The environment and the observers all have their own update period.

Warning

This design is not suitable for learning the observer, but rather for robotic-oriented observers, such as sensor fusion algorithms, Kalman filters… It is recommended to add the observer into the policy itself if it has to be trainable.

Parameters:
  • env (InterfaceJiminyEnv[BaseObs, Act]) – Environment to observe. It can be an already observed environment wrapped in ObservedJiminyEnv if one desires to stack several observers with BaseJiminyEnv.

  • observer (BaseObserverBlock[OtherObs, OtherState, BaseObs, Act]) – Observer to use to extract higher-level features.

  • kwargs (Any) – Extra keyword arguments to allow automatic pipeline wrapper generation.

refresh_observation(measurement)[source]

Compute high-level features based on the current wrapped environment’s observation.

It gathers the original observation from the environment with the features computed by the observer.

Note

Internally, it can deal with multiple observers with different update periods. Besides, it is safe to call this method multiple times successively.

Parameters:

measurement (EngineObsType) – Low-level measure from the environment to process to get higher-level observation.

Return type:

None

compute_command(action, command)[source]

Compute the motors efforts to apply on the robot.

It simply forwards the command computed by the wrapped environment without any processing.

Parameters:
  • action (Act) – High-level target to achieve by means of the command.

  • command (ndarray) – Lower-level command to updated in-place.

Return type:

None

class gym_jiminy.common.bases.pipeline.ControlledJiminyEnv(env, controller, augment_observation=False, **kwargs)[source]

Bases: BasePipelineWrapper[NestedObs, Act, BaseObs, BaseAct], Generic[NestedObs, Act, BaseObs, BaseAct]

Wrap a BaseJiminyEnv Gym environment and a single controller.

../../../../_images/aafig-1b3cf6954833a4f688d70f2ea00e6ea6d7af7f6c.svg

The output command ‘cmd_X’ of ‘ctrl_X’ must be consistent with the action space ‘act_X’ of the subsequent block. The action space of the resulting unified environment will be the action space of the highest-level controller ‘act_N’, while its observation space will be the one of the unwrapped environment ‘obs’. Alternatively, the later can also gather the (stacked) action space of the successive controllers if one is to observe the intermediary controllers’ targets.

Note

The environment and the controllers all have their own update period.

Warning

This design is not suitable for learning the controllers ‘ctrl_X’, but rather for robotic-oriented pre-defined and possibly model-based controllers, such as PID control, inverse kinematics, admittance control, or Model Predictive Control (MPC). It is recommended to add the controllers into the policy itself if it has to be trainable.

Note

As a reminder, env.step_dt refers to the learning step period, namely the timestep between two successive frames:

[observation, reward, terminated, truncated, info]

This definition remains true, independently of whether the environment is wrapped with a controller using this class. On the contrary, env.control_dt corresponds to the apparent control update period, namely the update period of the higher-level controller if multiple are piped together. The same goes for env.observe_dt.

Parameters:
  • env (InterfaceJiminyEnv[BaseObs, BaseAct]) – Environment to control. It can be an already controlled environment wrapped in ControlledJiminyEnv if one desires to stack several controllers with BaseJiminyEnv.

  • controller (BaseControllerBlock[Act, OtherState, BaseObs, BaseAct]) – Controller to use to send targets to the subsequent block.

  • augment_observation (bool) – Whether to add the target state of the controller to the observation of the environment. Regardless, the internal state of the controller will be added if any. Optional: Disabled by default.

  • kwargs (Any) – Extra keyword arguments to allow automatic pipeline wrapper generation.

refresh_observation(measurement)[source]

Compute the unified observation based on the current wrapped environment’s observation and controller’s target.

It gathers the actual observation from the environment with the target of the controller, if requested, otherwise it forwards the observation directly without any further processing.

Warning

Beware it shares the environment observation whenever possible for the sake of efficiency. Despite that, it is safe to call this method multiple times successively.

Parameters:

measurement (EngineObsType) – Low-level measure from the environment to process to get higher-level observation.

Return type:

None

compute_command(action, command)[source]

Compute the motors efforts to apply on the robot.

In practice, it updates whenever necessary:

  • the target sent to the subsequent block by the controller

  • the command send to the robot by the environment through the subsequent block

Parameters:
  • action (Act) – High-level target to achieve by means of the command.

  • command (ndarray) – Lower-level command to update in-place.

Return type:

None

compute_reward(terminated, info)[source]

Compute the reward related to a specific control block, plus extra information that may be helpful for monitoring or debugging purposes.

For the corresponding MDP to be stationary, the computation of the reward is supposed to involve only the transition from previous to current state of the simulation (possibly comprising multiple agents) under the ongoing action.

By default, it returns 0.0 without extra information no matter what. The user is expected to provide an appropriate reward on its own, either by overloading this method or by wrapping the environment with ComposedJiminyEnv for modular environment pipeline design.

Parameters:
  • terminated (bool) – Whether the episode has reached the terminal state of the MDP at the current step. This flag can be used to compute a specific terminal reward.

  • info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns:

Aggregated reward for the current step.

Return type:

float

class gym_jiminy.common.bases.pipeline.BaseTransformObservation(env)[source]

Bases: BasePipelineWrapper[TransformedObs, Act, Obs, Act], Generic[TransformedObs, Obs, Act]

Apply some transform on the observation of the wrapped environment.

The observation transform is only applied once per step, as post-processing right before returning. It is meant to change the way a whole pipeline environment is exposed to the outside rather than changing its internal machinery. Incidentally, the transformed observation must not be involved in the computations of any subsequent pipeline layer.

Note

The user is expected to define the observation transform and its corresponding space by overloading both _initialize_action_space and transform_action. The transform will be applied at the end of every environment step.

Note

This wrapper derives from BasePipelineWrapper, and such as, it is considered internal unlike gym.Wrapper. This means that it will be taken into account when calling evaluate or play_interactive on the wrapped environment.

Parameters:
  • env (InterfaceJiminyEnv[Obs, Act]) – Base or already wrapped jiminy environment.

  • kwargs – Extra keyword arguments for multiple inheritance.

compute_command(action, command)[source]

Compute the motors efforts to apply on the robot.

It simply forwards the command computed by the wrapped environment without any processing.

Parameters:
  • action (Act) – High-level target to achieve by means of the command.

  • command (ndarray) – Lower-level command to update in-place.

Return type:

None

refresh_observation(measurement)[source]

Compute high-level features based on the current wrapped environment’s observation.

It calls transform_observation at step_dt update period, right after having refreshed the base observation.

Warning

The method transform_observation must have been overwritten by the user prior to calling this method.

Parameters:

measurement (EngineObsType) – Low-level measure from the environment to process to get higher-level observation.

Return type:

None

abstract transform_observation()[source]

Compute the transformed observation from the original wrapped environment observation.

Note

The environment observation self.env.observation has been updated prior to calling this method and therefore can be safely accessed.

Note

For the sake of efficiency, this method should directly update in-place the pre-allocated transformed observation buffer self.observation instead of returning a temporary.

Return type:

None

class gym_jiminy.common.bases.pipeline.BaseTransformAction(env)[source]

Bases: BasePipelineWrapper[Obs, TransformedAct, Obs, Act], Generic[TransformedAct, Obs, Act]

Apply some transform on the action of the wrapped environment.

The action transform is only applied once per step, as pre-processing right at the beginning. It is meant to change the way a whole pipeline environment is exposed to the outside rather than changing its internal machinery.

Note

The user is expected to define the observation transform and its corresponding space by overloading both _initialize_action_space and transform_action. The transform will be applied at the beginning of every environment step.

Note

This wrapper derives from BasePipelineWrapper, and such as, it is considered internal unlike gym.Wrapper. This means that it will be taken into account when calling evaluate or play_interactive on the wrapped environment.

Parameters:
  • env (InterfaceJiminyEnv[Obs, Act]) – Base or already wrapped jiminy environment.

  • kwargs – Extra keyword arguments for multiple inheritance.

refresh_observation(measurement)[source]

Compute high-level features based on the current wrapped environment’s observation.

It simply forwards the observation computed by the wrapped environment without any processing.

Parameters:

measurement (EngineObsType) – Low-level measure from the environment to process to get higher-level observation.

Return type:

None

compute_command(action, command)[source]

Compute the motors efforts to apply on the robot.

It calls transform_action at step_dt update period, which will update the environment action. Then, it delegates computation of the command to the base environment.

Warning

The method transform_action must have been overwritten by the user prior to calling this method.

Parameters:
  • action (TransformedAct) – High-level target to achieve by means of the command.

  • command (ndarray) – Lower-level command to update in-place.

Return type:

None

abstract transform_action(action)[source]

Compute the transformed action from the provided wrapped environment action.

Note

For the sake of efficiency, this method should directly update in-place the pre-allocated action buffer of the wrapped environment self.env.action instead of returning a temporary.

Parameters:

action (TransformedAct)

Return type:

None