"""This module promotes quantities as first-class objects.
Defining quantities this way allows for standardization of common intermediary
metrics for computation of reward components and termination conditions, eg
the center of pressure or the average spatial velocity of a frame. Overall, it
greatly reduces code duplication and bugs.
Apart from that, it offers a dedicated quantity manager that is responsible for
optimizing the computation path, which is expected to significantly increase
the step collection throughput. This speedup is achieved by caching already
computed quantities that did not change since then, computing redundant intermediary
quantities only once per step, and gathering similar quantities in a large
batch to leverage vectorization of math instructions.
"""
import re
import weakref
from enum import IntEnum
from weakref import ReferenceType
from abc import abstractmethod, ABCMeta
from collections import OrderedDict
from collections.abc import MutableSet
from dataclasses import dataclass, replace
from functools import wraps
from typing import (
Any, Dict, List, Optional, Tuple, Generic, TypeVar, Type, Iterator,
Collection, Callable, Literal, ClassVar, TYPE_CHECKING)
import numpy as np
import jiminy_py.core as jiminy
from jiminy_py.core import ( # pylint: disable=no-name-in-module
array_copyto, multi_array_copyto)
from jiminy_py.dynamics import (
TrajectoryTimeMode, State, Trajectory, update_quantities)
import pinocchio as pin
from .interfaces import InterfaceJiminyEnv
ValueT = TypeVar('ValueT')
[docs]
class WeakMutableCollection(MutableSet, Generic[ValueT]):
    """Mutable unordered list container storing weak references to objects.
    Elements will be discarded when no strong reference to the value exists
    anymore, and a user-specified callback will be triggered if any.

    Internally, it is implemented as a set for which uniqueness is
    characterized by identity instead of equality operator.
    """

    __slots__ = ("_callback", "_weakrefs")

    def __init__(self, callback: Optional[Callable[[
            "WeakMutableCollection[ValueT]", ReferenceType
            ], None]] = None) -> None:
        """
        :param callback: Callback that will be triggered every time an element
                         is discarded from the container.
                         Optional: None by default.
        """
        self._callback = callback
        # Weak references are stored in a plain list (not a set) because
        # uniqueness is characterized by identity rather than equality.
        self._weakrefs: List[ReferenceType] = []

    def __callback__(self, ref: ReferenceType) -> None:
        """Internal method that will be called every time an element must be
        discarded from the container, either because it was requested by the
        user or because no strong reference to the value exists anymore.

        If a callback has been specified by the user, it will be triggered
        after removing the weak reference from the container.
        """
        # Even though a temporary weak reference is provided for removal, the
        # identity check is performed on the object being stored. If the
        # latter has already been deleted, then the first dead weak reference
        # found in the list is removed instead. This is not an issue even if
        # it is not the one matching the provided reference, since all dead
        # references will be removed eventually anyway.
        value = ref()
        for i, ref_i in enumerate(self._weakrefs):
            if value is ref_i():
                del self._weakrefs[i]
                break
        if self._callback is not None:
            self._callback(self, ref)

    def __contains__(self, obj: Any) -> bool:
        """Dunder method to check if a weak reference to a given object is
        already stored in the container, which is characterized by identity
        instead of equality operator.

        :param obj: Object to look for in the container.
        """
        return any(ref() is obj for ref in self._weakrefs)

    def __iter__(self) -> Iterator[ValueT]:
        """Dunder method that returns an iterator over the objects of the
        container for which a reference still exists.
        """
        for ref in self._weakrefs:
            obj = ref()
            if obj is not None:
                yield obj

    def __len__(self) -> int:
        """Dunder method that returns the length of the container.

        .. note::
            Dead references that have not been purged yet are still counted.
        """
        return len(self._weakrefs)

    def add(self, value: ValueT) -> None:
        """Add a new element to the container if not already contained.

        This has no effect if the element is already present.

        :param value: Object to add to the container.
        """
        if value not in self:
            # The finalizer callback purges the entry as soon as the object
            # gets garbage collected.
            self._weakrefs.append(weakref.ref(value, self.__callback__))

    def discard(self, value: ValueT) -> None:
        """Remove an element from the container if stored in it.

        This method does not raise an exception when the element is missing.

        :param value: Object to remove from the container.
        """
        if value in self:
            self.__callback__(weakref.ref(value))
[docs]
class QuantityStateMachine(IntEnum):
    """Specify the current state of a given (unique) quantity, which determines
    the steps to perform for retrieving its current value.
    """
    IS_RESET = 0
    """The quantity at hand has just been reset. The quantity must first be
    initialized, then refreshed, and finally stored in cache before its value
    can be retrieved.
    """
    IS_INITIALIZED = 1
    """The quantity at hand has been initialized but never evaluated for the
    current robot state. Its value must still be refreshed and stored in cache
    before retrieving it.
    """
    IS_CACHED = 2
    """The quantity at hand has been evaluated and its value stored in cache.
    As such, its value can be retrieved from cache directly.
    """
# Define module-level proxies of the `QuantityStateMachine` members for fast
# lookup on the hot path (global access avoids repeated enum attribute access)
_IS_RESET, _IS_INITIALIZED, _IS_CACHED = ( # pylint: disable=invalid-name
    QuantityStateMachine)
