diff --git a/src/prpy/collision.py b/src/prpy/collision.py index a031c94..04990ec 100644 --- a/src/prpy/collision.py +++ b/src/prpy/collision.py @@ -26,14 +26,21 @@ # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. - -import openravepy +from openravepy import ( + CollisionOptions, + CollisionOptionsStateSaver, + CollisionReport, + RaveCreateKinBody, + openrave_exception, + ) from prpy.exceptions import PrPyException -from prpy.planning.exceptions import CollisionPlanningError -from prpy.planning.exceptions import SelfCollisionPlanningError +from prpy.planning.exceptions import ( + CollisionPlanningError, + SelfCollisionPlanningError, +) -class SimpleRobotCollisionChecker: +class SimpleRobotCollisionCheckerContextManager(object): """RobotCollisionChecker which uses the standard OpenRAVE interface. This RobotCollisionChecker is instantiated with a robot, @@ -44,10 +51,28 @@ class SimpleRobotCollisionChecker: is raised. """ - def __init__(self, robot): + def __init__(self, robot, collision_options): self.robot = robot self.env = robot.GetEnv() + self.checker = self.env.GetCollisionChecker() + if self.checker is None: + raise PrPyException('No collision checker found on environment') + + self.collision_saver = CollisionOptionsStateSaver( + self.checker, collision_options) + + @property + def collision_options(self): + return self.collision_saver.newoptions + + def __enter__(self): + self.collision_saver.__enter__() + return self + + def __exit__(self, type, value, traceback): + self.collision_saver.__exit__(type, value, traceback) + def CheckCollision(self, report=None): if self.env.CheckCollision(self.robot, report=report): return True @@ -56,14 +81,14 @@ def CheckCollision(self, report=None): return False def VerifyCollisionFree(self): - report = openravepy.CollisionReport() + report = CollisionReport() if self.env.CheckCollision(self.robot, report=report): raise CollisionPlanningError.FromReport(report) elif self.robot.CheckSelfCollision(report=report): raise SelfCollisionPlanningError.FromReport(report) -class BakedRobotCollisionChecker: +class BakedRobotCollisionCheckerContextManager(object): """RobotCollisionChecker which uses a baked collision interface. When this RobotCollisionChecker is instantiated with a robot, @@ -78,29 +103,92 @@ class BakedRobotCollisionChecker: CollisionPlanningError on collision. """ - def __init__(self, robot): + def __init__(self, robot, collision_options=CollisionOptions.ActiveDOFs): self.robot = robot self.env = robot.GetEnv() + self.checker = self.env.GetCollisionChecker() if self.checker is None: raise PrPyException('No collision checker found on environment') + + self.baked_kinbody = None + self.collision_saver = CollisionOptionsStateSaver( + self.checker, collision_options) + + @property + def collision_options(self): + return self.collision_saver.newoptions + + def __enter__(self): + if self.baked_kinbody is not None: + raise PrPyException( + 'Another baked KinBody is available. Did you call __enter__' + ' twice or forget to call __exit__?') + try: kb_type = self.checker.SendCommand('BakeGetType') - except openravepy.openrave_exception: + except openrave_exception: raise PrPyException('Collision checker does not support baking') + + # TODO: How should we handle exceptions that are thrown below? + self.collision_saver.__enter__() + # This "bakes" the following Env and Self checks. - # (after the bake, the composite check is stored in self.baked) + # (after the bake, the composite check is stored in self.baked_kinbody) self.checker.SendCommand('BakeBegin') + self.env.CheckCollision(self.robot) self.robot.CheckSelfCollision() - self.baked = openravepy.RaveCreateKinBody(self.env, kb_type) + + self.baked_kinbody = RaveCreateKinBody(self.env, kb_type) + if self.baked_kinbody is None: + raise PrPyException('Failed to create baked KinBody.') + self.checker.SendCommand('BakeEnd') + return self + + def __exit__(self, type, value, traceback): + if self.baked_kinbody is None: + raise PrPyException( + 'No baked KinBody is available. Did you call __exit__' + ' without calling __enter__ first?') + + del self.baked_kinbody + self.baked_kinbody = None + + self.collision_saver.__exit__(type, value, traceback) + def CheckCollision(self, report=None): + if self.baked_kinbody is None: + raise PrPyException( + 'No baked KinBody is available. Did you call __enter__?') + # The baked check is performed by checking self collision on baked - return self.checker.CheckSelfCollision(self.baked, report) + return self.checker.CheckSelfCollision(self.baked_kinbody, report) def VerifyCollisionFree(self): - report = openravepy.CollisionReport() + report = CollisionReport() if self.CheckCollision(report=report): raise CollisionPlanningError.FromReport(report) + + +class SimpleRobotCollisionChecker(object): + def __init__(self, collision_options=CollisionOptions.ActiveDOFs): + self.collision_options = collision_options + + def __call__(self, robot): + return SimpleRobotCollisionCheckerContextManager( + robot, self.collision_options) + + +class BakedRobotCollisionChecker(object): + def __init__(self, collision_options=CollisionOptions.ActiveDOFs): + self.collision_options = collision_options + + def __call__(self, robot): + return BakedRobotCollisionCheckerContextManager( + robot, self.collision_options) + + +DefaultRobotCollisionChecker = SimpleRobotCollisionChecker() diff --git a/src/prpy/planning/cbirrt.py b/src/prpy/planning/cbirrt.py index bc14f57..cb7c1d3 100644 --- a/src/prpy/planning/cbirrt.py +++ b/src/prpy/planning/cbirrt.py @@ -31,7 +31,11 @@ import numpy import openravepy from ..util import SetTrajectoryTags -from ..collision import SimpleRobotCollisionChecker, BakedRobotCollisionChecker +from ..collision import ( + BakedRobotCollisionChecker, + DefaultRobotCollisionChecker, + SimpleRobotCollisionChecker, +) from base import (BasePlanner, PlanningError, UnsupportedPlanningError, ClonedPlanningMethod, LockedPlanningMethod, Tags) import prpy.kin @@ -39,16 +43,17 @@ class CBiRRTPlanner(BasePlanner): - def __init__(self, robot_collision_checker=SimpleRobotCollisionChecker): + def __init__(self, robot_collision_checker=DefaultRobotCollisionChecker): super(CBiRRTPlanner, self).__init__() self.problem = openravepy.RaveCreateProblem(self.env, 'CBiRRT') if self.problem is None: raise UnsupportedPlanningError('Unable to create CBiRRT module.') - if robot_collision_checker == SimpleRobotCollisionChecker: + self.robot_collision_checker = robot_collision_checker + if isinstance(robot_collision_checker, SimpleRobotCollisionChecker): self._is_baked = False - elif robot_collision_checker == BakedRobotCollisionChecker: + elif isinstance(robot_collision_checker, BakedRobotCollisionChecker): self._is_baked = True else: raise NotImplementedError( @@ -202,7 +207,7 @@ def PlanToTSR(self, robot, tsr_chains, smoothingitrs=100, **kw_args): def Plan(self, robot, smoothingitrs=None, timelimit=1.0, allowlimadj=0, jointstarts=None, jointgoals=None, psample=None, tsr_chains=None, extra_args=None, **kw_args): - from openravepy import CollisionOptions, CollisionOptionsStateSaver + from openravepy import CollisionOptionsStateSaver env = robot.GetEnv() problem = openravepy.RaveCreateProblem(env, 'CBiRRT') @@ -301,8 +306,11 @@ def Plan(self, robot, smoothingitrs=None, timelimit=1.0, allowlimadj=0, args += ['filename', traj_path] args_str = ' '.join(args) - with CollisionOptionsStateSaver(env.GetCollisionChecker(), - CollisionOptions.ActiveDOFs): + # Bypass the robot_collision_checker context manager since CBiRRT does + # its own baking in C++. + robot_checker = self.robot_collision_checker(robot) + options = robot_checker.collision_options + with CollisionOptionsStateSaver(env.GetCollisionChecker(), options): response = problem.SendCommand(args_str, True) if not response.strip().startswith('1'): diff --git a/src/prpy/planning/chomp.py b/src/prpy/planning/chomp.py index 8307cda..d772a3e 100644 --- a/src/prpy/planning/chomp.py +++ b/src/prpy/planning/chomp.py @@ -34,14 +34,13 @@ import numpy import openravepy from ..util import SetTrajectoryTags, GetLinearCollisionCheckPts -from ..collision import SimpleRobotCollisionChecker +from ..collision import DefaultRobotCollisionChecker from .exceptions import ( CollisionPlanningError, SelfCollisionPlanningError ) from base import (BasePlanner, PlanningError, UnsupportedPlanningError, ClonedPlanningMethod, Tags) -from openravepy import CollisionOptions, CollisionOptionsStateSaver from prpy.util import VanDerCorputSampleGenerator, SampleTimeGenerator import prpy.tsr @@ -167,7 +166,8 @@ def get_affected_links(body, dof_indices): class CHOMPPlanner(BasePlanner): - def __init__(self, require_cache=False, robot_collision_checker=SimpleRobotCollisionChecker): + def __init__(self, require_cache=False, + robot_collision_checker=DefaultRobotCollisionChecker): super(CHOMPPlanner, self).__init__() self.require_cache = require_cache self.robot_collision_checker = robot_collision_checker @@ -276,17 +276,10 @@ def _Plan(self, robot, sampling_func=VanDerCorputSampleGenerator, **kwargs): checks = GetLinearCollisionCheckPts(robot, traj, norm_order=2, sampling_func=sampling_func) - with CollisionOptionsStateSaver(self.env.GetCollisionChecker(), - CollisionOptions.ActiveDOFs): - - # Instantiate a robot checker - robot_checker = self.robot_collision_checker(robot) - + with self.robot_collision_checker(robot) as robot_checker: for t, q in checks: robot.SetActiveDOFValues(q) - - # Check collision (throws an exception on collision) - robot_checker.VerifyCollisionFree() + robot_checker.VerifyCollisionFree() # Throws on collision. # Tag the trajectory as non-determistic since CBiRRT is a randomized # planner. Additionally tag the goal as non-deterministic if CBiRRT diff --git a/src/prpy/planning/ompl.py b/src/prpy/planning/ompl.py index 1a2459c..70ddbd0 100644 --- a/src/prpy/planning/ompl.py +++ b/src/prpy/planning/ompl.py @@ -36,13 +36,16 @@ from base import (BasePlanner, PlanningError, UnsupportedPlanningError, ClonedPlanningMethod, LockedPlanningMethod, Tags) from openravepy import ( - CollisionOptions, CollisionOptionsStateSaver, PlannerStatus, ErrorCode, openrave_exception ) -from ..collision import SimpleRobotCollisionChecker, BakedRobotCollisionChecker +from ..collision import ( + BakedRobotCollisionChecker, + DefaultRobotCollisionChecker, + SimpleRobotCollisionChecker, +) from .cbirrt import SerializeTSRChain logger = logging.getLogger(__name__) @@ -50,15 +53,16 @@ class OMPLPlanner(BasePlanner): def __init__(self, algorithm='RRTConnect', - robot_collision_checker=SimpleRobotCollisionChecker): + robot_collision_checker=DefaultRobotCollisionChecker): super(OMPLPlanner, self).__init__() self.setup = False self.algorithm = algorithm + self.robot_collision_checker = robot_collision_checker - if robot_collision_checker == SimpleRobotCollisionChecker: + if isinstance(robot_collision_checker, SimpleRobotCollisionChecker): self._is_baked = False - elif robot_collision_checker == BakedRobotCollisionChecker: + elif isinstance(robot_collision_checker, BakedRobotCollisionChecker): self._is_baked = True else: raise NotImplementedError( @@ -157,8 +161,11 @@ def _Plan(self, robot, goal=None, tsrchains=None, timelimit=5., with env: planner.InitPlan(robot, params) - with CollisionOptionsStateSaver(env.GetCollisionChecker(), - CollisionOptions.ActiveDOFs): + # Bypass the robot_collision_checker context manager since or_ompl + # does its own baking in C++. + robot_checker = self.robot_collision_checker(robot) + options = robot_checker.collision_options + with CollisionOptionsStateSaver(env.GetCollisionChecker(), options): status = planner.PlanPath(traj, releasegil=True) if status not in [PlannerStatus.HasSolution, diff --git a/src/prpy/planning/snap.py b/src/prpy/planning/snap.py index f6da65f..912202c 100644 --- a/src/prpy/planning/snap.py +++ b/src/prpy/planning/snap.py @@ -37,8 +37,7 @@ LockedPlanningMethod, Tags ) -from ..collision import SimpleRobotCollisionChecker -from openravepy import CollisionOptions, CollisionOptionsStateSaver +from ..collision import DefaultRobotCollisionChecker class SnapPlanner(BasePlanner): @@ -54,7 +53,7 @@ class SnapPlanner(BasePlanner): most commonly used as the first item in a Sequence meta-planner to avoid calling a motion planner when the trivial solution is valid. """ - def __init__(self, robot_collision_checker=SimpleRobotCollisionChecker): + def __init__(self, robot_collision_checker=DefaultRobotCollisionChecker): super(SnapPlanner, self).__init__() self.robot_collision_checker = robot_collision_checker @@ -180,12 +179,7 @@ def _Snap(self, robot, goal, **kw_args): norm_order=2, sampling_func=vdc) - with CollisionOptionsStateSaver(env.GetCollisionChecker(), - CollisionOptions.ActiveDOFs): - - # Instantiate a robot checker - robot_checker = self.robot_collision_checker(robot) - + with self.robot_collision_checker(robot) as robot_checker: # Run constraint checks at DOF resolution: for t, q in checks: # Set the joint positions diff --git a/src/prpy/planning/tsr.py b/src/prpy/planning/tsr.py index a4d301b..64ae47d 100644 --- a/src/prpy/planning/tsr.py +++ b/src/prpy/planning/tsr.py @@ -36,10 +36,8 @@ PlanningError, UnsupportedPlanningError) from .base import Tags from ..util import SetTrajectoryTags -from ..collision import SimpleRobotCollisionChecker +from ..collision import DefaultRobotCollisionChecker from openravepy import ( - CollisionOptions, - CollisionOptionsStateSaver, IkFilterOptions, IkParameterization, IkParameterizationType @@ -60,7 +58,7 @@ def grouper(n, iterable): class TSRPlanner(BasePlanner): - def __init__(self, delegate_planner=None, robot_collision_checker=SimpleRobotCollisionChecker): + def __init__(self, delegate_planner=None, robot_collision_checker=DefaultRobotCollisionChecker): super(TSRPlanner, self).__init__() self.delegate_planner = delegate_planner self.robot_collision_checker = robot_collision_checker @@ -134,11 +132,6 @@ def compute_ik_solutions(tsrchain): return ik_solutions - # Instantiate a robot checker - with CollisionOptionsStateSaver( - env.GetCollisionChecker(), CollisionOptions.ActiveDOFs): - robot_checker = self.robot_collision_checker(robot) - def is_configuration_valid(ik_solution): p = openravepy.KinBody.SaveParameters with robot.CreateRobotStateSaver(p.LinkTransformation): @@ -170,8 +163,7 @@ def is_time_available(*args): # Set ActiveDOFs for IK collision checking. We intentionally # restore the original collision checking options before calling # the planner to give it a pristine environment. - with CollisionOptionsStateSaver( - env.GetCollisionChecker(), CollisionOptions.ActiveDOFs): + with self.robot_collision_checker(robot) as robot_checker: while is_time_available() and len(configurations_chunk) < chunk_size: # Generate num_candidates candidates and rank them using the # user-supplied IK ranker. diff --git a/src/prpy/planning/vectorfield.py b/src/prpy/planning/vectorfield.py index 50aafcb..dc26e45 100644 --- a/src/prpy/planning/vectorfield.py +++ b/src/prpy/planning/vectorfield.py @@ -35,9 +35,8 @@ import openravepy from .. import util from base import BasePlanner, PlanningError, LockedPlanningMethod, ClonedPlanningMethod, Tags -from ..collision import SimpleRobotCollisionChecker +from ..collision import DefaultRobotCollisionChecker from enum import Enum -from openravepy import CollisionOptions, CollisionOptionsStateSaver logger = logging.getLogger(__name__) @@ -75,7 +74,7 @@ def DoesCache(cls, status): class VectorFieldPlanner(BasePlanner): - def __init__(self, robot_collision_checker=SimpleRobotCollisionChecker): + def __init__(self, robot_collision_checker=DefaultRobotCollisionChecker): super(VectorFieldPlanner, self).__init__() self.robot_collision_checker = robot_collision_checker @@ -548,12 +547,7 @@ def fn_callback(t, q): nonlocals['exception'] = e return -1 # Stop. - with CollisionOptionsStateSaver(env.GetCollisionChecker(), - CollisionOptions.ActiveDOFs): - - # Instantiate a robot checker - robot_checker = self.robot_collision_checker(robot) - + with self.robot_collision_checker(robot) as robot_checker: # Integrate the vector field to get a configuration space path. # # TODO: Tune the integrator parameters.