diff --git a/crossbar/router/broker.py b/crossbar/router/broker.py index 979c172f7..24a0a89bf 100644 --- a/crossbar/router/broker.py +++ b/crossbar/router/broker.py @@ -124,7 +124,7 @@ def detach(self, session): for subscription in self._session_to_subscriptions[session]: - was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription) + was_subscribed, was_last_subscriber, was_last_local_subscriber = self._subscription_map.drop_observer(session, subscription) was_deleted = False # delete it if there are no subscribers and no retained events @@ -133,6 +133,10 @@ def detach(self, session): was_deleted = True self._subscription_map.delete_observation(subscription) + is_rlink_session = (session._authrole == 'rlink') + + exclude_authid = session._authid + # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # @@ -143,10 +147,16 @@ def detach(self, session): def _publish(subscription): service_session = self._router._realm.session - # FIXME: what about exclude_authid as colleced from forward_for? like we do elsewhere in this file! - options = types.PublishOptions(correlation_id=None, - correlation_is_anchor=True, - correlation_is_last=False) + # FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file! + options = types.PublishOptions( + correlation_id=None, + correlation_is_anchor=True, + correlation_is_last=False, + exclude_authid=exclude_authid, + exclude_authrole=['rlink'] if is_rlink_session else None, + eligible_authrole=['rlink'] if was_last_local_subscriber and + not was_last_subscriber else None, + ) if was_subscribed: service_session.publish( @@ -156,7 +166,7 @@ def _publish(subscription): options=options, ) - if was_deleted: + if was_deleted or was_last_local_subscriber: options.correlation_is_last = True service_session.publish( 'wamp.subscription.on_delete', @@ -830,12 +840,15 @@ def on_authorize_success(authorization): # ok, session authorized to subscribe. now get the subscription # - subscription, was_already_subscribed, is_first_subscriber = self._subscription_map.add_observer( - session, subscribe.topic, subscribe.match, extra=SubscriptionExtra()) + subscription, was_already_subscribed, is_first_subscriber, is_first_local_subscriber \ + = self._subscription_map.add_observer(session, subscribe.topic, + subscribe.match, extra=SubscriptionExtra()) if not was_already_subscribed: self._session_to_subscriptions[session].add(subscription) + is_rlink_session = (session._authrole == 'rlink') + # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # @@ -853,17 +866,20 @@ def on_authorize_success(authorization): def _publish(): service_session = self._router._realm.session - if exclude_authid or self._router.is_traced: + if exclude_authid or self._router.is_traced or \ + is_first_local_subscriber or is_rlink_session: options = types.PublishOptions( correlation_id=subscribe.correlation_id, correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, + exclude_authrole=['rlink'] if is_rlink_session else None, + eligible_authrole=['rlink'] if is_first_local_subscriber and not is_first_subscriber else None, ) else: options = None - if is_first_subscriber: + if is_first_subscriber or is_first_local_subscriber: subscription_details = { 'id': subscription.id, 'created': subscription.created, @@ -1037,7 +1053,7 @@ def _unsubscribe(self, subscription, session, unsubscribe=None): # drop session from subscription observers # - was_subscribed, was_last_subscriber = self._subscription_map.drop_observer(session, subscription) + was_subscribed, was_last_subscriber, was_last_local_subscriber = self._subscription_map.drop_observer(session, subscription) was_deleted = False if was_subscribed and was_last_subscriber and not subscription.extra.retained_events: @@ -1049,6 +1065,8 @@ def _unsubscribe(self, subscription, session, unsubscribe=None): if was_subscribed: self._session_to_subscriptions[session].discard(subscription) + is_rlink_session = (session._authrole == 'rlink') + # publish WAMP meta events, if we have a service session, but # not for the meta API itself! # @@ -1072,6 +1090,8 @@ def _publish(): correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, + exclude_authrole=['rlink'] if is_rlink_session else None, + eligible_authrole=['rlink'] if was_last_local_subscriber and not was_last_subscriber else None, ) else: options = None @@ -1084,7 +1104,7 @@ def _publish(): options=options, ) - if was_deleted: + if was_deleted or was_last_local_subscriber: if options: options.correlation_is_last = True diff --git a/crossbar/router/dealer.py b/crossbar/router/dealer.py index 083fab783..29cf6066a 100644 --- a/crossbar/router/dealer.py +++ b/crossbar/router/dealer.py @@ -236,7 +236,7 @@ def detach(self, session): invoke.caller._transport.send(reply) for registration in self._session_to_registrations[session]: - was_registered, was_last_callee = self._registration_map.drop_observer(session, registration) + was_registered, was_last_callee, was_last_local_callee = self._registration_map.drop_observer(session, registration) if was_registered and was_last_callee: self._registration_map.delete_observation(registration) @@ -250,7 +250,12 @@ def _publish(registration): service_session = self._router._realm.session # FIXME: what about exclude_authid as collected from forward_for? like we do elsewhere in this file! - options = types.PublishOptions(correlation_id=None) + options = types.PublishOptions( + correlation_id=None, + exclude_authrole=['rlink'] if is_rlink_session else None, + eligible_authrole=['rlink'] if was_last_local_callee and + not was_last_callee else None, + ) if was_registered: service_session.publish( @@ -260,14 +265,13 @@ def _publish(registration): options=options, ) - if was_last_callee: - if not is_rlink_session: - service_session.publish( - 'wamp.registration.on_delete', - session._session_id, - registration.id, - options=options, - ) + if was_last_callee or was_last_local_callee: + service_session.publish( + 'wamp.registration.on_delete', + session._session_id, + registration.id, + options=options, + ) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish, registration) @@ -281,7 +285,7 @@ def processRegister(self, session, register): """ Implements :func:`crossbar.router.interfaces.IDealer.processRegister` """ - # check topic URI: for SUBSCRIBE, must be valid URI (either strict or loose), and all + # check topic URI: for REGISTER, must be valid URI (either strict or loose), and all # URI components must be non-empty other than for wildcard subscriptions # is_rlink_session = (session._authrole == "rlink") @@ -437,6 +441,7 @@ def on_authorize_success(authorization): if authorization['allow']: registration = self._registration_map.get_observation(register.procedure, register.match) if register.force_reregister and registration: + # TODO handle Unregistered in RLink for obs in registration.observers: self._registration_map.drop_observer(obs, registration) kicked = message.Unregistered( @@ -455,7 +460,7 @@ def on_authorize_success(authorization): # registration_extra = RegistrationExtra(register.invoke) registration_callee_extra = RegistrationCalleeExtra(register.concurrency) - registration, was_already_registered, is_first_callee = self._registration_map.add_observer( + registration, was_already_registered, is_first_callee, is_first_local_callee = self._registration_map.add_observer( session, register.procedure, register.match, registration_extra, registration_callee_extra) if not was_already_registered: @@ -489,29 +494,34 @@ def on_authorize_success(authorization): def _publish(): service_session = self._router._realm.session - if exclude_authid or self._router.is_traced: + if exclude_authid or self._router.is_traced or \ + is_rlink_session or is_first_local_callee: options = types.PublishOptions( correlation_id=register.correlation_id, correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, + exclude_authrole=['rlink'] if is_rlink_session else None, + eligible_authrole=['rlink'] if is_first_local_callee and + not is_first_callee else None, + ) else: options = None - if is_first_callee: + if is_first_callee or is_first_local_callee: registration_details = { 'id': registration.id, 'created': registration.created, 'uri': registration.uri, 'match': registration.match, 'invoke': registration.extra.invoke, + 'forced_reregister': register.force_reregister } - if not is_rlink_session: - service_session.publish('wamp.registration.on_create', - session._session_id, - registration_details, - options=options) + service_session.publish('wamp.registration.on_create', + session._session_id, + registration_details, + options=options) if not was_already_registered: if options: @@ -612,7 +622,7 @@ def _unregister(self, registration, session, unregister=None): # drop session from registration observers # - was_registered, was_last_callee = self._registration_map.drop_observer(session, registration) + was_registered, was_last_callee, was_last_local_callee = self._registration_map.drop_observer(session, registration) was_deleted = False is_rlink_session = (session._authrole == "rlink") @@ -652,6 +662,8 @@ def _publish(): correlation_is_anchor=False, correlation_is_last=False, exclude_authid=exclude_authid, + exclude_authrole=['rlink'] if is_rlink_session else None, + eligible_authrole=['rlink'] if was_last_local_callee and not was_last_callee else None, ) else: options = None @@ -662,15 +674,14 @@ def _publish(): registration.id, options=options) - if was_deleted: + if was_deleted or was_last_local_callee: if options: options.correlation_is_last = True - if not is_rlink_session: - service_session.publish('wamp.registration.on_delete', - session._session_id, - registration.id, - options=options) + service_session.publish('wamp.registration.on_delete', + session._session_id, + registration.id, + options=options) # we postpone actual sending of meta events until we return to this client session self._reactor.callLater(0, _publish) diff --git a/crossbar/router/observation.py b/crossbar/router/observation.py index 1d858deeb..0b7829ef6 100644 --- a/crossbar/router/observation.py +++ b/crossbar/router/observation.py @@ -178,6 +178,7 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra= raise Exception("'uri' should be unicode, not {}".format(type(uri).__name__)) is_first_observer = False + is_first_local_observer = False if match == "exact": @@ -218,6 +219,13 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra= else: raise Exception("invalid match strategy '{}'".format(match)) + is_rlink_observer = observer.authrole == 'rlink' + if is_first_observer: + is_first_local_observer = not is_rlink_observer + else: + is_first_local_observer = not is_rlink_observer and \ + next(filter(lambda o: o.authrole != 'rlink', observation.observers), None) is None + # add observer if not already in observation # if observer not in observation.observers: @@ -232,7 +240,7 @@ def add_observer(self, observer, uri, match="exact", extra=None, observer_extra= else: was_already_observed = True - return observation, was_already_observed, is_first_observer + return observation, was_already_observed, is_first_observer, is_first_local_observer def get_observation(self, uri, match="exact"): """ @@ -383,10 +391,13 @@ def drop_observer(self, observer, observation): :rtype: tuple """ was_last_observer = False + was_last_local_observer = False if observer in observation.observers: was_observed = True + is_rlink_observer = observer.authrole == 'rlink' + # remove observer from observation # observation.observers.discard(observer) @@ -400,12 +411,17 @@ def drop_observer(self, observer, observation): # if not observation.observers: was_last_observer = True - + was_last_local_observer = True + else: + was_last_observer = False + was_last_local_observer = not is_rlink_observer and \ + next(filter(lambda o: o.authrole != 'rlink', observation.observers), + None) is None else: # observer wasn't on this observation was_observed = False - return was_observed, was_last_observer + return was_observed, was_last_observer, was_last_local_observer def delete_observation(self, observation): """ diff --git a/crossbar/router/realmstore.py b/crossbar/router/realmstore.py index 8776d3e59..95d782d22 100644 --- a/crossbar/router/realmstore.py +++ b/crossbar/router/realmstore.py @@ -166,9 +166,10 @@ def attach_subscription_map(self, subscription_map: UriObservationMap): for sub in self._config.get('event-history', []): uri = sub['uri'] match = sub.get('match', 'exact') - observation, was_already_observed, was_first_observer = subscription_map.add_observer(self, - uri=uri, - match=match) + observation, was_already_observed, was_first_observer, was_first_local_observer = \ + subscription_map.add_observer(self, + uri=uri, + match=match) subscription_id = observation.id # for in-memory history, we just use a double-ended queue diff --git a/crossbar/router/service.py b/crossbar/router/service.py index 8b6d493a4..a12c53c95 100644 --- a/crossbar/router/service.py +++ b/crossbar/router/service.py @@ -24,13 +24,29 @@ from txaio import make_logger -__all__ = ('RouterServiceAgent', ) +__all__ = ('RouterServiceAgent',) def is_restricted_session(session: ISession): return session.authrole is None or session.authrole == 'trusted' +def is_registration_visible_to_caller(is_rlink_caller, registration) -> bool: + # Visible for non-RLinks + return not is_rlink_caller or not registration.observers or \ + ( + (registration.extra.invoke == 'single' and registration.observers[0].authrole != 'rlink') + or + (next(filter(lambda o: o.authrole != 'rlink', registration.observers), None) is not None) + ) + + +def is_subscription_visible_to_caller(is_rlink_caller, subscription) -> bool: + # Visible for non-RLinks + return not is_rlink_caller or not subscription.observers or \ + (next(filter(lambda o: o.authrole != 'rlink', subscription.observers), None) is not None) + + class RouterServiceAgent(ApplicationSession): """ User router-realm service session, and WAMP meta API implementation. @@ -690,19 +706,24 @@ def registration_list(self, session_id=None, details=None): registration_map = self._router._dealer._registration_map + is_rlink_caller = details and details.caller_authrole == 'rlink' + registrations_exact = [] for registration in registration_map._observations_exact.values(): - if not is_protected_uri(registration.uri, details): + if not is_protected_uri(registration.uri, details) and is_registration_visible_to_caller( + is_rlink_caller, registration): registrations_exact.append(registration.id) registrations_prefix = [] for registration in registration_map._observations_prefix.values(): - if not is_protected_uri(registration.uri, details): + if not is_protected_uri(registration.uri, details) and is_registration_visible_to_caller( + is_rlink_caller, registration): registrations_prefix.append(registration.id) registrations_wildcard = [] for registration in registration_map._observations_wildcard.values(): - if not is_protected_uri(registration.uri, details): + if not is_protected_uri(registration.uri, details) and is_registration_visible_to_caller( + is_rlink_caller, registration): registrations_wildcard.append(registration.id) regs = { @@ -751,14 +772,19 @@ def subscription_list(self, session_id=None, details=None): subscription_map = self._router._broker._subscription_map + is_rlink_caller = details and details.caller_authrole == 'rlink' + + # and is_registration_visible_to_caller(is_rlink_caller, subscription): subscriptions_exact = [] for subscription in subscription_map._observations_exact.values(): - if not is_protected_uri(subscription.uri, details): + if not is_protected_uri(subscription.uri, details) \ + and is_subscription_visible_to_caller(is_rlink_caller, subscription): subscriptions_exact.append(subscription.id) subscriptions_prefix = [] for subscription in subscription_map._observations_prefix.values(): - if not is_protected_uri(subscription.uri, details): + if not is_protected_uri(subscription.uri, details) \ + and is_subscription_visible_to_caller(is_rlink_caller, subscription): subscriptions_prefix.append(subscription.id) subscriptions_wildcard = [] @@ -790,7 +816,10 @@ def registration_match(self, procedure, details=None): """ registration = self._router._dealer._registration_map.best_matching_observation(procedure) - if registration and not is_protected_uri(registration.uri, details): + is_rlink_caller = details and details.caller_authrole == 'rlink' + + if registration and not is_protected_uri(registration.uri, details) and \ + is_registration_visible_to_caller(is_rlink_caller, registration): return registration.id else: return None @@ -809,11 +838,13 @@ def subscription_match(self, topic, details=None): :rtype: obj or None """ subscriptions = self._router._broker._subscription_map.match_observations(topic) + is_rlink_caller = details and details.caller_authrole == 'rlink' if subscriptions: subscription_ids = [] for subscription in subscriptions: - if not is_protected_uri(subscription.uri, details): + if not is_protected_uri(subscription.uri, details) and \ + is_subscription_visible_to_caller(is_rlink_caller, subscription): subscription_ids.append(subscription.id) if subscription_ids: return subscription_ids @@ -839,10 +870,12 @@ def registration_lookup(self, procedure, options=None, details=None): """ options = options or {} match = options.get('match', 'exact') + is_rlink_caller = details and details.caller_authrole == 'rlink' registration = self._router._dealer._registration_map.get_observation(procedure, match) - if registration and not is_protected_uri(registration.uri, details): + if registration and not is_protected_uri(registration.uri, details) and \ + is_registration_visible_to_caller(is_rlink_caller, registration): return registration.id else: return None @@ -866,8 +899,10 @@ def subscription_lookup(self, topic, options=None, details=None): match = options.get('match', 'exact') subscription = self._router._broker._subscription_map.get_observation(topic, match) + is_rlink_caller = details and details.caller_authrole == 'rlink' - if subscription and not is_protected_uri(subscription.uri, details): + if subscription and not is_protected_uri(subscription.uri, details) and \ + is_subscription_visible_to_caller(is_rlink_caller, subscription): return subscription.id else: return None @@ -886,7 +921,9 @@ def registration_list_callees(self, registration_id, details=None): registration = self._router._dealer._registration_map.get_observation_by_id(registration_id) if registration: - if is_protected_uri(registration.uri, details): + is_rlink_caller = details and details.caller_authrole == 'rlink' + if is_protected_uri(registration.uri, details) \ + or not is_registration_visible_to_caller(is_rlink_caller, registration): raise ApplicationError( ApplicationError.NOT_AUTHORIZED, message='not authorized to list callees for protected URI "{}"'.format(registration.uri), @@ -916,7 +953,10 @@ def subscription_list_subscribers(self, subscription_id, details=None): subscription = self._router._broker._subscription_map.get_observation_by_id(subscription_id) if subscription: - if is_protected_uri(subscription.uri, details): + is_rlink_caller = details and details.caller_authrole == 'rlink' + + if is_protected_uri(subscription.uri, details) \ + or not is_subscription_visible_to_caller(is_rlink_caller, subscription): raise ApplicationError( ApplicationError.NOT_AUTHORIZED, message='not authorized to list subscribers for protected URI "{}"'.format(subscription.uri), @@ -946,7 +986,10 @@ def registration_count_callees(self, registration_id, details=None): registration = self._router._dealer._registration_map.get_observation_by_id(registration_id) if registration: - if is_protected_uri(registration.uri, details): + is_rlink_caller = details and details.caller_authrole == 'rlink' + + if is_protected_uri(registration.uri, details) \ + or not is_registration_visible_to_caller(is_rlink_caller, registration): raise ApplicationError( ApplicationError.NOT_AUTHORIZED, message='not authorized to count callees for protected URI "{}"'.format(registration.uri), @@ -972,7 +1015,10 @@ def subscription_count_subscribers(self, subscription_id, details=None): subscription = self._router._broker._subscription_map.get_observation_by_id(subscription_id) if subscription: - if is_protected_uri(subscription.uri, details): + is_rlink_caller = details and details.caller_authrole == 'rlink' + + if is_protected_uri(subscription.uri, details) \ + or not is_subscription_visible_to_caller(is_rlink_caller, subscription): raise ApplicationError( ApplicationError.NOT_AUTHORIZED, message='not authorized to count subscribers for protected URI "{}"'.format(subscription.uri), @@ -1011,7 +1057,10 @@ def subscription_get_events(self, subscription_id, limit=10, details=None): subscription = self._router._broker._subscription_map.get_observation_by_id(subscription_id) if subscription: - if is_protected_uri(subscription.uri, details): + is_rlink_caller = details and details.caller_authrole == 'rlink' + + if is_protected_uri(subscription.uri, details)\ + or not is_subscription_visible_to_caller(is_rlink_caller, subscription): raise ApplicationError( ApplicationError.NOT_AUTHORIZED, message='not authorized to retrieve event history for protected URI "{}"'.format(subscription.uri), diff --git a/crossbar/router/test/test_observation.py b/crossbar/router/test/test_observation.py index a1713090c..0b2f7b1e5 100644 --- a/crossbar/router/test/test_observation.py +++ b/crossbar/router/test/test_observation.py @@ -68,7 +68,7 @@ def test_add_observer(self): uri1 = "com.example.uri1" obs1 = FakeObserver() - observation, was_already_observed, is_first_observer = obs_map.add_observer(obs1, uri1) + observation, was_already_observed, is_first_observer, is_first_local_observer = obs_map.add_observer(obs1, uri1) self.assertIsInstance(observation, ExactUriObservation) self.assertFalse(was_already_observed) @@ -84,10 +84,10 @@ def test_add_observer_was_already_observed(self): uri1 = "com.example.uri1" obs1 = FakeObserver() - observation1, was_already_observed, _ = obs_map.add_observer(obs1, uri1) + observation1, was_already_observed, _, _ = obs_map.add_observer(obs1, uri1) self.assertFalse(was_already_observed) - observation2, was_already_observed, _ = obs_map.add_observer(obs1, uri1) + observation2, was_already_observed, _, _ = obs_map.add_observer(obs1, uri1) self.assertTrue(was_already_observed) self.assertEqual(observation1, observation2) @@ -103,10 +103,10 @@ def test_add_observer_is_first_observer(self): obs1 = FakeObserver() obs2 = FakeObserver() - _, _, is_first_observer = obs_map.add_observer(obs1, uri1) + _, _, is_first_observer, _ = obs_map.add_observer(obs1, uri1) self.assertTrue(is_first_observer) - _, _, is_first_observer = obs_map.add_observer(obs2, uri1) + _, _, is_first_observer, _ = obs_map.add_observer(obs2, uri1) self.assertFalse(is_first_observer) def test_delete_observer(self): @@ -116,8 +116,8 @@ def test_delete_observer(self): obs1 = FakeObserver() obs2 = FakeObserver() - ob1, uri1, _ = obs_map.add_observer(obs1, uri) - ob2, uri2, _ = obs_map.add_observer(obs2, uri) + ob1, uri1, _, _ = obs_map.add_observer(obs1, uri) + ob2, uri2, _, _ = obs_map.add_observer(obs2, uri) self.assertTrue(ob1 is ob2) obs_map.drop_observer(obs1, ob1) @@ -140,7 +140,7 @@ def test_match_observations_match_exact(self): uri1 = "com.example.uri1" obs1 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, uri1) + observation1, _, _, _ = obs_map.add_observer(obs1, uri1) observations = obs_map.match_observations(uri1) @@ -158,9 +158,9 @@ def test_match_observations_match_exact_same(self): obs2 = FakeObserver() obs3 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, uri1) - observation2, _, _ = obs_map.add_observer(obs2, uri1) - observation3, _, _ = obs_map.add_observer(obs3, uri1) + observation1, _, _, _ = obs_map.add_observer(obs1, uri1) + observation2, _, _, _ = obs_map.add_observer(obs2, uri1) + observation3, _, _, _ = obs_map.add_observer(obs3, uri1) observations = obs_map.match_observations(uri1) @@ -177,9 +177,9 @@ def test_match_observations_match_exact_multi(self): uri1 = "com.example.uri1" obs1 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, uri1) - observation2, _, _ = obs_map.add_observer(obs1, uri1) - observation3, _, _ = obs_map.add_observer(obs1, uri1) + observation1, _, _, _ = obs_map.add_observer(obs1, uri1) + observation2, _, _, _ = obs_map.add_observer(obs1, uri1) + observation3, _, _, _ = obs_map.add_observer(obs1, uri1) self.assertEqual(observation1, observation2) self.assertEqual(observation1, observation3) @@ -198,7 +198,7 @@ def test_match_observations_match_prefix(self): obs1 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, "com.example", match=Subscribe.MATCH_PREFIX) + observation1, _, _, _ = obs_map.add_observer(obs1, "com.example", match=Subscribe.MATCH_PREFIX) # test matches for uri in [ @@ -224,7 +224,7 @@ def test_match_observations_match_wildcard_single(self): obs1 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, "com.example..create", match=Subscribe.MATCH_WILDCARD) + observation1, _, _, _ = obs_map.add_observer(obs1, "com.example..create", match=Subscribe.MATCH_WILDCARD) # test matches for uri in ["com.example.foobar.create", "com.example.1.create"]: @@ -248,7 +248,7 @@ def test_match_observations_match_wildcard_multi(self): obs1 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, "com...create", match=Subscribe.MATCH_WILDCARD) + observation1, _, _, _ = obs_map.add_observer(obs1, "com...create", match=Subscribe.MATCH_WILDCARD) # test matches for uri in [ @@ -284,9 +284,9 @@ def test_match_observations_match_multimode(self): obs1 = FakeObserver() - observation1, _, _ = obs_map.add_observer(obs1, "com.example.product.create", match=Subscribe.MATCH_EXACT) - observation2, _, _ = obs_map.add_observer(obs1, "com.example.product", match=Subscribe.MATCH_PREFIX) - observation3, _, _ = obs_map.add_observer(obs1, "com.example..create", match=Subscribe.MATCH_WILDCARD) + observation1, _, _, _ = obs_map.add_observer(obs1, "com.example.product.create", match=Subscribe.MATCH_EXACT) + observation2, _, _, _ = obs_map.add_observer(obs1, "com.example.product", match=Subscribe.MATCH_PREFIX) + observation3, _, _, _ = obs_map.add_observer(obs1, "com.example..create", match=Subscribe.MATCH_WILDCARD) observations = obs_map.match_observations("com.example.product.create") self.assertEqual(observations, [observation1, observation2, observation3]) diff --git a/crossbar/worker/main.py b/crossbar/worker/main.py index 83427b37b..5a33a5ff6 100644 --- a/crossbar/worker/main.py +++ b/crossbar/worker/main.py @@ -333,7 +333,10 @@ def make_session(): from autobahn.twisted.websocket import WampWebSocketServerFactory transport_factory = WampWebSocketServerFactory(make_session, 'ws://localhost') transport_factory.protocol = WorkerServerProtocol - transport_factory.setProtocolOptions(failByDrop=False) + + # we need to increase the opening handshake timeout, + # because when running multiple router workers controller may not be able to connect get to this worker in 5 seconds + transport_factory.setProtocolOptions(failByDrop=False, openHandshakeTimeout=45) # create a protocol instance and wire up to stdio # diff --git a/crossbar/worker/rlink.py b/crossbar/worker/rlink.py index c263cd19d..1611f4cac 100644 --- a/crossbar/worker/rlink.py +++ b/crossbar/worker/rlink.py @@ -21,7 +21,7 @@ from crossbar.common.twisted.endpoint import create_connecting_endpoint_from_config from autobahn.wamp.types import SubscribeOptions, PublishOptions, RegisterOptions, CallOptions, ComponentConfig -from autobahn.wamp.message import Event, Invocation +from autobahn.wamp.message import Event, Invocation, Unregistered from autobahn.wamp.exception import ApplicationError, TransportLost from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner @@ -34,8 +34,22 @@ ) -class BridgeSession(ApplicationSession): +class BridgeLink: + # this class is a wrapper for sub/registration details + link to chained registration/subscription + # Storing a reference to dealer/broker owned object should save memory instead of having to duplicate + # for every BrideSession. Memory becomes an issue with full mesh configs O(n^2) + def __init__(self, details, chained=None): + self.details = details + self.chained = chained + +class URIExclusions: + def __init__(self, uri_excludes): + self.exact = uri_excludes.get('exact', []) if uri_excludes is not None else [] + self.prefix = uri_excludes.get('prefix', []) if uri_excludes is not None else [] + self.wildcard = uri_excludes.get('wildcard', []) if uri_excludes is not None else [] + +class BridgeSession(ApplicationSession): log = make_logger() def __init__(self, config): @@ -45,8 +59,54 @@ def __init__(self, config): # registration-id's of remote registrations from an rlink self._regs = {} - self._exclude_authid = None - self._exclude_authrole = None + self._exclude_authid = config.extra['exclude_authid'] + self._exclude_authrole = config.extra['exclude_authrole'] + self._exclude_uri = config.extra['exclude_uri'] + + self._active = False + self._rlink_id = config.extra['rlink_id'] + + self.other = None + + def _on_unregistered(self, msg): + if msg.request == 0: + # this is a forced un-register either from a call + # to the wamp.* meta-api or the force_reregister + # option + # forward to other + self.log.debug( + "Received UNREGISTERED : {me} {reg_id}", + me=self, + reg_id=msg.registration + ) + + reg_id = msg.registration + + if not self._active: + return + + link = self._regs.get(reg_id, None) + if not link: + self.log.debug("Attempting to force-delete registration {reg_id} from {me} that is not tracked", + reg_id=reg_id, + me=self) + return + + reg_details = link.details + + remote_registration = link.chained + if remote_registration is None: + # see above; we might have un-registered here before + # we got an answer from the other router + self.log.debug( + "Attempting to delete registration {reg_id} from {me} that is has no remote registration id", + reg_id=reg_id, + me=self) + else: + pass + + del self._regs[reg_id] + def onMessage(self, msg): if msg._router_internal is not None: @@ -54,16 +114,45 @@ def onMessage(self, msg): msg.publisher, msg.publisher_authid, msg.publisher_authrole = msg._router_internal elif isinstance(msg, Invocation): msg.caller, msg.caller_authid, msg.caller_authrole = msg._router_internal + if isinstance(msg, Unregistered): + self._on_unregistered(msg) return super(BridgeSession, self).onMessage(msg) + def is_uri_excluded(self, uri): + + # local to router component subscriptions/registrations that shouldn't be forwarded can be defined in config + # i.e. #prefix: ['local.', 'wamp.'] + if uri.startswith("local."): + return True + + if self._exclude_uri is None: + return False + + if uri in self._exclude_uri.exact: + return True + + # check if uri has a prefix match in the list of excluded by prefix + for prefix in self._exclude_uri.prefix: + if uri.startswith(prefix): + return True + + # TODO: handle wildcards + + return False + + + def _common_setup(self, other): + self.other = other + @inlineCallbacks def _setup_event_forwarding(self, other): self.log.debug( - "setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})", + "setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole}, exclude_uri={exclude_uri})", exclude_authid=self._exclude_authid, exclude_authrole=self._exclude_authrole, - me=self._session_id, + exclude_uri=self._exclude_uri, + me=self, other=other) @inlineCallbacks @@ -74,56 +163,71 @@ def on_subscription_create(sub_session, sub_details, details=None): The handler will then also subscribe on the other router, and when receiving events, re-publish those on this router. - :param sub_id: + :param sub_session: :param sub_details: :param details: :return: """ - if sub_details["uri"].startswith("wamp."): + + if not self._active: return - if sub_details["id"] in self._subs: - # this should not happen actually, but not sure .. - self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}', - sub_id=sub_details["id"], - method=hltype(BridgeSession._setup_event_forwarding)) + uri = sub_details['uri'] + if uri.startswith("wamp."): return - self._subs[sub_details["id"]] = sub_details - self._subs[sub_details["id"]]["sub"] = None + sub_id = sub_details["id"] + + if self.is_uri_excluded(uri): + return + + if sub_id in self._subs: + bridge_link = self._subs[sub_id] + if bridge_link.chained: + # This will happen if, partway through the registration process, the RLink disconnects + self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}', + sub_id=sub_id, + method=hltype(BridgeSession._setup_event_forwarding)) + return + else: + self.log.warn( + 'on_subscription_create: sub ID {sub_id} already in map but has no remote subscription', + sub_id=sub_id) + else: + bridge_link = BridgeLink(sub_details) + self._subs[sub_id] = bridge_link - uri = sub_details['uri'] ERR_MSG = [None] @inlineCallbacks def on_event(*args, **kwargs): assert 'details' in kwargs - details = kwargs.pop('details') + event_details = kwargs.pop('details') options = kwargs.pop('options', None) self.log.debug( 'Received event on uri={uri}, options={options} (publisher={publisher}, publisher_authid={publisher_authid}, publisher_authrole={publisher_authrole}, forward_for={forward_for})', uri=uri, options=options, - publisher=details.publisher, - publisher_authid=details.publisher_authid, - publisher_authrole=details.publisher_authrole, - forward_for=details.forward_for) + publisher=event_details.publisher, + publisher_authid=event_details.publisher_authid, + publisher_authrole=event_details.publisher_authrole, + forward_for=event_details.forward_for) - assert details.publisher is not None + assert event_details.publisher is not None this_forward = { - 'session': details.publisher, - 'authid': details.publisher_authid, - 'authrole': details.publisher_authrole, + 'session': event_details.publisher, + 'authid': event_details.publisher_authid, + 'authrole': event_details.publisher_authrole, } - if details.forward_for: + if event_details.forward_for: # the event comes already forwarded from a router node - if len(details.forward_for) >= 0: + if len(event_details.forward_for) >= 0: self.log.debug('SKIP! already forwarded') return - forward_for = copy.deepcopy(details.forward_for) + forward_for = copy.deepcopy(event_details.forward_for) forward_for.append(this_forward) else: forward_for = [this_forward] @@ -135,7 +239,8 @@ def on_event(*args, **kwargs): forward_for=forward_for) try: - yield self.publish(uri, *args, options=options, **kwargs) + # pass original procedure uri to support whildcard events #1959 + yield self.publish(event_details.topic or uri, *args, options=options, **kwargs) except TransportLost: return except ApplicationError as e: @@ -155,79 +260,116 @@ def on_event(*args, **kwargs): ) try: - sub = yield other.subscribe(on_event, uri, options=SubscribeOptions(details=True)) + + # support wildcard registrations and subscription #1959 by passing match type to other + sub = yield other.subscribe(on_event, + uri, + options=SubscribeOptions(details=True, + match=sub_details['match'])) except TransportLost: self.log.debug( - "on_subscription_create: could not forward-subscription '{}' as RLink is not connected".format( - uri)) + "on_subscription_create: could not forward-subscription {uri} as RLink is not connected", uri=uri) return - if sub_details["id"] not in self._subs: - self.log.info("subscription already gone: {uri}", uri=sub_details['uri']) - yield sub.unregister() + if sub_id not in self._subs: + self.log.info("subscription already gone: {uri}", uri=uri) + yield sub.unsubscribe() else: - self._subs[sub_details["id"]]["sub"] = sub + bridge_link.chained = sub self.log.debug( - "created forwarding subscription: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session}", - me=self._session_id, + "created forwarding subscription or {uri}: me={me} other={other} sub_id={sub_id} sub_details={sub_details} details={details} sub_session={sub_session} sub={sub}", + me=self, + uri=uri, other=other, - sub_id=sub_details["id"], + sub_id=sub_id, sub_details=sub_details, details=details, sub_session=sub_session, - ) + sub=sub) # listen to when a subscription is removed from the router # @inlineCallbacks def on_subscription_delete(session_id, sub_id, details=None): self.log.debug( - "Subscription deleted: {me} {session} {sub_id} {details}", + "on_subscription_delete: {me} {session} {sub_id} {details}", me=self, session=session_id, sub_id=sub_id, details=details, ) - sub_details = self._subs.get(sub_id, None) - if not sub_details: - self.log.debug("subscription not tracked - huh??") + if not self._active: return + bridge_link = self._subs.get(sub_id, None) + + if not bridge_link: + self.log.debug("Attempting to delete subscription {sub_id} from {me} that is not tracked", + sub_id=sub_id, + me=self) + return + + sub_details = bridge_link.details + uri = sub_details['uri'] - sub = self._subs[sub_id]["sub"] + sub = bridge_link.chained if sub is None: # see above; we might have un-subscribed here before # we got an answer from the other router - self.log.info("subscription has no 'sub'") + self.log.debug("Attempting to delete subscription {sub_id} from {me} that has no chained subscription", + sub_id=sub_id, + me=self) else: yield sub.unsubscribe() del self._subs[sub_id] - self.log.debug("{other} unsubscribed from {uri}".format(other=other, uri=uri)) + self.log.debug("{me} unsubscribed from {uri} on {other}", me=self, other=other, uri=uri) @inlineCallbacks def forward_current_subs(): # get current subscriptions on the router + self.log.debug("Forwarding all current subcriptions from {me}", me=self) + subs = yield self.call("wamp.subscription.list") - for sub_id in subs['exact']: - sub = yield self.call("wamp.subscription.get", sub_id) - assert sub["id"] == sub_id, "Logic error, subscription IDs don't match" - yield on_subscription_create(self._session_id, sub) + + # support for all match types exact, prefix, wildcard #1959 + for sub_list in subs.values(): + for sub_id in sub_list: + sub = yield self.call("wamp.subscription.get", sub_id) + assert sub["id"] == sub_id, "Logic error, subscription IDs don't match" + + # note that details are not passed to on_subscription_create + yield on_subscription_create(self._session_id, sub) @inlineCallbacks def on_remote_join(_session, _details): + # Remoted joined, start/resume forwarding events + self._active = True yield forward_current_subs() + @inlineCallbacks + def on_remote_leave(_session, _details): + # The remote session has ended, clear subscription records. + # Clearing this dictionary helps avoid the case where + # local procedures are not subscribed on the remote leg + # on reestablishment of remote session. + # See: https://github.com/crossbario/crossbar/issues/1909 + self._subs = {} + + # also suspend forwarding events to remote leg until it re-joins + self._active = False + if self.IS_REMOTE_LEG: yield forward_current_subs() else: # from the local leg, don't try to forward events on the # remote leg unless the remote session is established. other.on('join', on_remote_join) + other.on('leave', on_remote_leave) # listen to when new subscriptions are created on the local router yield self.subscribe(on_subscription_create, @@ -264,20 +406,38 @@ def on_registration_create(reg_session, reg_details, details=None): :param details: :return: """ - if reg_details['uri'].startswith("wamp."): + + if not self._active: return - if reg_details['id'] in self._regs: - # this should not happen actually, but not sure .. - self.log.error('on_registration_create: reg ID {reg_id} already in map {method}', - reg_id=reg_details['id'], - method=hltype(BridgeSession._setup_invocation_forwarding)) + uri = reg_details['uri'] + + # this is a useless check and can probably be removed + # RLinkLocalSession is no longer `trusted` but `rlink`, so wamp.registration.list doesn't return wamp.* + if uri.startswith("wamp."): return - self._regs[reg_details['id']] = reg_details - self._regs[reg_details['id']]['reg'] = None + if self.is_uri_excluded(uri): + return + + reg_id = reg_details["id"] + + if reg_id in self._regs: + bridge_link = self._regs[reg_id] + if bridge_link.chained: + # [From Skully17] This will happen if, partway through the registration process, the RLink disconnects + self.log.error('on_registration_create: reg ID {reg_id} already in map {method}', + reg_id=reg_id, + method=hltype(BridgeSession._setup_invocation_forwarding)) + return + else: + self.log.warn( + 'on_registration_create: reg ID {reg_id} already in map but has no remote registration', + reg_id=reg_id) + else: + bridge_link = BridgeLink(reg_details) + self._regs[reg_id] = bridge_link - uri = reg_details['uri'] ERR_MSG = [None] @inlineCallbacks @@ -320,7 +480,7 @@ def on_call(*args, **kwargs): options = CallOptions(forward_for=forward_for) try: - result = yield self.call(uri, *args, options=options, **kwargs) + result = yield self.call(details.procedure or uri, *args, options=options, **kwargs) except TransportLost: return except ApplicationError as e: @@ -340,24 +500,32 @@ def on_call(*args, **kwargs): ) return result + if not self._active: + return + try: reg = yield other.register(on_call, uri, options=RegisterOptions( details_arg='details', + match=reg_details.get('match', None), invoke=reg_details.get('invoke', None), + force_reregister=reg_details.get('forced_reregister', None), )) except TransportLost: - self.log.debug( - "on_registration_create: could not forward-register '{}' as RLink is not connected".format(uri)) + self.log.debug("on_registration_create: could not forward-register '{uri}' as RLink is not connected", + uri=uri) return except Exception as e: # FIXME: partially fixes https://github.com/crossbario/crossbar/issues/1894, # however we need to make sure this situation never happens. if isinstance(e, ApplicationError) and e.error == 'wamp.error.procedure_already_exists': other_leg = 'local' if self.IS_REMOTE_LEG else 'remote' - self.log.debug(f"on_registration_create: tried to register procedure {uri} on {other_leg} " - f"session but it's already registered.") + self.log.debug( + "on_registration_create: tried to register procedure {uri} on {other_leg} session but it's already registered. session={me}", + uri=uri, + other_leg=other_leg, + me=self) return raise Exception("fatal: could not forward-register '{}'".format(uri)) @@ -365,64 +533,81 @@ def on_call(*args, **kwargs): # on the "other" router, *this* router may have already # un-registered. If that happened, our registration will # be gone, so we immediately un-register on the other side - if reg_details['id'] not in self._regs: + if reg_id not in self._regs: self.log.info("registration already gone: {uri}", uri=reg_details['uri']) yield reg.unregister() else: - self._regs[reg_details['id']]['reg'] = reg - - self.log.info( - "created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}", - me=self._session_id, - other=other._session_id, - reg_id=reg_details['id'], - reg_details=reg_details, - details=details, - reg_session=reg_session, - ) + bridge_link.chained = reg + # only log successful creation when we actually registered in local map + self.log.debug( + "created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session} reg={reg}", + me=self, + other=other._session_id, + reg_id=reg_id, + reg_details=reg_details, + details=details, + reg_session=reg_session, + reg=reg) # called when a registration is removed from the local router @inlineCallbacks def on_registration_delete(session_id, reg_id, details=None): self.log.debug( - "Registration deleted: {me} {session} {reg_id} {details}", + "on_registration_delete: {me} {session} {reg_id} {details}", me=self, session=session_id, reg_id=reg_id, details=details, ) - reg_details = self._regs.get(reg_id, None) - if not reg_details: - self.log.debug("registration not tracked - huh??") + if not self._active: + return + + link = self._regs.get(reg_id, None) + if not link: + self.log.debug("Attempting to delete registration {reg_id} from {me} that is not tracked", + reg_id=reg_id, + me=self) return + reg_details = link.details uri = reg_details['uri'] - reg = self._regs[reg_id]['reg'] - if reg is None: + remote_registration = link.chained + if remote_registration is None: # see above; we might have un-registered here before # we got an answer from the other router - self.log.debug("registration has no 'reg'") + self.log.debug( + "Attempting to delete registration {reg_id} from {me} that is has no remote registration id", + reg_id=reg_id, + me=self) else: - yield reg.unregister() + yield remote_registration.unregister() del self._regs[reg_id] - self.log.debug("{other} unsubscribed from {uri}".format(other=other, uri=uri)) + self.log.debug("deleted forwarding registration of {uri} on {me}", uri=uri, me=self) @inlineCallbacks def register_current(): + self.log.debug("Registering all current registrations from {me}", me=self) # get current registrations on the router + # Since live registrations are filtered by dealer, the expectation here is that other rlink registrations + # would be filtered out from the list, otherwise it ends up creating a snowball of registrations + regs = yield self.call("wamp.registration.list") - for reg_id in regs['exact']: - reg = yield self.call("wamp.registration.get", reg_id) - assert reg['id'] == reg_id, "Logic error, registration IDs don't match" - yield on_registration_create(self._session_id, reg) + + # support for all match types exact, prefix, wildcard #1959 + for reg_list in regs.values(): + for reg_id in reg_list: + reg = yield self.call("wamp.registration.get", reg_id) + assert reg['id'] == reg_id, "Logic error, registration IDs don't match" + yield on_registration_create(self._session_id, reg) @inlineCallbacks def on_remote_join(_session, _details): yield register_current() + self._active = True def on_remote_leave(_session, _details): # The remote session has ended, clear registration records. @@ -430,6 +615,7 @@ def on_remote_leave(_session, _details): # local procedures are not registered on the remote leg # on reestablishment of remote session. # See: https://github.com/crossbario/crossbar/issues/1909 + self._active = False self._regs = {} if self.IS_REMOTE_LEG: @@ -455,7 +641,16 @@ def on_remote_leave(_session, _details): "wamp.registration.on_delete", options=SubscribeOptions(details_arg="details")) - self.log.info("{me}: call forwarding setup done", me=self._session_id) + self.log.info("{me}: call forwarding setup done", me=self) + + def __str__(self): + return "{klass}({rlink} {attr})".format(klass=type(self).__name__, + rlink=self._rlink_id, + attr=pprint.pformat(self.marshal(), width=100)) + + def marshal(self): + obj = {'authid': self.authid, 'authrole': self.authrole, 'sessionid': self.session_id, 'active': self._active} + return obj class RLinkLocalSession(BridgeSession): @@ -485,8 +680,14 @@ def onJoin(self, details): remote = self.config.extra['other'] assert isinstance(remote, RLinkRemoteSession) + # Make sure this session is registered in router lookups for authroles + self.transport._router._session_joined(self, session_details=details) + self._exclude_authid = self.config.extra.get('exclude_authid', None) self._exclude_authrole = self.config.extra.get('exclude_authrole', None) + self._exclude_uri = self.config.extra.get('exclude_uri', None) + + self._common_setup(remote) # setup local->remote event forwarding forward_events = self.config.extra.get('forward_events', False) @@ -513,6 +714,7 @@ def onJoin(self, details): self.config.extra['on_ready'].callback(self) def onLeave(self, details): + self.other = None self.log.warn( 'Router link local session down! (realm={realm}, authid={authid}, authrole={authrole}, session={session}, details={details}) {method}', method=hltype(RLinkLocalSession.onLeave), @@ -543,7 +745,6 @@ def __init__(self, config): # import here to resolve import dependency issues from crossbar.worker.router import RouterController - self._subs = {} self._rlink_manager: RLinkManager = self.config.extra['rlink_manager'] self._router_controller: RouterController = self._rlink_manager.controller @@ -641,10 +842,13 @@ def onJoin(self, details): local = self.config.extra['other'] assert isinstance(local, RLinkLocalSession) local._tracker.connected = True + self._active = True self._exclude_authid = self.config.extra.get('exclude_authid', None) self._exclude_authrole = self.config.extra.get('exclude_authrole', None) + self._exclude_uri = self.config.extra.get('exclude_uri', None) + self._common_setup(local) # setup remote->local event forwarding forward_events = self.config.extra.get('forward_events', False) if forward_events: @@ -678,11 +882,13 @@ def onLeave(self, details): # This avoids duplicate events that would otherwise arrive # See: https://github.com/crossbario/crossbar/issues/1916 for k, v in self._subs.items(): - if v['sub'].active: - yield v['sub'].unsubscribe() + if v.chained.active: + yield v.chained.unsubscribe() self._subs = {} + self._active = False + self.config.extra['other']._tracker.connected = False self.log.warn( '{klass}.onLeave(): rlink remote session left! (realm={realm}, authid={authid}, authrole={authrole}, session={session}, details={details}) {method}', @@ -694,6 +900,7 @@ def onLeave(self, details): session=hlid(self._session_id), details=details) + self.other = None BridgeSession.onLeave(self, details) @@ -742,8 +949,9 @@ def marshal(self): class RLinkConfig(object): - def __init__(self, realm, transport, authid, exclude_authid, forward_local_events, forward_remote_events, - forward_local_invocations, forward_remote_invocations): + def __init__(self, realm, transport, authrole, authid, exclude_authid, exclude_authrole, exclude_uri, + forward_local_events, + forward_remote_events, forward_local_invocations, forward_remote_invocations): """ :param realm: The remote router realm. @@ -754,8 +962,12 @@ def __init__(self, realm, transport, authid, exclude_authid, forward_local_event """ self.realm = realm self.transport = transport + self.authrole = authrole self.authid = authid self.exclude_authid = exclude_authid + # Components are assigned UUID for authid, we need to rely on authroles + self.exclude_authrole = exclude_authrole + self.exclude_uri = exclude_uri self.forward_local_events = forward_local_events self.forward_remote_events = forward_remote_events self.forward_local_invocations = forward_local_invocations @@ -770,6 +982,8 @@ def marshal(self): 'transport': self.transport, 'authid': self.authid, 'exclude_authid': self.exclude_authid, + 'exclude_authrole': self.exclude_authrole, + 'exclude_uri': self.exclude_uri, 'forward_local_events': self.forward_local_events, 'forward_remote_events': self.forward_remote_events, 'forward_local_invocations': self.forward_local_invocations, @@ -802,7 +1016,10 @@ def parse(personality, obj, id=None): 'realm': (True, [str]), 'transport': (True, [Mapping]), 'authid': (False, [str]), + 'authrole': (False, [str]), 'exclude_authid': (False, [Sequence]), + 'exclude_authrole': (False, [Sequence]), + 'exclude_uri': (False, [Mapping]), 'forward_local_events': (False, [bool]), 'forward_remote_events': (False, [bool]), 'forward_local_invocations': (False, [bool]), @@ -810,10 +1027,23 @@ def parse(personality, obj, id=None): }, obj, 'router link configuration') realm = obj['realm'] + authrole = obj.get('authrole', None) authid = obj.get('authid', None) exclude_authid = obj.get('exclude_authid', []) for aid in exclude_authid: assert type(aid) == str + exclude_authrole = obj.get('exclude_authrole', []) + for rid in exclude_authrole: + assert type(rid) == str + + exclude_uri = obj.get('exclude_uri', None) + if exclude_uri: + for k, v in exclude_uri.items(): + assert type(k) == str + assert k in ['prefix', 'exact', 'wildcard'] + assert type(v) == list + for i in v: + assert type(i) == str forward_local_events = obj.get('forward_local_events', True) forward_remote_events = obj.get('forward_remote_events', True) forward_local_invocations = obj.get('forward_local_invocations', True) @@ -826,8 +1056,11 @@ def parse(personality, obj, id=None): config = RLinkConfig( realm=realm, transport=transport, + authrole=authrole, authid=authid, exclude_authid=exclude_authid, + exclude_authrole=exclude_authrole, + exclude_uri=exclude_uri, forward_local_events=forward_local_events, forward_remote_events=forward_remote_events, forward_local_invocations=forward_local_invocations, @@ -903,13 +1136,20 @@ def start_link(self, link_id, link_config, caller): 'other': None, 'on_ready': Deferred(), 'rlink': link_id, + 'rlink_id': link_id, + 'exclude_authid': link_config.exclude_authid, + 'exclude_authrole': link_config.exclude_authrole, + 'exclude_uri': URIExclusions(link_config.exclude_uri), 'forward_events': link_config.forward_local_events, 'forward_invocations': link_config.forward_local_invocations, } local_realm = self._realm.config['name'] local_authid = link_config.authid or util.generate_serial_number() - local_authrole = 'trusted' + # Having local authrole be 'trusted' allows this RLink to see other RLinks + # This may need to be changed in order to distinguish between actual trusted sessions and + # RLinks that can see other RLinks + local_authrole = link_config.authrole or 'trusted' local_config = ComponentConfig(local_realm, local_extra) local_session = RLinkLocalSession(local_config) @@ -920,7 +1160,10 @@ def start_link(self, link_id, link_config, caller): 'other': None, 'on_ready': Deferred(), 'authid': link_config.authid, + 'rlink_id': link_id, 'exclude_authid': link_config.exclude_authid, + 'exclude_authrole': link_config.exclude_authrole, + 'exclude_uri': URIExclusions(link_config.exclude_uri), 'forward_events': link_config.forward_remote_events, 'forward_invocations': link_config.forward_remote_invocations, } @@ -947,6 +1190,7 @@ def start_link(self, link_id, link_config, caller): try: # connect the local session # + # Adding to the router session factory will NOT add session to role->session map self._realm.controller._router_session_factory.add(local_session, self._realm.router, authid=local_authid, diff --git a/requirements-latest.txt b/requirements-latest.txt index 395518aca..86fda4a77 100644 --- a/requirements-latest.txt +++ b/requirements-latest.txt @@ -12,7 +12,7 @@ cookiecutter>=2.1.1 cryptography>=39.0.0 docker>=6.0.1 # required for python 3.11+ https://github.com/ethereum/eth-abi/pull/194 -eth-abi @ git+https://github.com/ethereum/eth-abi.git@v4.0.0-beta.2#egg=eth-abi +eth-abi>=4.0.0 # required for python 3.11+ (https://github.com/ethereum/eth-account/pull/212) eth-account @ git+https://github.com/crossbario/eth-account.git@fix-215#egg=eth-account eth-typing @ git+https://github.com/ethereum/eth-typing.git@v3.2.0#egg=eth-typing diff --git a/test/functests/cfctests/rlinks_tests.md b/test/functests/cfctests/rlinks_tests.md new file mode 100644 index 000000000..519305ec0 --- /dev/null +++ b/test/functests/cfctests/rlinks_tests.md @@ -0,0 +1,74 @@ +RLinks +========== + +Nomenclature + - 1..9, A..F - nodes with RLinks + - U,V,W,X,Y,Z - nodes without RLinks + - l - local + - r - remote + - lr - local and remote (both) + +Node specification **N(\[l | r | lr])**, where N is the node number, target is the target node number, and **l | r | lr** - Local/Remote/Both + +.. note:: + + Local (RLinkLocalSession) have authrole `trusted` and can see all registrations and events from + other all other sessions including remote (RLinkRemoteSession) sessions. + + Remote (RLinkRemoteSession) have authrole `rlink` and can only see registrations local to the router, + but not from other rlinks. This is not the case for subscriptions:, remote session can see all events from + all sessions which results in full mesh subscriptions + +Topologies +~~~~~~~~~~ + +Two routers + +- 1[Xl],X - simple node with RLink to X, forwarding local invocations/events to X +- 1[Xr],X - simple node with RLink to X, forwarding forwarding remote invocations/events from X +- 1[Xlr],X - bi-directional RLink to X + +Chain + +- 1[2l],2[3l],3[Xl],X - chain of nodes with RLinks to the next node, forwarding local invocations/events to the next node +- 1[2r],2[3r],3[Xr],X - chain of nodes with RLinks to the next node, forwarding remote invocations/events from the next node +- 1[2lr],2[3lr],3[Xlr],X - chain of nodes with bi-directional RLinks to the next node + +Chain with reciprocal links + +- 1[2l],2(1l,3l),3(2l,4l),4(3l) - chain of nodes with RLinks to the next node, forwarding local invocations/events to the next node, and reciprocal links to the previous node +- 1[2r],2(1r,3r),3(2r,4r),4(3r) - chain of nodes with RLinks to the next node, forwarding remote invocations/events from the next node, and reciprocal links to the previous node +- 1[2lr],2(1lr,3lr),3(2lr,4lr),4(3lr) - [INVALID] Oversubscribed chain of nodes with bi-directional RLinks to the next node, and reciprocal links to the previous node + +Ring + +- 1[2l],2[3l],3[4l],4[1l] - ring of nodes with RLinks to the next node, forwarding local invocations/events to the next node +- 1[2r],2[3r],3[4r],4[1r] - ring of nodes with RLinks to the next node, forwarding remote invocations/events from the next node +- 1[2lr],2[3lr],3[4lr],4[1lr] - ring of nodes with bi-directional RLinks to the next node + +Ring with reciprocal links + +- 1[2l],2(1l,3l),3(2l,4l),4(3l,1l) - ring of nodes with RLinks to the next node, forwarding local invocations/events to the next node, and reciprocal links to the previous node +- 1[2r],2(1r,3r),3(2r,4r),4(3r,1r) - ring of nodes with RLinks to the next node, forwarding remote invocations/events from the next node, and reciprocal links to the previous node +- 1[2lr],2(1lr,3lr),3(2lr,4lr),4(3lr,1lr) - [INVALID] Oversubscribed ring of nodes with bi-directional RLinks to the next node, and reciprocal links to the previous node + +Star + +- 1[Xl],2[Xl],3[Xl],4[Xl],X - star (inward) of nodes with RLinks to the central node, forwarding local invocations/events to the central node +- 1[Xr],2[Xr],3[Xr],4[Xr],X - star (inward) of nodes with RLinks to the central node, forwarding remote invocations/events from the central node +- 1[Xlr],2[Xlr],3[Xlr],4[Xlr],X - star (inward) of nodes with bi-directional RLinks to the central node +- 1(Xl, Yl, Zl),X,Y,Z - star (outward) of central node with RLinks to the outward nodes, forwarding local invocations/events to the outward nodes +- 1(Xr, Yr, Zr),X,Y,Z - star (outward) of central node with RLinks to the outward nodes, forwarding remote invocations/events from the outward nodes +- 1(Xlr, Ylr, Zlr),X,Y,Z - star (outward) of central node with bi-directional RLinks to the outward nodes + +Star with reciprocal links + +- 1[Cl],2[Cl],3[Cl],4[Cl],C(1l,2l,3l,4l) - star of nodes with RLinks to the central node, forwarding local invocations/events to the central node, and reciprocal links from the central node +- 1[Cr],2[Cr],3[Cr],4[Cr],C(1r,2r,3r,4r) - star of nodes with RLinks to the central node, forwarding remote invocations/events from the central node, and reciprocal links from the central node + +Mesh + +- 1[2l,3l,4l],2[1l,3l,4l],3[1l,2l,4l],4[1l,2l,3l] - mesh of nodes with RLinks to all other nodes and reciprocal links (all forwarding local invocations/events) +- 1[2r,3r,4r],2[1r,3r,4r],3[1r,2r,4r],4[1r,2r,3r] - mesh of nodes with RLinks to all other nodes and reciprocal links (all forwarding remote invocations/events) +- 1[2lr,3lr,4lr],2[1lr,3lr,4lr],3[1lr,2lr,4lr],4[1lr,2lr,3lr] - [OVERKILL/INVALID] mesh of nodes with bi-directional RLinks to all other nodes and reciprocal links +