Source code for jiminy_py.plot
# mypy: disable-error-code="attr-defined"
"""This modules provides a set of tools to visualize telemetry log data in a
convenient and portable way. These tools are designed to work on any platform
without root permission, including in local Jupyter notebook, VS Code, and
Google Colaboratory. No graphical server is required for offscreen rendering.
"""
import os
import sys
import fnmatch
import pathlib
import argparse
import logging
from dataclasses import dataclass
from math import ceil, sqrt, floor
from textwrap import dedent
from itertools import cycle
from functools import partial
from collections import OrderedDict
from weakref import WeakKeyDictionary
from typing import (
Dict, Any, List, Optional, Tuple, Union, Callable, Type, cast)
import numpy as np
try:
import matplotlib
import matplotlib.pyplot as plt
except ImportError as e:
raise ImportError(
"Submodule not available. Please install 'jiminy_py[plot]'.") from e
except RuntimeError as e:
# You can get a runtime error if Matplotlib is installed but cannot be
# imported because of some conflicts with jupyter event loop for instance.
raise ImportError("Matplotlib cannot be imported.") from e
from matplotlib import colors
from matplotlib.axes import Axes
from matplotlib.lines import Line2D
from matplotlib.artist import Artist
from matplotlib.legend import Legend
from matplotlib.widgets import Button
from matplotlib.transforms import Bbox
from matplotlib.backend_bases import Event, LocationEvent
from matplotlib.backends.backend_pdf import PdfPages
import jiminy_py.core as jiminy
from .core import ( # pylint: disable=no-name-in-module
EncoderSensor, EffortSensor, ContactSensor, ForceSensor, ImuSensor)
from .log import (read_log,
extract_variables_from_log,
build_robot_from_log)
from .viewer import interactive_mode
SENSORS_FIELDS: Dict[
Type[jiminy.AbstractSensor], Union[List[str], Dict[str, List[str]]]
] = {
EncoderSensor: EncoderSensor.fieldnames,
EffortSensor: EffortSensor.fieldnames,
ContactSensor: ContactSensor.fieldnames,
ForceSensor: {
k: [e[len(k):] for e in ForceSensor.fieldnames if e.startswith(k)]
for k in ['F', 'M']
},
ImuSensor: {
k: [e[len(k):] for e in ImuSensor.fieldnames if e.startswith(k)]
for k in ['Quat', 'Gyro', 'Accel']
}
}
class _ButtonBlit(Button):
def _motion(self, event: Event) -> None:
if self.ignore(event):
return
c = self.hovercolor if event.inaxes == self.ax else self.color
if not colors.same_color(c, self.ax.get_facecolor()):
self.ax.set_facecolor(c)
if self.drawon:
# It is necessary to flush events beforehand to make sure
# figure refresh cannot get interrupted by button blitting.
# Otherwise the figure would be blank.
# FIXME: `flush_events` on 'matplotlib>=3.8' causes deadlock
assert self.ax.figure is not None
# self.ax.figure.canvas.flush_events()
self.ax.draw_artist(self.ax)
self.ax.figure.canvas.blit(self.ax.bbox)
[docs]
@dataclass
class TabData:
"""Internal data stored for each tab if `TabbedFigure`.
"""
axes: List[Axes]
"""Matpolib `Axes` handles of every subplots.
"""
legend_data: Tuple[List[Artist], List[str]]
"""Matpolib Legend data as returned by `get_legend_handles_labels` method.
First element of pair is a list of Artist to legend, which usually are
`line2D` object, and the second element is the list of labels for each of
them.
"""
button: _ButtonBlit
"""Button on which to click to switch to the tab at hands.
"""
button_axcut: Axes
"""Axe of the button.
.. note::
This attribute is used internally to define the position and size of
the button.
"""
nav_stack: List[WeakKeyDictionary]
"""Matplotlib `NavigationToolbar2` navigation stack history.
.. note::
This attribute is used internally for undo/redo history. It is not
meant to be modified manually, but only copied/restored when needed.
"""
nav_pos: int
"""Matplotlib `NavigationToolbar2` navigation stack position.
.. note::
This attribute is used internally for undo/redo history. It is not
meant to be modified manually, but only copied/restored when needed.
"""
[docs]
class TabbedFigure:
"""A windows with several tabs holding matplotlib subplots. It enables
adding, modifying and removing tabs sequentially and conveniently.
.. note::
It has been designed to be cross-platform, and supported by any
Matplotlib backend. So it can be used on-the-spot right after fresh
install of Python and `jiminy_py`, without requiring elevated
privilege to install Qt4/5 or Tkinter.
.. warning::
It only supports plotting time series, the time corresponding to the
horizontal axis of every subplot.
"""
def __init__(self, # pylint: disable=unused-argument
sync_tabs: bool = False,
window_title: str = "jiminy",
offscreen: bool = False,
**kwargs: Any) -> None:
"""Create empty tabbed figure.
.. note::
It will show-up on display automatically only adding the first tab.
:param sync_tabs: Synchronize time window on every tabs (horizontal
axis), rather than independently for every tabs.
:param window_title: Desired indow title.
Optional: "jiminy" by default.
:param offscreen: Whether to enable display on screen of figure.
Optional: False by default.
:param kwargs: Unused extra keyword arguments to enable forwarding.
"""
# Backup user arguments
self.sync_tabs = sync_tabs
self.offscreen = offscreen
# Warn the user if Matplotlib backend is not fully compatible
if interactive_mode() >= 2 and matplotlib.get_backend() != 'nbAgg':
msg = (
"Matplotlib's 'widget' and 'inline' backends are not properly "
"supported.")
if interactive_mode() == 2:
msg += (
" Please add '%matplotlib notebook' at the top and "
"restart the kernel.")
logging.warning(msg)
# Internal state buffers
self.figure = plt.figure(layout="constrained")
self.legend: Optional[Legend] = None
self.ref_ax: Optional[Axes] = None
self.tabs_data: Dict[str, TabData] = {}
self.tab_active: Optional[TabData] = None
self.bbox_inches: Bbox = Bbox([[0.0, 0.0], [1.0, 1.0]])
# Set window title
if not self.offscreen:
assert self.figure.canvas.manager is not None
self.figure.canvas.manager.set_window_title(window_title)
# Customize figure subplot layout and reserve space for buttons
# self.figure.get_layout_engine().set(w_pad=0.1, h_pad=0.1)
self.subfigs = self.figure.subfigures(
2, 1, wspace=0.1, height_ratios=[0.94, 0.06])
# Set window size
if self.offscreen:
self.figure.set_size_inches(18, 12)
else:
self.figure.set_size_inches(12, 8)
# Register 'on resize' event callback to adjust layout
self.figure.canvas.mpl_connect('resize_event', self.adjust_layout)
def __del__(self) -> None:
self.close()
[docs]
def adjust_layout(self,
event: Optional[ # pylint: disable=unused-argument
Event] = None, *,
refresh_canvas: bool = False) -> None:
"""Optimize buttons width and grid subplot arrangement of the active
tab for readability based on the current window size. Then, adjust
margins to maximize plot sizes.
:param event: Event spent by figure `mpl_connect` 'resize_event'.
:param refresh_canvas: Force redrawing figure canvas.
"""
# No active tab (probably because there is none). Returning early.
if self.tab_active is None:
return
# Compute figure area for later export
bbox = Bbox(((0.0, 0.07), (1.0, 1.0)))
bbox_pixels = bbox.transformed(self.figure.transFigure)
self.bbox_inches = bbox_pixels.transformed(
self.figure.dpi_scale_trans.inverted())
# Refresh button size, in case the number of tabs has changed
buttons_width = (1.0 - 0.006) / len(self.tabs_data)
for i, tab in enumerate(self.tabs_data.values()):
tab.button_axcut.set_position(
(buttons_width * i + 0.003, 0.1, buttons_width, 1.0))
# Re-arrange subplots in case figure aspect ratio has changed
axes = self.tab_active.axes
num_subplots = len(axes)
figure_extent = self.figure.get_window_extent()
figure_ratio = figure_extent.width / figure_extent.height
num_rows_1 = max(1, floor(sqrt(num_subplots / figure_ratio)))
num_cols_1 = ceil(num_subplots / num_rows_1)
num_cols_2 = ceil(sqrt(num_subplots * figure_ratio))
num_rows_2 = ceil(num_subplots / num_cols_2)
if num_rows_1 * num_cols_1 < num_rows_2 * num_cols_2:
num_rows, num_cols = map(int, (num_rows_1, num_cols_1))
else:
num_rows, num_cols = map(int, (num_rows_2, num_cols_2))
grid_spec = self.subfigs[0].add_gridspec(num_rows, num_cols)
for i, ax in enumerate(axes, 1):
ax.set_subplotspec(grid_spec[i - 1])
# Refresh figure canvas if requested
if refresh_canvas:
self.refresh()
def __click(self, event: Event, force_update: bool = False) -> None:
"""Event handler used internally to switch tab when a button is
pressed.
.. warning::
This method is not supposed to be called manually. Please call
`select_active_tab` for selecting tab instead.
"""
# Assert(s) for type checker
assert self.tab_active is not None
# Get tab name to activate
for tab in self.tabs_data.values():
button = tab.button
if button.ax == event.inaxes:
tab_name = button.label.get_text().replace('\n', ' ')
break
# Early return if already active
if not force_update and self.tab_active is self.tabs_data[tab_name]:
return
# Backup navigation history if any
if not self.offscreen:
assert self.figure.canvas.toolbar is not None
cur_stack = self.figure.canvas.toolbar._nav_stack
for tab in self.tabs_data.values():
if self.sync_tabs or tab is self.tab_active:
tab.nav_stack = cur_stack._elements.copy()
tab.nav_pos = cur_stack._pos
# Update axes and title
for ax in self.tab_active.axes:
self.subfigs[0].delaxes(ax)
if self.legend is not None:
self.legend.remove()
self.legend = None
self.tab_active = self.tabs_data[tab_name]
self.subfigs[0].suptitle(tab_name)
for ax in self.tab_active.axes:
self.subfigs[0].add_subplot(ax)
handles, labels = self.tab_active.legend_data
if labels:
self.legend = self.subfigs[0].legend(
handles, labels, ncol=len(handles), loc='outside lower center')
# # Restore navigation history and toolbar state if necessary
if not self.offscreen:
assert self.figure.canvas.toolbar is not None
cur_stack._elements = self.tab_active.nav_stack
cur_stack._pos = self.tab_active.nav_pos
self.figure.canvas.toolbar.set_history_buttons()
# Update buttons style
for tab in self.tabs_data.values():
button = tab.button
if tab is self.tab_active:
button.ax.set_facecolor('green')
button.color = 'green'
button.hovercolor = 'green'
else:
button.ax.set_facecolor('white')
button.color = 'white'
button.hovercolor = '0.95'
# Adjust layout and refresh figure
self.adjust_layout(refresh_canvas=True)
# FIXME: `flush_events` on 'matplotlib>=3.8' causes deadlock
# self.figure.canvas.flush_events()
[docs]
def add_tab(self, # pylint: disable=unused-argument
tab_name: str,
time: np.ndarray,
data: Union[np.ndarray, Dict[str, Union[
Dict[str, np.ndarray], np.ndarray]]],
plot_method: Optional[
Union[Callable[..., Any], str]] = None, *,
refresh_canvas: bool = True,
**kwargs: Any) -> None:
"""Create a new tab holding the provided data.
Each tab holds exactly one grid of subplots. There is one subplot for
each time series that has been provided, and all of them having to be
associated with the exact same time sequence. The layout is dynamically
optimized for readability.
The added tab will only be selected as the active one automatically if
there were no tab beforehand.
:param tab_name: Name of the tab to be added. It must be a unique
identifier not already used for another tab. It will
be displayed as label for the buttons used to select
the active tab.
:param time: Unique time sequence associated with the provided time
series. It does not have to be evenly spaced but must be
monotonically increasing.
:param data: Set of time series to plot. If a simple array is provided,
then there will be only one subplot, with one (unlabeled)
line if the array is 1D, one per column otherwise. If a
dictionary of arrays is provided, there is one subplot per
item, the key being used as label and the value is treated
as simple array. Finally, in case of a nested dictionary,
each sub-value must be a 1D array and sub-keys will be
used to label individual lines.
:param plot_method: Callable method taking axis object, time, and data
array in argument, or string instance method of
`matplotlib.axes.Axes`.
Optional: `step(..., where='post')` by default.
:param refresh_canvas: Whether to refresh the figure. This step can be
skipped if other tabs are going to be added or
deleted soon, to avoid useless computation and
figure flickering.
Optional: True by default.
"""
# Make sure that the time sequence is valid
assert (np.diff(time) >= 0.0).all(), (
"The time sequence must be monotonically increasing.")
# Make sure that the provided tab name does not exist already
assert tab_name not in self.tabs_data.keys(), (
"There is already one tab with the exact same name. Please remove "
"it explicitly before replacing it by a another one.")
# Handle default arguments and converters
if plot_method is None:
plot_method = partial(Axes.step, where='post')
if isinstance(plot_method, str):
plot_method = getattr(Axes, plot_method)
assert callable(plot_method)
if isinstance(data, dict):
# Compute plot grid arrangement
n_cols = len(data)
n_rows = 1
while n_cols > n_rows + 2:
n_rows = n_rows + 1
n_cols = int(np.ceil(len(data) / n_rows))
# Initialize axes, and early return if none
axes: List[plt.Axes] = []
ref_ax = self.ref_ax if self.sync_tabs else None
for i, plot_name in enumerate(data.keys()):
uniq_label = '_'.join((tab_name, plot_name))
ax = self.subfigs[0].add_subplot(
n_rows, n_cols, i+1, label=uniq_label)
ax.autoscale(True, axis='x', tight=True)
ax.autoscale(True, axis='y', tight=False)
ax.ticklabel_format(axis='x', style='plain', useOffset=True)
ax.ticklabel_format(
axis='y', style='sci', scilimits=(-3, 3), useOffset=False)
if self.tabs_data:
self.subfigs[0].delaxes(ax)
if ref_ax is not None:
ax.sharex(ref_ax)
else:
ref_ax = ax
axes.append(ax)
if self.sync_tabs:
self.ref_ax = ref_ax
if not axes:
return
# Update their content
for (plot_name, plot_data), ax in zip(data.items(), axes):
if isinstance(plot_data, dict):
for line_name, line_data in plot_data.items():
assert line_data.size == time.size
plot_method(ax, time, line_data, label=line_name)
else:
plot_method(ax, time, plot_data)
ax.set_title(plot_name, fontsize='medium')
ax.grid(True)
else:
# Draw single figure instead of subplot
ax = self.subfigs[0].add_subplot(1, 1, 1, label=tab_name)
plot_method(ax, time, data)
if self.tabs_data:
self.subfigs[0].delaxes(ax)
ax.autoscale(enable=True, axis='both', tight=True)
ax.grid(True)
axes = [ax]
# Get unique legend for every subplots
legend_data = ax.get_legend_handles_labels()
# Add buttons to show/hide information
uniq_label = '_'.join((tab_name, "button"))
button_axcut = self.subfigs[1].add_axes(
[0.0, 0.0, 0.0, 0.0], label=uniq_label)
button = _ButtonBlit(button_axcut,
tab_name.replace(' ', '\n'),
color='white')
# Register buttons events
button.on_clicked(self.__click)
# Create new tab data container
self.tabs_data[tab_name] = TabData(
axes, legend_data, button, button_axcut, nav_stack=[], nav_pos=-1)
# Check if it is the first tab to be added
if self.tab_active is None:
# Set new tab has the active one if none before
self.tab_active = self.tabs_data[tab_name]
# Show tab without blocking
for ax in axes:
ax.set_visible(True)
self.subfigs[0].suptitle(tab_name)
handles, labels = legend_data
if labels:
self.legend = self.subfigs[0].legend(
handles, labels, loc='outside lower center')
button.ax.set_facecolor('green')
button.color = 'green'
button.hovercolor = 'green'
# Update figure and show it without blocking if not done automatically
self.adjust_layout(refresh_canvas=refresh_canvas)
if not self.offscreen and interactive_mode() < 2:
self.figure.show()
[docs]
def select_active_tab(self, tab_name: str) -> None:
"""Select the active tab.
A single tab is considered active at a time.
:param tab_name: Name of the tab to select. It must be to one of the
names that has been specified when calling `add_tab`
previously.
"""
# Make sure that the provided tab name exists
assert tab_name in self.tabs_data.keys(), (
"No tab with this exact name has been added.")
event = LocationEvent("click", self.figure.canvas, 0, 0)
event.inaxes = self.tabs_data[tab_name].button.ax
self.__click(event, force_update=True)
[docs]
def remove_tab(self,
tab_name: str, *,
refresh_canvas: bool = True) -> None:
"""Remove a given tab.
If the removed tab was the active one, the first tab that has been
added while be made active from now on.
:param tab_name: Name of the tab to remove. It must be to one of the
names that has been specified when calling `add_tab`
previously.
:param refresh_canvas: Whether to refresh the figure. This step can be
skipped if other tabs are going to be added or
deleted soon, to avoid useless computation and
figure flickering.
Optional: True by default.
"""
# Assert(s) for type checker
assert self.tab_active is not None
# Reset current tab if it is the removed one
tab = self.tabs_data.pop(tab_name)
if tab is self.tab_active and self.tabs_data:
self.select_active_tab(next(iter(self.tabs_data.keys())))
# Change reference axis if to be deleted
if any(ax is self.ref_ax for ax in tab.axes):
if self.tabs_data:
self.ref_ax = self.tab_active.axes[0]
else:
self.ref_ax = None
# Disable button
tab.button.disconnect_events()
tab.button_axcut.remove()
# Remove axes and legend manually is not more tabs available
if not self.tabs_data:
if self.subfigs[0]._suptitle is not None:
self.subfigs[0]._suptitle.remove()
self.subfigs[0]._suptitle = None
for ax in tab.axes:
ax.remove()
if self.legend is not None:
self.legend.remove()
self.legend = None
# Refresh figure
self.adjust_layout(refresh_canvas=refresh_canvas)
[docs]
def clear(self) -> None:
"""Remove all tabs at once.
"""
# Remove every figure axes
for tab_name in list(self.tabs_data.keys()): # list to make copy
self.remove_tab(tab_name, refresh_canvas=False)
self.refresh()
[docs]
def save_tab(self, pdf_path: str) -> None:
"""Export the active tab in a single-page PDF file, excluding tab
buttons. Lines are stored as vector instead of being rasterized.
:param pdf_path: Desired location for generated pdf file.
"""
pdf_path = str(pathlib.Path(pdf_path).with_suffix('.png'))
self.subfigs[0].savefig(
pdf_path, format='pdf', bbox_inches=self.bbox_inches)
[docs]
def save_all_tabs(self, pdf_path: str) -> None:
"""Export the whole figure in a single PDF file containing one page per
tab and excluding systematically the tab buttons.
.. seealso::
See `save_tab` documentation for details.
:param pdf_path: Desired location for generated pdf file.
"""
pdf_path = str(pathlib.Path(pdf_path).with_suffix('.pdf'))
with PdfPages(pdf_path) as pdf:
for tab_name in self.tabs_data.keys():
self.select_active_tab(tab_name)
pdf.savefig(bbox_inches=self.bbox_inches)
[docs]
@classmethod
def plot(cls,
time: np.ndarray,
tabs_data: Dict[str, Union[np.ndarray, Dict[str, Union[
Dict[str, np.ndarray], np.ndarray]]]],
pdf_path: Optional[str] = None,
**kwargs: Any) -> "TabbedFigure":
"""Create a new tabbed figure along with multiple tabs holding the
provided data, then eventually export it as PDF.
:param time: Unique time sequence associated with the provided time
series. It does not have to be evenly spaced but must be
monotonically increasing.
:param tabs_data: Set of time series to plot in multiple tabs, as a
nested dictionary. There will be one tab per item,
the key and value being the name and the data of the
tab, respectively. See `add_tab` documentation about
how the data are displayed based on their structure.
:param pdf_path: If specified, the whole figure will be exported in a
PDF file at the desired location without rendering on
screen. See `save_all_tabs` documentation for details.
Optional: `None` by default.
:param kwargs: Extra keyword arguments to forward to `add_tab` method.
"""
tabbed_figure = cls(**{
"offscreen": pdf_path is not None, **kwargs})
for name, data in tabs_data.items():
tabbed_figure.add_tab(
name, time, data, **kwargs, refresh_canvas=False)
tabbed_figure.refresh()
if pdf_path is not None:
tabbed_figure.save_all_tabs(pdf_path)
return tabbed_figure
[docs]
def plot_log(log_data: Dict[str, Any],
robot: Optional[jiminy.Robot] = None,
enable_flexiblity_data: bool = False,
block: Optional[bool] = None,
**kwargs: Any) -> TabbedFigure:
"""Display standard simulation data over time.
The figure features several tabs:
- Subplots with robot configuration
- Subplots with robot velocity
- Subplots with robot acceleration
- Subplots with motors torques
- Subplots with raw sensor data (one tab for each type of sensor)
:param log_data: Logged data (constants and variables) as a dictionary.
:param robot: Jiminy robot associated with the logged trajectory.
Optional: None by default. If None, then it will be
reconstructed from 'log_data' using `build_robot_from_log`.
:param enable_flexiblity_data:
Enable display of flexibility joints in robot's configuration,
velocity and acceleration subplots.
Optional: False by default.
:param block: Whether to wait for the figure to be closed before
returning. Non-op for offscreen rendering and notebooks.
Optional: False in interactive mode, True otherwise.
:param kwargs: Extra keyword arguments to forward to `TabbedFigure`.
"""
# Blocking by default if not interactive
if block is None:
block = interactive_mode() == 0
# Extract log data
if not log_data:
raise RuntimeError("No data to plot.")
log_vars: Dict[str, np.ndarray] = log_data["variables"]
# Build robot from log if necessary
if robot is None:
robot = build_robot_from_log(log_data)
# Figures data structure as a dictionary
tabs_data: Dict[
str, Dict[str, Union[np.ndarray, Dict[str, np.ndarray]]]
] = OrderedDict()
# Get time and robot positions, velocities, accelerations and efforts
time = log_vars["Global.Time"]
for fields_type in ("Position", "Velocity", "Acceleration", "Effort"):
fieldnames: List[str] = getattr(robot, "_".join((
"log", fields_type.lower(), "fieldnames")))
if not enable_flexiblity_data:
# Filter out flexibility data
fieldnames = list(filter(
lambda field: not any(
name in field
for name in robot.flexibility_joint_names),
fieldnames))
try:
values = extract_variables_from_log(
log_vars, fieldnames, namespace=robot.name, as_dict=True)
tabs_data[' '.join(("State", fields_type))] = OrderedDict(
(field[7:].replace(fields_type, ""), elem)
for field, elem in values.items())
except KeyError:
# Variable has not been recorded and is missing in log file
pass
# Get command information
try:
command = extract_variables_from_log(
log_vars, robot.log_command_fieldnames, namespace=robot.name)
tabs_data['Command'] = OrderedDict(
zip((motor.name for motor in robot.motors), command))
except KeyError:
# Variable has not been recorded and is missing in log file
pass
# Get sensors information
for sensors_class, sensors_fields in SENSORS_FIELDS.items():
sensors_type: str = cast(str, sensors_class.type)
sensor_names = tuple(
sensor.name for sensor in robot.sensors.get(sensors_type, []))
if not sensor_names:
continue
if isinstance(sensors_fields, dict):
for fields_prefix, fieldnames in sensors_fields.items():
try:
type_name = ' '.join((sensors_type, fields_prefix))
data_nested = [
extract_variables_from_log(log_vars, ['.'.join(
(sensors_type, name, fields_prefix + field))
for name in sensor_names], robot.name)
for field in fieldnames]
tabs_data[type_name] = OrderedDict(
(field, OrderedDict(zip(sensor_names, data)))
for field, data in zip(fieldnames, data_nested))
except KeyError:
# Variable has not been recorded and is missing in log file
pass
else:
for field in sensors_fields:
try:
type_name = ' '.join((sensors_type, field))
data = extract_variables_from_log(log_vars, [
'.'.join((sensors_type, name, field))
for name in sensor_names], robot.name)
tabs_data[type_name] = OrderedDict(zip(
sensor_names, data))
except KeyError:
# Variable has not been recorded and is missing in log file
pass
# Create figure, without closing the existing one
figure = TabbedFigure.plot(
time, tabs_data, **{ # type: ignore[arg-type]
"plot_method": "plot", "sync_tabs": True, **kwargs})
# Show the figure if appropriate, blocking if necessary
if not figure.offscreen:
plt.show(block=block)
return figure
def plot_log_interactive() -> None:
"""Main CLI entry-point for plotting log data using matplotlib.
"""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description=dedent("""\
Plot data from a jiminy log file using matplotlib.
Specify a list of fields to plot, separated by a colon for
plotting on the same subplot.
Example: h1 h2:h3:h4 generates two subplots, one with h1, one
with h2, h3, and h4.
Wildcard token '*' can be used. In such a case:
- If *h2* matches several fields:
each field will be plotted individually in subplots.
- If :*h2* or :*h2*:*h3*:*h4* matches several fields:
each field will be plotted in the same subplot.
- If *h2*:*h3*:*h4* matches several fields:
each match of h2, h3, and h4 will be plotted jointly in subplots.
Note that if the number of matches for h2, h3, h4 differs, only
the minimum number will be plotted.
Enter no plot command (only the file name) to view the list of
fields available inside the file."
"""))
parser.add_argument("input", help="Input logfile.")
parser.add_argument(
"-c", "--compare", type=str, default=None, help=dedent("""\
Colon-separated list of comparison log files.
The same data as the original log will be plotted in the same
subplot, with different line styes. These logfiles must be of the
same length and contain the same header as the original log file.
Note that you can click on the figure top legend to show / hide
data from specific files.
"""))
main_arguments, plotting_commands = parser.parse_known_args()
# Load log file
main_fullpath = main_arguments.input
log_data = read_log(main_fullpath)
log_vars = log_data["variables"]
# If no plotting commands, display the list of headers instead
if len(plotting_commands) == 0:
print("Available data:", *map(
lambda s: f"- {s}", log_vars.keys()), sep="\n")
sys.exit(0)
# Load all comparison logs, if any
compare_data: Dict[str, Dict[str, np.ndarray]] = OrderedDict()
if main_arguments.compare is not None:
for fullpath in main_arguments.compare.split(':'):
if fullpath == main_fullpath or fullpath in compare_data.keys():
raise RuntimeError(
"All log files must be unique when comparing them.")
log_data = read_log(fullpath)
compare_data[fullpath] = log_data["variables"]
# Define line style cycle used for logs comparison
linestyles = ("--", "-.", ":")
# Parse plotting arguments
plotted_elements = []
for cmd in plotting_commands:
# Check that the command is valid, i.e. that all elements exits.
# If it is the case, add it to the list.
same_subplot = cmd[0] == ':'
headers = cmd.strip(':').split(':')
# Expand each element according to wildcard expression
matching_fieldnames = []
for header in headers:
match = sorted(fnmatch.filter(log_vars.keys(), header))
if len(match) > 0:
matching_fieldnames.append(match)
else:
print(f"No matching headers for expression {header}")
if len(matching_fieldnames) == 0:
continue
# Compute number of subplots
if same_subplot:
plotted_elements.append([
e for l_sub in matching_fieldnames for e in l_sub])
else:
n_subplots = min(len(header) for header in matching_fieldnames)
for i in range(n_subplots):
plotted_elements.append(
[header[i] for header in matching_fieldnames])
# Create figure
n_plot = len(plotted_elements)
if not n_plot:
print("Nothing to plot. Exiting...")
return
fig = plt.figure(layout="constrained")
# Set window title
assert fig.canvas.manager is not None
fig.canvas.manager.set_window_title(main_arguments.input)
# Set window size
fig.set_size_inches(14, 8)
# Create subplots, arranging them in a rectangular fashion.
# Do not allow for n_cols to be more than n_rows + 2.
n_cols = n_plot
n_rows = 1
while n_cols > n_rows + 2:
n_rows = int(n_rows + 1)
n_cols = int(np.ceil(n_plot / (1.0 * n_rows)))
axes = fig.subplots(n_rows, n_cols, sharex=True, squeeze=False).flat[:]
# Store lines in dictionary {file_name: plotted lines}, to enable to
# toggle individually the visibility the data related to each of them.
main_name = os.path.basename(main_arguments.input)
plotted_lines: Dict[str, List[Line2D]] = {main_name: []}
for c in compare_data:
plotted_lines[os.path.basename(c)] = []
# Plot each element
t = log_vars['Global.Time']
for ax, plotted_elem in zip(axes, plotted_elements):
for name in plotted_elem:
line = ax.step(t, log_vars[name], label=name)
plotted_lines[main_name].append(line[0])
linecycler = cycle(linestyles)
for c in compare_data:
line = ax.step(compare_data[c]['Global.Time'],
compare_data[c][name],
next(linecycler),
color=line[0].get_color())
plotted_lines[os.path.basename(c)].append(line[0])
# Add legend and grid for each plot
for ax, plotted_elem in zip(axes, plotted_elements):
ax.set_xlabel('time (s)')
if len(plotted_elem) > 1:
ax.legend()
else:
ax.set_title(plotted_elem[0], fontsize='medium')
ax.grid(True)
# If a compare plot is present, add overall legend specifying line types
if len(compare_data) > 0:
linecycler = cycle(linestyles)
# Dictionary: line in legend to log name
legend_lines = {Line2D([0], [0], color='k'): main_name}
for data_str in compare_data:
legend_line_object = Line2D(
[0], [0], color='k', linestyle=next(linecycler))
legend_lines[legend_line_object] = os.path.basename(data_str)
legend = fig.legend(
legend_lines.keys(), legend_lines.values(), ncol=3,
loc='outside lower center')
# Create a dict {picker: log name} for legend lines and labels
picker_to_name = {}
for legline, legtxt, name in zip(
legend.get_lines(), legend.get_texts(), legend_lines.values()):
legline.set_picker(10) # 10 pts tolerance
picker_to_name.update({legline: name, legtxt: name})
# Make legend interactive
def legend_clicked(event: Event) -> None:
file_name = picker_to_name[event.artist]
for line in plotted_lines[file_name]:
line.set_visible(not line.get_visible())
fig.canvas.draw()
fig.canvas.mpl_connect('pick_event', legend_clicked)
plt.show(block=True)