Skip to content

Commit

Permalink
Merge pull request #759 from nolar/refactor-handler-execution
Browse files Browse the repository at this point in the history
Decouple & isolate handling from specialised causes & handlers
  • Loading branch information
nolar authored May 10, 2021
2 parents c0b6451 + 7924793 commit daf42e4
Show file tree
Hide file tree
Showing 45 changed files with 670 additions and 612 deletions.
8 changes: 5 additions & 3 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
AdmissionError,
)
from kopf.reactor.handling import (
Logger,
ErrorsMode,
TemporaryError,
PermanentError,
HandlerTimeoutError,
HandlerRetriesError,
)
from kopf.reactor.subhandling import (
execute,
)
from kopf.reactor.lifecycles import (
Expand Down Expand Up @@ -88,7 +92,6 @@
build_owner_reference,
)
from kopf.structs.callbacks import (
Logger,
not_,
all_,
any_,
Expand Down Expand Up @@ -119,8 +122,7 @@
ABSENT,
PRESENT,
)
from kopf.structs.handlers import (
ErrorsMode,
from kopf.reactor.causation import (
Reason,
)
from kopf.structs.ids import (
Expand Down
8 changes: 4 additions & 4 deletions kopf/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import aiohttp.web

from kopf.reactor import activities, lifecycles, registries
from kopf.structs import callbacks, configuration, ephemera, handlers, ids
from kopf.reactor import activities, causation, handling, lifecycles, registries
from kopf.structs import configuration, ephemera, ids

logger = logging.getLogger(__name__)

Expand All @@ -33,7 +33,7 @@ async def health_reporter(
is cancelled or failed). Once it will stop responding for any reason,
Kubernetes will assume the pod is not alive anymore, and will restart it.
"""
probing_container: MutableMapping[ids.HandlerId, callbacks.Result] = {}
probing_container: MutableMapping[ids.HandlerId, handling.Result] = {}
probing_timestamp: Optional[datetime.datetime] = None
probing_max_age = datetime.timedelta(seconds=10.0)
probing_lock = asyncio.Lock()
Expand All @@ -55,7 +55,7 @@ async def get_health(
lifecycle=lifecycles.all_at_once,
registry=registry,
settings=settings,
activity=handlers.Activity.PROBE,
activity=causation.Activity.PROBE,
indices=indices,
memo=memo,
)
Expand Down
54 changes: 27 additions & 27 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def creation_handler(**kwargs):
# TODO: add cluster=True support (different API methods)
from typing import Any, Callable, Optional, Union

from kopf.reactor import handling, registries
from kopf.reactor import causation, handling, registries, subhandling
from kopf.structs import callbacks, dicts, filters, handlers, references, reviews

ActivityDecorator = Callable[[callbacks.ActivityFn], callbacks.ActivityFn]
Expand All @@ -30,7 +30,7 @@ def startup( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -45,7 +45,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.STARTUP,
activity=causation.Activity.STARTUP,
)
real_registry._activities.append(handler)
return fn
Expand All @@ -57,7 +57,7 @@ def cleanup( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -72,7 +72,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.CLEANUP,
activity=causation.Activity.CLEANUP,
)
real_registry._activities.append(handler)
return fn
Expand All @@ -84,7 +84,7 @@ def login( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -100,7 +100,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.AUTHENTICATION,
activity=causation.Activity.AUTHENTICATION,
)
real_registry._activities.append(handler)
return fn
Expand All @@ -112,7 +112,7 @@ def probe( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -128,7 +128,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.PROBE,
activity=causation.Activity.PROBE,
)
real_registry._activities.append(handler)
return fn
Expand Down Expand Up @@ -183,7 +183,7 @@ def decorator( # lgtm[py/similar-function]
errors=None, timeout=None, retries=None, backoff=None, # TODO: add some meaning later
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value,
reason=handlers.WebhookType.VALIDATING, operation=operation,
reason=causation.WebhookType.VALIDATING, operation=operation,
persistent=persistent, side_effects=side_effects, ignore_failures=ignore_failures,
)
real_registry._webhooks.append(handler)
Expand Down Expand Up @@ -239,7 +239,7 @@ def decorator( # lgtm[py/similar-function]
errors=None, timeout=None, retries=None, backoff=None, # TODO: add some meaning later
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value,
reason=handlers.WebhookType.MUTATING, operation=operation,
reason=causation.WebhookType.MUTATING, operation=operation,
persistent=persistent, side_effects=side_effects, ignore_failures=ignore_failures,
)
real_registry._webhooks.append(handler)
Expand All @@ -263,7 +263,7 @@ def resume( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -320,7 +320,7 @@ def create( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -353,7 +353,7 @@ def decorator( # lgtm[py/similar-function]
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value, old=None, new=None, field_needs_change=False,
initial=None, deleted=None, requires_finalizer=None,
reason=handlers.Reason.CREATE,
reason=causation.Reason.CREATE,
)
real_registry._changing.append(handler)
return fn
Expand All @@ -376,7 +376,7 @@ def update( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -411,7 +411,7 @@ def decorator( # lgtm[py/similar-function]
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value, old=old, new=new, field_needs_change=True,
initial=None, deleted=None, requires_finalizer=None,
reason=handlers.Reason.UPDATE,
reason=causation.Reason.UPDATE,
)
real_registry._changing.append(handler)
return fn
Expand All @@ -434,7 +434,7 @@ def delete( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -468,7 +468,7 @@ def decorator( # lgtm[py/similar-function]
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value, old=None, new=None, field_needs_change=False,
initial=None, deleted=None, requires_finalizer=bool(not optional),
reason=handlers.Reason.DELETE,
reason=causation.Reason.DELETE,
)
real_registry._changing.append(handler)
return fn
Expand All @@ -491,7 +491,7 @@ def field( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -549,7 +549,7 @@ def index( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -653,7 +653,7 @@ def daemon( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -715,7 +715,7 @@ def timer( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -764,7 +764,7 @@ def subhandler( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -814,7 +814,7 @@ def decorator( # lgtm[py/similar-function]
_warn_incompatible_parent_with_oldnew(parent_handler, old, new)
_warn_conflicting_values(field, value, old, new)
_verify_filters(labels, annotations)
real_registry = handling.subregistry_var.get()
real_registry = subhandling.subregistry_var.get()
real_field = dicts.parse_field(field) or None # to not store tuple() as a no-field case.
real_id = registries.generate_id(fn=fn, id=id,
prefix=parent_handler.id if parent_handler else None)
Expand All @@ -838,7 +838,7 @@ def register( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -908,13 +908,13 @@ def _warn_conflicting_values(


def _warn_incompatible_parent_with_oldnew(
handler: handlers.BaseHandler,
handler: handling.Handler,
old: Any,
new: Any,
) -> None:
if old is not None or new is not None:
if isinstance(handler, handlers.ChangingHandler):
is_on_update = handler.reason == handlers.Reason.UPDATE
is_on_update = handler.reason == causation.Reason.UPDATE
is_on_field = handler.reason is None and not handler.initial
if not is_on_update and not is_on_field:
raise TypeError("Filters old=/new= can only be used in update handlers.")
15 changes: 7 additions & 8 deletions kopf/reactor/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

from kopf.reactor import causation, handling, lifecycles, registries
from kopf.storage import states
from kopf.structs import callbacks, configuration, credentials, ephemera, \
handlers as handlers_, ids, primitives
from kopf.structs import configuration, credentials, ephemera, ids, primitives

logger = logging.getLogger(__name__)

Expand All @@ -34,7 +33,7 @@ def __init__(
self,
msg: str,
*,
outcomes: Mapping[ids.HandlerId, states.HandlerOutcome],
outcomes: Mapping[ids.HandlerId, handling.Outcome],
) -> None:
super().__init__(msg)
self.outcomes = outcomes
Expand Down Expand Up @@ -83,7 +82,7 @@ async def authenticate(
lifecycle=lifecycles.all_at_once,
registry=registry,
settings=settings,
activity=handlers_.Activity.AUTHENTICATION,
activity=causation.Activity.AUTHENTICATION,
indices=indices,
memo=memo,
)
Expand All @@ -100,13 +99,13 @@ async def authenticate(

async def run_activity(
*,
lifecycle: lifecycles.LifeCycleFn,
lifecycle: handling.LifeCycleFn,
registry: registries.OperatorRegistry,
settings: configuration.OperatorSettings,
activity: handlers_.Activity,
activity: causation.Activity,
indices: ephemera.Indices,
memo: ephemera.AnyMemo,
) -> Mapping[ids.HandlerId, callbacks.Result]:
) -> Mapping[ids.HandlerId, handling.Result]:
logger = logging.getLogger(f'kopf.activities.{activity.value}')

# For the activity handlers, we have neither bodies, nor patches, just the state.
Expand All @@ -119,7 +118,7 @@ async def run_activity(
)
handlers = registry._activities.get_handlers(activity=activity)
state = states.State.from_scratch().with_handlers(handlers)
outcomes: MutableMapping[ids.HandlerId, states.HandlerOutcome] = {}
outcomes: MutableMapping[ids.HandlerId, handling.Outcome] = {}
while not state.done:
current_outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
Expand Down
Loading

0 comments on commit daf42e4

Please sign in to comment.