# Source code for brom_drake.utils.watcher.add_watcher

"""
This file defines a couple of convenience functions for adding in the watcher
to a target diagram builder. These functions should be useful for 99% of the
users of the DiagramWatcher class.
"""
from typing import List, Tuple, Union

import numpy as np
from pydrake.all import DiagramBuilder, Diagram
from pydrake.systems.framework import Context

from brom_drake.DiagramTarget import DiagramTarget
from brom_drake.DiagramWatcher import DiagramWatcher, DiagramWatcherOptions
from brom_drake import directories
from brom_drake.PortWatcher.port_watcher_options import (
    PortFigureArrangement,
    PortWatcherPlottingOptions,
    PortWatcherRawDataOptions,
    FigureNamingConvention,
    PortWatcherOptions,
)

# Type of the simplified `targets` argument accepted by the functions in this file.
# Each entry of the list is either a bare system name (no port restriction) or a
# two-element tuple of (system, ports), where:
#   - the system is given by name (str) or by its index (int) in the list returned
#     by builder.GetSystems(), and
#   - the ports are given as a single output port index (int), a single output
#     port name (str), or a list of indices / names.
PotentialTargetTypes = List[
    Union[
        str,
        Tuple[str, int],
        Tuple[int, int],
        Tuple[str, str],
        # System selected by index, port selected by name; the parser in this file
        # accepts this combination, so the alias has to allow it too.
        Tuple[int, str],
        Tuple[str, List[int]],
        Tuple[int, List[int]],
        Tuple[str, List[str]],
        # System selected by index, ports selected by a list of names.
        Tuple[int, List[str]],
    ],
]


def add_watcher(
    builder: DiagramBuilder,
    targets: PotentialTargetTypes = None,
    watcher_dir: str = directories.DEFAULT_WATCHER_DIR,
    plot_arrangement: PortFigureArrangement = PortFigureArrangement.OnePlotPerPort,
    figure_naming_convention: FigureNamingConvention = FigureNamingConvention.kFlat,
    file_format: str = "png",
) -> DiagramWatcher:
    """
    **Description**

    Attach a :py:class:`~brom_drake.DiagramWatcher.DiagramWatcher.DiagramWatcher`
    to a Drake diagram that is still under construction (i.e. reachable through
    `builder`). The watcher will insert LogVectorSink systems to log the output
    ports of the systems named in `targets`.

    This function does NOT build the diagram; the caller remains responsible for
    calling `builder.Build()`.

    **Parameters**

    builder : DiagramBuilder
        The diagram builder to which the watcher is added.

    targets : PotentialTargetTypes, optional
        The systems/ports to watch. When None (the default), no target list is
        handed to the DiagramWatcher. Otherwise the value must be a non-empty
        list that is converted with `parse_list_of_simplified_targets`. Each
        entry may be:

        - a string naming the system to watch, or
        - a tuple of (system_name: str, port_index: int), or
        - a tuple of (system_index: int, port_index: int).

    watcher_dir : str, optional
        The directory in which the DiagramWatcher stores the data it collects.
        By default, :py:const:`~brom_drake.directories.directories.DEFAULT_WATCHER_DIR`.

    plot_arrangement : PortFigureArrangement, optional
        The arrangement of the plots.
        By default, PortFigureArrangement.OnePlotPerPort.

    figure_naming_convention : FigureNamingConvention, optional
        The naming convention for the figures.
        By default, FigureNamingConvention.kFlat.

    file_format : str, optional
        The file format for the figures. By default, "png".

    **Example Usage**

    Watching with the default settings only requires the builder:

    .. code-block:: python

        watcher = add_watcher(builder)

    Specific systems and ports can be selected with a list of targets:

    .. code-block:: python

        watcher = add_watcher(builder, [("plant", 0), ("controller", 0)])

    **Returns**

    watcher : DiagramWatcher
        The watcher that was added to the diagram builder.

    **Raises**

    ValueError
        If `plot_arrangement` is not a PortFigureArrangement.
    """
    # Guard clause: refuse anything that is not a PortFigureArrangement member.
    if not isinstance(plot_arrangement, PortFigureArrangement):
        raise ValueError(
            f"plot_arrangement must be of type PortFigureArrangement; received {plot_arrangement} of type"
            f" {type(plot_arrangement)}."
        )

    # Expand the shorthand targets (if any) before any option object is created.
    parsed_targets = (
        None
        if targets is None
        else parse_list_of_simplified_targets(builder, targets)
    )

    # Assemble the watcher configuration piece by piece.
    plotting_options = PortWatcherPlottingOptions(
        plot_arrangement=plot_arrangement,
        figure_naming_convention=figure_naming_convention,
        file_format=file_format,
    )
    raw_data_options = PortWatcherRawDataOptions(save_to_file=True)
    watcher_options = DiagramWatcherOptions(
        base_directory=watcher_dir,
        plotting_options=plotting_options,
        raw_data_options=raw_data_options,
    )

    return DiagramWatcher(builder, targets=parsed_targets, options=watcher_options)
