Skip to content

Commit

Permalink
Reuse xmlrpc connections everywhere (#1471)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgrrx authored Feb 10, 2020
1 parent 2e99409 commit 902fb00
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 23 deletions.
25 changes: 23 additions & 2 deletions clients/rospy/src/rospy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,10 @@ def validator(param_value, caller_id):
return v
return validator

def xmlrpcapi(uri):
_xmlrpc_cache = {}
_xmlrpc_lock = threading.Lock()

def xmlrpcapi(uri, cache=True):
"""
@return: instance for calling remote server or None if not a valid URI
@rtype: xmlrpclib.ServerProxy
Expand All @@ -642,4 +645,22 @@ def xmlrpcapi(uri):
uriValidate = urlparse.urlparse(uri)
if not uriValidate[0] or not uriValidate[1]:
return None
return xmlrpcclient.ServerProxy(uri)
if not cache:
return xmlrpcclient.ServerProxy(uri)
if uri not in _xmlrpc_cache:
with _xmlrpc_lock:
if uri not in _xmlrpc_cache: # allows lazy locking
_xmlrpc_cache[uri] = _LockedServerProxy(uri)
return _xmlrpc_cache[uri]


class _LockedServerProxy(xmlrpcclient.ServerProxy):

def __init__(self, *args, **kwargs):
xmlrpcclient.ServerProxy.__init__(self, *args, **kwargs)
self._lock = threading.Lock()

def _ServerProxy__request(self, methodname, params):
with self._lock:
return xmlrpcclient.ServerProxy._ServerProxy__request(
self, methodname, params)
2 changes: 1 addition & 1 deletion clients/rospy/src/rospy/impl/masterslave.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ def _connect_topic(self, topic, pub_uri):
while not success and not is_shutdown():
try:
code, msg, result = \
xmlrpcapi(pub_uri).requestTopic(caller_id, topic, protocols)
xmlrpcapi(pub_uri, cache=False).requestTopic(caller_id, topic, protocols)
success = True
except Exception as e:
if getattr(e, 'errno', None) == errno.ECONNREFUSED:
Expand Down
32 changes: 12 additions & 20 deletions clients/rospy/src/rospy/msproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def __init__(self, uri):
@type uri: str
"""
self.target = rospy.core.xmlrpcapi(uri)
self._lock = Lock()

def __getattr__(self, key): #forward api calls to target
if key in _master_arg_remap:
Expand All @@ -101,9 +100,8 @@ def wrappedF(*args, **kwds):
i = i + 1 #callerId does not count
#print "Remap %s => %s"%(args[i], rospy.names.resolve_name(args[i]))
args[i] = rospy.names.resolve_name(args[i])
with self._lock:
f = getattr(self.target, key)
return f(*args, **kwds)
f = getattr(self.target, key)
return f(*args, **kwds)
return wrappedF

def __getitem__(self, key):
Expand All @@ -116,12 +114,11 @@ def __getitem__(self, key):
"""
#NOTE: remapping occurs here!
resolved_key = rospy.names.resolve_name(key)
with self._lock:
try:
return rospy.impl.paramserver.get_param_server_cache().get(resolved_key)
except KeyError:
pass
code, msg, value = self.target.getParam(rospy.names.get_caller_id(), resolved_key)
try:
return rospy.impl.paramserver.get_param_server_cache().get(resolved_key)
except KeyError:
pass
code, msg, value = self.target.getParam(rospy.names.get_caller_id(), resolved_key)
if code != 1: #unwrap value with Python semantics
raise KeyError(key)
return value
Expand All @@ -134,8 +131,7 @@ def __setitem__(self, key, val):
@param val: parameter value
@type val: XMLRPC legal value
"""
with self._lock:
self.target.setParam(rospy.names.get_caller_id(), rospy.names.resolve_name(key), val)
self.target.setParam(rospy.names.get_caller_id(), rospy.names.resolve_name(key), val)

def search_param(self, key):
"""
Expand All @@ -148,8 +144,7 @@ def search_param(self, key):
mappings = rospy.names.get_mappings()
if key in mappings:
key = mappings[key]
with self._lock:
code, msg, val = self.target.searchParam(rospy.names.get_caller_id(), key)
code, msg, val = self.target.searchParam(rospy.names.get_caller_id(), key)
if code == 1:
return val
elif code == -1:
Expand Down Expand Up @@ -181,8 +176,7 @@ def __delitem__(self, key):
@raise ROSException: if parameter server reports an error
"""
resolved_key = rospy.names.resolve_name(key)
with self._lock:
code, msg, _ = self.target.deleteParam(rospy.names.get_caller_id(), resolved_key)
code, msg, _ = self.target.deleteParam(rospy.names.get_caller_id(), resolved_key)
if code == -1:
raise KeyError(key)
elif code != 1:
Expand All @@ -195,8 +189,7 @@ def __contains__(self, key):
@type key: str
@raise ROSException: if parameter server reports an error
"""
with self._lock:
code, msg, value = self.target.hasParam(rospy.names.get_caller_id(), rospy.names.resolve_name(key))
code, msg, value = self.target.hasParam(rospy.names.get_caller_id(), rospy.names.resolve_name(key))
if code != 1:
raise rospy.exceptions.ROSException("cannot check parameter on server: %s"%msg)
return value
Expand All @@ -205,8 +198,7 @@ def __iter__(self):
"""
@raise ROSException: if parameter server reports an error
"""
with self._lock:
code, msg, value = self.target.getParamNames(rospy.names.get_caller_id())
code, msg, value = self.target.getParamNames(rospy.names.get_caller_id())
if code == 1:
return value.__iter__()
else:
Expand Down

0 comments on commit 902fb00

Please sign in to comment.