Unify the publicly exported sync/async daemon-stopping flag #757

Merged: 2 commits, May 10, 2021
4 changes: 2 additions & 2 deletions docs/kwargs.rst
@@ -251,8 +251,8 @@ Resource daemon kwargs
Stop-flag
---------

-The daemons also have ``stopped``. It is a flag object for sync daemons
-to check if they should stop. See also: `DaemonStopperChecker`.
+Daemons also have ``stopped``. It is a flag object for sync & async daemons
+(mostly, sync) to check if they should stop. See also: `DaemonStopped`.

To check, ``.is_set()`` method can be called, or the object itself can be used
as a boolean expression: e.g. ``while not stopped: ...``.
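Note: a minimal sketch of how a sync daemon would use the unified ``stopped`` kwarg after this change. The resource name and the sleep interval are taken from the bundled examples; the handler name ``monitor`` is hypothetical.

    import kopf

    @kopf.daemon('kopfexamples')
    def monitor(stopped: kopf.DaemonStopped, logger, **_):
        # The flag can be tested directly as a boolean or via .is_set();
        # .wait(seconds) sleeps, but returns early once a stop is requested.
        while not stopped:
            logger.info("Still running.")
            stopped.wait(10)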
2 changes: 1 addition & 1 deletion examples/11-filtering-handlers/example.py
@@ -86,7 +86,7 @@ def update_with_field_change_satisfied(logger, **kwargs):


@kopf.daemon('kopfexamples', field='spec.field', value='value')
-def daemon_with_field(stopped, logger, **kwargs):
+def daemon_with_field(stopped: kopf.DaemonStopped, logger, **kwargs):
while not stopped:
logger.info("Field daemon is satisfied.")
stopped.wait(1)
2 changes: 1 addition & 1 deletion examples/14-daemons/example.py
@@ -7,7 +7,7 @@
# Sync daemons in threads are non-interruptable, they must check for the `stopped` flag.
# This daemon exits after 3 attempts and then 30 seconds of running (unless stopped).
@kopf.daemon('kopfexamples', backoff=3)
-def background_sync(spec, stopped, logger, retry, patch, **_):
+def background_sync(stopped: kopf.DaemonStopped, spec, logger, retry, patch, **_):
if retry < 3:
patch.status['message'] = f"Failed {retry+1} times."
raise kopf.TemporaryError("Simulated failure.", delay=1)
12 changes: 7 additions & 5 deletions kopf/__init__.py
@@ -130,9 +130,10 @@
Patch,
)
from kopf.structs.primitives import (
+DaemonStopped,
DaemonStoppingReason,
-SyncDaemonStopperChecker,
-AsyncDaemonStopperChecker,
+SyncDaemonStopperChecker, # deprecated
+AsyncDaemonStopperChecker, # deprecated
)
from kopf.structs.references import (
Resource,
@@ -218,7 +219,6 @@
'StatusProgressStorage',
'MultiProgressStorage',
'SmartProgressStorage',
-'DaemonStoppingReason',
'RawEventType',
'RawEvent',
'RawBody',
@@ -243,7 +243,9 @@
'HandlerId',
'Reason',
'Patch',
-'SyncDaemonStopperChecker',
-'AsyncDaemonStopperChecker',
+'DaemonStopped',
+'DaemonStoppingReason',
+'SyncDaemonStopperChecker', # deprecated
+'AsyncDaemonStopperChecker', # deprecated
'Resource', 'EVERYTHING',
]
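Note: a sketch of what the re-exported names mean for user code. The per-flavor stopper-checker classes stay importable but are marked deprecated, so type annotations can migrate to the single public name; the function name ``fn`` is hypothetical.

    import kopf

    # Before (still importable, but now deprecated):
    # def fn(stopped: kopf.SyncDaemonStopperChecker, **_): ...

    # After: one public type covers daemons of both flavors:
    def fn(stopped: kopf.DaemonStopped, **_): ...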
4 changes: 2 additions & 2 deletions kopf/reactor/causation.py
@@ -200,11 +200,11 @@ def _kwargs(self) -> Mapping[str, Any]:

@property
def _sync_kwargs(self) -> Mapping[str, Any]:
-return dict(super()._sync_kwargs, stopped=self.stopper.sync_checker)
+return dict(super()._sync_kwargs, stopped=self.stopper.sync_waiter)

@property
def _async_kwargs(self) -> Mapping[str, Any]:
-return dict(super()._async_kwargs, stopped=self.stopper.async_checker)
+return dict(super()._async_kwargs, stopped=self.stopper.async_waiter)


def detect_watching_cause(
18 changes: 9 additions & 9 deletions kopf/reactor/daemons.py
@@ -380,7 +380,7 @@ async def _runner(

# Prevent future re-spawns for those exited on their own, for no reason.
# Only the filter-mismatching or peering-pausing daemons can be re-spawned.
-if stopper.reason == primitives.DaemonStoppingReason.NONE:
+if stopper.reason is None:
memory.forever_stopped.add(handler.id)

# Save the memory by not remembering the exited daemons (they may be never re-spawned).
@@ -414,7 +414,7 @@ async def _daemon(
body = cause.body

if handler.initial_delay is not None:
-await primitives.sleep_or_wait(handler.initial_delay, cause.stopper)
+await primitives.sleep_or_wait(handler.initial_delay, cause.stopper.async_event)

# Similar to activities (in-memory execution), but applies patches on every attempt.
state = states.State.from_scratch().with_handlers([handler])
@@ -434,7 +434,7 @@

# The in-memory sleep does not react to resource changes, but only to stopping.
if state.delay:
-await primitives.sleep_or_wait(state.delay, cause.stopper)
+await primitives.sleep_or_wait(state.delay, cause.stopper.async_event)

if stopper.is_set():
logger.debug(f"{handler} has exited on request and will not be retried or restarted.")
@@ -477,7 +477,7 @@ async def _timer(
body = cause.body

if handler.initial_delay is not None:
-await primitives.sleep_or_wait(handler.initial_delay, stopper)
+await primitives.sleep_or_wait(handler.initial_delay, wakeup=stopper.async_event)

# Similar to activities (in-memory execution), but applies patches on every attempt.
state = states.State.from_scratch().with_handlers([handler])
@@ -493,7 +493,7 @@
if handler.idle is not None:
while not stopper.is_set() and time.monotonic() - memory.idle_reset_time < handler.idle:
delay = memory.idle_reset_time + handler.idle - time.monotonic()
-await primitives.sleep_or_wait(delay, stopper)
+await primitives.sleep_or_wait(delay, wakeup=stopper.async_event)
if stopper.is_set():
continue

@@ -516,27 +516,27 @@ async def _timer(
# For temporary errors, override the schedule by the one provided by errors themselves.
# It can be either a delay from TemporaryError, or a backoff for an arbitrary exception.
if not state.done:
-await primitives.sleep_or_wait(state.delays, stopper)
+await primitives.sleep_or_wait(state.delays, wakeup=stopper.async_event)

# For sharp timers, calculate how much time is left to fit the interval grid:
# |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=True)
# [slow_handler]....[slow_handler]....[slow...
elif handler.interval is not None and handler.sharp:
passed_duration = time.monotonic() - started
remaining_delay = handler.interval - (passed_duration % handler.interval)
-await primitives.sleep_or_wait(remaining_delay, stopper)
+await primitives.sleep_or_wait(remaining_delay, wakeup=stopper.async_event)

# For regular (non-sharp) timers, simply sleep from last exit to the next call:
# |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=False)
# [slow_handler].....[slow_handler].....[slow...
elif handler.interval is not None:
-await primitives.sleep_or_wait(handler.interval, stopper)
+await primitives.sleep_or_wait(handler.interval, wakeup=stopper.async_event)

# For idle-only no-interval timers, wait till the next change (i.e. idling reset).
# NB: This will skip the handler in the same tact (1/64th of a second) even if changed.
elif handler.idle is not None:
while memory.idle_reset_time <= started:
-await primitives.sleep_or_wait(handler.idle, stopper)
+await primitives.sleep_or_wait(handler.idle, wakeup=stopper.async_event)

# Only in case there are no intervals and idling, treat it as a one-shot handler.
# This makes the handler practically meaningless, but technically possible.
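Note: a small self-contained sketch of the sharp-timer arithmetic referenced above. Given a monotonic start time and an interval, the remaining delay snaps the next invocation back onto the interval grid; the numbers are illustrative.

    import time

    interval = 5.0
    started = time.monotonic()
    # ... a slow handler runs for, say, 7.3 seconds ...
    passed_duration = time.monotonic() - started
    remaining_delay = interval - (passed_duration % interval)
    # passed_duration = 7.3  ->  remaining_delay = 2.7,
    # so the next call begins at t = 10.0, back on the 5-second grid.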
2 changes: 1 addition & 1 deletion kopf/reactor/effects.py
@@ -139,7 +139,7 @@ async def throttled(
*,
throttler: containers.Throttler,
delays: Iterable[float],
-wakeup: Optional[Union[asyncio.Event, primitives.DaemonStopper]] = None,
+wakeup: Optional[asyncio.Event] = None,
logger: Union[logging.Logger, logging.LoggerAdapter],
errors: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception,
) -> AsyncGenerator[bool, None]:
2 changes: 1 addition & 1 deletion kopf/structs/callbacks.py
@@ -156,7 +156,7 @@

DaemonFn = Callable[
[
-NamedArg(primitives.SyncAsyncDaemonStopperChecker, "stopped"),
+NamedArg(primitives.DaemonStopped, "stopped"),
NamedArg(int, "retry"),
NamedArg(datetime.datetime, "started"),
NamedArg(datetime.timedelta, "runtime"),
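Note: a sketch of a handler whose signature matches the updated ``DaemonFn`` callback protocol. The named kwargs shown above (``stopped``, ``retry``, ``started``, ``runtime``) arrive as keyword arguments; the handler name and body are illustrative.

    import datetime
    import kopf

    @kopf.daemon('kopfexamples')
    def my_daemon(stopped: kopf.DaemonStopped, retry: int,
                  started: datetime.datetime, runtime: datetime.timedelta, **_):
        # Exit promptly when the operator asks the daemon to stop.
        while not stopped:
            stopped.wait(5)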