"""
This file defines a couple of convenience functions for adding in the watcher
to a target diagram builder. These functions should be useful for 99% of the
users of the DiagramWatcher class.
"""
from typing import List, Tuple, Union
import numpy as np
from pydrake.all import DiagramBuilder, Diagram
from pydrake.systems.framework import Context
from brom_drake.DiagramTarget import DiagramTarget
from brom_drake.DiagramWatcher import DiagramWatcher, DiagramWatcherOptions
from brom_drake import directories
from brom_drake.PortWatcher.port_watcher_options import (
PortFigureArrangement,
PortWatcherPlottingOptions,
PortWatcherRawDataOptions,
FigureNamingConvention,
PortWatcherOptions,
)
PotentialTargetTypes = List[
Union[
str,
Tuple[str, int],
Tuple[int, int],
Tuple[str, str],
Tuple[str, List[int]],
Tuple[int, List[int]],
Tuple[str, List[str]],
],
]
[docs]
def add_watcher(
builder: DiagramBuilder,
targets: PotentialTargetTypes = None,
watcher_dir: str = directories.DEFAULT_WATCHER_DIR,
plot_arrangement: PortFigureArrangement = PortFigureArrangement.OnePlotPerPort,
figure_naming_convention: FigureNamingConvention = FigureNamingConvention.kFlat,
file_format: str = "png",
) -> DiagramWatcher:
"""
**Description**
This function adds a :py:class:`~brom_drake.DiagramWatcher.DiagramWatcher.DiagramWatcher` to a Drake Diagram.
The diagram is not finalized yet and so it is accessible via the `builder` argument.
The watcher will insert LogVectorSink systems to log the output ports of the systems specified in the `targets` argument.
Importantly, this function does NOT build the diagram.
**Parameters**
builder : DiagramBuilder
The diagram builder to which we want to add the watcher.
targets : List[Tuple[Union[str, int]]]
The targets that we want to watch.
watcher_dir : str, optional
The directory in which we will store the data collected by the DiagramWatcher.
By default, :py:const:`~brom_drake.directories.directories.DEFAULT_WATCHER_DIR`.
plot_arrangement : PortFigureArrangement, optional
The arrangement of the plots.
By default, PortFigureArrangement.OnePlotPerPort.
figure_naming_convention : FigureNamingConvention, optional
The naming convention for the figures.
By default, FigureNamingConvention.kFlat.
file_format : str, optional
The file format for the figures.
By default, "png".
**Example Usage**
The simplest way to use this function is to just pass in the diagram builder:
.. code-block:: python
watcher = add_watcher(builder)
You can also specify which systems and ports to watch by passing in a list of targets.
Each target can be specified as either:
- A string representing the name of the system to watch (all ports will be watched), or
- A tuple of (system_name: str, port_index: int) to watch a specific port of a system, or
- A tuple of (system_index: int, port_index: int) to watch a specific port of a system by its index.
Here are some examples of how to specify targets:
.. code-block:: python
watcher = add_watcher(builder, [("plant", 0), ("controller", 0)])
**Returns**
watcher : DiagramWatcher
The watcher that we have added to the diagram builder.
"""
# Input Processing
if not isinstance(plot_arrangement, PortFigureArrangement):
raise ValueError(
f"plot_arrangement must be of type PortFigureArrangement; received {plot_arrangement} of type" +
f" {type(plot_arrangement)}."
)
# Parse targets list if it exists
if targets is not None:
targets = parse_list_of_simplified_targets(builder, targets)
watcher = DiagramWatcher(
builder,
targets=targets,
options=DiagramWatcherOptions(
base_directory=watcher_dir,
plotting_options=PortWatcherPlottingOptions(
plot_arrangement=plot_arrangement,
figure_naming_convention=figure_naming_convention,
file_format=file_format,
),
raw_data_options=PortWatcherRawDataOptions(
save_to_file=True,
),
)
)
return watcher
[docs]
def add_watcher_and_build(
builder: DiagramBuilder,
targets: PotentialTargetTypes = None,
watcher_dir: str = directories.DEFAULT_WATCHER_DIR,
plot_arrangement: PortFigureArrangement = PortFigureArrangement.OnePlotPerPort,
figure_naming_convention: FigureNamingConvention = FigureNamingConvention.kFlat,
file_format: str = "png",
) -> Tuple[DiagramWatcher, Diagram, Context]:
"""
**Description**
This function adds a :py:class:`~brom_drake.DiagramWatcher.DiagramWatcher.DiagramWatcher` to a Drake Diagram.
The diagram is not finalized yet and so it is accessible via the `builder` argument.
The watcher will insert LogVectorSink systems to log the output ports of the systems specified in the `targets` argument.
Once all of the LogVectorSink systems have been added, the diagram is built and a reference to the built diagram
and its default context is added to the DiagramWatcher object.
Example Usage:
.. code-block:: python
watcher = add_watcher_and_build(builder)
In addition, one can specify which systems to watch:
.. code-block:: python
watcher = add_watcher_and_build(builder, targets=[("plant",)])
The DiagramWatcher will then watch all supported ports of the "plant" system
that it can find.
Furthermore, to specify which systems and specific ports to watch:
.. code-block:: python
watcher = add_watcher_and_build(builder, targets=[("plant", 0), ("controller", 0)])
This will watch output port 0 of both the "plant" and "controller" systems, if possible.
**Parameters**
builder: DiagramBuilder
The diagram builder to which we want to add the watcher.
targets: List[Tuple[Union[str, int]]]
The targets that we want to watch.
watcher_dir: str
The directory in which we will store the data collected by the DiagramWatcher.
plot_arrangement: PortFigureArrangement
The arrangement of the plots.
(Can be PortFigureArrangement.OnePlotPerPort OR PortFigureArrangement.OnePlotPerDim)
figure_naming_convention: FigureNamingConvention
The naming convention for the figures.
(Can be FigureNamingConvention.kFlat OR FigureNamingConvention.kHierarchical)
file_format: str
The file format for the figures.
**Returns**
watcher: DiagramWatcher
The watcher that we have added to the diagram builder.
diagram: Diagram
The built diagram.
diagram_context: Context
The default context for the built diagram.
"""
watcher = add_watcher(
builder,
targets=targets,
watcher_dir=watcher_dir,
plot_arrangement=plot_arrangement,
figure_naming_convention=figure_naming_convention,
file_format=file_format,
)
# Build the diagram and add a reference to the watcher
diagram = builder.Build()
diagram_context = diagram.CreateDefaultContext()
watcher.diagram = diagram
watcher.diagram_context = diagram_context
return watcher, diagram, diagram_context
[docs]
def parse_list_of_simplified_targets(
builder: DiagramBuilder,
targets: PotentialTargetTypes,
) -> List[DiagramTarget]:
"""
**Description**
This function takes a list of simplified targets and converts them to the full form.
**Example Usage**
.. code-block:: python
targets = [("plant", 0), ("controller", 0)]
parsed_targets = parse_list_of_simplified_targets(targets)
**Parameters**
builder : DiagramBuilder
The diagram builder to which we want to add the watcher.
targets : List[Tuple[Union[str, int]]
The list of simplified targets.
**Returns**
parsed_targets: List[DiagramTarget]
A list of all the targets that we want to watch.
"""
# Setup
parsed_targets = []
system_list = builder.GetSystems()
# If input is not a list, then raise an error
if not isinstance(targets, list):
raise ValueError("the input \"targets\" must be a list.")
if len(targets) == 0:
raise ValueError(
"the input \"targets\" must be a non-empty list containing tuples or strings.\n" +
"If you want to watch the entire diagram, then pass in None as the value of targets."
)
# Parse each element in targets
for target in targets:
target_name, ports_list = None, None
if isinstance(target, str):
target_name = target
elif isinstance(target, tuple):
# Parse the first element in the tuple
if isinstance(target[0], str):
target_name = target[0]
elif isinstance(target[0], int):
# Check if the index is valid
if target[0] < 0 or target[0] >= len(system_list):
raise ValueError(
f"the index {target[0]} is not a valid index for the list of systems (has length {len(system_list)})."
)
target_name = system_list[target[0]].get_name()
else:
raise ValueError(
"the first element of the tuple must be a string or an integer; received {} (type {}).".format(
target[0], type(target[0])
)
)
# Parse the second element in the tuple
if isinstance(target[1], int):
ports_list = [target[1]]
elif isinstance(target[1], str):
# Find the index of port with this name
port_name = target[1]
# Find the system
system = None
for ii, system_ii in enumerate(system_list):
if system_ii.get_name() == target_name:
system = system_ii
break
# for jj in range(system.num_output_ports()):
# print(system.get_output_port(jj).get_name())
if system.HasOutputPort(port_name):
ports_list = [int(system.GetOutputPort(port_name).get_index())]
else:
raise ValueError(
f"the system {target_name} does not have an output port named {port_name}."
)
elif isinstance(target[1], list):
ports_list = []
for ii, elt_ii in enumerate(target[1]):
if isinstance(elt_ii, int):
ports_list.append(elt_ii)
elif isinstance(elt_ii, str):
port_name = elt_ii
# Find the system
system = None
for ii, system_ii in enumerate(system_list):
if system_ii.get_name() == target_name:
system = system_ii
break
if system.HasOutputPort(port_name):
ports_list += [int(system.GetOutputPort(port_name).get_index())]
else:
raise ValueError(
f"the system {target_name} does not have an output port named {port_name}."
)
else:
raise ValueError(
f"the target_list[{ii}] of the tuple is not an integer or a string! " +
f"Received {elt_ii} of type {type(elt_ii)}."
)
else:
raise ValueError(
"the second element of the tuple must be either a: \n" +
"- an integer\n" +
"- a list of integers\n" +
"- a string\n" +
"- a list of strings\n" +
"- None.\n" +
f"Received type {type(target[1])}"
)
# Create the diagram target object
dt = DiagramTarget(target_name, ports_list)
parsed_targets.append(dt)
return parsed_targets