# mypy: disable-error-code="attr-defined, name-defined"
"""This module provides a backend-agnostic 3D viewer on top of lower-level
implementations.
This viewer can be used to render the state of a robot, including sensor data
and external forces if any. It is mainly used to render the current simulation
state or replay multiple complete trajectories as-posteriori.
"""
import os
import re
import io
import sys
import uuid
import time
import math
import shutil
import base64
import logging
import pathlib
import tempfile
import subprocess
import webbrowser
import multiprocessing
from urllib.request import urlopen
from functools import wraps, partial
from threading import RLock
from multiprocessing import Process as ProcessMP
from typing import (
Optional, Union, Sequence, Tuple, Dict, Callable, Any, TypedDict, Type,
Set, overload, cast)
import numpy as np
from panda3d_viewer.viewer_errors import ViewerError, ViewerClosedError
try:
from psutil import Process
except ImportError:
Process = type(None) # type: ignore[assignment,misc]
import hppfcl
import pinocchio as pin
from pinocchio import SE3, SE3ToXYZQUAT
from pinocchio.rpy import ( # pylint: disable=import-error
rpyToMatrix, matrixToRpy)
from .. import core as jiminy
from ..core import ( # pylint: disable=no-name-in-module
ContactSensor,
discretize_heightmap)
from ..robot import _DuplicateFilter
from ..dynamics import Trajectory
from .meshcat.utilities import interactive_mode
from .panda3d.panda3d_visualizer import (
Tuple3FType, Tuple4FType, ShapeType, Panda3dApp, Panda3dViewer,
Panda3dVisualizer, convert_bvh_collision_geometry_to_primitive)
REPLAY_FRAMERATE = 30
CAMERA_INV_TRANSFORM_PANDA3D = rpyToMatrix(-np.pi/2, 0.0, 0.0)
CAMERA_INV_TRANSFORM_MESHCAT = rpyToMatrix(-np.pi/2, 0.0, 0.0)
DEFAULT_CAMERA_XYZRPY_ABS = ((7.5, 0.0, 1.4), (1.4, 0.0, np.pi/2))
DEFAULT_CAMERA_XYZRPY_REL = ((4.1, -4.1, 0.85), (1.35, 0.0, 0.8))
DEFAULT_WATERMARK_MAXSIZE = (150, 150)
# Fz force value corresponding to capsule's length of 1m
CONTACT_FORCE_SCALE = 10000.0 # [N]
EXTERNAL_FORCE_SCALE = 800.0 # [N]
COLORS = {'red': (0.85, 0.2, 0.2, 1.0),
'blue': (0.35, 0.5, 1.0, 1.0),
'green': (0.4, 0.7, 0.3, 1.0),
'yellow': (0.9, 0.8, 0.0, 1.0),
'purple': (0.65, 0.4, 1.0, 1.0),
'orange': (0.95, 0.6, 0.0, 1.0),
'pink': (0.9, 0.4, 0.7, 1.0),
'grey': (0.7, 0.7, 0.7, 1.0),
'cyan': (0.1, 0.8, 1.0, 1.0),
'white': (0.95, 0.95, 0.95, 1.0),
'black': (0.45, 0.45, 0.5, 1.0)}
LOGGER = logging.getLogger(__name__)
LOGGER.addFilter(_DuplicateFilter())
@overload
def interp1d(t_in: np.ndarray,
y_in: np.ndarray,
t_out: float) -> float:
...
@overload
def interp1d(t_in: np.ndarray,
y_in: np.ndarray,
t_out: np.ndarray) -> np.ndarray:
...
def interp1d(t_in: np.ndarray,
y_in: np.ndarray,
t_out: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""One-dimensional linear interpolation for monotonically increasing sample
points.
:param t_in: The time of the input data points, must be increasing.
:param y_in: The value of the data points, same length as `t_in`.
:param t_out: The time at which to evaluate the interpolated values.
.. seealso::
This method generalizes `numpy.interp`. It is similar to
`scipy.interpolate.interp1d`.
"""
i = -1
is_scalar = isinstance(t_out, float)
if is_scalar:
t_out = np.array([t_out])
assert not isinstance(t_out, float)
y_out = np.empty((t_out.size, y_in.shape[-1]))
for t, y in zip(t_out, y_out):
while i < t_in.size - 1 and t_in[i + 1] < t:
i += 1
if 0 <= i < t_in.size - 1:
ratio = (t - t_in[i]) / (t_in[i + 1] - t_in[i])
y[:] = ratio * y_in[i + 1] + (1.0 - ratio) * y_in[i]
elif i < 0:
y[:] = y_in[0]
else:
y[:] = y_in[t_in.size - 1]
return y_out if not is_scalar else y_out[0]
[docs]
def is_display_available() -> bool:
"""Check if graphical server is available locally for onscreen rendering
or if the viewer can be opened in an interactive cell.
"""
if interactive_mode() >= 2:
return True
if multiprocessing.current_process().daemon:
return False
if not sys.platform.startswith("linux"):
return True
if os.environ.get("DISPLAY"):
return True
return False
[docs]
def get_default_backend() -> str:
"""Determine the default backend viewer, depending eventually on the
running environment, hardware, and set of available backends.
Panda3d is preferred over Meshcat in non-interactive mode, and on Google
Colaboratory because of known latency issue making it unusable.
See https://github.com/googlecolab/colabtools/issues/2870
.. note::
Both Meshcat and Panda3d supports Nvidia EGL rendering without
graphical server. Besides, both can fallback to software rendering if
necessary, although Panda3d offers only very limited support of it.
"""
mode = interactive_mode()
if mode >= 2:
if mode == 2:
try:
get_backend_type('meshcat')
return 'meshcat'
except ImportError:
pass
return 'panda3d-sync'
if is_display_available():
return os.getenv('JIMINY_VIEWER_DEFAULT_BACKEND', 'panda3d')
return 'panda3d-sync'
def get_backend_type(backend_name: str) -> type:
"""Return backend entry-point from its name if available.
.. note::
It is a function because otherwise it would only be run once at
import by the main thread only.
"""
# pylint: disable=import-outside-toplevel,unused-import
if backend_name == "panda3d-sync":
return Panda3dVisualizer
if multiprocessing.current_process().daemon:
raise RuntimeError(
"Please use backend 'panda3d-sync' in daemon thread.")
if backend_name == "panda3d":
if interactive_mode() >= 2:
raise ImportError(
"Synchronous 'panda3d' backend is disabled in interactive "
"mode. Consider using asynchronous 'panda3d-sync' instead.")
return Panda3dVisualizer
if backend_name == "panda3d-qt":
try:
from .panda3d.panda3d_widget import Panda3dQWidget # noqa: F401
return Panda3dVisualizer
except ImportError as e:
raise ImportError(
"'panda3d-qt' backend is not available. Please install either "
"'pyqt5' or 'pyside6'.") from e
if backend_name == "meshcat":
try:
from PIL import Image # noqa: F401
from .meshcat.wrapper import MeshcatWrapper # noqa: F401
from .meshcat.meshcat_visualizer import MeshcatVisualizer
return MeshcatVisualizer
except ImportError as e:
raise ImportError(
"'meshcat' backend is not available. Please install "
"'jiminy[meshcat]'.") from e
raise ValueError(f"Unknown backend '{backend_name}'.")
if sys.version_info >= (3, 11):
def sleep(dt: float) -> None:
"""Function to provide cross-platform time sleep with maximum accuracy.
.. note::
It returns immediately instead of raising an exception if the
input time is negative.
.. note::
Wrapper around the standard method 'time.sleep' which provides
high-precision cross-platform sleep method since Python>=3.11.
:param dt: Sleep duration in seconds.
"""
# Early return if already too late
if dt < 0.0:
return
time.sleep(dt)
else:
# Timer jitter estimate
TIMER_JITTER = 1e-2 if sys.platform.startswith('win') else 1e-3
[docs]
def sleep(dt: float) -> None:
"""Function to provide cross-platform time sleep with maximum accuracy.
.. warning::
Use this method with cautious since it relies on busy looping
principle instead of system scheduler. As a result, it wastes a lot
more resources than time.sleep. However, it is the only way to
ensure accurate delay on Python<3.11
:param dt: Sleep duration in seconds.
"""
# Combine busy loop and timer to release the GIL periodically
t_end = time.perf_counter() + dt
while time.perf_counter() < t_end:
if t_end - time.perf_counter() > TIMER_JITTER:
time.sleep(1e-3)
def get_color_code(
color: Optional[Union[str, Tuple4FType]]) -> Optional[Tuple4FType]:
"""Sanitize color codes.
All it does is converting color codes that has been specified through
pre-defined names into their corresponding 4-tuple (R, G, B, A), where each
component is a floating-point scalar normalized in range [0.0, 1.0].
"""
if isinstance(color, str):
try:
return COLORS[color]
except KeyError as e:
colors_str = ', '.join(f"'{e}'" for e in COLORS.keys())
raise ValueError(
f"Color '{color}' not available. Use a custom (R,G,B,A) "
f"code, or a predefined named color ({colors_str}).") from e
return color
def _must_be_open(fun: Callable[..., Any]) -> Callable[..., Any]:
@wraps(fun)
def fun_safe(*args: Any, **kwargs: Any) -> Any:
self: Union[Viewer, Type[Viewer]] = Viewer
if args and isinstance(args[0], Viewer):
self = args[0]
self = kwargs.get('self', self)
if not self.is_open():
raise RuntimeError(
"No backend available. Please start one before calling "
f"'{fun.__name__}'.")
return fun(*args, **kwargs)
return fun_safe
def _with_lock(fun: Callable[..., Any]) -> Callable[..., Any]:
@wraps(fun)
def fun_safe(*args: Any, **kwargs: Any) -> Any:
self: Union[Viewer, Type[Viewer]] = Viewer
if args and isinstance(args[0], Viewer):
self = args[0]
self = kwargs.get('self', self)
with self._lock:
return fun(*args, **kwargs)
return fun_safe
def _is_async(fun: Callable[..., Any]) -> Callable[..., Any]:
@wraps(fun)
def fun_safe(*args: Any, **kwargs: Any) -> Any:
if Viewer.backend == 'panda3d':
assert Viewer._backend_obj is not None
with Viewer._backend_obj.gui.async_mode():
return fun(*args, **kwargs)
return fun(*args, **kwargs)
return fun_safe
class _ProcessWrapper:
"""Wrap `multiprocessing.Process`, `subprocess.Popen`, and `psutil.Process`
and `Panda3dApp` in the same object to provide unified API.
"""
def __init__(self,
proc: Union[
ProcessMP, subprocess.Popen, Process, Panda3dApp]):
self._proc = proc
def __del__(self) -> None:
"""Automatically kill process at garbage collection.
"""
self.kill()
def is_parent(self) -> bool:
"""Whether the wrapped process is a child of the main python process.
"""
return not isinstance(self._proc, Process)
def is_alive(self) -> bool:
"""Whether the wrapped process is running or idle, but not terminated
nor a zombie yet.
"""
if isinstance(self._proc, ProcessMP):
return self._proc.is_alive()
if isinstance(self._proc, subprocess.Popen):
return self._proc.poll() is None
if isinstance(self._proc, Process):
import psutil # pylint: disable=import-outside-toplevel
try:
return self._proc.status() in (
psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING)
except psutil.NoSuchProcess:
return False
# if isinstance(self._proc, Panda3dApp):
return hasattr(self._proc, 'win')
def wait(self, timeout: float) -> None:
"""Wait until the wrapped process finish what its current task or
timeout is reached.
"""
if isinstance(self._proc, ProcessMP):
self._proc.join(timeout)
if isinstance(self._proc, (subprocess.Popen, Process)):
self._proc.wait(timeout)
if isinstance(self._proc, Panda3dApp):
self._proc.step()
def kill(self) -> None:
"""If wrapped process if it is a child of the main python process and
it is still alive that the time being.
"""
if self.is_parent() and self.is_alive():
if isinstance(self._proc, Panda3dApp):
self._proc.destroy()
else:
# Try to terminate cleanly
self._proc.terminate()
try:
self.wait(timeout=1.0)
except (subprocess.TimeoutExpired,
multiprocessing.TimeoutError):
pass
# Force kill if necessary and reap the zombies
import psutil # pylint: disable=import-outside-toplevel
try:
# Assert(s) for type checker
assert self._proc.pid is not None
Process(self._proc.pid).kill()
os.waitpid(self._proc.pid, 0)
os.waitpid(os.getpid(), 0)
except (psutil.NoSuchProcess,
ChildProcessError,
PermissionError):
pass
multiprocessing.active_children()
CameraPoseType = Tuple[
Optional[Tuple3FType], Optional[Tuple3FType], Optional[Union[int, str]]]
class CameraMotionBreakpointType(TypedDict, total=True):
"""Basic data structure storing all the information needed to describe a
camera motion breakpoint, ie the expected pose of the camera at a given
point in time.
"""
t: float
"""Time at which the camera is expected to have in a specific pose.
"""
pose: Tuple[Tuple3FType, Tuple3FType]
"""Exacted absolute pose of the camera, as a tuple position (X, Y, Z),
rotation (Roll, Pitch, Yaw).
"""
CameraMotionType = Sequence[CameraMotionBreakpointType]
Matrix3FType = np.ndarray # No easy way to specified fixed-size 2-dim array
RotationType = Union[Tuple4FType, Matrix3FType]
FramePoseType = Tuple[Tuple3FType, RotationType]
class MarkerDataType(TypedDict, total=True):
"""Pose of the marker, as a single vector (position [X, Y, Z] + rotation
[Quat X, Quat Y, Quat Z, Quat W]).
"""
pose: Union[np.ndarray, Callable[[], np.ndarray]]
"""Size of the marker. Each principal axis of the geometry are scaled
separately.
"""
scale: Union[Tuple3FType, Callable[[], Tuple3FType]]
"""Color of the marker, as a list of 4 floating-point values ranging from
0.0 to 1.0.
"""
color: Optional[Union[Tuple4FType, Callable[[], Tuple4FType]]]
[docs]
class Viewer:
"""Backend-agnostic 3D viewer.
This class can be used to render the state of a robot, including sensor
data and external forces if any. It is mainly used to render the current
simulation state or replay multiple complete trajectories as-posteriori.
.. note::
The environment variable 'JIMINY_INTERACTIVE_DISABLE' can be
used to force disabling interactive display.
.. note::
The environment variable 'JIMINY_VIEWER_DEFAULT_BACKEND' can be
used to overwrite the default backend.
"""
backend: Optional[str] = None
window_name = 'jiminy'
_has_gui = False
_backend_obj: Optional[Union[Panda3dApp,
Panda3dViewer,
"Panda3dQWidget", # noqa: F821
"MeshcatWrapper"]] = None # noqa: F821
_backend_proc: Optional[_ProcessWrapper] = None
_backend_robot_names: Set[str] = set()
_backend_robot_colors: Dict[str, Optional[Tuple4FType]] = {}
_camera_motion: Optional[
Callable[[float], Tuple[Tuple3FType, Tuple3FType]]] = None
_camera_travelling: Optional[Dict[str, Any]] = None
_camera_xyzrpy: Tuple[Tuple3FType, Tuple3FType] = DEFAULT_CAMERA_XYZRPY_ABS
_lock = RLock() # Unique lock for every viewer in same thread by default
def __init__(self, # pylint: disable=unused-argument
robot: jiminy.Model,
use_theoretical_model: bool = False,
robot_color: Optional[Union[str, Tuple4FType]] = None,
lock: Optional[RLock] = None,
backend: Optional[str] = None,
open_gui_if_parent: Optional[bool] = None,
delete_robot_on_close: bool = False,
robot_name: Optional[str] = None,
scene_name: str = 'world',
display_com: bool = False,
display_dcm: bool = False,
display_contact_frames: bool = False,
display_contact_forces: bool = False,
display_f_external: Optional[
Union[Sequence[bool], bool]] = None,
**kwargs: Any):
"""
:param robot: Jiminy.Model to display.
:param use_theoretical_model: Whether to use the theoretical rigid or
extended simulation model of this robot.
Note that using the extended model if
possible is more efficient since update
of the frames placements can be skipped
as it is already synchronized with the
simulation state.
Optional: Actual model by default.
:param robot_color: Color of the robot. It will override the original
color of the meshes if not `None`. It supports both
RGBA codes as a list of 4 floating-point values
ranging from 0.0 and 1.0, and a few named colors.
Optional: Disabled by default.
:param lock: Custom threading.RLock for parallel rendering. `None` to
use the unique lock associated with the current thread.
Optional: `None` by default.
:param backend: Name of the rendering backend to use. It can be either
'panda3d', 'panda3d-qt', 'meshcat'. None to keep using
to one already running if any, or the default one
otherwise. Note that the default is hardware and
environment dependent. See `viewer.default_backend`
method for details.
Optional: `None` by default.
:param open_gui_if_parent: Open GUI if new viewer's backend server is
started. `None` to fallback to default.
Optional: Do not open gui for 'meshcat'
backend in interactive mode with already one
display cell already opened, open gui by
default in any other case if graphical
server is available.
:param delete_robot_on_close: Enable automatic deletion of the robot
when closing.
Optional: False by default.
:param robot_name: Unique robot name, to identify each robot.
Optional: Randomly generated identifier by default.
:param scene_name: Scene name.
Optional: 'world' by default.
:param display_com: Whether to display the center of mass.
Optional: Disabled by default.
:param display_dcm: Whether to display the capture point / DCM.
Optional: Disabled by default.
:param display_contact_frames: Whether to display the contact frames.
Optional: Disabled by default.
:param display_contact_forces:
Whether to display the contact forces. Note that the user is
responsible for updating sensors data since `Viewer.display` is
only computing kinematic quantities.
Optional: Disabled by default.
:param display_f_external:
Whether to display the external external forces applied at the
joints on the robot. If a boolean is provided, the same visibility
will be set for each joint, alternatively one can provide a boolean
list whose ordering is consistent with `pinocchio_model.names`.
Note that the user is responsible for updating the force buffer
`viewer.f_external` data since `Viewer.display` is only computing
kinematic quantities.
Optional: Root joint for robot with freeflyer by default.
:param kwargs: Unused extra keyword arguments to enable forwarding.
"""
# Define some attribute to avoid raising exception at exit
self.robot_name = "_".join((
"undefined", next(tempfile._get_candidate_names())))
self.__is_open = False
# Make sure the robot is properly initialized
assert robot.is_initialized, (
"Robot not initialized. Impossible to instantiate a viewer.")
# Handling of default arguments
if robot_name is None:
uniq_id = next(tempfile._get_candidate_names())
base_name = robot.name or robot.pinocchio_model.name or "robot"
robot_name = "_".join((base_name, uniq_id))
# Pick the default backend if unspecified and none is already selected
if backend is None:
if Viewer.backend is not None:
backend = Viewer.backend
else:
backend = get_default_backend()
# Make sure that the windows, scene and robot names are valid
if scene_name == Viewer.window_name:
raise ValueError(
"The name of the scene and window must be different.")
if not re.match(r'^[A-Za-z0-9_]+$', scene_name + robot_name):
raise RuntimeError(
"Scene and robot names restricted to case-insensitive ASCII "
"alphanumeric characters plus underscore.")
# Robot names must be unique, then backup the desired one
if robot_name in Viewer._backend_robot_names:
raise ValueError(
"Robot name already exists but must be unique. Please choose "
"a different one, or close the associated viewer.")
# Enforce some arguments based on available features
if not backend.startswith('panda3d'):
if display_com or display_dcm or display_contact_frames or \
display_contact_forces:
LOGGER.warning(
"Panda3d backend is required to display markers, e.g. "
"CoM, DCM or Contact.")
display_com = False
display_dcm = False
display_contact_frames = False
display_contact_forces = False
# Backup some user arguments
self.robot_color = get_color_code(robot_color)
self.robot_name = robot_name
self.scene_name = scene_name
self.use_theoretical_model = use_theoretical_model
self.delete_robot_on_close = delete_robot_on_close
self._lock = lock or Viewer._lock
self._display_com = display_com
self._display_dcm = display_dcm
self._display_contact_frames = display_contact_frames
self._display_contact_forces = display_contact_forces
self._display_f_external = display_f_external
self._client = None
self.__is_open = False
# Initialize marker register
self.markers: Dict[str, MarkerDataType] = {}
self._markers_group = '/'.join((
self.scene_name, self.robot_name, "markers"))
self._markers_visibility: Dict[str, bool] = {}
# Initialize external forces
if self.use_theoretical_model:
njoints = robot.pinocchio_model_th.njoints
else:
njoints = robot.pinocchio_model.njoints
self.f_external = pin.StdVec_Force()
self.f_external.extend([pin.Force.Zero() for _ in range(njoints - 1)])
# Create a unique temporary directory, specific to this viewer instance
self._tempdir = tempfile.mkdtemp(
prefix="_".join((Viewer.window_name, scene_name, robot_name, "")))
# Access the current backend or create one if none is available
self.is_backend_parent = not Viewer.is_alive()
try:
# Start viewer backend
Viewer.connect_backend(backend)
# Assert(s) for type checker
assert Viewer._backend_obj is not None
# Decide whether to open gui
if open_gui_if_parent is None:
if not is_display_available():
open_gui_if_parent = False
elif backend == 'meshcat':
# Opening a new display cell automatically if there is
# no other display cell already opened.
open_gui_if_parent = interactive_mode() >= 2 and (
Viewer._backend_obj is None or
not Viewer._backend_obj.comm_manager.n_comm)
elif backend == 'panda3d':
open_gui_if_parent = interactive_mode() < 2
else:
open_gui_if_parent = False
# Keep track of the backend process associated to the viewer.
# The destructor of this instance must adapt its behavior to the
# case where the backend process has changed in the meantime.
self._gui = Viewer._backend_obj.gui
self._backend_proc = Viewer._backend_proc
self.__is_open = True
# Open gui if requested
try:
if open_gui_if_parent:
Viewer.open_gui()
except RuntimeError as e:
# Convert exception into warning if it fails. It is probably
# because no display is available.
LOGGER.warning("%s", e)
except Exception as e:
self.close()
raise RuntimeError(
"Impossible to create backend or connect to it.") from e
# Load the robot
Viewer._backend_robot_names.add(robot_name)
Viewer._backend_robot_colors[robot_name] = None
self._setup(robot, self.robot_color)
# Set default camera pose
if self.is_backend_parent:
self.set_camera_transform()
# Refresh the viewer since the positions of the meshes and their
# visibility mode are not properly set at this point.
self.refresh(
force_update_visual=True, force_update_collision=True, wait=True)
def __del__(self) -> None:
"""Automatically close the viewer at garbage collection.
"""
self.close()
@_with_lock
@_must_be_open
def _setup(self,
robot: jiminy.Model,
robot_color: Optional[Union[str, Tuple4FType]] = None) -> None:
"""Load (or reload) robot in viewer.
.. note::
This method must be called after calling `engine.reset` since at
this point the viewer has dangling references to the collision
model and data of robot. Indeed, a new robot is generated at each
reset to add some stochasticity to the mass distribution and some
other parameters. This is done automatically if one is using
`simulator.Simulator` instead of `jiminy_py.core.Engine` directly.
:param robot: jiminy.Model to display.
:param robot_color: Color of the robot. It will override the original
color of the meshes if not `None`. It supports both
RGBA codes as a list of 4 floating-point values
ranging from 0.0 and 1.0, and a few named colors.
Optional: Disabled by default.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
# Delete existing robot, if any
assert self.robot_name in Viewer._backend_robot_names
robot_node_path = '/'.join((self.scene_name, self.robot_name))
Viewer._delete_nodes_viewer([
'/'.join((robot_node_path, "visuals")),
'/'.join((robot_node_path, "collisions"))])
# Backup desired color
assert self.robot_name in Viewer._backend_robot_colors.keys()
self.robot_color = get_color_code(robot_color)
Viewer._backend_robot_colors[self.robot_name] = self.robot_color
# Create backend wrapper to get (almost) backend-independent API.
backend_type = get_backend_type(Viewer.backend)
if self.use_theoretical_model:
self._client = backend_type(robot.pinocchio_model_th,
robot.collision_model_th,
robot.visual_model_th)
else:
# The viewer is sharing memory with the engine. It avoids having to
# keep track of the current state of the system and to update the
# internal state for the viewer accordingly every time it changes.
# It spares computational resources by avoiding computing twice the
# same quantities and it enables to display quantities that cannot
# be recovered from the current state only, eg the contact forces.
self._client = backend_type(robot.pinocchio_model,
robot.collision_model,
robot.visual_model)
self._client.data = robot.pinocchio_data
self._client.visual_data = robot.visual_data
self._client.collision_data = robot.collision_data
# Reset external force buffer iif it is necessary
njoints = self._client.model.njoints
if len(self.f_external) != njoints - 1:
self.f_external = pin.StdVec_Force()
self.f_external.extend([
pin.Force.Zero() for _ in range(njoints - 1)])
# Initialize the viewer
self._client.initViewer(viewer=self._gui, loadModel=False)
# Create the scene and load robot
self._client.loadViewerModel(
root_node_name=robot_node_path, color=self.robot_color)
if Viewer.backend.startswith('panda3d'):
# Add markers' display groups
self._gui.append_group(self._markers_group, remove_if_exists=False)
# Extract data for fast access
com_position = self._client.data.com[0]
com_velocity = self._client.data.vcom[0]
gravity = self._client.model.gravity.linear
dcm = np.zeros(3)
# Add center of mass
def get_com_scale() -> Tuple3FType:
nonlocal com_position
return (1.0, 1.0, com_position[2])
self.add_marker(name="COM_0_sphere",
shape="sphere",
pose=[com_position, None],
remove_if_exists=True,
auto_refresh=False,
radius=0.03)
self.add_marker(name="COM_0_cylinder",
shape="cylinder",
pose=[com_position, None],
scale=get_com_scale,
remove_if_exists=True,
auto_refresh=False,
radius=0.004,
length=-1.0,
anchor_bottom=True)
self.display_center_of_mass(self._display_com)
# Add DCM marker
def get_dcm_pose() -> FramePoseType:
nonlocal com_position, com_velocity, gravity, dcm
if com_position[2] > 0.0:
omega = math.sqrt(abs(gravity[2]) / com_position[2])
dcm[:2] = com_position[:2] + com_velocity[:2] / omega
return (dcm, np.array([0.0, 0.0, 0.0, 1.0]))
def get_dcm_scale() -> Tuple3FType:
nonlocal com_position
return np.full((3,), com_position[2] > 0.0, dtype=np.float64)
self.add_marker(name="DCM",
shape="cone",
color="green",
pose=get_dcm_pose,
scale=get_dcm_scale,
remove_if_exists=True,
auto_refresh=False,
radius=0.03,
length=0.03,
num_sides=4)
self.display_capture_point(self._display_dcm)
# Add contact frame markers
for frame_name, frame_index in zip(
robot.contact_frame_names, robot.contact_frame_indices):
frame_pose = self._client.data.oMf[frame_index]
self.add_marker(name='_'.join(("ContactFrame", frame_name)),
shape="sphere",
color="yellow",
pose=[frame_pose.translation, None],
remove_if_exists=True,
auto_refresh=False,
radius=0.01)
self.display_contact_frames(self._display_contact_frames)
# Add contact sensor markers
def get_contact_scale(sensor_data: np.ndarray) -> Tuple3FType:
f_z_rel = sensor_data[2] / CONTACT_FORCE_SCALE
return (1.0, 1.0, min(max(f_z_rel, -1.0), 1.0))
for sensor in robot.sensors.get(ContactSensor.type, []):
frame_index, data = sensor.frame_index, sensor.data
self.add_marker(name='_'.join((sensor.type, sensor.name)),
shape="cylinder",
pose=self._client.data.oMf[frame_index],
scale=partial(get_contact_scale, data),
remove_if_exists=True,
auto_refresh=False,
radius=0.02,
length=0.5,
anchor_bottom=True)
self.display_contact_forces(self._display_contact_forces)
# Add external forces
def get_force_pose(joint_index: int,
joint_position: np.ndarray,
joint_rotation: np.ndarray
) -> FramePoseType:
f = self.f_external[joint_index - 1].linear
f_rotation_local = pin.Quaternion(np.array([0.0, 0.0, 1.0]), f)
f_rotation_world = (
pin.Quaternion(joint_rotation) * f_rotation_local)
return (joint_position, f_rotation_world.coeffs())
def get_force_scale(joint_index: int
) -> Tuple[float, float, float]:
f_ext: np.ndarray = self.f_external[joint_index - 1].linear
f_ext_norm = cast(float, np.linalg.norm(f_ext, 2))
length = min(f_ext_norm / EXTERNAL_FORCE_SCALE, 1.0)
return (1.0, 1.0, length)
for joint_name in self._client.model.names[1:]:
joint_index = self._client.model.getJointId(joint_name)
joint_pose = self._client.data.oMi[joint_index]
pose_fn = partial(get_force_pose,
joint_index,
joint_pose.translation,
joint_pose.rotation)
self.add_marker(name=f"ForceExternal_{joint_name}",
shape="arrow",
color="red",
pose=pose_fn,
scale=partial(get_force_scale, joint_index),
remove_if_exists=True,
auto_refresh=False,
anchor_top=True,
radius=0.015,
length=0.7)
# Check if external forces visibility is deprecated
if isinstance(self._display_f_external, (list, tuple)):
if len(self._display_f_external) != njoints - 1:
self._display_f_external = None
# Display external forces only on freeflyer by default
if self._display_f_external is None:
self._display_f_external = \
[robot.has_freeflyer] + [False] * (njoints - 2)
self.display_external_forces(self._display_f_external)
[docs]
@staticmethod
@_with_lock
@_must_be_open
def open_gui() -> None:
"""Open a new viewer graphical interface. It is only possible if a
backend is already running.
.. note::
Only one graphical interface can be opened locally for efficiency.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# If a graphical window is already open, do nothing
if Viewer.has_gui():
return
# Check if a display is available
if not is_display_available():
raise RuntimeError(
"No display available. Impossible to open a gui.")
if Viewer.backend in ('panda3d-qt', 'panda3d-sync'):
# No instance is considered manager of the unique window
raise RuntimeError(
f"Impossible to open gui with '{Viewer.backend}' backend.")
if Viewer.backend == 'panda3d':
Viewer._backend_obj.gui.open_window()
elif Viewer.backend == 'meshcat':
viewer_url = Viewer._backend_obj.gui.url()
if interactive_mode() >= 2:
# pylint: disable=import-outside-toplevel,no-name-in-module
from IPython.core.display import HTML, display
# Scrap the viewer html content, including javascript
# dependencies
with urlopen(viewer_url) as response:
html_content = response.read().decode()
pattern = '<script type="text/javascript" src="%s"></script>'
scripts_js = re.findall(pattern % '(.*)', html_content)
for file in scripts_js:
file_path = os.path.join(viewer_url, file)
with urlopen(file_path) as response:
js_content = response.read().decode()
html_content = html_content.replace(pattern % file, f"""
<script type="text/javascript">
{js_content}
</script>""")
# Provide websocket URL as fallback if needed. It would be
# the case if the environment is not jupyter-notebook nor
# colab but rather jupyterlab or vscode for instance.
from IPython import get_ipython
kernel = get_ipython()
if interactive_mode() == 3:
kernel_config_app = kernel.config['ColabKernelApp']
else:
kernel_config_app = kernel.config['IPKernelApp']
conn_file = kernel_config_app['connection_file']
try:
kernel_id = conn_file.split('-', 1)[1].split('.')[0]
except AttributeError:
kernel_id = None
if kernel_id is not None:
server_pid = Process(os.getpid()).parent().pid
server_list = []
try:
from notebook import notebookapp
server_list += list(notebookapp.list_running_servers())
except ImportError:
# `notebook>=7.0` has removed this submodule entirely
pass
try:
from jupyter_server import serverapp
server_list += list(serverapp.list_running_servers())
except ImportError:
pass
for server_cfg in server_list:
if server_cfg['pid'] != server_pid:
continue
ws_url = (
f"ws{server_cfg['url'][4:]}api/kernels/{kernel_id}"
f"/channels?token={server_cfg['token']}")
html_content = html_content.replace(
"var ws_url = undefined;",
f'var ws_url = "{ws_url}";')
break
if interactive_mode() == 2:
# Isolate HTML in iframe
html_content = html_content.replace(
"\"", """).replace("'", "'")
display(HTML(f"""
<div class="resizable" style="
height: 400px; width: 100%;
overflow-x: auto; overflow-y: hidden;
resize: both">
<iframe srcdoc="{html_content}" style="
width: 100%; height: 100%; border: none;">
</iframe>
</div>
"""))
else:
# Impossible to isolate HTML to get access to google colab
html_content = html_content.replace(
'<div id="meshcat-pane">', """
<div id="meshcat-pane" class="resizable" style="
height: 400px; width: 100%;
overflow-x: auto; overflow-y: hidden;
resize: both">
""")
# Make meshcat dom element unique to avoid conflict if
# multiple view are displayed.
html_content = html_content.replace(
"meshcat-pane", str(uuid.uuid1()))
# Display the content directly inside the main window
display(HTML(html_content))
elif not Viewer.has_gui():
try:
webbrowser.get()
webbrowser.open(viewer_url, new=2, autoraise=True)
except webbrowser.Error: # Fail if not browser is available
LOGGER.warning(
"No browser available for display. Please install one "
"manually.")
return # Skip waiting since there is nothing to wait for
# Wait to finish loading
Viewer.wait(require_client=True)
# There is at least one graphical window at this point
Viewer._has_gui = True
[docs]
@staticmethod
def has_gui() -> bool:
"""Whether the viewer has a Graphical User Interface (GUI), ie the
viewer is connected to a given backend that is running up to now, and
onscreen rendering has been specifically requested.
"""
if Viewer.is_alive():
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# Make sure the viewer still has gui if necessary
if Viewer.backend == 'meshcat':
ack = Viewer._backend_obj.wait(require_client=False)
Viewer._has_gui = any(msg for msg in ack.split(","))
return Viewer._has_gui
return False
[docs]
@staticmethod
@_with_lock
@_must_be_open
def wait(require_client: bool = False) -> None:
"""Wait for all the meshes to finish loading in every clients.
:param require_client: Wait for at least one client to be available
before checking for mesh loading.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
if Viewer.backend == 'meshcat':
Viewer._backend_obj.wait(require_client)
elif Viewer.backend.startswith('panda3d'):
Viewer._backend_obj.gui.step()
[docs]
@staticmethod
def is_alive() -> bool:
"""Check if the backend server is running and responding to queries.
"""
return Viewer._backend_proc is not None and \
Viewer._backend_proc.is_alive()
[docs]
def is_open(self: Optional["Viewer"] = None) -> bool:
"""Check if a given viewer instance is open, or if the backend server
is running if no instance is specified.
"""
is_open_ = Viewer.is_alive()
if self is not None:
is_open_ = is_open_ and self.__is_open
return is_open_
[docs]
@_with_lock
def close(self: Optional["Viewer"] = None) -> None:
"""Close a given viewer instance, or all of them if no instance is
specified.
.. note::
Calling this method with an viewer instance always closes the
client. It may also remove the robot from the server if the viewer
attribute `delete_robot_on_close` is True. Moreover, it is the one
having started the backend server, it also terminates it, resulting
in closing every viewer somehow. It results in the same outcome
than calling this method without specifying any viewer instance.
"""
# pylint: disable=unsubscriptable-object
if self is None:
# NEVER closing backend automatically if closing instances,
# even for the parent. It will be closed at Python exit
# automatically. One must call `Viewer.close` to do otherwise.
Viewer._backend_robot_names.clear()
Viewer._backend_robot_colors.clear()
Viewer._camera_xyzrpy = DEFAULT_CAMERA_XYZRPY_ABS
Viewer.detach_camera()
Viewer.remove_camera_motion()
if Viewer.is_alive():
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
assert Viewer._backend_proc is not None
if Viewer.backend == 'meshcat':
Viewer._backend_obj.close()
elif Viewer.backend.startswith('panda3d'):
try:
Viewer._backend_obj.stop()
except ViewerClosedError:
pass
Viewer._backend_proc.wait(0.2)
Viewer._backend_proc.kill()
Viewer.backend = None
Viewer._backend_obj = None
Viewer._backend_proc = None
Viewer._has_gui = False
else:
# Consider that the robot is not available anymore, no matter what
Viewer._backend_robot_names.discard(self.robot_name)
Viewer._backend_robot_colors.pop(self.robot_name, None)
# Disable travelling if associated with this viewer instance
if (Viewer._camera_travelling is not None and
Viewer._camera_travelling['viewer'] is self):
Viewer.detach_camera()
# Check if the backend process has changed or the viewer instance
# has already been closed, which may happened if it has been closed
# manually in the meantime. If so, there is nothing left to do.
if (not self.__is_open or
Viewer._backend_proc is not self._backend_proc):
return
# Assert(s) for type checker
assert Viewer._backend_obj is not None
# Make sure zmq does not hang
if Viewer.backend == 'meshcat' and Viewer.is_alive():
Viewer._backend_obj.gui.window.zmq_socket.RCVTIMEO = 200
# Delete robot from scene if requested
if (self.delete_robot_on_close and self._client is not None and
self.is_open()): # type: ignore[unreachable]
try: # type: ignore[unreachable]
Viewer._delete_nodes_viewer([
self._client.visual_group,
self._client.collision_group,
self._markers_group])
except ViewerError:
pass
# Restore zmq socket timeout, which is disable by default
if Viewer.backend == 'meshcat':
Viewer._backend_obj.gui.window.zmq_socket.RCVTIMEO = -1
# Delete temporary directory
if self._tempdir.startswith(tempfile.gettempdir()):
try:
shutil.rmtree(self._tempdir)
except FileNotFoundError:
pass
# At this point, consider the viewer has been closed no matter what
self.__is_open = False
[docs]
@staticmethod
@_with_lock
def connect_backend(backend: Optional[str] = None) -> None:
"""Get the running process of backend client.
This method can be used to open a new process if necessary.
:param backend: Name of the rendering backend to use. It can be either
'panda3d', 'panda3d-qt', 'meshcat'.
Optional: The default is hardware and environment
dependent. See `viewer.default_backend` for details.
:returns: Pointer to the running backend Client and its PID.
"""
# pylint: disable=import-outside-toplevel
# Handle default arguments
if backend is None:
backend = get_default_backend()
# Sanitize user arguments and check requested backend is available
backend = backend.lower()
get_backend_type(backend)
# Update the backend currently running, if any
if Viewer.backend != backend and Viewer.is_alive():
logging.warning("Different backend already running. Closing it...")
Viewer.close()
# Nothing to do if already connected
if Viewer.is_alive():
return
# Reset some class attribute if backend not available
Viewer.close()
client: Union[Panda3dApp,
Panda3dViewer,
"Panda3dQWidget", # noqa: F821
"MeshcatWrapper"] # noqa: F821
if backend.startswith('panda3d'):
# Instantiate client with onscreen rendering capability enabled.
# Note that it fallbacks to software rendering if necessary.
try:
if backend == 'panda3d-qt':
from .panda3d.panda3d_widget import Panda3dQWidget
client = Panda3dQWidget()
proc = _ProcessWrapper(client)
elif backend == 'panda3d-sync':
client = Panda3dApp()
proc = _ProcessWrapper(client)
else:
client = Panda3dViewer(window_type='onscreen',
window_title=Viewer.window_name)
proc = _ProcessWrapper(client._app)
except RuntimeError as e:
raise RuntimeError(
"Something went wrong. Impossible to instantiate viewer "
"backend.") from e
# The gui is the client itself
client.gui = client # type: ignore[union-attr]
else:
# List of connections likely to correspond to Meshcat servers
import psutil
meshcat_candidate_conn = []
for pid in psutil.pids():
try:
proc_info = Process(pid)
for conn in proc_info.net_connections("tcp4"):
if conn.status != 'LISTEN' or \
conn.laddr.ip != '127.0.0.1':
continue
cmdline = proc_info.cmdline()
if cmdline and ('python' in cmdline[0].lower() or
'meshcat' in cmdline[-1]):
meshcat_candidate_conn.append(conn)
except (psutil.AccessDenied,
psutil.ZombieProcess,
psutil.NoSuchProcess):
pass
# Exclude ipython kernel ports from the look up because sending a
# message on ipython ports will throw a low-level exception, that
# is not blocking on Jupyter, but is on Google Colab.
excluded_ports = []
if interactive_mode() >= 2:
from IPython import get_ipython
try:
excluded_ports += list(
get_ipython().kernel._recorded_ports.values())
except (NameError, AttributeError):
pass # No Ipython kernel running
# Use the first port responding to zmq request, if any.
# Sorting connections to scan most likely ports first.
import zmq
zmq_url = None
context = zmq.Context.instance()
for conn in sorted(
meshcat_candidate_conn, key=lambda conn: conn.laddr.port):
try:
# Note that the timeout must be long enough to give enough
# time to the server to respond, but not to long to avoid
# sending to much time spanning the available connections.
port = conn.laddr.port
if port in excluded_ports:
continue
zmq_url = f"tcp://127.0.0.1:{port}"
zmq_socket = context.socket(zmq.REQ)
zmq_socket.RCVTIMEO = 200 # millisecond
zmq_socket.connect(zmq_url)
zmq_socket.send(b"url")
response = zmq_socket.recv().decode("utf-8")
if response[:4] != "http":
zmq_url = None
except (zmq.error.Again, zmq.error.ZMQError):
zmq_url = None
zmq_socket.close(linger=5)
if zmq_url is not None:
break
# Create a meshcat server if needed and connect to it
from .meshcat.wrapper import MeshcatWrapper
client = MeshcatWrapper(zmq_url)
server_proc: Union[Process, ProcessMP]
if client.server_proc is None:
server_proc = Process(conn.pid)
else:
server_proc = client.server_proc
proc = _ProcessWrapper(server_proc)
# Make sure the backend process is alive
assert proc.is_alive(), (
"Something went wrong. Impossible to instantiate viewer backend.")
# Update global state
Viewer.backend = backend
Viewer._backend_obj = client
Viewer._backend_proc = proc
@staticmethod
@_with_lock
@_must_be_open
def _delete_nodes_viewer(nodes_path: Sequence[str]) -> None:
"""Delete an object or a group of objects in the scene.
.. note::
Be careful, one must specify the full path of a node, including all
parent group, but without the window name, ie
'scene_name/robot_name' to delete the robot.
:param nodes_path: Full path of the node to delete
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
if Viewer.backend.startswith('panda3d'):
for node_path in nodes_path:
try:
Viewer._backend_obj.gui.remove_group(node_path)
except KeyError:
pass
else:
for node_path in nodes_path:
Viewer._backend_obj.gui[node_path].delete()
[docs]
@staticmethod
@_with_lock
@_must_be_open
def set_watermark(img_fullpath: Optional[str] = None,
width: Optional[int] = None,
height: Optional[int] = None) -> None:
"""Insert desired watermark on bottom left corner of the window.
.. note::
The relative width and height cannot exceed 20% of the visible
area, otherwise it will be rescaled.
:param img_fullpath: Full path of the image to use as watermark.
Meshcat supports format '.png', '.jpeg' or 'svg',
while Panda3d only supports '.png' and '.jpeg' for
now. None or empty string to disable.
Optional: None by default.
:param width: Desired width for the image. None to not rescale the
image manually.
Optional: None by default.
:param height: Desired height for the image. None to not rescale the
image manually.
Optional: None by default.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
if Viewer.backend.startswith('panda3d'):
Viewer._backend_obj.gui.set_watermark(img_fullpath, width, height)
else:
width = width or DEFAULT_WATERMARK_MAXSIZE[0]
height = height or DEFAULT_WATERMARK_MAXSIZE[1]
if img_fullpath is None or img_fullpath == "":
Viewer._backend_obj.remove_watermark()
[docs]
@staticmethod
@_with_lock
@_must_be_open
def set_legend(labels: Optional[Sequence[str]] = None) -> None:
"""Insert legend on top left corner of the window.
.. note::
Make sure to have specified different colors for each robot on the
scene, since it will be used as marker on the legend.
:param labels: Sequence of strings whose length must be consistent
with the number of robots on the scene. None to disable.
Optional: None by default.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# Make sure number of labels is consistent with number of robots
if labels is not None and (
len(labels) != len(Viewer._backend_robot_colors)):
raise RuntimeError(
f"Inconsistency between robots {Viewer._backend_robot_names} "
f"({Viewer._backend_robot_colors}) and labels {labels}")
# Make sure all robots have a specific color
if any(color is None for color in Viewer._backend_robot_colors):
raise RuntimeError(
"All robot must have a specific color when setting a legend.")
if Viewer.backend.startswith('panda3d'):
if labels is None:
items = None
else:
items = list(zip(
labels, Viewer._backend_robot_colors.values()))
Viewer._backend_obj.gui.set_legend(items)
else:
if labels is None:
for robot_name in Viewer._backend_robot_colors.keys():
Viewer._backend_obj.remove_legend_item(robot_name)
else:
for text, (robot_name, color) in zip(
labels, Viewer._backend_robot_colors.items()):
if color is not None:
rgba = (*[int(e * 255) for e in color[:3]], color[3])
color_text = f"rgba({','.join(map(str, rgba))})"
else:
color_text = "black"
Viewer._backend_obj.set_legend_item(
robot_name, color_text, text)
[docs]
@staticmethod
@_with_lock
@_must_be_open
@_is_async
def set_clock(t: Optional[float] = None) -> None:
"""Insert clock on bottom right corner of the window.
.. note::
Only Panda3d rendering backend is supported by this method.
:param t: Current simulation time is seconds. None to disable.
Optional: None by default.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
if Viewer.backend.startswith('panda3d'):
Viewer._backend_obj.gui.set_clock(t)
else:
LOGGER.warning("Adding clock is only available for Panda3d.")
[docs]
@_with_lock
@_must_be_open
def set_camera_lookat(self,
position: Tuple3FType,
relative: Optional[Union[str, int]] = None,
wait: bool = False) -> None:
"""Set the camera look-at position.
.. note::
It preserve the relative camera pose wrt the lookup position.
:param position: Position [X, Y, Z] as a list or 1D array
:param relative: If specified, set the lookat position relative to
provided position, in absolute world otherwise. Both
frame name and index in model are supported.
:param wait: Whether to wait for rendering to finish.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
# Make sure the backend supports this method
if not Viewer.backend.startswith('panda3d'):
raise NotImplementedError(
"This method is only supported by Panda3d.")
# Compute absolute lookat position using frame and relative position
if isinstance(relative, str):
relative = self._client.model.getFrameId(relative)
if isinstance(relative, int):
body_transform = self._client.data.oMf[relative]
position = body_transform.translation + position
# Update camera lookat position
self._gui.set_camera_lookat(position)
# Wait for the backend viewer to finish rendering if requested
if wait:
Viewer.wait(require_client=False)
[docs]
@staticmethod
def register_camera_motion(camera_motion: CameraMotionType) -> None:
"""Register camera motion. It will be used later by `replay` to set the
absolute or relative camera pose, depending on whether or not
travelling is enable.
:param camera_motion:
Camera breakpoint poses over time, as a list of
`CameraMotionBreakpointType` dict. Here is an example::
[{'t': 0.0,
'pose': ([3.5, 0.0, 1.4], [1.4, 0.0, np.pi / 2])},
{'t': 0.3,
'pose': ([4.5, 0.0, 1.4], [1.4, np.pi / 6, np.pi / 2])},
{'t': 0.6,
'pose': ([8.5, 0.0, 1.4], [1.4, np.pi / 3, np.pi / 2])},
{'t': 1.0,
'pose': ([9.5, 0.0, 1.4], [1.4, np.pi / 2, np.pi / 2])}]
"""
t_camera = np.asarray([
camera_break['t'] for camera_break in camera_motion])
camera_xyzrpy = np.stack([np.concatenate(camera_break['pose'])
for camera_break in camera_motion], axis=0)
Viewer._camera_motion = (
lambda t: tuple(np.split( # type: ignore[assignment, return-value]
interp1d(t_camera, camera_xyzrpy, t), 2)))
[docs]
@staticmethod
def remove_camera_motion() -> None:
"""Remove camera motion.
"""
Viewer._camera_motion = None
[docs]
@_must_be_open
def attach_camera(self,
relative: Union[str, int],
position: Optional[Tuple3FType] = None,
rotation: Optional[Tuple3FType] = None,
lock_relative_pose: Optional[bool] = None) -> None:
"""Attach the camera to a given robot frame.
Only the position of the frame is taken into account. A custom relative
orientation of the camera wrt to the frame can be further specified. If
so, then the relative camera pose wrt the frame is locked, otherwise
the camera is only constrained to look at the frame.
:param relative: Name or index of the frame of the robot to follow with
the camera.
:param position: Relative position [X, Y, Z] of the camera wrt the
tracked frame. It is used to initialize the camera
pose if relative pose is not locked. `None` to
disable.
Optional: Disable by default.
:param rotation: Relative orientation [Roll, Pitch, Yaw] of the camera
wrt the tracked frame. It is used to initialize the
camera pose if relative pose is not locked. `None` to
disable.
Optional: Disable by default.
:param lock_relative_pose: Whether to lock the relative pose of the
camera wrt tracked frame.
Optional: False by default iif Panda3d
backend is used.
"""
# Make sure one is not trying to track the camera itself
assert relative != 'camera', "Impossible to track the camera itself !"
# Assert(s) for type checker
assert Viewer.backend is not None
# Make sure the frame exists and it is not the universe itself
if isinstance(relative, str):
relative = self._client.model.getFrameId(relative)
if relative == self._client.model.nframes:
raise ValueError("Trying to attach camera to non-existing frame.")
assert relative != 0, "Impossible to track the universe !"
# Handle of default camera lock mode
if lock_relative_pose is None:
lock_relative_pose = not Viewer.backend.startswith('panda3d')
# Make sure camera lock mode is compatible with viewer backend
if not lock_relative_pose and not Viewer.backend.startswith('panda3d'):
raise NotImplementedError(
"Not locking camera pose is only supported by Panda3d.")
# Set default relative camera pose if pose is partially defined
if lock_relative_pose or position is not None or rotation is not None:
if position is None:
position = DEFAULT_CAMERA_XYZRPY_REL[0]
if rotation is None:
rotation = DEFAULT_CAMERA_XYZRPY_REL[1]
# Set camera pose if relative pose is not locked but provided
if not lock_relative_pose:
if position is not None or rotation is not None:
self.set_camera_transform(position, rotation, relative)
if isinstance(relative, str):
relative = self._client.model.getFrameId(relative)
position = (
self._gui.get_camera_lookat() -
self._client.data.oMf[relative].translation)
rotation = None
Viewer._camera_travelling = {
'viewer': self, 'frame': relative, 'pose': (position, rotation)}
[docs]
@staticmethod
def detach_camera() -> None:
"""Detach the camera.
Must be called to undo `attach_camera`, so that it will stop
automatically tracking a frame.
"""
Viewer._camera_travelling = None
[docs]
@_with_lock
@_must_be_open
def set_color(self,
color: Optional[Union[str, Tuple4FType]] = None
) -> None:
"""Override the color of the visual and collision geometries of the
robot on-the-fly.
.. warning::
This method is only supported by Panda3d for now.
:param color: Color of the robot. It will override the original color
of the meshes if not `None`, and restore them otherwise.
It supports both RGBA codes as a list of 4 floating-point
values ranging from 0.0 and 1.0, and a few named colors.
Optional: Disabled by default.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# Return early if this method is not supported by the current backend
if not Viewer.backend.startswith('panda3d'):
LOGGER.warning("This method is only supported by Panda3d.")
return
# Sanitize user-specified color code
color_ = get_color_code(color)
# Update the color of all the geometries of the robot
for model, geom_type in zip(
(self._client.visual_model, self._client.collision_model),
pin.GeometryType.names.values()):
for geom in model.geometryObjects:
node_name = self._client.getViewerNodeName(geom, geom_type)
color = color_
if color is None and geom.overrideMaterial:
color = geom.meshColor
self._gui.set_material(*node_name, color)
# Backup the new color
assert self.robot_name in Viewer._backend_robot_colors.keys()
self.robot_color = color_
Viewer._backend_robot_colors[self.robot_name] = color_
# Refresh the legend accordingly
Viewer._backend_obj.gui.set_legend()
[docs]
@staticmethod
@_with_lock
@_must_be_open
def update_floor(ground_profile: Optional[jiminy.HeightmapFunction] = None,
x_range: Tuple[float, float] = (-10.0, 10.0),
y_range: Tuple[float, float] = (-10.0, 10.0),
grid_unit: Tuple[float, float] = (0.04, 0.04),
simplify_mesh: bool = False,
show_vertices: bool = False) -> None:
"""Display a custom ground profile as a height map or the original tile
ground floor.
:param ground_profile: `jiminy_py.core.HeightmapFunction` associated
with the ground profile. It renders a flat tile
ground if not specified.
Optional: None by default.
:param x_range: Tuple gathering min and max limits along x-axis for
which the heightmap mesh associated with the ground
profile will be procedurally generated.
Optional: (-10.0, 10.0) by default.
:param y_range: Tuple gathering min and max limits along y-axis.
Optional: (-10.0, 10.0) by default.
:param grid_unit: Tuple gathering X and Y discretization steps for the
generation of the heightmap mesh.
Optional: 4cm by default.
:param simplify_mesh: Whether the generated heightmap mesh should be
decimated before final rendering. This option
must be enabled for the ratio between grid size
and unit is very large to avoid a prohibitive
slowdown of the viewer.
Optional: False by default
:param show_vertices: Whether to highlight the mesh vertices.
Optional: disabled by default.
"""
# pylint: disable=import-outside-toplevel
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# Discretize ground profile if provided
geom = None
if ground_profile is not None:
geom = discretize_heightmap(
ground_profile, *x_range, grid_unit[0], *y_range, grid_unit[1],
must_simplify=simplify_mesh)
# Render original flat tile ground if possible.
# TODO: Improve this check using LocalAABB box geometry instead.
if isinstance(geom, hppfcl.Halfspace):
if abs(geom.d) > 1e-6:
raise RuntimeError(
"Rendering flat ground with non-zero height not supported")
geom = None
# Render ground geometry
if Viewer.backend.startswith('panda3d'):
obj = None
if geom is not None:
obj = convert_bvh_collision_geometry_to_primitive(geom)
Viewer._backend_obj.gui.update_floor(obj, show_vertices)
else:
from .meshcat.meshcat_visualizer import (
update_floor as meshcat_update_floor)
meshcat_update_floor(Viewer._backend_obj.gui, geom)
[docs]
@staticmethod
@_with_lock
@_must_be_open
def add_camera(camera_name: str,
width: int,
height: int,
is_depthmap: bool) -> None:
"""TODO: Write documentation.
.. warning::
This method is only supported by Panda3d for now.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# Make sure the backend supports this method
if not Viewer.backend.startswith('panda3d'):
raise NotImplementedError(
"This method is only supported by Panda3d.")
# Add camera
Viewer._backend_obj.gui.add_camera(
camera_name, (width, height), is_depthmap)
[docs]
@staticmethod
@_with_lock
@_must_be_open
def capture_frame(width: Optional[int] = None,
height: Optional[int] = None,
camera_name: Optional[str] = None,
raw_data: bool = False
) -> Union[np.ndarray, bytes]:
"""Take a snapshot and return associated data.
.. warning::
Specifying a camera name is only supported by Panda3d rendering
backend, while raw data mode is only supported by Meshcat.
:param width: Width for the image in pixels. None to keep unchanged.
Optional: Kept unchanged by default.
:param height: Height for the image in pixels. None to keep unchanged.
Optional: Kept unchanged by default.
:param camera_name: Name of the camera to consider. None to specify
the "default" world facing camera that is used
when GUI (onscreen window) in enabled.
Optional: None by default.
:param raw_data: Whether to return a 2D numpy array, or the raw output
from the backend as bytes array.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
# Check user arguments
if Viewer.backend.startswith('panda3d'):
if camera_name is None:
# Resize window if size has changed
_width, _height = Viewer._backend_obj.gui.getSize()
if width is None:
width = _width
if height is None:
height = _height
if _width != width or _height != height:
Viewer._backend_obj.gui.set_window_size(width, height)
elif width is not None or height is not None:
raise ValueError(
"Specifying both camera name and image width and/or "
"height is not supported.")
# Get screenshot
image = Viewer._backend_obj.gui.get_screenshot(camera_name)
if image is None:
raise RuntimeError(
"Impossible to capture frame. There is something wrong "
"with the graphics stack on this machine.")
return image
# Make sure that no camera name has been specified for meshcat
if camera_name is not None:
raise ValueError(
"Specifying a camera name is only supported by Panda3d.")
# Send capture frame request to the background recorder process
img_html = Viewer._backend_obj.capture_frame(width, height)
# Parse the output to remove the html header, and convert it into
# the desired output format.
img_data = str.encode(img_html.split(",", 1)[-1])
buffer = base64.decodebytes(img_data)
# Return raw data if requested
if raw_data:
return buffer
# Extract numpy array RGBA
from PIL import Image # pylint: disable=import-outside-toplevel
with Image.open(io.BytesIO(buffer)) as img_obj:
rgba_array = np.array(img_obj)
# Return numpy array RGB
return rgba_array[:, :, :-1]
[docs]
@staticmethod
@_with_lock
@_must_be_open
def save_frame(image_path: str,
width: Optional[int] = None,
height: Optional[int] = None) -> None:
"""Save a snapshot in png format.
:param image_path: Fullpath of the image (.png extension is mandatory)
:param width: Width for the image in pixels. None to keep unchanged.
Optional: Kept unchanged by default.
:param height: Height for the image in pixels. None to keep unchanged.
Optional: Kept unchanged by default.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
assert Viewer._backend_obj is not None
image_path = str(pathlib.Path(image_path).with_suffix('.png'))
if Viewer.backend.startswith('panda3d'):
_width, _height = Viewer._backend_obj.gui.getSize()
if width is None:
width = _width
if height is None:
height = _height
if _width != width or _height != height:
Viewer._backend_obj.gui.set_window_size(width, height)
Viewer._backend_obj.gui.save_screenshot(image_path)
else:
img_data = Viewer.capture_frame(width, height, raw_data=True)
with open(image_path, "wb") as f:
f.write(img_data)
[docs]
@_with_lock
@_must_be_open
def display_visuals(self, visibility: bool) -> None:
"""Set the visibility of the visual model of the robot.
:param visibility: Whether to enable or disable display of the visual
model.
"""
self._client.displayVisuals(visibility)
self.refresh()
[docs]
@_with_lock
@_must_be_open
def display_collisions(self, visibility: bool) -> None:
"""Set the visibility of the collision model of the robot.
:param visibility: Whether to enable or disable display of the visual
model.
"""
self._client.displayCollisions(visibility)
self.refresh()
[docs]
@_with_lock
@_must_be_open
def add_marker(
self,
name: str,
shape: ShapeType,
pose: Union[pin.SE3,
Tuple[Optional[Tuple3FType], Optional[RotationType]],
Callable[[], FramePoseType]] = (None, None),
scale: Union[float, Tuple3FType, Callable[[], Tuple3FType]] = 1.0,
color: Union[Optional[Union[str, Tuple4FType]],
Callable[[], Tuple4FType]] = None,
always_foreground: bool = True,
remove_if_exists: bool = False,
auto_refresh: bool = True,
**shape_kwargs: Any) -> MarkerDataType:
"""Add marker on the scene.
.. warning::
This method is only supported by Panda3d.
:param name: Unique name. It must be a valid string identifier.
:param shape: Desired shape, as a string, i.e. 'cone', 'box', 'sphere',
'capsule', 'cylinder', 'frame', or 'arrow'.
:param pose: Pose of the geometry on the scene, as a transform object
`pin.SE3`, or a tuple (position, orientation). In the
latter case, the position must be the vector [X, Y, Z],
while the orientation can be either a rotation matrix, or
a quaternion [X, Y, Z, W]. `None` can be used to specify
neutral frame position and/or orientation.
Optional: Neutral position and orientation by default.
:param scale: Size of the marker. Each principal axis of the geometry
are scaled separately.
:param color: Color of the marker. It supports both RGBA codes as a
list of 4 floating-point values ranging from 0.0 and 1.0,
and a few named colors.
Optional: Robot's color by default if overridden, 'white'
otherwise, except for 'frame'.
:param always_foreground: Whether to force rendering the marker on
foreground.
Optional: True by default.
:param auto_refresh: Whether to refresh the scene after adding the
marker. Useful for adding a bunch of markers and
only refresh once. Note that the marker will not
display properly until then.
:param shape_kwargs: Any additional keyword arguments to forward to
`jiminy_py.viewer.panda3d.panda3d_visualizer.`
`Panda3dApp.append_{shape}` for shape instantiation.
:returns: Dict of type `MarkerDataType`, storing references to the
current pose, scale, and color of the marker, and itself a
reference to `viewer.markers[name]`. Any modification of it
will take effect at next `refresh` call.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
# Make sure the backend supports this method
if not Viewer.backend.startswith('panda3d'):
raise NotImplementedError(
"This method is only supported by Panda3d.")
# Handling of user arguments
if pose is None:
pose = [None, None]
if isinstance(pose, pin.SE3):
pose = [pose.translation, pose.rotation]
if not callable(pose) and any(value is None for value in pose):
pose = list(pose)
if pose[0] is None:
pose[0] = np.zeros(3)
if pose[1] is None:
pose[1] = np.array([0.0, 0.0, 0.0, 1.0])
if isinstance(scale, (float, int)):
scale = np.full((3,), fill_value=scale, dtype=float)
if color is None:
color = self.robot_color
if color is None and shape != 'frame':
color = 'white'
if color is not None and not callable(color):
color = np.asarray(get_color_code(color))
# Remove marker is one already exists and requested
if name in self.markers.keys():
if not remove_if_exists:
raise ValueError(f"marker's name '{name}' already exists.")
self.remove_marker(name)
# Assert(s) for type checker
assert not isinstance(scale, float)
# Add new marker
create_shape = getattr(self._gui, f"append_{shape}")
create_shape(self._markers_group, name, **shape_kwargs)
marker_data: MarkerDataType = {
"pose": pose, "scale": scale, "color": color}
self.markers[name] = marker_data
self._markers_visibility[name] = True
# Make sure the marker always display in front of the model
self._gui.show_node(
self._markers_group, name, True,
always_foreground=always_foreground)
# Refresh the scene if desired
if auto_refresh:
self.refresh()
return marker_data
[docs]
@_with_lock
@_must_be_open
def display_center_of_mass(self, visibility: bool) -> None:
"""Display the position of the center of mass as a sphere.
.. note::
It corresponds to the attribute `com[0]` of the provided
`robot.pinocchio_model`. Calling `Viewer.display` will update it
automatically, while `Viewer.refresh` will not.
:param visibility: Whether to enable or disable display of the center
of mass.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
# Make sure the current backend is supported by this method.
if not Viewer.backend.startswith('panda3d'):
if visibility:
raise NotImplementedError(
"This method is only supported by Panda3d.")
# Return early if not supported but not requested either.
return
for name in self.markers:
if name.startswith("COM_0"):
self._gui.show_node(self._markers_group, name, visibility)
self._markers_visibility[name] = visibility
self._display_com = visibility
[docs]
@_with_lock
@_must_be_open
def display_capture_point(self, visibility: bool) -> None:
"""Display the position of the capture point,also called divergent
component of motion (DCM) as a sphere.
.. note::
Calling `Viewer.display` will update it automatically, while
`Viewer.refresh` will not.
:param visibility: Whether to enable or disable display of the capture
point.
"""
# Assert(s) for type checker
assert Viewer.backend is not None
# Make sure the current backend is supported by this method.
if not Viewer.backend.startswith('panda3d'):
if visibility:
raise NotImplementedError(
"This method is only supported by Panda3d.")
# Return early if not supported but not requested either.
return
# Update visibility
for name in self.markers:
if name == "DCM":
self._gui.show_node(self._markers_group, name, visibility)
self._markers_visibility[name] = visibility
self._display_dcm = visibility
# Must refresh the scene
if visibility:
self.refresh()
[docs]
@_with_lock
@_must_be_open
def display_external_forces(self,
visibility: Union[Sequence[bool], bool]
) -> None:
"""Display external forces applied on the joints the robot, as arrows
of variable length depending of magnitude of the force.
.. warning::
It only display the linear component of the force, while ignoring
the angular part for now.
.. warning::
It corresponds to the attribute `viewer.f_external`. Calling
`Viewer.display` will NOT update its value automatically. It is up
to the user to keep it up-to-date.
.. warning::
This method is only supported by Panda3d.
:param visibility: Whether to display the external force applied at
each joint selectively. If a boolean is provided,
the same visibility will be set for each joint,
alternatively, one can provide a boolean list whose
ordering is consistent with pinocchio model (i.e.
`pinocchio_model.names`).
"""
# Convert boolean visiblity to mask if necessary
if isinstance(visibility, bool):
visibility = (self._client.model.njoints - 1) * (visibility,)
# Assert(s) for type checker
assert Viewer.backend is not None
assert not isinstance(visibility, bool)
# Make sure the current backend is supported by this method.
if not Viewer.backend.startswith('panda3d'):
if any(visibility):
raise NotImplementedError(
"This method is only supported by Panda3d.")
# Return early if not supported but not requested either.
return
# Check that the length of the mask is consistent with the model
assert len(visibility) == self._client.model.njoints - 1, (
"The length of the visibility mask must be equal to the number of "
"joints of the model, 'universe' excluded.")
# Update visibility
for i in range(self._client.model.njoints - 1):
name = f"ForceExternal_{self._client.model.names[i + 1]}"
self._gui.show_node(self._markers_group, name, visibility[i])
self._markers_visibility[name] = visibility[i]
self._display_f_external = list(visibility)
# Must refresh the scene
if any(visibility):
self.refresh()
[docs]
@_with_lock
@_must_be_open
def remove_marker(self, name: str) -> None:
"""Remove a marker, based on its name.
:param identifier: Name of the marker to remove.
"""
if name not in self.markers.keys():
raise ValueError(f"Marker's name '{name}' does not exists.")
self.markers.pop(name)
self._gui.remove_node(self._markers_group, name)
[docs]
@_with_lock
@_must_be_open
@_is_async
def refresh(self,
force_update_visual: bool = False,
force_update_collision: bool = False,
wait: bool = False) -> None:
"""Refresh the configuration of Robot in the viewer.
This method is also in charge of updating the camera placement for
travelling.
.. note::
This method is copy-pasted from `Pinocchio.visualize.*.display`
method, after removing parts responsible of update pinocchio
data and collision data. Visual data must still be updated.
:param force_update_visual: Force update of visual geometries.
:param force_update_collision: Force update of collision geometries.
:param wait: Whether to wait for rendering to finish.
"""
# pylint: disable=invalid-name
# Assert(s) for type checker
assert Viewer.backend is not None
# Extract pinocchio model and data pairs to update
model_list, data_list, model_type_list = [], [], []
if self._client.display_collisions or force_update_collision:
model_list.append(self._client.collision_model)
data_list.append(self._client.collision_data)
model_type_list.append(pin.GeometryType.COLLISION)
if self._client.display_visuals or force_update_visual:
model_list.append(self._client.visual_model)
data_list.append(self._client.visual_data)
model_type_list.append(pin.GeometryType.VISUAL)
# Update geometries placements
for model, data, in zip(model_list, data_list):
pin.updateGeometryPlacements(
self._client.model, self._client.data, model, data)
# Render new geometries placements
if Viewer.backend.startswith('panda3d'):
for geom_model, geom_data, model_type in zip(
model_list, data_list, model_type_list):
pose_dict: Dict[str, FramePoseType] = {}
for i, geom in enumerate(geom_model.geometryObjects):
oMg = geom_data.oMg[i]
x, y, z, qx, qy, qz, qw = SE3ToXYZQUAT(oMg)
group, node_name = self._client.getViewerNodeName(
geom, model_type)
pose_dict[node_name] = ((x, y, z), (qw, qx, qy, qz))
if pose_dict:
self._gui.move_nodes(group, pose_dict)
else:
import umsgpack # pylint: disable=import-outside-toplevel
cmd_data = [b"list"]
for geom_model, geom_data, model_type in zip(
model_list, data_list, model_type_list):
for i, geom in enumerate(geom_model.geometryObjects):
oMg = geom_data.oMg[i]
S = np.diag((*geom.meshScale, 1.0))
H = oMg.homogeneous.dot(S)
node_name = self._client.getViewerNodeName(
geom, model_type)
path = self._gui[node_name].path.lower()
cmd_data += [
b"set_transform",
path.encode("utf-8"),
umsgpack.packb({
"type": "set_transform",
"path": path,
"matrix": list(H.T.flat)
})
]
# Moving all geometries at once using custom command to improve
# throughput due to piping sockets with multiple redirections.
if cmd_data:
self._gui.window.zmq_socket.send_multipart(cmd_data)
self._gui.window.zmq_socket.recv()
# Update the camera placement if necessary
if Viewer._camera_travelling is not None:
if Viewer._camera_travelling['viewer'] is self:
position, rotation = Viewer._camera_travelling['pose']
frame = Viewer._camera_travelling['frame']
if rotation is not None:
self.set_camera_transform(
position, rotation, relative=frame)
else:
self.set_camera_lookat(position, relative=frame)
if Viewer._camera_motion is not None:
self.set_camera_transform(*Viewer._camera_xyzrpy)
# Update pose, color and scale of the markers, if any
if Viewer.backend.startswith('panda3d'):
pose_dict = {}
material_dict: Dict[str, Tuple4FType] = {}
scale_dict: Dict[str, Tuple3FType] = {}
for marker_name, marker_data in self.markers.items():
# Skip marker that are not visible for speedup
if not self._markers_visibility[marker_name]:
continue
# Pose processing
pose = marker_data["pose"]
if callable(pose):
pose = pose()
(x, y, z), orientation = pose
if orientation.ndim > 1:
qx, qy, qz, qw = pin.Quaternion(orientation).coeffs()
else:
qx, qy, qz, qw = orientation
pose_dict[marker_name] = ((x, y, z), (qw, qx, qy, qz))
# Color processing
color = marker_data["color"]
if callable(color):
color = color()
if color is not None:
r, g, b, a = color
material_dict[marker_name] = (2.0 * r, 2.0 * g, 2.0 * b, a)
# Scale processing
scale = marker_data["scale"]
if callable(scale):
scale = scale()
scale_dict[marker_name] = scale
self._gui.move_nodes(self._markers_group, pose_dict)
self._gui.set_materials(self._markers_group, material_dict)
self._gui.set_scales(self._markers_group, scale_dict)
# Wait for the backend viewer to finish rendering if requested
if wait:
Viewer.wait(require_client=False)
[docs]
@_must_be_open
def display(self,
q: np.ndarray,
v: Optional[np.ndarray] = None,
xyz_offset: Optional[np.ndarray] = None,
update_hook: Optional[Callable[[], None]] = None,
wait: bool = False) -> None:
"""Update the configuration of the robot.
.. warning::
It will alter original robot data if viewer attribute
`use_theoretical_model` is false.
:param q: Configuration of the robot.
:param v: Velocity of the robot. Used only to update velocity
dependent markers such as DCM. `None` if undefined.
Optional: `None` by default.
:param xyz_offset: Freeflyer position offset. Note that it does not
check for the robot actually have a freeflyer.
:param update_hook: Callable that will be called right after updating
kinematics data. `None` to disable.
Optional: None by default.
:param wait: Whether to wait for rendering to finish.
"""
assert (self._client.model.nq,) == q.shape[:1], (
"The configuration vector does not have the right size.")
# Make sure the state is valid
if np.isnan(q).any() or (v is not None and np.isnan(v).any()):
raise ValueError("The input state ('q','v') contains 'nan'.")
# Apply offset on the freeflyer, if requested.
# Note that it is NOT checking that the robot has a freeflyer.
if xyz_offset is not None:
q = q.copy() # Make a copy to avoid altering the original data
q[:3] += xyz_offset
# Update pinocchio data
pin.framesForwardKinematics(self._client.model, self._client.data, q)
if v is None:
pin.centerOfMass(self._client.model, self._client.data, q, False)
else:
pin.centerOfMass(
self._client.model, self._client.data, q, v, False)
# Call custom update hook
if update_hook is not None:
update_hook()
# Refresh the viewer
self.refresh(wait=wait)
[docs]
@_must_be_open
def replay(self,
trajectory: Trajectory,
time_interval: Union[
np.ndarray, Tuple[float, float]] = (0.0, np.inf),
speed_ratio: float = 1.0,
xyz_offset: Optional[np.ndarray] = None,
update_hook: Optional[Callable[
[float, np.ndarray, np.ndarray], None]] = None,
enable_clock: bool = False,
wait: bool = False) -> None:
"""Replay a complete robot trajectory at a given real-time ratio.
.. note::
Specifying 'udpate_hook' is necessary to be able to display sensor
information such as contact forces. It will be automatically
disable otherwise.
.. warning::
It will alter original robot data if viewer attribute
`use_theoretical_model` is false.
:param states: Sequence of `State` objects of increasing time.
:param time_interval: Specific time interval to replay.
Optional: Complete evolution by default [0, inf].
:param speed_ratio: Real-time factor.
Optional: No time dilation by default (1.0).
:param xyz_offset: Freeflyer position offset. Note that it does not
check for the robot actually have a freeflyer.
Optional: None by default.
:param update_hook: Callable that will be called periodically between
every state update. `None` to disable, otherwise it
must have the following signature:
.. code-block:: python
f(t:float, q: ndarray, v: ndarray) -> None
Optional: No update hook by default.
:param wait: Whether to wait for rendering to finish.
"""
# Early return if nothing to replay
if not trajectory.has_data:
return
# Sanitize replay time interval
t_start, t_end = trajectory.time_interval
t_start, t_end = (min(max(t, t_start), t_end) for t in time_interval)
# Disable display of sensor data if no update hook is provided
disable_display_contact_forces = False
if update_hook is None and self._display_contact_forces:
disable_display_contact_forces = True
self.display_contact_forces(False)
# Disable display of DCM if no velocity data provided
disable_display_dcm = False
if not trajectory.has_velocity and self._display_dcm:
disable_display_dcm = True
self.display_capture_point(False)
# Replay the whole trajectory at constant speed ratio
t = t_start
time_init = time.time()
time_prev = time_init
v, update_hook_t = None, None
while True:
try:
# Update clock if enabled
if enable_clock:
Viewer.set_clock(t)
# Compute state at current time
state = trajectory.get(t, mode="raise")
# Update viewer force buffer
if state.f_external is not None:
for f_ref, f_i in zip(
self.f_external, state.f_external[1:]):
f_ref.vector[:] = f_i
# Update camera motion
if Viewer._camera_motion is not None:
Viewer._camera_xyzrpy = Viewer._camera_motion(t)
# Update display
if update_hook is not None:
update_hook_t = partial(update_hook, t, state.q, state.v)
self.display(state.q, v, xyz_offset, update_hook_t, wait)
# Sleep for a while if computing faster than display framerate
sleep(1.0 / REPLAY_FRAMERATE - (time.time() - time_prev))
time_prev = time.time()
# Break replay loop if the final time has been reached
if t_end - t < 1e-10:
break
# Update simulation time, taking into account speed ratio
time_elapsed = time_prev - time_init
t = min(t_start + speed_ratio * time_elapsed, t_end)
# Waiting for the first timestep is enough
wait = False
except Exception as e:
# Get backend info to analyze the root cause of the exception
backend = Viewer.backend
is_open = self.is_open() # type: ignore[misc]
# If no backend, probably it has been closed in the meantime
# during multi-threaded replay. So just return without error.
if backend is None:
return
# Make sure the viewer is always properly closed
Viewer.close()
# Once again, the backend was most likely killed on purpose
# during multi-threaded replay. Returning without error.
if not is_open:
return
# Re-raise the original exception if unexpected
if backend.startswith('panda3d'):
if isinstance(e, ViewerClosedError):
return
elif backend == 'meshcat':
import zmq # pylint: disable=import-outside-toplevel
if isinstance(e, (zmq.error.Again, zmq.error.ZMQError)):
return
raise
# Restore Viewer's state if it has been altered
if Viewer.is_alive():
# Disable clock after replay if enabled
if enable_clock:
Viewer.set_clock()
# Restore display if necessary
if disable_display_contact_forces:
self.display_contact_forces(True)
if disable_display_dcm:
self.display_capture_point(True)