"""Helper methods to generate learning environment pipeline, consisting in an
bare-bone environment inheriting from `BaseJiminyEnv`, wrapped together with
any number of successive blocks as a unified environment, in Matlab Simulink
fashion.
It enables to break down a complex control architectures in many submodules,
making it easier to maintain and avoiding code duplications between use cases.
"""
import re
import json
import pathlib
from pydoc import locate
from dataclasses import asdict
from functools import partial
from collections.abc import Sequence
from typing import (
Dict, Any, Optional, Union, Type, Sequence as SequenceT, Callable,
TypedDict, Literal, overload, cast)
import h5py
import toml
import numpy as np
import gymnasium as gym
import jiminy_py.core as jiminy
from jiminy_py.dynamics import State, Trajectory
from ..bases import (InterfaceJiminyEnv,
InterfaceBlock,
BaseControllerBlock,
BaseObserverBlock,
BasePipelineWrapper,
ObservedJiminyEnv,
ControlledJiminyEnv,
ComposedJiminyEnv,
AbstractReward,
MixtureReward,
AbstractTerminationCondition)
from ..envs import BaseJiminyEnv
[docs]
class CompositionConfig(TypedDict, total=False):
"""Store information required for instantiating a given composition, which
comprises reward components or a termination condition at the time being.
Specifically, it is a dictionary comprising the class of the composition
that must derive from `AbstractReward` or `AbstractTerminationCondition]`,
and optionally some keyword-arguments to pass to its constructor.
"""
cls: Union[Type[AbstractReward], Type[AbstractTerminationCondition], str]
"""Composition class type.
.. note::
Both class type or fully qualified dotted path are supported.
"""
kwargs: Dict[str, Any]
"""Composition constructor keyword-arguments.
This attribute can be omitted.
"""
[docs]
class TrajectoryDatabaseConfig(TypedDict, total=False):
"""Store information required for adding a database of reference
trajectories to the environment.
Specifically, it is a dictionary comprising a set of named trajectories as
a dictionary whose keys are the name of the trajectories and values are
either the trajectory itself or the path of a file storing its dump in HDF5
format, the name of the selected trajectory, and its interpolation mode.
"""
dataset: Dict[str, Union[str, Trajectory]]
"""Set of named trajectories as a dictionary.
.. note::
Both `Trajectory` objects or path (absolute or relative) are supported.
"""
name: str
"""Name of the selected trajectory if any.
This attribute can be omitted.
"""
mode: Literal['raise', 'wrap', 'clip']
"""Interpolation mode of the selected trajectory if any.
This attribute can be omitted.
"""
[docs]
class EnvConfig(TypedDict, total=False):
"""Store information required for instantiating a given base environment
and compose it with some additional reward components and termination
conditions.
Specifically, it is a dictionary comprising the class of the base
environment, which must derive from `BaseJiminyEnv`, optionally some
keyword-arguments that must be passed to its corresponding constructor, and
eventually the configuration of some additional reward with which to
compose the base environment.
"""
cls: Union[Type[BaseJiminyEnv], str]
"""Environment class type.
.. note::
Both class type or fully qualified dotted path are supported.
"""
kwargs: Dict[str, Any]
"""Environment constructor default arguments.
This attribute can be omitted.
"""
reward: CompositionConfig
"""Reward configuration.
This attribute can be omitted.
"""
terminations: SequenceT[CompositionConfig]
"""Sequence of configuration for every individual termination conditions.
This attribute can be omitted.
"""
trajectories: TrajectoryDatabaseConfig
"""Reference trajectory database configuration.
This attribute can be omitted.
"""
[docs]
class BlockConfig(TypedDict, total=False):
"""Store information required for instantiating a given observation or
control block.
Specifically, it is a dictionary comprising the class of the block, which
must derive from `BaseControllerBlock` or `BaseObserverBlock`, and
optionally some keyword-arguments that must be passed to its corresponding
constructor.
"""
cls: Union[Type[BaseControllerBlock], Type[BaseObserverBlock], str]
"""Block class type. If must derive from `BaseControllerBlock` for
controller blocks or from `BaseObserverBlock` for observer blocks.
.. note::
Both class type or fully qualified dotted path are supported.
"""
kwargs: Dict[str, Any]
"""Block constructor default arguments.
This attribute can be omitted.
"""
[docs]
class WrapperConfig(TypedDict, total=False):
"""Store information required for instantiating a given environment
pipeline wrapper.
Specifically, it is a dictionary comprising the class of the wrapper, which
must derive from `BasePipelineWrapper`, and optionally some
keyword-arguments that must be passed to its corresponding constructor.
"""
cls: Union[Type[BasePipelineWrapper], str]
"""Wrapper class type.
.. note::
Both class type or fully qualified dotted path are supported.
"""
kwargs: Dict[str, Any]
"""Wrapper constructor default arguments.
This attribute can be omitted.
"""
[docs]
class LayerConfig(TypedDict, total=False):
"""Store information required for instantiating a given environment
pipeline layer, ie either a wrapper, or the combination of an observer /
controller block with its corresponding wrapper.
Specifically, it is a dictionary comprising the configuration of the block
if any, and optionally the configuration of the reward and termination. It
is generally sufficient to specify either one or the other. See the
documentation of the both fields for details.
"""
block: BlockConfig
"""Block configuration.
This attribute can be omitted. If so, then 'wrapper_cls' must be
specified and must not require any block. Typically, it happens when the
wrapper is not doing any computation on its own but just transforming the
action or observation, e.g. stacking observation frames.
"""
wrapper: WrapperConfig
"""Wrapper configuration.
This attribute can be omitted. If so, then 'block' must be specified and
must this block must be associated with a unique wrapper type to allow for
automatic type inference. It works with any observer and controller block.
"""
[docs]
def build_pipeline(env_config: EnvConfig,
layers_config: SequenceT[LayerConfig],
*,
root_path: Optional[Union[str, pathlib.Path]] = None
) -> Callable[..., InterfaceJiminyEnv]:
"""Wrap together an environment inheriting from `BaseJiminyEnv` with any
number of layers, as a unified pipeline environment class inheriting from
`BasePipelineWrapper`. Each layer is wrapped individually and successively.
:param env_config:
Configuration of the environment, as a dict of type `EnvConfig`.
:param layers_config:
Configuration of the blocks, as a list. The list is ordered from the
lowest level layer to the highest, each element corresponding to the
configuration of a individual layer, as a dict of type `LayerConfig`.
"""
# Define helper to sanitize composition configuration
def sanitize_composition_config(composition_config: CompositionConfig,
is_reward: bool) -> None:
"""Sanitize composition configuration in-place.
:param composition_config: Configuration of the composition, as a
dict of type `CompositionConfig`.
"""
# Get composition class type
cls = composition_config["cls"]
if isinstance(cls, str):
obj = locate(cls)
if obj is None:
raise RuntimeError(f"Class '{cls}' not found.")
assert isinstance(obj, type) and (
(is_reward and issubclass(obj, AbstractReward)) or
(not is_reward and issubclass(
obj, AbstractTerminationCondition)))
composition_config["cls"] = cls = obj
# Get its constructor keyword-arguments
kwargs = composition_config.get("kwargs", {})
# Special handling for `MixtureReward`
if is_reward and issubclass(cls, MixtureReward):
for component_config in kwargs["components"]:
sanitize_composition_config(component_config, is_reward)
@overload
def build_composition(
env: InterfaceJiminyEnv,
composition_config: CompositionConfig,
is_reward: Literal[True]
) -> AbstractReward:
...
@overload
def build_composition(
env: InterfaceJiminyEnv,
composition_config: CompositionConfig,
is_reward: Literal[False]
) -> AbstractTerminationCondition:
...
# Define helper to build the composition
def build_composition(
env: InterfaceJiminyEnv,
composition_config: CompositionConfig,
is_reward: bool
) -> Union[AbstractReward, AbstractTerminationCondition]:
"""Instantiate a composition associated with a given environment from
some composition configuration.
:param env: Base environment or pipeline wrapper to wrap.
:param composition_config: Configuration of the composition, as a
dict of type `CompositionConfig`.
"""
# Get composition class type
cls = composition_config["cls"]
assert isinstance(cls, type)
# Get its constructor keyword-arguments
kwargs = composition_config.get("kwargs", {}).copy()
# Special handling for `MixtureReward`
if is_reward and issubclass(cls, MixtureReward):
kwargs["components"] = tuple(
build_composition(env, reward_config, is_reward)
for reward_config in kwargs["components"])
# Special handling for 'quantity' key
if "quantity" in kwargs:
quantity_config = kwargs["quantity"]
kwargs["quantity"] = (
quantity_config["cls"], quantity_config["kwargs"])
return cls(env, **kwargs)
# Define helper to build reward
def build_composition_layer(
env_creator: Callable[..., InterfaceJiminyEnv],
reward_config: Optional[CompositionConfig],
terminations_config: SequenceT[CompositionConfig],
trajectories_config: Optional[TrajectoryDatabaseConfig],
**env_kwargs: Any) -> InterfaceJiminyEnv:
"""Helper adding reward components and/or termination conditions on top
of a base environment or a pipeline using `ComposedJiminyEnv` wrapper.
:param env_creator: Callable that takes optional keyword arguments as
input and returns an pipeline or base environment.
:param reward_config: Configuration of the reward, as a dict of type
`CompositionConfig`.
:param termination_config: Configuration of the termination conditions,
as a sequence of dict of type
`CompositionConfig`.
:param trajectories: Set of named trajectories as a dictionary. See
`ComposedJiminyEnv` documentation for details.
:param env_kwargs: Keyword arguments to forward to the constructor of
the wrapped environment. Note that it will only
overwrite the default value, so it will still be
possible to set different values by explicitly
defining them when calling the constructor of the
generated wrapper.
"""
# Instantiate the environment, which may be a lower-level wrapper
env = env_creator(**env_kwargs)
# Instantiate the reward
reward = None
if reward_config is not None:
reward = build_composition(env, reward_config, True)
# Instantiate the termination conditions
terminations = tuple(
build_composition(env, termination_config, False)
for termination_config in terminations_config)
# Get trajectory dataset
trajectories: Dict[str, Trajectory] = {}
if trajectories_config is not None:
trajectories = cast(
Dict[str, Trajectory], trajectories_config["dataset"])
# Instantiate the composition wrapper if necessary
if reward or terminations or trajectories:
env = ComposedJiminyEnv(env,
reward=reward,
terminations=terminations,
trajectories=trajectories)
# Select the reference trajectory if specified
if trajectories_config is not None:
name = trajectories_config.get("name")
if name is not None:
mode = trajectories_config.get("mode", "raise")
env.quantities.select_trajectory(name, mode)
return env
# Define helper to wrap a single layer
def build_controller_observer_layer(
env_creator: Callable[..., InterfaceJiminyEnv],
wrapper_cls: Type[BasePipelineWrapper],
wrapper_kwargs: Dict[str, Any],
block_cls: Optional[Type[InterfaceBlock]],
block_kwargs: Dict[str, Any],
**env_kwargs: Any
) -> BasePipelineWrapper:
"""Helper wrapping a base environment or a pipeline with an additional
observer-controller layer.
:param env_creator: Callable that takes optional keyword arguments as
input and returns an pipeline or base environment.
:param wrapper_cls: Type of wrapper to use to gather the environment
and the block.
:param wrapper_kwargs: Keyword arguments to forward to the constructor
of the wrapper. See 'env_kwargs'.
:param block_cls: Type of block to connect to the environment, if
any. `None` to disable.
Optional: Disabled by default
:param block_kwargs: Keyword arguments to forward to the constructor of
the wrapped block. See 'env_kwargs'.
:param env_kwargs: Keyword arguments to forward to the constructor of
the wrapped environment. Note that it will only
overwrite the default value, so it will still be
possible to set different values by explicitly
defining them when calling the constructor of the
generated wrapper.
"""
# Initialize constructor arguments
args: Any = []
# Instantiate the environment, which may be a lower-level wrapper
env = env_creator(**env_kwargs)
args.append(env)
# Instantiate the block associated with the wrapper if any
if block_cls is not None:
block_name = block_kwargs.pop("name", None)
if block_name is None:
block_index = 0
env_wrapper: gym.Env = env
while isinstance(env_wrapper, BasePipelineWrapper):
if isinstance(env_wrapper, ControlledJiminyEnv):
if isinstance(env_wrapper.controller, block_cls):
block_index += 1
elif isinstance(env_wrapper, ObservedJiminyEnv):
if isinstance(env_wrapper.observer, block_cls):
block_index += 1
env_wrapper = env_wrapper.env
block_name = re.sub(
r"([a-z\d])([A-Z])", r'\1_\2', re.sub(
r"([A-Z]+)([A-Z][a-z])", r'\1_\2', block_cls.__name__)
).lower()
if block_index:
block_name += f"_{block_index}"
block = block_cls(block_name, env, **block_kwargs)
args.append(block)
# Instantiate the wrapper
return wrapper_cls(*args, **wrapper_kwargs)
# Define callable for instantiating the base environment
env_cls = env_config["cls"]
if isinstance(env_cls, str):
obj = locate(env_cls)
assert isinstance(obj, type) and issubclass(obj, BaseJiminyEnv)
env_cls = obj
pipeline_creator: Callable[..., InterfaceJiminyEnv] = partial(
env_cls, **env_config.get("kwargs", {}))
# Parse reward configuration
reward_config = env_config.get("reward")
if reward_config is not None:
sanitize_composition_config(reward_config, is_reward=True)
# Parse the configuration of every termination conditions
terminations_config = env_config.get("terminations", ())
assert isinstance(terminations_config, Sequence)
for termination_config in terminations_config:
sanitize_composition_config(termination_config, is_reward=False)
# Parse trajectory configuration
trajectories_config = env_config.get("trajectories")
if trajectories_config is not None:
trajectories = trajectories_config['dataset']
assert isinstance(trajectories, dict)
for name, path_or_traj in trajectories.items():
if isinstance(path_or_traj, Trajectory):
continue
path = pathlib.Path(path_or_traj)
if not path.is_absolute():
if root_path is None:
raise RuntimeError(
"The argument 'root_path' must be provided when "
"specifying relative trajectory paths.")
path = pathlib.Path(root_path) / path
trajectories[name] = load_trajectory_from_hdf5(path)
# Generate pipeline recursively
for layer_config in layers_config:
# Extract block and wrapper config
block_config = layer_config.get("block") or {}
wrapper_config = layer_config.get("wrapper") or {}
# Make sure block and wrappers are class type and parse them if string
block_cls = block_config.get("cls")
block_cls_: Optional[Type[InterfaceBlock]] = None
if isinstance(block_cls, str):
obj = locate(block_cls)
assert (isinstance(obj, type) and
issubclass(obj, InterfaceBlock))
block_cls_ = obj
elif block_cls is not None:
assert issubclass(block_cls, InterfaceBlock)
block_cls_ = block_cls
wrapper_cls = wrapper_config.get("cls")
wrapper_cls_: Optional[Type[BasePipelineWrapper]] = None
if isinstance(wrapper_cls, str):
obj = locate(wrapper_cls)
assert (isinstance(obj, type) and
issubclass(obj, BasePipelineWrapper))
wrapper_cls_ = obj
elif wrapper_cls is not None:
assert (isinstance(wrapper_cls, type) and
issubclass(wrapper_cls, BasePipelineWrapper))
wrapper_cls_ = wrapper_cls
# Handling of default keyword arguments
block_kwargs = block_config.get("kwargs", {})
wrapper_kwargs = wrapper_config.get("kwargs", {})
# Handling of default wrapper class type
if wrapper_cls_ is None:
if block_cls_ is not None:
if issubclass(block_cls_, BaseControllerBlock):
wrapper_cls_ = ControlledJiminyEnv
elif issubclass(block_cls_, BaseObserverBlock):
wrapper_cls_ = ObservedJiminyEnv
else:
raise ValueError(
f"Block of type '{block_cls_}' does not support "
"automatic default wrapper type inference. Please "
"specify it manually.")
else:
raise ValueError(
"Either 'block.cls' or 'wrapper.cls' must be specified.")
# Add layer on top of the existing pipeline
pipeline_creator = partial(build_controller_observer_layer,
pipeline_creator,
wrapper_cls_,
wrapper_kwargs,
block_cls_,
block_kwargs)
# Add extra user-specified reward, termination conditions and trajectories
pipeline_creator = partial(build_composition_layer,
pipeline_creator,
reward_config,
terminations_config,
trajectories_config)
return pipeline_creator
[docs]
def load_pipeline(fullpath: Union[str, pathlib.Path]
) -> Callable[..., InterfaceJiminyEnv]:
"""Load pipeline from JSON or TOML configuration file.
:param: Fullpath of the configuration file.
"""
fullpath = pathlib.Path(fullpath)
root_path, file_ext = fullpath.parent, fullpath.suffix
with open(fullpath, 'r') as f:
if file_ext == '.json':
return build_pipeline(**json.load(f), root_path=root_path)
if file_ext == '.toml':
return build_pipeline(**toml.load(f), root_path=root_path)
raise ValueError("Only json and toml formats are supported.")
[docs]
def save_trajectory_to_hdf5(trajectory: Trajectory,
fullpath: Union[str, pathlib.Path]) -> None:
"""Export a trajectory object to HDF5 format.
:param trajectory: Trajectory object to save.
:param fullpath: Fullpath of the generated HDF5 file.
"""
# Create HDF5 file
hdf_obj = h5py.File(fullpath, "w")
# Dump each state attribute that are specified for all states at once
if trajectory.states:
state_dict = asdict(trajectory.states[0])
state_fields = tuple(
key for key, value in state_dict.items() if value is not None)
for key in state_fields:
data = np.stack([
getattr(state, key) for state in trajectory.states], axis=0)
hdf_obj.create_dataset(name=f"states/{key}", data=data)
# Dump serialized robot
robot_data = jiminy.save_to_binary(trajectory.robot)
dataset = hdf_obj.create_dataset(name="robot", data=np.array(robot_data))
# Dump whether to use the theoretical model of the robot
dataset.attrs["use_theoretical_model"] = trajectory.use_theoretical_model
# Close the HDF5 file
hdf_obj.close()
[docs]
def load_trajectory_from_hdf5(
fullpath: Union[str, pathlib.Path]) -> Trajectory:
"""Import a trajectory object from file in HDF5 format.
:param fullpath: Fullpath of the HDF5 file to import.
:returns: Loaded trajectory object.
"""
# Open HDF5 file
hdf_obj = h5py.File(fullpath, "r")
# Get all state attributes that are specified
states_dict = {}
if 'states' in hdf_obj.keys():
for key, value in hdf_obj['states'].items():
states_dict[key] = value[...]
# Re-construct state sequence
states = []
for args in zip(*states_dict.values()):
states.append(State(**dict(zip(states_dict.keys(), args))))
# Build trajectory from data.
# Null char '\0' must be added at the end to match original string length.
dataset = hdf_obj['robot']
robot_data = dataset[()]
robot_data += b'\0' * (
dataset.nbytes - len(robot_data)) # pylint: disable=no-member
try:
robot = jiminy.load_from_binary(robot_data)
except MemoryError as e:
raise MemoryError(
"Impossible to build robot from serialized binary data. Make sure "
"that data has been generated on a machine with the same hardware "
"as this one.") from e
# Load whether to use the theoretical model of the robot
use_theoretical_model = dataset.attrs["use_theoretical_model"]
# Close the HDF5 file
hdf_obj.close()
# Re-construct the whole trajectory
return Trajectory(states, robot, use_theoretical_model)