Source code for gym_jiminy.common.blocks.quantity_observer

"""Implementation of Mahony filter block compatible with gym_jiminy
reinforcement learning pipeline environment design.
"""
from typing import Any, Type, TypeVar, cast

import numpy as np
import gymnasium as gym

from jiminy_py import tree

from ..bases import (
    BaseObs, BaseAct, BaseObserverBlock, InterfaceJiminyEnv, InterfaceQuantity)
from ..utils import DataNested, build_copyto


ValueT = TypeVar('ValueT', bound=DataNested)


[docs] def get_space(data: DataNested) -> gym.Space[DataNested]: """Infer space from a given value. .. warning:: Beware that space inference is lossly. Firstly, one cannot discriminate between `gym.spaces.Box` and other non-container spaces, e.g. `gym.spaces.Discrete` or `gym.spaces.MultiBinary`. Because of this limitation, it is assumed that all `np.ndarray` data has been sampled by a `gym.spaces.Box` space. Secondly, it is impossible to determine the bounds of the space, so it is assumed to be unbounded. :param value: Any value sampled from a given space. """ data_type = type(data) if tree.issubclass_mapping(data_type): return gym.spaces.Dict([ (field, get_space(value)) for field, value in data.items()]) # type: ignore[union-attr] if tree.issubclass_sequence(data_type): return gym.spaces.Tuple([get_space(value) for value in data]) assert isinstance(data, np.ndarray) return gym.spaces.Box( low=float("-inf"), high=float("inf"), shape=data.shape, dtype=data.dtype.type)
[docs] class QuantityObserver(BaseObserverBlock[ValueT, None, BaseObs, BaseAct]): """Add a given pre-defined quantity to the observation of the environment. .. warning:: The observation space of a quantity must be invariant. Yet, nothing prevent the shape of the quantity to change dynamically. As a result, it is up to user to make sure that does not occur in practice, otherwise it will raise an exception. """ def __init__(self, name: str, env: InterfaceJiminyEnv[BaseObs, BaseAct], quantity: Type[InterfaceQuantity[ValueT]], *, update_ratio: int = 1, **kwargs: Any) -> None: """ :param name: Name of the block. :param env: Environment to connect with. :param quantity: Type of the quantity. :param update_ratio: Ratio between the update period of the observer and the one of the subsequent observer. -1 to match the simulation timestep of the environment. Optional: `1` by default. :param kwargs: Additional arguments that will be forwarded to the constructor of the quantity. """ # Add the quantity to the environment and define proxy for fast access self.data = env.quantities.add(name, (quantity, kwargs)) # Initialize the observer super().__init__(name, env, update_ratio) # Try to bind the memory of the quantity to the observation. # Note that there is no guarantee that the quantity will be updated # in-place without dynamic memory allocation, so it needs to be checked # at run-time systematically and copy the value if necessary. self.observation = self.data.get() # Define specialized copyto operator for efficiency. # This is necessary because there is no guarantee that the quantity # will be updated in-place without dynamic memory allocation. self._copyto_observation = build_copyto(self.observation) def __del__(self) -> None: try: self.env.quantities.discard(self.name) except Exception: # pylint: disable=broad-except # This method must not fail under any circumstances pass def _initialize_observation_space(self) -> None: # Let us infer the observation space from the value of the quantity. # Note that it is always valid to fetch the value of a quantity, even # if no simulation is running. self.observation_space = cast( gym.Space[ValueT], get_space(self.data.get()))
[docs] def refresh_observation(self, measurement: BaseObs) -> None: # Evaluate the quantity value = self.data.get() # Update the observation in-place in case of dynamic memory allocation if self.observation is not value: self._copyto_observation(value)