Source code for brom_drake.DiagramWatcher.DiagramWatcher

"""
DiagramWatcher.py
Description:

    Creates a DiagramWatcher class that can be used to monitor the state of a Diagram
    and automatically log signals of interest.
"""
import os
from typing import Dict, List, Union
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import logging

from pydrake.multibody.plant import MultibodyPlant
from pydrake.systems.framework import Diagram, DiagramBuilder, LeafSystem, PortDataType
from pydrake.systems.primitives import (
    VectorLogSink, ConstantVectorSource, AffineSystem, LogVectorOutput,
)

# Internal Imports
from brom_drake.DiagramTarget import DiagramTarget
from brom_drake.PortWatcher.port_watcher import PortWatcher
from brom_drake.PortWatcher.port_watcher_options import PortWatcherOptions, PortWatcherPlottingOptions, PortWatcherRawDataOptions
from brom_drake.DiagramWatcher.diagram_watcher_options import DiagramWatcherOptions
from brom_drake.DiagramWatcher.constants import INELIGIBLE_SYSTEM_TYPES
from brom_drake.DiagramWatcher.errors import UnrecognizedTargetError


[docs] class DiagramWatcher: """ *Description* An object that will iterate through all elements of a partially built Drake Diagram (via the DiagramBuilder) and add PortWatchers to the specified targets. *Parameters* subject: DiagramBuilder We will search through the subject (a diagram builder) to find all the systems that we want to monitor. targets: List[DiagramTarget], optional The targets that we want to monitor, by default this tries to monitor all systems in the diagram. (i.e., when this value is None, we will monitor all systems). options: DiagramWatcherOptions, optional The options that configure the DiagramWatcher, by default DiagramWatcherOptions() """ def __init__( self, subject: DiagramBuilder, targets: List[DiagramTarget] = None, options: DiagramWatcherOptions = DiagramWatcherOptions(), ): """ *Description* Initializes the DiagramWatcher class. *Parameters* subject : DiagramBuilder We will search through the subject (a diagram builder) to find all the systems that we want to monitor. targets : List[DiagramTarget], optional The targets that we want to monitor, by default None port_watcher_options : PortWatcherOptions, optional The options for the PortWatcher, by default PortWatcherOptions() """ # Setup # Needs to be populated by the user of this class AFTER the diagram has been built self.diagram = None self.diagram_context = None # Check subject if not isinstance(subject, DiagramBuilder): raise ValueError("subject must be a DiagramBuilder!") # Save the inputs self.subject = subject self.options = options # Create the .brom directory, to store: # - activity_summary.log # - all plots if os.path.exists(self.options.base_directory): os.system(f"rm -r {self.options.base_directory}") # Create directory to plot in os.makedirs(self.options.base_directory, exist_ok=True) self.logger = self.create_logger() # Create an "activity summary" log # which details what the # DiagramWatcher is doing. # Collect All the Connections and Systems self.eligible_systems = self.find_eligible_systems(subject) if targets is None: targets = [DiagramTarget(system.get_name()) for system in self.eligible_systems] else: self.check_targets(targets, self.eligible_systems) # Log the list of eligible systems self.logger.info(f"Found {len(self.eligible_systems)} systems in diagram are eligible for targeting:") for idx, system in enumerate(self.eligible_systems): self.logger.info(f"{idx}: {system.get_name()}") # For Each Target with None ports, we will try to # "smartly" create the targets that we want to monitor inferred_targets = self.get_smart_targets(subject, targets) self.inferred_targets = inferred_targets # For each target's port, we will add a logger self.port_watchers: Dict[str, Dict[str, PortWatcher]] = {target.name: {} for target in inferred_targets} self.logger.info("Adding loggers to the diagram... (via PortWatcher objects)") for target in inferred_targets: system = subject.GetSubsystemByName(target.name) for port_index in target.ports: target_port = system.get_output_port(port_index) options_for_target_port = options.to_port_watcher_options() try: # Configure PortWatcher self.port_watchers[target.name][target_port.get_name()] = PortWatcher( target_port, subject, self.logger, logger_name=f"{target.name}_logger_{port_index}", base_watcher_dir=options.base_directory, options=options_for_target_port, ) except Exception as e: if self.options.hide_messages.during_port_watcher_connection: continue # If we have an error, we will log it self.logger.debug( f"Failed to add a watcher to port {target_port.get_name()} of system {target.name}: {e}", exc_info=False, ) if target_port.get_name() in self.port_watchers[target.name]: # Announce that we successfully added logger self.logger.info(f"Added logger to port {target_port.get_name()} of system {target.name}") else: # If we did not add the logger, then we will send a small note. self.logger.info( f"Unable to add logger to port {target_port.get_name()} of system {target.name}", )
[docs] def create_new_port_watcher_options( self, options: PortWatcherOptions, plot_dir: str = None, raw_data_dir: str = None, ) -> PortWatcherOptions: """ *Description* Creates a new set of PortWatcherOptions with the given options. *Parameters* options : PortWatcherOptions The options to use as a base. plot_dir : str, optional The directory to save the plots in, by default None raw_data_dir : str, optional The directory to save the raw data in, by default None """ # Setup new_plot_dir = options.plotting.base_directory if plot_dir is not None: new_plot_dir = plot_dir new_raw_data_dir = options.raw_data.base_directory if raw_data_dir is not None: new_raw_data_dir = raw_data_dir # Return the new options return PortWatcherOptions( plotting=PortWatcherPlottingOptions( plot_arrangement=options.plotting.plot_arrangement, plot_dpi=options.plotting.plot_dpi, save_to_file=options.plotting.save_to_file, file_format=options.plotting.file_format, figure_naming_convention=options.plotting.figure_naming_convention, ), raw_data=PortWatcherRawDataOptions( save_to_file=options.raw_data.save_to_file, base_directory=new_raw_data_dir, file_format=options.raw_data.file_format, ), )
def __del__(self): """ *Description* Destructor for the Diagram Watcher. Will plot the data from all of our loggers if we have access to the diagram context. """ # Setup is_ready_to_plot = self.diagram is not None if not is_ready_to_plot: return # Return early if we don't have access to the diagram context # Upon deletion, we will PLOT the data from all of our loggers # if we have access to the diagram context self.save_figures() self.save_raw_data() # Close all handlers in the logger and the remove them for handler in self.logger.handlers: handler.close() self.logger.removeHandler(handler)
[docs] def create_logger(self) -> logging.Logger: """ *Description* Configures the "activity summary" a log of brom's activity. *Returns* logger: logging.Logger The configured logger for LOG MESSAGES. In other words, this is not a logger of signals from the diagram. """ # Setup options = self.options # Create the basic logger logger = logging.getLogger("brom_drake.DiagramWatcher") for handler in logger.handlers: # Remove all existing handlers logger.removeHandler(handler) # Create a file handler, if none exists # Create a logging directory if it does not exist watcher_outputs_base_directory = Path(options.base_directory) watcher_outputs_base_directory.mkdir(parents=True, exist_ok=True) # Create a file handler file_handler = logging.FileHandler( filename=options.base_directory + "/activity_summary.log", mode='w', # Append mode ) file_handler.setLevel(logging.DEBUG) # Create a formatter and set it for the file handler formatter = logging.Formatter( "%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) file_handler.setFormatter(formatter) # Add the file handler to the logger logger.addHandler(file_handler) # Create a terminal handler terminal_handler = logging.StreamHandler() terminal_handler.setLevel(logging.WARNING) # Set to WARNING to avoid cluttering terminal with INFO messages terminal_handler.setFormatter(formatter) logger.addHandler(terminal_handler) # Avoid duplicate logs logger.propagate = False # Make sure the logger responds to all messages of level DEBUG and above logger.setLevel(logging.DEBUG) # Set to DEBUG to capture all messages return logger
[docs] def check_targets( self, targets: List[DiagramTarget], eligible_systems: List[Union[MultibodyPlant, AffineSystem, LeafSystem]], ) -> List[Union[MultibodyPlant, AffineSystem, LeafSystem]]: """ *Description* Finds the systems specified by the targets list that we want to watch/monitor. We will try to ignore all systems that are: - Scene Graphs - Loggers and raise an error if the target is not found in the eligible systems. *Parameters* targets : List[DiagramTarget] The targets that we want to monitor. eligible_systems : List[Union[MultibodyPlant, AffineSystem, LeafSystem]] The systems that are eligible for monitoring. """ # Find all the systems that are eligible for logging eligible_system_dict = { system.get_name(): system for system in eligible_systems } # Search for each target in eligible_systems targeted = [] for target in targets: # Check if the target name is in the eligible systems if target.name not in eligible_system_dict.keys(): raise UnrecognizedTargetError(target, eligible_system_dict.keys()) # If it is, then also check that the port index is correct if target.ports is None: continue # No need to check things if ports is None num_ports_in_target = eligible_system_dict[target.name].num_output_ports() for port_index in target.ports: if port_index < 0 or port_index >= num_ports_in_target: raise ValueError(f"Port index {port_index} is out of bounds for system {target.name} (only {num_ports_in_target} ports exist)") # All checks passed! pass
[docs] def find_eligible_systems( self, builder: DiagramBuilder ) -> List[Union[MultibodyPlant, AffineSystem, LeafSystem]]: """ *Description* Finds all the systems that are eligible for logging. We want to ignore all systems that are: - Scene Graphs - Loggers """ # Find all the systems that are eligible for logging eligible_systems = [] self.logger.info("Finding all eligible systems for logging...") for system in builder.GetSystems(): if type(system) in INELIGIBLE_SYSTEM_TYPES: self.logger.warning( f"System {system.get_name()} (of type {type(system)}) is not eligible for logging! Skipping..." ) continue # Otherwise add to list eligible_systems.append(system) self.logger.info( f"System {system.get_name()} (of type {type(system)}) is eligible for logging with the watcher." ) return eligible_systems
[docs] def get_smart_targets( self, subject: DiagramBuilder, targets: List[DiagramTarget], ) -> List[DiagramTarget]: """ *Description* For each target with None ports, we will try to "smartly" create the targets that we want to monitor. *Parameters* subject : DiagramBuilder The diagram builder that contains the systems. targets : List[DiagramTarget] The targets that we want to monitor. *Returns* List[DiagramTarget] The list of targets with inferred ports. """ # Setup smart_targets = [] for target in targets: if target.ports is not None: smart_targets.append(target) continue # If ports is None, then we will try to "smartly" create the ports system = subject.GetSubsystemByName(target.name) # TODO: Add support for investigating output ports num_ports = system.num_output_ports() if num_ports == 0: self.logger.warning(f"System {target.name} has no output ports! Skipping...") continue output_ports_to_watch = [port_index for port_index in range(num_ports)] # Add a target with the connected ports to the smart_targets list smart_targets.append( DiagramTarget(target.name, output_ports_to_watch), ) # Log the list of inferred targets self.logger.info(f"Found {len(smart_targets)} inferred targets:") for idx, target in enumerate(smart_targets): self.logger.info(f"{idx}: {target.name} - {target.ports}") return smart_targets
[docs] def save_figures(self): """ *Description* Saves all the figures made from plotting data currently saved in the known port watchers. """ # Announce Saving Figures has started self.logger.info("Saving figures...") # Algorithm for system_name in self.port_watchers: system_ii = self.diagram.GetSubsystemByName(system_name) ports_on_ii = self.port_watchers[system_name] self.logger.info(f"Saving figures for system {system_name}...") for port_name in ports_on_ii: temp_port_watcher = ports_on_ii[port_name] temp_plotting_options = temp_port_watcher.options.plotting # Plot only if the PortWatcher flag is set if temp_plotting_options.save_to_file: try: temp_port_watcher.save_all_figures(self.diagram_context) self.logger.info(f"Saved figures for port {port_name} on system {system_name}") except Exception as e: self.logger.error(f"Failed to save figures for port {port_name} on system {system_name}: {e}") else: self.logger.info(f"Skipped plotting for port {port_name} on system {system_name} (flag not set)")
[docs] def save_raw_data(self): """ *Description* Saves all the raw data from the port watchers. """ # Announce Saving Raw Data has started self.logger.info("Saving raw data...") # Algorithm for system_name in self.port_watchers: system_ii = self.diagram.GetSubsystemByName(system_name) ports_on_ii = self.port_watchers[system_name] self.logger.info(f"Saving raw data for system {system_name}...") for port_name in ports_on_ii: temp_port_watcher = ports_on_ii[port_name] temp_raw_data_options = temp_port_watcher.options.raw_data # Save raw data only if the PortWatcher flag is set if temp_raw_data_options.save_to_file: try: temp_port_watcher.save_raw_data(self.diagram_context) self.logger.info(f"Saved raw data for port {port_name} on system {system_name}") except Exception as e: self.logger.error(f"Failed to save raw data for port {port_name} on system {system_name}: {e}") else: self.logger.info(f"Skipped saving raw data for port {port_name} on system {system_name} (flag not set)")