Source code for gym_jiminy.common.blocks.motor_safety_limit

"""Implementation of basic Proportional-Derivative controller block compatible
with gym_jiminy reinforcement learning pipeline environment design.
"""
import warnings
from typing import List

import numpy as np
import numba as nb

from jiminy_py.core import EncoderSensor  # pylint: disable=no-name-in-module

from .proportional_derivative_controller import get_encoder_to_motor_map

from ..bases import (BaseObsT,
                     InterfaceJiminyEnv,
                     BaseControllerBlock,
                     BasePipelineWrapper,
                     ControlledJiminyEnv)


[docs] @nb.jit(nopython=True, cache=True, fastmath=True) def apply_safety_limits(command: np.ndarray, q_measured: np.ndarray, v_measured: np.ndarray, kp: np.ndarray, kd: np.ndarray, motors_soft_position_lower: np.ndarray, motors_soft_position_upper: np.ndarray, motors_velocity_limit: np.ndarray, motors_effort_limit: np.ndarray, out: np.ndarray) -> None: """Clip the command torque to ensure safe operation. It acts on each actuator independently and only activate close to the position or velocity limits. Basically, the idea to the avoid moving faster when some prescribed velocity limit or exceeding soft position bounds by forcing the command torque to act against it. Still, it may not be enough to prevent such issue in practice as the command torque is bounded. .. warning:: All the input arguments must be in motor order, including the measured position and velocity of the actuators. In practice, those measurements comes from the encoder sensors. If so, then the measurements must be re-ordered if necessary to match motor order instead of sensor order. .. seealso:: See `MotorSafetyLimit` documentation for details. :param command: Desired command to will be updated in-place if necessary. :param q_measured: Current position of the actuators. :param v_measured: Current velocity of the actuators. :param kp: Scale of the velocity bound triggered by position limits. :param kd: Scale of the effort bound triggered by velocity limits. :param motors_soft_position_lower: Soft lower position limit of the actuators. :param motors_soft_position_upper: Soft upper position limit of the actuators. :param motors_velocity_limit: Maximum velocity of the actuators. :param motors_effort_limit: Maximum effort that the actuators can output. The command torque cannot exceed this limits, not even if needed to enforce safe operation. :param out: Pre-allocated memory to store the command motor torques. """ # Computes velocity bounds based on margin from soft joint limit if any safe_velocity_lower = motors_velocity_limit * np.minimum(np.maximum( -kp * (q_measured - motors_soft_position_lower), -1.0), 1.0) safe_velocity_upper = motors_velocity_limit * np.minimum(np.maximum( -kp * (q_measured - motors_soft_position_upper), -1.0), 1.0) # Computes effort bounds based on velocity and effort bounds safe_effort_lower = motors_effort_limit * np.minimum(np.maximum( -kd * (v_measured - safe_velocity_lower), -1.0), 1.0) safe_effort_upper = motors_effort_limit * np.minimum(np.maximum( -kd * (v_measured - safe_velocity_upper), -1.0), 1.0) # Clip command according to safe effort bounds out[:] = np.minimum(np.maximum( command, safe_effort_lower), safe_effort_upper)
[docs] class MotorSafetyLimit( BaseControllerBlock[np.ndarray, np.ndarray, BaseObsT, np.ndarray]): """Safety mechanism primarily designed to prevent hardware damage and premature wear, but also to temper violent, sporadic and dangerous motions. A velocity limit v+ is enforced by bounding the commanded effort such that no effort can be applied to push the joint beyond the velocity limit, and a damping effort is applied if the joint is moving at a velocity beyond the limit, ie -kd * (v - v+). When the joint is near the soft limits x+/-, the velocities are bounded to keep the position from crossing the soft limits. The k_position term determines the scale of the bound on velocity, ie v+/- = -kp * (x - x+/-). These bounds on velocity are the ones determining the bounds on effort. .. note:: The prescribed position and velocity limits may be more respective that the actual hardware specification of the robot. .. warning:: It must be connected directly to the environment without any other intermediary controller. .. seealso:: See official ROS documentation and implementation for details: https://wiki.ros.org/pr2_controller_manager/safety_limits https://github.com/PR2/pr2_mechanism/blob/melodic-devel/pr2_mechanism_model/src/joint.cpp """ # noqa: E501 # pylint: disable=line-too-long def __init__(self, name: str, env: InterfaceJiminyEnv[BaseObsT, np.ndarray], *, kp: float, kd: float, soft_position_margin: float, soft_velocity_max: float) -> None: """ :param name: Name of the block. :param env: Environment to connect with. :param kp: Scale of the velocity bound triggered by position limits. :param kd: Scale of the effort bound triggered by velocity limits. :param soft_position_margin: Minimum distance of the current joint positions from their respective bounds before starting to break. :param soft_velocity_max: Maximum velocity of the joint before starting to break. """ # Make sure that no other controller has been added prior to this block env_unwrapped: InterfaceJiminyEnv = env while isinstance(env_unwrapped, BasePipelineWrapper): if isinstance(env_unwrapped, ControlledJiminyEnv): raise TypeError( "No other control block must be added prior to this one.") env_unwrapped = env_unwrapped.env # Backup some user argument(s) self.kp = kp self.kd = kd # Refresh mechanical reduction ratio encoder_to_joint_ratio = [] for motor in env.robot.motors: motor_options = motor.get_options() encoder_to_joint_ratio.append(motor_options["mechanicalReduction"]) # Define buffers storing information about the motors for efficiency self.motors_position_lower = np.array([ motor.position_limit_lower + ratio * soft_position_margin for motor, ratio in zip(env.robot.motors, encoder_to_joint_ratio)]) self.motors_position_upper = np.array([ motor.position_limit_upper - ratio * soft_position_margin for motor, ratio in zip(env.robot.motors, encoder_to_joint_ratio)]) self.motors_velocity_limit = np.array([ min(motor.velocity_limit, ratio * soft_velocity_max) for motor, ratio in zip(env.robot.motors, encoder_to_joint_ratio)]) self.motors_effort_limit = np.array([ motor.effort_limit for motor in env.robot.motors]) self.motors_effort_limit[ self.motors_position_lower > self.motors_position_upper] = 0.0 # Mapping from motors to encoders self.encoder_to_motor_map = get_encoder_to_motor_map(env.robot) # Whether stored reference to encoder measurements are already in the # same order as the motors, allowing skipping re-ordering entirely. self._is_same_order = isinstance(self.encoder_to_motor_map, slice) if not self._is_same_order: warnings.warn( "Consider using the same ordering for encoders and motors for " "optimal performance.") # Extract measured motor positions and velocities for fast access. # Note that they will be initialized in `_setup` method. self.q_measured, self.v_measured = np.array([]), np.array([]) # Initialize the controller super().__init__(name, env, update_ratio=1) def _initialize_action_space(self) -> None: """Configure the action space of the controller. The action spaces corresponds to the command motors efforts. """ self.action_space = self.env.action_space def _setup(self) -> None: # Call base implementation super()._setup() # Refresh measured motor positions and velocities proxies self.q_measured, self.v_measured = ( self.env.sensor_measurements[EncoderSensor.type]) @property def fieldnames(self) -> List[str]: return [f"currentMotorTorque{motor.name}" for motor in self.env.robot.motors]
[docs] def compute_command(self, action: np.ndarray, command: np.ndarray) -> None: """Apply safety limits to the desired motor torques right before sending it to the robot so as to avoid exceeded prescribed position and velocity limits. :param action: Desired motor torques to apply on the robot. :param command: Current motor torques that will be updated in-place. """ # Extract motor positions and velocity from encoder data q_measured, v_measured = self.q_measured, self.v_measured if not self._is_same_order: q_measured = q_measured[self.encoder_to_motor_map] v_measured = v_measured[self.encoder_to_motor_map] # Clip command according to safe effort bounds apply_safety_limits(action, q_measured, v_measured, self.kp, self.kd, self.motors_position_lower, self.motors_position_upper, self.motors_velocity_limit, self.motors_effort_limit, command)