Source code for brom_drake.motion_planning.systems.open_loop_dispensers.pose

from enum import IntEnum
import numpy as np
from pydrake.all import RigidTransform
from pydrake.common.value import AbstractValue
from pydrake.systems.framework import LeafSystem, Context, BasicVector
from pydrake.trajectories import PiecewisePose, PiecewisePolynomial, PiecewiseQuaternionSlerp

# Internal Imports
from brom_drake.motion_planning.systems.state_of_plan_in_memory import StateOfPlanInMemory

[docs] class OpenLoopPosePlanDispenser(LeafSystem): def __init__( self, speed: float = 0.5, # Speed at which to proceed through points. ): LeafSystem.__init__(self) # Setup self.speed = speed self.plan, self.planned_trajectory = None, None # Create input port for the plan (we assume that when we receive it, we will close the port) self.plan_is_set = False sample_plan = [RigidTransform()] self.t0, self.t_final = -1.0, -1.0 self.plan_port = self.DeclareAbstractInputPort( "plan", AbstractValue.Make(sample_plan), ) self.plan_ready_port = self.DeclareAbstractInputPort( "plan_ready", AbstractValue.Make(False), ) self.state_of_plan_in_memory_idx = self.DeclareDiscreteState( np.array([StateOfPlanInMemory.kNotSet]) ) # Create output ports self.define_output_ports()
[docs] def define_output_ports(self): """ Description ----------- This method defines the output ports for this system. """ self.DeclareAbstractOutputPort( "pose_in_plan", lambda: AbstractValue.Make(RigidTransform()), self.GetCurrentPoseInPlan, ) self.DeclareAbstractOutputPort( "plan_is_set", lambda: AbstractValue.Make(StateOfPlanInMemory.kNotSet), self.GetStateOfPlanInMemory, { self.discrete_state_ticket( self.state_of_plan_in_memory_idx, ) }, # This output should only be updated when the # discrete state state_of_plan_in_memory changes )
def GetCurrentPoseInPlan(self, context: Context, output_point: AbstractValue): # Setup plan_is_ready = self.plan_ready_port.Eval(context) # Get the abstract state abstract_state = context.get_discrete_state(self.state_of_plan_in_memory_idx) # If plan isn't ready, then skip the rest of the logic if not plan_is_ready: output_point.SetFrom( AbstractValue.Make(RigidTransform()) ) return # If this is the first time that plan_is_ready, then # let's save the current time as the time when the plan starts if abstract_state[0] == StateOfPlanInMemory.kNotSet: # Initialize the system to handle the new plan self.initialize_system_for_new_plan(context) # Use interpolation to get the current point t = context.get_time() - self.t0 # Mark the plan as set context.SetDiscreteState(np.array([ StateOfPlanInMemory.kSet, ])) # Output The Current Point if t < 0.0: output_point.SetFrom( AbstractValue.Make(self.planned_trajectory.GetPose(0.0)) ) elif t > self.t_final: output_point.SetFrom( AbstractValue.Make(self.planned_trajectory.GetPose(self.t_final)) ) else: output_point.SetFrom( AbstractValue.Make(self.planned_trajectory.GetPose(t)) )
[docs] def find_time_between_two_points(self, x1: np.ndarray, x2: np.ndarray)->float: """ Description ----------- This method computes the time that it will take to move between two points. :return: """ # Setup # Compute the distance between the two points dist = np.linalg.norm(x1 - x2) return dist / self.speed
[docs] def initialize_system_for_new_plan(self, context: Context): """ Description ----------- This function should lock a couple of things into memory for this object to handle future calls to get the plan. :param context: :return: """ # Setup # Get time to start executing the plan self.t0 = context.get_time() context.SetDiscreteState( np.array([StateOfPlanInMemory.kSet]) ) # Get times where we should reach each point in the plan and save it into a new map self.plan = self.plan_port.Eval(context) n_points_in_plan = len(self.plan) t_ii = 0.0 times = [t_ii] temp_position_plan = np.array([ self.plan[ii].translation() for ii in range(n_points_in_plan) ]) temp_quaternion_plan = np.array([ self.plan[ii].rotation().matrix() for ii in range(n_points_in_plan) ]) for ii in range(n_points_in_plan-1): # Compute the time it should take to travel between two positions in the plan t_ii += self.find_time_between_two_points( temp_position_plan[ii,:], temp_position_plan[ii+1, :], ) times += [t_ii] self.t_final = t_ii position_trajectory = PiecewisePolynomial.FirstOrderHold(times, temp_position_plan.T) orientation_trajectory = PiecewiseQuaternionSlerp(times, temp_quaternion_plan) self.planned_trajectory = PiecewisePose( position_trajectory, orientation_trajectory, )
[docs] def GetStateOfPlanInMemory(self, context: Context, output: AbstractValue): """Plan is set if and only if the internal variable is not None. """ output.SetFrom( context.get_abstract_state(self.state_of_plan_in_memory_idx)[0] )