def add_watcher_and_build(
    builder: DiagramBuilder,
    targets: PotentialTargetTypes = None,
    watcher_dir: str = directories.DEFAULT_WATCHER_DIR,
    plot_arrangement: PortFigureArrangement = PortFigureArrangement.OnePlotPerPort,
    figure_naming_convention: FigureNamingConvention = FigureNamingConvention.kFlat,
    file_format: str = "png",
) -> Tuple[DiagramWatcher, Diagram, Context]:
    """
    **Description**

    Add a :py:class:`~brom_drake.DiagramWatcher.DiagramWatcher.DiagramWatcher`
    to the diagram under construction in `builder` (via `add_watcher`), then
    build the diagram and create its default context. References to the built
    diagram and to that context are stored on the watcher (`watcher.diagram`
    and `watcher.diagram_context`) before everything is returned.

    **Example Usage**

    The function returns three values, so unpack them:

    .. code-block:: python

        watcher, diagram, diagram_context = add_watcher_and_build(builder)

    To restrict the watcher to one system, pass its name:

    .. code-block:: python

        watcher, diagram, diagram_context = add_watcher_and_build(
            builder, targets=["plant"],
        )

    To restrict the watcher to specific ports of specific systems:

    .. code-block:: python

        watcher, diagram, diagram_context = add_watcher_and_build(
            builder, targets=[("plant", 0), ("controller", 0)],
        )

    **Parameters**

    builder : DiagramBuilder
        The diagram builder to which the watcher is added. It is built by this
        function.

    targets : PotentialTargetTypes, optional
        The targets to watch; forwarded unchanged to `add_watcher`.

    watcher_dir : str, optional
        The directory in which the DiagramWatcher stores the data it collects.

    plot_arrangement : PortFigureArrangement, optional
        The arrangement of the plots. (Can be PortFigureArrangement.OnePlotPerPort
        OR PortFigureArrangement.OnePlotPerDim)

    figure_naming_convention : FigureNamingConvention, optional
        The naming convention for the figures. (Can be FigureNamingConvention.kFlat
        OR FigureNamingConvention.kHierarchical)

    file_format : str, optional
        The file format for the figures.

    **Returns**

    watcher : DiagramWatcher
        The watcher that was added to the diagram builder.

    diagram : Diagram
        The built diagram.

    diagram_context : Context
        The default context for the built diagram.
    """
    # Everything except the builder is forwarded verbatim to add_watcher.
    watcher_settings = dict(
        targets=targets,
        watcher_dir=watcher_dir,
        plot_arrangement=plot_arrangement,
        figure_naming_convention=figure_naming_convention,
        file_format=file_format,
    )
    diagram_watcher = add_watcher(builder, **watcher_settings)

    # The watcher must be added before Build(); afterwards we hand the finished
    # diagram and its default context back to the watcher.
    built_diagram = builder.Build()
    default_context = built_diagram.CreateDefaultContext()
    diagram_watcher.diagram = built_diagram
    diagram_watcher.diagram_context = default_context

    return diagram_watcher, built_diagram, default_context
