Source code for gym_jiminy.common.bases.quantities

"""This module promotes quantities as first-class objects.

Defining quantities this way allows for standardization of common intermediary
metrics for computation of reward components and and termination conditions, eg
the center of pressure or the average spatial velocity of a frame. Overall, it
greatly reduces code duplication and bugs.

Apart from that, it offers a dedicated quantity manager that is responsible for
optimizing the computation path, which is expected to significantly increase
the step collection throughput. This speedup is achieved by caching already
computed that did not changed since then, computing redundant intermediary
quantities only once per step, and gathering similar quantities in a large
batch to leverage vectorization of math instructions.
"""
import re
import weakref
from enum import IntEnum
from weakref import ReferenceType
from abc import ABC, abstractmethod
from collections import OrderedDict
from collections.abc import MutableSet
from dataclasses import dataclass, replace
from functools import wraps
from typing import (
    Any, Dict, List, Optional, Tuple, Generic, TypeVar, Type, Iterator,
    Collection, Callable, Literal, ClassVar, TYPE_CHECKING)

import numpy as np

import jiminy_py.core as jiminy
from jiminy_py.core import (  # pylint: disable=no-name-in-module
    array_copyto, multi_array_copyto)
from jiminy_py.dynamics import State, Trajectory, update_quantities
import pinocchio as pin

from .interfaces import InterfaceJiminyEnv


ValueT = TypeVar('ValueT')


