Pipeline¶
This module gathers base implementations for blocks to be used in pipeline control design.
It implements:
- the concept of a block that can be connected to a BaseJiminyEnv environment through any level of indirection
- a base controller block, along with a concrete PD controller
- a wrapper to combine a controller block and a BaseJiminyEnv environment, possibly already wrapped, so that it appears as a single black-box environment.
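As a rough composition sketch (hypothetical names: MyRobotEnv stands for any user-defined BaseJiminyEnv subclass and MyPDController for any user-defined BaseControllerBlock subclass; neither is provided by this module):

    from gym_jiminy.common.bases.pipeline_bases import ControlledJiminyEnv

    env = MyRobotEnv()             # hypothetical BaseJiminyEnv subclass
    controller = MyPDController()  # hypothetical BaseControllerBlock subclass

    # The pair now behaves as a single black-box Gym environment
    pipeline = ControlledJiminyEnv(env, controller)
    obs = pipeline.reset()
    obs, reward, done, info = pipeline.step(pipeline.action_space.sample())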
- class gym_jiminy.common.bases.pipeline_bases.BasePipelineWrapper(env, **kwargs)[source]¶
Bases:
gym_jiminy.common.bases.generic_bases.ObserverControllerInterface
,gym.core.Wrapper
Wrap a BaseJiminyEnv Gym environment and a single block, so that it appears as a single, unified environment. The environment may already be wrapped inside one or several gym.Wrapper containers.
If several successive blocks must be used, wrap each block one by one with the resulting intermediary PipelineWrapper.
Warning
This architecture is not designed for trainable blocks, but rather for robotics-oriented controllers and observers, such as PID controllers, inverse kinematics, Model Predictive Control (MPC), sensor fusion… It is recommended to add the controllers and observers into the policy itself if they have to be trainable.
- Parameters
kwargs (Any) – Extra keyword arguments for multiple inheritance.
env (Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]) –
- Return type
None
- env: Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]¶
- stepper_state: jiminy.StepperState¶
- system_state: jiminy.SystemState¶
- sensors_data: jiminy.sensorsData¶
- _controller_handle(t, q, v, sensors_data, command)[source]¶
Thin wrapper around the user-specified compute_command method.
Warning
This method is not supposed to be called manually nor overloaded.
- Parameters
t (float) –
q (numpy.ndarray) –
v (numpy.ndarray) –
sensors_data (jiminy_py.core.sensorsData) –
command (numpy.ndarray) –
- Return type
None
- _get_block_index()[source]¶
Get the index of the block. It corresponds to the “depth” of the block, namely how many blocks of the same wrapper type as the current one are already wrapped in the environment.
- Return type
int
- get_observation()[source]¶
Get post-processed observation.
It performs a recursive shallow copy of the observation.
Warning
This method is not supposed to be called manually nor overloaded.
- reset(controller_hook=None, **kwargs)[source]¶
Reset the unified environment.
In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.
- Parameters
controller_hook (Optional[Callable[[], Optional[Tuple[Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData], None]], Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData, numpy.ndarray], None]]]]]]) – Used internally for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.
kwargs (Any) – Extra keyword arguments to comply with OpenAI Gym API.
- Return type
Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray]
- step(action=None)[source]¶
Run a simulation step for a given action.
- Parameters
action (Optional[Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray]]) – Next action to perform. Pass None to leave it unchanged.
- Returns
Next observation, reward, status of the episode (done or not), and a dictionary of extra information.
- Return type
Tuple[Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray], float, bool, Dict[str, Any]]
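For instance, a standard Gym rollout loop works unchanged on the unified environment (a sketch; ‘pipeline’ is assumed to be any BasePipelineWrapper instance):

    obs = pipeline.reset()
    done = False
    while not done:
        action = pipeline.action_space.sample()  # stand-in for a real policy
        obs, reward, done, info = pipeline.step(action)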
- _setup()[source]¶
Configure the wrapper.
By default, it only resets some internal buffers.
Note
This method must be called once, after the environment has been reset. This is done automatically when calling the reset method.
- Return type
None
- refresh_observation()[source]¶
Compute the unified observation.
By default, it forwards the observation computed by the environment.
- Parameters
measure – Observation of the environment.
- Return type
None
- compute_command(measure, action)[source]¶
Compute the motor efforts to apply to the robot.
By default, it forwards the command computed by the environment.
- _initialize_action_space()¶
Configure the action space of the controller.
Note
This method is called right after _setup, so both the environment to control and the controller itself are already initialized.
- Return type
None
- _initialize_observation_space()¶
Configure the observation space.
- Return type
None
- _observer_handle(t, q, v, sensors_data)¶
TODO Write documentation.
- Parameters
t (float) –
q (numpy.ndarray) –
v (numpy.ndarray) –
sensors_data (jiminy_py.core.sensorsData) –
- Return type
None
- property action_space¶
- classmethod class_name()¶
- close()¶
Override close in your subclass to perform any necessary cleanup.
Environments will automatically close() themselves when garbage collected or when the program exits.
- compute_reward(*args, info, **kwargs)¶
Compute reward at current episode state.
See ControllerInterface.compute_reward for details.
Note
This method is called after updating the internal buffer ‘_num_steps_beyond_done’, which is None if the simulation is not done, 0 right after, and so on.
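A minimal override sketch, assuming a hypothetical subclass; it only illustrates that ‘info’ can be filled in-place for monitoring:

    class MyPipelineWrapper(BasePipelineWrapper):  # hypothetical subclass
        def compute_reward(self, *args, info, **kwargs):
            reward = super().compute_reward(*args, info=info, **kwargs)
            info['raw_reward'] = reward  # hypothetical monitoring entry
            return reward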
- compute_reward_terminal(*, info)¶
Compute terminal reward at current episode final state.
Note
Implementation is optional. For the sake of efficiency, no terminal reward is computed if this method is not overloaded by the user.
Warning
Similarly to compute_reward, ‘info’ can be updated by reference to log extra info for monitoring.
- property observation_space¶
- render(mode='human', **kwargs)¶
Renders the environment.
The set of supported modes varies per environment. (Some environments do not support rendering at all.) By convention, if mode is:
- human: render to the current display or terminal and return nothing. Usually for human consumption.
- rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.
- ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors).
- Note:
Make sure that your class’s metadata ‘render.modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.
- Args:
mode (str): the mode to render with
Example:

    class MyEnv(Env):
        metadata = {'render.modes': ['human', 'rgb_array']}

        def render(self, mode='human'):
            if mode == 'rgb_array':
                return np.array(...)  # return RGB frame suitable for video
            elif mode == 'human':
                ...  # pop up a window and render
            else:
                super(MyEnv, self).render(mode=mode)  # just raise an exception
- property reward_range¶
The (min, max) pair of possible rewards for this environment. Defaults to (-inf, +inf).
- seed(seed=None)¶
Sets the seed for this env’s random number generator(s).
- Note:
Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.
- Returns:
list<bigint>: Returns the list of seeds used in this env’s random number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.
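Usage sketch:

    seeds = env.seed(42)  # list of seeds actually used by the generators
    main_seed = seeds[0]  # pass this value to 'seed' to reproduce a run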
- property spec¶
- property unwrapped¶
Completely unwrap this env.
- Returns:
gym.Env: The base non-wrapped gym.Env instance
- _observation: DataNested¶
- _action: DataNested¶
- class gym_jiminy.common.bases.pipeline_bases.ObservedJiminyEnv(env, observer, augment_observation=False, **kwargs)[source]¶
Bases:
gym_jiminy.common.bases.pipeline_bases.BasePipelineWrapper
Wrap a BaseJiminyEnv Gym environment and a single observer, so that it appears as a single, unified, environment. Eventually, the environment can already be wrapped inside one or several gym.Wrapper containers.
The input observation ‘obs_env’ of ‘observer’ must be consistent with the observation space ‘obs’ of the environment. The observation space of the resulting unified environment will be the observation space of the highest-level observer, while its action space will be the one of the unwrapped environment.
Warning
This design is not suitable for learning the observer, but rather for robotics-oriented observers, such as sensor fusion algorithms, Kalman filters… It is recommended to add the observer into the policy itself if it has to be trainable.
- Parameters
env (Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]) – Environment to observe. It can be an already observed environment wrapped in ObservedJiminyEnv if one desires to stack several observers with BaseJiminyEnv.
observer (gym_jiminy.common.bases.block_bases.BaseObserverBlock) – Observer to use to extract higher-level features.
augment_observation (bool) – Whether or not to gather the high-level features computed by the observer with the raw observation of the environment. This option is only available if the observation space is of type gym.spaces.Dict. Optional: Disabled by default.
kwargs (Any) – Extra keyword arguments to allow automatic pipeline wrapper generation.
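A minimal usage sketch, assuming a hypothetical MySensorFusionBlock deriving from BaseObserverBlock and the hypothetical MyRobotEnv environment from above:

    from gym_jiminy.common.bases.pipeline_bases import ObservedJiminyEnv

    observer = MySensorFusionBlock()  # hypothetical BaseObserverBlock subclass
    env = ObservedJiminyEnv(
        MyRobotEnv(),              # hypothetical BaseJiminyEnv subclass
        observer,
        augment_observation=True)  # requires a gym.spaces.Dict observation
    obs = env.reset()  # raw observation gathered with the observer features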
- env: Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]¶
- property action_space¶
- property observation_space¶
- _action: DataNested¶
- _observation: DataNested¶
- _setup()[source]¶
Configure the wrapper.
In addition to the base implementation, it configures the observer.
- Return type
None
- refresh_observation()[source]¶
Compute high-level features based on the current wrapped environment’s observation.
It gathers the original observation from the environment with the features computed by the observer, if requested; otherwise it forwards the features directly without any further processing.
Warning
Beware it updates and returns the ‘_observation’ buffer to deal with multiple observers with different update periods. Even so, it is safe to call this method multiple times successively.
- Returns
Updated part of the observation only for efficiency.
- Return type
None
- _controller_handle(t, q, v, sensors_data, command)¶
Thin wrapper around the user-specified compute_command method.
Warning
This method is not supposed to be called manually nor overloaded.
- Parameters
t (float) –
q (numpy.ndarray) –
v (numpy.ndarray) –
sensors_data (jiminy_py.core.sensorsData) –
command (numpy.ndarray) –
- Return type
None
- _get_block_index()¶
Get the index of the block. It corresponds to the “depth” of the block, namely how many blocks of the same wrapper type as the current one are already wrapped in the environment.
- Return type
int
- _initialize_action_space()¶
Configure the action space of the controller.
Note
This method is called right after _setup, so both the environment to control and the controller itself are already initialized.
- Return type
None
- _initialize_observation_space()¶
Configure the observation space.
- Return type
None
- _observer_handle(t, q, v, sensors_data)¶
TODO Write documentation.
- Parameters
t (float) –
q (numpy.ndarray) –
v (numpy.ndarray) –
sensors_data (jiminy_py.core.sensorsData) –
- Return type
None
- classmethod class_name()¶
- close()¶
Override close in your subclass to perform any necessary cleanup.
Environments will automatically close() themselves when garbage collected or when the program exits.
- compute_command(measure, action)¶
Compute the motor efforts to apply to the robot.
By default, it forwards the command computed by the environment.
- compute_reward(*args, info, **kwargs)¶
Compute reward at current episode state.
See ControllerInterface.compute_reward for details.
Note
This method is called after updating the internal buffer ‘_num_steps_beyond_done’, which is None if the simulation is not done, 0 right after, and so on.
- compute_reward_terminal(*, info)¶
Compute terminal reward at current episode final state.
Note
Implementation is optional. For the sake of efficiency, no terminal reward is computed if this method is not overloaded by the user.
Warning
Similarly to compute_reward, ‘info’ can be updated by reference to log extra info for monitoring.
- get_observation()¶
Get post-processed observation.
It performs a recursive shallow copy of the observation.
Warning
This method is not supposed to be called manually nor overloaded.
- render(mode='human', **kwargs)¶
Renders the environment.
The set of supported modes varies per environment. (Some environments do not support rendering at all.) By convention, if mode is:
- human: render to the current display or terminal and return nothing. Usually for human consumption.
- rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.
- ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors).
- Note:
Make sure that your class’s metadata ‘render.modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.
- Args:
mode (str): the mode to render with
Example:

    class MyEnv(Env):
        metadata = {'render.modes': ['human', 'rgb_array']}

        def render(self, mode='human'):
            if mode == 'rgb_array':
                return np.array(...)  # return RGB frame suitable for video
            elif mode == 'human':
                ...  # pop up a window and render
            else:
                super(MyEnv, self).render(mode=mode)  # just raise an exception
- reset(controller_hook=None, **kwargs)¶
Reset the unified environment.
In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.
- Parameters
controller_hook (Optional[Callable[[], Optional[Tuple[Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData], None]], Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData, numpy.ndarray], None]]]]]]) – Used internally for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.
kwargs (Any) – Extra keyword arguments to comply with OpenAI Gym API.
- Return type
Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray]
- property reward_range¶
The (min, max) pair of possible rewards for this environment. Defaults to (-inf, +inf).
- seed(seed=None)¶
Sets the seed for this env’s random number generator(s).
- Note:
Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.
- Returns:
list<bigint>: Returns the list of seeds used in this env’s random number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.
- property spec¶
- step(action=None)¶
Run a simulation step for a given action.
- Parameters
action (Optional[Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray]]) – Next action to perform. Pass None to leave it unchanged.
- Returns
Next observation, reward, status of the episode (done or not), and a dictionary of extra information.
- Return type
Tuple[Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray], float, bool, Dict[str, Any]]
- property unwrapped¶
Completely unwrap this env.
- Returns:
gym.Env: The base non-wrapped gym.Env instance
- stepper_state: jiminy.StepperState¶
- system_state: jiminy.SystemState¶
- sensors_data: jiminy.sensorsData¶
- class gym_jiminy.common.bases.pipeline_bases.ControlledJiminyEnv(env, controller, augment_observation=False, **kwargs)[source]¶
Bases:
gym_jiminy.common.bases.pipeline_bases.BasePipelineWrapper
Wrap a BaseJiminyEnv Gym environment and a single controller, so that it appears as a single, unified environment. The environment may already be wrapped inside one or several gym.Wrapper containers.
If several successive controllers must be used, wrap each controller one by one with the resulting ControlledJiminyEnv.
The output command ‘cmd_X’ of ‘ctrl_X’ must be consistent with the action space ‘act_X’ of the subsequent block. The action space of the resulting unified environment will be the action space of the highest-level controller ‘act_N’, while its observation space will be the one of the unwrapped environment ‘obs’. Alternatively, the latter can also gather the (stacked) action spaces of the successive controllers if one wants to observe the intermediary controllers’ targets.
Note
The environment and each controller have their own update period.
Warning
This design is not suitable for learning the controllers ‘ctrl_X’, but rather for robotics-oriented controllers, such as PID control, inverse kinematics, admittance control, or Model Predictive Control (MPC). It is recommended to add the controllers into the policy itself if they have to be trainable.
Note
As a reminder, env.step_dt refers to the learning step period, namely the timestep between two successive frames:
[obs, reward, done, info]
This definition remains true, independently of whether or not the environment is wrapped with a controller using this class. In contrast, env.control_dt corresponds to the apparent control update period, namely the update period of the highest-level controller when multiple controllers are piped together. The same goes for env.observe_dt.
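To make the distinction concrete, here is a numeric sketch with illustrative periods only (these values are not defaults of any class):

    observe_dt = 0.001  # observer update period: 1 kHz (illustrative)
    control_dt = 0.010  # highest-level controller update period: 100 Hz
    step_dt = 0.040     # learning step period: 25 Hz, unchanged by wrapping

    # One learning step spans several control updates, which in turn span
    # several observer updates:
    controls_per_step = round(step_dt / control_dt)        # -> 4
    observes_per_control = round(control_dt / observe_dt)  # -> 10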
- Parameters
env (Union[gym.core.Wrapper, gym_jiminy.common.envs.env_generic.BaseJiminyEnv]) – Environment to control. It can be an already controlled environment wrapped in ControlledJiminyEnv if one desires to stack several controllers with BaseJiminyEnv.
controller (gym_jiminy.common.bases.block_bases.BaseControllerBlock) – Controller to use to send targets to the subsequent block.
augment_observation (bool) – Whether or not to gather the target of the controller with the observation of the environment. This option is only available if the observation space is of type gym.spaces.Dict. Optional: Disabled by default.
kwargs (Any) – Extra keyword arguments to allow automatic pipeline wrapper generation.
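A stacking sketch, reusing the hypothetical MyRobotEnv and MyPDController from above and adding a hypothetical MyMPCController block:

    # Inner wrapper: the PD controller drives the robot directly
    inner = ControlledJiminyEnv(MyRobotEnv(), MyPDController())

    # Outer wrapper: the MPC controller sends targets to the PD controller
    outer = ControlledJiminyEnv(
        inner, MyMPCController(), augment_observation=True)

    # 'outer.action_space' is the action space of the highest-level (MPC)
    # controller; with augment_observation=True, the intermediary targets
    # are gathered with the raw observation (gym.spaces.Dict required).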
- property action_space¶
- property observation_space¶
- _action: DataNested¶
- _observation: DataNested¶
- _controller_handle(t, q, v, sensors_data, command)¶
Thin wrapper around the user-specified compute_command method.
Warning
This method is not supposed to be called manually nor overloaded.
- Parameters
t (float) –
q (numpy.ndarray) –
v (numpy.ndarray) –
sensors_data (jiminy_py.core.sensorsData) –
command (numpy.ndarray) –
- Return type
None
- _get_block_index()¶
Get the index of the block. It corresponds to the “depth” of the block, namely how many blocks of the same wrapper type as the current one are already wrapped in the environment.
- Return type
int
- _initialize_action_space()¶
Configure the action space of the controller.
Note
This method is called right after _setup, so both the environment to control and the controller itself are already initialized.
- Return type
None
- _initialize_observation_space()¶
Configure the observation space.
- Return type
None
- _observer_handle(t, q, v, sensors_data)¶
TODO Write documentation.
- Parameters
t (float) –
q (numpy.ndarray) –
v (numpy.ndarray) –
sensors_data (jiminy_py.core.sensorsData) –
- Return type
None
- _setup()[source]¶
Configure the wrapper.
In addition to the base implementation, it configures the controller and registers its target to the telemetry.
- Return type
None
- classmethod class_name()¶
- close()¶
Override close in your subclass to perform any necessary cleanup.
Environments will automatically close() themselves when garbage collected or when the program exits.
- get_observation()¶
Get post-processed observation.
It performs a recursive shallow copy of the observation.
Warning
This method is not supposed to be called manually nor overloaded.
- render(mode='human', **kwargs)¶
Renders the environment.
The set of supported modes varies per environment. (Some environments do not support rendering at all.) By convention, if mode is:
- human: render to the current display or terminal and return nothing. Usually for human consumption.
- rgb_array: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an x-by-y pixel image, suitable for turning into a video.
- ansi: Return a string (str) or StringIO.StringIO containing a terminal-style text representation. The text can include newlines and ANSI escape sequences (e.g. for colors).
- Note:
Make sure that your class’s metadata ‘render.modes’ key includes the list of supported modes. It’s recommended to call super() in implementations to use the functionality of this method.
- Args:
mode (str): the mode to render with
Example:

    class MyEnv(Env):
        metadata = {'render.modes': ['human', 'rgb_array']}

        def render(self, mode='human'):
            if mode == 'rgb_array':
                return np.array(...)  # return RGB frame suitable for video
            elif mode == 'human':
                ...  # pop up a window and render
            else:
                super(MyEnv, self).render(mode=mode)  # just raise an exception
- reset(controller_hook=None, **kwargs)¶
Reset the unified environment.
In practice, it resets the environment and initializes the generic pipeline internal buffers through the use of ‘controller_hook’.
- Parameters
controller_hook (Optional[Callable[[], Optional[Tuple[Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData], None]], Optional[Callable[[float, numpy.ndarray, numpy.ndarray, jiminy_py.core.sensorsData, numpy.ndarray], None]]]]]]) – Used internally for chaining multiple BasePipelineWrapper. It is not meant to be defined manually. Optional: None by default.
kwargs (Any) – Extra keyword arguments to comply with OpenAI Gym API.
- Return type
Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray]
- property reward_range¶
The (min, max) pair of possible rewards for this environment. Defaults to (-inf, +inf).
- seed(seed=None)¶
Sets the seed for this env’s random number generator(s).
- Note:
Some environments use multiple pseudorandom number generators. We want to capture all such seeds used in order to ensure that there aren’t accidental correlations between multiple generators.
- Returns:
list<bigint>: Returns the list of seeds used in this env’s random number generators. The first value in the list should be the “main” seed, or the value which a reproducer should pass to ‘seed’. Often, the main seed equals the provided ‘seed’, but this won’t be true if seed=None, for example.
- property spec¶
- step(action=None)¶
Run a simulation step for a given action.
- Parameters
action (Optional[Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray]]) – Next action to perform. Pass None to leave it unchanged.
- Returns
Next observation, reward, status of the episode (done or not), and a dictionary of extra information.
- Return type
Tuple[Union[Dict[str, StructNested], Sequence[StructNested], numpy.ndarray], float, bool, Dict[str, Any]]
- property unwrapped¶
Completely unwrap this env.
- Returns:
gym.Env: The base non-wrapped gym.Env instance
- env: Union[gym.Wrapper, BaseJiminyEnv]¶
- stepper_state: jiminy.StepperState¶
- system_state: jiminy.SystemState¶
- sensors_data: jiminy.sensorsData¶
- compute_command(measure, action)[source]¶
Compute the motor efforts to apply to the robot.
In practice, it updates, whenever necessary:
- the target sent to the subsequent block by the controller
- the command sent to the robot by the environment through the subsequent block
- refresh_observation()[source]¶
Compute the unified observation based on the current wrapped environment’s observation and the controller’s target.
It gathers the actual observation from the environment with the target of the controller, if requested; otherwise it forwards the observation directly without any further processing.
Warning
Beware it shares the environment’s observation whenever possible for the sake of efficiency. Despite that, it is safe to call this method multiple times successively.
- Returns
Original environment observation, possibly including the controllers’ targets if requested.
- Return type
None
- compute_reward(*args, **kwargs)[source]¶
Compute reward at current episode state.
See ControllerInterface.compute_reward for details.
Note
This method is called after updating the internal buffer ‘_num_steps_beyond_done’, which is None if the simulation is not done, 0 right after, and so on.
- compute_reward_terminal(*args, **kwargs)[source]¶
Compute terminal reward at current episode final state.
Note
Implementation is optional. For the sake of efficiency, no terminal reward is computed if this method is not overloaded by the user.
Warning
Similarly to compute_reward, ‘info’ can be updated by reference to log extra info for monitoring.