[docs]
class SharedCache(Generic[ValueT]):
    """Basic thread local shared cache.

    Its API mimics `std::optional` from the Standard C++ library. All it does
    is encapsulating any Python object as a mutable variable, plus exposing a
    simple mechanism for keeping track of all "owners" of the cache.

    .. warning::
        This implementation is not thread safe.
    """

    __slots__ = (
        "_value", "_weakrefs", "_owner", "_auto_refresh", "sm_state", "owners")

    owners: Collection["InterfaceQuantity[ValueT]"]
    """Owners of the shared buffer, ie quantities relying on it to store the
    result of their evaluation. This information may be useful for determining
    the most efficient computation path overall.

    .. note::
        Quantities must add themselves to this list when passing them a shared
        cache instance.

    .. note::
        Internally, it stores weak references to avoid holding alive quantities
        that could be garbage collected otherwise. `WeakSet` cannot be used
        because owners are objects all having the same hash, eg "identical"
        quantities.
    """

    def __init__(self) -> None:
        """Initialize an empty cache without any owner.
        """
        # Cached value if any
        self._value: Optional[ValueT] = None

        # Whether auto-refresh is requested
        self._auto_refresh = True

        # Basic state machine management
        self.sm_state: QuantityStateMachine = QuantityStateMachine.IS_RESET

        # Initialize "owners" of the shared buffer.
        # Define callback to reset part of the computation graph whenever a
        # quantity owning the cache gets garbage collected, namely all
        # quantities that may assume at some point the existence of this
        # deleted owner to adjust their computation path.
        def _callback(
                self: WeakMutableCollection["InterfaceQuantity[ValueT]"],
                ref: ReferenceType) -> None:  # pylint: disable=unused-argument
            owner: Optional["InterfaceQuantity[ValueT]"]
            for owner in self:
                # Stop going up in parent chain if dynamic computation graph
                # update is disabled for efficiency.
                while (owner.allow_update_graph and
                        owner.parent is not None and owner.parent.has_cache):
                    owner = owner.parent
                owner.reset(reset_tracking=True)

        # Initialize weak reference to owning quantities
        self._weakrefs = WeakMutableCollection(_callback)

        # Maintain alive owning quantities upon reset
        self.owners = self._weakrefs
        self._owner: Optional["InterfaceQuantity[ValueT]"] = None

    def add(self, owner: "InterfaceQuantity[ValueT]") -> None:
        """Add a given quantity instance to the set of co-owners associated
        with the shared cache at hand.

        .. warning::
            All shared cache co-owners must be instances of the same unique
            quantity. An exception will be thrown if an attempt is made to add
            a quantity instance that does not satisfy this condition.

        :param owner: Quantity instance to add to the set of co-owners.
        """
        # Make sure that the quantity is not already part of the co-owners.
        # Identity is used instead of equality on purpose, as "identical"
        # quantities compare equal.
        if id(owner) in map(id, self.owners):
            raise ValueError(
                "The specified quantity instance is already an owner of this "
                "shared cache.")

        # Make sure that the new owner is consistent with the others if any
        if any(owner != _owner for _owner in self._weakrefs):
            raise ValueError(
                "Quantity instance inconsistent with already existing shared "
                "cache owners.")

        # Add quantity instance to shared cache owners
        self._weakrefs.add(owner)

        # Refresh the tuple-cached owner list if it is currently in use
        if self.sm_state is not QuantityStateMachine.IS_RESET:
            self.owners = tuple(self._weakrefs)

    def discard(self, owner: "InterfaceQuantity[ValueT]") -> None:
        """Remove a given quantity instance from the set of co-owners
        associated with the shared cache at hand.

        :param owner: Quantity instance to remove from the set of co-owners.
        """
        # Make sure that the quantity is part of the co-owners
        if id(owner) not in map(id, self.owners):
            raise ValueError(
                "The specified quantity instance is not an owner of this "
                "shared cache.")

        # Restore "dynamic" owner list as it may be involved in quantity reset
        self.owners = self._weakrefs

        # Remove quantity instance from shared cache owners
        self._weakrefs.discard(owner)

        # Refresh owners.
        # Note that one must keep tracking the quantity instance being used in
        # computations, aka 'self._owner', even if it is no longer an actual
        # shared cache owner. This is necessary because updating it would
        # require resetting the state machine, which is not an option as it
        # would mess up with quantities storing history since initialization.
        if self.sm_state is not QuantityStateMachine.IS_RESET:
            self.owners = tuple(self._weakrefs)

    def reset(self,
              ignore_auto_refresh: bool = False,
              reset_state_machine: bool = False) -> None:
        """Clear value stored in cache if any.

        :param ignore_auto_refresh: Whether to skip automatic refresh of all
                                    co-owner quantities of this shared cache.
                                    Optional: False by default.
        :param reset_state_machine: Whether to reset completely the state
                                    machine of the underlying quantity, ie not
                                    considering it initialized anymore.
                                    Optional: False by default.
        """
        # Clear cache
        if self.sm_state == _IS_CACHED:
            self.sm_state = _IS_INITIALIZED

        # Special branch in case quantities must be reset on the way
        if reset_state_machine:
            # Reset the state machine completely
            self.sm_state = _IS_RESET

            # Update list of owning quantities
            self.owners = self._weakrefs
            self._owner = None

            # Reset auto-refresh buffer
            self._auto_refresh = True

        # Refresh automatically if not already proven useless and not ignored
        if not ignore_auto_refresh and self._auto_refresh:
            for owner in self.owners:
                if owner.auto_refresh:
                    owner.get()
                    break
            else:
                # No owner requests auto-refresh: remember it to skip this
                # loop entirely on subsequent resets.
                self._auto_refresh = False

    def get(self) -> ValueT:
        """Return cached value if any, otherwise evaluate it and store it.
        """
        # Get value already cached if any
        if self.sm_state == _IS_CACHED:
            return self._value  # type: ignore[return-value]

        # Evaluate quantity
        try:
            if self.sm_state == _IS_RESET:
                # Cache the list of owning quantities
                self.owners = tuple(self._weakrefs)

                # Stick to the first owning quantity systematically
                owner = self.owners[0]
                self._owner = owner

                # Initialize quantity if not already done manually
                if not owner._is_initialized:
                    try:
                        owner.initialize()
                    except Exception:
                        # Revert initialization of this quantity as it failed
                        owner.reset(reset_tracking=False,
                                    ignore_requirements=True,
                                    ignore_others=True)
                        raise
                    assert owner._is_initialized

            # Get first owning quantity systematically
            owner = self._owner  # type: ignore[assignment]

            # Make sure that the state has been refreshed.
            # This is necessary because it would update the internal state of
            # the associated 'pinocchio_data'. Note that auto-refresh is not
            # enabled for StateQuantity in TRAJECTORY mode as it is costly to
            # evaluate. Instead, it is postponed here, right before updating
            # any quantity that may rely on it.
            if owner._force_update_state:
                owner.state.get()

            # Refresh quantity
            value = owner.refresh()
        except RecursionError as e:
            raise LookupError(
                "Mutual dependency between quantities is forbidden.") from e

        # Cache the value
        self.sm_state = _IS_CACHED
        self._value = value
        return value
