Source code for gym_jiminy.common.blocks.proportional_derivative_controller

"""Implementation of basic Proportional-Derivative controller block compatible
with gym_jiminy reinforcement learning pipeline environment design.
"""
import warnings
from typing import List, Union

import numpy as np
import numba as nb
import gymnasium as gym

import jiminy_py.core as jiminy
from jiminy_py.core import (  # pylint: disable=no-name-in-module
    EncoderSensor, array_copyto)

from ..bases import BaseObsT, InterfaceJiminyEnv, BaseControllerBlock
from ..utils import fill


# Name of the n-th position derivative
N_ORDER_DERIVATIVE_NAMES = ("Position", "Velocity", "Acceleration")


[docs] @nb.jit(nopython=True, cache=True, inline='always', fastmath=True) def integrate_zoh(state: np.ndarray, state_min: np.ndarray, state_max: np.ndarray, dt: float) -> None: """Second order approximate integration scheme assuming Zero-Order-Hold for the acceleration, taking position, velocity and acceleration bounds into account. Internally, it simply chains two first-order integrators in cascade. The acceleration will be updated in-place if clipping is necessary to satisfy bounds. :param state: Current state, ordered from lowest to highest derivative order, ie: s[i](t) = s[i](t-1) + integ_{t-1}^{t}(s[i+1](t)), as a 2D array whose first dimension gathers the 3 derivative orders. It will be updated in-place. :param state_min: Lower bounds of the state as a 2D array. :param state_max: Upper bounds of the state as a 2D array. :param dt: Integration delta of time since previous state update. """ # Make sure that dt is not negative assert dt >= 0.0, "Integration backward in time is not supported." # Early return if timestep is too small if abs(dt) < 1e-9: return # Split position, velocity and acceleration for convenience position, velocity, acceleration = state # Loop over motors individually as it is faster than masked vectorization _, dim = state.shape for i in range(dim): # Split position, velocity and acceleration bounds position_min, velocity_min, acceleration_min = state_min[:, i] position_max, velocity_max, acceleration_max = state_max[:, i] # Clip acceleration acceleration[i] = min( max(acceleration[i], acceleration_min), acceleration_max) # Backup the initial velocity to later compute the clipped acceleration velocity_prev = velocity[i] # Integrate acceleration 1-step ahead velocity[i] += acceleration[i] * dt # Make sure that "true" velocity bounds are satisfied velocity[i] = min(max(velocity[i], velocity_min), velocity_max) # Force slowing down early enough to avoid violating acceleration # limits when hitting position bounds. horizon = max( int(abs(velocity_prev) / acceleration_max / dt) * dt, dt) position_min_delta = position_min - position[i] position_max_delta = position_max - position[i] if horizon > dt: drift = 0.5 * (horizon * (horizon - dt)) * acceleration_max position_min_delta -= drift position_max_delta += drift velocity_min = position_min_delta / horizon velocity_max = position_max_delta / horizon velocity[i] = min(max(velocity[i], velocity_min), velocity_max) # Velocity after hitting bounds must be cancellable in a single step if np.abs(velocity[i]) > dt * acceleration_max: velocity_min = - max( position_min_delta / velocity[i], dt) * acceleration_max velocity_max = max( position_max_delta / velocity[i], dt) * acceleration_max velocity[i] = min(max(velocity[i], velocity_min), velocity_max) # Back-propagate velocity clipping at the acceleration-level acceleration[i] = (velocity[i] - velocity_prev) / dt # Integrate position 1-step ahead position[i] += dt * velocity[i]
[docs] @nb.jit(nopython=True, cache=True, fastmath=True) def pd_controller(q_measured: np.ndarray, v_measured: np.ndarray, command_state: np.ndarray, command_state_lower: np.ndarray, command_state_upper: np.ndarray, kp: np.ndarray, kd: np.ndarray, motors_effort_limit: np.ndarray, control_dt: float, out: np.ndarray) -> None: """Compute command torques under discrete-time proportional-derivative feedback control. Internally, it integrates the command state (position, velocity and acceleration) at controller update period in order to obtain the desired motor positions 'q_desired' and velocities 'v_desired', takes into account some lower and upper bounds. By computing them this way, the target motor positions and velocities can be interpreted as targets that has be to reach right before updating the command once again. Enforcing consistency between target position and velocity in such a way is a necessary but insufficient condition for the motors to be able to track them. The command effort is computed as follows: tau = - kp * ((q_measured - q_desired) + kd * (v_measured - v_desired)) The torque will be held constant until the next controller update. .. seealso:: See `PDController` documentation to get more information, and `integrate_zoh` documentation for details about the state integration. :param q_measured: Current position of the actuators. :param v_measured: Current velocity of the actuators. :param command_state: Current command state, namely, all the derivatives of the target motors positions up to acceleration order. :param command_state_lower: Lower bound of the command state that must be satisfied at all cost. :param command_state_upper: Upper bound of the command state that must be satisfied at all cost. :param kp: PD controller position-proportional gain in motor order. :param kd: PD controller velocity-proportional gain in motor order. :param motors_effort_limit: Maximum effort that the actuators can output. :param control_dt: Controller update period. It will be involved in the integration of the command state. :param out: Pre-allocated memory to store the command motor torques. """ # Integrate target motor positions and velocities integrate_zoh(command_state, command_state_lower, command_state_upper, control_dt) # Extract targets motors positions and velocities from command state q_target, v_target, _ = command_state # Compute the joint tracking error q_error, v_error = q_target - q_measured, v_target - v_measured # Compute PD command out[:] = kp * (q_error + kd * v_error) # Clip the command motors torques before returning out[:] = np.minimum(np.maximum( out, -motors_effort_limit), motors_effort_limit)
[docs] @nb.jit(nopython=True, cache=True, fastmath=True) def pd_adapter(action: np.ndarray, order: int, command_state: np.ndarray, command_state_lower: np.ndarray, command_state_upper: np.ndarray, dt: float, out: np.ndarray) -> None: """Compute the target motor accelerations that must be held constant for a given time interval in order to reach the desired value of some derivative of the target motor positions at the end of that interval if possible. Internally, it applies backward in time the same integration scheme as the PD controller. Knowing the initial and final value of the derivative over the time interval, constant target motor accelerations can be uniquely deduced. In practice, it consists in successive clipped finite difference of that derivative up to acceleration-level. :param action: Desired value of the n-th derivative of the command motor positions at the end of the controller update. :param order: Derivative order of the position associated with the action. :param command_state: Current command state, namely, all the derivatives of the target motors positions up to acceleration order. :param command_state_lower: Lower bound of the command state that must be satisfied at all cost. :param command_state_upper: Upper bound of the command state that must be satisfied at all cost. :param dt: Time interval during which the target motor accelerations will be held constant. :param out: Pre-allocated memory to store the target motor accelerations. """ # Update command accelerations based on the action and its derivative order if order == 2: # The action corresponds to the command motor accelerations out[:] = action else: if order == 0: # Compute command velocity velocity = (action - command_state[0]) / dt # Clip command velocity velocity = np.minimum(np.maximum( velocity, command_state_lower[1]), command_state_upper[1]) else: # The action corresponds to the command motor velocities velocity = action # Compute command acceleration out[:] = (velocity - command_state[1]) / dt
[docs] def get_encoder_to_motor_map(robot: jiminy.Robot) -> Union[slice, List[int]]: """Get the mapping from encoder sensors to motors. .. warning:: If reordering is necessary, then a list of indices is returned, which can used to permute encoder sensor data to match the command torque vector. Yet, it relies on so-called "fancy" or "advanced" indexing for doing so, which means that the returned data is a copy of the original data instead of a reference. On the contrary, it reordering is not necessary, a slice is returned instead and no copy happens whatsoever. :param robot: Jiminy robot for which to compute mapping. :returns: A slice if possible, a list of indices otherwise. """ # Loop over all actuated joints and look for their encoder counterpart encoder_to_motor_map = [-1,] * robot.nmotors encoders = robot.sensors[EncoderSensor.type] for motor in robot.motors: for i, sensor in enumerate(encoders): assert isinstance(sensor, EncoderSensor) if motor.index == sensor.motor_index: encoder_to_motor_map[sensor.index] = motor.index del encoders[i] break else: raise RuntimeError( f"No encoder sensor associated with motor '{motor.name}'. " "Every actuated joint must have a encoder sensor attached on " "motor side.") # Try converting it to slice if possible if (np.array(encoder_to_motor_map) == np.arange(robot.nmotors)).all(): return slice(None) return encoder_to_motor_map
[docs] class PDController( BaseControllerBlock[np.ndarray, np.ndarray, BaseObsT, np.ndarray]): """Low-level Proportional-Derivative controller. The action are the target motors accelerations. The latter are integrated twice using two first-order integrators in cascade, considering that the acceleration is constant until the next controller update: v_{t+1} = v_{t} + dt * a_{t} q_{t+1} = q_{t} + dt * v_{t+1} .. note:: The position and velocity bounds on the command corresponds to the joint limits specified by the dynamical model of the robot. Then, lax default acceleration bounds are extrapolated. More precisely, they are chosen to be sufficient either to span the whole range of velocity or to allow reaching the command effort limits depending on what is the most restrictive. Position, velocity and acceleration. .. warning:: It must be connected directly to the base environment to control without any intermediary controllers altering its action space. """ def __init__(self, name: str, env: InterfaceJiminyEnv[BaseObsT, np.ndarray], *, update_ratio: int = 1, kp: Union[float, List[float], np.ndarray], kd: Union[float, List[float], np.ndarray], joint_position_margin: float = 0.0, joint_velocity_limit: float = float("inf"), joint_acceleration_limit: float = float("inf")) -> None: """ :param name: Name of the block. :param env: Environment to connect with. :param update_ratio: Ratio between the update period of the controller and the one of the subsequent controller. -1 to match the simulation timestep of the environment. Optional: 1 by default. :param kp: PD controller position-proportional gains in motor order. :param kd: PD controller velocity-proportional gains in motor order. :param joint_position_margin: Minimum distance of the joint target positions from their respective bounds. Optional: 0.0 by default. :param joint_velocity_limit: Restrict maximum joint target velocities wrt their hardware specifications. Optional: "inf" by default. :param joint_acceleration_limit: Maximum joint target acceleration. Optional: "inf" by default. """ # Make sure the action space of the environment has not been altered if env.action_space is not env.unwrapped.action_space: raise RuntimeError( "Impossible to connect this block on an environment whose " "action space has been altered.") # Make sure that the number of PD gains matches the number of motors try: kp = np.broadcast_to(kp, (env.robot.nmotors,)) kd = np.broadcast_to(kd, (env.robot.nmotors,)) except ValueError as e: raise TypeError( "PD gains inconsistent with number of motors.") from e # Backup some user argument(s) self.kp = kp self.kd = kd # Mapping from motors to encoders self.encoder_to_motor_map = get_encoder_to_motor_map(env.robot) # Whether stored reference to encoder measurements are already in the # same order as the motors, allowing skipping re-ordering entirely. self._is_same_order = isinstance(self.encoder_to_motor_map, slice) if not self._is_same_order: warnings.warn( "Consider using the same ordering for encoders and motors for " "optimal performance.") # Define buffers storing information about the motors for efficiency. # Note that even if the robot instance may change from one simulation # to another, the observation and action spaces are required to stay # the same the whole time. This induces that the motors effort limit # must not change unlike the mapping from full state to motors. self.motors_effort_limit = np.array([ motor.effort_limit for motor in env.robot.motors]) # Refresh mechanical reduction ratio encoder_to_joint_ratio = [] for motor in env.robot.motors: motor_options = motor.get_options() encoder_to_joint_ratio.append(motor_options["mechanicalReduction"]) # Define the motors target position bounds motors_position_lower = np.array([ motor.position_limit_lower + ratio * joint_position_margin for motor, ratio in zip(env.robot.motors, encoder_to_joint_ratio)]) motors_position_upper = np.array([ motor.position_limit_upper - ratio * joint_position_margin for motor, ratio in zip(env.robot.motors, encoder_to_joint_ratio)]) # Define the motors target velocity bounds motors_velocity_limit = np.array([ min(motor.velocity_limit, ratio * joint_velocity_limit) for motor, ratio in zip(env.robot.motors, encoder_to_joint_ratio)]) # Define acceleration bounds allowing unrestricted bang-bang control range_limit = 2 * motors_velocity_limit / env.step_dt effort_limit = self.motors_effort_limit / ( self.kp * env.step_dt * np.maximum(env.step_dt / 2, self.kd)) target_acceleration_limit = np.array([ ratio * joint_acceleration_limit for ratio in encoder_to_joint_ratio]) acceleration_limit = np.minimum( np.minimum(range_limit, effort_limit), target_acceleration_limit) # Compute command state bounds self._command_state_lower = np.stack([motors_position_lower, -motors_velocity_limit, -acceleration_limit], axis=0) self._command_state_upper = np.stack([motors_position_upper, motors_velocity_limit, acceleration_limit], axis=0) # Extract measured motor positions and velocities for fast access. # Note that they will be initialized in `_setup` method. self.q_measured, self.v_measured = np.array([]), np.array([]) # Allocate memory for the command state self._command_state = np.zeros((3, env.robot.nmotors)) # Initialize the controller super().__init__(name, env, update_ratio) # References to command acceleration for fast access self._command_acceleration = self._command_state[2] def _initialize_state_space(self) -> None: """Configure the state space of the controller. The state spaces corresponds to the target motors positions and velocities. """ self.state_space = gym.spaces.Box( low=self._command_state_lower[:2], high=self._command_state_upper[:2], dtype=np.float64) def _initialize_action_space(self) -> None: """Configure the action space of the controller. The action spaces corresponds to the target motors accelerations. """ self.action_space = gym.spaces.Box( low=self._command_state_lower[2], high=self._command_state_upper[2], dtype=np.float64) def _setup(self) -> None: # Call base implementation super()._setup() # Make sure control update is discrete-time if self.env.control_dt <= 0.0: raise ValueError( "This block does not support time-continuous update.") # Refresh measured motor positions and velocities proxies self.q_measured, self.v_measured = ( self.env.sensor_measurements[EncoderSensor.type]) # Reset the command state fill(self._command_state, 0) @property def fieldnames(self) -> List[str]: return [f"currentTarget{N_ORDER_DERIVATIVE_NAMES[2]}{motor.name}" for motor in self.env.robot.motors]
[docs] def get_state(self) -> np.ndarray: return self._command_state[:2]
[docs] def compute_command(self, action: np.ndarray, command: np.ndarray) -> None: """Compute the target motor torques using a PD controller. It is proportional to the error between the observed motors positions/ velocities and the target ones. .. warning:: Calling this method manually while a simulation is running is forbidden, because it would mess with the controller update period. :param action: Desired target motor acceleration. :param command: Current motor torques that will be updated in-place. """ # Re-initialize the command state to the current motor state if the # simulation is not running. This must be done here because the # command state must be valid prior to calling `refresh_observation` # for the first time, which happens at `reset`. is_simulation_running = self.env.is_simulation_running if not is_simulation_running: for i, value in enumerate((self.q_measured, self.v_measured)): np.clip(value, self._command_state_lower[i], self._command_state_upper[i], out=self._command_state[i]) # Extract motor positions and velocity from encoder data q_measured, v_measured = self.q_measured, self.v_measured if not self._is_same_order: q_measured = q_measured[self.encoder_to_motor_map] v_measured = v_measured[self.encoder_to_motor_map] # Update target motor accelerations array_copyto(self._command_acceleration, action) # Compute the motor efforts using PD control. # The command state must not be updated if no simulation is running. pd_controller( q_measured, v_measured, self._command_state, self._command_state_lower, self._command_state_upper, self.kp, self.kd, self.motors_effort_limit, self.control_dt if is_simulation_running else 0.0, command)
[docs] class PDAdapter( BaseControllerBlock[np.ndarray, np.ndarray, BaseObsT, np.ndarray]): """Adapt the action of a lower-level Proportional-Derivative controller to be the target motor positions or velocities rather than accelerations. The action is the desired value of some derivative of the target motor positions. The target motor accelerations are then deduced so as to reach this value by the next update of this controller if possible without exceeding the position, velocity and acceleration bounds. Finally, these target position accelerations are passed to a lower-level PD controller, usually running at a higher frequency. .. note:: The higher the derivative order of the action, the smoother the command motor torques. Thus, a high order is generally beneficial for robotic applications. However, it introduces some kind of delay between the action and its effects. This phenomenon limits the responsiveness of the agent and therefore impedes its optimal performance. Besides, it introduces addition layer of indirection between the action and its effect which may be difficult to grasp for the agent. Finally, exploration usually consists in addition temporally uncorrelated gaussian random process at action-level. The effect of such random processes tends to vanish when integrated, making exploration very inefficient. """ def __init__(self, name: str, env: InterfaceJiminyEnv[BaseObsT, np.ndarray], *, update_ratio: int = -1, order: int = 1) -> None: """ :param update_ratio: Ratio between the update period of the controller and the one of the subsequent controller. -1 to match the environment step `env.step_dt`. Optional: -1 by default. :param order: Derivative order of the action. It accepts position or velocity (respectively 0 or 1). Optional: 1 by default. """ # Make sure that the specified derivative order is valid assert order in (0, 1), "Derivative order out-of-bounds" # Make sure that a PD controller block is already connected controller = env.controller # type: ignore[attr-defined] if not isinstance(controller, PDController): raise RuntimeError( "This block must be directly connected to a lower-level " "`PDController` block.") # Backup some user argument(s) self.order = order # Define some proxies for convenience self._pd_controller = controller # Initialize the controller super().__init__(name, env, update_ratio) def _initialize_action_space(self) -> None: """Configure the action space of the controller. The action spaces corresponds to the N-th order derivative of the target motors positions. """ self.action_space = gym.spaces.Box( low=self._pd_controller._command_state_lower[self.order], high=self._pd_controller._command_state_upper[self.order], dtype=np.float64) @property def fieldnames(self) -> List[str]: return [f"nextTarget{N_ORDER_DERIVATIVE_NAMES[self.order]}{motor.name}" for motor in self.env.robot.motors]
[docs] def compute_command(self, action: np.ndarray, command: np.ndarray) -> None: """Compute the target motor accelerations from the desired value of some derivative of the target motor positions. :param action: Desired target motor acceleration. :param command: Current motor torques that will be updated in-place. """ pd_adapter( action, self.order, self._pd_controller._command_state, self._pd_controller._command_state_lower, self._pd_controller._command_state_upper, self.control_dt, command)