Skip to content

Commit

Permalink
CI: Add typed_flaky to streaming_pull system tests (#895)
Browse files Browse the repository at this point in the history
  • Loading branch information
acocuzzo authored Apr 5, 2023
1 parent ee2ea73 commit 0b51b1b
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 24 deletions.
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
]
SYSTEM_TEST_EXTERNAL_DEPENDENCIES = [
"psutil",
"flaky",
]
SYSTEM_TEST_LOCAL_DEPENDENCIES = []
SYSTEM_TEST_DEPENDENCIES = []
Expand Down
2 changes: 1 addition & 1 deletion owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@
versions=gcp.common.detect_versions(path="./google", default_first=True),
unit_test_python_versions=["3.7", "3.8", "3.9", "3.10", "3.11"],
system_test_python_versions=["3.10"],
system_test_external_dependencies=["psutil"],
system_test_external_dependencies=["psutil","flaky"],
)
s.move(templated_files, excludes=[".coveragerc", ".github/release-please.yml", "README.rst", "docs/index.rst"])

Expand Down
119 changes: 96 additions & 23 deletions tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import sys
import threading
import time
from typing import Any, Callable, cast, TypeVar

# special case python < 3.8
if sys.version_info.major == 3 and sys.version_info.minor < 8:
import mock
else:
from unittest import mock

from flaky import flaky
import pytest

import google.auth
Expand All @@ -43,6 +45,9 @@

from test_utils.system import unique_resource_id

C = TypeVar("C", bound=Callable[..., Any])
typed_flaky = cast(Callable[[C], C], flaky(max_runs=3, min_passes=1))


@pytest.fixture(scope="module")
def project():
Expand All @@ -61,13 +66,13 @@ def subscriber(request):


@pytest.fixture
def topic_path(project, publisher):
def topic_path_base(project, publisher):
topic_name = "t" + unique_resource_id("-")
yield publisher.topic_path(project, topic_name)


@pytest.fixture
def subscription_path(project, subscriber):
def subscription_path_base(project, subscriber):
sub_name = "s" + unique_resource_id("-")
yield subscriber.subscription_path(project, sub_name)

Expand All @@ -82,7 +87,9 @@ def cleanup():
to_call(*args, **kwargs)


def test_publish_messages(publisher, topic_path, cleanup):
def test_publish_messages(publisher, topic_path_base, cleanup):
# Customize topic path to test.
topic_path = topic_path_base + "-publish-messages"
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

Expand All @@ -100,7 +107,9 @@ def test_publish_messages(publisher, topic_path, cleanup):
assert isinstance(result, str)


def test_publish_large_messages(publisher, topic_path, cleanup):
def test_publish_large_messages(publisher, topic_path_base, cleanup):
# Customize topic path to test.
topic_path = topic_path_base + "-publish-large-messages"
# Make sure the topic gets deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

Expand Down Expand Up @@ -130,8 +139,11 @@ def test_publish_large_messages(publisher, topic_path, cleanup):


