Pipeline

This module gathers base implementations of blocks used in pipeline control design.

It implements:

  • the concept of a block that can be connected to a BaseJiminyEnv environment through any level of indirection

  • a base controller block, along with a concrete PD controller

  • a wrapper to combine a controller block and a BaseJiminyEnv environment, possibly already wrapped, so that it appears as a single, unified environment.

class gym_jiminy.common.bases.pipeline_bases.BasePipelineWrapper(env, **kwargs)[source]

Bases: gym_jiminy.common.bases.generic_bases.ObserverControllerInterface, gym.core.Wrapper

Wrap a BaseJiminyEnv Gym environment and a single block, so that it appears as a single, unified environment. The environment may itself already be wrapped inside one or several gym.Wrapper containers.

If several successive blocks must be used, wrap each block one by one with the resulting intermediary PipelineWrapper, as sketched below.
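
The following minimal sketch illustrates such stacking. MyJiminyEnv, my_pd_controller and my_filter are hypothetical placeholders for a concrete BaseJiminyEnv subclass, a BaseControllerBlock instance and a BaseObserverBlock instance; only the wrapper classes are part of this module.

    from gym_jiminy.common.bases.pipeline_bases import (
        ControlledJiminyEnv, ObservedJiminyEnv)

    # 'MyJiminyEnv', 'my_pd_controller' and 'my_filter' are placeholders.
    env = MyJiminyEnv()                               # base environment
    env = ControlledJiminyEnv(env, my_pd_controller)  # first block: controller
    env = ObservedJiminyEnv(env, my_filter)           # second block: observer
    # 'env' now behaves as a single, unified gym environment.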

Warning

This architecture is not designed for trainable blocks, but rather for robotics-oriented controllers and observers, such as PID controllers, inverse kinematics, Model Predictive Control (MPC), or sensor fusion. It is recommended to add the controllers and observers to the policy itself if they have to be trainable.

Return type

None

env: Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]
simulator: Simulator
stepper_state: jiminy.StepperState
system_state: jiminy.SystemState
sensors_data: jiminy.sensorsData
_controller_handle(t, q, v, sensors_data, command)[source]

Thin wrapper around user-specified compute_command method.

Warning

This method is not supposed to be called manually nor overloaded.

Return type

None

_get_block_index()[source]

Get the index of the block. It corresponds to the “depth” of the block, namely how many blocks of the same wrapper type as the current one are already wrapped in the environment.

Return type

int

get_observation()[source]

Get post-processed observation.

It performs a recursive shallow copy of the observation.

Warning

This method is not supposed to be called manually nor overloaded.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]
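
A minimal usage sketch of the assumed copy semantics: nested containers are copied, but leaf arrays may still alias the wrapper's internal buffers, so take a deep copy before storing an observation across steps (env is a placeholder unified environment).

    import copy

    obs = env.get_observation()        # containers copied, leaves may be shared
    obs_snapshot = copy.deepcopy(obs)  # safe to keep across subsequent steps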

reset(controller_hook=None, **kwargs)[source]

Reset the unified environment.

In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.

Parameters
  • controller_hook (Optional[Callable[Optional[Tuple[Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData], None]], Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData, numpy.ndarray], None]]]]]]) – Used internally for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.

  • kwargs (Any) – Extra keyword arguments to comply with OpenAI Gym API.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

step(action=None)[source]

Run a simulation step for a given action.

Parameters

action (Optional[Union[Dict[str, StructNested], Sequence[StructNested], ValueType]]) – Next action to perform. Pass None to keep the previous action unchanged.

Returns

Next observation, reward, status of the episode (done or not), and a dictionary of extra information.

Return type

Tuple[Union[Dict[str, StructNested], Sequence[StructNested], ValueType], float, bool, Dict[str, Any]]
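
A short rollout sketch using the unified environment; env is a placeholder pipeline and the random action is a stand-in for a real policy.

    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()          # stand-in for a policy
        obs, reward, done, info = env.step(action)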

_setup()[source]

Configure the wrapper.

By default, it only resets some internal buffers.

Note

This method must be called once, after the environment has been reset. This is done automatically when calling the reset method.

Return type

None

refresh_observation()[source]

Compute the unified observation.

By default, it forwards the observation computed by the environment.

Parameters

measure – Observation of the environment.

Return type

None

compute_command(measure, action)[source]

Compute the motor efforts to apply to the robot.

By default, it forwards the command computed by the environment.

Parameters
  • measure (Union[Dict[str, StructNested], Sequence[StructNested], ValueType]) – Observation of the environment.

  • action (Union[Dict[str, StructNested], Sequence[StructNested], ValueType]) – Target to achieve.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

_observer_handle(t, q, v, sensors_data)

TODO Write documentation.

Return type

None

_refresh_action_space()

Configure the action space of the controller.

Note

This method is called right after _setup, so both the environment to control and the controller itself should already be initialized.

Return type

None

_refresh_observation_space()

Configure the observation space.

Return type

None

action_space: Optional[gym.Space] = None
classmethod class_name()
close()

Override close in your subclass to perform any necessary cleanup.

Environments will automatically close() themselves when garbage collected or when the program exits.

compute_reward(*args, info, **kwargs)

Compute reward at current episode state.

See ControllerInterface.compute_reward for details.

Note

This method is called after updating the internal buffer ‘_num_steps_beyond_done’, which is None if the simulation is not done, 0 right after, and so on.

Parameters
  • args (Any) – Extra arguments that may be useful for derived environments, for example Gym.GoalEnv.

  • info (Dict[str, Any]) – Dictionary of extra information for monitoring.

  • kwargs (Any) – Extra keyword arguments. See ‘args’.

Returns

Total reward.

Return type

float

compute_reward_terminal(*, info)

Compute terminal reward at current episode final state.

Note

Implementation is optional. For the sake of efficiency, the terminal reward is not computed unless this method is overloaded by the user.

Warning

Similarly to compute_reward, ‘info’ can be updated by reference to log extra info for monitoring.

Parameters

info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns

Terminal reward.

Return type

float

metadata = {'render.modes': []}
observation_space: Optional[gym.Space] = None
render(mode='human', **kwargs)

Renders the environment.

The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:

  • human: render to the current display or terminal and return nothing. Usually for human consumption.

  • rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.

  • ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors).

Note:

Make sure that your class’s metadata ‘render.modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.

Args:

mode (str): the mode to render with

Example:

    class MyEnv(Env):
        metadata = {'render.modes': ['human', 'rgb_array']}

        def render(self, mode='human'):
            if mode == 'rgb_array':
                return np.array(...)  # return RGB frame suitable for video
            elif mode == 'human':
                ...  # pop up a window and render
            else:
                super(MyEnv, self).render(mode=mode)  # just raise an exception

reward_range = (-inf, inf)
seed(seed=None)

Sets the seed for this env’s random number generator(s).

Note:

Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.

Returns:

list<bigint>: Returns the list of seeds used in this env’s random number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.

property spec
property unwrapped

Completely unwrap this env.

Returns:

gym.Env: The base non-wrapped gym.Env instance

_dt_eps: Optional[float]
observe_dt: float
_observation: Optional[DataNested]
control_dt: float
_action: Optional[DataNested]
class gym_jiminy.common.bases.pipeline_bases.ObservedJiminyEnv(env, observer, augment_observation=False, **kwargs)[source]

Bases: gym_jiminy.common.bases.pipeline_bases.BasePipelineWrapper

Wrap a BaseJiminyEnv Gym environment and a single observer, so that it appears as a single, unified environment. The environment may itself already be wrapped inside one or several gym.Wrapper containers.

[Figure: block diagram of the observer pipeline (environment wrapped with an observer block).]

The input observation ‘obs_env’ of ‘observer’ must be consistent with the observation space ‘obs’ of the environment. The observation space of the resulting unified environment will be the observation space of the highest-level observer, while its action space will be that of the unwrapped environment.

Warning

This design is not suitable for learning the observer, but rather for robotics-oriented observers such as sensor fusion algorithms or Kalman filters. It is recommended to add the observer to the policy itself if it has to be trainable.

Parameters
  • env (Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]) – Environment to observe. It can be an environment already wrapped in ObservedJiminyEnv if one desires to stack several observers with BaseJiminyEnv.

  • observer (gym_jiminy.common.bases.block_bases.BaseObserverBlock) – Observer to use to extract higher-level features.

  • augment_observation (bool) – Whether or not to gather the high-level features computed by the observer with the raw observation of the environment. This option is only available if the observation space is of type gym.spaces.Dict. Optional: Disabled by default.

  • kwargs (Any) – Extra keyword arguments to allow automatic pipeline wrapper generation.
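
A hedged construction sketch; MyJiminyEnv and MyFilterBlock are hypothetical placeholders for a concrete BaseJiminyEnv subclass and a BaseObserverBlock implementation.

    from gym_jiminy.common.bases.pipeline_bases import ObservedJiminyEnv

    env = ObservedJiminyEnv(
        MyJiminyEnv(),             # placeholder base environment
        observer=MyFilterBlock(),  # placeholder observer block
        augment_observation=True)  # requires a gym.spaces.Dict observation space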

env: Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]
action_space: Optional[gym.Space] = None
observation_space: Optional[gym.spaces.space.Space] = None
_action: Optional[DataNested]
_observation: Optional[DataNested]
_setup()[source]

Configure the wrapper.

In addition to the base implementation, it configures the observer.

Return type

None

refresh_observation()[source]

Compute high-level features based on the current wrapped environment’s observation.

If requested, it gathers the original observation from the environment with the features computed by the observer; otherwise, it forwards the features directly without any further processing.

Warning

Beware it updates and returns the ‘_observation’ buffer to deal with multiple observers having different update periods. Even so, it is safe to call this method multiple times successively.

Returns

Only the updated part of the observation, for efficiency.

Return type

None
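
As an illustration of the assumed behavior with augment_observation=True, the unified observation space is expected to be a gym.spaces.Dict gathering the environment observation and the observer features; the exact key layout depends on the observer block.

    # 'env' is the ObservedJiminyEnv sketched above (placeholder).
    print(env.observation_space)  # resulting Dict space when augmented
    obs = env.reset()
    print(list(obs.keys()))       # environment keys plus observer features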

_controller_handle(t, q, v, sensors_data, command)

Thin wrapper around user-specified compute_command method.

Warning

This method is not supposed to be called manually nor overloaded.

Return type

None

_get_block_index()

Get the index of the block. It corresponds to the “depth” of the block, namely how many blocks of the same wrapper type as the current one are already wrapped in the environment.

Return type

int

_observer_handle(t, q, v, sensors_data)

TODO Write documentation.

Return type

None

_refresh_action_space()

Configure the action space of the controller.

Note

This method is called right after _setup, so both the environment to control and the controller itself should already be initialized.

Return type

None

_refresh_observation_space()

Configure the observation space.

Return type

None

classmethod class_name()
close()

Override close in your subclass to perform any necessary cleanup.

Environments will automatically close() themselves when garbage collected or when the program exits.

compute_command(measure, action)

Compute the motor efforts to apply to the robot.

By default, it forwards the command computed by the environment.

Parameters
  • measure (Union[Dict[str, StructNested], Sequence[StructNested], ValueType]) – Observation of the environment.

  • action (Union[Dict[str, StructNested], Sequence[StructNested], ValueType]) – Target to achieve.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

compute_reward(*args, info, **kwargs)

Compute reward at current episode state.

See ControllerInterface.compute_reward for details.

Note

This method is called after updating the internal buffer ‘_num_steps_beyond_done’, which is None if the simulation is not done, 0 right after, and so on.

Parameters
  • args (Any) – Extra arguments that may be useful for derived environments, for example Gym.GoalEnv.

  • info (Dict[str, Any]) – Dictionary of extra information for monitoring.

  • kwargs (Any) – Extra keyword arguments. See ‘args’.

Returns

Total reward.

Return type

float

compute_reward_terminal(*, info)

Compute terminal reward at current episode final state.

Note

Implementation is optional. For the sake of efficiency, the terminal reward is not computed unless this method is overloaded by the user.

Warning

Similarly to compute_reward, ‘info’ can be updated by reference to log extra info for monitoring.

Parameters

info (Dict[str, Any]) – Dictionary of extra information for monitoring.

Returns

Terminal reward.

Return type

float

get_observation()

Get post-processed observation.

It performs a recursive shallow copy of the observation.

Warning

This method is not supposed to be called manually nor overloaded.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

metadata = {'render.modes': []}
render(mode='human', **kwargs)

Renders the environment.

The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:

  • human: render to the current display or terminal and return nothing. Usually for human consumption.

  • rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.

  • ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors).

Note:

Make sure that your class’s metadata ‘render.modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.

Args:

mode (str): the mode to render with

Example:

    class MyEnv(Env):
        metadata = {'render.modes': ['human', 'rgb_array']}

        def render(self, mode='human'):
            if mode == 'rgb_array':
                return np.array(...)  # return RGB frame suitable for video
            elif mode == 'human':
                ...  # pop up a window and render
            else:
                super(MyEnv, self).render(mode=mode)  # just raise an exception

reset(controller_hook=None, **kwargs)

Reset the unified environment.

In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.

Parameters
  • controller_hook (Optional[Callable[Optional[Tuple[Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData], None]], Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData, numpy.ndarray], None]]]]]]) – Used internally for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.

  • kwargs (Any) – Extra keyword arguments to comply with OpenAI Gym API.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

reward_range = (-inf, inf)
seed(seed=None)

Sets the seed for this env’s random number generator(s).

Note:

Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.

Returns:

list<bigint>: Returns the list of seeds used in this env’s random number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.

property spec
step(action=None)

Run a simulation step for a given action.

Parameters

action (Optional[Union[Dict[str, StructNested], Sequence[StructNested], ValueType]]) – Next action to perform. Pass None to keep the previous action unchanged.

Returns

Next observation, reward, status of the episode (done or not), and a dictionary of extra information.

Return type

Tuple[Union[Dict[str, StructNested], Sequence[StructNested], ValueType], float, bool, Dict[str, Any]]

property unwrapped

Completely unwrap this env.

Returns:

gym.Env: The base non-wrapped gym.Env instance

simulator: Simulator
stepper_state: jiminy.StepperState
system_state: jiminy.SystemState
sensors_data: jiminy.sensorsData
_dt_eps: Optional[float]
observe_dt: float
control_dt: float
class gym_jiminy.common.bases.pipeline_bases.ControlledJiminyEnv(env, controller, augment_observation=False, **kwargs)[source]

Bases: gym_jiminy.common.bases.pipeline_bases.BasePipelineWrapper

Wrap a BaseJiminyEnv Gym environment and a single controller, so that it appears as a single, unified environment. The environment may itself already be wrapped inside one or several gym.Wrapper containers.

If several successive controllers must be used, wrap each controller one by one with the resulting ControlledJiminyEnv (see the construction sketch after the parameter list below).

[Figure: block diagram of the controller pipeline (environment wrapped with successive controller blocks).]

The output command ‘cmd_X’ of ‘ctrl_X’ must be consistent with the action space ‘act_X’ of the subsequent block. The action space of the resulting unified environment will be the action space of the highest-level controller ‘act_N’, while its observation space will be that of the unwrapped environment ‘obs’. Alternatively, the latter can also gather the (stacked) action spaces of the successive controllers if one wants to observe the intermediary controllers’ targets.

Note

The environment and each controller have their own update periods.

Warning

This design is not suitable for learning the controllers ‘ctrl_X’, but rather for robotics-oriented controllers such as PID control, inverse kinematics, admittance control, or Model Predictive Control (MPC). It is recommended to add the controllers to the policy itself if they have to be trainable.

Note

As a reminder, env.step_dt refers to the learning step period, namely the timestep between two successive frames:

[obs, reward, done, info]

This definition remains true, independently of whether or not the environment is wrapped with a controller using this class. In contrast, env.control_dt corresponds to the apparent control update period, namely the update period of the highest-level controller if multiple are piped together. The same goes for env.observe_dt.
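
A small sketch inspecting these periods on a unified environment (env is a placeholder pipeline; the attribute names are the ones quoted in the note above).

    print(env.step_dt)     # learning step period, unchanged by the wrapping
    print(env.control_dt)  # update period of the highest-level controller
    print(env.observe_dt)  # update period of the highest-level observer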

Parameters
  • env (Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]) – Environment to control. It can be an already controlled environment wrapped in ControlledJiminyEnv if one desires to stack several controllers with BaseJiminyEnv.

  • controller (gym_jiminy.common.bases.block_bases.BaseControllerBlock) – Controller to use to send targets to the subsequent block.

  • augment_observation (bool) – Whether or not to gather the target of the controller with the observation of the environment. This option is only available if the observation space is of type gym.spaces.Dict. Optional: Disabled by default.

  • kwargs (Any) – Extra keyword arguments to allow automatic pipeline wrapper generation.
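
A hedged sketch of stacking two controllers; MyJiminyEnv, my_inner_controller and my_outer_controller are hypothetical placeholders for a concrete BaseJiminyEnv subclass and two BaseControllerBlock instances (for example, a low-level PD controller and a higher-level one).

    from gym_jiminy.common.bases.pipeline_bases import ControlledJiminyEnv

    env = ControlledJiminyEnv(MyJiminyEnv(), my_inner_controller)
    env = ControlledJiminyEnv(env, my_outer_controller,
                              augment_observation=True)
    # The action space of 'env' is now that of 'my_outer_controller'.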

action_space: Optional[gym.Space] = None
observation_space: Optional[gym.spaces.space.Space] = None
_action: Optional[DataNested]
_observation: Optional[DataNested]
_controller_handle(t, q, v, sensors_data, command)

Thin wrapper around user-specified compute_command method.

Warning

This method is not supposed to be called manually nor overloaded.

Return type

None

_get_block_index()

Get the index of the block. It corresponds to the “depth” of the block, namely how many blocks of the same wrapper type as the current one are already wrapped in the environment.

Return type

int

_observer_handle(t, q, v, sensors_data)

TODO Write documentation.

Return type

None

_refresh_action_space()

Configure the action space of the controller.

Note

This method is called right after _setup, so both the environment to control and the controller itself should already be initialized.

Return type

None

_refresh_observation_space()

Configure the observation space.

Return type

None

_setup()[source]

Configure the wrapper.

In addition to the base implementation, it configures the controller and registers its target to the telemetry.

Return type

None

classmethod class_name()
close()

Override close in your subclass to perform any necessary cleanup.

Environments will automatically close() themselves when garbage collected or when the program exits.

get_observation()

Get post-processed observation.

It performs a recursive shallow copy of the observation.

Warning

This method is not supposed to be called manually nor overloaded.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

metadata = {'render.modes': []}
render(mode='human', **kwargs)

Renders the environment.

The set of supported modes varies per environment. (And some environments do not support rendering at all.) By convention, if mode is:

  • human: render to the current display or terminal and return nothing. Usually for human consumption.

  • rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.

  • ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors).

Note:

Make sure that your class’s metadata ‘render.modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.

Args:

mode (str): the mode to render with

Example:

    class MyEnv(Env):
        metadata = {'render.modes': ['human', 'rgb_array']}

        def render(self, mode='human'):
            if mode == 'rgb_array':
                return np.array(...)  # return RGB frame suitable for video
            elif mode == 'human':
                ...  # pop up a window and render
            else:
                super(MyEnv, self).render(mode=mode)  # just raise an exception

reset(controller_hook=None, **kwargs)

Reset the unified environment.

In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.

Parameters
  • controller_hook (Optional[Callable[Optional[Tuple[Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData], None]], Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData, numpy.ndarray], None]]]]]]) – Used internally for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.

  • kwargs (Any) – Extra keyword arguments to comply with OpenAI Gym API.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

reward_range = (-inf, inf)
seed(seed=None)

Sets the seed for this env’s random number generator(s).

Note:

Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.

Returns:

list<bigint>: Returns the list of seeds used in this env’s random number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.

property spec
step(action=None)

Run a simulation step for a given action.

Parameters

action (Optional[Union[Dict[str, StructNested], Sequence[StructNested], ValueType]]) – Next action to perform. Pass None to keep the previous action unchanged.

Returns

Next observation, reward, status of the episode (done or not), and a dictionary of extra information.

Return type

Tuple[Union[Dict[str, StructNested], Sequence[StructNested], ValueType], float, bool, Dict[str, Any]]

property unwrapped

Completely unwrap this env.

Returns:

gym.Env: The base non-wrapped gym.Env instance

env: Union[gym.Wrapper, BaseJiminyEnv]
simulator: Simulator
stepper_state: jiminy.StepperState
system_state: jiminy.SystemState
sensors_data: jiminy.sensorsData
_dt_eps: Optional[float]
observe_dt: float
control_dt: float
compute_command(measure, action)[source]

Compute the motor efforts to apply to the robot.

In practice, it updates, whenever it is necessary:

  • the target sent to the subsequent block by the controller

  • the command sent to the robot by the environment through the subsequent block

Parameters
  • measure (Union[Dict[str, StructNested], Sequence[StructNested], ValueType]) – Observation of the environment.

  • action (Union[Dict[str, StructNested], Sequence[StructNested], ValueType]) – High-level target to achieve.

Return type

Union[Dict[str, StructNested], Sequence[StructNested], ValueType]

refresh_observation()[source]

Compute the unified observation based on the current wrapped environment’s observation and controller’s target.

If requested, it gathers the actual observation from the environment with the target of the controller; otherwise, it forwards the observation directly without any further processing.

Warning

Beware it shares the environment observation whenever possible, for the sake of efficiency. Despite that, it is safe to call this method multiple times successively.

Returns

Original environment observation, possibly including the controllers’ targets if requested.

Return type

None

compute_reward(*args, **kwargs)[source]

Compute reward at current episode state.

See ControllerInterface.compute_reward for details.

Note

This method is called after updating the internal buffer ‘_num_steps_beyond_done’, which is None if the simulation is not done, 0 right after, and so on.

Parameters
  • args (Any) – Extra arguments that may be useful for derived environments, for example Gym.GoalEnv.

  • info – Dictionary of extra information for monitoring.

  • kwargs (Any) – Extra keyword arguments. See ‘args’.

Returns

Total reward.

Return type

float

compute_reward_terminal(*args, **kwargs)[source]

Compute terminal reward at current episode final state.

Note

Implementation is optional. For the sake of efficiency, the terminal reward is not computed unless this method is overloaded by the user.

Warning

Similarly to compute_reward, ‘info’ can be updated by reference to log extra info for monitoring.

Parameters
  • info – Dictionary of extra information for monitoring.

  • args (Any) –

  • kwargs (Any) –

Returns

Terminal reward.

Return type

float