diff --git a/src/prpy/collision.py b/src/prpy/collision.py index 04990ec..d0685a7 100644 --- a/src/prpy/collision.py +++ b/src/prpy/collision.py @@ -40,7 +40,7 @@ ) -class SimpleRobotCollisionCheckerContextManager(object): +class SimpleRobotCollisionChecker(object): """RobotCollisionChecker which uses the standard OpenRAVE interface. This RobotCollisionChecker is instantiated with a robot, @@ -88,7 +88,7 @@ def VerifyCollisionFree(self): raise SelfCollisionPlanningError.FromReport(report) -class BakedRobotCollisionCheckerContextManager(object): +class BakedRobotCollisionChecker(object): """RobotCollisionChecker which uses a baked collision interface. When this RobotCollisionChecker is instantiated with a robot, @@ -173,22 +173,20 @@ def VerifyCollisionFree(self): raise CollisionPlanningError.FromReport(report) -class SimpleRobotCollisionChecker(object): +class SimpleRobotCollisionCheckerFactory(object): def __init__(self, collision_options=CollisionOptions.ActiveDOFs): self.collision_options = collision_options def __call__(self, robot): - return SimpleRobotCollisionCheckerContextManager( - robot, self.collision_options) + return SimpleRobotCollisionChecker(robot, self.collision_options) -class BakedRobotCollisionChecker(object): +class BakedRobotCollisionCheckerFactory(object): def __init__(self, collision_options=CollisionOptions.ActiveDOFs): self.collision_options = collision_options def __call__(self, robot): - return BakedRobotCollisionCheckerContextManager( - robot, self.collision_options) + return BakedRobotCollisionChecker(robot, self.collision_options) -DefaultRobotCollisionChecker = SimpleRobotCollisionChecker() +DefaultRobotCollisionCheckerFactory = SimpleRobotCollisionCheckerFactory() diff --git a/src/prpy/planning/cbirrt.py b/src/prpy/planning/cbirrt.py index dbeb301..8799222 100644 --- a/src/prpy/planning/cbirrt.py +++ b/src/prpy/planning/cbirrt.py @@ -32,9 +32,9 @@ import openravepy from ..util import SetTrajectoryTags from ..collision import ( - BakedRobotCollisionChecker, - DefaultRobotCollisionChecker, - SimpleRobotCollisionChecker, + DefaultRobotCollisionCheckerFactory, + BakedRobotCollisionCheckerFactory, + SimpleRobotCollisionCheckerFactory, ) from base import (BasePlanner, PlanningError, UnsupportedPlanningError, ClonedPlanningMethod, Tags) @@ -43,17 +43,21 @@ class CBiRRTPlanner(BasePlanner): - def __init__(self, robot_collision_checker=DefaultRobotCollisionChecker): + def __init__(self, robot_checker_factory=None): super(CBiRRTPlanner, self).__init__() + + if robot_checker_factory is None: + robot_checker_factory = DefaultRobotCollisionCheckerFactory + self.problem = openravepy.RaveCreateProblem(self.env, 'CBiRRT') if self.problem is None: raise UnsupportedPlanningError('Unable to create CBiRRT module.') - self.robot_collision_checker = robot_collision_checker - if isinstance(robot_collision_checker, SimpleRobotCollisionChecker): + self.robot_checker_factory = robot_checker_factory + if isinstance(robot_checker_factory, SimpleRobotCollisionCheckerFactory): self._is_baked = False - elif isinstance(robot_collision_checker, BakedRobotCollisionChecker): + elif isinstance(robot_checker_factory, BakedRobotCollisionCheckerFactory): self._is_baked = True else: raise NotImplementedError( @@ -293,10 +297,9 @@ def Plan(self, robot, smoothingitrs=None, timelimit=None, allowlimadj=0, args += ['filename', traj_path] args_str = ' '.join(args) - # Bypass the robot_collision_checker context manager since CBiRRT does - # its own baking in C++. - robot_checker = self.robot_collision_checker(robot) - options = robot_checker.collision_options + # Bypass the context manager since CBiRRT does its own baking in C++. + collision_checker = self.robot_checker_factory(robot) + options = collision_checker.collision_options with CollisionOptionsStateSaver(env.GetCollisionChecker(), options): response = self.problem.SendCommand(args_str, True) @@ -307,8 +310,7 @@ def Plan(self, robot, smoothingitrs=None, timelimit=None, allowlimadj=0, # Construct the output trajectory. with open(traj_path, 'rb') as traj_file: traj_xml = traj_file.read() - traj = openravepy.RaveCreateTrajectory( - env, 'GenericTrajectory') + traj = openravepy.RaveCreateTrajectory(env, '') traj.deserialize(traj_xml) # Tag the trajectory as non-determistic since CBiRRT is a randomized diff --git a/src/prpy/planning/chomp.py b/src/prpy/planning/chomp.py index d772a3e..78e84e8 100644 --- a/src/prpy/planning/chomp.py +++ b/src/prpy/planning/chomp.py @@ -34,7 +34,7 @@ import numpy import openravepy from ..util import SetTrajectoryTags, GetLinearCollisionCheckPts -from ..collision import DefaultRobotCollisionChecker +from ..collision import DefaultRobotCollisionCheckerFactory from .exceptions import ( CollisionPlanningError, SelfCollisionPlanningError @@ -166,11 +166,15 @@ def get_affected_links(body, dof_indices): class CHOMPPlanner(BasePlanner): - def __init__(self, require_cache=False, - robot_collision_checker=DefaultRobotCollisionChecker): + def __init__(self, require_cache=False, robot_checker_factory=None): super(CHOMPPlanner, self).__init__() + + if robot_checker_factory is None: + robot_checker_factory = DefaultRobotCollisionCheckerFactory + + self.robot_checker_factory = robot_checker_factory self.require_cache = require_cache - self.robot_collision_checker = robot_collision_checker + self.setupEnv(self.env) def setupEnv(self, env): @@ -276,7 +280,7 @@ def _Plan(self, robot, sampling_func=VanDerCorputSampleGenerator, **kwargs): checks = GetLinearCollisionCheckPts(robot, traj, norm_order=2, sampling_func=sampling_func) - with self.robot_collision_checker(robot) as robot_checker: + with self.robot_checker_factory(robot) as robot_checker: for t, q in checks: robot.SetActiveDOFValues(q) robot_checker.VerifyCollisionFree() # Throws on collision. diff --git a/src/prpy/planning/ompl.py b/src/prpy/planning/ompl.py index dcaf503..4a19b67 100644 --- a/src/prpy/planning/ompl.py +++ b/src/prpy/planning/ompl.py @@ -36,12 +36,12 @@ ClonedPlanningMethod, Tags) from openravepy import ( CollisionOptionsStateSaver, - PlannerStatus + PlannerStatus, ) from ..collision import ( - BakedRobotCollisionChecker, - DefaultRobotCollisionChecker, - SimpleRobotCollisionChecker, + BakedRobotCollisionCheckerFactory, + DefaultRobotCollisionCheckerFactory, + SimpleRobotCollisionCheckerFactory, ) from .cbirrt import SerializeTSRChain @@ -49,13 +49,25 @@ class OMPLPlanner(BasePlanner): - def __init__(self, algorithm='RRTConnect', - robot_collision_checker=DefaultRobotCollisionChecker): + def __init__(self, algorithm='RRTConnect', robot_checker_factory=None): super(OMPLPlanner, self).__init__() + if robot_checker_factory is None: + robot_checker_factory = DefaultRobotCollisionCheckerFactory + self.setup = False self.algorithm = algorithm - self.robot_collision_checker = robot_collision_checker + + self.robot_checker_factory = robot_checker_factory + + if isinstance(robot_checker_factory, SimpleRobotCollisionCheckerFactory): + self._is_baked = False + elif isinstance(robot_checker_factory, BakedRobotCollisionCheckerFactory): + self._is_baked = True + else: + raise NotImplementedError( + 'or_ompl only supports Simple and' + ' BakedRobotCollisionCheckerFactory.') planner_name = 'OMPL_{:s}'.format(algorithm) self.planner = openravepy.RaveCreatePlanner(self.env, planner_name) @@ -65,14 +77,6 @@ def __init__(self, algorithm='RRTConnect', 'Unable to create "{:s}" planner. Is or_ompl installed?' .format(planner_name)) - if isinstance(robot_collision_checker, SimpleRobotCollisionChecker): - self._is_baked = False - elif isinstance(robot_collision_checker, BakedRobotCollisionChecker): - self._is_baked = True - else: - raise NotImplementedError( - 'or_ompl only supports Simple and BakedRobotCollisionChecker.') - def __str__(self): return 'OMPL {0:s}'.format(self.algorithm) @@ -133,9 +137,8 @@ def _Plan(self, robot, goal=None, tsrchains=None, timeout=30., shortcut_timeout= self.planner.InitPlan(robot, params) self.setup = True - # Bypass the robot_collision_checker context manager since or_ompl - # does its own baking in C++. - robot_checker = self.robot_collision_checker(robot) + # Bypass the context manager since or_ompl does its own baking. + robot_checker = self.robot_checker_factory(robot) options = robot_checker.collision_options with CollisionOptionsStateSaver(env.GetCollisionChecker(), options): status = self.planner.PlanPath(traj, releasegil=True) diff --git a/src/prpy/planning/snap.py b/src/prpy/planning/snap.py index 447b72c..277e1ac 100644 --- a/src/prpy/planning/snap.py +++ b/src/prpy/planning/snap.py @@ -36,7 +36,7 @@ ClonedPlanningMethod, Tags ) -from ..collision import DefaultRobotCollisionChecker +from ..collision import DefaultRobotCollisionCheckerFactory class SnapPlanner(BasePlanner): @@ -52,9 +52,13 @@ class SnapPlanner(BasePlanner): most commonly used as the first item in a Sequence meta-planner to avoid calling a motion planner when the trivial solution is valid. """ - def __init__(self, robot_collision_checker=DefaultRobotCollisionChecker): + def __init__(self, robot_checker_factory=None): super(SnapPlanner, self).__init__() - self.robot_collision_checker = robot_collision_checker + + if robot_checker_factory is None: + robot_checker_factory = DefaultRobotCollisionCheckerFactory + + self.robot_checker_factory = robot_checker_factory def __str__(self): return 'SnapPlanner' @@ -176,7 +180,7 @@ def _Snap(self, robot, goal, **kw_args): norm_order=2, sampling_func=vdc) - with self.robot_collision_checker(robot) as robot_checker: + with self.robot_checker_factory(robot) as robot_checker: # Run constraint checks at DOF resolution: for t, q in checks: # Set the joint positions diff --git a/src/prpy/planning/tsr.py b/src/prpy/planning/tsr.py index 89adfee..dfbc29c 100644 --- a/src/prpy/planning/tsr.py +++ b/src/prpy/planning/tsr.py @@ -36,7 +36,7 @@ UnsupportedPlanningError) from .base import Tags from ..util import SetTrajectoryTags -from ..collision import DefaultRobotCollisionChecker +from ..collision import DefaultRobotCollisionCheckerFactory from openravepy import ( IkFilterOptions, IkParameterization, @@ -58,10 +58,14 @@ def grouper(n, iterable): class TSRPlanner(BasePlanner): - def __init__(self, delegate_planner=None, robot_collision_checker=DefaultRobotCollisionChecker): + def __init__(self, delegate_planner=None, robot_checker_factory=None): super(TSRPlanner, self).__init__() + + if robot_checker_factory is None: + robot_checker_factory = DefaultRobotCollisionCheckerFactory + self.delegate_planner = delegate_planner - self.robot_collision_checker = robot_collision_checker + self.robot_checker_factory = robot_checker_factory def __str__(self): if self.delegate_planner is not None: @@ -174,7 +178,7 @@ def is_time_available(*args): # Set ActiveDOFs for IK collision checking. We intentionally # restore the original collision checking options before calling # the planner to give it a pristine environment. - with self.robot_collision_checker(robot) as robot_checker: + with self.robot_checker_factory(robot) as robot_checker: while is_time_available() and len(configurations_chunk) < chunk_size: # Generate num_candidates candidates and rank them using the # user-supplied IK ranker. diff --git a/src/prpy/planning/vectorfield.py b/src/prpy/planning/vectorfield.py index c26f026..8a395e9 100644 --- a/src/prpy/planning/vectorfield.py +++ b/src/prpy/planning/vectorfield.py @@ -35,7 +35,7 @@ import openravepy from .base import BasePlanner, PlanningError, ClonedPlanningMethod, Tags from .. import util -from ..collision import DefaultRobotCollisionChecker +from ..collision import DefaultRobotCollisionCheckerFactory from enum import Enum logger = logging.getLogger(__name__) @@ -74,9 +74,13 @@ def DoesCache(cls, status): class VectorFieldPlanner(BasePlanner): - def __init__(self, robot_collision_checker=DefaultRobotCollisionChecker): + def __init__(self, robot_checker_factory=None): super(VectorFieldPlanner, self).__init__() - self.robot_collision_checker = robot_collision_checker + + if robot_checker_factory is None: + robot_checker_factory = DefaultRobotCollisionCheckerFactory + + self.robot_checker_factory = robot_checker_factory def __str__(self): return 'VectorFieldPlanner' @@ -547,7 +551,7 @@ def fn_callback(t, q): nonlocals['exception'] = e return -1 # Stop. - with self.robot_collision_checker(robot) as robot_checker: + with self.robot_checker_factory(robot) as robot_checker: # Integrate the vector field to get a configuration space path. # # TODO: Tune the integrator parameters.