Source code for brom_drake.watchers.port_watcher.file_manager
from brom_drake.watchers.port_watcher.file_naming_convention import (
compute_safe_system_name,
PathOrganizationConvention,
file_path_for_port_data_dimension,
generate_all_file_paths_for_ports_data,
)
from brom_drake.watchers.port_watcher.port_watcher_options import (
PortFigureArrangement,
FigureNamingConvention,
PortWatcherPlottingOptions,
PortWatcherRawDataOptions,
)
from dataclasses import dataclass
import warnings
from pathlib import Path
from pydrake.multibody.plant import MultibodyPlant
from pydrake.systems.framework import OutputPort
from pydrake.systems.primitives import VectorLogSink
from typing import List
[docs]
@dataclass
class PortWatcherFileManager:
"""
*Description*
This class manages file paths and directories for saving data
collected by the PortWatcher system.
"""
base_directory: Path
plotting_options: PortWatcherPlottingOptions
raw_data_options: PortWatcherRawDataOptions = PortWatcherRawDataOptions()
[docs]
def compute_path_for_each_figure(
self,
output_port: OutputPort,
associated_log_sink: VectorLogSink,
port_component_name: str = None,
) -> List[Path]:
"""
*Description*
Computes the names of all of the figures that will be produced for
*Returns*
figure_paths_out: List[Path]
The paths of all of the figures that will be produced by
this PortWatcherPlotter object.
"""
# Setup
plotting_options = self.plotting_options
file_format = plotting_options.file_format
log_sink_size = associated_log_sink.get_input_port().size()
# Create the figure paths based on the naming convention given to the
# PortWatcherPlotter.
file_organization_convention: PathOrganizationConvention = None
match plotting_options.figure_naming_convention:
case FigureNamingConvention.kFlat:
file_organization_convention = PathOrganizationConvention.kFlat
case FigureNamingConvention.kHierarchical:
file_organization_convention = PathOrganizationConvention.kHierarchical
case _:
raise NotImplementedError(
f"Invalid figure naming convention for figure_names(): {plotting_options.figure_naming_convention}."
)
# Now define names
match plotting_options.plot_arrangement:
case PortFigureArrangement.OnePlotPerPort:
return [
self.plot_dir
/ file_path_for_port_data_dimension(
output_port=output_port,
file_format=file_format,
organization_convention=file_organization_convention,
component_name=port_component_name,
)
]
case PortFigureArrangement.OnePlotPerDim:
# If there is a sub-component name, then we will
# create a sub-directory for it
relative_file_paths = generate_all_file_paths_for_ports_data(
output_port=output_port,
file_format=file_format,
organization_convention=file_organization_convention,
component_name=port_component_name,
)
return [
self.plot_dir / relative_file_path
for relative_file_path in relative_file_paths
]
case _:
raise NotImplementedError(
f"Invalid plot arrangement for figure naming convention {plotting_options.figure_naming_convention}: {plotting_options.plot_arrangement}."
)
[docs]
def figure_names_under_flat_convention(
self,
output_port: OutputPort,
associated_log_sink: VectorLogSink,
port_component_name: str = None,
) -> List[Path]:
"""
*Description*
Returns the names associated with each figure that this port will
generate assuming we are under the kFlat convention.
*Arguments*
output_port: OutputPort
The output port for which we are generating figure names.
We can extract the system name, port name, and port size from this.
port_component_name: str
A "sub-component" of the port that we wish to give a unique name in the
figures.
*Returns*
figure_names: List[Path]
List of paths where each path is a file name for an associated figure.
.. deprecated::
Use :meth:`compute_path_for_each_figure` instead.
This method will be removed in a future release.
"""
warnings.warn(
"figure_names_under_flat_convention is deprecated and will be removed in a future release. "
"Use compute_path_for_each_figure instead.",
DeprecationWarning,
stacklevel=2,
)
# Setup
plotting_options = self.plotting_options
format = plotting_options.file_format
plot_dir = self.plot_dir
# The naming also depends on the arrangement of the plots
# (i.e., if there is one plot per port, or one plot per dimension)
system = output_port.get_system()
system_name = system.get_name()
safe_system_name = compute_safe_system_name(system_name)
port_name = output_port.get_name()
log_sink_size = associated_log_sink.get_input_port().size()
match plotting_options.plot_arrangement:
case PortFigureArrangement.OnePlotPerPort:
return [
plot_dir
/ file_path_for_port_data_dimension(
output_port=output_port,
file_format=format,
dimension_name=port_component_name,
)
]
case PortFigureArrangement.OnePlotPerDim:
if port_component_name is None:
# If there is no sub-component name, then we just
# create the files in the main plot directory
relative_file_paths = generate_all_file_paths_for_ports_data(
output_port=output_port,
file_format=format,
organization_convention=PathOrganizationConvention.kFlat,
)
return [
plot_dir / relative_file_path
for relative_file_path in relative_file_paths
]
else:
# If there is a sub-component name, then we will
# create a sub-directory for it
dimension_names = {
dimension: f"{port_component_name}/dim{dimension}"
for dimension in range(log_sink_size)
}
relative_file_paths = generate_all_file_paths_for_ports_data(
output_port=output_port,
file_format=format,
dimension_names=dimension_names,
organization_convention=PathOrganizationConvention.kFlat,
)
return [
plot_dir / relative_file_path
for relative_file_path in relative_file_paths
]
case _:
raise NotImplementedError(
f"Invalid plot arrangement for figure naming convention {plotting_options.figure_naming_convention}: {plotting_options.plot_arrangement}."
)
[docs]
def figure_names_under_hierarchical_convention(
self,
output_port: OutputPort,
associated_log_sink: VectorLogSink,
port_component_name: str = None,
) -> List[Path]:
"""
*Description*
Returns the names associated with each figure that this port will
generate assuming we are under the kHierarchical convention.
*Parameters*
output_port: OutputPort
The output port for which we are generating figure names.
We can extract the system name, port name, and port size from this.
port_component_name: str
A "sub-component" of the port that we wish to give a unique name in the figures.
*Returns*
paths_out: List[Path]
Each path in this list is a file path for an associated figure.
.. deprecated::
Use :meth:`compute_path_for_each_figure` instead.
This method will be removed in a future release.
"""
warnings.warn(
"figure_names_under_hierarchical_convention is deprecated and will be removed in a future release. "
"Use compute_path_for_each_figure instead.",
DeprecationWarning,
stacklevel=2,
)
# Setup
plotting_options = self.plotting_options
format = plotting_options.file_format
# Compute the figure paths based on the arrangement of the plots
# (i.e., if there is one plot per port, or one plot per dimension
system = output_port.get_system()
system_name = system.get_name()
safe_system_name = compute_safe_system_name(system_name)
port_name = output_port.get_name()
log_sink_size = associated_log_sink.get_input_port().size()
if plotting_options.plot_arrangement == PortFigureArrangement.OnePlotPerPort:
return [
self.plot_dir
/ file_path_for_port_data_dimension(
output_port=output_port,
file_format=format,
dimension_name=port_component_name,
organization_convention=PathOrganizationConvention.kHierarchical,
)
]
elif plotting_options.plot_arrangement == PortFigureArrangement.OnePlotPerDim:
if port_component_name is None:
# If there is no sub-component name, then we just
# create the files in the main plot directory
relative_file_paths = generate_all_file_paths_for_ports_data(
output_port=output_port,
file_format=format,
organization_convention=PathOrganizationConvention.kHierarchical,
)
return [
self.plot_dir / relative_file_path
for relative_file_path in relative_file_paths
]
else:
# If there is a sub-component name, then we will
# create a sub-directory for it
return [
self.plot_dir
/ f"system_{safe_system_name}/port_{port_name}/{port_component_name}/dim_{self.name_of_data_at_index(ii, output_port, associated_log_sink, remove_spaces=True)}.{format}"
for ii in range(log_sink_size)
]
else:
raise NotImplementedError(
f"Invalid plot arrangement for figure naming convention {plotting_options.figure_naming_convention}: {plotting_options.plot_arrangement}."
)
[docs]
def name_of_data_at_index(
self,
dim_index: int,
target_port: OutputPort,
associated_log_sink: VectorLogSink,
remove_spaces: bool = False,
) -> str:
"""
*Description*
Returns the name of the data which is in index dim_index
of this vector-valued port.
TODO(kwesi): Consider moving this to its own utility file, outside of
the file manager.
*Parameters*
self : PortWatcherPlotter
The PortWatcherPlotter object.
dim_index : int
The index of the data in the port.
remove_spaces : bool
Whether to remove spaces from the name.
*Returns*
name_of_data: str
The name of the data at index dim_index.
"""
# Setup
plotting_options = self.plotting_options
n_dims_sink = associated_log_sink.get_input_port().size()
system = target_port.get_system()
# Input Processing
if dim_index >= n_dims_sink:
raise ValueError(
f"dim_index ({dim_index}) is greater than the number of dimensions in the port ({n_dims_sink})."
)
# Default name
name = f"Dim #{dim_index}"
# If we are using the OnePlotPerPort config, then use the default name
if plotting_options.plot_arrangement == PortFigureArrangement.OnePlotPerPort:
return name
# Otherwise, try to get a better name
# - For MultibodyPlants, we can get specific names for certain dimensions of the "state" output port
if type(system) is MultibodyPlant:
# The multi-body plant has names for specific ports
if target_port.get_name() == "state":
# We can get the names of the state
state_names = system.GetStateNames()
name = state_names[dim_index]
# Filter our spaces, if requested
if remove_spaces:
name = name.replace(" ", "_")
# Return name!
return name
@property
def plot_dir(self) -> Path:
"""
*Description*
This function returns the directory where the plots will be saved.
*Returns*
plot_dir: Path
The directory where the plots will be saved.
"""
return self.base_directory / "plots"
@property
def raw_data_dir(self) -> Path:
"""
*Description*
This function returns the directory where the raw data will be saved.
*Returns*
raw_data_dir: Path
The directory where the raw data will be saved.
"""
return self.base_directory / "raw_data"
[docs]
def raw_data_file_path(
self, output_port: OutputPort, port_component_name: str = None
) -> Path:
"""
*Description*
This function returns the file name for saving raw data.
*Parameters*
output_port: OutputPort
The output port for which to generate the file path.
port_component_name: str
A "sub-component" of the port that we wish to give a unique name in the data.
*Returns*
raw_data_file_name: Path
The file name for saving raw data.
"""
return self.raw_data_dir / file_path_for_port_data_dimension(
output_port=output_port,
file_format=self.raw_data_options.file_format,
organization_convention=self.raw_data_options.file_organization_convention,
component_name=port_component_name,
dimension_name=output_port.get_name() + "_data",
)
[docs]
def time_data_file_path(
self, output_port: OutputPort, component_name: str = None
) -> Path:
"""
*Description*
This function returns the file name for saving time data.
*Returns*
time_data_file_path: Path
The file name for saving time data.
"""
port_name = output_port.get_name()
return self.raw_data_dir / file_path_for_port_data_dimension(
output_port=output_port,
file_format=self.raw_data_options.file_format,
organization_convention=self.raw_data_options.file_organization_convention,
component_name=component_name,
dimension_name=port_name + "_times",
)