[docs]
class InterfaceQuantity(Generic[ValueT], metaclass=ABCMeta):
    """Interface for generic quantities involved in observer-controller
    blocks, reward components or termination conditions.

    .. note::
        Quantities are meant to be managed automatically via `QuantityManager`.
        Dealing with quantities manually is error-prone, and as such, is
        strongly discouraged. Nonetheless, power-users that understand the
        risks are allowed to do it.

    .. warning::
        Mutual dependency between quantities is disallowed.

    .. warning::
        The user is responsible for implementing the dunder methods `__eq__`
        and `__hash__` that characterize identical quantities. This property is
        used internally by `QuantityManager` to synchronize cache between them.
        It is advised to use decorator `@dataclass(unsafe_hash=True)` for
        convenience, but it can also be done manually.
    """

    requirements: Dict[str, "InterfaceQuantity"]
    """Intermediary quantities on which this quantity may rely on for its
    evaluation at some point, depending on the optimal computation path at
    runtime. They will be exposed to the user as usual attributes.
    """

    allow_update_graph: ClassVar[bool] = True
    """Whether dynamic computation graph update is allowed. This implies that
    the quantity can be reset at any point in time to re-compute the optimal
    computation path, typically after deletion or addition of some other node
    to its dependent sub-graph. When this happens, the quantity gets reset on
    the spot, even if a simulation is already running. This is not always
    acceptable, hence the capability to disable this feature at class-level.
    """

    def __init__(self,
                 env: InterfaceJiminyEnv,
                 parent: Optional["InterfaceQuantity"],
                 requirements: Dict[str, "QuantityCreator"],
                 *,
                 auto_refresh: bool = False) -> None:
        """
        :param env: Base or wrapped jiminy environment.
        :param parent: Higher-level quantity from which this quantity is a
                       requirement if any, `None` otherwise.
        :param requirements: Intermediary quantities on which this quantity
                             depends for its evaluation, as a dictionary
                             whose keys are tuple gathering their respective
                             class plus any keyword-arguments of its
                             constructor except 'env' and 'parent'.
        :param auto_refresh: Whether this quantity must be refreshed
                             automatically as soon as its shared cache has been
                             cleared if specified.
        """
        # Backup some of user argument(s)
        self.env = env
        self.parent = parent
        self.auto_refresh = auto_refresh

        # Make sure that all requirement names would be valid as property.
        # Note that `re.search` is required rather than `re.match`, as the
        # latter is anchored at the beginning of the string and would only
        # reject names whose *first* character is invalid.
        requirement_names = requirements.keys()
        if any(re.search('[^A-Za-z0-9_]', name) for name in requirement_names):
            raise ValueError("The name of all quantity requirements should be "
                             "ASCII alphanumeric characters plus underscore.")

        # Instantiate intermediary quantities if any
        self.requirements: Dict[str, InterfaceQuantity] = {
            name: cls(env, self, **kwargs)
            for name, (cls, kwargs) in requirements.items()}

        # Define proxies for user-specified intermediary quantities.
        # This approach is much faster than hiding quantities behind value
        # getters. In particular, dynamically adding properties, which is hacky
        # but the fastest alternative option, still adds 35% overhead on Python
        # 3.11 compared to calling `get` directly. The "official" approaches
        # are even slower, ie implementing custom `__getattribute__` method or
        # worst custom `__getattr__` method.
        for name, quantity in self.requirements.items():
            setattr(self, name, quantity)

        # Update the state explicitly if available but auto-refresh not enabled
        self._force_update_state = False
        if isinstance(self, AbstractQuantity):
            self._force_update_state = not self.state.auto_refresh

        # Shared cache handling
        self._cache: Optional[SharedCache[ValueT]] = None
        self.has_cache = False

        # Track whether the quantity has been called since previous reset
        self._is_active = False

        # Whether the quantity must be re-initialized
        self._is_initialized: bool = False

    if TYPE_CHECKING:
        def __getattr__(self, name: str) -> Any:
            """Get access to intermediary quantities as first-class properties,
            without having to do it through `requirements`.

            .. warning::
                Accessing quantities this way is convenient, but unfortunately
                much slower than doing it through dynamically added properties.
                As a result, this approach is only used to fix typing issues.

            :param name: Name of the requested quantity.
            """
            try:
                return self.__getattribute__('requirements')[name].get()
            except KeyError as e:
                raise AttributeError(
                    f"'{type(self)}' object has no attribute '{name}'") from e

    @property
    def cache(self) -> SharedCache[ValueT]:
        """Get shared cache if available, otherwise raises an exception.

        .. warning::
            This method is not meant to be overloaded.
        """
        if not self.has_cache:
            raise RuntimeError(
                "No shared cache has been set for this quantity. Make sure it "
                "is managed by some `QuantityManager` instance.")
        return self._cache  # type: ignore[return-value]

    @cache.setter
    def cache(self, cache: Optional[SharedCache[ValueT]]) -> None:
        """Set optional cache variable. When specified, it is used to store
        evaluated quantity and retrieve its value later on.

        .. warning::
            Value is stored by reference for efficiency. It is up to the user
            to copy it in order to retain its current value for later use if
            necessary.

        .. note::
            One may overload this method to encapsulate the cache in a custom
            wrapper with specialized 'get' and 'set' methods before passing it
            to the base implementation. For instance, to enforce copy of the
            cached value before returning it.
        """
        # Withdraw this quantity from the owners of its current cache if any
        if self._cache is not None:
            try:
                self._cache.discard(self)
            except ValueError:
                # This may fail if the quantity is already being garbage
                # collected when clearing cache.
                pass

        # Declare this quantity as owner of the cache if specified
        if cache is not None:
            cache.add(self)

        # Update internal cache attribute
        self._cache = cache
        self.has_cache = cache is not None

    def is_active(self, any_cache_owner: bool = False) -> bool:
        """Whether this quantity is considered active, namely `initialize` has
        been called at least once since previous tracking reset.

        :param any_cache_owner: False to check only if this exact instance is
                                active, True if any of the identical quantities
                                (sharing the same cache) is considered
                                sufficient.
                                Optional: False by default.
        """
        if not any_cache_owner or self._cache is None:
            return self._is_active
        return any(owner._is_active for owner in self._cache.owners)

    def get(self) -> ValueT:
        """Get cached value of requested quantity if available, otherwise
        evaluate it and store it in cache.

        This quantity is considered active as soon as this method has been
        called at least once since previous tracking reset. The method
        `is_active` will return True even before calling `initialize`.

        .. warning::
            This method is not meant to be overloaded.
        """
        # Delegate getting value to shared cache if available
        if self._cache is not None:
            # Get value
            value = self._cache.get()

            # This instance is now forcibly considered active at this point.
            # Note that it must be done AFTER getting the value, otherwise it
            # would mess up with computation graph tracking at initialization.
            self._is_active = True

            # Return cached value
            return value

        # Evaluate quantity
        try:
            # Initialize quantity
            if not self._is_initialized:
                self.initialize()
                assert self._is_initialized

            # Refresh quantity
            return self.refresh()
        except RecursionError as e:
            raise LookupError(
                "Mutual dependency between quantities is disallowed.") from e

    def reset(self,
              reset_tracking: bool = False,
              *,
              ignore_requirements: bool = False,
              ignore_others: bool = False) -> None:
        """Consider that the quantity must be re-initialized before being
        evaluated once again.

        If shared cache is available, then it will be cleared first then all
        identical quantities will be jointly reset.

        .. note::
            This method must be called right before performing any agent step,
            otherwise this quantity will not be refreshed if it was evaluated
            previously.

        .. warning::
            This method is not meant to be overloaded.

        :param reset_tracking: Do not consider this quantity as active anymore
                               until the `get` method gets called once again.
                               Optional: False by default.
        :param ignore_requirements:
            Whether to skip reset of intermediary quantities.
            Optional: False by default.
        :param ignore_others:
            Whether to ignore any shared cache co-owner quantity instances.
            Optional: False by default.
        """
        # Make sure that auto-refresh can be honored
        if self.auto_refresh and not self.has_cache:
            raise RuntimeError(
                "Automatic refresh enabled but no shared cache is available. "
                "Please add one before calling this method.")

        # Reset all requirements first.
        # This is necessary to avoid auto-refreshing quantities with deprecated
        # cache if enabled.
        if not ignore_requirements:
            for quantity in self.requirements.values():
                quantity.reset(reset_tracking,
                               ignore_requirements=False,
                               ignore_others=ignore_others)

        # No longer consider this exact instance as initialized
        self._is_initialized = False

        # Skip reset if dynamic computation graph update is not allowed
        if self.env.is_simulation_running and not self.allow_update_graph:
            return

        # No longer consider this exact instance as active if requested
        if reset_tracking:
            self._is_active = False

        # More work must be done if this quantity has a shared cache that
        # has not been completely reset yet.
        if self.has_cache and self.cache.sm_state is not _IS_RESET:
            # Reset shared cache state machine first, to avoid triggering reset
            # propagation to all identical quantities.
            self.cache.reset(ignore_auto_refresh=True,
                             reset_state_machine=True)

            # Reset all identical quantities except itself since already done
            if not ignore_others:
                for owner in self.cache.owners:
                    if owner is not self:
                        owner.reset(reset_tracking=reset_tracking,
                                    ignore_requirements=True,
                                    ignore_others=True)

            # Reset shared cache afterward with auto-refresh enabled if needed
            if self.env.is_simulation_running:
                self.cache.reset(ignore_auto_refresh=False,
                                 reset_state_machine=False)

    def initialize(self) -> None:
        """Initialize internal buffers.

        This is typically useful to refresh shared memory proxies or to
        re-initialize pre-allocated buffers.

        .. warning::
            Intermediary quantities 'requirements' are NOT initialized
            automatically because they can be initialized lazily in most cases,
            or are optional depending on the most efficient computation path at
            run-time. It is up to the developer implementing quantities to take
            care of it.

        .. note::
            This method must be called before starting a new episode.

        .. note::
            Lazy-initialization is used for efficiency, ie `initialize` will be
            called before the first time `refresh` has to be called, which may
            never be the case if cache is shared between multiple identical
            instances of the same quantity.
        """
        # The quantity is now considered initialized and active unconditionally
        self._is_initialized = True
        self._is_active = True

    @abstractmethod
    def refresh(self) -> ValueT:
        """Evaluate this quantity based on the agent state at the end of the
        current agent step.
        """
