Source code for gym_jiminy.common.blocks.body_orientation_observer

"""Implementation of a stateless body orientation state estimator block
compatible with gym_jiminy reinforcement learning pipeline environment design.
"""
from collections import OrderedDict
from collections.abc import Mapping
from typing import List, Dict, Sequence, Union, Optional

import numpy as np
import numba as nb
import gymnasium as gym

from jiminy_py.core import ImuSensor  # pylint: disable=no-name-in-module

from ..bases import BaseAct, BaseObs, BaseObserverBlock, InterfaceJiminyEnv
from ..wrappers.observation_layout import NestedData
from ..utils import (DataNested,
                     fill,
                     quat_to_rpy,
                     matrices_to_quat,
                     quat_multiply,
                     quat_apply,
                     remove_twist_from_quat)


[docs] @nb.jit(nopython=True, cache=True) def update_twist(q: np.ndarray, twist: np.ndarray, omega: np.ndarray, time_constant_inv: float, dt: float) -> None: """Update the twist estimate of the Twist-after-Swing decomposition of given orientations in quaternion representation using leaky integrator. :param q: Current swing estimate as a quaternion. It will be updated in-place to add the estimated twist. :param twist: Current twist estimate to update in-place. :param omega: Current angular velocity estimate in local frame. :param time_constant_inv: Inverse of the time constant of the leaky integrator used to update the twist estimate. :param dt: Time step, in seconds, between consecutive Quaternions. """ # Compute the derivative of the twist angle: # The element-wise time-derivative of a quaternion is given by: # dq = 0.5 * q * Quaternion(axis=gyro, w=0.0) [1] # The twist around a given axis can be computed as follows: # p = q_axis.dot(twist_axis) * twist_axis [2] # twist = pin.Quaternion(np.array((*p, q_w))).normalized() # The twist angle can be inferred from this expression: # twist = 2 * atan2(q_axis.dot(twist_axis), q_w) [3] # The derivative of twist angle can be derived: # dtwist = 2 * ( # (dq_axis * q_w - q_axis * dq_w).dot(twist_axis) [4] # ) / (q_axis.dot(twist_axis) ** 2 + q_w ** 2) # One can show that if q_s is the swing part of the orientation, then: # q_s_axis.dot(twist_axis) = 0 # It yields: # dtwist = 2 * dq_s_axis.dot(twist_axis) / q_s_w [5] q_x, q_y, _, q_w = q dtwist = (- q_y * omega[0] + q_x * omega[1]) / q_w + omega[2] # Update twist angle using Leaky Integrator scheme to avoid long-term drift twist *= max(0.0, 1.0 - time_constant_inv * dt) twist += dtwist * dt # Update quaternion to add estimated twist p_z, p_w = np.sin(0.5 * twist), np.cos(0.5 * twist) q[0], q[1], q[2], q[3] = ( p_w * q_x - p_z * q_y, p_z * q_x + p_w * q_y, p_z * q_w, p_w * q_w)
[docs] class BodyObserver(BaseObserverBlock[ Dict[str, np.ndarray], np.ndarray, BaseObs, BaseAct]): """Compute the orientation and angular velocity in local frame of the parent body associated with all the IMU sensors of the robot. .. note:: The twist angle is not part of the internal state, even if being integrated over time, because it is uniquely determined from the orientation estimate. """ def __init__(self, name: str, env: InterfaceJiminyEnv[BaseObs, BaseAct], *, nested_imu_quat_key: NestedData = ( "features", "mahony_filter", "quat"), nested_imu_omega_key: NestedData = ( "features", "mahony_filter", "omega",), twist_time_constant: Optional[float] = None, compute_rpy: bool = True, update_ratio: int = 1) -> None: """ :param name: Name of the block. :param env: Environment to connect with. :param nested_imu_quat_key: Nested key from environment observation mapping to the IMU quaternion estimates. Their ordering must be consistent with the true IMU sensors of the robot. :param nested_imu_omega_key: Nested key from environment observation mapping to the IMU angular velocity in local frame estimates. Their ordering must be consistent with the true IMU sensors of the robot. :param twist_time_constant: If specified, it corresponds to the time constant of the leaky integrator (Exponential Moving Average) used to estimate the twist part of twist-after-swing decomposition of the estimated orientation in place of the Mahony Filter. If `0.0`, then its is kept constant equal to zero. `None` to kept the original estimate provided by Mahony Filter. See `remove_twist_from_quat` and `update_twist` documentations for details. Optional: `0.0` by default. :param compute_rpy: Whether to compute the Yaw-Pitch-Roll Euler angles representation for the 3D orientation of the IMU, in addition to the quaternion representation. Optional: True by default. :param update_ratio: Ratio between the update period of the observer and the one of the subsequent observer. -1 to match the simulation timestep of the environment. Optional: `1` by default. """ # Backup some of the user-argument(s) self.compute_rpy = compute_rpy # Keep track of how the twist must be computed self.twist_time_constant_inv: Optional[float] if twist_time_constant is None: self.twist_time_constant_inv = None else: if twist_time_constant > 0.0: self.twist_time_constant_inv = 1.0 / twist_time_constant else: self.twist_time_constant_inv = float("inf") self._remove_twist = self.twist_time_constant_inv is not None self._update_twist = ( self.twist_time_constant_inv is not None and np.isfinite(self.twist_time_constant_inv)) # Define observed / estimated IMU data proxies for fast access obs_imu_data_list: List[np.ndarray] = [] for nested_imu_key in (nested_imu_quat_key, nested_imu_omega_key): obs_imu_data: DataNested = env.observation for key in nested_imu_key: if isinstance(key, str): assert isinstance(obs_imu_data, Mapping) obs_imu_data = obs_imu_data[key] elif isinstance(key, int): assert isinstance(obs_imu_data, Sequence) obs_imu_data = obs_imu_data[key] else: assert isinstance(obs_imu_data, np.ndarray) slices: List[Union[int, slice]] = [] for start_end in key: if isinstance(start_end, int): slices.append(start_end) elif not start_end: slices.append(slice(None,)) else: slices.append(slice(*start_end)) obs_imu_data = obs_imu_data[tuple(slices)] assert isinstance(obs_imu_data, np.ndarray) obs_imu_data_list.append(obs_imu_data) self._obs_imu_quats, self._obs_imu_omegas = obs_imu_data_list # Extract the relative IMU frame placement wrt its parent body imu_rel_rot_mats: List[np.ndarray] = [] for sensor in env.robot.sensors[ImuSensor.type]: frame = env.robot.pinocchio_model.frames[sensor.frame_index] imu_rel_rot_mats.append(frame.placement.rotation) self._imu_rel_quats = matrices_to_quat(tuple(imu_rel_rot_mats)) # Allocate twist angle estimate around z-axis in world frame. num_imu_sensors = len(env.robot.sensors[ImuSensor.type]) self._twist = np.zeros((1, num_imu_sensors)) # Initialize the observer super().__init__(name, env, update_ratio) # Define proxies for the body orientations and angular velocities self._quat = self.observation["quat"] if self.compute_rpy: self._rpy = self.observation["rpy"] else: self._rpy = np.array([]) self._omega = self.observation["omega"] def _initialize_observation_space(self) -> None: num_imu_sensors = len(self.env.robot.sensors[ImuSensor.type]) observation_space: Dict[str, gym.Space] = OrderedDict() observation_space["quat"] = gym.spaces.Box( low=np.full((4, num_imu_sensors), -1.0 - 1e-9), high=np.full((4, num_imu_sensors), 1.0 + 1e-9), dtype=np.float64) if self.compute_rpy: high = np.array([np.pi, np.pi/2, np.pi]) + 1e-9 observation_space["rpy"] = gym.spaces.Box( low=-high[:, np.newaxis].repeat(num_imu_sensors, axis=1), high=high[:, np.newaxis].repeat(num_imu_sensors, axis=1), dtype=np.float64) observation_space["omega"] = gym.spaces.Box( low=float('-inf'), high=float('inf'), shape=(3, num_imu_sensors), dtype=np.float64) self.observation_space = gym.spaces.Dict( **observation_space) # type: ignore[arg-type] def _setup(self) -> None: # Call base implementation super()._setup() # Reset the twist estimate fill(self._twist, 0) # Fix initialization of the observation to be valid quaternions self._quat[-1] = 1.0 @property def fieldnames(self) -> Dict[str, List[List[str]]]: imu_sensors = self.env.robot.sensors[ImuSensor.type] fieldnames: Dict[str, List[List[str]]] = {} fieldnames["quat"] = [ [".".join((sensor.name, f"Quat{e}")) for sensor in imu_sensors] for e in ("X", "Y", "Z", "W")] if self.compute_rpy: fieldnames["rpy"] = [ [".".join((sensor.name, e)) for sensor in imu_sensors] for e in ("Roll", "Pitch", "Yaw")] fieldnames["omega"] = [ [".".join((sensor.name, e)) for sensor in imu_sensors] for e in ("X", "Y", "Z")] return fieldnames
[docs] def refresh_observation(self, measurement: BaseObs) -> None: # Compute the parent body orientations quat_multiply(self._obs_imu_quats, self._imu_rel_quats, out=self._quat, is_right_conjugate=True) # Compute the parent body angular velocities in local frame. # Note that batched "quaternion apply" is faster than sequential # "rotation matrix apply" in practice, both when using `np.einsum` or # single-threaded numba jitted implementation. quat_apply(self._imu_rel_quats, self._obs_imu_omegas, out=self._omega) # Remove twist if requested if self._remove_twist: remove_twist_from_quat(self._quat) # Update twist if requested if self._update_twist: update_twist(self._quat, self._twist, self._omega, self.twist_time_constant_inv, self.observe_dt) # Compute the RPY representation if requested if self.compute_rpy: quat_to_rpy(self._quat, self._rpy)