Source code for gym_jiminy.common.blocks.mahony_filter

"""Implementation of Mahony filter block compatible with gym_jiminy
reinforcement learning pipeline environment design.
"""
import logging
from typing import List, Union, Optional

import numpy as np
import numba as nb
import gymnasium as gym

from jiminy_py.core import (  # pylint: disable=no-name-in-module
    ImuSensor as imu)

from ..bases import BaseObsT, BaseActT, BaseObserverBlock, InterfaceJiminyEnv
from ..utils import (fill,
                     matrices_to_quat,
                     swing_from_vector,
                     compute_tilt_from_quat,
                     remove_twist_from_quat)


EARTH_SURFACE_GRAVITY = 9.81

LOGGER = logging.getLogger(__name__)


[docs] @nb.jit(nopython=True, cache=True) def mahony_filter(q: np.ndarray, omega: np.ndarray, gyro: np.ndarray, acc: np.ndarray, bias_hat: np.ndarray, kp: float, ki: float, dt: float) -> None: """Attitude Estimation using Mahony filter. .. note:: This method is fully vectorized, which means that multiple IMU signals can be processed at once. The first dimension corresponds to individual IMU data. :param q: Current orientation estimate as a quaternion to update in-place. The twist part of its twist-after-swing decomposition may have been removed. :param omega: Pre-allocated memory that will be updated to store the estimate of the angular velocity in local frame. :param gyro: Sample of tri-axial Gyroscope in rad/s. It corresponds to the angular velocity in local frame. :param acc: Sample of tri-axial Accelerometer in m/s^2. It corresponds to classical (as opposed to spatial) acceleration of the IMU in local frame minus gravity. :param bias_hat: Current gyroscope bias estimate to update in-place. :param kp: Proportional gain used for gyro-accel sensor fusion. :param ki: Integral gain used for gyro bias estimate. :param dt: Time step, in seconds, between consecutive Quaternions. """ # Compute expected Earth's gravity (Euler-Rodrigues Formula): R(q).T @ e_z v_x, v_y, v_z = compute_tilt_from_quat(q) # Compute the angular velocity using Explicit Complementary Filter: # omega_mes = v_a_hat x v_a, where x is the cross product. v_x_hat, v_y_hat, v_z_hat = acc / EARTH_SURFACE_GRAVITY omega_mes = np.stack(( v_y_hat * v_z - v_z_hat * v_y, v_z_hat * v_x - v_x_hat * v_z, v_x_hat * v_y - v_y_hat * v_x), 0) omega[:] = gyro - bias_hat + kp * omega_mes # Early return if there is no IMU motion if (np.abs(omega) < 1e-6).all(): return # Compute Axis-Angle repr. of the angular velocity: exp3(dt * omega) theta = np.sqrt(np.sum(omega * omega, 0)) axis = omega / theta theta *= dt / 2 (p_x, p_y, p_z), p_w = (axis * np.sin(theta)), np.cos(theta) # Integrate the orientation: q * exp3(dt * omega) q_x, q_y, q_z, q_w = q q[0], q[1], q[2], q[3] = ( q_x * p_w + q_w * p_x - q_z * p_y + q_y * p_z, q_y * p_w + q_z * p_x + q_w * p_y - q_x * p_z, q_z * p_w - q_y * p_x + q_x * p_y + q_w * p_z, q_w * p_w - q_x * p_x - q_y * p_y - q_z * p_z, ) # First order quaternion normalization to prevent compounding of errors q *= (3.0 - np.sum(np.square(q), 0)) / 2 # Update Gyro bias bias_hat -= dt * ki * omega_mes
[docs] @nb.jit(nopython=True, cache=True) def update_twist(q: np.ndarray, twist: np.ndarray, omega: np.ndarray, time_constant_inv: float, dt: float) -> None: """Update the twist estimate of the Twist-after-Swing decomposition of given orientations in quaternion representation using leaky integrator. :param q: Current swing estimate as a quaternion. It will be updated in-place to add the estimated twist. :param twist: Current twist estimate to update in-place. :param omega: Current angular velocity estimate in local frame. :param time_constant_inv: Inverse of the time constant of the leaky integrator used to update the twist estimate. :param dt: Time step, in seconds, between consecutive Quaternions. """ # Compute the derivative of the twist angle: # The element-wise time-derivative of a quaternion is given by: # dq = 0.5 * q * Quaternion(axis=gyro, w=0.0) [1] # The twist around a given axis can be computed as follows: # p = q_axis.dot(twist_axis) * twist_axis [2] # twist = pin.Quaternion(np.array((*p, q_w))).normalized() # The twist angle can be inferred from this expression: # twist = 2 * atan2(q_axis.dot(twist_axis), q_w) [3] # The derivative of twist angle can be derived: # dtwist = 2 * ( # (dq_axis * q_w - q_axis * dq_w).dot(twist_axis) [4] # ) / (q_axis.dot(twist_axis) ** 2 + q_w ** 2) # One can show that if q_s is the swing part of the orientation, then: # q_s_axis.dot(twist_axis) = 0 # It yields: # dtwist = 2 * dq_s_axis.dot(twist_axis) / q_s_w [5] q_x, q_y, _, q_w = q dtwist = (- q_y * omega[0] + q_x * omega[1]) / q_w + omega[2] # Update twist angle using Leaky Integrator scheme to avoid long-term drift twist *= 1.0 - time_constant_inv * dt twist += dtwist * dt # Update quaternion to add estimated twist p_z, p_w = np.sin(0.5 * twist), np.cos(0.5 * twist) q[0], q[1], q[2], q[3] = ( p_w * q_x - p_z * q_y, p_z * q_x + p_w * q_y, p_z * q_w, p_w * q_w, )
[docs] class MahonyFilter( BaseObserverBlock[np.ndarray, np.ndarray, BaseObsT, BaseActT]): """Mahony's Nonlinear Complementary Filter on SO(3). .. seealso:: Robert Mahony, Tarek Hamel, and Jean-Michel Pflimlin "Nonlinear Complementary Filters on the Special Orthogonal Group" IEEE Transactions on Automatic Control, Institute of Electrical and Electronics Engineers, 2008, 53 (5), pp.1203-1217: https://hal.archives-ouvertes.fr/hal-00488376/document .. warning:: This filter works best for 'observe_dt' smaller or equal to 5ms. Its performance drops rapidly beyond this point. Having 'observe_dt' equal to 10ms is generally acceptable but the yaw estimate is drifting anyway even for fairly slow motions and without sensor noise and bias. """ def __init__(self, name: str, env: InterfaceJiminyEnv[BaseObsT, BaseActT], *, twist_time_constant: Optional[float] = 0.0, exact_init: bool = True, kp: Union[np.ndarray, float] = 1.0, ki: Union[np.ndarray, float] = 0.1, update_ratio: int = 1) -> None: """ :param name: Name of the block. :param env: Environment to connect with. :param twist_time_constant: If specified, it corresponds to the time constant of the leaky integrator used to estimate the twist part of twist-after-swing decomposition of the estimated orientation in place of the Mahony Filter. If `0.0`, then its is kept constant equal to zero. `None` to kept the original estimate provided by Mahony Filter. See `remove_twist_from_quat` and `update_twist` doc for details. Optional: `0.0` by default. :param exact_init: Whether to initialize orientation estimate using accelerometer measurements or ground truth. `False` is not recommended because the robot is often free-falling at init, which is not realistic anyway. Optional: `True` by default. :param mahony_kp: Proportional gain used for gyro-accel sensor fusion. Set it to 0.0 to use only the gyroscope. In such a case, the orientation estimate would be exact if the sensor is bias- and noise-free, and the update period matches the simulation integrator timestep. Optional: `1.0` by default. :param mahony_ki: Integral gain used for gyro bias estimate. Optional: `0.1` by default. :param update_ratio: Ratio between the update period of the observer and the one of the subsequent observer. -1 to match the simulation timestep of the environment. Optional: `1` by default. """ # Handling of default argument(s) num_imu_sensors = len(env.robot.sensor_names[imu.type]) if isinstance(kp, float): kp = np.full((num_imu_sensors,), kp) if isinstance(ki, float): ki = np.full((num_imu_sensors,), ki) # Backup some of the user arguments self.exact_init = exact_init self.kp = kp self.ki = ki # Keep track of how the twist must be computed self.twist_time_constant_inv: Optional[float] if twist_time_constant is None: self.twist_time_constant_inv = None else: if twist_time_constant > 0.0: self.twist_time_constant_inv = 1.0 / twist_time_constant else: self.twist_time_constant_inv = float("inf") self._remove_twist = self.twist_time_constant_inv is not None self._update_twist = ( self.twist_time_constant_inv is not None and np.isfinite(self.twist_time_constant_inv)) # Whether the observer has been initialized. # This flag must be managed internally because relying on # `self.env.is_simulation_running` is not possible for observer blocks. # Unlike `compute_command`, the simulation is already running when # `refresh_observation`` is called for the first time of an episode. self._is_initialized = False # Whether the observer has been compiled already. # This is necessary avoid raising a timeout exception during the first # simulation step. It is not reliable to only check if `mahony_filter` # has been compiled once, because a different environment may have been # involved, for which `mahony_filter` may be another signature, # triggering yet another compilation. self._is_compiled = False # Define gyroscope and accelerometer proxies for fast access. # Note that they will be initialized in `_setup` method. self.gyro, self.acc = np.array([]), np.array([]) # Allocate gyroscope bias estimate self._bias = np.zeros((3, num_imu_sensors)) # Allocate twist angle estimate around z-axis in world frame self._twist = np.zeros((1, num_imu_sensors)) # Store the estimate angular velocity to avoid redundant computations self._omega = np.zeros((3, num_imu_sensors)) # Initialize the observer super().__init__(name, env, update_ratio) def _initialize_state_space(self) -> None: """Configure the internal state space of the observer. It corresponds to the current gyroscope bias estimate. The twist angle is not part of the internal state although being integrated over time because it is uniquely determined from the orientation estimate. """ # Strictly speaking, 'q_prev' is part of the internal state of the # observer since it is involved in its computations. Yet, it is not an # internal state of the (partially observable) MDP since the previous # observation must be provided anyway when integrating the observable # dynamics by definition. num_imu_sensors = len(self.env.robot.sensor_names[imu.type]) self.state_space = gym.spaces.Box( low=np.full((3, num_imu_sensors), -np.inf), high=np.full((3, num_imu_sensors), np.inf), dtype=np.float64) def _initialize_observation_space(self) -> None: """Configure the observation space of the observer. It corresponds to the current orientation estimate for all the IMUs of the robot at once, with special treatment for their twist part. See `__init__` documentation for details. """ num_imu_sensors = len(self.env.robot.sensor_names[imu.type]) self.observation_space = gym.spaces.Box( low=np.full((4, num_imu_sensors), -1.0 - 1e-9), high=np.full((4, num_imu_sensors), 1.0 + 1e-9), dtype=np.float64) def _setup(self) -> None: # Call base implementation super()._setup() # Fix initialization of the observation to be valid quaternions self.observation[-1] = 1.0 # Make sure observe update is discrete-time if self.env.observe_dt <= 0.0: raise ValueError( "This block does not support time-continuous update.") # Refresh gyroscope and accelerometer proxies self.gyro, self.acc = np.split( self.env.sensor_measurements[imu.type], 2) # Reset the sensor bias fill(self._bias, 0) # Reset the twist estimate fill(self._twist, 0) # Warn if 'observe_dt' is too large to provide a meaningful if self.observe_dt > 0.01 + 1e-6: LOGGER.warning( "Beware 'observe_dt' (%s) is too large for Mahony filters to " "provide a meaningful estimate of the IMU orientations. It " "should not exceed 10ms.", self.observe_dt) # Call `refresh_observation` manually to make sure that all the jitted # method of it control flow has been compiled. if not self._is_compiled: self._is_initialized = False for _ in range(2): self.refresh_observation(self.env.observation) self._is_compiled = True # Consider that the observer is not initialized anymore self._is_initialized = False
[docs] def get_state(self) -> np.ndarray: return self._bias
@property def fieldnames(self) -> List[List[str]]: sensor_names = self.env.robot.sensor_names[imu.type] return [[f"{name}.Quat{e}" for name in sensor_names] for e in ("x", "y", "z", "w")]
[docs] def refresh_observation(self, measurement: BaseObsT) -> None: # Re-initialize the quaternion estimate if no simulation running. # It corresponds to the rotation transforming 'acc' in 'e_z'. if not self._is_initialized: if not self.exact_init: if (np.abs(self.acc) < 0.1 * EARTH_SURFACE_GRAVITY).all(): LOGGER.warning( "The robot is free-falling. Impossible to initialize " "Mahony filter for 'exact_init=False'.") else: # Try to determine the orientation of the IMU from its # measured acceleration at initialization. This approach is # not very accurate because the initial acceleration is # often jerky, plus the tilt is not observable at all. acc = self.acc / np.linalg.norm(self.acc, axis=0) swing_from_vector(acc, self.observation) self._is_initialized = True if not self._is_initialized: # Get true orientation of IMU frames imu_rots = [] robot = self.env.robot for name in robot.sensor_names[imu.type]: sensor = robot.get_sensor(imu.type, name) assert isinstance(sensor, imu) rot = robot.pinocchio_data.oMf[sensor.frame_index].rotation imu_rots.append(rot) # Convert it to quaternions matrices_to_quat(tuple(imu_rots), self.observation) # Keep track of tilt if necessary if self._update_twist: self._twist[:] = np.arctan2( self.observation[2], self.observation[3]) self._is_initialized = True return # Run an iteration of the filter, computing the next state estimate mahony_filter(self.observation, self._omega, self.gyro, self.acc, self._bias, self.kp, self.ki, self.observe_dt) # Remove twist if requested if self._remove_twist: remove_twist_from_quat(self.observation) if self._update_twist: update_twist(self.observation, self._twist, self._omega, self.twist_time_constant_inv, self.observe_dt)