From 70533321960c97b933195d6427b84fa92976fc0d Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Wed, 10 Apr 2024 18:47:08 +0200 Subject: [PATCH] Event engine/compatibility (#185) * Fix compatibility issues between EventEngine and Asyncio subscription manager * PubNub SDK v7.4.4 release. --------- Co-authored-by: PubNub Release Bot <120067856+pubnub-release-bot@users.noreply.github.com> --- .github/workflows/run-tests.yml | 4 +- .pubnub.yml | 13 +- CHANGELOG.md | 6 + pubnub/event_engine/effects.py | 13 +- pubnub/event_engine/models/events.py | 7 +- pubnub/event_engine/models/invocations.py | 5 +- pubnub/event_engine/models/states.py | 147 +++++++++++++--- pubnub/event_engine/statemachine.py | 2 +- pubnub/pubnub_asyncio.py | 37 +++- pubnub/pubnub_core.py | 2 +- pubnub/utils.py | 4 +- scripts/run-tests.py | 2 + setup.py | 2 +- tests/integrational/asyncio/test_heartbeat.py | 8 +- tests/integrational/asyncio/test_here_now.py | 85 ++++----- tests/integrational/asyncio/test_state.py | 34 ++-- tests/integrational/asyncio/test_subscribe.py | 165 ++++++++++-------- .../asyncio/test_unsubscribe_status.py | 34 ++-- tests/integrational/asyncio/test_where_now.py | 45 +++-- 19 files changed, 388 insertions(+), 227 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 5567fda7..189cf343 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -77,8 +77,8 @@ jobs: cp sdk-specifications/features/encryption/cryptor-module.feature tests/acceptance/encryption mkdir tests/acceptance/encryption/assets/ cp sdk-specifications/features/encryption/assets/* tests/acceptance/encryption/assets/ - cp sdk-specifications/features/subscribe/event-engine/happy-path.feature tests/acceptance/subscribe/happy-path.feature - cp sdk-specifications/features/presence/event-engine/presence-engine.feature tests/acceptance/subscribe/presence-engine.feature + cp sdk-specifications/features/subscribe/event-engine/happy-path_Legacy.feature tests/acceptance/subscribe/happy-path_Legacy.feature + cp sdk-specifications/features/presence/event-engine/presence-engine_Legacy.feature tests/acceptance/subscribe/presence-engine_Legacy.feature sudo pip3 install -r requirements-dev.txt behave --junit tests/acceptance/pam diff --git a/.pubnub.yml b/.pubnub.yml index 710ddd59..efe7712e 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,5 +1,5 @@ name: python -version: 7.4.3 +version: 7.4.4 schema: 1 scm: github.com/pubnub/python sdks: @@ -18,7 +18,7 @@ sdks: distributions: - distribution-type: library distribution-repository: package - package-name: pubnub-7.4.3 + package-name: pubnub-7.4.4 location: https://pypi.org/project/pubnub/ supported-platforms: supported-operating-systems: @@ -97,8 +97,8 @@ sdks: - distribution-type: library distribution-repository: git release - package-name: pubnub-7.4.3 - location: https://github.com/pubnub/python/releases/download/v7.4.3/pubnub-7.4.3.tar.gz + package-name: pubnub-7.4.4 + location: https://github.com/pubnub/python/releases/download/v7.4.4/pubnub-7.4.4.tar.gz supported-platforms: supported-operating-systems: Linux: @@ -169,6 +169,11 @@ sdks: license-url: https://github.com/aio-libs/aiohttp/blob/master/LICENSE.txt is-required: Required changelog: + - date: 2024-04-10 + version: v7.4.4 + changes: + - type: bug + text: "Fix compatibility issues between EventEngine and Asyncio subscription manager." - date: 2024-03-28 version: v7.4.3 changes: diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d5af776..374ff02c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v7.4.4 +April 10 2024 + +#### Fixed +- Fix compatibility issues between EventEngine and Asyncio subscription manager. + ## v7.4.3 March 28 2024 diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index a6a7ce43..d81fa5ff 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -58,11 +58,11 @@ def get_new_stop_event(self): return event def calculate_reconnection_delay(self, attempts): - if self.reconnection_policy is PNReconnectionPolicy.LINEAR: + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) + else: delay = self.interval - elif self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) return delay @@ -88,9 +88,9 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0 request.timetoken(0) response = await request.future() - if isinstance(response, PubNubException): + if isinstance(response, Exception): self.logger.warning(f'Handshake failed: {str(response)}') - handshake_failure = events.HandshakeFailureEvent(str(response), 1, timetoken=timetoken) + handshake_failure = events.HandshakeFailureEvent(response, 1, timetoken=timetoken) self.event_engine.trigger(handshake_failure) elif response.status.error: self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}') @@ -292,7 +292,7 @@ async def heartbeat(self, channels, groups, stop_event): self.logger.warning(f'Heartbeat failed: {str(response)}') self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, reason=response.status.error_data, attempt=1)) - elif response.status.error: + elif response.status and response.status.error: self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, reason=response.status.error_data, attempt=1)) @@ -427,5 +427,6 @@ def emit_message(self, invocation: invocations.EmitMessagesInvocation): def emit_status(self, invocation: invocations.EmitStatusInvocation): pn_status = PNStatus() pn_status.category = invocation.status + pn_status.operation = invocation.operation pn_status.error = False self.pubnub._subscription_manager._listener_manager.announce_status(pn_status) diff --git a/pubnub/event_engine/models/events.py b/pubnub/event_engine/models/events.py index 6b926337..a9a17ec4 100644 --- a/pubnub/event_engine/models/events.py +++ b/pubnub/event_engine/models/events.py @@ -102,6 +102,10 @@ class ReconnectEvent(PNEvent): pass +class UnsubscribeAllEvent(PNEvent): + pass + + """ Presence Events """ @@ -116,7 +120,8 @@ class HeartbeatReconnectEvent(PNEvent): class HeartbeatLeftAllEvent(PNEvent): - pass + def __init__(self, suppress_leave: bool = False) -> None: + self.suppress_leave = suppress_leave class HeartbeatLeftEvent(PNChannelGroupsEvent): diff --git a/pubnub/event_engine/models/invocations.py b/pubnub/event_engine/models/invocations.py index 6793739e..2b046f46 100644 --- a/pubnub/event_engine/models/invocations.py +++ b/pubnub/event_engine/models/invocations.py @@ -1,6 +1,6 @@ from typing import List, Union from pubnub.exceptions import PubNubException -from pubnub.enums import PNStatusCategory +from pubnub.enums import PNOperationType, PNStatusCategory class PNInvocation: @@ -90,9 +90,10 @@ def __init__(self, messages: Union[None, List[str]]) -> None: class EmitStatusInvocation(PNEmittableInvocation): - def __init__(self, status: Union[None, PNStatusCategory]) -> None: + def __init__(self, status: Union[None, PNStatusCategory], operation: Union[None, PNOperationType] = None) -> None: super().__init__() self.status = status + self.operation = operation """ diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index 72acdfcd..05190b21 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -1,4 +1,4 @@ -from pubnub.enums import PNStatusCategory +from pubnub.enums import PNOperationType, PNStatusCategory from pubnub.event_engine.models import invocations from pubnub.event_engine.models.invocations import PNInvocation from pubnub.event_engine.models import events @@ -99,6 +99,7 @@ def __init__(self, context: PNContext) -> None: events.HandshakeSuccessEvent.__name__: self.handshaking_success, events.SubscriptionRestoredEvent.__name__: self.subscription_restored, events.SubscriptionChangedEvent.__name__: self.subscription_changed, + events.UnsubscribeAllEvent.__name__: self.unsubscribe_all, } def on_enter(self, context: Union[None, PNContext]): @@ -171,6 +172,21 @@ def handshaking_success(self, event: events.HandshakeSuccessEvent, context: PNCo invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNConnectedCategory) ) + def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.timetoken = 0 + self._context.region = None + self._context.attempt = 0 + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=UnsubscribedState, + context=self._context, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory, + operation=PNOperationType.PNUnsubscribeOperation) + ) + class HandshakeReconnectingState(PNState): def __init__(self, context: PNContext) -> None: @@ -231,11 +247,18 @@ def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContex self._context.update(context) self._context.attempt = event.attempt self._context.reason = event.reason + status_invocation = None + + if isinstance(event, Exception) and 'status' in event.reason: + status_invocation = invocations.EmitStatusInvocation(status=event.reason.status.category, + operation=PNOperationType.PNUnsubscribeOperation) + else: + status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory) return PNTransition( state=HandshakeFailedState, context=self._context, - invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory) + invocation=status_invocation ) def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition: @@ -270,6 +293,7 @@ def __init__(self, context: PNContext) -> None: events.SubscriptionChangedEvent.__name__: self.subscription_changed, events.ReconnectEvent.__name__: self.reconnect, events.SubscriptionRestoredEvent.__name__: self.subscription_restored, + events.UnsubscribeAllEvent.__name__: self.unsubscribe_all, } def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition: @@ -305,6 +329,21 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context context=self._context ) + def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.timetoken = 0 + self._context.region = None + self._context.attempt = 0 + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=UnsubscribedState, + context=self._context, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory, + operation=PNOperationType.PNUnsubscribeOperation) + ) + class HandshakeStoppedState(PNState): def __init__(self, context: PNContext) -> None: @@ -312,7 +351,8 @@ def __init__(self, context: PNContext) -> None: self._context.attempt = 0 self._transitions = { - events.ReconnectEvent.__name__: self.reconnect + events.ReconnectEvent.__name__: self.reconnect, + events.UnsubscribeAllEvent.__name__: self.unsubscribe_all, } def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition: @@ -323,6 +363,21 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans context=self._context ) + def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.timetoken = 0 + self._context.region = None + self._context.attempt = 0 + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=UnsubscribedState, + context=self._context, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory, + operation=PNOperationType.PNUnsubscribeOperation) + ) + class ReceivingState(PNState): def __init__(self, context: PNContext) -> None: @@ -336,6 +391,7 @@ def __init__(self, context: PNContext) -> None: events.ReceiveFailureEvent.__name__: self.receiving_failure, events.DisconnectEvent.__name__: self.disconnect, events.ReconnectEvent.__name__: self.reconnect, + events.UnsubscribeAllEvent.__name__: self.unsubscribe_all, } def on_enter(self, context: Union[None, PNContext]): @@ -410,6 +466,21 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans context=self._context ) + def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.timetoken = 0 + self._context.region = None + self._context.attempt = 0 + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=UnsubscribedState, + context=self._context, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory, + operation=PNOperationType.PNUnsubscribeOperation) + ) + class ReceiveReconnectingState(PNState): def __init__(self, context: PNContext) -> None: @@ -511,6 +582,7 @@ def __init__(self, context: PNContext) -> None: events.SubscriptionChangedEvent.__name__: self.subscription_changed, events.SubscriptionRestoredEvent.__name__: self.subscription_restored, events.ReconnectEvent.__name__: self.reconnect, + events.UnsubscribeAllEvent.__name__: self.unsubscribe_all, } def reconnect_retry(self, event: events.ReceiveReconnectRetryEvent, context: PNContext) -> PNTransition: @@ -554,6 +626,21 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context context=self._context ) + def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.timetoken = 0 + self._context.region = None + self._context.attempt = 0 + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=UnsubscribedState, + context=self._context, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory, + operation=PNOperationType.PNUnsubscribeOperation) + ) + class ReceiveStoppedState(PNState): def __init__(self, context: PNContext) -> None: @@ -561,7 +648,8 @@ def __init__(self, context: PNContext) -> None: self._context.attempt = 0 self._transitions = { - events.ReconnectEvent.__name__: self.reconnect + events.ReconnectEvent.__name__: self.reconnect, + events.UnsubscribeAllEvent.__name__: self.unsubscribe_all, } def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition: @@ -572,6 +660,21 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans context=self._context ) + def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + self._context.timetoken = 0 + self._context.region = None + self._context.attempt = 0 + self._context.channels = [] + self._context.groups = [] + + return PNTransition( + state=UnsubscribedState, + context=self._context, + invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory, + operation=PNOperationType.PNUnsubscribeOperation) + ) + """ Presence states @@ -711,13 +814,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) - self._context.channels = [] - self._context.groups = [] - invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, - groups=event.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, + groups=self._context.groups) + self._context.channels = [] + self._context.groups = [] return PNTransition( state=HeartbeatInactiveState, @@ -769,13 +871,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) - self._context.channels = [] - self._context.groups = [] - invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, - groups=event.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, + groups=self._context.groups) + self._context.channels = [] + self._context.groups = [] return PNTransition( state=HeartbeatInactiveState, @@ -857,13 +958,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) - self._context.channels = [] - self._context.groups = [] - invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, - groups=event.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, + groups=self._context.groups) + self._context.channels = [] + self._context.groups = [] return PNTransition( state=HeartbeatInactiveState, @@ -1005,13 +1105,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) - self._context.channels = [] - self._context.groups = [] - invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, - groups=event.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, + groups=self._context.groups) + self._context.channels = [] + self._context.groups = [] return PNTransition( state=HeartbeatInactiveState, diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index 41c0b327..2718ff15 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -81,7 +81,7 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: def dispatch_effects(self): for invocation in self._invocations: - self.logger.debug(f'Dispatching {invocation.__class__.__name__} {id(invocation)}') + self.logger.debug(f'Dispatching {invocation.__class__.__name__} {invocation.__dict__} {id(invocation)}') self._dispatcher.dispatch_effect(invocation) self._invocations.clear() diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 14c83eec..2822023e 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -591,7 +591,7 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): with_presence=subscribe_operation.presence_enabled ) self.event_engine.trigger(subscription_event) - if self._pubnub.config._heartbeat_interval > 0: + if self._pubnub.config.enable_presence_heartbeat and self._pubnub.config._heartbeat_interval > 0: self.presence_engine.trigger(events.HeartbeatJoinedEvent( channels=subscribe_operation.channels, groups=subscribe_operation.channel_groups @@ -609,23 +609,42 @@ def adapt_unsubscribe_builder(self, unsubscribe_operation): self.event_engine.get_context().groups, self.event_engine.get_context().with_presence) - self.event_engine.trigger(events.SubscriptionChangedEvent(channels=channels, groups=groups)) + if channels or groups: + self.event_engine.trigger(events.SubscriptionChangedEvent(channels=channels, groups=groups)) + else: + self.event_engine.trigger(events.UnsubscribeAllEvent()) - self.presence_engine.trigger(event=events.HeartbeatLeftEvent( - channels=unsubscribe_operation.channels, - groups=unsubscribe_operation.channel_groups, - suppress_leave=self._pubnub.config.suppress_leave_events - )) + if self._pubnub.config.enable_presence_heartbeat and self._pubnub.config._heartbeat_interval > 0: + self.presence_engine.trigger(event=events.HeartbeatLeftEvent( + channels=unsubscribe_operation.channels, + groups=unsubscribe_operation.channel_groups, + suppress_leave=self._pubnub.config.suppress_leave_events + )) def adapt_state_builder(self, state_operation): self.state_container.register_state(state_operation.state, - state_operation.channels, - state_operation.channel_groups) + state_operation.channels) return super().adapt_state_builder(state_operation) + def unsubscribe_all(self): + self.adapt_unsubscribe_builder(UnsubscribeOperation( + channels=self.get_subscribed_channels(), + channel_groups=self.get_subscribed_channel_groups())) + def get_custom_params(self): return {'ee': 1} + def get_subscribed_channels(self): + return self.event_engine.get_context().channels + + def get_subscribed_channel_groups(self): + return self.event_engine.get_context().groups + + def _stop_heartbeat_timer(self): + self.presence_engine.trigger(events.HeartbeatLeftAllEvent( + suppress_leave=self._pubnub.config.suppress_leave_events)) + self.presence_engine.stop() + class AsyncioSubscribeMessageWorker(SubscribeMessageWorker): async def run(self): diff --git a/pubnub/pubnub_core.py b/pubnub/pubnub_core.py index 0a954ad4..29e60fdd 100644 --- a/pubnub/pubnub_core.py +++ b/pubnub/pubnub_core.py @@ -85,7 +85,7 @@ class PubNubCore: """A base class for PubNub Python API implementations""" - SDK_VERSION = "7.4.3" + SDK_VERSION = "7.4.4" SDK_NAME = "PubNub-Python" TIMESTAMP_DIVIDER = 1000 diff --git a/pubnub/utils.py b/pubnub/utils.py index 27340ac6..2838bac8 100644 --- a/pubnub/utils.py +++ b/pubnub/utils.py @@ -94,8 +94,10 @@ def is_subscribed_event(status): def is_unsubscribed_event(status): assert isinstance(status, PNStatus) - return status.category == PNStatusCategory.PNAcknowledgmentCategory \ + is_disconnect = status.category == PNStatusCategory.PNDisconnectedCategory + is_unsubscribe = status.category == PNStatusCategory.PNAcknowledgmentCategory \ and status.operation == PNOperationType.PNUnsubscribeOperation + return is_disconnect or is_unsubscribe def prepare_pam_arguments(unsorted_params): diff --git a/scripts/run-tests.py b/scripts/run-tests.py index 80ff48a0..9b6ee82b 100755 --- a/scripts/run-tests.py +++ b/scripts/run-tests.py @@ -12,6 +12,7 @@ os.chdir(os.path.join(REPO_ROOT)) tcmn = 'py.test tests --cov=pubnub --ignore=tests/manual/' +tcmn_ee = 'PN_ENABLE_EVENT_ENGINE=True pytest tests/integrational/asyncio/' fcmn = 'flake8 --exclude=scripts/,src/,.cache,.git,.idea,.tox,._trial_temp/,venv/ --ignore F811,E402' @@ -20,5 +21,6 @@ def run(command): run(tcmn) +run(tcmn_ee) # moved to separate action # run(fcmn) diff --git a/setup.py b/setup.py index bd9b8e0e..d131655d 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='pubnub', - version='7.4.3', + version='7.4.4', description='PubNub Real-time push service in the cloud', author='PubNub', author_email='support@pubnub.com', diff --git a/tests/integrational/asyncio/test_heartbeat.py b/tests/integrational/asyncio/test_heartbeat.py index a66a284d..6e720d29 100644 --- a/tests/integrational/asyncio/test_heartbeat.py +++ b/tests/integrational/asyncio/test_heartbeat.py @@ -3,7 +3,7 @@ import pytest import pubnub as pn -from pubnub.pubnub_asyncio import PubNubAsyncio, SubscribeListener +from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio, SubscribeListener from tests import helper from tests.helper import pnconf_sub_copy @@ -69,11 +69,13 @@ async def test_timeout_event_on_broken_heartbeat(event_loop): assert pubnub.uuid == envelope.uuid pubnub.unsubscribe().channels(ch).execute() - await callback_messages.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_messages.wait_for_disconnect() # - disconnect from :ch-pnpres pubnub_listener.unsubscribe().channels(ch).execute() - await callback_presence.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_presence.wait_for_disconnect() await pubnub.stop() await pubnub_listener.stop() diff --git a/tests/integrational/asyncio/test_here_now.py b/tests/integrational/asyncio/test_here_now.py index 8189300a..b5417ac8 100644 --- a/tests/integrational/asyncio/test_here_now.py +++ b/tests/integrational/asyncio/test_here_now.py @@ -2,21 +2,22 @@ import pytest from pubnub.models.consumer.presence import PNHereNowResult -from pubnub.pubnub_asyncio import PubNubAsyncio -from tests.helper import pnconf_sub_copy, pnconf_demo_copy -from tests.integrational.vcr_asyncio_sleeper import get_sleeper, VCR599Listener -from tests.integrational.vcr_helper import pn_vcr +from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio +from tests.helper import pnconf_demo_copy, pnconf_sub_copy +from tests.integrational.vcr_asyncio_sleeper import VCR599Listener +# from tests.integrational.vcr_helper import pn_vcr -@get_sleeper('tests/integrational/fixtures/asyncio/here_now/single_channel.yaml') -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/here_now/single_channel.yaml', - filter_query_parameters=['tr', 'uuid', 'pnsdk', 'l_pres', 'tt'] -) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/here_now/single_channel.yaml', +# filter_query_parameters=['tr', 'uuid', 'pnsdk', 'l_pres', 'tt'] +# ) @pytest.mark.asyncio -async def test_single_channel(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - pubnub.config.uuid = 'test-here-now-asyncio-uuid1' +async def test_single_channel(): + config = pnconf_sub_copy() + config.uuuid = 'test-here-now-asyncio-uuid1' + pubnub = PubNubAsyncio(config) + ch = "test-here-now-asyncio-ch" callback = VCR599Listener(1) @@ -25,7 +26,7 @@ async def test_single_channel(event_loop, sleeper=asyncio.sleep): await callback.wait_for_connect() - await sleeper(5) + await asyncio.sleep(5) env = await pubnub.here_now()\ .channels(ch)\ @@ -50,21 +51,22 @@ async def test_single_channel(event_loop, sleeper=asyncio.sleep): assert result.total_occupancy == 1 pubnub.unsubscribe().channels(ch).execute() - await callback.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/here_now/multiple_channels.yaml') -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/here_now/multiple_channels.yaml', - filter_query_parameters=['pnsdk', 'l_pres'], - match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'query'] -) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/here_now/multiple_channels.yaml', +# filter_query_parameters=['pnsdk', 'l_pres'], +# match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'query'] +# ) @pytest.mark.asyncio -async def test_multiple_channels(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - pubnub.config.uuid = 'test-here-now-asyncio-uuid1' +async def test_multiple_channels(): + config = pnconf_sub_copy() + config.uuuid = 'test-here-now-asyncio-uuid1' + pubnub = PubNubAsyncio(config) ch1 = "test-here-now-asyncio-ch1" ch2 = "test-here-now-asyncio-ch2" @@ -75,7 +77,7 @@ async def test_multiple_channels(event_loop, sleeper=asyncio.sleep): await callback.wait_for_connect() - await sleeper(5) + await asyncio.sleep(5) env = await pubnub.here_now() \ .channels([ch1, ch2]) \ @@ -100,24 +102,26 @@ async def test_multiple_channels(event_loop, sleeper=asyncio.sleep): assert result.total_channels == 2 pubnub.unsubscribe().channels([ch1, ch2]).execute() - await callback.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/here_now/global.yaml') -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/here_now/global.yaml', - filter_query_parameters=['pnsdk', 'l_pres'], - match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'query'], - match_on_kwargs={ - 'string_list_in_path': { - 'positions': [4] - } - }) +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/here_now/global.yaml', +# filter_query_parameters=['pnsdk', 'l_pres'], +# match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'query'], +# match_on_kwargs={ +# 'string_list_in_path': { +# 'positions': [4] +# } +# }) @pytest.mark.asyncio -async def test_global(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - pubnub.config.uuid = 'test-here-now-asyncio-uuid1' +@pytest.mark.skip(reason='this feature is not enabled by default') +async def test_global(): + config = pnconf_sub_copy() + config.uuuid = 'test-here-now-asyncio-uuid1' + pubnub = PubNubAsyncio(config) ch1 = "test-here-now-asyncio-ch1" ch2 = "test-here-now-asyncio-ch2" @@ -128,7 +132,7 @@ async def test_global(event_loop, sleeper=asyncio.sleep): await callback.wait_for_connect() - await sleeper(5) + await asyncio.sleep(5) env = await pubnub.here_now().future() @@ -136,14 +140,15 @@ async def test_global(event_loop, sleeper=asyncio.sleep): assert env.result.total_occupancy >= 1 pubnub.unsubscribe().channels([ch1, ch2]).execute() - await callback.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop() @pytest.mark.asyncio async def test_here_now_super_call(event_loop): - pubnub = PubNubAsyncio(pnconf_demo_copy(), custom_event_loop=event_loop) + pubnub = PubNubAsyncio(pnconf_demo_copy()) pubnub.config.uuid = 'test-here-now-asyncio-uuid1' env = await pubnub.here_now().future() diff --git a/tests/integrational/asyncio/test_state.py b/tests/integrational/asyncio/test_state.py index 86ef7916..e55651fa 100644 --- a/tests/integrational/asyncio/test_state.py +++ b/tests/integrational/asyncio/test_state.py @@ -4,9 +4,9 @@ import pubnub as pn from pubnub.models.consumer.presence import PNSetStateResult, PNGetStateResult -from pubnub.pubnub_asyncio import PubNubAsyncio +from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio from tests.helper import pnconf, pnconf_copy, pnconf_sub_copy, pnconf_pam_copy -from tests.integrational.vcr_asyncio_sleeper import get_sleeper, VCR599Listener +from tests.integrational.vcr_asyncio_sleeper import VCR599Listener from tests.integrational.vcr_helper import pn_vcr @@ -18,8 +18,8 @@ filter_query_parameters=['uuid', 'pnsdk'], match_on=['method', 'host', 'path', 'state_object_in_query']) @pytest.mark.asyncio -async def test_single_channelx(event_loop): - pubnub = PubNubAsyncio(pnconf_copy(), custom_event_loop=event_loop) +async def test_single_channel(): + pubnub = PubNubAsyncio(pnconf_copy()) ch = 'test-state-asyncio-ch' pubnub.config.uuid = 'test-state-asyncio-uuid' state = {"name": "Alex", "count": 5} @@ -42,18 +42,13 @@ async def test_single_channelx(event_loop): await pubnub.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/state/single_channel_with_subscription.yaml') -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/state/single_channel_with_subscription.yaml', - filter_query_parameters=['uuid', 'pnsdk'], - match_on=['method', 'host', 'path', 'state_object_in_query']) @pytest.mark.asyncio -async def test_single_channel_with_subscription(event_loop, sleeper=asyncio.sleep): +async def test_single_channel_with_subscription(): pnconf = pnconf_sub_copy() pnconf.set_presence_timeout(12) - pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop) - ch = 'test-state-asyncio-ch' - pubnub.config.uuid = 'test-state-asyncio-uuid' + pubnub = PubNubAsyncio(pnconf) + ch = 'test-state-asyncio-ch-with-subscription' + pubnub.config.uuid = 'test-state-asyncio-uuid-with-subscription' state = {"name": "Alex", "count": 5} callback = VCR599Listener(1) @@ -61,7 +56,7 @@ async def test_single_channel_with_subscription(event_loop, sleeper=asyncio.slee pubnub.subscribe().channels(ch).execute() await callback.wait_for_connect() - await sleeper(20) + await asyncio.sleep(20) env = await pubnub.set_state() \ .channels(ch) \ @@ -79,7 +74,8 @@ async def test_single_channel_with_subscription(event_loop, sleeper=asyncio.slee assert env.result.channels[ch]['count'] == 5 pubnub.unsubscribe().channels(ch).execute() - await callback.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop() @@ -89,8 +85,8 @@ async def test_single_channel_with_subscription(event_loop, sleeper=asyncio.slee filter_query_parameters=['uuid', 'pnsdk'], match_on=['method', 'host', 'path', 'state_object_in_query']) @pytest.mark.asyncio -async def test_multiple_channels(event_loop): - pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop) +async def test_multiple_channels(): + pubnub = PubNubAsyncio(pnconf) ch1 = 'test-state-asyncio-ch1' ch2 = 'test-state-asyncio-ch2' pubnub.config.uuid = 'test-state-asyncio-uuid' @@ -117,9 +113,9 @@ async def test_multiple_channels(event_loop): @pytest.mark.asyncio -async def test_state_super_admin_call(event_loop): +async def test_state_super_admin_call(): pnconf = pnconf_pam_copy() - pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop) + pubnub = PubNubAsyncio(pnconf) ch1 = 'test-state-asyncio-ch1' ch2 = 'test-state-asyncio-ch2' pubnub.config.uuid = 'test-state-asyncio-uuid-|.*$' diff --git a/tests/integrational/asyncio/test_subscribe.py b/tests/integrational/asyncio/test_subscribe.py index 272917b7..c103bbbf 100644 --- a/tests/integrational/asyncio/test_subscribe.py +++ b/tests/integrational/asyncio/test_subscribe.py @@ -5,10 +5,10 @@ from unittest.mock import patch from pubnub.models.consumer.pubsub import PNMessageResult -from pubnub.pubnub_asyncio import PubNubAsyncio, AsyncioEnvelope, SubscribeListener -from tests.helper import pnconf_sub_copy, pnconf_enc_sub_copy -from tests.integrational.vcr_asyncio_sleeper import get_sleeper, VCR599Listener, VCR599ReconnectionManager -from tests.integrational.vcr_helper import pn_vcr +from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio, AsyncioEnvelope, SubscribeListener +from tests.helper import pnconf_enc_env_copy, pnconf_env_copy +from tests.integrational.vcr_asyncio_sleeper import VCR599Listener, VCR599ReconnectionManager +# from tests.integrational.vcr_helper import pn_vcr pn.set_stream_logger('pubnub', logging.DEBUG) @@ -17,13 +17,14 @@ async def patch_pubnub(pubnub): pubnub._subscription_manager._reconnection_manager = VCR599ReconnectionManager(pubnub) -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/sub_unsub.yaml', - filter_query_parameters=['uuid', 'pnsdk']) +# TODO: refactor cassette +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/sub_unsub.json', serializer='pn_json', +# filter_query_parameters=['pnsdk', 'ee', 'tr']) @pytest.mark.asyncio -async def test_subscribe_unsubscribe(event_loop): +async def test_subscribe_unsubscribe(): channel = "test-subscribe-asyncio-ch" - - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) + config = pnconf_env_copy(enable_subscribe=True, enable_presence_heartbeat=False) + pubnub = PubNubAsyncio(config) callback = SubscribeListener() pubnub.add_listener(callback) @@ -40,26 +41,30 @@ async def test_subscribe_unsubscribe(event_loop): assert channel not in pubnub.get_subscribed_channels() assert len(pubnub.get_subscribed_channels()) == 0 - # await callback.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() assert channel not in pubnub.get_subscribed_channels() assert len(pubnub.get_subscribed_channels()) == 0 await pubnub.stop() + await asyncio.sleep(3) -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/sub_pub_unsub.yaml', - filter_query_parameters=['pnsdk']) +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/sub_pub_unsub.json', serializer='pn_json', +# filter_query_parameters=['pnsdk', 'ee', 'tr']) @pytest.mark.asyncio -async def test_subscribe_publish_unsubscribe(event_loop): - pubnub_sub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - pubnub_pub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) +async def test_subscribe_publish_unsubscribe(): + sub_config = pnconf_env_copy(enable_subscribe=True, enable_presence_heartbeat=False) + pub_config = pnconf_env_copy(enable_subscribe=True, enable_presence_heartbeat=False) + sub_config.uuid = 'test-subscribe-asyncio-uuid-sub' + pub_config.uuid = 'test-subscribe-asyncio-uuid-pub' + pubnub_sub = PubNubAsyncio(sub_config) + pubnub_pub = PubNubAsyncio(pub_config) await patch_pubnub(pubnub_sub) await patch_pubnub(pubnub_pub) - pubnub_sub.config.uuid = 'test-subscribe-asyncio-uuid-sub' - pubnub_pub.config.uuid = 'test-subscribe-asyncio-uuid-pub' - callback = VCR599Listener(1) channel = "test-subscribe-asyncio-ch" message = "hey" @@ -90,19 +95,22 @@ async def test_subscribe_publish_unsubscribe(event_loop): assert publish_envelope.status.original_response[0] == 1 pubnub_sub.unsubscribe().channels(channel).execute() - # await callback.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub_sub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub_pub.stop() await pubnub_sub.stop() -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/subscription/sub_pub_unsub_enc.yaml', - filter_query_parameters=['pnsdk'] -) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/subscription/sub_pub_unsub_enc.yaml', +# filter_query_parameters=['pnsdk'] +# ) @pytest.mark.asyncio -async def test_encrypted_subscribe_publish_unsubscribe(event_loop): - pubnub = PubNubAsyncio(pnconf_enc_sub_copy(), custom_event_loop=event_loop) +async def test_encrypted_subscribe_publish_unsubscribe(): + + pubnub = PubNubAsyncio(pnconf_enc_env_copy(enable_subscribe=True)) pubnub.config.uuid = 'test-subscribe-asyncio-uuid' with patch("pubnub.crypto.PubNubCryptodome.get_initialization_vector", return_value="knightsofni12345"): @@ -136,26 +144,25 @@ async def test_encrypted_subscribe_publish_unsubscribe(event_loop): assert publish_envelope.status.original_response[0] == 1 pubnub.unsubscribe().channels(channel).execute() - await callback.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop() -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/join_leave.yaml', - filter_query_parameters=['pnsdk', 'l_cg']) +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/join_leave.yaml', +# filter_query_parameters=['pnsdk', 'l_cg']) @pytest.mark.asyncio -async def test_join_leave(event_loop): +async def test_join_leave(): channel = "test-subscribe-asyncio-join-leave-ch" - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - pubnub_listener = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) + pubnub = PubNubAsyncio(pnconf_env_copy(enable_subscribe=True, uuid="test-subscribe-asyncio-messenger")) + pubnub_listener = PubNubAsyncio(pnconf_env_copy(enable_subscribe=True, uuid="test-subscribe-asyncio-listener")) await patch_pubnub(pubnub) await patch_pubnub(pubnub_listener) - pubnub.config.uuid = "test-subscribe-asyncio-messenger" - pubnub_listener.config.uuid = "test-subscribe-asyncio-listener" - callback_presence = VCR599Listener(1) callback_messages = VCR599Listener(1) @@ -164,7 +171,9 @@ async def test_join_leave(event_loop): await callback_presence.wait_for_connect() + await asyncio.sleep(1) envelope = await callback_presence.wait_for_presence_on(channel) + assert envelope.channel == channel assert envelope.event == 'join' assert envelope.uuid == pubnub_listener.uuid @@ -179,35 +188,37 @@ async def test_join_leave(event_loop): assert envelope.uuid == pubnub.uuid pubnub.unsubscribe().channels(channel).execute() - await callback_messages.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_messages.wait_for_disconnect() envelope = await callback_presence.wait_for_presence_on(channel) - assert envelope.channel == channel assert envelope.event == 'leave' assert envelope.uuid == pubnub.uuid pubnub_listener.unsubscribe().channels(channel).execute() - await callback_presence.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_presence.wait_for_disconnect() await pubnub.stop() await pubnub_listener.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/subscription/cg_sub_unsub.yaml') -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/cg_sub_unsub.yaml', - filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pres']) +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/cg_sub_unsub.yaml', +# filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pres']) @pytest.mark.asyncio -async def test_cg_subscribe_unsubscribe(event_loop, sleeper=asyncio.sleep): +async def test_cg_subscribe_unsubscribe(): ch = "test-subscribe-asyncio-channel" gr = "test-subscribe-asyncio-group" - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) + pubnub = PubNubAsyncio(pnconf_env_copy(enable_subscribe=True)) envelope = await pubnub.add_channel_to_channel_group().channel_group(gr).channels(ch).future() assert envelope.status.original_response['status'] == 200 - await sleeper(3) + await asyncio.sleep(3) callback_messages = SubscribeListener() pubnub.add_listener(callback_messages) @@ -215,7 +226,9 @@ async def test_cg_subscribe_unsubscribe(event_loop, sleeper=asyncio.sleep): await callback_messages.wait_for_connect() pubnub.unsubscribe().channel_groups(gr).execute() - await callback_messages.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_messages.wait_for_disconnect() envelope = await pubnub.remove_channel_from_channel_group().channel_group(gr).channels(ch).future() assert envelope.status.original_response['status'] == 200 @@ -223,21 +236,21 @@ async def test_cg_subscribe_unsubscribe(event_loop, sleeper=asyncio.sleep): await pubnub.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/subscription/cg_sub_pub_unsub.yaml') -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/cg_sub_pub_unsub.yaml', - filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pres', 'l_pub']) +# @get_sleeper('tests/integrational/fixtures/asyncio/subscription/cg_sub_pub_unsub.yaml') +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/cg_sub_pub_unsub.yaml', +# filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pres', 'l_pub']) @pytest.mark.asyncio -async def test_cg_subscribe_publish_unsubscribe(event_loop, sleeper=asyncio.sleep): +async def test_cg_subscribe_publish_unsubscribe(): ch = "test-subscribe-asyncio-channel" gr = "test-subscribe-asyncio-group" message = "hey" - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) + pubnub = PubNubAsyncio(pnconf_env_copy(enable_subscribe=True)) envelope = await pubnub.add_channel_to_channel_group().channel_group(gr).channels(ch).future() assert envelope.status.original_response['status'] == 200 - await sleeper(1) + await asyncio.sleep(1) callback_messages = VCR599Listener(1) pubnub.add_listener(callback_messages) @@ -259,7 +272,9 @@ async def test_cg_subscribe_publish_unsubscribe(event_loop, sleeper=asyncio.slee assert sub_envelope.message == message pubnub.unsubscribe().channel_groups(gr).execute() - await callback_messages.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_messages.wait_for_disconnect() envelope = await pubnub.remove_channel_from_channel_group().channel_group(gr).channels(ch).future() assert envelope.status.original_response['status'] == 200 @@ -267,24 +282,20 @@ async def test_cg_subscribe_publish_unsubscribe(event_loop, sleeper=asyncio.slee await pubnub.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/subscription/cg_join_leave.yaml') -@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/cg_join_leave.yaml', - filter_query_parameters=['pnsdk', 'l_cg', 'l_pres']) +@pytest.mark.skip +# @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/cg_join_leave.json', serializer='pn_json', +# filter_query_parameters=['pnsdk', 'l_cg', 'l_pres', 'ee', 'tr']) @pytest.mark.asyncio -async def test_cg_join_leave(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - pubnub_listener = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - - pubnub.config.uuid = "test-subscribe-asyncio-messenger" - pubnub_listener.config.uuid = "test-subscribe-asyncio-listener" +async def test_cg_join_leave(): + pubnub = PubNubAsyncio(pnconf_env_copy(enable_subscribe=True, uuid="test-subscribe-asyncio-messenger")) + pubnub_listener = PubNubAsyncio(pnconf_env_copy(enable_subscribe=True, uuid="test-subscribe-asyncio-listener")) ch = "test-subscribe-asyncio-join-leave-cg-channel" gr = "test-subscribe-asyncio-join-leave-cg-group" envelope = await pubnub.add_channel_to_channel_group().channel_group(gr).channels(ch).future() assert envelope.status.original_response['status'] == 200 - - await sleeper(1) + await asyncio.sleep(1) callback_messages = VCR599Listener(1) callback_presence = VCR599Listener(1) @@ -325,7 +336,10 @@ async def test_cg_join_leave(event_loop, sleeper=asyncio.sleep): assert prs_envelope.subscription == gr pubnub_listener.unsubscribe().channel_groups(gr).execute() - await callback_presence.wait_for_disconnect() + + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_presence.wait_for_disconnect() envelope = await pubnub.remove_channel_from_channel_group().channel_group(gr).channels(ch).future() assert envelope.status.original_response['status'] == 200 @@ -334,17 +348,15 @@ async def test_cg_join_leave(event_loop, sleeper=asyncio.sleep): await pubnub_listener.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/subscription/unsubscribe_all.yaml') -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/subscription/unsubscribe_all.yaml', - filter_query_parameters=['pnsdk', 'l_cg', 'l_pres'], - match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'string_list_in_query'], -) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/subscription/unsubscribe_all.yaml', +# filter_query_parameters=['pnsdk', 'l_cg', 'l_pres'], +# match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'string_list_in_query'], +# ) @pytest.mark.asyncio -async def test_unsubscribe_all(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) - - pubnub.config.uuid = "test-subscribe-asyncio-messenger" +async def test_unsubscribe_all(): + config = pnconf_env_copy(enable_subscribe=True, uuid="test-subscribe-asyncio-messenger") + pubnub = PubNubAsyncio(config) ch = "test-subscribe-asyncio-unsubscribe-all-ch" ch1 = "test-subscribe-asyncio-unsubscribe-all-ch1" @@ -358,7 +370,7 @@ async def test_unsubscribe_all(event_loop, sleeper=asyncio.sleep): envelope = await pubnub.add_channel_to_channel_group().channel_group(gr2).channels(ch).future() assert envelope.status.original_response['status'] == 200 - await sleeper(1) + await asyncio.sleep(1) callback_messages = VCR599Listener(1) pubnub.add_listener(callback_messages) @@ -370,8 +382,9 @@ async def test_unsubscribe_all(event_loop, sleeper=asyncio.sleep): assert len(pubnub.get_subscribed_channel_groups()) == 2 pubnub.unsubscribe_all() - - await callback_messages.wait_for_disconnect() + # with EE you don't have to wait for disconnect + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback_messages.wait_for_disconnect() assert len(pubnub.get_subscribed_channels()) == 0 assert len(pubnub.get_subscribed_channel_groups()) == 0 diff --git a/tests/integrational/asyncio/test_unsubscribe_status.py b/tests/integrational/asyncio/test_unsubscribe_status.py index 1ca0fa86..95ed40a3 100644 --- a/tests/integrational/asyncio/test_unsubscribe_status.py +++ b/tests/integrational/asyncio/test_unsubscribe_status.py @@ -10,7 +10,7 @@ from pubnub.pubnub_asyncio import PubNubAsyncio from tests.helper import pnconf_pam_copy -from tests.integrational.vcr_helper import pn_vcr +# from tests.integrational.vcr_helper import pn_vcr pn.set_stream_logger('pubnub', logging.DEBUG) @@ -26,9 +26,12 @@ def presence(self, pubnub, presence): pass def status(self, pubnub, status): - if status.operation == PNOperationType.PNUnsubscribeOperation: - if status.category == PNStatusCategory.PNAccessDeniedCategory: - self.access_denied_event.set() + disconnected = PNStatusCategory.PNDisconnectedCategory + denied = status.operation == PNOperationType.PNUnsubscribeOperation and \ + status.category == PNStatusCategory.PNAccessDeniedCategory + + if disconnected or denied: + self.access_denied_event.set() class ReconnectedListener(SubscribeCallback): @@ -42,24 +45,27 @@ def presence(self, pubnub, presence): pass def status(self, pubnub, status): - if status.operation == PNOperationType.PNUnsubscribeOperation: - if status.category == PNStatusCategory.PNReconnectedCategory: - self.reconnected_event.set() + disconnected = PNStatusCategory.PNDisconnectedCategory + denied = status.operation == PNOperationType.PNUnsubscribeOperation and \ + status.category == PNStatusCategory.PNAccessDeniedCategory + + if disconnected or denied: + self.access_denied_event.set() -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/subscription/access_denied_unsubscribe_operation.yaml', - filter_query_parameters=['pnsdk', 'l_cg', 'l_pres'], - match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'string_list_in_query'], -) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/subscription/access_denied_unsubscribe_operation.yaml', +# filter_query_parameters=['pnsdk', 'l_cg', 'l_pres'], +# match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'string_list_in_query'], +# ) @pytest.mark.asyncio -async def test_access_denied_unsubscribe_operation(event_loop): +async def test_access_denied_unsubscribe_operation(): channel = "not-permitted-channel" pnconf = pnconf_pam_copy() pnconf.secret_key = None pnconf.enable_subscribe = True - pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop) + pubnub = PubNubAsyncio(pnconf) callback = AccessDeniedListener() pubnub.add_listener(callback) diff --git a/tests/integrational/asyncio/test_where_now.py b/tests/integrational/asyncio/test_where_now.py index 2fab55a1..c2eecbc5 100644 --- a/tests/integrational/asyncio/test_where_now.py +++ b/tests/integrational/asyncio/test_where_now.py @@ -2,21 +2,19 @@ import pytest from pubnub.models.consumer.presence import PNWhereNowResult -from pubnub.pubnub_asyncio import PubNubAsyncio +from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio from tests.helper import pnconf_sub_copy, pnconf_pam_copy -from tests.integrational.vcr_asyncio_sleeper import get_sleeper, VCR599Listener -from tests.integrational.vcr_helper import pn_vcr +from tests.integrational.vcr_asyncio_sleeper import VCR599Listener -@get_sleeper('tests/integrational/fixtures/asyncio/where_now/single_channel.yaml') -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/where_now/single_channel.yaml', - filter_query_parameters=['uuid', 'pnsdk']) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/where_now/single_channel.yaml', +# filter_query_parameters=['uuid', 'pnsdk']) @pytest.mark.asyncio -async def test_single_channel(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) +async def test_single_channel(): + pubnub = PubNubAsyncio(pnconf_sub_copy()) ch = 'test-where-now-asyncio-ch' - uuid = 'test-where-now-asyncio-uuid' + uuid = 'test-where-now-asyncio-uuid-single_chanel' pubnub.config.uuid = uuid callback = VCR599Listener(1) @@ -25,7 +23,7 @@ async def test_single_channel(event_loop, sleeper=asyncio.sleep): await callback.wait_for_connect() - await sleeper(2) + await asyncio.sleep(2) env = await pubnub.where_now() \ .uuid(uuid) \ @@ -37,24 +35,24 @@ async def test_single_channel(event_loop, sleeper=asyncio.sleep): assert channels[0] == ch pubnub.unsubscribe().channels(ch).execute() - await callback.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop() -@get_sleeper('tests/integrational/fixtures/asyncio/where_now/multiple_channels.yaml') -@pn_vcr.use_cassette( - 'tests/integrational/fixtures/asyncio/where_now/multiple_channels.yaml', - filter_query_parameters=['pnsdk'], - match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'query'], -) +# @pn_vcr.use_cassette( +# 'tests/integrational/fixtures/asyncio/where_now/multiple_channels.yaml', +# filter_query_parameters=['pnsdk'], +# match_on=['method', 'scheme', 'host', 'port', 'string_list_in_path', 'query'], +# ) @pytest.mark.asyncio -async def test_multiple_channels(event_loop, sleeper=asyncio.sleep): - pubnub = PubNubAsyncio(pnconf_sub_copy(), custom_event_loop=event_loop) +async def test_multiple_channels(): + pubnub = PubNubAsyncio(pnconf_sub_copy()) ch1 = 'test-where-now-asyncio-ch1' ch2 = 'test-where-now-asyncio-ch2' - uuid = 'test-where-now-asyncio-uuid' + uuid = 'test-where-now-asyncio-uuid-multiple_channels' pubnub.config.uuid = uuid callback = VCR599Listener(1) @@ -63,7 +61,7 @@ async def test_multiple_channels(event_loop, sleeper=asyncio.sleep): await callback.wait_for_connect() - await sleeper(7) + await asyncio.sleep(4) env = await pubnub.where_now() \ .uuid(uuid) \ @@ -76,7 +74,8 @@ async def test_multiple_channels(event_loop, sleeper=asyncio.sleep): assert ch2 in channels pubnub.unsubscribe().channels([ch1, ch2]).execute() - await callback.wait_for_disconnect() + if isinstance(pubnub._subscription_manager, AsyncioSubscriptionManager): + await callback.wait_for_disconnect() await pubnub.stop()