"""This method gathers base implementations for blocks to be used in pipeline
control design.
It implements:
- the concept of block that can be connected to a `BaseJiminyEnv` environment
through any level of indirection
- a base controller block, along with a concret PD controller
- a wrapper to combine a controller block and a `BaseJiminyEnv` environment,
eventually already wrapped, so that it appears as a black-box environment.
"""
from copy import deepcopy
from collections import OrderedDict
from typing import Optional, Union, Tuple, Dict, Any, List, Callable
import numpy as np
import gym
import jiminy_py.core as jiminy
from jiminy_py.simulator import Simulator
from ..utils import (DataNested,
is_breakpoint,
zeros,
fill,
copy,
set_value)
from ..envs import ObserverHandleType, ControllerHandleType, BaseJiminyEnv
from .block_bases import BaseControllerBlock, BaseObserverBlock
from .generic_bases import ObserverControllerInterface
class BasePipelineWrapper(ObserverControllerInterface, gym.Wrapper):
"""Wrap a `BaseJiminyEnv` Gym environment and a single block, so that it
appears as a single, unified, environment. Eventually, the environment can
already be wrapped inside one or several `gym.Wrapper` containers.
If several successive blocks must be used, just wrap successively each
block one by one with the resulting intermediary `PipelineWrapper`.
.. warning::
This architecture is not designed for trainable blocks, but rather for
robotic-oriented controllers and observers, such as PID controllers,
inverse kinematics, Model Predictive Control (MPC), sensor fusion...
It is recommended to add the controllers and observers into the
policy itself if they have to be trainable.
"""
env: Union[gym.Wrapper, BaseJiminyEnv]
def __init__(self,
env: Union[gym.Wrapper, BaseJiminyEnv],
**kwargs: Any) -> None:
"""
:param kwargs: Extra keyword arguments for multiple inheritance.
"""
# Initialize base wrapper and interfaces through multiple inheritance
super().__init__(env) # Do not forward extra arguments, if any
# Refresh some proxies for fast lookup
self.simulator: Simulator = self.env.simulator
self.stepper_state: jiminy.StepperState = self.env.stepper_state
self.system_state: jiminy.SystemState = self.env.system_state
self.sensors_data: jiminy.sensorsData = self.env.sensors_data
# Define some internal buffers
self._command = zeros(self.env.unwrapped.action_space)
def __dir__(self) -> List[str]:
"""Attribute lookup.
It is mainly used by the autocomplete feature of IPython. It is overloaded
to get autocompletion consistent with `getattr`.
"""
return super().__dir__() + self.env.__dir__()
def _controller_handle(self,
t: float,
q: np.ndarray,
v: np.ndarray,
sensors_data: jiminy.sensorsData,
command: np.ndarray) -> None:
"""Thin wrapper around user-specified `compute_command` method.
.. warning::
This method is not supposed to be called manually nor overloaded.
"""
command[:] = self.compute_command(
self.env.get_observation(), self._action)
def _get_block_index(self) -> int:
"""Get the index of the block. It corresponds to the "depth" of the
block, namely how many blocks of the same wrapper type as the current
one are already wrapped in the environment.
"""
i = 0
block = self.env
while not isinstance(block, BaseJiminyEnv):
i += isinstance(block, self.__class__)
block = block.env
return i
def get_observation(self) -> DataNested:
"""Get post-processed observation.
It performs a recursive shallow copy of the observation.
.. warning::
This method is not supposed to be called manually nor overloaded.
"""
return copy(self._observation)
def reset(self,
controller_hook: Optional[Callable[[], Optional[Tuple[
Optional[ObserverHandleType],
Optional[ControllerHandleType]]]]] = None,
**kwargs: Any) -> DataNested:
"""Reset the unified environment.
In practice, it resets the environment and initializes the generic
pipeline internal buffers through the use of 'controller_hook'.
:param controller_hook: Used internally for chaining multiple
`BasePipelineWrapper`. It is not meant to be
defined manually.
Optional: None by default.
:param kwargs: Extra keyword arguments to comply with OpenAI Gym API.
"""
# pylint: disable=unused-argument,arguments-differ
# Define chained controller hook
def register() -> Tuple[ObserverHandleType, ControllerHandleType]:
"""Register the block to the higher-level block.
This method is used internally to make sure that the `_setup` method
of each connected block is called in the right order, namely from the
lowest-level block to the highest-level one, right after reset of
the low-level simulator and just before performing the first step.
"""
nonlocal self, controller_hook
# Initialize the pipeline wrapper
self._setup()
# Forward the observer and controller handles provided by the
# controller hook of higher-level block, if any, or use the
# ones of this block otherwise.
observer_handle, controller_handle = None, None
if controller_hook is not None:
handles = controller_hook()
if handles is not None:
observer_handle, controller_handle = handles
if observer_handle is None:
observer_handle = self._observer_handle
if controller_handle is None:
controller_handle = self._controller_handle
return observer_handle, controller_handle
# Reset base pipeline
self.env.reset(register, **kwargs)
return self.get_observation()
def step(self,
action: Optional[DataNested] = None
) -> Tuple[DataNested, float, bool, Dict[str, Any]]:
"""Run a simulation step for a given action.
:param action: Next action to perform. `None` to not update it.
:returns: Next observation, reward, status of the episode (done or
not), and a dictionary of extra information.
"""
# Backup the action to perform, if any
if action is not None:
set_value(self._action, action)
# Compute the next learning step
_, reward, done, info = self.env.step()
# Compute the block's reward and add it to the total
reward += self.compute_reward(info=info)
if self.enable_reward_terminal:
if done and self.env.unwrapped._num_steps_beyond_done == 0:
reward += self.compute_reward_terminal(info=info)
return self.get_observation(), reward, done, info
# methods to override:
# ----------------------------
def _setup(self) -> None:
"""Configure the wrapper.
By default, it only resets some internal buffers.
.. note::
This method must be called once, after the environment has been
reset. This is done automatically when calling `reset` method.
"""
# Call base implementation
super()._setup()
# Reset some internal buffers
fill(self._action, 0.0)
fill(self._command, 0.0)
# Refresh some proxies for fast lookup
self.sensors_data = self.env.sensors_data
def refresh_observation(self) -> None: # type: ignore[override]
"""Compute the unified observation.
By default, it forwards the observation computed by the environment.
"""
# pylint: disable=arguments-differ
self.env.refresh_observation()
def compute_command(self,
measure: DataNested,
action: DataNested) -> DataNested:
"""Compute the motors efforts to apply on the robot.
By default, it forwards the command computed by the environment.
:param measure: Observation of the environment.
:param action: Target to achieve.
"""
set_value(self._action, action)
set_value(self.env._action, action)
return self.env.compute_command(measure, action)
class ObservedJiminyEnv(BasePipelineWrapper):
"""Wrap a `BaseJiminyEnv` Gym environment and a single observer, so that
it appears as a single, unified environment. The environment may already
be wrapped inside one or several `gym.Wrapper` containers.
.. aafig::
:proportional:
:textual:
        +---------+            +------------+
"act_1" |         | "obs_env"  |            |
------->+  "env"  +----------->+ "observer" +----------->
        |         |   "obs"    |            | "features"
        +---------+            +------------+
The input observation 'obs_env' of the observer must be consistent with
the observation space 'obs' of the environment. The observation space of
the resulting unified environment will be the observation space of the
highest-level observer, while its action space will be the one of the
unwrapped environment.
.. warning::
This design is not suitable for learning the observer, but rather for
robotic-oriented observers, such as sensor fusion algorithms, Kalman
filters... It is recommended to add the observer into the policy
itself if it has to be trainable.
""" # noqa: E501 # pylint: disable=line-too-long
env: Union[gym.Wrapper, BaseJiminyEnv]
observation_space: Optional[gym.Space]
def __init__(self,
env: Union[gym.Wrapper, BaseJiminyEnv],
observer: BaseObserverBlock,
augment_observation: bool = False,
**kwargs: Any):
"""
:param env: Environment to observe. It can be an environment already
wrapped in `ObservedJiminyEnv` if one desires to stack several
observers with `BaseJiminyEnv`.
:param observer: Observer to use to extract higher-level features.
:param augment_observation: Whether or not to gather the high-level
features computed by the observer with the
raw observation of the environment. This
option is only available if the observation
space is of type `gym.spaces.Dict`.
Optional: Disabled by default.
:param kwargs: Extra keyword arguments to allow automatic pipeline
wrapper generation.
"""
# Backup user arguments
self.observer = observer
self.augment_observation = augment_observation
# Make sure that the unwrapped environment matches the observed one
assert env.unwrapped is observer.env
# Initialize base wrapper
super().__init__(env, **kwargs)
# Retrieve the environment observation
observation = self.env.get_observation()
# Update the action space
self.action_space = self.env.action_space
# Set the observer name, based on the observer index
self.observer_name = f"observer_{self._get_block_index()}"
# Check that the initial observation of the environment is consistent
# with the action space of the observer.
assert self.observer.action_space.contains(observation), (
"The command is not consistent with the action space of the "
"subsequent block.")
# Update the observation space
if self.augment_observation:
self.observation_space = deepcopy(self.env.observation_space)
if not isinstance(self.observation_space, gym.spaces.Dict):
self.observation_space = gym.spaces.Dict(OrderedDict(
measures=self.observation_space))
self.observation_space.spaces.setdefault(
'features', gym.spaces.Dict()).spaces[
self.observer_name] = self.observer.observation_space
else:
self.observation_space = self.observer.observation_space
# Initialize some internal buffers
self._action = zeros(self.action_space, dtype=np.float64)
self._observation = zeros(self.observation_space)
def _setup(self) -> None:
"""Configure the wrapper.
In addition to the base implementation, it configures the observer.
"""
# Configure the observer
self.observer._setup()
# Call base implementation
super()._setup()
# Compute the observe and control update periods
self.observe_dt = self.observer.observe_dt
self.control_dt = self.env.control_dt
def refresh_observation(self) -> None: # type: ignore[override]
"""Compute high-level features based on the current wrapped
environment's observation.
It gathers the original observation from the environment with the
features computed by the observer, if requested, otherwise it forwards
the features directly without any further processing.
.. warning::
Beware it updates the internal '_observation' buffer in place to deal
with multiple observers having different update periods. Even so, it
is safe to call this method multiple times successively.
"""
# pylint: disable=arguments-differ
# Get environment observation
super().refresh_observation()
# Update observed features if necessary
t = self.stepper_state.t
if is_breakpoint(t, self.observe_dt, self._dt_eps):
obs = self.env.get_observation()
self.observer.refresh_observation(obs)
if not self.simulator.is_simulation_running:
features = self.observer.get_observation()
if self.augment_observation:
assert isinstance(self._observation, dict)
# Make sure to store references
if isinstance(obs, dict):
self._observation = obs
else:
self._observation['measures'] = obs
self._observation.setdefault('features', OrderedDict())[
self.observer_name] = features
else:
self._observation = features
class ControlledJiminyEnv(BasePipelineWrapper):
"""Wrap a `BaseJiminyEnv` Gym environment and a single controller, so that
it appears as a single, unified environment. The environment may already
be wrapped inside one or several `gym.Wrapper` containers.
If several successive controllers must be used, just wrap each of them
successively, one by one, with the resulting `ControlledJiminyEnv`.
.. aafig::
:proportional:
:textual:
        +---------+                    +----------+                      +----------+
"act_1" |         |            "act_2" |          |              "act_3" |          |
------->+  "env"  +------> "+" ------->+ "ctrl_1" +--------> "+" ------->+ "ctrl_2" +--------> "="
        |         | "obs"              |          | "cmd_1"              |          | "cmd_2"
        +---------+                    +----------+                      +----------+

              +----------------------------------------------------+
              |                                                    |
              v                                                    |
        +-----+----+         +----------+         +---------+      |
"act_3" |          | "act_2" |          | "act_1" |         |      | "obs + cmd_1 + cmd_2"
------->+ "ctrl_2" +-------->+ "ctrl_1" +-------->+  "env"  +--o---o---------------------->
        |          | "cmd_2" |          | "cmd_1" |         |  |     "obs + cmd_1"
        +----------+         +-----+----+         +---------+  |
                                   ^                           |
                                   |                           |
                                   +---------------------------+
The output command 'cmd_X' of 'ctrl_X' must be consistent with the action
space 'act_X' of the subsequent block. The action space of the resulting
unified environment will be the action space of the highest-level
controller 'act_N', while its observation space will be the one of the
unwrapped environment 'obs'. Alternatively, the latter can also gather the
(stacked) action spaces of the successive controllers if one wishes to
observe the intermediary controllers' targets.
.. note::
The environment and each controller have their own update period.
.. warning::
This design is not suitable for learning the controllers 'ctrl_X', but
rather for robotic-oriented controllers, such as PID control, inverse
kinematics, admittance control, or Model Predictive Control (MPC). It
is recommended to add the controllers into the policy itself if they
have to be trainable.
""" # noqa: E501 # pylint: disable=line-too-long
observation_space: Optional[gym.Space]
def __init__(self,
env: Union[gym.Wrapper, BaseJiminyEnv],
controller: BaseControllerBlock,
augment_observation: bool = False,
**kwargs: Any):
"""
.. note::
As a reminder, `env.step_dt` refers to the learning step period,
namely the timestep between two successive frames:
[obs, reward, done, info]
This definition remains true, independently of whether or not the
environment is wrapped with a controller using this class. In
contrast, `env.control_dt` corresponds to the apparent control
update period, namely the update period of the highest-level
controller if multiple controllers are piped together. The same
goes for `env.observe_dt`.
:param env: Environment to control. It can be an already controlled
environment wrapped in `ControlledJiminyEnv` if one desires
to stack several controllers with `BaseJiminyEnv`.
:param controller: Controller to use to send targets to the subsequent
block.
:param augment_observation: Whether or not to gather the target of the
controller with the observation of the
environment. This option is only available
if the observation space is of type
`gym.spaces.Dict`.
Optional: Disabled by default.
:param kwargs: Extra keyword arguments to allow automatic pipeline
wrapper generation.
"""
# Backup user arguments
self.controller = controller
self.augment_observation = augment_observation
# Make sure that the unwrapped environment matches the controlled one
assert env.unwrapped is controller.env
# Initialize base wrapper
super().__init__(env, **kwargs)
# Enable terminal reward only if the controller implements it
self.enable_reward_terminal = self.controller.enable_reward_terminal
# Set the controller name, based on the controller index
self.controller_name = f"controller_{self._get_block_index()}"
# Update the action space
self.action_space = self.controller.action_space
# Check that 'augment_observation' can be enabled
assert not self.augment_observation or isinstance(
self.env.observation_space, gym.spaces.Dict), (
"'augment_observation' is only available for environments whose "
"observation space inherits from `gym.spaces.Dict`.")
# Append the controller's target to the observation if requested
self.observation_space = deepcopy(self.env.observation_space)
if self.augment_observation:
if not isinstance(self.observation_space, gym.spaces.Dict):
self.observation_space = gym.spaces.Dict(OrderedDict(
measures=self.observation_space))
self.observation_space.spaces.setdefault(
'targets', gym.spaces.Dict()).spaces[
self.controller_name] = self.controller.action_space
# Initialize some internal buffers
self._action = zeros(self.action_space, dtype=np.float64)
self._target = zeros(self.env.action_space, dtype=np.float64)
self._observation = zeros(self.observation_space)
# Register the controller target to the telemetry
self.env.register_variable("action",
self._action,
self.controller.get_fieldnames(),
self.controller_name)
def _setup(self) -> None:
"""Configure the wrapper.
In addition to the base implementation, it configures the controller
and registers its target to the telemetry.
"""
# Configure the controller
self.controller._setup()
# Call base implementation
super()._setup()
# Reset some additional internal buffers
fill(self._target, 0.0)
# Compute the observe and control update periods
self.observe_dt = self.env.observe_dt
self.control_dt = self.controller.control_dt
def compute_command(self,
measure: DataNested,
action: DataNested
) -> DataNested:
"""Compute the motors efforts to apply on the robot.
In practice, it updates, whenever it is necessary:
- the target sent to the subsequent block by the controller
- the command send to the robot by the environment through the
subsequent block
:param measure: Observation of the environment.
:param action: High-level target to achieve.
"""
# Update the target to send to the subsequent block if necessary.
# Note that `_observation` buffer has already been updated right before
# calling this method by `_controller_handle`, so it can be used as
# the measure argument without issue.
t = self.stepper_state.t
if is_breakpoint(t, self.control_dt, self._dt_eps):
target = self.controller.compute_command(self._observation, action)
set_value(self._target, target)
# Update the command to send to the actuators of the robot.
# Note that the environment itself is responsible for updating the
# command at the right period. Ultimately, this is done automatically
# by the engine, which calls `_controller_handle` at the right period.
if self.simulator.is_simulation_running:
# Do not update command during the first iteration because the
# action is undefined at this point
set_value(self.env._action, self._target)
set_value(self._command, self.env.compute_command(
self._observation, self._target))
return self._command
def refresh_observation(self) -> None: # type: ignore[override]
"""Compute the unified observation based on the current wrapped
environment's observation and controller's target.
It gathers the actual observation from the environment with the target
of the controller, if requested, otherwise it forwards the observation
directly without any further processing.
.. warning::
Beware it shares the environment observation whenever it is
possible for the sake of efficiency. Despite that, it is safe to
call this method multiple times successively.
"""
# Get environment observation
super().refresh_observation()
# Add target to observation if requested
if not self.simulator.is_simulation_running:
obs = self.env.get_observation()
if self.augment_observation:
assert isinstance(self._observation, dict)
# Make sure to store references
if isinstance(obs, dict):
self._observation = copy(obs)
else:
self._observation['measures'] = obs
self._observation.setdefault('targets', OrderedDict())[
self.controller_name] = self._action
else:
self._observation = obs
def compute_reward(self, *args: Any, **kwargs: Any) -> float:
return self.controller.compute_reward(*args, **kwargs)
def compute_reward_terminal(self, *args: Any, **kwargs: Any) -> float:
return self.controller.compute_reward_terminal(*args, **kwargs)