# Covariant type variable denoting the type of value computed by a quantity
QuantityValueT_co = TypeVar('QuantityValueT_co', covariant=True)
# Specification of a quantity to instantiate: its class plus a dictionary of
# keyword-arguments of its constructor except 'env' and 'parent'
QuantityCreator = Tuple[
    Type[InterfaceQuantity[QuantityValueT_co]], Dict[str, Any]]
[docs]
class QuantityEvalMode(IntEnum):
    """Specify on which state to evaluate a given quantity.
    """
    TRUE = 0
    """Current state of the environment.
    """
    REFERENCE = 1
    """State of the reference trajectory at the current simulation time.
    """
# Define module-level proxies of the `QuantityEvalMode` members for fast
# lookup on the hot path
_TRUE, _REFERENCE = QuantityEvalMode
[docs]
@dataclass(unsafe_hash=True)
class AbstractQuantity(InterfaceQuantity, Generic[ValueT]):
    """Base class for generic quantities involved in observer-controller
    blocks, reward components or termination conditions.

    .. note::
        A dataset of trajectories is made available through `self.trajectory`.
        The latter is synchronized between all quantities as long as a shared
        cache is available. At least one trajectory must be added to the
        dataset and selected prior to using `QuantityEvalMode.REFERENCE`
        evaluation mode since the dataset is initially empty by default.

    .. seealso::
        See `InterfaceQuantity` documentation for details.
    """

    mode: QuantityEvalMode
    """Specify on which state to evaluate this quantity. See `QuantityEvalMode`
    documentation for details about each mode.

    .. warning::
        Mode `REFERENCE` requires a reference trajectory to be selected
        manually prior to evaluating this quantity for the first time.
    """

    def __init__(self,
                 env: InterfaceJiminyEnv,
                 parent: Optional[InterfaceQuantity],
                 requirements: Dict[str, "QuantityCreator"],
                 *,
                 mode: QuantityEvalMode = QuantityEvalMode.TRUE,
                 auto_refresh: bool = False) -> None:
        """
        :param env: Base or wrapped jiminy environment.
        :param parent: Higher-level quantity from which this quantity is a
                       requirement if any, `None` otherwise.
        :param requirements: Intermediary quantities on which this quantity
                             depends for its evaluation, as a dictionary
                             whose keys are tuple gathering their respective
                             class plus any keyword-arguments of its
                             constructor except 'env' and 'parent'.
        :param mode: Desired mode of evaluation for this quantity. If mode is
                     set to `QuantityEvalMode.TRUE`, then current simulation
                     state will be used in dynamics computations. If mode is
                     set to `QuantityEvalMode.REFERENCE`, then the state at the
                     current simulation time of the selected reference
                     trajectory will be used instead.
        :param auto_refresh: Whether this quantity must be refreshed
                             automatically as soon as its shared cache has been
                             cleared if specified.
        """
        # Backup user argument(s)
        self.mode = mode

        # Make sure that no user-specified requirement is named 'trajectory'
        requirement_names = requirements.keys()
        if "trajectory" in requirement_names:
            raise ValueError(
                "Key 'trajectory' is reserved and cannot be used for "
                "user-specified requirements.")

        # Make sure that state requirement is valid if any or use default
        quantity = requirements.get("state")
        if quantity is not None:
            cls, kwargs = quantity
            if (not issubclass(cls, StateQuantity) or
                    kwargs.setdefault("mode", mode) != mode):
                raise ValueError(
                    "Key 'state' is reserved and can only be used to specify "
                    "a `StateQuantity` requirement, as a way to give the "
                    "opportunity to overwrite 'update_*' default arguments.")
        else:
            requirements["state"] = (StateQuantity, dict(mode=mode))

        # Call base implementation
        super().__init__(env, parent, requirements, auto_refresh=auto_refresh)

        # Add trajectory quantity proxy
        trajectory = self.state.trajectory
        assert isinstance(trajectory, DatasetTrajectoryQuantity)
        self.trajectory = trajectory

        # Robot for which the quantity must be evaluated.
        # Placeholders are used until `initialize` refreshes the proxies.
        self.robot = jiminy.Robot()
        self.pinocchio_model = pin.Model()
        self.pinocchio_data = self.pinocchio_model.createData()

    def initialize(self) -> None:
        """Initialize internal buffers, then refresh robot and pinocchio
        proxies from the underlying state quantity.
        """
        # Call base implementation
        super().initialize()

        # Try forcing initialization of state quantity if not already done
        if not self.state._is_initialized:
            try:
                self.state.initialize()
            except RuntimeError:
                # Revert state initialization
                self.state.reset(reset_tracking=False,
                                 ignore_requirements=True,
                                 ignore_others=False)
                # It may have failed because no simulation running, which may
                # be problematic but not blocking at this point. Just checking
                # that the pinocchio model has been properly initialized.
                if self.state.pinocchio_model.nq == 0:
                    raise

        # Refresh robot proxy
        assert isinstance(self.state, StateQuantity)
        self.robot = self.state.robot
        self.pinocchio_model = self.state.pinocchio_model
        self.pinocchio_data = self.state.pinocchio_data
