# mypy: disable-error-code="attr-defined, name-defined"
"""Utilities for extracting structured information from log data, from
reconstructing the robot to reading telemetry variables.
"""
import os
import re
import tempfile
from bisect import bisect_right
from collections import OrderedDict
from typing import (
Any, Callable, List, Dict, Optional, Sequence, Union, Literal, Type,
overload)
import numpy as np
from . import core as jiminy
from . import tree
from .core import ( # pylint: disable=no-name-in-module
EncoderSensor as encoder,
EffortSensor as effort,
ContactSensor as contact,
ForceSensor as force,
ImuSensor as imu)
from .robot import _fix_urdf_mesh_path
from .dynamics import State, TrajectoryDataType
SENSORS_FIELDS: Dict[
Type[jiminy.AbstractSensor], Union[List[str], Dict[str, List[str]]]
] = {
encoder: encoder.fieldnames,
effort: effort.fieldnames,
contact: contact.fieldnames,
force: {
k: [e[len(k):] for e in force.fieldnames if e.startswith(k)]
for k in ['F', 'M']
},
imu: {
k: [e[len(k):] for e in imu.fieldnames if e.startswith(k)]
for k in ['Quat', 'Gyro', 'Accel']
}
}
FieldNested = Union[Dict[str, 'FieldNested'], Sequence['FieldNested'], str]
read_log = jiminy.core.Engine.read_log
@overload
def extract_variables_from_log(log_vars: Dict[str, np.ndarray],
fieldnames: FieldNested,
namespace: str = "",
*, as_dict: Literal[False] = False
) -> List[np.ndarray]:
...
@overload
def extract_variables_from_log(log_vars: Dict[str, np.ndarray],
fieldnames: FieldNested,
namespace: str = "",
*, as_dict: Literal[True]
) -> Dict[str, np.ndarray]:
...
[docs]
def build_robot_from_log(
log_data: Dict[str, Any],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
*, robot_name: str = ""
) -> jiminy.Robot:
"""Create and initialize a robot from a single- or multi- robot simulation.
.. warning::
If the robot to be built is from a multi-robot simulation, then its
name needs to be specified explicitly. Alternatively, one can load all
robots simultaneously in log file using `build_robots_from_log`.
.. note::
Model options and `robot.pinocchio_model` will be the same as during
the simulation until the next call to `reset` method unless the options
of the robot that has been restored are overwritten manually.
.. note::
It returns a valid and fully initialized robot, that can be used to
perform new simulation if added to a Jiminy Engine, but the original
controller is lost.
.. warning::
It does ot require the original URDF file to exist, but the original
mesh paths (if any) must be valid since they are not bundle in the log
archive for now.
:param log_data: Logged data (constants and variables) as a dictionary.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It which may be necessary to read log generated on a
different environment.
:param mesh_package_dirs: Prepend custom mesh package search path
directories to the ones provided by log file. It
may be necessary to specify it to read log
generated on a different environment.
:param robot_name: Name of the robot to build from log.
:returns: Reconstructed robot, and parsed log data as returned by
`jiminy_py.log.read_log` method.
"""
# Instantiate empty robot
robot = jiminy.Robot(robot_name)
# Extract log constants
log_constants = log_data["constants"]
try:
pinocchio_model = log_constants[
".".join(filter(None, (robot_name, "pinocchio_model")))]
except KeyError as e:
if robot_name == "":
raise ValueError(
"No robot with empty name has been found. Specify a name or "
"call `build_robots_from_log`.") from e
try:
# Extract geometry models
collision_model = log_constants[
".".join(filter(None, (robot_name, "collision_model")))]
visual_model = log_constants[
".".join(filter(None, (robot_name, "visual_model")))]
# Initialize the model
robot.initialize(pinocchio_model, collision_model, visual_model)
except KeyError as e:
# Extract initialization arguments
urdf_data = log_constants[
".".join(filter(None, (robot_name, "urdf_file")))]
has_freeflyer = int(log_constants[
".".join(filter(None, (robot_name, "has_freeflyer")))])
mesh_package_dirs = [*mesh_package_dirs, *log_constants.get(
".".join(filter(None, (robot_name, "mesh_package_dirs"))), ())]
# Make sure urdf data is available
if len(urdf_data) <= 1:
raise RuntimeError(
"Impossible to build robot. The log is not persistent and the "
"robot was not associated with a valid URDF file.") from e
# Write urdf data in temporary file
urdf_path = os.path.join(
tempfile.gettempdir(),
f"{next(tempfile._get_candidate_names())}.urdf")
with open(urdf_path, "xb") as f:
f.write(urdf_data)
# Fix the mesh paths in the URDF model if requested
if mesh_path_dir is not None:
fixed_urdf_path = _fix_urdf_mesh_path(urdf_path, mesh_path_dir)
os.remove(urdf_path)
urdf_path = fixed_urdf_path
# Initialize model
robot.initialize(urdf_path, has_freeflyer, mesh_package_dirs)
# Delete temporary file
os.remove(urdf_path)
# Load the options
all_options = log_constants["options"]
robot.set_options(all_options[robot_name or "robot"])
# Update model in-place.
# Note that `__setstate__` re-allocates memory instead of just calling
# the copy assignment operator. Although this is undesirable, there is
# no better way on Python side. Anyway, this is not an issue in this
# particular case since the robot has just been created, so nobody got
# references to pre-allocated data at this point.
robot.pinocchio_model.__setstate__(pinocchio_model.__getstate__())
# Allocate corresponding pinocchio data manually
pinocchio_data = pinocchio_model.createData()
robot.pinocchio_data.__setstate__(pinocchio_data.__getstate__())
return robot
[docs]
def build_robots_from_log(
log_data: Dict[str, Any],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
) -> Sequence[jiminy.Robot]:
"""Build all the robots in the log of the simulation.
.. note::
This function calls `build_robot_from_log` to build each robot.
Refer to `build_robot_from_log` for more information.
:param log_data: Logged data (constants and variables) as a dictionary.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It which may be necessary to read log generated on a
different environment.
:param mesh_package_dirs: Prepend custom mesh package search path
directories to the ones provided by log file. It
may be necessary to specify it to read log
generated on a different environment.
:returns: Sequence of reconstructed robots.
"""
# Try to infer robot names from log constants
robot_names = []
for key in log_data["constants"].keys():
robot_name_match = re.match(r"^(\w*?)\.?has_freeflyer$", key)
if robot_name_match is not None:
robot_names.append(robot_name_match.group(1))
# Build all the robots sequentially
robots = []
for robot_name in robot_names:
robot = build_robot_from_log(
log_data, mesh_path_dir, mesh_package_dirs, robot_name=robot_name)
robots.append(robot)
return robots
[docs]
def update_sensor_measurements_from_log(
log_data: Dict[str, Any],
robot: jiminy.Model
) -> Callable[[float, np.ndarray, np.ndarray], None]:
"""Helper to make it easy to emulate sensor data update based on log data.
.. note::
It returns an update_hook that can forwarding the `Viewer.replay` to
display sensor information such as contact forces for instance.
:param log_data: Logged data (constants and variables) as a dictionary.
:param robot: Jiminy robot associated with the logged trajectory.
:returns: Callable taking update time in argument and returning nothing.
Note that it does not through an exception if out-of-range, but
rather clip to desired time to the available data range.
"""
# Extract time from log
log_vars = log_data["variables"]
times = log_vars["Global.Time"]
# Filter sensors whose data is available
sensors_set, sensors_log = [], []
for sensor_type, sensor_names in robot.sensor_names.items():
sensor_fieldnames = getattr(jiminy, sensor_type).fieldnames
for name in sensor_names:
sensor = robot.get_sensor(sensor_type, name)
log_fieldnames = [
'.'.join((sensor.name, field)) for field in sensor_fieldnames]
if log_fieldnames[0] in log_vars.keys():
sensor_log = np.stack([
log_vars[field] for field in log_fieldnames], axis=-1)
sensors_set.append(sensor)
sensors_log.append(sensor_log)
def update_hook(t: float,
q: np.ndarray, # pylint: disable=unused-argument
v: np.ndarray # pylint: disable=unused-argument
) -> None:
nonlocal times, sensors_set, sensors_log
# Get surrounding indices in log data
i = bisect_right(times, t)
i_prev, i_next = max(i - 1, 0), min(i, len(times) - 1)
# Early return if no more data is available
if i_next == i_prev:
return
# Compute current time ratio
ratio = (t - times[i_prev]) / (times[i_next] - times[i_prev])
# Update sensors data
for sensor, sensor_log, in zip(sensors_set, sensors_log):
value_prev, value_next = sensor_log[i_prev], sensor_log[i_next]
value = value_prev + (value_next - value_prev) * ratio
sensor.data = value
return update_hook