"""Generic environment to learn locomotion skills for legged robots using
Jiminy simulator as physics engine.
"""
from typing import Optional, Dict, Union, Callable, Any
import numpy as np
import gym
from jiminy_py.core import (EncoderSensor as encoder,
EffortSensor as effort,
ContactSensor as contact,
ForceSensor as force,
ImuSensor as imu,
PeriodicGaussianProcess,
Robot)
from jiminy_py.simulator import Simulator
import pinocchio as pin
from ..utils import sample
from .env_generic import BaseJiminyEnv
GROUND_STIFFNESS_LOG_RANGE = (5.5, 7.0)
GROUND_DAMPING_RATIO_RANGE = (0.2, 1.0)
GROUND_FRICTION_RANGE = (0.8, 8.0)
F_IMPULSE_DT = 10.0e-3
F_IMPULSE_PERIOD = 2.0
F_IMPULSE_DELTA = 0.25
F_IMPULSE_SCALE = 1000.0
F_PROFILE_SCALE = 50.0
F_PROFILE_WAVELENGTH = 0.2
F_PROFILE_PERIOD = 1.0
FLEX_STIFFNESS_SCALE = 1000
FLEX_DAMPING_SCALE = 10
SENSOR_DELAY_SCALE = {
encoder.type: 3.0e-3,
effort.type: 0.0,
contact.type: 0.0,
force.type: 0.0,
imu.type: 0.0
}
SENSOR_NOISE_SCALE = {
encoder.type: np.array([0.0, 0.02]),
effort.type: np.array([10.0]),
contact.type: np.array([2.0, 2.0, 2.0, 10.0, 10.0, 10.0]),
force.type: np.array([2.0, 2.0, 2.0]),
imu.type: np.array([0.0, 0.0, 0.0, 0.02, 0.02, 0.02, 0.2, 0.2, 0.2])
}
DEFAULT_SIMULATION_DURATION = 20.0 # (s) Default simulation duration
DEFAULT_STEP_DT = 1.0e-3 # (s) Stepper update period
DEFAULT_HLC_TO_LLC_RATIO = 1 # (NA)
ForceImpulseType = Dict[str, Union[str, float, np.ndarray]]
ForceProfileFunc = Callable[[float, np.ndarray, np.ndarray, np.ndarray], None]
ForceProfileType = Dict[str, Union[str, ForceProfileFunc]]
class WalkerJiminyEnv(BaseJiminyEnv):
"""Gym environment for learning locomotion skills for legged robots using
torque control directly.
Jiminy Engine is used to perform physics evaluation, and Meshcat is used
for rendering.
The observation and action spaces are unchanged with respect to `BaseJiminyEnv`.
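A minimal interaction sketch is given below, assuming a valid URDF path
(the path is a placeholder) and the default reward and randomization
settings:
.. code-block:: python

    env = WalkerJiminyEnv(urdf_path="/path/to/robot.urdf")
    obs = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        obs, reward, done, info = env.step(action)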
"""
metadata = {
'render.modes': ['human'],
}
def __init__(self,
urdf_path: Optional[str],
hardware_path: Optional[str] = None,
mesh_path: Optional[str] = None,
simu_duration_max: float = DEFAULT_SIMULATION_DURATION,
step_dt: float = DEFAULT_STEP_DT,
enforce_bounded_spaces: Optional[bool] = False,
reward_mixture: Optional[dict] = None,
std_ratio: Optional[dict] = None,
config_path: Optional[str] = None,
avoid_instable_collisions: bool = True,
debug: bool = False, *,
robot: Optional[Robot] = None,
**kwargs: Any) -> None:
r"""
:param urdf_path: Path of the urdf model to be used for the simulation.
:param hardware_path: Path of Jiminy hardware description toml file.
Optional: Looking for '\*_hardware.toml' file in
the same folder and with the same name.
:param mesh_path: Path to the folder containing the model meshes.
Optional: Env variable 'JIMINY_DATA_PATH' will be
used if available.
:param simu_duration_max: Maximum duration of a simulation before
returning done.
:param step_dt: Simulation timestep for learning.
:param enforce_bounded_spaces:
Whether or not to enforce finite bounds for the observation and
action spaces. If so, the '\*_MAX' bounds are used whenever necessary.
:param reward_mixture: Weighting factors of selected contributions to
total reward.
:param std_ratio: Relative standard deviation of selected contributions
to environment stochasticity.
:param config_path: Configuration toml file to import. It will be
imported AFTER loading the hardware description
file. It can be automatically generated from an
instance by calling `export_config_file` method.
Optional: Looking for '\*_options.toml' file in the
same folder and with the same name. If not found,
using default configuration.
:param avoid_instable_collisions: Prevent numerical instabilities by
replacing each collision mesh by the vertices
of its minimal-volume bounding box, and each
primitive box by its vertices.
:param debug: Whether or not the debug mode must be activated.
If so, telemetry recording is enabled.
:param robot: Robot being simulated, already instantiated and
initialized. Build default robot using 'urdf_path',
'hardware_path' and 'mesh_path' if omitted.
Optional: None by default.
:param kwargs: Keyword arguments to forward to `BaseJiminyEnv` class.
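For instance, the contributions recognized by the default implementation
can be weighted explicitly as follows (the numerical values are arbitrary
and zero weights are dropped):
.. code-block:: python

    env = WalkerJiminyEnv(
        urdf_path="/path/to/robot.urdf",
        reward_mixture={'direction': 0.5, 'energy': 0.2, 'done': 1.0},
        std_ratio={'model': 0.2, 'ground': 0.2,
                   'sensors': 0.2, 'disturbance': 0.2})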
"""
# Handling of default arguments
if reward_mixture is None:
reward_mixture = {
'direction': 0.0,
'energy': 0.0,
'done': 1.0
}
reward_mixture = {k: v for k, v in reward_mixture.items() if v > 0.0}
if std_ratio is None:
std_ratio = {
'model': 0.0,
'ground': 0.0,
'sensors': 0.0,
'disturbance': 0.0,
}
std_ratio = {k: v for k, v in std_ratio.items() if v > 0.0}
# Backup user arguments
self.simu_duration_max = simu_duration_max
self.reward_mixture = reward_mixture
self.urdf_path = urdf_path
self.mesh_path = mesh_path
self.hardware_path = hardware_path
self.config_path = config_path
self.std_ratio = std_ratio
self.avoid_instable_collisions = avoid_instable_collisions
# Robot and engine internal buffers
self._f_xy_profile = [
PeriodicGaussianProcess(F_PROFILE_WAVELENGTH, F_PROFILE_PERIOD),
PeriodicGaussianProcess(F_PROFILE_PERIOD, F_PROFILE_PERIOD)]
self._power_consumption_max = 0.0
self._height_neutral = 0.0
# Configure the backend simulator
if robot is None:
assert isinstance(self.urdf_path, str)
simulator = Simulator.build(
urdf_path=self.urdf_path,
hardware_path=self.hardware_path,
mesh_path=self.mesh_path,
config_path=self.config_path,
avoid_instable_collisions=self.avoid_instable_collisions,
debug=debug,
**{**dict(
has_freeflyer=True,
use_theoretical_model=False),
**kwargs})
else:
# Instantiate a simulator and load the options
simulator = Simulator(robot, **kwargs)
simulator.import_options(config_path)
# Initialize base class
super().__init__(
simulator, step_dt, enforce_bounded_spaces, debug, **kwargs)
def _setup(self) -> None:
"""Configure the environment.
It is doing the following steps, successively:
- updates some proxies that will be used for computing the
reward and termination condition,
- enforce some options of the low-level robot and engine,
- randomize the environment according to 'std_ratio'.
.. note::
This method is called internally by `reset` method at the very
beginning. One must overide it to implement new contributions to
the environment stochasticity, or to create custom low-level robot
if the model must be different for each learning episode.
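Below is an illustrative sketch of such an override, reusing helpers
already available in this module. The extra friction randomization is
purely illustrative:
.. code-block:: python

    class MyWalkerEnv(WalkerJiminyEnv):
        def _setup(self) -> None:
            super()._setup()
            # Extra stochastic contribution: re-sample the ground friction
            # with a fixed relative deviation.
            engine_options = self.simulator.engine.get_options()
            engine_options['contacts']['friction'] = sample(
                *GROUND_FRICTION_RANGE, scale=0.5, rg=self.rg)
            self.simulator.engine.set_options(engine_options)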
"""
# Call the base implementation
super()._setup()
if not self.robot.has_freeflyer:
raise RuntimeError(
"`WalkerJiminyEnv` only supports robots with freeflyer.")
# Update some internal buffers used for computing the reward
motor_effort_limit = self.robot.pinocchio_model.effortLimit[
self.robot.motors_velocity_idx]
motor_velocity_limit = self.robot.velocity_limit[
self.robot.motors_velocity_idx]
self._power_consumption_max = sum(
motor_effort_limit * motor_velocity_limit)
# Compute the height of the freeflyer in neutral configuration
# TODO: Take into account the ground profile.
q_init, _ = self._sample_state()
self._height_neutral = q_init[2]
# Get the options of robot and engine
robot_options = self.robot.get_options()
engine_options = self.simulator.engine.get_options()
# Make sure to log at least the data required for reward
# computation and log replay
engine_options['telemetry']['enableConfiguration'] = True
engine_options['telemetry']['enableVelocity'] = True
engine_options['telemetry']['enableForceExternal'] = \
'disturbance' in self.std_ratio.keys()
# Enable the flexible model
robot_options["model"]["dynamics"]["enableFlexibleModel"] = True
# Set the maximum number of integration iterations, assuming 1.0e-4s per iteration on average
engine_options["stepper"]["iterMax"] = \
int(self.simu_duration_max / 1.0e-4)
# Set maximum computation time for single internal integration steps
if self.debug:
engine_options["stepper"]["timeout"] = 0.0
else:
engine_options["stepper"]["timeout"] = 1.0
# ============= Add some stochasticity to the environment =============
# Randomize the ground friction and the spring-damper contact dynamics
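# The stiffness is sampled in log10-space within GROUND_STIFFNESS_LOG_RANGE,
# and the damping is a ratio (within GROUND_DAMPING_RATIO_RANGE) of the
# critical damping 2*sqrt(stiffness*mass) based on the total mass of the robot.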
ground_std_ratio = self.std_ratio.get('ground', 0.0)
ground_stiffness = sample(
*GROUND_STIFFNESS_LOG_RANGE, scale=ground_std_ratio,
enable_log_scale=True, rg=self.rg)
ground_damping_critic = 2.0 * np.sqrt(
ground_stiffness * self.robot.pinocchio_data.mass[0])
engine_options['contacts']['stiffness'] = ground_stiffness
engine_options['contacts']['damping'] = ground_damping_critic * sample(
*GROUND_DAMPING_RATIO_RANGE, scale=ground_std_ratio, rg=self.rg)
engine_options['contacts']['friction'] = sample(
*GROUND_FRICTION_RANGE, scale=ground_std_ratio, rg=self.rg)
# Add sensor noise and delay
if 'sensors' in self.std_ratio.keys():
for sensor in (encoder, effort, contact, force, imu):
sensors_options = robot_options["sensors"][sensor.type]
for sensor_options in sensors_options.values():
sensor_options['delay'] = sample(
0.0, (self.std_ratio['sensors'] *
SENSOR_DELAY_SCALE[sensor.type]), rg=self.rg)
sensor_options['noiseStd'] = sample(
0.0, (self.std_ratio['sensors'] *
SENSOR_NOISE_SCALE[sensor.type]), rg=self.rg)
# Randomize the flexibility parameters
if 'model' in self.std_ratio.keys():
dynamics_options = robot_options["model"]["dynamics"]
for flexibility in dynamics_options["flexibilityConfig"]:
flexibility['stiffness'] += FLEX_STIFFNESS_SCALE * sample(
scale=self.std_ratio['model'], rg=self.rg)
flexibility['damping'] += FLEX_DAMPING_SCALE * sample(
scale=self.std_ratio['model'], rg=self.rg)
# Apply the disturbance to the first actual body
if 'disturbance' in self.std_ratio.keys():
# Make sure the pinocchio model has at least one frame
assert self.robot.pinocchio_model.nframes
# Determine the actual root body of the kinematic tree
is_root_found = False
frame = self.robot.pinocchio_model.frames[0]
for frame in self.robot.pinocchio_model.frames:
if frame.type == pin.FrameType.BODY and frame.parent == 1:
is_root_found = True
break
if is_root_found:
frame_name = frame.name
else:
raise RuntimeError(
"There is an issue with the robot model. Impossible to "
"determine the root joint.")
# Schedule some external impulse forces applied on the root body
for t_ref in np.arange(
0.0, self.simu_duration_max, F_IMPULSE_PERIOD)[1:]:
t = t_ref + sample(scale=F_IMPULSE_DELTA, rg=self.rg)
f_xy = sample(dist='normal', shape=(2,), rg=self.rg)
f_xy /= np.linalg.norm(f_xy, ord=2)
f_xy *= sample(
0.0, self.std_ratio['disturbance']*F_IMPULSE_SCALE,
rg=self.rg)
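# Pad the sampled planar force into a full 6D wrench
# [Fx, Fy, 0.0, 0.0, 0.0, 0.0] before registering the impulse.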
self.simulator.register_force_impulse(
frame_name, t, F_IMPULSE_DT, np.pad(f_xy, (0, 4)))
# Schedule a single periodic force profile applied on the root body
for func in self._f_xy_profile:
func.reset()
self.simulator.register_force_profile(
frame_name, self._force_external_profile)
# Set the options, finally
self.robot.set_options(robot_options)
self.simulator.engine.set_options(engine_options)
def _get_time_space(self) -> gym.Space:
"""Get time space.
It takes advantage of knowing the maximum simulation duration to shrink
down the range. Note that the observation will be out-of-bounds if steps
are performed after this point.
"""
return gym.spaces.Box(
low=0.0, high=self.simu_duration_max, shape=(1,),
dtype=np.float64)
def _force_external_profile(self,
t: float,
q: np.ndarray,
v: np.ndarray,
wrench: np.ndarray) -> None:
"""User-specified processing of external force profiles.
Typical use cases are time rescaling (1.0 second by default), or
changing the orientation of the force (x/y in world frame by default).
It could also be used for clamping the force.
.. warning::
Beware it updates 'wrench' by reference for the sake of efficiency.
:param t: Current time.
:param q: Current configuration vector of the robot.
:param v: Current velocity vector of the robot.
:param wrench: Force to apply on the robot as a vector (linear and
angular) [Fx, Fy, Fz, Mx, My, Mz].
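Below is an illustrative sketch of a clamping override. The 100 N bound is
an arbitrary example, not a value used by this class:
.. code-block:: python

    class MyWalkerEnv(WalkerJiminyEnv):
        def _force_external_profile(self, t, q, v, wrench):
            super()._force_external_profile(t, q, v, wrench)
            # Clamp the planar force components in-place
            np.clip(wrench[:2], -100.0, 100.0, out=wrench[:2])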
"""
# pylint: disable=unused-argument
wrench[0] = F_PROFILE_SCALE * self._f_xy_profile[0](t)
wrench[1] = F_PROFILE_SCALE * self._f_xy_profile[1](t)
wrench[:2] *= self.std_ratio['disturbance']
def is_done(self) -> bool: # type: ignore[override]
"""Determine whether the episode is over.
The termination conditions are the following:
- fall detection (enabled if the robot has a freeflyer):
the freeflyer goes lower than 75% of its height in
neutral configuration.
- maximum simulation duration exceeded
"""
# pylint: disable=arguments-differ
if not self.simulator.is_simulation_running:
raise RuntimeError(
"No simulation running. Please start one before calling this "
"method.")
if self.system_state.q[2] < self._height_neutral * 0.75:
return True
if self.simulator.stepper_state.t >= self.simu_duration_max:
return True
return False
def compute_reward(self, # type: ignore[override]
info: Dict[str, Any]) -> float:
"""Compute reward at current episode state.
It computes the reward associated with each individual contribution
according to 'reward_mixture'.
.. note::
This method can be overridden to implement new contributions to
the reward, or to monitor more information.
:returns: Total reward.
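Below is an illustrative sketch of such an override. The 'velocity'
contribution is purely illustrative and is only taken into account if a
matching key with a non-zero weight is passed in 'reward_mixture':
.. code-block:: python

    class MyWalkerEnv(WalkerJiminyEnv):
        def compute_reward(self, info):
            # Penalize the linear velocity of the freeflyer before letting
            # the base implementation aggregate every contribution.
            if 'velocity' in self.reward_mixture:
                info.setdefault('reward', {})['velocity'] = -float(
                    np.linalg.norm(self.system_state.v[:3]))
            return super().compute_reward(info)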
"""
# pylint: disable=arguments-differ
reward_dict = info.setdefault('reward', {})
# Define some proxies
reward_mixture_keys = self.reward_mixture.keys()
if 'energy' in reward_mixture_keys:
v_mot = self.robot.sensors_data[encoder.type][1]
command = self.system_state.command
power_consumption = np.sum(np.maximum(command * v_mot, 0.0))
power_consumption_rel = \
power_consumption / self._power_consumption_max
reward_dict['energy'] = - power_consumption_rel
if 'done' in reward_mixture_keys:
reward_dict['done'] = 1.0
# Compute the total reward
reward_total = sum([self.reward_mixture[name] * value
for name, value in reward_dict.items()])
return reward_total
def compute_reward_terminal(self, *, info: Dict[str, Any]) -> float:
"""Compute the reward at the end of the episode.
It computes the terminal reward associated with each individual
contribution according to 'reward_mixture'.
"""
reward_dict = info.setdefault('reward', {})
reward_mixture_keys = self.reward_mixture.keys()
# Add a negative reward proportional to the average deviation on
# Y-axis. It is equal to 0.0 if the frontal displacement is perfectly
# symmetric wrt Y-axis over the whole trajectory.
if 'direction' in reward_mixture_keys:
frontal_displacement = abs(np.mean(self.log_data[
'HighLevelController.currentFreeflyerPositionTransY']))
reward_dict['direction'] = - frontal_displacement
# Compute the total reward
reward_total = sum([self.reward_mixture[name] * value
for name, value in reward_dict.items()])
return reward_total