"""
DiagramWatcher.py
Description:
Creates a DiagramWatcher class that can be used to monitor the state of a Diagram
and automatically log signals of interest.
"""
import os
from typing import Dict, List, Union
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import logging
from pydrake.multibody.plant import MultibodyPlant
from pydrake.systems.framework import Diagram, DiagramBuilder, LeafSystem, PortDataType
from pydrake.systems.primitives import (
VectorLogSink, ConstantVectorSource, AffineSystem, LogVectorOutput,
)
# Internal Imports
from brom_drake.DiagramTarget import DiagramTarget
from brom_drake.PortWatcher.port_watcher import PortWatcher
from brom_drake.PortWatcher.port_watcher_options import PortWatcherOptions, PortWatcherPlottingOptions, PortWatcherRawDataOptions
from brom_drake.DiagramWatcher.diagram_watcher_options import DiagramWatcherOptions
from brom_drake.DiagramWatcher.constants import INELIGIBLE_SYSTEM_TYPES
from brom_drake.DiagramWatcher.errors import UnrecognizedTargetError
[docs]
class DiagramWatcher:
"""
*Description*
An object that will iterate through all elements of a partially built
Drake Diagram (via the DiagramBuilder) and add PortWatchers to the specified targets.
*Parameters*
subject: DiagramBuilder
We will search through the subject (a diagram builder)
to find all the systems that we want to monitor.
targets: List[DiagramTarget], optional
The targets that we want to monitor, by default this tries to monitor
all systems in the diagram. (i.e., when this value is None, we will monitor all systems).
options: DiagramWatcherOptions, optional
The options that configure the DiagramWatcher, by default DiagramWatcherOptions()
"""
def __init__(
self,
subject: DiagramBuilder,
targets: List[DiagramTarget] = None,
options: DiagramWatcherOptions = DiagramWatcherOptions(),
):
"""
*Description*
Initializes the DiagramWatcher class.
*Parameters*
subject : DiagramBuilder
We will search through the subject (a diagram builder)
to find all the systems that we want to monitor.
targets : List[DiagramTarget], optional
The targets that we want to monitor, by default None
port_watcher_options : PortWatcherOptions, optional
The options for the PortWatcher, by default PortWatcherOptions()
"""
# Setup
# Needs to be populated by the user of this class AFTER the diagram has been built
self.diagram = None
self.diagram_context = None
# Check subject
if not isinstance(subject, DiagramBuilder):
raise ValueError("subject must be a DiagramBuilder!")
# Save the inputs
self.subject = subject
self.options = options
# Create the .brom directory, to store:
# - activity_summary.log
# - all plots
if os.path.exists(self.options.base_directory):
os.system(f"rm -r {self.options.base_directory}")
# Create directory to plot in
os.makedirs(self.options.base_directory, exist_ok=True)
self.logger = self.create_logger() # Create an "activity summary" log
# which details what the
# DiagramWatcher is doing.
# Collect All the Connections and Systems
self.eligible_systems = self.find_eligible_systems(subject)
if targets is None:
targets = [DiagramTarget(system.get_name()) for system in self.eligible_systems]
else:
self.check_targets(targets, self.eligible_systems)
# Log the list of eligible systems
self.logger.info(f"Found {len(self.eligible_systems)} systems in diagram are eligible for targeting:")
for idx, system in enumerate(self.eligible_systems):
self.logger.info(f"{idx}: {system.get_name()}")
# For Each Target with None ports, we will try to
# "smartly" create the targets that we want to monitor
inferred_targets = self.get_smart_targets(subject, targets)
self.inferred_targets = inferred_targets
# For each target's port, we will add a logger
self.port_watchers: Dict[str, Dict[str, PortWatcher]] = {target.name: {} for target in inferred_targets}
self.logger.info("Adding loggers to the diagram... (via PortWatcher objects)")
for target in inferred_targets:
system = subject.GetSubsystemByName(target.name)
for port_index in target.ports:
target_port = system.get_output_port(port_index)
options_for_target_port = options.to_port_watcher_options()
try:
# Configure PortWatcher
self.port_watchers[target.name][target_port.get_name()] = PortWatcher(
target_port, subject, self.logger,
logger_name=f"{target.name}_logger_{port_index}",
base_watcher_dir=options.base_directory,
options=options_for_target_port,
)
except Exception as e:
if self.options.hide_messages.during_port_watcher_connection:
continue
# If we have an error, we will log it
self.logger.debug(
f"Failed to add a watcher to port {target_port.get_name()} of system {target.name}: {e}",
exc_info=False,
)
if target_port.get_name() in self.port_watchers[target.name]:
# Announce that we successfully added logger
self.logger.info(f"Added logger to port {target_port.get_name()} of system {target.name}")
else:
# If we did not add the logger, then we will send a small note.
self.logger.info(
f"Unable to add logger to port {target_port.get_name()} of system {target.name}",
)
[docs]
def create_new_port_watcher_options(
self,
options: PortWatcherOptions,
plot_dir: str = None,
raw_data_dir: str = None,
) -> PortWatcherOptions:
"""
*Description*
Creates a new set of PortWatcherOptions with the given options.
*Parameters*
options : PortWatcherOptions
The options to use as a base.
plot_dir : str, optional
The directory to save the plots in, by default None
raw_data_dir : str, optional
The directory to save the raw data in, by default None
"""
# Setup
new_plot_dir = options.plotting.base_directory
if plot_dir is not None:
new_plot_dir = plot_dir
new_raw_data_dir = options.raw_data.base_directory
if raw_data_dir is not None:
new_raw_data_dir = raw_data_dir
# Return the new options
return PortWatcherOptions(
plotting=PortWatcherPlottingOptions(
plot_arrangement=options.plotting.plot_arrangement,
plot_dpi=options.plotting.plot_dpi,
save_to_file=options.plotting.save_to_file,
file_format=options.plotting.file_format,
figure_naming_convention=options.plotting.figure_naming_convention,
),
raw_data=PortWatcherRawDataOptions(
save_to_file=options.raw_data.save_to_file,
base_directory=new_raw_data_dir,
file_format=options.raw_data.file_format,
),
)
def __del__(self):
"""
*Description*
Destructor for the Diagram Watcher.
Will plot the data from all of our loggers if we have access to the diagram context.
"""
# Setup
is_ready_to_plot = self.diagram is not None
if not is_ready_to_plot:
return # Return early if we don't have access to the diagram context
# Upon deletion, we will PLOT the data from all of our loggers
# if we have access to the diagram context
self.save_figures()
self.save_raw_data()
# Close all handlers in the logger and the remove them
for handler in self.logger.handlers:
handler.close()
self.logger.removeHandler(handler)
[docs]
def create_logger(self) -> logging.Logger:
"""
*Description*
Configures the "activity summary" a log of brom's activity.
*Returns*
logger: logging.Logger
The configured logger for LOG MESSAGES.
In other words, this is not a logger of signals from the diagram.
"""
# Setup
options = self.options
# Create the basic logger
logger = logging.getLogger("brom_drake.DiagramWatcher")
for handler in logger.handlers:
# Remove all existing handlers
logger.removeHandler(handler)
# Create a file handler, if none exists
# Create a logging directory if it does not exist
watcher_outputs_base_directory = Path(options.base_directory)
watcher_outputs_base_directory.mkdir(parents=True, exist_ok=True)
# Create a file handler
file_handler = logging.FileHandler(
filename=options.base_directory + "/activity_summary.log",
mode='w', # Append mode
)
file_handler.setLevel(logging.DEBUG)
# Create a formatter and set it for the file handler
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler.setFormatter(formatter)
# Add the file handler to the logger
logger.addHandler(file_handler)
# Create a terminal handler
terminal_handler = logging.StreamHandler()
terminal_handler.setLevel(logging.WARNING) # Set to WARNING to avoid cluttering terminal with INFO messages
terminal_handler.setFormatter(formatter)
logger.addHandler(terminal_handler)
# Avoid duplicate logs
logger.propagate = False
# Make sure the logger responds to all messages of level DEBUG and above
logger.setLevel(logging.DEBUG) # Set to DEBUG to capture all messages
return logger
[docs]
def check_targets(
self,
targets: List[DiagramTarget],
eligible_systems: List[Union[MultibodyPlant, AffineSystem, LeafSystem]],
) -> List[Union[MultibodyPlant, AffineSystem, LeafSystem]]:
"""
*Description*
Finds the systems specified by the targets list that
we want to watch/monitor.
We will try to ignore all systems that are:
- Scene Graphs
- Loggers
and raise an error if the target is not found in the eligible systems.
*Parameters*
targets : List[DiagramTarget]
The targets that we want to monitor.
eligible_systems : List[Union[MultibodyPlant, AffineSystem, LeafSystem]]
The systems that are eligible for monitoring.
"""
# Find all the systems that are eligible for logging
eligible_system_dict = {
system.get_name(): system for system in eligible_systems
}
# Search for each target in eligible_systems
targeted = []
for target in targets:
# Check if the target name is in the eligible systems
if target.name not in eligible_system_dict.keys():
raise UnrecognizedTargetError(target, eligible_system_dict.keys())
# If it is, then also check that the port index is correct
if target.ports is None:
continue # No need to check things if ports is None
num_ports_in_target = eligible_system_dict[target.name].num_output_ports()
for port_index in target.ports:
if port_index < 0 or port_index >= num_ports_in_target:
raise ValueError(f"Port index {port_index} is out of bounds for system {target.name} (only {num_ports_in_target} ports exist)")
# All checks passed!
pass
[docs]
def find_eligible_systems(
self,
builder: DiagramBuilder
) -> List[Union[MultibodyPlant, AffineSystem, LeafSystem]]:
"""
*Description*
Finds all the systems that are eligible for logging.
We want to ignore all systems that are:
- Scene Graphs
- Loggers
"""
# Find all the systems that are eligible for logging
eligible_systems = []
self.logger.info("Finding all eligible systems for logging...")
for system in builder.GetSystems():
if type(system) in INELIGIBLE_SYSTEM_TYPES:
self.logger.warning(
f"System {system.get_name()} (of type {type(system)}) is not eligible for logging! Skipping..."
)
continue
# Otherwise add to list
eligible_systems.append(system)
self.logger.info(
f"System {system.get_name()} (of type {type(system)}) is eligible for logging with the watcher."
)
return eligible_systems
[docs]
def get_smart_targets(
self,
subject: DiagramBuilder,
targets: List[DiagramTarget],
) -> List[DiagramTarget]:
"""
*Description*
For each target with None ports, we will try to
"smartly" create the targets that we want to monitor.
*Parameters*
subject : DiagramBuilder
The diagram builder that contains the systems.
targets : List[DiagramTarget]
The targets that we want to monitor.
*Returns*
List[DiagramTarget]
The list of targets with inferred ports.
"""
# Setup
smart_targets = []
for target in targets:
if target.ports is not None:
smart_targets.append(target)
continue
# If ports is None, then we will try to "smartly" create the ports
system = subject.GetSubsystemByName(target.name)
# TODO: Add support for investigating output ports
num_ports = system.num_output_ports()
if num_ports == 0:
self.logger.warning(f"System {target.name} has no output ports! Skipping...")
continue
output_ports_to_watch = [port_index for port_index in range(num_ports)]
# Add a target with the connected ports to the smart_targets list
smart_targets.append(
DiagramTarget(target.name, output_ports_to_watch),
)
# Log the list of inferred targets
self.logger.info(f"Found {len(smart_targets)} inferred targets:")
for idx, target in enumerate(smart_targets):
self.logger.info(f"{idx}: {target.name} - {target.ports}")
return smart_targets
[docs]
def save_raw_data(self):
"""
*Description*
Saves all the raw data from the port watchers.
"""
# Announce Saving Raw Data has started
self.logger.info("Saving raw data...")
# Algorithm
for system_name in self.port_watchers:
system_ii = self.diagram.GetSubsystemByName(system_name)
ports_on_ii = self.port_watchers[system_name]
self.logger.info(f"Saving raw data for system {system_name}...")
for port_name in ports_on_ii:
temp_port_watcher = ports_on_ii[port_name]
temp_raw_data_options = temp_port_watcher.options.raw_data
# Save raw data only if the PortWatcher flag is set
if temp_raw_data_options.save_to_file:
try:
temp_port_watcher.save_raw_data(self.diagram_context)
self.logger.info(f"Saved raw data for port {port_name} on system {system_name}")
except Exception as e:
self.logger.error(f"Failed to save raw data for port {port_name} on system {system_name}: {e}")
else:
self.logger.info(f"Skipped saving raw data for port {port_name} on system {system_name} (flag not set)")