[docs] class WeakMutableCollection(MutableSet, Generic[ValueT]): """Mutable unordered list container storing weak reference to objects. Elements will be discarded when no strong reference to the value exists anymore, and a user-specified callback will be triggered if any. Internally, it is implemented as a set for which uniqueness is characterized by identity instead of equality operator. """ __slots__ = ("_callback", "_weakrefs") def __init__(self, callback: Optional[Callable[[ "WeakMutableCollection[ValueT]", ReferenceType ], None]] = None) -> None: """ :param callback: Callback that will be triggered every time an element is discarded from the container. Optional: None by default. """ self._callback = callback self._weakrefs: List[ReferenceType] = [] def __callback__(self, ref: ReferenceType) -> None: """Internal method that will be called every time an element must be discarded from the containers, either because it was requested by the user or because no strong reference to the value exists anymore. If a callback has been specified by the user, it will be triggered after removing the weak reference from the container. """ # Even though a temporary weak reference is provided for removal, the # identity check is performed on the object being stored. If the latter # has already been deleted, then one of the object in the list that # has been deleted while be removed. It is not a big deal if it was # actually the right weak reference since all of them will be removed # in the end, so it is not a big deal. value = ref() for i, ref_i in enumerate(self._weakrefs): if value is ref_i(): del self._weakrefs[i] break if self._callback is not None: self._callback(self, ref) def __contains__(self, obj: Any) -> bool: """Dunder method to check if a weak reference to a given object is already stored in the container, which is characterized by identity instead of equality operator. :param obj: Object to look for in the container. """ return any(ref() is obj for ref in self._weakrefs) def __iter__(self) -> Iterator[ValueT]: """Dunder method that returns an iterator over the objects of the container for which a reference still exist. """ for ref in self._weakrefs: obj = ref() if obj is not None: yield obj def __len__(self) -> int: """Dunder method that returns the length of the container. """ return len(self._weakrefs)
[docs] def add(self, value: ValueT) -> None: """Add a new element to the container if not already contained. This has no effect if the element is already present. :param obj: Object to add to the container. """ if value not in self: self._weakrefs.append(weakref.ref(value, self.__callback__))
[docs] def discard(self, value: ValueT) -> None: """Remove an element from the container if stored in it. This method does not raise an exception when the element is missing. :param obj: Object to remove from the container. """ if value in self: self.__callback__(weakref.ref(value))
[docs] class QuantityStateMachine(IntEnum): """Specify the current state of a given (unique) quantity, which determines the steps to perform for retrieving its current value. """ IS_RESET = 0 """The quantity at hands has just been reset. The quantity must first be initialized, then refreshed and finally stored in cached before to retrieve its value. """ IS_INITIALIZED = 1 """The quantity at hands has been initialized but never evaluated for the current robot state. Its value must still be refreshed and stored in cache before to retrieve it. """ IS_CACHED = 2 """The quantity at hands has been evaluated and its value stored in cache. As such, its value can be retrieve from cache directly. """
# Define proxies for fast lookup _IS_RESET, _IS_INITIALIZED, _IS_CACHED = ( # pylint: disable=invalid-name QuantityStateMachine)
[docs] class SharedCache(Generic[ValueT]): """Basic thread local shared cache. Its API mimics `std::optional` from the Standard C++ library. All it does is encapsulating any Python object as a mutable variable, plus exposing a simple mechanism for keeping track of all "owners" of the cache. .. warning:: This implementation is not thread safe. """ __slots__ = ( "_value", "_weakrefs", "_owner", "_auto_refresh", "sm_state", "owners") owners: Collection["InterfaceQuantity[ValueT]"] """Owners of the shared buffer, ie quantities relying on it to store the result of their evaluation. This information may be useful for determining the most efficient computation path overall. .. note:: Quantities must add themselves to this list when passing them a shared cache instance. .. note:: Internally, it stores weak references to avoid holding alive quantities that could be garbage collected otherwise. `WeakSet` cannot be used because owners are objects all having the same hash, eg "identical" quantities. """ def __init__(self) -> None: """ """ # Cached value if any self._value: Optional[ValueT] = None # Whether auto-refresh is requested self._auto_refresh = True # Basic state machine management self.sm_state: QuantityStateMachine = QuantityStateMachine.IS_RESET # Initialize "owners" of the shared buffer. # Define callback to reset part of the computation graph whenever a # quantity owning the cache gets garbage collected, namely all # quantities that may assume at some point the existence of this # deleted owner to adjust their computation path. def _callback( self: WeakMutableCollection["InterfaceQuantity[ValueT]"], ref: ReferenceType) -> None: # pylint: disable=unused-argument owner: Optional["InterfaceQuantity[ValueT]"] for owner in self: # Stop going up in parent chain if dynamic computation graph # update is disable for efficiency. while (owner.allow_update_graph and owner.parent is not None and owner.parent.has_cache): owner = owner.parent owner.reset(reset_tracking=True) # Initialize weak reference to owning quantities self._weakrefs = WeakMutableCollection(_callback) # Maintain alive owning quantities upon reset self.owners = self._weakrefs self._owner: Optional["InterfaceQuantity[ValueT]"] = None
[docs] def add(self, owner: "InterfaceQuantity[ValueT]") -> None: """Add a given quantity instance to the set of co-owners associated with the shared cache at hands. .. warning:: All shared cache co-owners must be instances of the same unique quantity. An exception will be thrown if an attempt is made to add a quantity instance that does not satisfy this condition. :param owner: Quantity instance to add to the set of co-owners. """ # Make sure that the quantity is not already part of the co-owners if id(owner) in map(id, self.owners): raise ValueError( "The specified quantity instance is already an owner of this " "shared cache.") # Make sure that the new owner is consistent with the others if any if any(owner != _owner for _owner in self._weakrefs): raise ValueError( "Quantity instance inconsistent with already existing shared " "cache owners.") # Add quantity instance to shared cache owners self._weakrefs.add(owner) # Refresh owners if self.sm_state is not QuantityStateMachine.IS_RESET: self.owners = tuple(self._weakrefs)
[docs] def discard(self, owner: "InterfaceQuantity[ValueT]") -> None: """Remove a given quantity instance from the set of co-owners associated with the shared cache at hands. :param owner: Quantity instance to remove from the set of co-owners. """ # Make sure that the quantity is part of the co-owners if id(owner) not in map(id, self.owners): raise ValueError( "The specified quantity instance is not an owner of this " "shared cache.") # Restore "dynamic" owner list as it may be involved in quantity reset self.owners = self._weakrefs # Remove quantity instance from shared cache owners self._weakrefs.discard(owner) # Refresh owners. # Note that one must keep tracking the quantity instance being used in # computations, aka 'self._owner', even if it is no longer an actual # shared cache owner. This is necessary because updating it would # require resetting the state machine, which is not an option as it # would mess up with quantities storing history since initialization. if self.sm_state is not QuantityStateMachine.IS_RESET: self.owners = tuple(self._weakrefs)
[docs] def reset(self, ignore_auto_refresh: bool = False, reset_state_machine: bool = False) -> None: """Clear value stored in cache if any. :param ignore_auto_refresh: Whether to skip automatic refresh of all co-owner quantities of this shared cache. Optional: False by default. :param reset_state_machine: Whether to reset completely the state machine of the underlying quantity, ie not considering it initialized anymore. Optional: False by default. """ # Clear cache if self.sm_state is _IS_CACHED: self.sm_state = _IS_INITIALIZED # Special branch if case quantities must be reset on the way if reset_state_machine: # Reset the state machine completely self.sm_state = _IS_RESET # Update list of owning quantities self.owners = self._weakrefs self._owner = None # Reset auto-refresh buffer self._auto_refresh = True # Refresh automatically if not already proven useless and not ignored if not ignore_auto_refresh and self._auto_refresh: for owner in self.owners: if owner.auto_refresh: owner.get() break else: self._auto_refresh = False
[docs] def get(self) -> ValueT: """Return cached value if any, otherwise evaluate it and store it. """ # Get value already stored if self.sm_state is _IS_CACHED: # return cast(ValueT, self._value) return self._value # type: ignore[return-value] # Evaluate quantity try: if self.sm_state is _IS_RESET: # Cache the list of owning quantities self.owners = tuple(self._weakrefs) # Stick to the first owning quantity systematically owner = self.owners[0] self._owner = owner # Initialize quantity if not already done manually if not owner._is_initialized: owner.initialize() assert owner._is_initialized # Get first owning quantity systematically # assert self._owner is not None owner = self._owner # type: ignore[assignment] # Make sure that the state has been refreshed if owner._force_update_state: owner.state.get() # Refresh quantity value = owner.refresh() except RecursionError as e: raise LookupError( "Mutual dependency between quantities is disallowed.") from e # Update state machine self.sm_state = _IS_CACHED # Return value after storing it self._value = value return value
[docs] class InterfaceQuantity(ABC, Generic[ValueT]): """Interface for generic quantities involved observer-controller blocks, reward components or termination conditions. .. note:: Quantities are meant to be managed automatically via `QuantityManager`. Dealing with quantities manually is error-prone, and as such, is strongly discourage. Nonetheless, power-user that understand the risks are allowed to do it. .. warning:: Mutual dependency between quantities is disallowed. .. warning:: The user is responsible for implementing the dunder methods `__eq__` and `__hash__` that characterize identical quantities. This property is used internally by `QuantityManager` to synchronize cache between them. It is advised to use decorator `@dataclass(unsafe_hash=True)` for convenience, but it can also be done manually. """ requirements: Dict[str, "InterfaceQuantity"] """Intermediary quantities on which this quantity may rely on for its evaluation at some point, depending on the optimal computation path at runtime. They will be exposed to the user as usual attributes. """ allow_update_graph: ClassVar[bool] = True """Whether dynamic computation graph update is allowed. This implies that the quantity can be reset at any point in time to re-compute the optimal computation path, typically after deletion or addition of some other node to its dependent sub-graph. When this happens, the quantity gets reset on the spot, even if a simulation is already running. This is not always acceptable, hence the capability to disable this feature at class-level. """ def __init__(self, env: InterfaceJiminyEnv, parent: Optional["InterfaceQuantity"], requirements: Dict[str, "QuantityCreator"], *, auto_refresh: bool = False) -> None: """ :param env: Base or wrapped jiminy environment. :param parent: Higher-level quantity from which this quantity is a requirement if any, `None` otherwise. :param requirements: Intermediary quantities on which this quantity depends for its evaluation, as a dictionary whose keys are tuple gathering their respective class plus any keyword-arguments of its constructor except 'env' and 'parent'. :param auto_refresh: Whether this quantity must be refreshed automatically as soon as its shared cache has been cleared if specified, otherwise this does nothing. """ # Backup some of user argument(s) self.env = env self.parent = parent self.auto_refresh = auto_refresh # Make sure that all requirement names would be valid as property requirement_names = requirements.keys() if any(re.match('[^A-Za-z0-9_]', name) for name in requirement_names): raise ValueError("The name of all quantity requirements should be " "ASCII alphanumeric characters plus underscore.") # Instantiate intermediary quantities if any self.requirements: Dict[str, InterfaceQuantity] = { name: cls(env, self, **kwargs) for name, (cls, kwargs) in requirements.items()} # Define proxies for user-specified intermediary quantities. # This approach is much faster than hidding quantities behind value # getters. In particular, dynamically adding properties, which is hacky # but which is the fastest alternative option, still adds 35% overhead # on Python 3.11 compared to calling `get` directly. The "official" # approaches are even slower, ie implementing custom `__getattribute__` # method or worst custom `__getattr__` method. for name, quantity in self.requirements.items(): setattr(self, name, quantity) # Update the state explicitly if available but auto-refresh not enabled self._force_update_state = False if isinstance(self, AbstractQuantity): self._force_update_state = not self.state.auto_refresh # Shared cache handling self._cache: Optional[SharedCache[ValueT]] = None self.has_cache = False # Track whether the quantity has been called since previous reset self._is_active = False # Whether the quantity must be re-initialized self._is_initialized: bool = False if TYPE_CHECKING: def __getattr__(self, name: str) -> Any: """Get access to intermediary quantities as first-class properties, without having to do it through `requirements`. .. warning:: Accessing quantities this way is convenient, but unfortunately much slower than do it through dynamically added properties. As a result, this approach is only used to fix typing issues. :param name: Name of the requested quantity. """ try: return self.__getattribute__('requirements')[name].get() except KeyError as e: raise AttributeError( f"'{type(self)}' object has no attribute '{name}'") from e @property def cache(self) -> SharedCache[ValueT]: """Get shared cache if available, otherwise raises an exception. .. warning:: This method is not meant to be overloaded. """ if not self.has_cache: raise RuntimeError( "No shared cache has been set for this quantity. Make sure it " "is managed by some `QuantityManager` instance.") return self._cache # type: ignore[return-value] @cache.setter def cache(self, cache: Optional[SharedCache[ValueT]]) -> None: """Set optional cache variable. When specified, it is used to store evaluated quantity and retrieve its value later one. .. warning:: Value is stored by reference for efficiency. It is up to the user to the copy to retain its current value for later use if necessary. .. note:: One may overload this method to encapsulate the cache in a custom wrapper with specialized 'get' and 'set' methods before passing it to the base implementation. For instance, to enforce copy of the cached value before returning it. """ # Withdraw this quantity from the owners of its current cache if any if self._cache is not None: try: self._cache.discard(self) except ValueError: # This may fail if the quantity is already being garbage # collected when clearing cache. pass # Declare this quantity as owner of the cache if specified if cache is not None: cache.add(self) # Update internal cache attribute self._cache = cache self.has_cache = cache is not None
[docs] def is_active(self, any_cache_owner: bool = False) -> bool: """Whether this quantity is considered active, namely `initialize` has been called at least once since previous tracking reset. :param any_owner: False to check only if this exact instance is active, True if any of the identical quantities (sharing the same cache) is considered sufficient. Optional: False by default. """ if not any_cache_owner or self._cache is None: return self._is_active return any(owner._is_active for owner in self._cache.owners)
[docs] def get(self) -> ValueT: """Get cached value of requested quantity if available, otherwise evaluate it and store it in cache. This quantity is considered active as soon as this method has been called at least once since previous tracking reset. The method `is_active` will be return true even before calling `initialize`. .. warning:: This method is not meant to be overloaded. """ # Delegate getting value to shared cache if available if self._cache is not None: # Get value value = self._cache.get() # This instance is not forceably considered active at this point. # Note that it must be done AFTER getting the value, otherwise it # would mess up with computation graph tracking at initialization. self._is_active = True # Return cached value return value # Evaluate quantity try: # Initialize quantity if not self._is_initialized: self.initialize() assert self._is_initialized # Refresh quantity return self.refresh() except RecursionError as e: raise LookupError( "Mutual dependency between quantities is disallowed.") from e
[docs] def reset(self, reset_tracking: bool = False, *, ignore_other_instances: bool = False) -> None: """Consider that the quantity must be re-initialized before being evaluated once again. If shared cache is available, then it will be cleared and all identity quantities will jointly be reset. .. note:: This method must be called right before performing any agent step, otherwise this quantity will not be refreshed if it was evaluated previously. .. warning:: This method is not meant to be overloaded. :param reset_tracking: Do not consider this quantity as active anymore until the `get` method gets called once again. Optional: False by default. :param ignore_other_instances: Whether to skip reset of intermediary quantities as well as any shared cache co-owner quantity instances. Optional: False by default. """ # Make sure that auto-refresh can be honored if self.auto_refresh and not self.has_cache: raise RuntimeError( "Automatic refresh enabled but no shared cache is available. " "Please add one before calling this method.") # Reset all requirements first if not ignore_other_instances: for quantity in self.requirements.values(): quantity.reset(reset_tracking, ignore_other_instances=False) # Skip reset if dynamic computation graph update is not allowed if self.env.is_simulation_running and not self.allow_update_graph: return # No longer consider this exact instance as active if reset_tracking: self._is_active = False # No longer consider this exact instance as initialized self._is_initialized = False # More work must to be done if shared cache if appropriate if self.has_cache: # Reset all identical quantities. # Note that auto-refresh will be done afterward if requested. if not ignore_other_instances: for owner in self.cache.owners: if owner is not self: owner.reset(reset_tracking=reset_tracking, ignore_other_instances=True) # Reset shared cache # pylint: disable=unexpected-keyword-arg self.cache.reset( ignore_auto_refresh=not self.env.is_simulation_running, reset_state_machine=True)
[docs] def initialize(self) -> None: """Initialize internal buffers. This is typically useful to refresh shared memory proxies or to re-initialize pre-allocated buffers. .. warning:: Intermediary quantities 'requirements' are NOT initialized automatically because they can be initialized lazily in most cases, or are optional depending on the most efficient computation path at run-time. It is up to the developer implementing quantities to take care of it. .. note:: This method must be called before starting a new episode. .. note:: Lazy-initialization is used for efficiency, ie `initialize` will be called before the first time `refresh` has to be called, which may never be the case if cache is shared between multiple identical instances of the same quantity. """ # The quantity is now considered initialized and active unconditionally self._is_initialized = True self._is_active = True
[docs] @abstractmethod def refresh(self) -> ValueT: """Evaluate this quantity based on the agent state at the end of the current agent step. """
QuantityValueT_co = TypeVar('QuantityValueT_co', covariant=True) QuantityCreator = Tuple[ Type[InterfaceQuantity[QuantityValueT_co]], Dict[str, Any]]
[docs] class QuantityEvalMode(IntEnum): """Specify on which state to evaluate a given quantity. """ TRUE = 0 """Current state of the environment. """ REFERENCE = 1 """State of the reference trajectory at the current simulation time. """
# Define proxies for fast lookup _TRUE, _REFERENCE = QuantityEvalMode
[docs] @dataclass(unsafe_hash=True) class AbstractQuantity(InterfaceQuantity, Generic[ValueT]): """Base class for generic quantities involved observer-controller blocks, reward components or termination conditions. .. note:: A dataset of trajectories made available through `self.trajectories`. The latter is synchronized because all quantities as long as shared cached is available. At least one trajectory must be added to the dataset and selected prior to using `QuantityEvalMode.REFERENCE` evaluation mode since the dataset is initially empty by default. .. seealso:: See `InterfaceQuantity` documentation for details. """ mode: QuantityEvalMode """Specify on which state to evaluate this quantity. See `QuantityEvalMode` documentation for details about each mode. .. warning:: Mode `REFERENCE` requires a reference trajectory to be selected manually prior to evaluating this quantity for the first time. """ def __init__(self, env: InterfaceJiminyEnv, parent: Optional[InterfaceQuantity], requirements: Dict[str, "QuantityCreator"], *, mode: QuantityEvalMode = QuantityEvalMode.TRUE, auto_refresh: bool = False) -> None: """ :param env: Base or wrapped jiminy environment. :param parent: Higher-level quantity from which this quantity is a requirement if any, `None` otherwise. :param requirements: Intermediary quantities on which this quantity depends for its evaluation, as a dictionary whose keys are tuple gathering their respective class plus any keyword-arguments of its constructor except 'env' and 'parent'. :param mode: Desired mode of evaluation for this quantity. If mode is set to `QuantityEvalMode.TRUE`, then current simulation state will be used in dynamics computations. If mode is set to `QuantityEvalMode.REFERENCE`, then the state at the current simulation time of the selected reference trajectory will be used instead. :param auto_refresh: Whether this quantity must be refreshed automatically as soon as its shared cache has been cleared if specified, otherwise this does nothing. """ # Backup user argument(s) self.mode = mode # Make sure that no user-specified requirement is named 'trajectory' requirement_names = requirements.keys() if "trajectory" in requirement_names: raise ValueError( "Key 'trajectory' is reserved and cannot be used for " "user-specified requirements.") # Make sure that state requirement is valid if any or use default quantity = requirements.get("state") if quantity is not None: cls, kwargs = quantity if (not issubclass(cls, StateQuantity) or kwargs.setdefault("mode", mode) != mode): raise ValueError( "Key 'state' is reserved and can only be used to specify " "a `StateQuantity` requirement, as a way to give the " "opportunity to overwrite 'update_*' default arguments.") else: requirements["state"] = (StateQuantity, dict(mode=mode)) # Call base implementation super().__init__(env, parent, requirements, auto_refresh=auto_refresh) # Add trajectory quantity proxy trajectory = self.state.trajectory assert isinstance(trajectory, DatasetTrajectoryQuantity) self.trajectory = trajectory # Robot for which the quantity must be evaluated self.robot = jiminy.Robot() self.pinocchio_model = pin.Model() self.pinocchio_data = self.pinocchio_model.createData()
[docs] def initialize(self) -> None: # Call base implementation super().initialize() # Force initializing state quantity self.state.initialize() # Refresh robot proxy assert isinstance(self.state, StateQuantity) self.robot = self.state.robot self.pinocchio_model = self.state.pinocchio_model self.pinocchio_data = self.state.pinocchio_data
[docs] def sync(fun: Callable[..., None]) -> Callable[..., None]: """Wrap any `InterfaceQuantity` instance method to forward call to all co-owners of the same shared cache. This wrapper is useful to keep all identical instances of the same quantity in sync. """ @wraps(fun) def fun_safe(self: InterfaceQuantity, *args: Any, **kwargs: Any) -> None: # Hijack instance for adding private an attribute tracking whether its # internal state went out-of-sync between identical instances. # Note that a local variable cannot be used because all synched methods # must shared the same tracking state variable. Otherwise, one method # may be flagged out-of-sync but not the others. if not hasattr(self, "__is_synched__"): self.__is_synched__ = self.has_cache # type: ignore[attr-defined] # Check if quantity has cache but is already out-of-sync. # Raise exception if it now has cache while it was not the case before. must_sync = self.has_cache and len(self.cache.owners) > 1 if not self.__is_synched__ and must_sync: raise RuntimeError( "This quantity went out-of-sync. Make sure that no synched " "method is called priori to setting shared cache.") self.__is_synched__ = self.has_cache # type: ignore[attr-defined] # Call instance method on all co-owners of shared cache cls = type(self) for owner in (self.cache.owners if self.has_cache else (self,)): assert isinstance(owner, cls) value = fun(owner, *args, **kwargs) if value is not None: raise NotImplementedError( "Instance methods that does not return `None` are not " "supported.") return fun_safe
[docs] @dataclass(unsafe_hash=True) class DatasetTrajectoryQuantity(InterfaceQuantity[State]): """This class manages a database of trajectories. The database is empty by default. Trajectories must be added or discarded manually. Only one trajectory can be selected at once. Once a trajectory has been selecting, its state at the current simulation can be easily retrieved. This class does not require to only adding trajectories for which all attributes of the underlying state sequence have been specified. Missing attributes of a trajectory will also be missing from the retrieved state. It is the responsible of the user to make sure all cases are properly handled if needed. All instances of this quantity sharing the same cache are synchronized, which means that adding, discarding, or selecting a trajectory on any of them would propagate on all the others. """ def __init__(self, env: InterfaceJiminyEnv, parent: Optional[InterfaceQuantity], ) -> None: """ :param env: Base or wrapped jiminy environment. :param parent: Higher-level quantity from which this quantity is a requirement if any, `None` otherwise. """ # Call base implementation super().__init__(env, parent, requirements={}, auto_refresh=False) # Ordered set of named reference trajectories as a dictionary self.registry: OrderedDict[str, Trajectory] = OrderedDict() # Name of the trajectory that is currently selected self._name = "" # Selected trajectory if any self._trajectory: Optional[Trajectory] = None # Specifies how to deal with query time that are out-of-bounds self._mode: Literal['raise', 'wrap', 'clip'] = 'raise' @property def trajectory(self) -> Trajectory: """Trajectory that is currently selected if any, raises an exception otherwise. """ # Make sure that a trajectory has been selected if self._trajectory is None: raise RuntimeError("No trajectory has been selected.") # Return selected trajectory return self._trajectory @property def robot(self) -> jiminy.Robot: """Robot associated with the selected trajectory. """ return self.trajectory.robot @property def use_theoretical_model(self) -> bool: """Whether the selected trajectory is associated with the theoretical dynamical model or extended simulation model of the robot. """ return self.trajectory.use_theoretical_model @sync def _add(self, name: str, trajectory: Trajectory) -> None: """Add a trajectory to local internal registry only without performing any validity check. .. warning:: This method is used internally by `add` method. It is not meant to be called manually. :param name: Desired name of the trajectory. :param trajectory: Trajectory instance to register. """ self.registry[name] = trajectory
[docs] def add(self, name: str, trajectory: Trajectory) -> None: """Jointly add a trajectory to the local internal registry of all instances sharing the same cache as this quantity. :param name: Desired name of the trajectory. It must be unique. If a trajectory with the exact same name already exists, then it must be discarded first, so as to prevent silently overwriting it by mistake. :param trajectory: Trajectory instance to register. """ # Make sure that no trajectory with the exact same name already exists if name in self.registry: raise KeyError( "A trajectory with the exact same name already exists. Please " "delete it first before adding a new one.") # Allocate new dummy robot to avoid altering the simulation one if trajectory.robot is self.env.robot: trajectory = replace(trajectory, robot=trajectory.robot.copy()) # Add the same post-processed trajectory to all identical instances. # Note that `add` must be splitted in two methods. A first part that # applies some trajectory post-processing only once, and a second part # that adds the post-processed trajectory to all identical quantities # at once. It is absolutely essential to proceed this way, because it # guarantees that the underlying trajectories are all references to the # same memory, including `pinocchio_data`. This means that calling # `update_quantities` will perform the update for all of them at once. # Consequently, kinematics and dynamics quantities of all `State` # instances will be up-to-date as long as `refresh` is called once for # a given evaluation mode. self._add(name, trajectory)
[docs] @sync def discard(self, name: str) -> None: """Jointly remove a trajectory from the local internal registry of all instances sharing the same cache as this quantity. :param name: Name of the trajectory to discard. """ # Un-select trajectory if it corresponds to the discarded one if self._name == name: self._trajectory = None self._name = "" # Delete trajectory for global registry del self.registry[name]
[docs] @sync def select(self, name: str, mode: Literal['raise', 'wrap', 'clip'] = 'raise') -> None: """Jointly select a trajectory in the internal registry of all instances sharing the same cache as this quantity. :param name: Name of the trajectory to discard. :param mode: Specifies how to deal with query time of are out of the time interval of the trajectory. See `Trajectory.get` documentation for details. """ # Make sure that at least one trajectory has been specified if not self.registry: raise ValueError("Cannot select trajectory on a empty dataset.") # Select the desired trajectory for all identical instances self._trajectory = self.registry[name] self._name = name # Backup user-specified mode self._mode = mode # Un-initialize quantity when the selected trajectory changes self.reset(reset_tracking=False)
@property def name(self) -> str: """Name of the trajectory that is currently selected. """ return self._name @InterfaceQuantity.cache.setter # type: ignore[attr-defined] def cache(self, cache: Optional[SharedCache[ValueT]]) -> None: # Get existing registry if any and making sure not already out-of-sync owner: Optional[InterfaceQuantity] = None if cache is not None and cache.owners: owner = next(iter(cache.owners)) assert isinstance(owner, DatasetTrajectoryQuantity) if self._trajectory: raise RuntimeError( "Trajectory dataset not empty. Impossible to add a shared " "cache already having owners.") # Call base implementation InterfaceQuantity.cache.fset(self, cache) # type: ignore[attr-defined] # Catch-up synchronization if owner: # Shallow copy the original registry, so that deletion / addition # does not propagate to other instances. self.registry = owner.registry.copy() if owner._trajectory is not None: self.select(owner._name, owner._mode)
[docs] def refresh(self) -> State: """Compute state of selected trajectory at current simulation time. """ return self.trajectory.get(self.env.stepper_state.t, self._mode)
[docs] @dataclass(unsafe_hash=True) class StateQuantity(InterfaceQuantity[State]): """State to consider when evaluating any quantity deriving from `AbstractQuantity` using the same evaluation mode as this instance. This quantity is refreshed automatically no matter what. This guarantees that all low-level kinematics and dynamics quantities that can be computed from the current state are up-to-date. More specifically, every quantities would be up-to-date if the evaluation mode is `QuantityEvalMode.TRUE`, while it would depends on the information available on the selected trajectory if the evaluation mode is `QuantityEvalMode.REFERENCE`. See `update_quantities` documentation for details. """ mode: QuantityEvalMode """Specify on which state to evaluate this quantity. See `QuantityEvalMode` documentation for details about each mode. .. warning:: Mode `REFERENCE` requires a reference trajectory to be selected manually prior to evaluating this quantity for the first time. """ def __init__(self, env: InterfaceJiminyEnv, parent: Optional[InterfaceQuantity], *, mode: QuantityEvalMode = QuantityEvalMode.TRUE, update_kinematics: bool = True, update_dynamics: bool = False, update_centroidal: bool = False, update_energy: bool = False, update_jacobian: bool = False) -> None: """ :param env: Base or wrapped jiminy environment. :param parent: Higher-level quantity from which this quantity is a requirement if any, `None` otherwise. :param mode: Desired mode of evaluation for this quantity. If mode is set to `QuantityEvalMode.TRUE`, then current simulation state will be used in dynamics computations. If mode is set to `QuantityEvalMode.REFERENCE`, then at the state of some reference trajectory at the current simulation time will be used instead. Optional: 'QuantityEvalMode.TRUE' by default. :param update_kinematics: Whether to update body and frame transforms, spatial velocities and accelerations stored in `self.pinocchio_data` if necessary to be consistent with the current state of the robot. This argument has no effect if mode is set to `QuantityEvalMode.TRUE` because this property is already guarantee. Optional: False by default. :param update_dynamics: Whether to update the non-linear effects and the joint internal forces stored in `self.pinocchio_data` if necessary. Optional: False by default. :param update_centroidal: Whether to update the centroidal dynamics (incl. CoM) stored in `self.pinocchio_data` if necessary. Optional: True by default. :param update_energy: Whether to update the potential and kinematic energy stored in `self.pinocchio_data` if necessary. Optional: False by default. :param update_jacobian: Whether to update the joint Jacobian matrices stored in `self.pinocchio_data` if necessary. Optional: False by default. """ # Make sure that the input arguments are valid update_kinematics = ( update_kinematics or update_dynamics or update_centroidal or update_energy or update_jacobian) # Backup user argument(s) self.mode = mode self.update_kinematics = update_kinematics self.update_dynamics = update_dynamics self.update_centroidal = update_centroidal self.update_energy = update_energy self.update_jacobian = update_jacobian # Enable auto-refresh based on the evaluation mode # Note that it is necessary to auto-refresh this quantity, as it is the # one responsible for making sure that dynamics quantities are always # up-to-date when refreshing quantities. The latter are involved one # way of the other in the computation of any quantity, which means that # pre-computing it does not induce any unnecessary computations as long # as the user fetches the value of at least one quantity. Although this # assumption is very likely to be true at the step update period, it is # not the case at the observer update period. It sounds more efficient # refresh to the state the first time any quantity gets computed. # However, systematically checking if the state must be refreshed for # all quantities adds overheat and may be fairly costly overall. The # optimal trade-off is to rely on auto-refresh if the evaluation mode # is TRUE, since refreshing the state only consists in copying some # data, which is very cheap. On the contrary, it is more efficient to # only refresh the state when needed if the evaluation mode is TRAJ. # * Update state: 500ns (TRUE) | 5.0us (TRAJ) # * Check cache state: 70ns auto_refresh = mode is QuantityEvalMode.TRUE # Call base implementation. super().__init__( env, parent, requirements=dict(trajectory=(DatasetTrajectoryQuantity, {})), auto_refresh=auto_refresh) # Robot for which the quantity must be evaluated self.robot = env.robot self.pinocchio_model = env.robot.pinocchio_model self.pinocchio_data = env.robot.pinocchio_data # State for which the quantity must be evaluated self._state = State(t=np.nan, q=np.array([])) # Persistent buffer for storing body external forces if necessary self._f_external_vec = pin.StdVec_Force() self._f_external_list: List[np.ndarray] = [] self._f_external_batch = np.array([]) self._f_external_slices: Tuple[np.ndarray, ...] = () # Persistent buffer storing all lambda multipliers for efficiency self._constraint_lambda_batch = np.array([]) # Slices in stacked lambda multiplier flat vector self._constraint_lambda_slices: List[np.ndarray] = [] # Lambda multipliers of all the constraints individually self._constraint_lambda_list: List[np.ndarray] = [] # Whether to update kinematic and dynamic data to be consistent with # the current state of the robot, based on the requirement of all the # co-owners of shared cache. self._update_kinematics = False self._update_dynamics = False self._update_centroidal = False self._update_energy = False self._update_jacobian = False
[docs] def initialize(self) -> None: # Determine which data must be update based on shared cache co-owners owners = self.cache.owners if self.has_cache else (self,) self._update_kinematics = False self._update_dynamics = False self._update_centroidal = False self._update_energy = False self._update_jacobian = False for owner in owners: self._update_kinematics |= owner.update_kinematics self._update_dynamics |= owner.update_dynamics self._update_centroidal |= owner.update_centroidal self._update_energy |= owner.update_energy self._update_jacobian |= owner.update_jacobian # Refresh robot and pinocchio proxies for co-owners of shared cache. # Note that automatic refresh is not sufficient to guarantee that # `initialize` will be called unconditionally, because it will be # skipped if a value is already stored in cache. As a result, it is # necessary to synchronize calls to this method between co-owners of # the shared cache manually, so that it will be called by the first # instance to found the cache empty. Only the necessary bits are # synchronized instead of the whole method, to avoid messing up with # computation graph tracking. for owner in owners: assert isinstance(owner, StateQuantity) if owner._is_initialized: continue if owner.mode is QuantityEvalMode.TRUE: owner.robot = owner.env.robot use_theoretical_model = False else: owner.robot = owner.trajectory.robot use_theoretical_model = owner.trajectory.use_theoretical_model if use_theoretical_model: owner.pinocchio_model = owner.robot.pinocchio_model_th owner.pinocchio_data = owner.robot.pinocchio_data_th else: owner.pinocchio_model = owner.robot.pinocchio_model owner.pinocchio_data = owner.robot.pinocchio_data # Call base implementation. # The quantity will be considered initialized and active at this point. super().initialize() # Refresh proxies and allocate memory for storing external forces if self.mode is QuantityEvalMode.TRUE: self._f_external_vec = self.env.robot_state.f_external else: self._f_external_vec = pin.StdVec_Force() self._f_external_vec.extend([ pin.Force() for _ in range(self.pinocchio_model.njoints)]) self._f_external_list = [ f_ext.vector for f_ext in self._f_external_vec] self._f_external_batch = np.zeros((self.pinocchio_model.njoints, 6)) self._f_external_slices = tuple(self._f_external_batch) # Allocate memory for lambda vector self._constraint_lambda_batch = np.zeros( (len(self.robot.log_constraint_fieldnames),)) # Refresh mapping from lambda multipliers to corresponding slice self._constraint_lambda_list.clear() self._constraint_lambda_slices.clear() constraint_lookup_pairs = tuple( (f"Constraint{registry_type}", registry) for registry_type, registry in ( ("BoundJoints", self.robot.constraints.bounds_joints), ("ContactFrames", self.robot.constraints.contact_frames), ("CollisionBodies", { name: constraint for constraints in ( self.robot.constraints.collision_bodies) for name, constraint in constraints.items()}), ("User", self.robot.constraints.user))) i = 0 while i < len(self.robot.log_constraint_fieldnames): fieldname = self.robot.log_constraint_fieldnames[i] for registry_type, registry in constraint_lookup_pairs: if fieldname.startswith(registry_type): break constraint_name = fieldname[len(registry_type):-1] constraint = registry[constraint_name] self._constraint_lambda_list.append(constraint.lambda_c) self._constraint_lambda_slices.append( self._constraint_lambda_batch[i:(i + constraint.size)]) i += constraint.size # Allocate state for which the quantity must be evaluated if needed if self.mode is QuantityEvalMode.TRUE: if not self.env.is_simulation_running: raise RuntimeError("No simulation running. Impossible to " "initialize this quantity.") self._state = State( 0.0, self.env.robot_state.q, self.env.robot_state.v, self.env.robot_state.a, self.env.robot_state.u, self.env.robot_state.command, self._f_external_batch, self._constraint_lambda_batch)
[docs] def refresh(self) -> State: """Compute the current state depending on the mode of evaluation, and make sure that kinematics and dynamics quantities are up-to-date. """ if self.mode is _TRUE: # Update the current simulation time self._state.t = self.env.stepper_state.t # Update external forces and constraint multipliers in state buffer multi_array_copyto(self._f_external_slices, self._f_external_list) multi_array_copyto( self._constraint_lambda_slices, self._constraint_lambda_list) else: self._state = self.trajectory.get() # Copy body external forces from stacked buffer to force vector has_forces = self._state.f_external is not None if has_forces: array_copyto(self._f_external_batch, self._state.f_external) multi_array_copyto(self._f_external_list, self._f_external_slices) # Update all dynamical quantities that can be given available data if self.update_kinematics: update_quantities( self.robot, self._state.q, self._state.v, self._state.a, self._f_external_vec if has_forces else None, update_dynamics=self._update_dynamics, update_centroidal=self._update_centroidal, update_energy=self._update_energy, update_jacobian=self._update_jacobian, update_collisions=False, use_theoretical_model=( self.trajectory.use_theoretical_model)) # Restore lagrangian multipliers of the constraints if available if self._state.lambda_c is not None: array_copyto( self._constraint_lambda_batch, self._state.lambda_c) multi_array_copyto(self._constraint_lambda_list, self._constraint_lambda_slices) # Return state return self._state