def test_subscribe_to_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
# Customize topic path to test.
topic_path = topic_path_base + "-subscribe-to-messages"
subscription_path = subscription_path_base + "-subscribe-to-messages"
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -175,8 +187,12 @@ def test_subscribe_to_messages(


def test_subscribe_to_messages_async_callbacks(
publisher, topic_path, subscriber, subscription_path, cleanup
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
# Customize topic path to test.
custom_str = "-subscribe-to-messages-async-callback"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -227,8 +243,12 @@ def test_subscribe_to_messages_async_callbacks(


def test_creating_subscriptions_with_non_default_settings(
publisher, subscriber, project, topic_path, subscription_path, cleanup
publisher, subscriber, project, topic_path_base, subscription_path_base, cleanup
):
# Customize topic path to test.
custom_str = "-creating-subscriptions-with-non-default-settings"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -346,7 +366,8 @@ def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup):
assert subscriptions == {subscription_paths[0], subscription_paths[2]}


def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
def test_managing_topic_iam_policy(publisher, topic_path_base, cleanup):
topic_path = topic_path_base + "-managing-topic-iam-policy"
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

# create a topic and customize its policy
Expand Down Expand Up @@ -375,8 +396,11 @@ def test_managing_topic_iam_policy(publisher, topic_path, cleanup):


def test_managing_subscription_iam_policy(
publisher, subscriber, topic_path, subscription_path, cleanup
publisher, subscriber, topic_path_base, subscription_path_base, cleanup
):
custom_str = "-managing-subscription-iam-policy"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -410,7 +434,7 @@ def test_managing_subscription_iam_policy(


def test_subscriber_not_leaking_open_sockets(
publisher, topic_path, subscription_path, cleanup
publisher, topic_path_base, subscription_path_base, cleanup
):
# Make sure the topic and the supscription get deleted.
# NOTE: Since subscriber client will be closed in the test, we should not
Expand All @@ -419,8 +443,12 @@ def test_subscriber_not_leaking_open_sockets(
# Also, since the client will get closed, we need another subscriber client
# to clean up the subscription. We also need to make sure that auxiliary
# subscriber releases the sockets, too.
custom_str = "-not-leaking-open-sockets"
subscription_path = subscription_path_base + custom_str
topic_path = topic_path_base + custom_str
subscriber = pubsub_v1.SubscriberClient(transport="grpc")
subscriber_2 = pubsub_v1.SubscriberClient(transport="grpc")

cleanup.append(
(subscriber_2.delete_subscription, (), {"subscription": subscription_path})
)
Expand Down Expand Up @@ -460,8 +488,11 @@ def test_subscriber_not_leaking_open_sockets(


def test_synchronous_pull_no_deadline_error_if_no_messages(
publisher, topic_path, subscriber, subscription_path, cleanup
publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-synchronous-pull-deadline-error-if-no-messages"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand All @@ -485,8 +516,11 @@ def test_synchronous_pull_no_deadline_error_if_no_messages(

class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-streaming-pull-callback-error-propagation"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand All @@ -512,9 +546,19 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

@typed_flaky
def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
self,
publisher,
subscriber,
project,
topic_path_base,
subscription_path_base,
cleanup,
):
custom_str = "-streaming-pull-ack-deadline"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -564,8 +608,11 @@ def test_streaming_pull_ack_deadline(
subscription_future.cancel()

def test_streaming_pull_max_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-streaming-pull-max-messages"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -619,9 +666,13 @@ def test_streaming_pull_max_messages(
finally:
subscription_future.cancel() # trigger clean shutdown

@typed_flaky
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):
custom_str = "-streaming-pull-blocking-shutdown"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -702,9 +753,11 @@ def callback2(message):
)
class TestBasicRBAC(object):
def test_streaming_pull_subscriber_permissions_sufficient(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):

custom_str = "-streaming-pull-subscriber-permissions-sufficient"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand Down Expand Up @@ -739,9 +792,11 @@ def test_streaming_pull_subscriber_permissions_sufficient(
future.cancel()

def test_publisher_role_can_publish_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
self, publisher, topic_path_base, subscriber, subscription_path_base, cleanup
):

custom_str = "-publisher-role-can-publish-messages"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
Expand All @@ -767,8 +822,17 @@ def test_publisher_role_can_publish_messages(
"Snapshot creation is not instant on the backend, causing test falkiness."
)
def test_snapshot_seek_subscriber_permissions_sufficient(
self, project, publisher, topic_path, subscriber, subscription_path, cleanup
self,
project,
publisher,
topic_path_base,
subscriber,
subscription_path_base,
cleanup,
):
custom_str = "-snapshot-seek-subscriber-permissions-sufficient"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
snapshot_name = "snap" + unique_resource_id("-")
snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)

Expand Down Expand Up @@ -813,10 +877,10 @@ def test_snapshot_seek_subscriber_permissions_sufficient(
assert len(response.received_messages) == 1

def test_viewer_role_can_list_resources(
self, project, publisher, topic_path, subscriber, cleanup
self, project, publisher, topic_path_base, subscriber, cleanup
):
project_path = "projects/" + project

topic_path = topic_path_base + "-viewer-role-can-list-resources"
# Make sure the created topic gets deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))

Expand Down Expand Up @@ -844,8 +908,17 @@ def test_viewer_role_can_list_resources(
next(iter(viewer_only_subscriber.list_snapshots(project=project_path)), None)

def test_editor_role_can_create_resources(
self, project, publisher, topic_path, subscriber, subscription_path, cleanup
self,
project,
publisher,
topic_path_base,
subscriber,
subscription_path_base,
cleanup,
):
custom_str = "-editor-role-can-create-resources"
topic_path = topic_path_base + custom_str
subscription_path = subscription_path_base + custom_str
snapshot_name = "snap" + unique_resource_id("-")
snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name)

Expand Down

0 comments on commit 0b51b1b

Please sign in to comment.