[docs]
def sync(fun: Callable[..., None]) -> Callable[..., None]:
    """Wrap any `InterfaceQuantity` instance method to forward call to all
    co-owners of the same shared cache.

    This wrapper is useful to keep all identical instances of the same quantity
    in sync.
    """
    @wraps(fun)
    def fun_safe(self: InterfaceQuantity, *args: Any, **kwargs: Any) -> None:
        # The synchronization state is tracked through a private attribute
        # hijacked onto the instance itself rather than a local variable,
        # because every synched method must share the very same flag.
        # Otherwise, one method may be flagged out-of-sync but not the others.
        # When the attribute is still missing, the current cache status is
        # used as initial value, exactly as if it had been set beforehand.
        was_synched = getattr(self, "__is_synched__", self.has_cache)

        # A quantity that acquired a shared cache with several co-owners after
        # some synched method was already called is out-of-sync: calls made
        # before that point were never forwarded to the other co-owners.
        has_coowners = self.has_cache and len(self.cache.owners) > 1
        if has_coowners and not was_synched:
            raise RuntimeError(
                "This quantity went out-of-sync. Make sure that no synched "
                "method is called prior to setting shared cache.")
        self.__is_synched__ = self.has_cache  # type: ignore[attr-defined]

        # Forward the call to every co-owner of the shared cache, or only to
        # this very instance when no cache has been set.
        targets = self.cache.owners if self.has_cache else (self,)
        cls = type(self)
        for owner in targets:
            assert isinstance(owner, cls)
            if fun(owner, *args, **kwargs) is not None:
                raise NotImplementedError(
                    "Only instance methods that returns `None` are supported.")

    return fun_safe
[docs]
@dataclass(unsafe_hash=True)
class DatasetTrajectoryQuantity(InterfaceQuantity[State]):
"""This class manages a dataset of trajectories.
The dataset is empty by default. Trajectories must be added or discarded
manually. Only one trajectory can be selected at once. Once a trajectory
has been selected, its state at the current simulation time can be easily
retrieved.
This class supports trajectories for which only part of the attributes of
the underlying state sequence have been specified. Obviously, missing
attributes of a trajectory will also be missing from the retrieved state.
It is the responsibility of the practitioner to make sure that all the
information that is necessary for its own application is available.
All instances of this quantity sharing the same cache are synchronized,
which means that adding, discarding, or selecting a trajectory on any of
them would propagate on all the others.
"""
def __init__(self,
             env: InterfaceJiminyEnv,
             parent: Optional[InterfaceQuantity],
             ) -> None:
    """
    :param env: Base or wrapped jiminy environment.
    :param parent: Higher-level quantity from which this quantity is a
                   requirement if any, `None` otherwise.
    """
    # Call base implementation. This quantity has no requirement and is
    # never auto-refreshed.
    super().__init__(env, parent, requirements={}, auto_refresh=False)
    # Ordered set of named tuples (trajectory, mode) as a dictionary
    self._registry: OrderedDict[
        str, Tuple[Trajectory, TrajectoryTimeMode]] = OrderedDict()
    # Whether the dataset is locked, ie no trajectory can be added/discarded
    self._lock = False
    # Name of the trajectory that is currently selected
    self._name = ""
    # Selected trajectory and accompanying time wrapping mode if any
    self._trajectory_mode_pair: Optional[
        Tuple[Trajectory, TrajectoryTimeMode]] = None
    # Time offset between the selected trajectory and the simulation
    self._time_offset = 0.0
    self._time_offset_ratio = 0.0
def __contains__(self, name: str) -> bool:
    """Check whether a trajectory with a given name is part of the dataset.

    :param name: Name of the trajectory to look for.
    """
    return name in self._registry.keys()
def __iter__(self) -> Iterator[str]:
    """Iterate over the names of all trajectories stored in the dataset.
    """
    # Iterating over the registry directly yields its keys
    return iter(self._registry)
def __getitem__(self, name: str) -> Trajectory:
    """Get the desired trajectory in the dataset if available, raises a
    `KeyError` exception otherwise.

    :param name: Name of the trajectory to fetch.
    """
    # The registry maps names to (trajectory, mode) pairs
    return self._registry[name][0]
def __len__(self) -> int:
    """Number of trajectories that are currently stored in the dataset.
    """
    return len(self._registry)
def __bool__(self) -> bool:
    """Whether the dataset contains at least one trajectory, ie it is NOT
    currently empty.
    """
    return bool(self._registry)
@property
def is_locked(self) -> bool:
    """Whether this dataset is locked, which means that no additional
    trajectories can be added nor discarded anymore.
    """
    return self._lock
@property
def name(self) -> str:
    """Name of the trajectory that is currently selected.

    .. note::
        Empty string when no trajectory has been selected yet, as this is
        the value set at instantiation.
    """
    return self._name
@property
def trajectory(self) -> Trajectory:
    """Trajectory that is currently selected if any, raises an exception
    otherwise.
    """
    # A trajectory must have been selected beforehand
    pair = self._trajectory_mode_pair
    if pair is None:
        raise RuntimeError("No trajectory has been selected.")

    # Extract the trajectory from the (trajectory, mode) pair
    selected_trajectory, _ = pair
    return selected_trajectory
@property
def mode(self) -> TrajectoryTimeMode:
    """Time wrapping mode associated with the currently selected
    trajectory.

    :raises RuntimeError: If no trajectory is currently selected.
    """
    # Selecting a trajectory is a prerequisite for accessing its mode
    pair = self._trajectory_mode_pair
    if pair is None:
        raise RuntimeError("No trajectory has been selected.")
    return pair[1]
@property
def robot(self) -> jiminy.Robot:
    """Robot associated with the currently selected trajectory.
    """
    return self.trajectory.robot
@property
def use_theoretical_model(self) -> bool:
    """Whether the currently selected trajectory relies on the theoretical
    dynamical model of the robot rather than the extended simulation one.
    """
    return self.trajectory.use_theoretical_model
@property
def time_offset_ratio(self) -> float:
    """Relative time offset between the current simulation time and the
    trajectory being selected.
    """
    return self._time_offset_ratio
@time_offset_ratio.setter
@sync
def time_offset_ratio(self, value: float) -> None:
    """Set relative time offset between the current simulation time and the
    trajectory being selected at query time.

    .. note::
        This setter is decorated with `@sync`, so the update propagates to
        all instances sharing the same cache.

    :param value:
        Enforces a given relative time offset between the current
        simulation time and the trajectory. Note that 0.0 and 1.0
        correspond to the initial and final time of the trajectory,
        respectively. `None` to disable time shift.
        Optional: `None` by default.
    """
    # Update internal buffer.
    # NOTE(review): the docstring mentions `None` to disable time shift,
    # but the annotation is `float` and the buffer is initialized to 0.0
    # in `__init__` — confirm whether `None` is actually supported here.
    self._time_offset_ratio = value
    # Re-select the current trajectory so that the absolute time offset is
    # recomputed and the quantity is reset on all synchronized instances.
    # NOTE(review): this will raise if no trajectory has been selected yet
    # (`self.name` is then empty) — confirm this is the intended behavior.
    self.select(self.name)
