diff --git a/src/morse/middleware/socket_datastream.py b/src/morse/middleware/socket_datastream.py index b51c5e031..c21e0cb7a 100644 --- a/src/morse/middleware/socket_datastream.py +++ b/src/morse/middleware/socket_datastream.py @@ -7,7 +7,7 @@ from morse.helpers.transformation import Transformation3d from morse.middleware import AbstractDatastream from morse.core import services -from morse.core.exceptions import MorseRPCInvokationError +from morse.core.exceptions import MorseRPCInvokationError, MorseMiddlewareError try: import mathutils @@ -166,6 +166,16 @@ def __init__(self, args, kwargs): # Call the constructor of the parent class DatastreamManager.__init__(self, args, kwargs) + self.time_sync = kwargs.get('time_sync', False) + self.sync_port = kwargs.get('sync_port', -1) + + if self.time_sync: + if self.sync_port == -1: + logger.error("time_sync is required, but sync_port is not configured") + raise MorseMiddlewareError("sync_port is not configured") + else: + self._init_trigger() + # port -> MorseSocketServ self._server_dict = {} @@ -184,6 +194,44 @@ def __init__(self, args, kwargs): services.do_service_registration(self.get_stream_port, 'simulation') services.do_service_registration(self.get_all_stream_ports, 'simulation') + def __del__(self): + if self.time_sync: + self._end_trigger() + + def _init_trigger(self): + self._sync_client = None + self._sync_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sync_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._sync_server.bind(('', self.sync_port)) + self._sync_server.listen(1) + logger.info("Creating clock synchronisation on port %d" % self.sync_port) + + def _wait_trigger(self): + # If there is some client, just wait on it + if self._sync_client: + logger.debug("Waiting trigger") + msg = self._sync_client.recv(2048) + if not msg: #deconnection of client + self._sync_client = None + else: + # Otherwise, we just check if there is some client waiting + # If there is no client, we do not block for the moment to avoid + # weird interaction at the startup + logger.debug("Checking for some client on synchronisation port") + try: + inputready, _, _ = select.select([self._sync_server], [], [], 0) + except select.error: + pass + except socket.error: + pass + + if self._sync_server in inputready: + self._sync_client, _ = self._sync_server.accept() + + def _end_trigger(self): + self._sync_client.close() + self._sync_server.shutdown(socket.SHUT_RDWR) + def list_streams(self): """ List all publish streams. """ @@ -236,3 +284,7 @@ def register_component(self, component_name, component_instance, mw_data): self._component_nameservice[component_name] = mw_data[2]['port'] if must_inc_base_port: self._base_port += 1 + + def action(self): + if self.time_sync: + self._wait_trigger()