def parse_list_of_simplified_targets(
    builder: DiagramBuilder,
    targets: PotentialTargetTypes,
) -> List[DiagramTarget]:
    """
    **Description**

    Convert a list of simplified (shorthand) targets into DiagramTarget objects.

    Each entry of `targets` may be:

    - a string: the name of a system (no port restriction is recorded), or
    - a tuple whose first element selects the system, either by name (str) or
      by index (int) into `builder.GetSystems()`, and whose optional second
      element selects the output ports as one of:

      - an integer port index,
      - a string port name,
      - a list mixing port indices and port names, or
      - None (equivalent to omitting it: no port restriction is recorded).

    Only the first two elements of a tuple are inspected.

    **Example Usage**

    .. code-block:: python

        targets = [("plant", 0), ("controller", 0)]
        parsed_targets = parse_list_of_simplified_targets(builder, targets)

    **Parameters**

    builder : DiagramBuilder
        The diagram builder whose systems the targets refer to.

    targets : PotentialTargetTypes
        The non-empty list of simplified targets.

    **Returns**

    parsed_targets : List[DiagramTarget]
        One DiagramTarget per entry of `targets`, in the same order.

    **Raises**

    ValueError
        If `targets` is not a non-empty list, if an entry is neither a string
        nor a non-empty tuple, if a system index is out of range, if a port is
        requested by name on a system that the builder does not contain, if a
        named port does not exist on its system, or if a port specification has
        an unsupported type.
    """
    # Setup
    system_list = builder.GetSystems()

    # If input is not a list, then raise an error
    if not isinstance(targets, list):
        raise ValueError("the input \"targets\" must be a list.")

    if len(targets) == 0:
        raise ValueError(
            "the input \"targets\" must be a non-empty list containing tuples or strings.\n"
            + "If you want to watch the entire diagram, then pass in None as the value of targets."
        )

    # Parse each element in targets
    parsed_targets = []
    for target in targets:
        if isinstance(target, str):
            target_name, ports_list = target, None
        elif isinstance(target, tuple):
            target_name, ports_list = _parse_tuple_target(target, system_list)
        else:
            # Previously such entries silently became DiagramTarget(None, None).
            raise ValueError(
                "each element of \"targets\" must be a string or a tuple; "
                + f"received {target} (type {type(target)})."
            )

        # Create the diagram target object
        parsed_targets.append(DiagramTarget(target_name, ports_list))

    return parsed_targets


def _parse_tuple_target(target: tuple, system_list: list) -> tuple:
    """Split a tuple target into (system name, list of port indices or None)."""
    if len(target) == 0:
        raise ValueError(
            "a tuple target must contain at least the name or the index of a system; "
            + "received an empty tuple."
        )

    target_name = _resolve_target_name(target[0], system_list)

    # A one-element tuple carries no port specification; treat it like None.
    port_spec = target[1] if len(target) > 1 else None
    return target_name, _parse_port_spec(port_spec, target_name, system_list)


def _resolve_target_name(system_ref, system_list: list) -> str:
    """Return the system name designated by a name (str) or a system index (int)."""
    if isinstance(system_ref, str):
        return system_ref

    if isinstance(system_ref, int):
        # Check if the index is valid
        if system_ref < 0 or system_ref >= len(system_list):
            raise ValueError(
                f"the index {system_ref} is not a valid index for the list of systems (has length {len(system_list)})."
            )
        return system_list[system_ref].get_name()

    raise ValueError(
        "the first element of the tuple must be a string or an integer; received {} (type {}).".format(
            system_ref, type(system_ref)
        )
    )


def _parse_port_spec(
    port_spec, target_name: str, system_list: list,
) -> Union[List[int], None]:
    """Translate the second tuple element into a list of port indices (or None)."""
    if port_spec is None:
        return None

    if isinstance(port_spec, int):
        return [port_spec]

    if isinstance(port_spec, str):
        return [_find_output_port_index(target_name, port_spec, system_list)]

    if isinstance(port_spec, list):
        ports_list = []
        for position, element in enumerate(port_spec):
            if isinstance(element, int):
                ports_list.append(element)
            elif isinstance(element, str):
                ports_list.append(
                    _find_output_port_index(target_name, element, system_list)
                )
            else:
                raise ValueError(
                    f"the target_list[{position}] of the tuple is not an integer or a string! "
                    + f"Received {element} of type {type(element)}."
                )
        return ports_list

    raise ValueError(
        "the second element of the tuple must be either a: \n"
        + "- an integer\n"
        + "- a list of integers\n"
        + "- a string\n"
        + "- a list of strings\n"
        + "- None.\n"
        + f"Received type {type(port_spec)}"
    )


def _find_output_port_index(
    target_name: str, port_name: str, system_list: list,
) -> int:
    """Look up the index of the output port `port_name` on the system `target_name`."""
    # The first system with a matching name wins.
    system = next(
        (candidate for candidate in system_list if candidate.get_name() == target_name),
        None,
    )
    if system is None:
        # Without this check the lookup below would fail with an AttributeError.
        raise ValueError(
            f"the diagram builder does not contain a system named {target_name}; "
            + f"cannot look up the output port named {port_name}."
        )

    if not system.HasOutputPort(port_name):
        raise ValueError(
            f"the system {target_name} does not have an output port named {port_name}."
        )

    return int(system.GetOutputPort(port_name).get_index())