@InterfaceQuantity.cache.setter  # type: ignore[attr-defined]
def cache(self, cache: Optional[SharedCache[ValueT]]) -> None:
    """Attach (or detach) a shared cache, synchronizing the trajectory
    registry and current selection with the existing cache owners if any.

    :param cache: Shared cache to attach, `None` to detach.

    :raises RuntimeError: If the cache already has owners while this
                          instance's dataset is not empty, since the two
                          registries could not be reconciled.
    """
    # Get existing registry if any and making sure not already out-of-sync.
    # The first owner is taken as reference since all owners are expected
    # to be synchronized with each other already.
    owner: Optional[InterfaceQuantity] = None
    if cache is not None and cache.owners:
        owner = next(iter(cache.owners))
        assert isinstance(owner, DatasetTrajectoryQuantity)
        if self._trajectory_mode_pair:
            raise RuntimeError(
                "Trajectory dataset not empty. Impossible to add a shared "
                "cache already having owners.")

    # Call base implementation to actually swap the cache
    InterfaceQuantity.cache.fset(self, cache)  # type: ignore[attr-defined]

    # Catch-up synchronization with the pre-existing owner if any
    if owner:
        # Shallow copy the original registry, so that deletion / addition
        # does not propagate to other instances.
        self._registry = owner._registry.copy()
        # Replicate the owner's selection and time offset, which also
        # recomputes the absolute time offset via `select`.
        if owner._trajectory_mode_pair is not None:
            self._time_offset_ratio = owner._time_offset_ratio
            self.select(owner._name)
@sync
def _add(self,
         name: str,
         trajectory: Trajectory,
         mode: Literal['raise', 'wrap', 'clip']) -> None:
    """Register a trajectory in the local registry of every instance
    sharing the same cache, without performing any validity check.

    .. warning::
        Internal helper of the `add` method. It is not meant to be called
        manually.

    :param name: Desired name of the trajectory.
    :param trajectory: Trajectory instance to register.
    :param mode: Specifies how to deal with query times that are out of
                 the time interval of the trajectory. See `Trajectory.get`
                 documentation for details.
    """
    self._registry[name] = (trajectory, mode)
def add(self,
        name: str,
        trajectory: Trajectory,
        mode: Literal['raise', 'wrap', 'clip'] = 'raise') -> None:
    """Jointly add a trajectory to the local internal registry of all
    instances sharing the same cache as this quantity.

    :param name: Desired name of the trajectory. It must be unique. If a
                 trajectory with the exact same name already exists, then
                 it must be discarded first, so as to prevent silently
                 overwriting it by mistake.
    :param trajectory: Trajectory instance to register.
    :param mode: Specifies how to deal with query times that are out of
                 the time interval of the trajectory. See `Trajectory.get`
                 documentation for details.
                 Optional: 'raise' by default.

    :raises RuntimeError: If the dataset has already been locked.
    :raises KeyError: If a trajectory with the same name already exists.
    """
    # Adding trajectories is only allowed while the dataset is still open
    if self.is_locked:
        raise RuntimeError(
            "Trajectory dataset already locked. Impossible to add any "
            "trajectory.")

    # Refuse to silently overwrite an existing trajectory
    if name in self._registry:
        raise KeyError(
            "A trajectory with the exact same name already exists. Please "
            "delete it first before adding a new one.")

    # Operate on a dummy copy of the robot whenever the trajectory refers
    # to the simulated one, so as to leave the simulation untouched.
    if trajectory.robot is self.env.robot:
        trajectory = replace(trajectory, robot=trajectory.robot.copy())

    # Register the very same post-processed trajectory on every identical
    # instance at once. Splitting `add` in two stages — one-time
    # post-processing here, then synchronized registration in `_add` — is
    # essential: it guarantees that the underlying trajectories are all
    # references to the same memory, including `pinocchio_data`. This
    # means that calling `update_quantities` will perform the update for
    # all of them at once, so kinematics and dynamics quantities of all
    # `State` instances stay up-to-date as long as `refresh` is called
    # once for a given evaluation mode.
    self._add(name, trajectory, mode)
@sync
def discard(self, name: str) -> None:
    """Jointly remove a trajectory from the local internal registry of all
    instances sharing the same cache as this quantity.

    :param name: Name of the trajectory to discard.

    :raises RuntimeError: If the dataset has already been locked.
    :raises KeyError: If no trajectory with this name has been registered.
    """
    # Discarding trajectories is only allowed while the dataset is open
    if self.is_locked:
        raise RuntimeError(
            "Trajectory dataset already locked. Impossible to discard any "
            "trajectory.")

    # Drop the current selection whenever it refers to the discarded entry
    if self._name == name:
        self._name = ""
        self._trajectory_mode_pair = None

    # Remove the trajectory from the registry
    del self._registry[name]
@sync
def clear(self) -> None:
    """Jointly remove all the trajectories from the local internal registry
    of all instances sharing the same cache as this quantity.

    :raises RuntimeError: If the dataset has already been locked.
    """
    # Clearing the dataset is only allowed while it is still open
    if self.is_locked:
        raise RuntimeError(
            "Trajectory dataset already locked. Impossible to clear the "
            "dataset.")

    # Drop the current selection if any
    self._name = ""
    self._trajectory_mode_pair = None

    # Wipe out the whole registry
    self._registry.clear()
@sync
def _select(self, name: str) -> None:
    """Select an existing trajectory from the dataset shared amongst all
    managed quantities without resetting the quantity itself at this point.

    .. warning::
        This method is used internally by `select` method. It is not meant
        to be called manually.

    :param name: Name of the trajectory to select.

    :raises ValueError: If the dataset is empty.
    :raises KeyError: If no trajectory with this name has been registered.
    """
    # Make sure that at least one trajectory has been specified
    if not self._registry:
        raise ValueError("Cannot select trajectory on an empty dataset.")

    # Select the desired trajectory for all identical instances
    trajectory, _ = self._trajectory_mode_pair = self._registry[name]
    self._name = name

    # Update the absolute time offset from the relative ratio, so that 0.0
    # maps to the initial time of the trajectory and 1.0 to its final time
    time_start, time_end = trajectory.time_interval
    time_delta = time_end - time_start
    self._time_offset = time_start + self._time_offset_ratio * time_delta
def select(self, name: str) -> None:
    """Select an existing trajectory from the dataset shared amongst all
    managed quantities, then reset the quantity itself.

    .. note::
        There is no way to select a different reference trajectory for
        individual quantities at the time being.

    :param name: Name of the trajectory to select.
    """
    # First, propagate the new selection to every identical instance
    self._select(name)

    # Then, un-initialize all instances of this quantity
    self.reset(reset_tracking=False)
@sync
def lock(self) -> None:
    """Forbid adding or discarding trajectories to the dataset from now on.
    """
    self._lock = True
def refresh(self) -> State:
    """Compute state of selected trajectory at current simulation time.

    :raises RuntimeError: If no trajectory is currently selected.
    """
    # Selecting a trajectory is a prerequisite for evaluating it
    pair = self._trajectory_mode_pair
    if pair is None:
        raise RuntimeError("No trajectory has been selected.")
    trajectory, mode = pair

    # Query the selected trajectory at the current simulation time shifted
    # by the requested offset
    time_query = self.env.stepper_state.t + self._time_offset
    return trajectory.get(time_query, mode)
