# mypy: disable-error-code="attr-defined, name-defined"
"""Generic utilities for conveniently replaying and/or recording a simulation
in a 3D viewer from telemetry log data.
"""
import os
import time
import ctypes
import logging
import pathlib
import asyncio
import tempfile
import argparse
from base64 import b64encode
from collections import deque
from types import TracebackType
from functools import wraps, partial
from threading import get_ident, Thread, Lock
from itertools import cycle, islice
from typing import (
cast, List, Dict, Deque, Any, Optional, Union, Sequence, Tuple, Callable,
Type)
import av
import numpy as np
from tqdm import tqdm
import pinocchio as pin
from .. import core as jiminy
from ..dynamics import Trajectory
from ..log import (read_log,
build_robot_from_log,
extract_trajectory_from_log,
update_sensor_measurements_from_log)
from .viewer import (COLORS,
Tuple3FType,
Tuple4FType,
CameraPoseType,
CameraMotionType,
get_default_backend,
is_display_available,
Viewer)
from .meshcat.utilities import interactive_mode
VIDEO_FRAMERATE = 30
VIDEO_QUALITY = 0.3 # [Mbytes/s]
LOGGER = logging.getLogger(__name__)
ColorType = Union[str, Tuple4FType]
class QRLock:
"""Fair (FIFO) reentrant lock.
It is similar to the built-in `threading.RLock` object that is also
reentrant but unfair.
.. seealso::
For reference: https://stackoverflow.com/a/19695878/4820605
"""
def __init__(self) -> None:
self._lock = Lock()
self._waiters: Deque = deque()
self._owner: Optional[int] = None
self._count = 0
def acquire(self) -> None:
"""Acquire a lock in a blocking way.
.. note::
If this thread already owns the lock, increment the recursion level
by one, and return immediately. Otherwise, if another thread owns
the lock, block until the lock is unlocked. Once the lock is
unlocked (not owned by any thread), then grab ownership, set the
recursion level to one, and return. If more than one thread is
blocked waiting until the lock is unlocked, only one at a time will
be able to grab ownership of the lock.
.. warning::
Unlike built-in `threading.RLock`, if multiple threads are waiting
for acquiring the lock, which one will get it next is not system
dependent. The priority is given to the first thread in queue that
waited for it.
"""
# pylint: disable=consider-using-with
thread_id = get_ident()
if self._owner == thread_id:
self._count += 1
return
self._lock.acquire()
if self._count:
new_lock = Lock()
new_lock.acquire()
self._waiters.append(new_lock)
self._lock.release()
new_lock.acquire()
self._lock.acquire()
self._owner = thread_id
self._count += 1
self._lock.release()
def release(self) -> None:
"""Release a lock, decrementing the recursion level.
.. note::
If after the decrement it is zero, reset the lock to unlocked (not
owned by any thread), and if any other threads are blocked waiting
for the lock to become unlocked, allow exactly one of them to
proceed. If after the decrement the recursion level is still
nonzero, the lock remains locked and owned by the calling thread.
.. warning::
Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
unlocked.
"""
with self._lock:
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
if not count:
self._owner = None
if self._waiters:
self._waiters.popleft().release()
def __enter__(self) -> None:
self.acquire()
def __exit__(self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType]) -> None:
self.release()
viewer_lock = QRLock()
def _with_lock(fun: Callable[..., Any]) -> Callable[..., Any]:
@wraps(fun)
def fun_safe(*args: Any, **kwargs: Any) -> Any:
with viewer_lock:
return fun(*args, **kwargs)
return fun_safe
[docs]
@_with_lock
def play_trajectories(
trajectories: Union[ # pylint: disable=unused-argument
Trajectory, Sequence[Trajectory]],
update_hooks: Optional[Union[
Callable[[float, np.ndarray, np.ndarray], None],
Sequence[Optional[Callable[[float, np.ndarray, np.ndarray], None]]]
]] = None,
time_interval: Union[np.ndarray, Tuple[float, float]] = (0.0, np.inf),
speed_ratio: float = 1.0,
xyz_offsets: Optional[
Union[Tuple3FType, Sequence[Optional[Tuple3FType]]]] = None,
robots_colors: Optional[
Union[ColorType, Sequence[Optional[ColorType]]]] = None,
camera_pose: Optional[CameraPoseType] = None,
enable_travelling: bool = False,
camera_motion: Optional[CameraMotionType] = None,
watermark_fullpath: Optional[str] = None,
legend: Optional[Union[str, Sequence[str]]] = None,
enable_clock: bool = False,
display_com: Optional[bool] = None,
display_dcm: Optional[bool] = None,
display_contacts: Optional[bool] = None,
display_f_external: Optional[Union[Sequence[bool], bool]] = None,
scene_name: str = 'world',
record_video_path: Optional[str] = None,
record_video_size: Optional[Tuple[int, int]] = None,
start_paused: bool = False,
backend: Optional[str] = None,
delete_robot_on_close: Optional[bool] = None,
remove_widgets_overlay: bool = True,
close_backend: bool = False,
viewers: Optional[Union[Viewer, Sequence[Optional[Viewer]]]] = None,
verbose: bool = True,
**kwargs: Any) -> Sequence[Viewer]:
"""Replay one or several robot trajectories in a viewer.
The ratio between the replay and the simulation time is kept constant to
the desired ratio. One can choose between several backend ('panda3d' or
'meshcat').
.. note::
Replay speed is independent of the platform (windows, linux...) and
available CPU power.
:param trajectories: List of trajectories.
:param update_hooks: Callables associated with each robot that can be used
to update non-kinematic robot data, for instance to
emulate sensors data from log using the hook provided
by `update_sensor_measurements_from_log` method.
`None` to disable, otherwise it must have signature:
.. code-block:: python
f(t: float, q: ndarray, v: ndarray) -> None
Optional: None by default.
:param time_interval: Replay only timesteps in this interval of time.
It does not have to be finite.
Optional: [0, inf] by default.
:param speed_ratio: Speed ratio of the simulation.
Optional: 1.0 by default.
:param xyz_offsets: List of constant position of the root joint for each
robot in world frame. None to disable.
Optional: None by default.
:param robots_colors: List of RGBA codes or named colors defining the color
for each robot. It will be applied to every link.
None to disable.
Optional: Original color if single robot, default
color cycle otherwise.
:param camera_pose: Tuple position [X, Y, Z], rotation [Roll, Pitch, Yaw],
frame name/index specifying the initial pose of the
camera or the relative pose wrt the tracked frame
depending on whether travelling is enabled. `None` to
disable.
Optional: None by default.
:param enable_travelling: Whether the camera tracks the robot associated
with the first trajectory specified in
`trajectories`. `None` to disable.
Optional: Disabled by default.
:param camera_motion: Camera breakpoint poses over time, as a list of
`CameraMotionBreakpointType` dict. None to disable.
Optional: None by default.
:param watermark_fullpath: Add watermark to the viewer. It is not
persistent but disabled after replay. This
option is only supported by Meshcat backend.
None to disable.
Optional: No watermark by default.
:param legend: List of labels defining the legend for each robot. It is not
persistent but disabled after replay. None to disable.
Optional: No legend by default for a single robot without
color, the name of each robot otherwise.
:param enable_clock: Add clock on bottom right corner of the viewer.
Only available with 'panda3d' rendering backend.
Optional: Disabled by default.
:param display_com: Whether to display the center of mass. `None` to keep
current viewers' settings, if any.
Optional: Enabled by default iif `viewers` is `None`,
and backend is 'panda3d'.
:param display_dcm: Whether to display the capture point (also called DCM).
`None to keep current viewers' settings.
Optional: Enabled by default iif `viewers` is `None`,
and backend is 'panda3d'.
:param display_contacts: Whether to display the contact forces. Note that
the user is responsible for updating sensors data
via `update_hooks`. `None` to keep current
viewers' settings.
Optional: Enabled by default iif `update_hooks` is
specified, `viewers` is `None`, and backend is
'panda3d'.
:param display_f_external: Whether to display the external external forces
applied at the joints on the robot. If a boolean
is provided, the same visibility will be set for
each joint, alternatively one can provide a
boolean list whose ordering is consistent with
`pinocchio_model.names`. Note that the user is
responsible for updating the force buffer
`viewer.f_external` via `update_hooks`. `None`
to keep current viewers' settings.
Optional: `None` by default.
:param scene_name: Name of viewer's scene in which to display the robot.
Optional: Common default name if omitted.
:param record_video_path: Fullpath location where to save generated video.
It must be specified to enable video recording.
Meshcat only support 'webm' format, while the
other renderer only supports 'mp4' format encoded
with web-compatible 'h264' codec.
Optional: None by default.
:param record_video_size: The width and height of the video recording.
Optional: (800, 800) if non-interactive and
(800, 400) otherwise by default.
:param start_paused: Start the simulation is pause, waiting for keyboard
input before starting to play the trajectories.
Only available if `record_video_path` is None.
Optional: False by default.
:param backend: Backend, one of 'meshcat', 'panda3d' and'panda3d-sync. If
`None`, the most appropriate backend will be selected
automatically based on hardware and python environment.
Optional: `None` by default.
:param delete_robot_on_close: Whether to delete the robot from the viewer
when closing it.
Optional: True by default for Panda3d backend
in non-interactive mode, False otherwise.
:param remove_widgets_overlay: Remove overlay (legend, watermark, clock,
...) automatically before returning.
Optional: Enabled by default.
:param close_backend: Whether to close backend automatically before
returning.
Optional: Disabled by default.
:param viewers: List of already instantiated viewers, associated one by one
in order to each trajectory data. None to disable.
Optional: None by default.
:param verbose: Add information to keep track of the process.
Optional: True by default.
:param kwargs: Unused keyword arguments to allow chaining rendering
methods with ease.
:returns: List of viewers used to play the trajectories.
"""
# Make sure sequence arguments are list or tuple
if isinstance(trajectories, Trajectory):
trajectories = [trajectories]
ntrajs = len(trajectories)
if update_hooks is None:
update_hooks = [None] * ntrajs
if callable(update_hooks):
update_hooks = [update_hooks]
if isinstance(viewers, Viewer):
viewers = [viewers]
elif not viewers:
viewers = None
# Make sure the viewers are still running if specified
if not Viewer.is_alive():
viewers = None
if viewers is not None:
viewers = list(viewers)
for i, viewer in enumerate(viewers):
if (viewer is not None and
not viewer.is_open()): # type: ignore[misc]
viewers[i] = None
break
if all(viewer is None for viewer in viewers):
viewers = None
# Pick the default backend if unspecified and none is already running. Note
# that repeatedly switching between "panda3d-sync" and "panda3d" backends
# is causing segfaults. See https://github.com/panda3d/panda3d/issues/1372.
if backend is None:
if Viewer.is_alive():
backend = Viewer.backend
# elif record_video_path is not None:
# backend = "panda3d-sync"
else:
backend = get_default_backend()
assert backend is not None
# Always close viewers if gui is open if necessary for efficiency
if (backend.startswith("panda3d") and record_video_path is not None and
Viewer.has_gui()):
Viewer.close()
viewers = None
# Create a temporary video if the backend is 'panda3d-sync', no
# 'record_video_path' is provided, and running in interactive mode
# with HTML rendering support. Then load it in running cell.
record_video_html_embedded = (
record_video_path is None and
backend == "panda3d-sync" and interactive_mode() >= 2)
if record_video_html_embedded:
fd, record_video_path = tempfile.mkstemp(suffix='.mp4')
os.close(fd)
# Make sure it is possible to replay something
if backend == "panda3d-sync" and record_video_path is None:
raise RuntimeError(
"Impossible to replay simulation using 'panda3d-sync' backend. "
"Please set 'record_video_path' to save it as a file.")
# Set default video recording size
if record_video_size is None:
record_video_size = (800, 400 if record_video_html_embedded else 800)
# Delete robot by default only if not in interactive viewer
if delete_robot_on_close is None:
delete_robot_on_close = (
backend.startswith('panda3d') and interactive_mode() < 2)
# Handling of default options if no viewer is available
if viewers is None and backend.startswith('panda3d'):
# Check whether at least one of the robots has a freeflyer
has_freeflyer = False
for trajectory in trajectories:
robot = trajectory.robot
assert robot is not None
if robot.has_freeflyer:
has_freeflyer = True
break
# Handling of default display of CoM, DCM and contact forces
if display_com is None:
display_com = has_freeflyer
if display_dcm is None:
display_dcm = has_freeflyer
if display_contacts is None:
display_contacts = all(fun is not None for fun in update_hooks)
# Make sure it is possible to display contacts if requested
if display_contacts:
for trajectory in trajectories:
robot = trajectory.robot
assert robot is not None
if robot.is_locked:
LOGGER.debug(
"`display_contacts` is not available if robot is locked. "
"Please stop any running simulation before replay.")
display_contacts = False
break
# Sanitize user-specified robot offsets
if xyz_offsets is None:
xyz_offsets = ntrajs * (None,)
elif len(xyz_offsets) != ntrajs:
assert isinstance(xyz_offsets[0], float)
xyz_offsets = np.tile(
xyz_offsets, (ntrajs, 1)) # type: ignore[arg-type]
assert xyz_offsets is not None
# Sanitize user-specified robot colors
if robots_colors is None:
if ntrajs == 1:
robots_colors = (None,)
else:
robots_colors = list(islice(
cycle(COLORS.values()), ntrajs))
elif isinstance(robots_colors, str) or isinstance(robots_colors[0], float):
robots_colors = [robots_colors] # type: ignore[list-item]
assert len(robots_colors) == ntrajs
# Sanitize user-specified legend
if legend is not None and isinstance(legend, str):
legend = [legend]
# Make sure the viewers instances are consistent with the trajectories
if viewers is None:
viewers = [None for _ in trajectories]
assert len(viewers) == ntrajs
# Instantiate or refresh viewers if necessary
for i, (viewer, trajectory, color) in enumerate(zip(
viewers, trajectories, robots_colors)):
# Extract robot from trajectory
robot = trajectory.robot
assert robot is not None
# Create new viewer instance if necessary, and load the robot in it
if viewer is None:
uniq_id = next(tempfile._get_candidate_names())
robot_name = f"{uniq_id}_robot_{i}"
use_theoretical_model = trajectory.use_theoretical_model
viewer = Viewer(
robot,
use_theoretical_model=use_theoretical_model,
robot_color=color,
robot_name=robot_name,
backend=backend,
scene_name=scene_name,
delete_robot_on_close=delete_robot_on_close,
open_gui_if_parent=record_video_path is None)
viewers[i] = viewer
# Reset the configuration of the robot
if trajectory.use_theoretical_model:
model = robot.pinocchio_model_th
else:
model = robot.pinocchio_model
viewer.display(pin.neutral(model))
# Reset robot model in viewer if requested color has changed.
# `_setup` is called instead of `set_color` because the latter is not
# supported by meshcat.
if color is not None and color != viewer.robot_color:
viewer._setup(robot, color)
# Assert(s) for type checker
assert all(viewers)
viewers = cast(List[Viewer], viewers)
assert Viewer._backend_obj is not None
# Add default legend with robots names if replaying multiple trajectories
if all(color is not None for color in robots_colors) and legend is None:
legend = [viewer.robot_name for viewer in viewers]
# Make sure clock is only enabled for panda3d backend
if enable_clock and not backend.startswith('panda3d'):
LOGGER.warning(
"`enable_clock` is only available with 'panda3d' backend.")
enable_clock = False
# Early return if nothing to replay
if all(not trajectory.states for trajectory in trajectories):
LOGGER.debug("Nothing to replay.")
return viewers
# Enable camera motion if requested
if camera_motion is not None:
Viewer.register_camera_motion(camera_motion)
# Handle meshcat-specific options
if legend is not None:
try:
Viewer.set_legend(legend)
except ImportError:
LOGGER.warning(
"Impossible to add legend. Please install 'jiminy_py[plot]'.")
legend = None
# Add watermark if requested
if watermark_fullpath is not None:
Viewer.set_watermark(watermark_fullpath)
# Make sure the time interval is valid
if time_interval[1] < time_interval[0]:
raise ValueError("Time interval must be non-empty and positive.")
# Initialize robot and viewer configuration before any further processing
for viewer, trajectory, offset in zip(viewers, trajectories, xyz_offsets):
try:
state = trajectory.get(time_interval[0], mode='clip')
except RuntimeError:
pass
else:
viewer.display(state.q, state.v, offset)
for f_ext in viewer.f_external:
f_ext.vector[:] = 0.0
if backend.startswith('panda3d'):
if display_com is not None:
viewer.display_center_of_mass(display_com)
if display_dcm is not None:
viewer.display_capture_point(display_dcm)
if display_contacts is not None:
viewer.display_contact_forces(display_contacts)
if display_f_external is not None:
viewer.display_external_forces(display_f_external)
# Set camera pose or activate camera travelling if requested
viewer, robot = viewers[0], trajectories[0].robot
if enable_travelling:
position, rotation, relative = None, None, None
if camera_pose is not None:
position, rotation, relative = camera_pose
if relative is None:
# Track the first actual frame by default (0: world, 1: root_joint)
assert robot is not None
if not robot.has_freeflyer:
raise ValueError(
"Enabling travelling requires `camera_pose` to specify at "
"least the relative frame to track if the first robot has "
"no freeflyer.")
relative = 2
viewer.attach_camera(relative, position, rotation)
elif camera_pose is not None:
viewer.set_camera_transform(*camera_pose)
# Wait for the meshes to finish loading if video recording is disable
if record_video_path is None:
if backend == 'meshcat':
if verbose and interactive_mode() < 2:
print("Waiting for meshcat client in browser to connect: "
f"{Viewer._backend_obj.gui.url()}")
Viewer.wait(require_client=True)
if verbose and interactive_mode() < 2:
print("Browser connected! Replaying simulation...")
# Replay the trajectory
if record_video_path is not None:
# Extract and resample trajectory data at fixed framerate
time_max = time_interval[0]
for trajectory in trajectories:
if trajectory.has_data:
_, t_end = trajectory.time_interval
time_max = max(time_max, t_end)
time_max = min(time_max, time_interval[1])
# Initialize video recording
if backend == 'meshcat':
# Sanitize the recording path to enforce '.webm' extension
record_video_path = str(
pathlib.Path(record_video_path).with_suffix('.webm'))
# Start backend recording thread
Viewer._backend_obj.start_recording(
VIDEO_FRAMERATE, *record_video_size)
else:
# Sanitize the recording path to enforce '.mp4' extension
record_video_path = str(
pathlib.Path(record_video_path).with_suffix('.mp4'))
# Create ffmpeg video writer
out = av.open(record_video_path, mode='w')
out.metadata['title'] = scene_name
stream = out.add_stream('libx264', rate=VIDEO_FRAMERATE)
stream.width, stream.height = record_video_size
stream.pix_fmt = 'yuv420p'
stream.bit_rate = VIDEO_QUALITY * (8 * 1024 ** 2)
stream.options = {"preset": "veryfast", "tune": "zerolatency"}
# Create frame storage
frame = av.VideoFrame(*record_video_size, 'rgb24')
frame_bytes = memoryview(frame.planes[0])
# Add frames to video sequentially
update_hook_t = None
time_global = np.arange(
time_interval[0], time_max + 1e-10, speed_ratio / VIDEO_FRAMERATE)
for t in tqdm(
time_global, desc="Rendering frames",
disable=(not verbose and not record_video_html_embedded)):
try:
# Loop over all trajectories
for viewer, trajectory, xyz_offset, update_hook in zip(
viewers, trajectories, xyz_offsets, update_hooks):
# Skip empty trajectories
assert viewer is not None
if not trajectory.has_data:
continue
# Compute robot state at current time
state = trajectory.get(t, mode='clip')
# Update viewer state
if state.f_external is not None:
for f_ref, f_i in zip(
viewer.f_external, state.f_external[1:]):
f_ref.vector[:] = f_i
if update_hook is not None:
update_hook_t = partial(
update_hook, t, state.q, state.v)
viewer.display(state.q, state.v, xyz_offset, update_hook_t)
# Update clock if enabled
if enable_clock:
Viewer.set_clock(t)
# Add frame to video
if backend == 'meshcat':
Viewer._backend_obj.add_frame()
else:
# Update frame.
# Note that `capture_frame` is by far the main bottleneck
# of the whole recording process (~75% on discrete gpu).
buffer = Viewer.capture_frame(*record_video_size)
frame_bytes[:] = buffer.reshape(-1).data
# Write frame
for packet in stream.encode(frame):
out.mux(packet)
except (KeyboardInterrupt, RuntimeError):
# RuntimeError would be raised if the backend got closed during
# recording, which typically happens when terminating Python
# forcibly during async recording.
break
# Finalize video recording
if backend == 'meshcat':
# Stop backend recording thread
assert record_video_path is not None
Viewer._backend_obj.stop_recording(record_video_path)
else:
# Flush and close recording file
for packet in stream.encode(None):
out.mux(packet)
out.close()
else:
# Make sure a gui is opened
Viewer.open_gui()
# Handle start-in-pause mode
if start_paused:
if interactive_mode() < 2:
input("Press Enter to continue...")
if not Viewer.is_alive():
return viewers
else:
LOGGER.warning("Start paused is disabled in interactive mode.")
# Play trajectories with multithreading
def replay_thread(viewer: Viewer, *args: Any) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
viewer.replay(*args)
threads = []
for viewer, trajectory, xyz_offset, update_hook in zip(
viewers, trajectories, xyz_offsets, update_hooks):
threads.append(Thread(
target=replay_thread,
args=(viewer,
trajectory,
time_interval,
speed_ratio,
xyz_offset,
update_hook,
enable_clock),
daemon=True))
for thread in threads:
thread.start()
for thread in threads:
try:
thread.join()
except KeyboardInterrupt:
assert thread.ident is not None
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), ctypes.py_object(SystemExit))
if Viewer.is_alive():
# Disable camera travelling and camera motion if it was enabled
if enable_travelling:
Viewer.detach_camera()
if camera_motion is not None:
Viewer.remove_camera_motion()
if remove_widgets_overlay:
# Disable legend if it was enabled
if legend is not None:
Viewer.set_legend()
# Disable watermark if it was enabled
if watermark_fullpath is not None:
Viewer.set_watermark()
if enable_clock:
Viewer.set_clock()
# Close backend if requested
if close_backend:
Viewer.close()
# Show video if temporary
if record_video_html_embedded:
# pylint: disable=import-outside-toplevel,no-name-in-module
from IPython.core.display import HTML, display
assert record_video_path is not None
video_base64 = b64encode(open(record_video_path, 'rb').read()).decode()
os.remove(record_video_path)
display(HTML(f"""
<video controls style="
height: 400px; width: 100%; overflow: hidden;">
<source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
</video>
"""))
return viewers
[docs]
def play_logs_data(
logs_data: Union[
Sequence[Dict[str, np.ndarray]], Dict[str, np.ndarray]],
robots: Optional[Union[
Sequence[Optional[jiminy.Robot]], jiminy.Robot]] = None,
**kwargs: Any) -> Sequence[Viewer]:
"""Play log data in a viewer.
This method simply formats the data then calls `play_trajectories`.
:param logs_data: Either a single dictionary, or a list of dictionaries of
simulation data log.
:param robots: Either a single robot, or a list of robot for each log data.
:param kwargs: Keyword arguments to forward to `play_trajectories` method.
"""
# Reformat input arguments as lists
if isinstance(logs_data, dict):
logs_data = [logs_data]
if isinstance(robots, jiminy.Robot):
robots = [robots]
elif robots is None:
robots = [None,] * len(logs_data)
# Extract a replay data for `play_trajectories` for each pair (robot, log)
trajectories, update_hooks, extra_kwargs = [], [], {}
for log_data, robot in zip(logs_data, robots):
trajectory, update_hook, _kwargs = \
extract_replay_data_from_log(log_data, robot)
trajectories.append(trajectory)
update_hooks.append(update_hook)
extra_kwargs.update(_kwargs)
# Do not display external forces by default if replaying several trajectory
if len(trajectories) > 1:
extra_kwargs.pop("display_f_external", None)
# Finally, play the trajectories
return play_trajectories(
trajectories, update_hooks, **{**extra_kwargs, **kwargs})
[docs]
def play_logs_files(logs_files: Union[str, Sequence[str]],
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
**kwargs: Any) -> Sequence[Viewer]:
"""Play the content of a logfile in a viewer.
This method reconstruct the exact extended model used during the simulation
associated with the logfile, including random biases or flexibility joints.
:param logs_files: Either a single simulation log files in any format, or
a list.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It which may be necessary to read log generated on a
different environment.
:param mesh_package_dirs: Additional search paths for all relative mesh
paths beginning with 'packages://' directive. It
may be necessary to specify it to read log
generated on a different environment.
:param kwargs: Keyword arguments to forward to `play_trajectories` method.
"""
# Reformat as list
if isinstance(logs_files, str):
logs_files = [logs_files]
# Extract log data and build robot for each log file
robots, logs_data = [], []
for log_file in logs_files:
log_data = read_log(log_file)
robot = build_robot_from_log(
log_data, mesh_path_dir, mesh_package_dirs)
logs_data.append(log_data)
robots.append(robot)
# Default legend if several log files are provided
if "legend" not in kwargs and len(logs_files) > 1:
kwargs["legend"] = [pathlib.Path(log_file).stem.split('_')[-1]
for log_file in logs_files]
# Forward arguments to lower-level method
return play_logs_data(logs_data, robots, **kwargs)
[docs]
def async_play_and_record_logs_files(
logs_files: Union[str, Sequence[str]],
enable_replay: Optional[bool] = None,
mesh_path_dir: Optional[str] = None,
mesh_package_dirs: Sequence[str] = (),
**kwargs: Any) -> Optional[Thread]:
"""Play and/or replay the content of a logfile in a viewer asynchronously.
.. note::
This call can be made blocking at any point in time by calling `join()`
on the returned `Thread` instance.
:param logs_files: Simulation log files in any of the supported formats.
:param enable_replay: Whether to force replay the simulation. It would
first replay then record if this option is enabled
and `record_video_path` is specified.
Optional: True by default if `record_video_path` is
not specified and the current backend supports
onscreen rendering, False otherwise.
:param mesh_path_dir: Overwrite the common root of all absolute mesh paths.
It which may be necessary to read log generated on a
different environment.
:param mesh_package_dirs: Prepend custom mesh package search path
directories to the ones provided by log file.
:param kwargs: Keyword arguments to forward to `play_logs_files` method.
"""
# Handling of default argument(s)
enable_recording = "record_video_path" in kwargs
if enable_replay is None:
enable_replay = not enable_recording and (
(Viewer.backend or get_default_backend()) != "panda3d-sync" or
interactive_mode() >= 2)
# Disable replay if not available and video recording is requested
if enable_replay and not is_display_available():
LOGGER.warning("No display available. Disabling replay.")
enable_replay = False
# Nothing to do. Silently returning early.
if not enable_recording and not enable_replay:
return None
# Define method to pass to threading
def _locked_play_and_record(lock: QRLock,
logs_files: Sequence[str],
mesh_path_dir: Optional[str],
mesh_package_dirs: Sequence[str],
enable_replay: bool,
**kwargs: Any) -> None:
"""A lock is used to force waiting for the current evaluation to finish
before starting a new one, otherwise it will crash because of request
collisions, and it is undesirable anyway.
The viewer must be closed after replay if recording is requested,
otherwise the graphical window will dramatically slowdown rendering.
"""
close_backend = kwargs.get("close_backend", None)
record_video_path = kwargs.pop("record_video_path", None)
with lock:
if close_backend:
# Make sure there is no viewer already open at this point.
# Otherwise adding the legend will fail if some robots have not
# been deleted from the scene.
Viewer.close()
if enable_replay:
try:
play_logs_files(
logs_files, mesh_path_dir, mesh_package_dirs, **kwargs)
except RuntimeError as e:
# Replay may fail if current backend does not support it
LOGGER.warning(
"The current viewer backend '%s' does not support "
"replaying simulation: %s", Viewer.backend, e)
if record_video_path is not None:
play_logs_files(
logs_files, mesh_path_dir, mesh_package_dirs,
record_video_path=record_video_path, **kwargs)
if close_backend:
# Honor close backend at exit but not between replay and record
Viewer.close()
# Start replay and record thread
thread = Thread(
target=_locked_play_and_record,
args=(
viewer_lock, logs_files, mesh_path_dir, mesh_package_dirs,
enable_replay),
kwargs={
**dict(
close_backend=(enable_replay and enable_recording),
verbose=False),
**kwargs, **dict(
delete_robot_on_close=True)},
daemon=True)
thread.start()
return thread
def _play_logs_files_entrypoint() -> None:
"""Command-line entrypoint to replay the content of a logfile in a viewer.
"""
# Parse command-line arguments
parser = argparse.ArgumentParser(description=(
"Replay Jiminy simulation log files in a viewer. Multiple files "
"can be specified for simultaneous display"))
parser.add_argument(
'-p', '--start_paused', action='store_true',
help="Start in pause, waiting for keyboard input.")
parser.add_argument(
'-s', '--speed_ratio', type=float, default=1.0,
help="Real time to simulation time factor.")
parser.add_argument(
'-t', '--travelling', action='store_true',
help=("Whether to track the root frame of the first robot, "
"assuming the robot has a freeflyer."))
parser.add_argument(
'-b', '--backend', default='panda3d',
help="Display backend ('panda3d' or 'meshcat').")
parser.add_argument(
'-m', '--mesh_path_dir', default=None,
help="Fullpath location of mesh directory.")
parser.add_argument(
'-v', '--record_video_path', default=None,
help="Fullpath location where to save generated video.")
options, files = parser.parse_known_args()
kwargs = vars(options)
kwargs['logs_files'] = files
# Map argument name(s)
kwargs['enable_travelling'] = kwargs.pop('travelling')
# Replay trajectories
repeat = True
viewers = None
while repeat:
viewers = play_logs_files(
viewers=viewers,
**{**dict(
remove_widgets_overlay=False),
**kwargs})
kwargs["start_paused"] = False
kwargs.setdefault("camera_pose", None)
if kwargs["record_video_path"] is None:
while True:
reply = input("Do you want to replay again (y/[n])?").lower()
if not reply or reply in ("y", "n"):
break
repeat = reply == "y"
else:
repeat = False
# Do not exit method as long as a graphical window is open
while Viewer.has_gui():
time.sleep(0.5)