Source code for brom_drake.watchers.port_watcher.file_naming_convention
from enum import IntEnum
from pathlib import Path
from typing import List
from pydrake.systems.framework import (
LeafSystem,
OutputPort,
PortDataType,
)
[docs]
class PathOrganizationConvention(IntEnum):
"""
**Description**
This enum is used to define the organization convention for the file paths.
"""
kFlat = 0 # e.g. "plant_generalized_output_dim_0.png"
kHierarchical = 1 # e.g. "system_plant/port_generalized_output/dim_0.png"
[docs]
def compute_safe_system_name(system_name: str) -> str:
"""
*Description*
This function returns a filesystem-safe version of the system name.
*Returns*
safe_system_name: str
The filesystem-safe version of the system name.
"""
# First, let's check to see how many "/" exist in the name
slash_occurences = [i for i, letter in enumerate(system_name) if letter == "/"]
if len(slash_occurences) > 0:
system_name = system_name[
slash_occurences[-1] + 1 :
] # truncrate string based on the last slash
# Second, replace all spaces with underscores
system_name = system_name.replace(" ", "_")
return system_name
[docs]
def generate_all_file_paths_for_ports_data(
output_port: OutputPort,
file_format: str,
organization_convention: PathOrganizationConvention,
dimension_names: dict[int, str] = None,
component_name: str = None,
) -> List[Path]:
"""
*Description*
Generates ALL file path for the data of the given output port.
For ports that have multiple dimensions, this will generate a file path for each dimension.
For example, if the output port is a 3-dimensional vector, this function will generate 3 file paths,
one for each dimension.
*Parameters*
output_port: OutputPort
The output port for which to generate the file path.
file_format: str
The file format for the data (e.g., "npy", "csv", etc.).
component_name: str, optional
The name of the COMPONENT of the output port for which to generate the file path.
This is used because some output ports produce dictionaries of data, where each component of the dictionary has a different name.
"""
# Identify the number of dimensions of the output port
if output_port.get_data_type() == PortDataType.kVectorValued:
num_dimensions = output_port.size()
else:
raise ValueError(
f"Unsupported output port data type: {output_port.get_data_type()}"
)
# Create dimension names dict, if it is not provided
if dimension_names is None:
dimension_names = {
dimension: f"dim{dimension}" for dimension in range(num_dimensions)
}
# Generate file paths for each dimension
file_paths = []
for dimension, dimension_name in dimension_names.items():
file_path = file_path_for_port_data_dimension(
output_port=output_port,
file_format=file_format,
organization_convention=organization_convention,
dimension_name=dimension_name,
component_name=component_name,
)
file_paths.append(file_path)
return file_paths
[docs]
def file_path_for_port_data_dimension(
output_port: OutputPort,
file_format: str,
dimension_name: str = None,
organization_convention: PathOrganizationConvention = PathOrganizationConvention.kFlat,
component_name: str = None,
) -> Path:
"""
*Description*
Generates a file path for the data of the given output port and dimension.
*Parameters*
output_port: OutputPort
The output port for which to generate the file path.
dimension_name: str, optional
The name of the dimension of the output port for which to generate the file path.
By default, this is set to None, which means that the file path will be generated for the first dimension of the output port.
component_name: str, optional
The name of the COMPONENT of the output port for which to generate the file path.
This is used because some output ports produce dictionaries of data, where each component of the dictionary has a different name.
file_format: str
The file format for the data (e.g., "npy", "csv", etc.).
organization_convention: PathOrganizationConvention, optional
The organization convention for the file paths. Defaults to PathOrganizationConvention.kFlat.
"""
# Setup
dimension_name_is_empty = dimension_name is None or dimension_name == ""
component_name_is_empty = component_name is None or component_name == ""
# Collect System and Port Names
# - System Name
system: LeafSystem = output_port.get_system()
system_name = system.get_name()
safe_system_name = compute_safe_system_name(system_name)
# - Port Name
port_name = output_port.get_name()
# Build File Path
file_path: str = ""
if organization_convention == PathOrganizationConvention.kFlat:
file_path = f"system_{safe_system_name}_port_{port_name}"
# Remove the dimension part of the file path if the output port is not vector-valued (i.e., if it only has one dimension)
if not component_name_is_empty:
file_path += f"_{component_name}"
if not dimension_name_is_empty:
file_path += f"_{dimension_name}"
elif organization_convention == PathOrganizationConvention.kHierarchical:
file_path = f"system_{safe_system_name}/port_{port_name}"
# Remove the dimension part of the file path if the output port is not vector-valued (i.e., if it only has one dimension)
if not component_name_is_empty:
file_path += f"/{component_name}"
if not dimension_name_is_empty:
file_path += f"/{dimension_name}"
else:
raise ValueError(
f"Unsupported organization convention: {organization_convention}"
)
# Add suffix
file_path += f".{file_format}"
return Path(file_path)