from importlib import resources as impresources
from typing import List, Tuple, Callable
import networkx as nx
import numpy as np
from pydrake.common.eigen_geometry import Quaternion
from pydrake.common.value import AbstractValue
from pydrake.geometry import GeometrySet, CollisionFilterDeclaration
from pydrake.math import RigidTransform, RollPitchYaw
from pydrake.multibody.parsing import Parser
from pydrake.multibody.plant import MultibodyPlant
from pydrake.systems.framework import Diagram, Context
from pydrake.systems.primitives import ConstantValueSource
# Internal Imports
import brom_drake.robots as robots
from brom_drake.motion_planning.systems.open_loop_dispensers.open_loop_plan_dispenser import OpenLoopPlanDispenser
from brom_drake.stations.kinematic import UR10eStation as KinematicUR10eStation
from brom_drake.productions import ProductionID
from brom_drake.productions.roles import Role
from brom_drake.productions.types.motion_planning import KinematicMotionPlanningProduction
from brom_drake.utils import Performer, GroundShape, AddGround, MotionPlan
[docs]
class ShelfPlanning1(KinematicMotionPlanningProduction):
"""
*Description*
This production contains a simple motion planning task
where we attempt to reach to different places around the shelfs
of a bookcase.
This is a Kinematic motion planning production, which means that the
the robot's actuators will not be driven. Instead, the robot's links will
be reset to "exactly" the configurations in the motion plan. This is not realistic
but tests the correctness of the plan without worrying about the control problem
of moving the arm.
*Parameters*
time_step: float, optional
The time step for the production, by default 1e-3.
shelf_pose: RigidTransform, optional
The pose of the shelf in the world frame, by default None.
meshcat_port_number: int, optional
The port number for meshcat visualization, by default 7001.
plan_execution_speed: float, optional
The speed at which to execute the motion plan, by default 0.2
*Usage*
You may use this in a similar way to all other KinematicMotionPlanningProduction
objects. ::
# Other imports above...
from brom_drake.motion_planning.algorithms.rrt import RRTConnectPlannerConfig, RRTConnectPlanner
from brom_drake.productions.motion_planning.offline import ShelfPlanning1
# Define the goal pose
easy_goal_position = np.array([+0.0, 1.0, 1.05])
goal_orientation = RollPitchYaw(np.pi / 2.0, np.pi / 2.0, 0.0).ToQuaternion()
goal_pose = RigidTransform(goal_orientation, easy_goal_position)
# Create the production
production = ShelfPlanning1(
meshcat_port_number=meshcat_port_number, # Use None for CI
goal_pose=goal_pose,
)
# Create a planner object which will be used to plan the motion
config = RRTConnectPlannerConfig(
steering_step_size=0.01,
prob_sample_goal=0.05,
max_iterations=int(1e5),
convergence_threshold=1e-3,
)
planner2 = RRTConnectPlanner(
production.arm,
production.plant,
production.scene_graph,
config=config,
)
# To build the production, we only need to provide a planning function
# (can come from anywhere, not just a BaseRRTPlanner object)
diagram, diagram_context = production.easy_cast_and_build(
planner2.plan,
with_watcher=True,
)
print("Simulating...")
# Simulate the diagram
simulator = Simulator(diagram, diagram_context)
simulator.set_target_realtime_rate(1.0)
# Run simulation
simulator.Initialize()
simulator.AdvanceTo(0.1)
planned_trajectory = production.plan_dispenser.planned_trajectory
print(f"Expected end time of the planned trajectory: {planned_trajectory.end_time()}")
# return
simulator.AdvanceTo(planned_trajectory.end_time()+1.0)
"""
def __init__(
self,
time_step: float = 1e-3,
shelf_pose: RigidTransform = None, # The pose of the shelf
meshcat_port_number: int = 7001, # Usually turn off for CI (i.e., make it None)
plan_execution_speed: float = 0.2,
**kwargs,
):
"""
*Description*
Constructor for ShelfPlanning1.
This:
- Stores all inputs in the appropriate places
- Selects the default shelf_pose, if none is given
- Sets the cupboard joints to be in the "open" configuration
- constructs the KinematicUR10eStation used in testing
- Sets the names of the plant and scene graph to use the ShelfPlanning tag
*Parameters*
time_step: float, optional
The time step for the production, by default 1e-3.
shelf_pose: RigidTransform, optional
The pose of the shelf in the world frame, by default None.
meshcat_port_number: int, optional
The port number for meshcat visualization, by default 7001.
plan_execution_speed: float, optional
The speed at which to execute the motion plan, by default 0.2
"""
super().__init__(**kwargs)
self.time_step = time_step
self.shelf_pose = shelf_pose
self.meshcat_port_number = meshcat_port_number
self.plan_execution_speed = plan_execution_speed
# Input Processing
if self.shelf_pose is None:
self.shelf_pose = RigidTransform(
RollPitchYaw(0.0, 0.0, +np.pi/2.0).ToQuaternion(),
np.array([0.0, 1.0, 0.65]),
)
self.desired_cupboard_positions = np.array([0.2, 0.3])
# Create containers for meshcat and other values we will set later
self.meshcat = None
self.plan_dispenser = None
# Set station
self.station = KinematicUR10eStation(
time_step=self.time_step,
meshcat_port_number=self.meshcat_port_number,
)
self.arm = self.station.arm
self.robot_model_idx_ = self.station.arm
# Set Names of Plant and scene graph
self.plant = self.station.plant
self.scene_graph = self.station.scene_graph
self.plant.set_name(f"Shelf1_Production_Plant")
self.scene_graph.set_name(f"Shelf1_Production_SceneGraph")
# Create variables for the models we have
self.shelf_model_index = None
self.geometry_ids_to_ignore = []
[docs]
def add_supporting_cast(self):
"""
**Description**
Modifies the plant and the builder by:
- Adding UR10e station to the plant
- Adding shelf to the plant
- Adding a "ground" to the plant
- Adding features required by the KinematicMotionPlanningProductionFeatures
- Robot's ModelInstanceIndex Source
- Motion Planning Components
- Start and Goal Pose sources
- Setting initial configuration of the arm for the plant
"""
# Call the parent class method
super().add_supporting_cast()
# Setup
self.add_ur10e_station() # Use the UR10e station's plant + scene graph for all other objects
# Add obstacles
self.add_shelf(self.plant)
# Add the ground as well as the start and goal locations
AddGround(self.plant)
self.station.Finalize()
# Add The Motion Planning Components (e.g., the interpolator)
self.add_robot_source_system()
self.add_motion_planning_components()
self.add_start_and_goal_sources_to_builder()
# Add initial conditions for the arm
self.initial_condition_manager.add_initial_configuration(
model_instance_index=self.arm,
configuration=self.start_configuration,
)
# The initial condition will be set for the plant when we build the production.
# Print message to user
print("Added all secondary cast members to the builder.")
[docs]
def add_motion_planning_components(self):
"""
*Description*
Connects the plan dispenser's output port (i.e., the reference signal for the
arm's joint controller) to the input of the UR10eStation.
"""
# Setup
n_actuated_dof = self.plant.num_actuated_dofs()
# Add the Plan Dispenser and connect it to the station
self.plan_dispenser = self.builder.AddSystem(
OpenLoopPlanDispenser(n_actuated_dof, self.plan_execution_speed)
)
self.builder.Connect(
self.plan_dispenser.GetOutputPort("point_in_plan"),
self.station.GetInputPort("desired_joint_positions"),
)
[docs]
def add_shelf(self, plant: MultibodyPlant):
"""
*Description*
Adds a model of the cupboard to the production's plant.
*Parameter*
plant: pydrake.multibody.plant.MultibodyPlant
The plant that we want to add the cupboard to.
"""
# Setup
urdf_file_path = str(
impresources.files(robots) / "models/cupboard/cupboard.sdf"
)
# Add the shelf to the plant
model_idcs = Parser(plant=plant).AddModels(urdf_file_path)
self.shelf_model_index = model_idcs[0]
# Weld the bookshelf to the world frame
plant.WeldFrames(
plant.world_frame(),
plant.GetFrameByName("cupboard_body", self.shelf_model_index),
self.shelf_pose,
)
[docs]
def add_ur10e_station(self):
"""
*Description*
Add the Kinematic UR10e station to the production's plant for simple motion planning.
"""
# Setup
# Add station
self.builder.AddSystem(self.station)
[docs]
def add_cast_and_build(
self,
cast: List[Tuple[Role, Performer]] = [],
with_watcher: bool = False,
) -> Tuple[Diagram, Context]:
"""
*Description*
Modifies the normal add_cast_and_build, so that
we share the context of the plant with the appropriate
parts of the system.
In addition to using the parent class's implementation of add_cast_and_build, this:
- Configures the collision filter to avoid spurious collisions between the cupboard's
various parts and itself
- Insert the motion planner role into the diagram and connect it properly
TODO(kwesi): Consider making this a standard part of the
OfflineKinematicMotionPlanningProduction class.
*Parameters*
cast: List[Tuple[Role, Performer]], optional
The cast to add to the production, by default [].
with_watcher: bool, optional
A Boolean that determines whether to add a watcher to the diagram, by default False.
*Returns*
diagram: Diagram
The built diagram.
diagram_context: Context
The context for the built diagram.
This context will be used to initialize the watcher.
"""
diagram, diagram_context = super().add_cast_and_build(
main_cast_members=cast,
with_watcher=with_watcher,
)
# Configure the scene graph for collision detection
self.configure_collision_filter(
diagram.GetSubsystemContext(
self.scene_graph, diagram_context,
)
)
# Connect arm controller to the appropriate plant_context
self.station.arm_controller.plant_context = diagram.GetSubsystemContext(
self.station.arm_controller.plant, diagram_context,
)
for role_ii, performer_ii in cast:
if role_ii.name == "OfflineMotionPlanner":
performer_ii.set_internal_root_context(
diagram_context
)
return diagram, diagram_context
[docs]
def easy_cast_and_build(
self,
planning_algorithm: Callable[
[np.ndarray, np.ndarray, Callable[[np.ndarray], bool]],
Tuple[MotionPlan, bool],
],
with_watcher: bool = False,
) -> Tuple[Diagram, Context]:
"""
*Description*
This function is used to easily cast and build the production.
*Parameters*
planning_algorithm: Callable[[np.ndarray, np.ndarray, Callable[[np.ndarray], bool]], Tuple[MotionPlan, bool]
The algorithm that produces a path from the start configuration to goal configuration
using the collision checker for the scene.
with_watcher: bool, optional
A Boolean that determines whether to add a watcher to the diagram.
Default is False.
*Returns*
diagram: pydrake.systems.framework.Diagram
The diagram that represents the fully assembled cast, all connected together.
diagram_context: pdrake.systems.framework.Context
The context for the diagram (used by the diagram watcher to monitor the system, if defined).
.. note::
This method returns a `pydrake.systems.framework.Diagram` and its associated `pydrake.systems.framework.Context`.
When constructing a Drake simulation of this, you should use these objects to create the `pydrake.systems.analysis.Simulator`.
"""
# Setup
# Use Base class implementation to start
diagram, diagram_context = super().easy_cast_and_build(
planning_algorithm,
with_watcher=with_watcher,
)
# Configure the scene graph for collision detection
self.configure_collision_filter(
diagram.GetSubsystemContext(
self.scene_graph, diagram_context,
)
)
# Connect arm controller to the appropriate plant_context
self.station.arm_controller.plant_context = diagram.GetSubsystemContext(
self.station.arm_controller.plant, diagram_context,
)
self.performers[0].set_internal_root_context(
diagram_context
)
# Set the initial positions of the arm
# self.station.plant.SetPositions(
# self.station.plant.GetMyMutableContextFromRoot(diagram_context),
# self.arm,
# self.start_configuration,
# )
return diagram, diagram_context
@property
def goal_pose(self) -> RigidTransform:
"""
*Description*
Get the goal pose.
*Returns*
pose_WorldGoal: RigidTransform
The goal pose.
"""
# Algorithm
if self._goal_pose is not None:
# If we have already defined the goal pose, then return it
return self._goal_pose
elif (self._goal_pose is None) and (self._goal_config is None):
# If we have no guidance on how to start, then we will use a default
goal_position = np.array([+0.0, 1.0, 0.6])
goal_orientation = RollPitchYaw(np.pi / 2.0, np.pi / 2.0, 0.0).ToQuaternion()
self._goal_pose = RigidTransform(goal_orientation, goal_position)
return self._goal_pose
elif (self._goal_pose is None) and (self._goal_config is not None):
# Use the goal configuration to get the goal pose
# Using a "forward kinematics solver"
self._goal_pose = self.solve_forward_kinematics_problem_for_arm(self._goal_config)
return self._goal_pose
else:
raise ValueError(
f"Unexpected behavior. This should never happen."
)
@property
def id(self) -> ProductionID:
"""Always returns ``ProductionID.kShelfPlanning1``"""
return ProductionID.kShelfPlanning1
[docs]
def solve_forward_kinematics_problem_for_arm(
self,
robot_joint_positions: np.ndarray,
target_frame_name: str = "ft_frame",
) -> RigidTransform:
"""
*Description*
This method solves the forward kinematics problem for the UR10e arm
by itself.
TODO(Kwesi): Attempt to make this a utility (it is duplicated multiple times!)
*Parameters*
robot_joint_positions: np.ndarray of shape (6,)
The joint configuration of the robot in the production.
target_frame_name: str, optional
The frame that we compute the pose of with respect to the world origin frame.
Default is "ft_frame"
*Returns*
pose_WorldTarget: RigidTransform
The end effector pose of the robot expressed in the world frame.
"""
# Setup
# Create shadow plant and populate it with the UR10e arm
shadow_station = KinematicUR10eStation(
time_step=self.time_step,
meshcat_port_number=None,
)
shadow_station.Finalize()
# Use station's plant to solve the forward kinematics problem
shadow_plant = shadow_station.plant
temp_context = shadow_plant.CreateDefaultContext()
shadow_plant.SetPositions(
temp_context,
shadow_station.arm,
robot_joint_positions,
)
# Get the pose of the end effector
end_effector_pose = shadow_plant.EvalBodyPoseInWorld(
temp_context,
shadow_plant.GetBodyByName(target_frame_name),
)
return end_effector_pose
@property
def start_pose(self) -> RigidTransform:
"""
*Description*
Get the start pose.
*Returns*
pose_WorldStart: RigidTransform
The start pose of the robot's end effector frame with respect to the world frame.
"""
# Setup
if self._start_pose is not None:
# If we have already defined the start pose, then return it
return self._start_pose
elif (self._start_pose is None) and (self._start_config is None):
# If we have no guidance on how to start, then we will use a default
start_position = np.array([+0.3, 0.1, 1.2])
start_orientation = Quaternion(1, 0, 0, 0)
self._start_pose = RigidTransform(start_orientation, start_position)
return self._start_pose
elif (self._start_pose is None) and (self._start_config is not None):
# Use the start configuration to get the starting pose
# Using a "forward kinematics solver"
self._start_pose = self.solve_forward_kinematics_problem_for_arm(self._start_config)
return self._start_pose
else:
raise ValueError(
f"Unexpected behavior. This should never happen."
)