Source code for jiminy_py.log
# mypy: disable-error-code="attr-defined, name-defined"
"""Utilities for extracting structured information from log data, from
reconstructing the robot to reading telemetry variables.
"""
import re
from bisect import bisect_right
from itertools import zip_longest, starmap
from collections import OrderedDict
from typing import (
Any, Callable, List, Dict, Optional, Sequence, Union, Literal, overload,
cast)
import numpy as np
from . import core as jiminy
from . import tree
from .dynamics import State, Trajectory
FieldNested = Union[Dict[str, 'FieldNested'], Sequence['FieldNested'], str]
read_log = jiminy.core.Engine.read_log
@overload
def extract_variables_from_log(log_vars: Dict[str, np.ndarray],
fieldnames: FieldNested,
namespace: str = "",
*, as_dict: Literal[False] = False
) -> List[np.ndarray]:
...
@overload
def extract_variables_from_log(log_vars: Dict[str, np.ndarray],
fieldnames: FieldNested,
namespace: str = "",
*, as_dict: Literal[True]
) -> Dict[str, np.ndarray]:
...
[docs]
def extract_variables_from_log(log_vars: Dict[str, np.ndarray],
fieldnames: FieldNested,
namespace: str = "",
*, as_dict: bool = False
) -> Union[
List[np.ndarray], Dict[str, np.ndarray]]:
"""Extract values associated with a set of variables in a specific
namespace.
:param log_vars: Logged variables as a dictionary.
:param fieldnames: Structured fieldnames.
:param namespace: Namespace of the fieldnames. Empty string to disable.
Optional: Empty by default.
:param as_dict: Whether to return a dictionary mapping flattened fieldnames
to values.
Optional: True by default.
"""
# Extract values from log if it exists
if as_dict:
keys: List[str] = []
values: List[np.ndarray] = []
for fieldname_path, fieldname in tree.flatten_with_path(fieldnames):
# A key is the concatenation of namespace and full path of fieldname
key = ".".join(filter(
lambda key: isinstance(key, str) and key, # type: ignore[arg-type]
(namespace, *fieldname_path, fieldname)))
# Raise an exception if the key does not exists and not fail safe
if key not in log_vars:
raise KeyError(f"Variable '{key}' not found in log file.")
# Extract the value corresponding to the key
if as_dict:
keys.append(key)
values.append(log_vars[key])
# Return flat mapping from fieldnames (without prefix) to scalar values
if as_dict:
return OrderedDict(zip(keys, values))
return values
[docs]
def build_robot_from_log(
log_data: Dict[str, Any],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
*, robot_name: Optional[str] = None
) -> jiminy.Robot:
"""Create and initialize a robot from single- or multi- robot simulations.
.. note::
It returns a valid and fully initialized robot, that can be used to
perform new simulation if added to a Jiminy Engine, but the original
controller is lost.
.. note::
Model options and `robot.pinocchio_model` will be the same as during
the simulation until the next call to `reset` or `set_options` methods.
.. note::
In case of multi-robot simulation, one may use `build_robots_from_log`
for building all robots at once without having the specify explicitly
the name for each of them.
:param log_data: Logged data (constants plus variables) as a dictionary.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It may be necessary to read logs generated on
different environments.
:param mesh_package_dirs: Prepend custom mesh package search path
directories to the ones provided by log file. It
may be necessary to specify it to read log
generated on a different environment.
:param robot_name: Name of the robot to build from log. If `None`, then the
name will be detected automatically in case of
single-robot simulations, otherwise it will raise an
exception.
:returns: Reconstructed robot.
"""
# Extract log constants
log_constants = log_data["constants"]
# Try to infer robot names from log constants if not specified
if robot_name is None:
robot_names = []
for key in log_data["constants"].keys():
robot_name_match = re.match(r"^(\w*?)\.?robot$", key)
if robot_name_match is not None:
robot_names.append(robot_name_match.group(1))
if len(robot_names) > 1:
raise ValueError(
"In case of multi-robot simulations, the name of the robot to "
"build must be specified. Please call `build_robots_from_log` "
"to build all robots at once.")
assert robot_names
robot_name = robot_names[0]
# Get binary serialized robot data
robot_data = log_constants[".".join(filter(None, (robot_name, "robot")))]
# Load robot from binary data
return jiminy.load_from_binary(
robot_data, mesh_path_dir, mesh_package_dirs)
[docs]
def build_robots_from_log(
log_data: Dict[str, Any],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
) -> Sequence[jiminy.Robot]:
"""Build all the robots in the log of the simulation.
.. note::
Internally, this function calls `build_robot_from_log` to build each
available robot. Refer to `build_robot_from_log` for more information.
:param log_data: Logged data (constants and variables) as a dictionary.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It which may be necessary to read log generated on a
different environment.
:param mesh_package_dirs: Prepend custom mesh package search path
directories to the ones provided by log file. It
may be necessary to specify it to read log
generated on a different environment.
:returns: Sequence of reconstructed robots.
"""
# Try to infer robot names from log constants
robot_names = []
for key in log_data["constants"].keys():
robot_name_match = re.match(r"^(\w*?)\.?robot$", key)
if robot_name_match is not None:
robot_names.append(robot_name_match.group(1))
# Build all the robots sequentially
robots = []
for robot_name in robot_names:
robot = build_robot_from_log(
log_data, mesh_path_dir, mesh_package_dirs, robot_name=robot_name)
robots.append(robot)
return robots
[docs]
def extract_trajectory_from_log(log_data: Dict[str, Any],
robot: Optional[jiminy.Robot] = None,
*, robot_name: Optional[str] = None
) -> Trajectory:
"""Extract the minimal required information from raw log data in order to
replay the simulation in a viewer.
.. note::
It extracts the required data for replay, namely temporal evolution of:
- robot configuration: to display of the robot on the scene,
- robot velocity: to update velocity-dependent markers such as DCM,
- external forces: to update force-dependent markers.
:param log_data: Logged data (constants and variables) as a dictionary.
:param robot: Jiminy robot associated with the logged trajectory.
Optional: None by default. If None, then it will be
reconstructed from 'log_data' using `build_robot_from_log`.
:param robot_name: Name of the robot to be constructed in the log. If it
is not known, call `build_robot_from_log`.
:returns: Dictionary whose keys are the name of each robot and values are
the corresponding `Trajectory` object.
"""
# Prepare robot namespace
if robot is not None:
if robot_name is None:
robot_name = robot.name
elif robot_name != robot.name:
raise ValueError(
"The name specified in `robot_name` and the name of the robot "
"are differents.")
elif robot_name is None:
robot_name = ""
# Handling of default argument(s)
if robot is None:
robot = build_robot_from_log(log_data, robot_name=robot_name)
# Define some proxies for convenience
log_vars = log_data["variables"]
# Extract robot state data over time for all quantities available
data: Dict[str, Union[Sequence[np.ndarray], np.ndarray]] = OrderedDict()
for name in ("position",
"velocity",
"acceleration",
"effort",
"command",
"f_external",
"constraint"):
fieldnames = getattr(robot, f"log_{name}_fieldnames")
try:
data[name] = extract_variables_from_log(
log_vars, fieldnames, robot_name)
except KeyError:
data[name] = []
# Add fictitious 'universe' external force and reshape data if available
f_ext: np.ndarray = cast(np.ndarray, data["f_external"])
if len(f_ext) > 0:
f_ext = np.stack((*((np.zeros_like(f_ext[0]),) * 6), *f_ext), axis=-1)
data["f_external"] = f_ext.reshape((len(f_ext), -1, 6), order='A')
# Stack available data
for key, values in data.items():
if len(values) > 0 and not isinstance(values, np.ndarray):
data[key] = np.stack(values, axis=-1)
# Create state sequence
states = tuple(starmap(
State, zip_longest(log_vars["Global.Time"], *data.values())))
# Create the trajectory
return Trajectory(states, robot, use_theoretical_model=False)
[docs]
def extract_trajectories_from_log(
log_data: Dict[str, Any],
robots: Optional[Sequence[jiminy.Robot]] = None
) -> Dict[str, Trajectory]:
"""Extract the minimal required information from raw log data in order to
replay the simulation in a viewer.
.. note::
This function calls `extract_trajectory_from_log` to extract each
robot's trajectory. Refer to `extract_trajectory_from_log` for more
information.
:param log_data: Logged data (constants and variables) as a dictionary.
:param robots: Sequence of Jiminy robots associated with the logged
trajectory.
Optional: None by default. If None, then it will be
reconstructed from 'log_data' using `build_robots_from_log`.
:returns: Dictionary mapping each robot name to its corresponding
trajectory.
"""
# Handling of default argument(s)
if robots is None:
robots = build_robots_from_log(log_data)
# Load the trajectory associated with each robot sequentially
trajectories = {}
for robot in robots:
trajectories[robot.name] = extract_trajectory_from_log(
log_data, robot, robot_name=robot.name)
return trajectories
[docs]
def update_sensor_measurements_from_log(
log_data: Dict[str, Any],
robot: jiminy.Model
) -> Callable[[float, np.ndarray, np.ndarray], None]:
"""Helper to make it easy to emulate sensor data update based on log data.
.. note::
It returns an update_hook that can forwarding the `Viewer.replay` to
display sensor information such as contact forces for instance.
:param log_data: Logged data (constants and variables) as a dictionary.
:param robot: Jiminy robot associated with the logged trajectory.
:returns: Callable taking update time in argument and returning nothing.
Note that it does not through an exception if out-of-range, but
rather clip to desired time to the available data range.
"""
# Extract time from log
log_vars = log_data["variables"]
times = log_vars["Global.Time"]
# Filter sensors whose data is available
sensor_data_map = []
for sensor_group in robot.sensors.values():
for sensor in sensor_group:
log_sensor_name = ".".join((sensor.type, sensor.name))
try:
data = np.stack(extract_variables_from_log(
log_vars, sensor.fieldnames, log_sensor_name), axis=-1)
sensor_data_map.append((sensor, data))
except KeyError:
pass
def update_hook(t: float,
q: np.ndarray, # pylint: disable=unused-argument
v: np.ndarray # pylint: disable=unused-argument
) -> None:
nonlocal times, sensor_data_map
# Early return if no more data is available
if t > times[-1]:
return
# Get surrounding indices in log data
i = bisect_right(times, t)
i_prev, i_next = max(i - 1, 0), min(i, len(times) - 1)
# Compute linear interpolation ratio
if i_next == i_prev:
# Special case for final data point
ratio = 1.0
else:
ratio = (t - times[i_prev]) / (times[i_next] - times[i_prev])
# Update sensors data
for sensor, data in sensor_data_map:
value_prev, value_next = data[i_prev], data[i_next]
sensor.data = value_prev + (value_next - value_prev) * ratio
return update_hook