Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: set stream_ack_deadline to max_duration_per_lease_extension or 60 s, set ack_deadline to min_duration_per_lease_extension or 10 s #760

Merged
merged 3 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import threading
import typing
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple
import uuid

import grpc # type: ignore
Expand Down Expand Up @@ -73,6 +73,13 @@
"""The minimum ack_deadline, in seconds, for when exactly_once is enabled for
a subscription. We do this to reduce premature ack expiration.
"""
_DEFAULT_STREAM_ACK_DEADLINE = 60
"""The default stream ack deadline, in seconds."""

_MAX_STREAM_ACK_DEADLINE = 600
""""""
_MIN_STREAM_ACK_DEADLINE = 10
""""""

_EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS = {
code_pb2.DEADLINE_EXCEEDED,
Expand Down Expand Up @@ -270,7 +277,36 @@ def __init__(
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
self._ack_histogram = histogram.Histogram()
self._last_histogram_size = 0
self._ack_deadline: Union[int, float] = histogram.MIN_ACK_DEADLINE

# If max_duration_per_lease_extension is the default
# we set the stream_ack_deadline to the default of 60
if self._flow_control.max_duration_per_lease_extension == 0:
self._stream_ack_deadline = _DEFAULT_STREAM_ACK_DEADLINE
# We will not be able to extend more than the default minimum
elif (
self._flow_control.max_duration_per_lease_extension
< _MIN_STREAM_ACK_DEADLINE
):
self._stream_ack_deadline = _MIN_STREAM_ACK_DEADLINE
# Will not be able to extend past the max
elif (
self._flow_control.max_duration_per_lease_extension
> _MAX_STREAM_ACK_DEADLINE
):
self._stream_ack_deadline = _MAX_STREAM_ACK_DEADLINE
else:
self._stream_ack_deadline = (
self._flow_control.max_duration_per_lease_extension
)

self._ack_deadline = max(
min(
self._flow_control.min_duration_per_lease_extension,
histogram.MAX_ACK_DEADLINE,
),
histogram.MIN_ACK_DEADLINE,
)

self._rpc: Optional[bidi.ResumableBidiRpc] = None
self._callback: Optional[functools.partial] = None
self._closing = threading.Lock()
Expand Down Expand Up @@ -741,10 +777,10 @@ def heartbeat(self) -> bool:

if send_new_ack_deadline:
request = gapic_types.StreamingPullRequest(
stream_ack_deadline_seconds=self.ack_deadline
stream_ack_deadline_seconds=self._stream_ack_deadline
)
_LOGGER.debug(
"Sending new ack_deadline of %d seconds.", self.ack_deadline
"Sending new ack_deadline of %d seconds.", self._stream_ack_deadline
)
else:
request = gapic_types.StreamingPullRequest()
Expand Down Expand Up @@ -796,7 +832,7 @@ def open(

_LOGGER.debug(
"Creating a stream, default ACK deadline set to {} seconds.".format(
stream_ack_deadline_seconds
self._stream_ack_deadline
)
)

Expand Down Expand Up @@ -928,6 +964,8 @@ def _get_initial_request(
suitable for any other purpose).
"""
# Put the request together.
# We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
# anyway. Set to some big-ish value in case we modack late.
request = gapic_types.StreamingPullRequest(
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
modify_deadline_ack_ids=[],
Expand Down
70 changes: 61 additions & 9 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,61 @@ def test_constructor_and_default_state():
assert manager._client_id is not None


def test_constructor_with_options():
def test_constructor_with_default_options():
flow_control_ = types.FlowControl()
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=mock.sentinel.flow_control,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)

assert manager.flow_control == mock.sentinel.flow_control
assert manager.flow_control == flow_control_
assert manager._scheduler == mock.sentinel.scheduler
assert manager._ack_deadline == 10
assert manager._stream_ack_deadline == 60


def test_constructor_with_min_and_max_duration_per_lease_extension_():
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=15, max_duration_per_lease_extension=20
)
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)
assert manager._ack_deadline == 15
assert manager._stream_ack_deadline == 20


def test_constructor_with_min_duration_per_lease_extension_too_low():
flow_control_ = types.FlowControl(
min_duration_per_lease_extension=9, max_duration_per_lease_extension=9
)
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)
assert manager._ack_deadline == 10
assert manager._stream_ack_deadline == 10


def test_constructor_with_max_duration_per_lease_extension_too_high():
flow_control_ = types.FlowControl(
max_duration_per_lease_extension=601, min_duration_per_lease_extension=601
)
manager = streaming_pull_manager.StreamingPullManager(
mock.sentinel.client,
mock.sentinel.subscription,
flow_control=flow_control_,
scheduler=mock.sentinel.scheduler,
)
assert manager._ack_deadline == 600
assert manager._stream_ack_deadline == 600


def make_manager(**kwargs):
Expand Down Expand Up @@ -164,9 +209,13 @@ def test__obtain_ack_deadline_no_custom_flow_control_setting():
manager._flow_control = types.FlowControl(
min_duration_per_lease_extension=0, max_duration_per_lease_extension=0
)
assert manager._stream_ack_deadline == 60
assert manager._ack_deadline == 10
assert manager._obtain_ack_deadline(maybe_update=False) == 10

deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MIN_ACK_DEADLINE
assert manager._stream_ack_deadline == 60

# When we get some historical data, the deadline is adjusted.
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2)
Expand All @@ -186,11 +235,14 @@ def test__obtain_ack_deadline_with_max_duration_per_lease_extension():
manager._flow_control = types.FlowControl(
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1
)
assert manager._ack_deadline == 10

manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large

# The deadline configured in flow control should prevail.
deadline = manager._obtain_ack_deadline(maybe_update=True)
assert deadline == histogram.MIN_ACK_DEADLINE + 1
assert manager._stream_ack_deadline == 60


def test__obtain_ack_deadline_with_min_duration_per_lease_extension():
Expand Down Expand Up @@ -292,12 +344,12 @@ def test__obtain_ack_deadline_no_value_update():

def test_client_id():
manager1 = make_manager()
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=10)
request1 = manager1._get_initial_request(stream_ack_deadline_seconds=60)
client_id_1 = request1.client_id
assert client_id_1

manager2 = make_manager()
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=10)
request2 = manager2._get_initial_request(stream_ack_deadline_seconds=60)
client_id_2 = request2.client_id
assert client_id_2

Expand All @@ -308,7 +360,7 @@ def test_streaming_flow_control():
manager = make_manager(
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
request = manager._get_initial_request(stream_ack_deadline_seconds=60)
assert request.max_outstanding_messages == 10
assert request.max_outstanding_bytes == 1000

Expand All @@ -318,7 +370,7 @@ def test_streaming_flow_control_use_legacy_flow_control():
flow_control=types.FlowControl(max_messages=10, max_bytes=1000),
use_legacy_flow_control=True,
)
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
request = manager._get_initial_request(stream_ack_deadline_seconds=60)
assert request.max_outstanding_messages == 0
assert request.max_outstanding_bytes == 0

Expand Down Expand Up @@ -1046,12 +1098,12 @@ def test_heartbeat_stream_ack_deadline_seconds(caplog):
result = manager.heartbeat()

manager._rpc.send.assert_called_once_with(
gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=10)
gapic_types.StreamingPullRequest(stream_ack_deadline_seconds=60)
)
assert result
# Set to false after a send is initiated.
assert not manager._send_new_ack_deadline
assert "Sending new ack_deadline of 10 seconds." in caplog.text
assert "Sending new ack_deadline of 60 seconds." in caplog.text


@mock.patch("google.api_core.bidi.ResumableBidiRpc", autospec=True)
Expand Down