port_watcher¶
Submodules¶
Classes¶
- class brom_drake.watchers.port_watcher.file_manager.PortWatcherFileManager(base_directory: Path, plotting_options: PortWatcherPlottingOptions, raw_data_file_format: str = 'npy')[source]¶
Description
This class manages file paths and directories for saving data collected by the PortWatcher system.
- compute_path_for_each_figure(output_port: OutputPort, associated_log_sink: VectorLogSink, port_component_name: str = None) List[Path][source]¶
Description
Computes the paths of all of the figures that will be produced for the given output port.
Returns
- figure_paths_out: List[Path]
The paths of all of the figures that will be produced for this port.
- static compute_safe_system_name(system_name: str) str[source]¶
Description
This function returns a filesystem-safe version of the system name.
Returns
- safe_system_name: str
The filesystem-safe version of the system name.
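As an illustration only, a filesystem-safe name could be produced by replacing characters that are awkward in file names. The helper make_filesystem_safe below is hypothetical, and its replacement rule is an assumption; compute_safe_system_name may apply different rules.

```python
import re


def make_filesystem_safe(system_name: str) -> str:
    """Hypothetical sketch, not the library's implementation.

    Maps every character outside [A-Za-z0-9_.-] to an underscore so the
    result can be used as a file or directory name.
    """
    return re.sub(r"[^A-Za-z0-9_.\-]", "_", system_name)


# Slashes, spaces, and colons are each replaced by one underscore.
print(make_filesystem_safe("plant/iiwa arm::state"))
```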
- figure_names_under_flat_convention(output_port: OutputPort, associated_log_sink: VectorLogSink, port_component_name: str = None) List[Path][source]¶
Description
Returns the names associated with each figure that this port will generate assuming we are under the kFlat convention.
Parameters
- output_port: OutputPort
The output port for which we are generating figure names. We can extract the system name, port name, and port size from this.
- port_component_name: str
A “sub-component” of the port that we wish to give a unique name in the figures.
Returns
- figure_names: List[Path]
List of paths where each path is a file name for an associated figure.
- figure_names_under_hierarchical_convention(output_port: OutputPort, associated_log_sink: VectorLogSink, port_component_name: str = None) List[Path][source]¶
Description
Returns the names associated with each figure that this port will generate assuming we are under the kHierarchical convention.
Parameters
- output_port: OutputPort
The output port for which we are generating figure names. We can extract the system name, port name, and port size from this.
- port_component_name: str
A “sub-component” of the port that we wish to give a unique name in the figures.
Returns
- paths_out: List[Path]
Each path in this list is a file path for an associated figure.
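The difference between the kFlat and kHierarchical conventions can be sketched roughly as follows. The functions flat_names and hierarchical_names, the "dim{i}" suffix, and the exact path layout are all assumptions made for illustration; the real file names produced by the file manager may differ.

```python
from pathlib import Path
from typing import List


def flat_names(system: str, port: str, n_dims: int, ext: str = "png") -> List[Path]:
    # Assumed flat layout: system, port, and dimension all live in one file name.
    return [Path(f"{system}_{port}_dim{i}.{ext}") for i in range(n_dims)]


def hierarchical_names(system: str, port: str, n_dims: int, ext: str = "png") -> List[Path]:
    # Assumed hierarchical layout: one directory level per system and per port.
    return [Path(system) / port / f"dim{i}.{ext}" for i in range(n_dims)]


for path in flat_names("plant", "state", 2) + hierarchical_names("plant", "state", 2):
    print(path.as_posix())
```

Under these assumptions, the flat convention keeps every figure in a single directory, while the hierarchical one groups figures by system and then by port.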
- name_of_data_at_index(dim_index: int, target_port: OutputPort, associated_log_sink: VectorLogSink, remove_spaces: bool = False) str[source]¶
Description
Returns the name of the data at index dim_index of this vector-valued port.
TODO(kwesi): Consider moving this to its own utility file, outside of the file manager.
Parameters
- self: PortWatcherFileManager
The PortWatcherFileManager object.
- dim_index: int
The index of the data in the port.
- remove_spaces: bool
Whether to remove spaces from the name.
Returns
- name_of_data: str
The name of the data at index dim_index.
- property plot_dir: Path¶
Description
This function returns the directory where the plots will be saved.
Returns
- plot_dir: Path
The directory where the plots will be saved.
- property raw_data_dir: Path¶
Description
This function returns the directory where the raw data will be saved.
Returns
- raw_data_dir: Path
The directory where the raw data will be saved.
- raw_data_file_path(system_name: str, port_name: str, port_component_name: str = None) Path[source]¶
Description
This function returns the file name for saving raw data.
Parameters
- port_component_name: str
A “sub-component” of the port that we wish to give a unique name in the data.
Returns
- raw_data_file_name: Path
The file name for saving raw data.
- class brom_drake.watchers.port_watcher.plotter.PortWatcherPlotter(port: OutputPort, python_logger: Logger, file_manager: PortWatcherFileManager, plotting_options: PortWatcherPlottingOptions = (PortFigureArrangement.OnePlotPerPort, 300, True, 'png', FigureNamingConvention.kFlat))[source]¶
Description
A plotter for the PortWatcher object. This is responsible for interpreting the data from the
brom_drake.PortWatcher.PortWatcher.
TODO(Kwesi): Consider making this a dataclass.
- add_to_python_report(message: str)[source]¶
Description
Logs a message to the Python logger.
Parameters
- message: str
A string with the message we want to send to the logs.
- compute_plot_shape(n_dims: int) Tuple[int, int][source]¶
Description
Computes the shape of the plot based on the data.
Parameters
- n_dims: int
The number of dimensions in the data.
Returns
- n_rows: int
The number of rows in the plot.
- n_columns: int
The number of columns in the plot.
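One plausible way to choose a subplot grid for n_dims signals is the smallest near-square layout that has enough cells. The function near_square_shape below is a hypothetical sketch of that idea; the rule actually used by compute_plot_shape is not stated here and may differ.

```python
import math
from typing import Tuple


def near_square_shape(n_dims: int) -> Tuple[int, int]:
    """Hypothetical layout rule: a near-square grid with at least n_dims cells."""
    if n_dims < 1:
        raise ValueError("n_dims must be positive")
    # Pick enough columns to cover the square root, then as few rows as needed.
    n_columns = math.ceil(math.sqrt(n_dims))
    n_rows = math.ceil(n_dims / n_columns)
    return n_rows, n_columns


for n in (1, 6, 7):
    print(n, near_square_shape(n))
```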
- data_dimension() int[source]¶
Description
Returns the dimension of the data in the port.
Parameters
- self: PortWatcherPlotter
The PortWatcherPlotter object.
Returns
- data_dim: int
The dimension of the data in the port.
- plot_logger_data(drake_vector_log: VectorLogSink, diagram_context: Context) Tuple[List[Figure], List[List[Axes]]][source]¶
Description
This function plots the data in the logger.
Parameters
- self: PortWatcherPlotter
The PortWatcherPlotter object.
- diagram_context: Context
The context of the diagram.
Returns
- Tuple[List[plt.Figure], List[List[plt.Axes]]]
A tuple where the first element is a list of figures and the second element is a list of lists of axes.
- plot_logger_data_subplots(times: array, data: array, drake_vector_log: VectorLogSink)[source]¶
Description
This function plots the data in the logger.
TODO(Kwesi): Consider refactoring this to remove the dependency on drake_vector_log.
Parameters
- self: PortWatcherPlotter
The PortWatcherPlotter object.
- times: np.array
The times at which the data was recorded.
- data: np.array
The data that was recorded.
Returns
- fig_out: plt.Figure
The figure containing subplots that we use in the output.
- axes_list: list[plt.Axes]
The list of axes.
- save_figures(vector_log_sink: VectorLogSink, diagram_context: Context, port_component_name: str | None = None)[source]¶
Description
This function saves the figures.
Note
TODO(kwesi): Make it so that this function computes names + directory structure based on plot arrangement.
Parameters
- self: PortWatcherPlotter
The PortWatcherPlotter object.
- diagram_context: Context
The context of the diagram.
- class brom_drake.watchers.port_watcher.port_figure_arrangement.PortFigureArrangement(*values)[source]¶
- class brom_drake.watchers.port_watcher.port_watcher.PortWatcher(output_port: OutputPort, builder: DiagramBuilder, python_logger: Logger, logger_name: str = None, options: PortWatcherOptions = ((PortFigureArrangement.OnePlotPerPort, 300, True, 'png', FigureNamingConvention.kFlat), (True, 'npy')), base_watcher_dir: str = './brom')[source]¶
Description
The real workhorse of the
DiagramWatcher class. This class adds the elements to the drake diagram that will monitor a given system’s output port (output_port), if possible.
- get_data_dictionary(diagram_context: Context) Dict[str, ndarray][source]¶
Description
Returns the data recorded by this PortWatcher as a dictionary.
Warning
The data in the dictionary is only available after the diagram has been simulated and the VectorLogSinks have recorded data.
The keys of the dictionary are the names of the output ports being watched, and the values are the corresponding data as numpy arrays. Importantly, most of the time this data dictionary contains only one key-value pair, since most ports being watched are not list-valued (i.e., they do not have “components” that require multiple VectorLogSinks to monitor). However, in the case of list-valued ports, there will be multiple key-value pairs in the dictionary, where each key corresponds to a different component of the port (e.g., “element_0_out”, “element_1_out”, etc.). In this case, the user can use the output port name (which is included in the keys) to determine which component of the port each key-value pair corresponds to.
Note
The dimensions of the data will always be (n_data_dim, n_timesteps). In other words, there will be one row for each dimension of the data, and one column for each time step recorded by the VectorLogSinks.
Parameters
- diagram_context: Context
The context of the diagram.
Returns
- data: Dict[str, np.ndarray]
A dictionary containing the data from all of the VectorLogSinks associated with this PortWatcher. The keys are the names of the output ports being watched.
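The (n_data_dim, n_timesteps) layout described above determines how the returned arrays are indexed. The snippet below uses a hand-built dictionary as a stand-in for the return value of get_data_dictionary; the key "state_out" and the numbers are made up for illustration.

```python
import numpy as np

# Stand-in for the dictionary returned by get_data_dictionary(); the key
# name and the values are invented for this example.
n_data_dim, n_timesteps = 3, 5
data = {
    "state_out": np.arange(n_data_dim * n_timesteps, dtype=float).reshape(
        n_data_dim, n_timesteps
    )
}

for port_name, values in data.items():
    # One row per data dimension: row 0 is the first dimension over time.
    first_dimension_over_time = values[0, :]
    # One column per time step: the last column is the final recorded sample.
    sample_at_last_step = values[:, -1]
    print(port_name, values.shape, first_dimension_over_time, sample_at_last_step)
```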
- get_timing_array(diagram_context: Context) ndarray[source]¶
Description
Returns the timing data recorded by this PortWatcher as a numpy array.
Parameters
- diagram_context: Context
The context of the diagram.
Returns
- timing: np.ndarray
A numpy array containing the timing data from one of the VectorLogSinks associated with this PortWatcher. Since all of the VectorLogSinks should have the same timing, it does not matter which one we pull the timing data from.
- get_vector_log_sink(with_index: int = None, with_output_port_name: str = None) VectorLogSink[source]¶
Description
Returns the VectorLogSink corresponding to the given index or output port name. By default, this returns the first VectorLogSink in the internal dictionary.
Parameters
- with_index: int, optional
The index of the VectorLogSink to return. By default, None.
- with_output_port_name: str, optional
The name of the output port of the VectorLogSink to return. By default, None.
Returns
- vector_log_sink: VectorLogSink
The VectorLogSink corresponding to the given index or output port name.
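The selection behaviour described above (look up by index or by name, defaulting to the first entry) can be sketched with an ordinary dictionary. The function pick_sink is hypothetical, the sinks are plain strings standing in for VectorLogSink objects, and giving the name precedence when both arguments are supplied is an assumption.

```python
from typing import Dict, Optional


def pick_sink(
    sinks: Dict[str, object],
    with_index: Optional[int] = None,
    with_output_port_name: Optional[str] = None,
) -> object:
    """Hypothetical lookup over a name -> sink dictionary."""
    # Assumption: a name, when given, takes precedence over an index.
    if with_output_port_name is not None:
        return sinks[with_output_port_name]
    # With no arguments, fall back to the first entry in insertion order.
    index = 0 if with_index is None else with_index
    return list(sinks.values())[index]


sinks = {"element_0_out": "sink A", "element_1_out": "sink B"}
print(pick_sink(sinks))
print(pick_sink(sinks, with_index=1))
print(pick_sink(sinks, with_output_port_name="element_1_out"))
```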
- safe_system_name(system: LeafSystem = None) str[source]¶
Description
Returns a safe name for the system.
Returns
- name: str
System’s name
- save_all_figures(diagram_context: Context)[source]¶
Description
Saves all of the figures that the PortWatcher should produce according to its options. This may generate one figure for each dimension of the output port being watched, or just one figure for the entire port.
Parameters
- diagram_context: Context
The context of the diagram.
- save_raw_data(diagram_context: Context)[source]¶
Description
Saves the raw data to file(s). The number of files depends on the components in the data AND the options of the PortWatcher (i.e., whether to save each component in a separate file or all in one file).
Parameters
- diagram_context: Context
The context of the diagram.
Notes
TODO(Kwesi): Take advantage of get_data_dictionary function to clean up this code.
- class brom_drake.watchers.port_watcher.port_watcher_options.FigureNamingConvention(*values)[source]¶
Description
This enum is used to define the naming convention for the figures.
- class brom_drake.watchers.port_watcher.port_watcher_options.PortWatcherOptions(plotting, raw_data)[source]¶
- plotting: PortWatcherPlottingOptions¶
Alias for field number 0
- raw_data: PortWatcherRawDataOptions¶
Alias for field number 1
- class brom_drake.watchers.port_watcher.port_watcher_options.PortWatcherPlottingOptions(plot_arrangement, plot_dpi, save_to_file, file_format, figure_naming_convention)[source]¶
- figure_naming_convention: FigureNamingConvention¶
Alias for field number 4
- file_format: str¶
Alias for field number 3
- plot_arrangement: PortFigureArrangement¶
Alias for field number 0
- plot_dpi: int¶
Alias for field number 1
- save_to_file: bool¶
Alias for field number 2
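The "Alias for field number N" entries indicate that these option classes are named tuples, so fields can be read by name or by position. The sketch below mirrors the field order and the default values shown in the PortWatcherPlotter signature, using stand-in classes (Arrangement, Naming, PlottingOptions) rather than the real brom_drake types; whether the real class declares its defaults this way is an assumption.

```python
from enum import Enum
from typing import NamedTuple


class Arrangement(Enum):  # stand-in for PortFigureArrangement
    OnePlotPerPort = 0


class Naming(Enum):  # stand-in for FigureNamingConvention
    kFlat = 0
    kHierarchical = 1


class PlottingOptions(NamedTuple):  # stand-in for PortWatcherPlottingOptions
    plot_arrangement: Arrangement = Arrangement.OnePlotPerPort
    plot_dpi: int = 300
    save_to_file: bool = True
    file_format: str = "png"
    figure_naming_convention: Naming = Naming.kFlat


defaults = PlottingOptions()
# Named tuples are immutable; _replace returns a modified copy.
high_res = defaults._replace(plot_dpi=600, file_format="svg")
print(defaults[1], high_res.plot_dpi, high_res[3])
```

Because the options are immutable, building a variant with _replace leaves the default instance untouched.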
Functions¶
- brom_drake.watchers.port_watcher.support_types.assert_abstract_value_ports_type_is_supported(output_port_value: Any)[source]¶
- brom_drake.watchers.port_watcher.support_types.assert_port_is_supported(output_port: OutputPort)[source]¶
Description
Checks to see if the port is of the correct type for plotting.
- brom_drake.watchers.port_watcher.support_types.create_port_value_type_error(example_value: Any) ValueError[source]¶
Description
Creates an error message for the port value type.
Parameters
- example_value: Any
The type of value in the problematic output port.
Returns
- error_out: ValueError
The error message.
Variables¶
- brom_drake.watchers.port_watcher.plotter.DEFAULT_PLOT_DIR = './brom/watcher/plots'¶
- brom_drake.watchers.port_watcher.port_watcher.DEFAULT_BROM_DIR = './brom'¶
- brom_drake.watchers.port_watcher.port_watcher_options.DEFAULT_PLOT_DIR = './brom/watcher/plots'¶
- brom_drake.watchers.port_watcher.port_watcher_options.DEFAULT_RAW_DATA_DIR = './brom/watcher/raw_data'¶
- brom_drake.watchers.port_watcher.port_watcher_options.DEFAULT_WATCHER_DIR = './brom/watcher'¶