# pylint: disable=redefined-builtin
"""Generic quantities that may be relevant for any kind of robot, regardless
its topology (multiple or single branch, fixed or floating base...) and the
application (locomotion, grasping...).
"""
import warnings
from enum import IntEnum
from functools import partial
from dataclasses import dataclass
from typing import (
List, Dict, Optional, Protocol, Sequence, Tuple, Union, Callable,
runtime_checkable)
import numpy as np
import numba as nb
import jiminy_py.core as jiminy
from jiminy_py.core import ( # pylint: disable=no-name-in-module
array_copyto, multi_array_copyto)
import pinocchio as pin
import hppfcl as fcl
from ..bases import (
InterfaceJiminyEnv, InterfaceQuantity, AbstractQuantity, StateQuantity,
QuantityEvalMode)
from ..utils import (
matrix_to_rpy, matrix_to_quat, quat_apply, remove_yaw_from_quat,
quat_interpolate_middle)
from .transform import (
StackedQuantity, MaskedQuantity, UnaryOpQuantity, BinaryOpQuantity)
[docs]
@runtime_checkable
class FrameQuantity(Protocol):
"""Protocol that must be satisfied by all quantities associated with one
particular frame.
This protocol is used when aggregating individual frame-level quantities
in a larger batch for computation vectorization on all frames at once.
Intermediate quantities managing these batches will make sure that all
their parents derive from one of the supported protocols, which includes
this one.
"""
frame_name: str
[docs]
@runtime_checkable
class MultiFrameQuantity(Protocol):
"""Protocol that must be satisfied by all quantities associated with
a particular set of frames for which the same batched intermediary
quantities must be computed.
This protocol is involved in automatic computation vectorization. See
`FrameQuantity` documentation for details.
"""
frame_names: Tuple[str, ...]
[docs]
def aggregate_frame_names(quantity: InterfaceQuantity) -> Tuple[
Tuple[str, ...],
Dict[Union[str, Tuple[str, ...]], Union[int, Tuple[()], slice]]]:
"""Generate a sequence of frame names that contains all the sub-sequences
specified by the parents of all the cache owners of a given quantity.
Ideally, the generated sequence should be the shortest possible. Since
finding the optimal sequence is a complex problem, a heuristic is used
instead. It consists in aggregating first all multi-frame quantities
sequentially after ordering them by decreasing length, followed by all
single-frame quantities.
.. note::
Only active quantities are considered for best performance, which may
change dynamically. Delegating this responsibility to cache owners may
be possible but difficult to implement because `frame_names` must be
cleared first before re-registering themselves, just in case of optimal
computation graph has changed, not only once to avoid getting rid of
quantities that just registered themselves. Nevertheless, whenever
re-initializing this quantity to take into account changes of the
active set must be decided by cache owners.
:param quantity: Quantity whose parent implements either `FrameQuantity` or
`MultiFrameQuantity` protocol. All the parents of all its
cache owners must also implement one of these protocol.
"""
# Make sure that parent quantity implement multi- or single-frame protocol
assert isinstance(quantity.parent, (FrameQuantity, MultiFrameQuantity))
quantities = (quantity.cache.owners if quantity.has_cache else (quantity,))
# First, order all multi-frame quantities by decreasing length
frame_names_chunks: List[Tuple[str, ...]] = []
for owner in quantities:
parent = owner.parent
assert parent is not None
if parent.is_active(any_cache_owner=False):
if isinstance(parent, MultiFrameQuantity):
frame_names_chunks.append(parent.frame_names)
# Next, process ordered multi-frame quantities sequentially.
# For each of them, we first check if its set of frames is completely
# included in the current full set. If so, then there is nothing do not and
# we can move to the next quantity. If not, then we check if a part of its
# tail or head is contained at the beginning or end of the full set
# respectively. If so, only the missing part is prepended or appended
# respectively. If not, then the while set of frames is appended to the
# current full set before moving to the next quantity.
frame_names: List[str] = []
frame_names_chunks = sorted(frame_names_chunks, key=len)[::-1]
for frame_names_ in map(list, frame_names_chunks):
nframes, nframes_ = map(len, (frame_names, frame_names_))
for i in range(nframes - nframes_ + 1):
# Check if the sub-chain is completely included in the
# current full set.
if frame_names_ == frame_names[i:(i + nframes_)]:
break
else:
for i in range(1, nframes_ + 1):
# Check if part of the frame names matches with the
# tail of the current full set. If so, append the
# disjoint head only.
if (frame_names[(nframes - nframes_ + i):] ==
frame_names_[:(nframes_ - i)]):
frame_names += frame_names_[(nframes_ - i):]
break
# Check if part of the frame names matches with the
# head of the current full set. If so, prepend the
# disjoint tail only.
if frame_names[:(nframes_ - i)] == frame_names_[i:]:
frame_names = frame_names_[:i] + frame_names
break
# Finally, loop over all single-frame quantities.
# If a frame name is missing, then it is appended to the current full set.
# Otherwise, we just move to the next quantity.
frame_name_chunks: List[str] = []
for owner in quantities:
parent = owner.parent
assert parent is not None
if parent.is_active(any_cache_owner=False):
if isinstance(parent, FrameQuantity):
frame_name_chunks.append(parent.frame_name)
frame_name = frame_name_chunks[-1]
if frame_name not in frame_names:
frame_names.append(frame_name)
frame_names = tuple(frame_names)
# Compute mapping from frame names to their corresponding indices in the
# generated sequence of frame names.
# The indices are stored as a slice for non-empty multi-frame quantities,
# as an empty tuple for empty multi-frame quantities, or as an integer for
# single-frame quantities.
frame_slices: Dict[
Union[str, Tuple[str, ...]], Union[int, Tuple[()], slice]] = {}
nframes = len(frame_names)
for frame_names_ in frame_names_chunks:
if frame_names_ in frame_slices:
continue
if not frame_names_:
frame_slices[frame_names_] = ()
continue
nframes_ = len(frame_names_)
for i in range(nframes - nframes_ + 1):
if frame_names_ == frame_names[i:(i + nframes_)]:
break
frame_slices[frame_names_] = slice(i, i + nframes_)
for frame_name in frame_name_chunks:
if frame_name in frame_slices:
continue
frame_slices[frame_name] = frame_names.index(frame_name)
return frame_names, frame_slices
@dataclass(unsafe_hash=True)
class _BatchedFramesRotationMatrix(
AbstractQuantity[Dict[Union[str, Tuple[str, ...]], np.ndarray]]):
"""3D rotation matrix of the orientation of all frames involved in
quantities relying on it and are active since last reset of computation
tracking if shared cache is available, its parent otherwise.
This quantity only provides a performance benefit when managed by some
`QuantityManager`. It is not supposed to be instantiated manually but use
as requirement of some other quantity for computation vectorization on all
frames at once.
The data associated with each frame is exposed to the user as a batched 3D
contiguous array. The two first dimensions are rotation matrix elements,
while the last one are individual frames with the same ordering as
'self.frame_names'.
.. note::
This quantity does not allow for specifying frames directly. There is
no way to get the orientation of multiple frames at once for now.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: InterfaceQuantity,
mode: QuantityEvalMode) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement.
:param mode: Desired mode of evaluation for this quantity.
"""
# Make sure that a parent has been specified
assert isinstance(parent, (FrameQuantity, MultiFrameQuantity))
# Call base implementation
super().__init__(
env, parent, requirements={}, mode=mode, auto_refresh=False)
# Initialize the ordered list of frame names
self.frame_names: Tuple[str, ...] = ()
# Store all rotation matrices at once
self._rot_mat_batch: np.ndarray = np.array([])
# Define proxy for views of the batch storing all rotation matrices
self._rot_mat_views: List[np.ndarray] = []
# Define proxy for the rotation matrices of all frames
self._rot_mat_list: List[np.ndarray] = []
# Mapping from frame names to views of batched rotation matrices
self._rot_mat_map: Dict[Union[str, Tuple[str, ...]], np.ndarray] = {}
def initialize(self) -> None:
# Deactivate all cache owners first since only one is tracking frames
for quantity in (self.cache.owners if self.has_cache else (self,)):
quantity._is_active = False
# Call base implementation
super().initialize()
# Update the frame names based on the cache owners of this quantity
self.frame_names, frame_slices = aggregate_frame_names(self)
# Re-allocate memory as the number of frames is not known in advance.
# Note that Fortran memory layout (column-major) is used for speed up
# because it preserves contiguity when copying frame data.
# Anyway, C memory layout (row-major) does not make sense in this case
# since chunks of columns are systematically extracted, which means
# that the returned array would NEVER be contiguous.
nframes = len(self.frame_names)
self._rot_mat_batch = np.zeros((3, 3, nframes), order='F')
# Refresh proxies
self._rot_mat_views.clear()
self._rot_mat_list.clear()
for i, frame_name in enumerate(self.frame_names):
frame_index = self.pinocchio_model.getFrameId(frame_name)
rot_matrix = self.pinocchio_data.oMf[frame_index].rotation
self._rot_mat_views.append(self._rot_mat_batch[..., i])
self._rot_mat_list.append(rot_matrix)
# Re-assign mapping from frame names to their corresponding data
self._rot_mat_map = {
key: self._rot_mat_batch[:, :, frame_slice]
for key, frame_slice in frame_slices.items()}
def refresh(self) -> Dict[Union[str, Tuple[str, ...]], np.ndarray]:
# Copy all rotation matrices in contiguous buffer
multi_array_copyto(self._rot_mat_views, self._rot_mat_list)
# Return proxy directly without copy
return self._rot_mat_map
[docs]
class OrientationType(IntEnum):
"""Specify the desired vector representation of the frame orientations.
"""
MATRIX = 0
"""3D rotation matrix.
"""
EULER = 1
"""Euler angles (Roll, Pitch, Yaw).
"""
QUATERNION = 2
"""Quaternion coordinates (QuatX, QuatY, QuatZ, QuatW).
"""
ANGLE_AXIS = 3
"""Angle-Axis representation (theta * AxisX, theta * AxisY, theta * AxisZ).
"""
# Define proxies for fast lookup
_MATRIX, _EULER, _QUATERNION, _ANGLE_AXIS = ( # pylint: disable=invalid-name
OrientationType)
@dataclass(unsafe_hash=True)
class _BatchedFramesOrientation(
InterfaceQuantity[Dict[Union[str, Tuple[str, ...]], np.ndarray]]):
"""Vector representation of the orientation in world reference frame of all
frames involved in quantities relying on it and are active since last reset
of computation tracking if shared cache is available, its parent otherwise.
The vector representation of the orientation of all the frames are stacked
in a single contiguous N-dimensional array whose last dimension corresponds
to the individual frames.
The orientation of all frames is exposed to the user as a dictionary whose
keys are the individual frame names. Internally, data are stored in batched
2D contiguous array for efficiency. The first dimension gathers the 3 Euler
angles (roll, pitch, yaw), while the second one are individual frames with
the same ordering as 'self.frame_names'.
This quantity is used internally by `FrameOrientation`. It is not supposed
to be instantiated manually. See `_BatchedFramesRotationMatrix`
documentation for details.
In the particular case of Euler angle representation, the expected maximum
speedup wrt computing Euler angles individually is about x15, which is
achieved asymptotically for more than 100 frames. Still, it is already x5
faster for 5 frames, x7 for 10 frames, and x9 for 20 frames.
"""
type: OrientationType
"""Selected vector representation of the orientation for all frames.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Union["FrameOrientation", "MultiFrameOrientation"],
type: OrientationType,
mode: QuantityEvalMode) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: `FrameOrientation` or `MultiFrameOrientation` instance
from which this quantity is a requirement.
:param type: Desired vector representation of the orientation for all
frames. Note that `OrientationType.ANGLE_AXIS` is not
supported for now.
:param mode: Desired mode of evaluation for this quantity.
"""
# Make sure that a suitable parent has been provided
assert isinstance(parent, (FrameOrientation, MultiFrameOrientation))
# Make sure that the specified orientation representation is supported
if type not in (OrientationType.MATRIX,
OrientationType.EULER,
OrientationType.QUATERNION):
raise ValueError(
"This quantity only supports orientation representations "
"'MATRIX', 'EULER', and 'QUATERNION'.")
# Backup some user argument(s)
self.type = type
self.mode = mode
# Initialize the ordered list of frame names.
# Note that this must be done BEFORE calling base `__init__`, otherwise
# `isinstance(..., (FrameQuantity, MultiFrameQuantity))` will fail.
self.frame_names: Tuple[str, ...] = ()
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
rot_mat_map=(_BatchedFramesRotationMatrix, dict(mode=mode))),
auto_refresh=False)
# Mapping from frame names managed by this specific instance to their
# corresponding indices in the generated sequence of frame names.
self._frame_slices: Tuple[Tuple[
Union[str, Tuple[str, ...]], Union[int, Tuple[()], slice]], ...
] = ()
# Store the representation of the orientation of all frames at once
self._data_batch: np.ndarray = np.array([])
# Mapping from chunks of frame names to vector representation views
self._data_map: Dict[Union[str, Tuple[str, ...]], np.ndarray] = {}
def initialize(self) -> None:
# Deactivate all cache owners first since only one is tracking frames
for quantity in (self.cache.owners if self.has_cache else (self,)):
quantity._is_active = False
# Call base implementation
super().initialize()
# Update the frame names based on the cache owners of this quantity
self.frame_names, frame_slices_map = aggregate_frame_names(self)
# Re-assign mapping of chunk of frame names being managed
self._frame_slices = tuple(frame_slices_map.items())
# Re-allocate memory as the number of frames is not known in advance
nframes = len(self.frame_names)
if self.type in (OrientationType.EULER, OrientationType.ANGLE_AXIS):
self._data_batch = np.zeros((3, nframes), order='F')
elif self.type == OrientationType.QUATERNION:
self._data_batch = np.zeros((4, nframes), order='F')
# Re-assign mapping from chunks of frame names to corresponding data
if self.type is not OrientationType.MATRIX:
self._data_map = {
key: self._data_batch[..., frame_slice]
for key, frame_slice in frame_slices_map.items()}
def refresh(self) -> Dict[Union[str, Tuple[str, ...]], np.ndarray]:
# Get the complete batch of rotation matrices managed by this instance
value = self.rot_mat_map.get()
rot_mat_batch = value[self.frame_names]
# Convert all rotation matrices at once to the desired representation
if self.type == _EULER:
matrix_to_rpy(rot_mat_batch, self._data_batch)
elif self.type == _QUATERNION:
matrix_to_quat(rot_mat_batch, self._data_batch)
else:
# Slice data.
# Note that it cannot be pre-computed once and for all because
# the batched data reference may changed dynamically.
self._data_map = {
key: rot_mat_batch[..., frame_slice]
for key, frame_slice in self._frame_slices}
# Return proxy directly without copy
return self._data_map
[docs]
@dataclass(unsafe_hash=True)
class FrameOrientation(InterfaceQuantity[np.ndarray]):
"""Vector representation of the orientation of a given frame in world
reference frame at the end of the agent step.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
type: OrientationType
"""Desired vector representation of the orientation.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
type: OrientationType = OrientationType.MATRIX,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param type: Desired vector representation of the orientation.
Optional: 'OrientationType.MATRIX' by default.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Backup some user argument(s)
self.frame_name = frame_name
self.type = type
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
data=(_BatchedFramesOrientation, dict(
type=type,
mode=mode))),
auto_refresh=False)
[docs]
def initialize(self) -> None:
# Check if the quantity is already active
was_active = self._is_active
# Call base implementation
super().initialize()
# Force re-initializing shared data if the active set has changed
if not was_active:
# Must reset the tracking for shared computation systematically,
# just in case the optimal computation path has changed to the
# point that relying on batched quantity is no longer relevant.
self.data.reset(reset_tracking=True)
[docs]
def refresh(self) -> np.ndarray:
value = self.data.get()
return value[self.frame_name]
[docs]
@dataclass(unsafe_hash=True)
class MultiFrameOrientation(InterfaceQuantity[np.ndarray]):
"""Vector representation of the orientation of a given set of frames in
world reference frame at the end of the agent step.
The vector representation of the orientation of all the frames are stacked
in a single contiguous N-dimensional array whose last dimension corresponds
to the individual frames. See `_BatchedFramesOrientation` documentation for
details.
"""
frame_names: Tuple[str, ...]
"""Name of the frames on which to operate.
"""
type: OrientationType
"""Selected vector representation of the orientation for all frames.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_names: Sequence[str],
*,
type: OrientationType = OrientationType.MATRIX,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_names: Name of the frames on which to operate.
:param type: Desired vector representation of the orientation for all
frames.
Optional: 'OrientationType.MATRIX' by default.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Make sure that the user did not pass a single frame name
assert not isinstance(frame_names, str)
# Backup some user argument(s)
self.frame_names = tuple(frame_names)
self.type = type
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
data=(_BatchedFramesOrientation, dict(
type=type,
mode=mode))),
auto_refresh=False)
[docs]
def initialize(self) -> None:
# Check if the quantity is already active
was_active = self._is_active
# Call base implementation.
# The quantity is now considered active at this point.
super().initialize()
# Force re-initializing shared data if the active set has changed
if not was_active:
self.data.reset(reset_tracking=True)
[docs]
def refresh(self) -> np.ndarray:
# Return a slice of batched data.
# Note that mapping from frame names to frame index in batched data
# cannot be pre-computed as it may changed dynamically.
value = self.data.get()
return value[self.frame_names]
@dataclass(unsafe_hash=True)
class _BatchedFramesPosition(
AbstractQuantity[Dict[Union[str, Tuple[str, ...]], np.ndarray]]):
"""Position vector (X, Y, Z) of all frames involved in quantities relying
on it and are active since last reset of computation tracking if shared
cache is available, its parent otherwise.
It is not supposed to be instantiated manually but use internally by
`FramePosition`. See `_BatchedFramesRotationMatrix` documentation.
The positions of all frames are exposed to the user as a dictionary whose
keys are the individual frame names and/or set of frame names as a tuple.
Internally, data are stored in batched 2D contiguous array for efficiency.
The first dimension gathers the 3 components (X, Y, Z), while the second
one are individual frames with the same ordering as 'self.frame_names'.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Union["FramePosition", "MultiFramePosition"],
mode: QuantityEvalMode) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: `FramePosition` or `MultiFramePosition` instance from
which this quantity is a requirement.
:param mode: Desired mode of evaluation for this quantity.
"""
# Make sure that a suitable parent has been provided
assert isinstance(parent, (FramePosition, MultiFramePosition))
# Initialize the ordered list of frame names
self.frame_names: Tuple[str, ...] = ()
# Call base implementation
super().__init__(
env,
parent,
requirements={},
mode=mode,
auto_refresh=False)
# Define proxy for the position vectors of all frames
self._pos_refs: List[np.ndarray] = []
# Store the position of all frames at once
self._pos_batch: np.ndarray = np.array([])
# Define proxy for views of the batch storing all translation vectors
self._pos_views: List[np.ndarray] = []
# Mapping from chunks of frame names to individual position views
self._pos_map: Dict[Union[str, Tuple[str, ...]], np.ndarray] = {}
def initialize(self) -> None:
# Deactivate all cache owners first since only one is tracking frames
for quantity in (self.cache.owners if self.has_cache else (self,)):
quantity._is_active = False
# Call base implementation
super().initialize()
# Update the frame names based on the cache owners of this quantity
self.frame_names, frame_slices = aggregate_frame_names(self)
# Re-allocate memory as the number of frames is not known in advance
nframes = len(self.frame_names)
self._pos_batch = np.zeros((3, nframes), order='F')
# Refresh proxies
self._pos_views.clear()
self._pos_refs.clear()
for i, frame_name in enumerate(self.frame_names):
frame_index = self.pinocchio_model.getFrameId(frame_name)
translation = self.pinocchio_data.oMf[frame_index].translation
self._pos_views.append(self._pos_batch[:, i])
self._pos_refs.append(translation)
# Re-assign mapping from frame names to their corresponding data
self._pos_map = {
key: self._pos_batch[:, frame_slice]
for key, frame_slice in frame_slices.items()}
def refresh(self) -> Dict[Union[str, Tuple[str, ...]], np.ndarray]:
# Copy all translations in contiguous buffer
multi_array_copyto(self._pos_views, self._pos_refs)
# Return proxy directly without copy
return self._pos_map
[docs]
@dataclass(unsafe_hash=True)
class FramePosition(InterfaceQuantity[np.ndarray]):
"""Position vector (X, Y, Z) of a given frame in world reference frame at
the end of the agent step.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Backup some user argument(s)
self.frame_name = frame_name
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
data=(_BatchedFramesPosition, dict(mode=mode))),
auto_refresh=False)
[docs]
def initialize(self) -> None:
# Check if the quantity is already active
was_active = self._is_active
# Call base implementation
super().initialize()
# Force re-initializing shared data if the active set has changed
if not was_active:
self.data.reset(reset_tracking=True)
[docs]
def refresh(self) -> np.ndarray:
value = self.data.get()
return value[self.frame_name]
[docs]
@dataclass(unsafe_hash=True)
class MultiFramePosition(InterfaceQuantity[np.ndarray]):
"""Position vector (X, Y, Z) of a given set of frames in world reference
frame at the end of the agent step.
"""
frame_names: Tuple[str, ...]
"""Name of the frames on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_names: Sequence[str],
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frames on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Make sure that the user did not pass a single frame name
assert not isinstance(frame_names, str)
# Backup some user argument(s)
self.frame_names = tuple(frame_names)
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
data=(_BatchedFramesPosition, dict(mode=mode))),
auto_refresh=False)
[docs]
def initialize(self) -> None:
# Check if the quantity is already active
was_active = self._is_active
# Call base implementation
super().initialize()
# Force re-initializing shared data if the active set has changed
if not was_active:
self.data.reset(reset_tracking=True)
[docs]
def refresh(self) -> np.ndarray:
value = self.data.get()
return value[self.frame_names]
[docs]
@dataclass(unsafe_hash=True)
class FrameXYZQuat(InterfaceQuantity[np.ndarray]):
"""Spatial vector representation (X, Y, Z, QuatX, QuatY, QuatZ, QuatW) of
the transform of a given frame in world reference frame at the end of the
agent step.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Backup some user argument(s)
self.frame_name = frame_name
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
position=(FramePosition, dict(
frame_name=frame_name,
mode=mode)),
quat=(FrameOrientation, dict(
frame_name=frame_name,
type=OrientationType.QUATERNION,
mode=mode))),
auto_refresh=False)
# Pre-allocate memory for storing the pose XYZQuat of all frames
self._xyzquat = np.zeros((7,))
[docs]
def refresh(self) -> np.ndarray:
# Copy the position of all frames at once in contiguous buffer
array_copyto(self._xyzquat[:3], self.position.get())
# Copy the quaternion of all frames at once in contiguous buffer
array_copyto(self._xyzquat[-4:], self.quat.get())
return self._xyzquat
[docs]
@dataclass(unsafe_hash=True)
class MultiFrameXYZQuat(InterfaceQuantity[np.ndarray]):
"""Spatial vector representation (X, Y, Z, QuatX, QuatY, QuatZ, QuatW) of
the transform of a given set of frames in world reference frame at the end
of the agent step.
"""
frame_names: Tuple[str, ...]
"""Name of the frames on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_names: Sequence[str],
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frames on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Make sure that the user did not pass a single frame name
assert not isinstance(frame_names, str)
# Backup some user argument(s)
self.frame_names = tuple(frame_names)
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
positions=(MultiFramePosition, dict(
frame_names=frame_names,
mode=mode)),
quats=(MultiFrameOrientation, dict(
frame_names=frame_names,
type=OrientationType.QUATERNION,
mode=mode))),
auto_refresh=False)
# Pre-allocate memory for storing the pose XYZQuat of all frames
self._xyzquats = np.zeros((7, len(frame_names)), order='C')
[docs]
def refresh(self) -> np.ndarray:
# Copy the position of all frames at once in contiguous buffer
array_copyto(self._xyzquats[:3], self.positions.get())
# Copy the quaternion of all frames at once in contiguous buffer
array_copyto(self._xyzquats[-4:], self.quats.get())
return self._xyzquats
[docs]
@dataclass(unsafe_hash=True)
class MultiFrameMeanXYZQuat(InterfaceQuantity[np.ndarray]):
"""Spatial vector representation (X, Y, Z, QuatX, QuatY, QuatZ, QuatW) of
the average transform of a given set of frames in world reference frame at
the end of the agent step.
Broadly speaking, the average is defined as the value minimizing the mean
error wrt every individual elements, considering some distance metric. In
this case, the average position (X, Y, Z) and orientation as a quaternion
vector (QuatX, QuatY, QuatZ, QuatW) are computed separately (double
geodesic). It has the advantage to be much easier to compute, and to
decouple the translation from the rotation, which is desirable when
defining reward components weighting differently position or orientation
errors. See `quaternion_average` for details about the distance metric
being used to compute the average orientation.
"""
frame_names: Tuple[str, ...]
"""Name of the frames on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_names: Sequence[str],
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frames on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Make sure that the user did not pass a single frame name
assert not isinstance(frame_names, str)
# Backup some user argument(s)
self.frame_names = tuple(frame_names)
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
positions=(MultiFramePosition, dict(
frame_names=frame_names,
mode=mode)),
quats=(MultiFrameOrientation, dict(
frame_names=frame_names,
type=OrientationType.QUATERNION,
mode=mode))),
auto_refresh=False)
# Jit-able method specialization of `np.mean` for `axis=-1`
@nb.jit(nopython=True, cache=True, fastmath=True)
def position_average(value: np.ndarray, out: np.ndarray) -> None:
"""Compute the mean of an array over its last axis only.
:param value: N-dimensional array from which the last axis will be
reduced.
:param out: Pre-allocated array in which to store the result.
"""
out[:] = np.sum(value, -1) / value.shape[-1]
self._position_average = position_average
# Jit-able specialization of `quat_average` for 2D matrices
@nb.jit(nopython=True, cache=True, fastmath=True)
def quat_average_2d(quat: np.ndarray, out: np.ndarray) -> None:
"""Compute the average of a batch of quaternions [qx, qy, qz, qw].
.. note::
Jit-able specialization of `quat_average` for 2D matrices, with
further optimization for the special case where there is only 2
quaternions.
:param quat: N-dimensional (N >= 2) array whose first dimension
gathers the 4 quaternion coordinates [qx, qy, qz, qw].
:param out: Pre-allocated array in which to store the result.
"""
num_quats = quat.shape[1]
if num_quats == 1:
out[:] = quat
elif num_quats == 2:
quat_interpolate_middle(quat[:, 0], quat[:, 1], out)
else:
_, eigvec = np.linalg.eigh(quat @ quat.T)
out[:] = eigvec[..., -1]
self._quat_average = quat_average_2d
# Pre-allocate memory for the mean for mean pose vector XYZQuat
self._xyzquat_mean = np.zeros((7,))
# Define position and orientation proxies for fast access
self._position_mean_view = self._xyzquat_mean[:3]
self._quat_mean_view = self._xyzquat_mean[3:]
[docs]
def refresh(self) -> np.ndarray:
# Compute the mean translation
self._position_average(self.positions.get(), self._position_mean_view)
# Compute the mean quaternion
self._quat_average(self.quats.get(), self._quat_mean_view)
return self._xyzquat_mean
[docs]
@dataclass(unsafe_hash=True)
class MultiFrameCollisionDetection(InterfaceQuantity[bool]):
"""Check if some geometry objects are colliding with each other.
It takes into account some safety margins by which their volume will be
inflated / deflated.
.. note::
Jiminy enforces all collision geometries to be either primitive shapes
or convex polyhedra for efficiency. In practice, tf meshes where
specified in the original URDF file, then they will be converted into
their respective convex hull.
"""
frame_names: Tuple[str, ...]
"""Name of the bodies of the robot to consider for collision detection.
All the geometry objects sharing with them the same parent joint will be
taking into account.
"""
security_margin: float
"""Signed distance below which a pair of geometry objects is stated in
collision.
This can be interpreted as inflating or deflating the geometry objects by
the safety margin depending on whether it is positive or negative
respectively. Therefore, the actual geometry objects do no have to be in
contact to be stated in collision if the satefy margin is positive. On the
contrary, the penetration depth must be large enough if the security margin
is positive.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_names: Sequence[str],
*,
security_margin: float = 0.0) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_names: Name of the bodies of the robot to consider for
collision detection. All the geometry objects
sharing with them the same parent joint will be
taking into account.
:param security_margin: Signed distance below which a pair of geometry
objects is stated in collision.
Optional: 0.0 by default.
"""
# Backup some user-arguments
self.frame_names = tuple(frame_names)
self.security_margin = security_margin
# Call base implementation
super().__init__(
env,
parent,
requirements={},
auto_refresh=False)
# Initialize a broadphase manager for each collision group
self._collision_groups = [
fcl.DynamicAABBTreeCollisionManager() for _ in frame_names]
# Initialize pair-wise collision requests between groups of bodies
self._requests: List[Tuple[
fcl.BroadPhaseCollisionManager,
fcl.BroadPhaseCollisionManager,
fcl.CollisionCallBackBase]] = []
for i in range(len(frame_names)):
for j in range(i + 1, len(frame_names)):
manager_1 = self._collision_groups[i]
manager_2 = self._collision_groups[j]
callback = fcl.CollisionCallBackDefault()
request: fcl.CollisionRequest = (
callback.data.request) # pylint: disable=no-member
request.gjk_initial_guess = jiminy.GJKInitialGuess.CachedGuess
# request.gjk_variant = fcl.GJKVariant.NesterovAcceleration
# request.break_distance = 0.1
request.gjk_tolerance = 1e-6
request.distance_upper_bound = 1e-6
request.num_max_contacts = 1
request.security_margin = security_margin
self._requests.append((manager_1, manager_2, callback))
# Store callable responsible to updating transform of colision objects
self._transform_updates: List[Callable[[], None]] = []
[docs]
def initialize(self) -> None:
# Call base implementation
super().initialize()
# Define robot proxy for convenience
robot = self.env.robot
# Clear all collision managers
for manager in self._collision_groups:
manager.clear()
# Get the list of parent joint indices mapping
frame_indices_map: Dict[int, int] = {}
for i, frame_name in enumerate(self.frame_names):
frame_index = robot.pinocchio_model.getFrameId(frame_name)
frame = robot.pinocchio_model.frames[frame_index]
frame_indices_map[frame.parent] = i
# Add collision objects to their corresponding manager
self._transform_updates.clear()
for i, geom in enumerate(robot.collision_model.geometryObjects):
j = frame_indices_map.get(geom.parentJoint)
if j is not None:
obj = fcl.CollisionObject(geom.geometry)
self._collision_groups[j].registerObject(obj)
pose = robot.collision_data.oMg[i]
translation, rotation = pose.translation, pose.rotation
self._transform_updates += (
partial(obj.setTranslation, translation),
partial(obj.setRotation, rotation))
# Initialize collision detection facilities
for manager in self._collision_groups:
manager.setup()
[docs]
def refresh(self) -> bool:
# Update collision object placement
for transform_update in self._transform_updates:
transform_update()
# Update all collision managers
# for manager in self._collision_groups:
# manager.update()
# Check collision for all candidate pairs
for manager_1, manager_2, callback in self._requests:
manager_1.collide(manager_2, callback)
if callback.data.result.isCollision():
return True
return False
@dataclass(unsafe_hash=True)
class _DifferenceFrameXYZQuat(InterfaceQuantity[np.ndarray]):
"""Motion vector representation (VX, VY, VZ, WX, WY, WZ) of the finite
difference between the pose of a given frame at the end of previous and
current agent steps.
The finite difference is defined here as the geodesic distance in SE3 Lie
Group. Under this definition, the rate of change of the translation depends
on rate of change of the orientation of the frame, which may be undesirable
in some cases. Alternatively, the double geodesic distance could be used
instead to completely decouple the position from the orientation.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param reference_frame:
Whether the spatial velocity must be computed in local reference
frame (aka 'pin.LOCAL') or re-aligned with world axes (aka
'pin.LOCAL_WORLD_ALIGNED').
Optional: 'pinocchio.ReferenceFrame.LOCAL' by default.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Backup some user argument(s)
self.frame_name = frame_name
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
xyzquat_stack=(StackedQuantity, dict(
quantity=(FrameXYZQuat, dict(
frame_name=frame_name,
mode=mode)),
max_stack=2))),
auto_refresh=False)
# Define specialize difference operator on SE3 Lie group
self._difference = (
pin.liegroups.SE3().difference) # pylint: disable=no-member
# Pre-allocate memory to store the pose difference
self._data: np.ndarray = np.zeros(6)
def refresh(self) -> np.ndarray:
# Fetch previous and current XYZQuat representation of frame transform.
# It will raise an exception if not enough data is available at this
# point. This should never occur in practice as it will be fine at
# the end of the first step already, before the reward and termination
# conditions are evaluated.
xyzquat_prev, xyzquat = self.xyzquat_stack.get()
# Compute average frame velocity in local frame since previous step
self._data[:] = self._difference(xyzquat_prev, xyzquat)
return self._data
[docs]
@dataclass(unsafe_hash=True)
class AverageFrameXYZQuat(InterfaceQuantity[np.ndarray]):
"""Spatial vector representation (X, Y, Z, QuatX, QuatY, QuatZ, QuatW) of
the average pose of a given frame over the whole agent step.
The average frame pose is obtained by integration of the average velocity
over the whole agent step, backward in time from the state at the end of
the step to the midpoint. See `_DifferenceFrameXYZQuat` documentation for
details.
.. note::
There is a coupling between the rate of change of the orientation over
the agent step and the position of the midpoint. Depending on the
application, it may be desirable to decouple the translation from the
rotation completely by performing computation on the Cartesian Product
of the 3D Euclidean space R3 times the Special Orthogonal Group SO3.
The resulting distance metric is referred to as double geodesic and
does not correspond to the actual shortest path anymore.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Backup some user argument(s)
self.frame_name = frame_name
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
xyzquat_next=(FrameXYZQuat, dict(
frame_name=frame_name,
mode=mode)),
xyzquat_diff=(_DifferenceFrameXYZQuat, dict(
frame_name=frame_name,
mode=mode))),
auto_refresh=False)
# Define specialize integrate operator on SE3 Lie group
self._integrate = (
pin.liegroups.SE3().integrate) # pylint: disable=no-member
[docs]
def refresh(self) -> np.ndarray:
# Interpolate the average spatial velocity at midpoint
return self._integrate(
self.xyzquat_next.get(), - 0.5 * self.xyzquat_diff.get())
[docs]
@dataclass(unsafe_hash=True)
class AverageFrameRollPitch(InterfaceQuantity[np.ndarray]):
"""Quaternion representation of the average Yaw-free orientation from the
Roll-Pitch_yaw decomposition of a given frame over the whole agent step.
.. seealso::
See `remove_yaw_from_quat` and `AverageFrameXYZQuat` for details about
the Roll-Pitch-Yaw decomposition and how the average frame pose is
defined respectively.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Backup some user argument(s)
self.frame_name = frame_name
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
quat_mean=(MaskedQuantity, dict(
quantity=(AverageFrameXYZQuat, dict(
frame_name=frame_name,
mode=mode)),
axis=0,
keys=(3, 4, 5, 6)))),
auto_refresh=False)
# Twist-free average orientation of the base as a quaternion
self._quat_no_yaw_mean = np.zeros((4,))
[docs]
def refresh(self) -> np.ndarray:
# Compute Yaw-free average orientation
remove_yaw_from_quat(self.quat_mean.get(), self._quat_no_yaw_mean)
return self._quat_no_yaw_mean
[docs]
@dataclass(unsafe_hash=True)
class FrameSpatialAverageVelocity(InterfaceQuantity[np.ndarray]):
"""Average spatial velocity of a given frame at the end of the agent step.
The average spatial velocity is obtained by finite difference. More
precisely, it is defined here as the ratio of the geodesic distance in SE3
Lie Group between the pose of the frame at the end of previous and current
step over the time difference between them. Notably, under this definition,
the linear average velocity jointly depends on rate of change of the
translation and rotation of the frame, which may be undesirable in some
cases. Alternatively, the double geodesic distance could be used instead to
completely decouple the translation from the rotation.
.. note::
The local frame for which the velocity is expressed is defined as the
midpoint interpolation between the previous and current frame pose.
This definition is arbitrary, in a sense that any other point for an
interpolation ratio going from 0.0 (previous pose) to 1.0 (current
pose) would be equally valid.
"""
frame_name: str
"""Name of the frame on which to operate.
"""
reference_frame: pin.ReferenceFrame
"""Whether the spatial velocity must be computed in local reference frame
or re-aligned with world axes.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
frame_name: str,
*,
reference_frame: pin.ReferenceFrame = pin.LOCAL,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param frame_name: Name of the frame on which to operate.
:param reference_frame:
Whether the spatial velocity must be computed in local reference
frame (aka 'pin.LOCAL') or re-aligned with world axes (aka
'pin.LOCAL_WORLD_ALIGNED').
Optional: 'pinocchio.ReferenceFrame.LOCAL' by default.
:param mode: Desired mode of evaluation for this quantity.
Optional: 'QuantityEvalMode.TRUE' by default.
"""
# Make sure at requested reference frame is supported
if reference_frame not in (pin.LOCAL, pin.LOCAL_WORLD_ALIGNED):
raise ValueError("Reference frame must be either 'pin.LOCAL' or "
"'pin.LOCAL_WORLD_ALIGNED'.")
# Backup some user argument(s)
self.frame_name = frame_name
self.reference_frame = reference_frame
self.mode = mode
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
xyzquat_diff=(_DifferenceFrameXYZQuat, dict(
frame_name=frame_name,
mode=mode)),
quat_mean=(MaskedQuantity, dict(
quantity=(AverageFrameXYZQuat, dict(
frame_name=frame_name,
mode=mode)),
axis=0,
keys=(3, 4, 5, 6)))),
auto_refresh=False)
# Inverse time difference from previous to next state
self._inv_step_dt = 1.0 / self.env.step_dt
# Pre-allocate memory for the spatial velocity
self._v_spatial: np.ndarray = np.zeros(6)
# Reshape linear plus angular velocity vector to vectorize rotation
self._v_lin_ang = self._v_spatial.reshape((2, 3)).T
[docs]
def refresh(self) -> np.ndarray:
# Compute average frame velocity in local frame since previous step
np.multiply(
self.xyzquat_diff.get(), self._inv_step_dt, self._v_spatial)
# Translate local velocity to world frame
if self.reference_frame == pin.LOCAL_WORLD_ALIGNED:
# Define world frame as the "middle" between prev and next pose.
# Here, we only care about the middle rotation, so we can consider
# SO3 Lie Group algebra instead of SE3.
quat_apply(self.quat_mean.get(), self._v_lin_ang, self._v_lin_ang)
return self._v_spatial
[docs]
@dataclass(unsafe_hash=True)
class MultiActuatedJointKinematic(AbstractQuantity[np.ndarray]):
"""Current position, velocity or acceleration of all the actuated joints
of the robot before or after the mechanical transmissions.
In practice, all actuated joints must be 1DoF for now. In the case of
revolute unbounded revolute joints, the principal angle 'theta' is used to
encode the position, not the polar coordinates `(cos(theta), sin(theta))`.
.. warning::
Data is extracted from the true configuration vector instead of using
sensor data. As a result, this quantity is appropriate for computing
reward components and termination conditions but must be avoided in
observers and controllers.
.. warning::
Revolute unbounded joints are not supported for now.
"""
kinematic_level: pin.KinematicLevel
"""Kinematic level to consider, ie position, velocity or acceleration.
"""
is_motor_side: bool
"""Whether the compute kinematic data on motor- or joint-side, ie before or
after their respective mechanical transmision.
"""
def __init__(self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
*,
kinematic_level: pin.KinematicLevel = pin.POSITION,
is_motor_side: bool = False,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param kinematic_level: Desired kinematic level, ie position, velocity
or acceleration.
:param is_motor_side: Whether the compute kinematic data on motor- or
joint-side, ie before or after the mechanical
transmisions.
Optional: False by default.
:param mode: Desired mode of evaluation for this quantity.
"""
# Backup some of the user-arguments
self.kinematic_level = kinematic_level
self.is_motor_side = is_motor_side
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
state=(StateQuantity, dict(
update_kinematics=False))),
mode=mode,
auto_refresh=False)
# Mechanical joint position indices.
# Note that it will only be used in last resort if it can be written as
# a slice. Indeed, "fancy" indexing returns a copy of the original data
# instead of a view, which requires fetching data at every refresh.
self.kinematic_indices: List[int] = []
# Keep track of the mechanical reduction ratio for all the motors
self._joint_to_motor_ratios = np.array([])
# Buffer storing mechanical joint positions
self._data = np.array([])
# Whether mechanical joint positions must be updated at every refresh
self._must_refresh = False
[docs]
def initialize(self) -> None:
# Call base implementation
super().initialize()
# Make sure that the state data meet requirements
state = self.state.get()
if ((self.kinematic_level == pin.ACCELERATION and state.a is None) or
(self.kinematic_level >= pin.VELOCITY and state.v is None)):
raise RuntimeError(
"Available state data do not meet requirements for kinematic "
f"level '{self.kinematic_level}'.")
# Refresh mechanical joint position indices and reduction ratio
joint_to_motor_ratios = []
self.kinematic_indices.clear()
for motor in self.robot.motors:
joint = self.pinocchio_model.joints[motor.joint_index]
joint_type = jiminy.get_joint_type(joint)
if joint_type == jiminy.JointModelType.ROTARY_UNBOUNDED:
raise ValueError(
"Revolute unbounded joints are not supported for now.")
if self.kinematic_level == pin.KinematicLevel.POSITION:
kin_first, kin_last = joint.idx_q, joint.idx_q + joint.nq
else:
kin_first, kin_last = joint.idx_v, joint.idx_v + joint.nv
motor_options = motor.get_options()
mechanical_reduction = motor_options["mechanicalReduction"]
joint_to_motor_ratios.append(mechanical_reduction)
self.kinematic_indices += range(kin_first, kin_last)
self._joint_to_motor_ratios = np.array(joint_to_motor_ratios)
# Determine whether data can be extracted from state by reference
kin_first = min(self.kinematic_indices)
kin_last = max(self.kinematic_indices)
self._must_refresh = True
if self.mode == QuantityEvalMode.TRUE:
try:
if np.all(np.array(self.kinematic_indices) == np.arange(
kin_first, kin_last + 1)):
self._must_refresh = False
else:
warnings.warn(
"Consider using the same ordering for motors and "
"joints for optimal performance.")
except ValueError:
pass
# Try extracting mechanical joint positions by reference if possible
if self._must_refresh:
self._data = np.full((len(self.kinematic_indices),), float("nan"))
else:
state = self.state.get()
if self.kinematic_level == pin.KinematicLevel.POSITION:
self._data = state.q[slice(kin_first, kin_last + 1)]
elif self.kinematic_level == pin.KinematicLevel.VELOCITY:
self._data = state.v[slice(kin_first, kin_last + 1)]
else:
self._data = state.a[slice(kin_first, kin_last + 1)]
[docs]
def refresh(self) -> np.ndarray:
# Update mechanical joint positions only if necessary
state = self.state.get()
if self._must_refresh:
if self.kinematic_level == pin.KinematicLevel.POSITION:
data = state.q
elif self.kinematic_level == pin.KinematicLevel.VELOCITY:
data = state.v
else:
data = state.a
data.take(self.kinematic_indices, None, self._data, "clip")
# Translate encoder data at joint level
if self.is_motor_side:
self._data *= self._joint_to_motor_ratios
return self._data
[docs]
class EnergyGenerationMode(IntEnum):
"""Specify what happens to the energy generated by motors when breaking.
"""
CHARGE = 0
"""The energy flows back to the battery to charge them without any kind of
losses in the process if negative overall.
"""
LOST_EACH = 1
"""The generated energy by each motor individually is lost by thermal
dissipation, without flowing back to the battery nor powering other motors
consuming energy if any.
"""
LOST_GLOBAL = 2
"""The energy is lost by thermal dissipation without flowing back to the
battery if negative overall.
"""
PENALIZE = 3
"""The generated energy by each motor individually is treated as consumed.
"""
# Define proxies for fast lookup
_CHARGE, _LOST_EACH, _LOST_GLOBAL, _PENALIZE = map(int, EnergyGenerationMode)
[docs]
@dataclass(unsafe_hash=True)
class AverageMechanicalPowerConsumption(InterfaceQuantity[float]):
"""Average mechanical power consumption by all the motors over a sliding
time window.
"""
max_stack: int
"""Time horizon over which values of the instantaneous power consumption
will be stacked for computing the average.
"""
generator_mode: EnergyGenerationMode
"""Specify what happens to the energy generated by motors when breaking.
See `EnergyGenerationMode` documentation for details.
"""
mode: QuantityEvalMode
"""Specify on which state to evaluate this quantity. See `QuantityEvalMode`
documentation for details about each mode.
.. warning::
Mode `REFERENCE` requires a reference trajectory to be selected
manually prior to evaluating this quantity for the first time.
"""
def __init__(
self,
env: InterfaceJiminyEnv,
parent: Optional[InterfaceQuantity],
*,
horizon: float,
generator_mode: EnergyGenerationMode = EnergyGenerationMode.CHARGE,
mode: QuantityEvalMode = QuantityEvalMode.TRUE) -> None:
"""
:param env: Base or wrapped jiminy environment.
:param parent: Higher-level quantity from which this quantity is a
requirement if any, `None` otherwise.
:param horizon: Horizon over which values of the quantity will be
stacked before computing the average.
:param generator_mode: Specify what happens to the energy generated by
motors when breaking.
Optional: `EnergyGenerationMode.CHARGE` by
default.
:param mode: Desired mode of evaluation for this quantity.
"""
# Convert horizon in stack length, assuming constant env timestep
max_stack = max(int(np.ceil(horizon / env.step_dt)), 1)
# Backup some of the user-arguments
self.max_stack = max_stack
self.generator_mode = generator_mode
self.mode = mode
# Jit-able method computing the total instantaneous power consumption
@nb.jit(nopython=True, cache=True, fastmath=True)
def _compute_power(generator_mode: int, # EnergyGenerationMode
motor_velocities: np.ndarray,
motor_efforts: np.ndarray) -> float:
"""Compute the total instantaneous mechanical power consumption of
all motors.
:param generator_mode: Specify what happens to the energy generated
by motors when breaking.
:param motor_velocities: Velocity of all the motors before
transmission as a 1D array. The order must
be consistent with the motor indices.
:param motor_efforts: Effort of all the motors before transmission
as a 1D array. The order must be consistent
with the motor indices.
"""
if generator_mode in (_CHARGE, _LOST_GLOBAL):
total_power = np.dot(motor_velocities, motor_efforts)
if generator_mode == _CHARGE:
return total_power
return max(total_power, 0.0)
motor_powers = motor_velocities * motor_efforts
if generator_mode == _LOST_EACH:
return np.sum(np.maximum(motor_powers, 0.0))
return np.sum(np.abs(motor_powers))
# Call base implementation
super().__init__(
env,
parent,
requirements=dict(
total_power_stack=(StackedQuantity, dict(
quantity=(BinaryOpQuantity, dict(
quantity_left=(UnaryOpQuantity, dict(
quantity=(StateQuantity, dict(
update_kinematics=False,
mode=self.mode)),
op=lambda state: state.command)),
quantity_right=(MultiActuatedJointKinematic, dict(
kinematic_level=pin.KinematicLevel.VELOCITY,
is_motor_side=True,
mode=self.mode)),
op=partial(_compute_power, int(self.generator_mode)))),
max_stack=self.max_stack,
as_array=True,
mode='slice'))),
auto_refresh=False)
[docs]
def refresh(self) -> float:
return np.mean(self.total_power_stack.get())