@dataclass(unsafe_hash=True)
class StateQuantity(InterfaceQuantity[State]):
"""State to consider when evaluating any quantity deriving from
`AbstractQuantity` using the same evaluation mode as this instance.
This quantity is refreshed automatically no matter what. This guarantees
that all low-level kinematics and dynamics quantities that can be computed
from the current state are up-to-date. More specifically, all quantities
will be up-to-date if the evaluation mode is `QuantityEvalMode.TRUE`,
whereas it depends on the information available on the selected
trajectory if the evaluation mode is `QuantityEvalMode.REFERENCE`. See
`update_quantities` documentation for details.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
             env: InterfaceJiminyEnv,
             parent: Optional[InterfaceQuantity],
             *,
             mode: QuantityEvalMode = QuantityEvalMode.TRUE,
             update_kinematics: bool = True,
             update_dynamics: bool = False,
             update_centroidal: bool = False,
             update_energy: bool = False,
             update_jacobian: bool = False) -> None:
    """
    :param env: Base or wrapped jiminy environment.
    :param parent: Higher-level quantity from which this quantity is a
                   requirement if any, `None` otherwise.
    :param mode: Desired mode of evaluation for this quantity. If mode is
                 set to `QuantityEvalMode.TRUE`, then current simulation
                 state will be used in dynamics computations. If mode is
                 set to `QuantityEvalMode.REFERENCE`, then the state of
                 some reference trajectory at the current simulation time
                 will be used instead.
                 Optional: `QuantityEvalMode.TRUE` by default.
    :param update_kinematics: Whether to update body and frame transforms,
                              spatial velocities and accelerations stored
                              in `self.pinocchio_data` if necessary to be
                              consistent with the current state of the
                              robot. This argument has no effect if mode is
                              set to `QuantityEvalMode.TRUE` because this
                              property is already guaranteed. It is also
                              forced to `True` whenever any of the other
                              `update_*` flags is set.
                              Optional: True by default.
    :param update_dynamics: Whether to update the non-linear effects and
                            the joint internal forces stored in
                            `self.pinocchio_data` if necessary.
                            Optional: False by default.
    :param update_centroidal: Whether to update the centroidal dynamics
                              (incl. CoM) stored in `self.pinocchio_data`
                              if necessary.
                              Optional: False by default.
    :param update_energy: Whether to update the potential and kinematic
                          energy stored in `self.pinocchio_data` if
                          necessary.
                          Optional: False by default.
    :param update_jacobian: Whether to update the joint Jacobian matrices
                            stored in `self.pinocchio_data` if necessary.
                            Optional: False by default.
    """
    # Make sure that the input arguments are valid. Kinematics data are a
    # prerequisite of all the other data, so requesting any of them
    # implies updating the kinematics as well.
    update_kinematics = (
        update_kinematics or update_dynamics or update_centroidal or
        update_energy or update_jacobian)

    # Backup user argument(s)
    self.mode = mode
    self.update_kinematics = update_kinematics
    self.update_dynamics = update_dynamics
    self.update_centroidal = update_centroidal
    self.update_energy = update_energy
    self.update_jacobian = update_jacobian

    # Enable auto-refresh based on the evaluation mode.
    # Note that it is necessary to auto-refresh this quantity, as it is the
    # one responsible for making sure that dynamics quantities are always
    # up-to-date when refreshing quantities. The latter are involved one
    # way or the other in the computation of any quantity, which means that
    # pre-computing it does not induce any unnecessary computations as long
    # as the user fetches the value of at least one quantity.
    # Although this assumption is very likely to be true at the step update
    # period, it is not the case at the observer update period. It sounds
    # more efficient to refresh the state the first time any quantity gets
    # computed. However, systematically checking if the state must be
    # refreshed for all quantities adds overhead and may be fairly costly
    # overall. The optimal trade-off is to rely on auto-refresh if the
    # evaluation mode is TRUE, since refreshing the state only consists in
    # copying some data, which is very cheap. On the contrary, it is more
    # efficient to only refresh the state when needed if the evaluation
    # mode is TRAJ.
    # * Update state: 500ns (TRUE) | 5.0us (TRAJ)
    # * Check cache state: 70ns
    auto_refresh = mode == QuantityEvalMode.TRUE

    # Call base implementation.
    super().__init__(
        env,
        parent,
        requirements=dict(trajectory=(DatasetTrajectoryQuantity, {})),
        auto_refresh=auto_refresh)

    # Robot for which the quantity must be evaluated. Placeholder proxies
    # overwritten by `initialize` based on the evaluation mode.
    self.robot = jiminy.Robot()
    self.pinocchio_model = self.robot.pinocchio_model
    self.pinocchio_data = self.robot.pinocchio_data

    # State for which the quantity must be evaluated (placeholder)
    self._state = State(t=np.nan, q=np.array([]))

    # Persistent buffer for storing body external forces if necessary
    self._f_external_vec = pin.StdVec_Force()
    self._f_external_list: List[np.ndarray] = []
    self._f_external_batch = np.array([])
    self._f_external_slices: Tuple[np.ndarray, ...] = ()

    # Persistent buffer storing all lambda multipliers for efficiency
    self._constraint_lambda_batch = np.array([])

    # Slices in stacked lambda multiplier flat vector
    self._constraint_lambda_slices: List[np.ndarray] = []

    # Lambda multipliers of all the constraints individually
    self._constraint_lambda_list: List[np.ndarray] = []

    # Whether to update kinematic and dynamic data to be consistent with
    # the current state of the robot, based on the requirement of all the
    # co-owners of shared cache. Aggregated in `initialize`.
    self._update_kinematics = False
    self._update_dynamics = False
    self._update_centroidal = False
    self._update_energy = False
    self._update_jacobian = False
def initialize(self) -> None:
    """Initialize robot/pinocchio proxies and pre-allocate all persistent
    buffers (external forces, constraint multipliers, state) based on the
    evaluation mode, synchronizing the result with all co-owners of the
    shared cache.
    """
    # Determine which data must be updated based on shared cache co-owners:
    # each flag is the logical OR of the corresponding flag of all owners.
    owners = self.cache.owners if self.has_cache else (self,)
    self._update_kinematics = False
    self._update_dynamics = False
    self._update_centroidal = False
    self._update_energy = False
    self._update_jacobian = False
    for owner in owners:
        self._update_kinematics |= owner.update_kinematics
        self._update_dynamics |= owner.update_dynamics
        self._update_centroidal |= owner.update_centroidal
        self._update_energy |= owner.update_energy
        self._update_jacobian |= owner.update_jacobian

    # Backup previous robot, which will be used to check if it has changed
    robot_prev = self.robot

    # Update pinocchio model and data proxies
    if self.mode is QuantityEvalMode.TRUE:
        self.robot = self.env.robot
        use_theoretical_model = False
    else:
        self.robot = self.trajectory.robot
        use_theoretical_model = self.trajectory.use_theoretical_model
    if use_theoretical_model:
        self.pinocchio_model = self.robot.pinocchio_model_th
        self.pinocchio_data = self.robot.pinocchio_data_th
    else:
        self.pinocchio_model = self.robot.pinocchio_model
        self.pinocchio_data = self.robot.pinocchio_data

    # Refresh robot and pinocchio proxies for co-owners of shared cache.
    # Note that automatic refresh is not sufficient to guarantee that
    # `initialize` will be called unconditionally, because it will be
    # skipped if a value is already stored in cache. As a result, it is
    # necessary to synchronize calls to this method between co-owners of
    # the shared cache manually, so that it will be called by the first
    # instance to find the cache empty. Only the necessary bits are
    # synchronized instead of the whole method, to avoid messing up with
    # computation graph tracking.
    for owner in owners:
        assert isinstance(owner, StateQuantity)
        owner.robot = self.robot
        owner.pinocchio_model = self.pinocchio_model
        owner.pinocchio_data = self.pinocchio_data

    # Early-return if the main cache owner is already initialized.
    # NOTE(review): relies on the private `_owner` / `_is_initialized`
    # attributes of `SharedCache` / `InterfaceQuantity` — confirm these
    # remain part of the internal contract.
    owner = self.cache._owner if self.has_cache else None
    if owner is not None and owner._is_initialized:
        return

    # Call base implementation.
    # The quantity will be considered initialized and active at this point.
    super().initialize()

    # Raise an exception if no simulation is running in TRUE mode, since
    # the simulation state would be undefined otherwise.
    if self.mode is QuantityEvalMode.TRUE:
        if not self.env.is_simulation_running:
            raise RuntimeError("No simulation running. Impossible to "
                               "initialize this quantity.")

    # Early return if a complete refresh is not necessary.
    # Note that memory is systematically re-allocated for `robot_state` at
    # every reset, which means that proxies must always be reset in TRUE
    # evaluation mode, even if the robot did not change. On the contrary,
    # it is safe to assume that the options of the robot are never
    # updated in REFERENCE evaluation mode. As a result, it is sufficient
    # to check that its address did not change since last reset.
    if (self.mode is QuantityEvalMode.REFERENCE and
            self.robot is robot_prev and self._f_external_list):
        return

    # Refresh proxies and allocate memory for storing external forces
    if self.mode is QuantityEvalMode.TRUE:
        self._f_external_vec = self.env.robot_state.f_external
    else:
        self._f_external_vec = pin.StdVec_Force()
        self._f_external_vec.extend([
            pin.Force() for _ in range(self.pinocchio_model.njoints)])
    self._f_external_list = [
        f_ext.vector for f_ext in self._f_external_vec]
    self._f_external_batch = np.zeros((self.pinocchio_model.njoints, 6))
    self._f_external_slices = tuple(self._f_external_batch)

    # Allocate memory for lambda vector (one scalar per log fieldname)
    constraint_fieldnames = self.robot.log_constraint_fieldnames
    self._constraint_lambda_batch = np.zeros((len(constraint_fieldnames),))

    # Refresh mapping from lambda multipliers to corresponding slice
    self._constraint_lambda_list.clear()
    self._constraint_lambda_slices.clear()
    constraint_lookup_pairs = tuple(
        (f"Constraint{registry_type}", registry)
        for registry_type, registry in (
            ("BoundJoints", self.robot.constraints.bounds_joints),
            ("ContactFrames", self.robot.constraints.contact_frames),
            ("CollisionBodies", {
                name: constraint for constraints in (
                    self.robot.constraints.collision_bodies)
                for name, constraint in constraints.items()}),
            ("User", self.robot.constraints.user)))
    i = 0
    while i < len(constraint_fieldnames):
        fieldname = constraint_fieldnames[i]
        # NOTE(review): if no prefix matches, the loop falls through with
        # the last pair ("ConstraintUser", ...) still bound, and the
        # subsequent registry lookup would raise `KeyError` — presumably
        # fieldnames always match one of the prefixes; confirm upstream.
        for registry_type, registry in constraint_lookup_pairs:
            if fieldname.startswith(registry_type):
                break
        # Strip the "Constraint<Type>" prefix and the trailing character
        # (per-scalar suffix) to recover the constraint name
        constraint_name = fieldname[len(registry_type):-1]
        constraint = registry[constraint_name]
        self._constraint_lambda_list.append(constraint.lambda_c)
        self._constraint_lambda_slices.append(
            self._constraint_lambda_batch[i:(i + constraint.size)])
        i += constraint.size

    # Allocate state for which the quantity must be evaluated if needed.
    # In TRUE mode the state aliases the simulation buffers directly.
    if self.mode is QuantityEvalMode.TRUE:
        self._state = State(
            0.0,
            self.env.robot_state.q,
            self.env.robot_state.v,
            self.env.robot_state.a,
            self.env.robot_state.u,
            self.env.robot_state.command,
            self._f_external_batch,
            self._constraint_lambda_batch)
def refresh(self) -> State:
    """Compute the current state depending on the mode of evaluation, and
    make sure that kinematics and dynamics quantities are up-to-date.

    :returns: State at the current simulation time, either aliasing the
              simulation buffers (TRUE mode) or fetched from the selected
              reference trajectory (REFERENCE mode).
    """
    # NOTE(review): `_TRUE` is presumably a module-level alias of
    # `QuantityEvalMode.TRUE` hoisted for lookup speed — confirm.
    if self.mode == _TRUE:
        # Update the current simulation time
        self._state.t = self.env.stepper_state.t

        # Update external forces and constraint multipliers in state buffer
        multi_array_copyto(self._f_external_slices, self._f_external_list)
        multi_array_copyto(
            self._constraint_lambda_slices, self._constraint_lambda_list)
    else:
        self._state = self.trajectory.get()

        # Copy body external forces from stacked buffer to force vector
        has_forces = self._state.f_external is not None
        if has_forces:
            array_copyto(self._f_external_batch, self._state.f_external)
            multi_array_copyto(self._f_external_list,
                               self._f_external_slices)

        # Update all dynamical quantities that can be given available data.
        # NOTE(review): this tests the per-instance `update_kinematics`
        # flag while passing the cache-aggregated `_update_*` flags below —
        # confirm this asymmetry is intentional.
        if self.update_kinematics:
            update_quantities(
                self.robot,
                self._state.q,
                self._state.v,
                self._state.a,
                self._f_external_vec if has_forces else None,
                update_dynamics=self._update_dynamics,
                update_centroidal=self._update_centroidal,
                update_energy=self._update_energy,
                update_jacobian=self._update_jacobian,
                update_collisions=False,
                use_theoretical_model=(
                    self.trajectory.use_theoretical_model))

        # Restore lagrangian multipliers of the constraints if available
        if self._state.lambda_c is not None:
            array_copyto(
                self._constraint_lambda_batch, self._state.lambda_c)
            multi_array_copyto(self._constraint_lambda_list,
                               self._constraint_lambda_slices)

    # Return state
    return self._state