Skip to content

Commit

Permalink
fix: PubSub incompatibility with api-core 1.17.0+ (googleapis#103)
Browse files Browse the repository at this point in the history
* fix: disable pre-fetching first streaming pull item

* Remove api-core version cap, but ban 1.17.0 release

* Regenerate gapic layer with synth

* Revert "Regenerate gapic layer with synth"

This reverts commit 1d24853.

* Retain only the relevant fix in generated code

* Exclude multiple incompatible api-core versions

* Fix syntax error in synth.py

* Ban all bugfix versions of problematic api-core minor versions
  • Loading branch information
plamut authored Jun 9, 2020
1 parent 6c7677e commit c02060f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
5 changes: 5 additions & 0 deletions google/cloud/pubsub_v1/gapic/subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,11 @@ def streaming_pull(
client_info=self._client_info,
)

# Wrappers in api-core should not automatically pre-fetch the first
# stream result, as this breaks the stream when re-opening it.
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
self.transport.streaming_pull._prefetch_first_result_ = False

return self._inner_api_calls["streaming_pull"](
requests, retry=retry, timeout=timeout, metadata=metadata
)
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
# 'Development Status :: 5 - Production/Stable'
release_status = "Development Status :: 5 - Production/Stable"
dependencies = [
# google-api-core[grpc] 1.17.0 causes problems, thus restricting its
# version until the issue gets fixed.
# google-api-core[grpc] 1.17.0 up to 1.19.1 causes problems with stream
# recovery, thus those versions should not be used.
# https://github.com/googleapis/python-pubsub/issues/74
"google-api-core[grpc] >= 1.14.0, < 1.17.0",
"google-api-core[grpc] >= 1.14.0, != 1.17.*, != 1.18.*, != 1.19.*",
"grpc-google-iam-v1 >= 0.12.3, < 0.13dev",
'enum34; python_version < "3.4"',
]
Expand Down
14 changes: 14 additions & 0 deletions synth.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,20 @@ def _merge_dict(d1, d2):
"from google.iam.v1 import iam_policy_pb2_grpc as iam_policy_pb2",
)

# Monkey patch the streaming_pull() GAPIC method to disable pre-fetching stream
# results.
s.replace(
"google/cloud/pubsub_v1/gapic/subscriber_client.py",
r"return self\._inner_api_calls\['streaming_pull'\]\(.*",
"""
# Wrappers in api-core should not automatically pre-fetch the first
# stream result, as this breaks the stream when re-opening it.
# https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257
self.transport.streaming_pull._prefetch_first_result_ = False
\g<0>"""
)

# Add missing blank line before Attributes: in generated docstrings
# https://github.com/googleapis/protoc-docs-plugin/pull/31
s.replace(
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,15 @@ def test_closes_channel_as_context_manager():
pass

mock_transport.channel.close.assert_called()


def test_streaming_pull_gapic_monkeypatch():
transport = mock.NonCallableMock(spec=["streaming_pull"])
transport.streaming_pull = mock.Mock(spec=[])
client = subscriber.Client(transport=transport)

client.streaming_pull(requests=iter([]))

assert client.api.transport is transport
assert hasattr(transport.streaming_pull, "_prefetch_first_result_")
assert not transport.streaming_pull._prefetch_first_result_

0 comments on commit c02060f

Please sign in to comment.