Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple & isolate handling from specialised causes & handlers #759

Merged
merged 6 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
AdmissionError,
)
from kopf.reactor.handling import (
Logger,
ErrorsMode,
TemporaryError,
PermanentError,
HandlerTimeoutError,
HandlerRetriesError,
)
from kopf.reactor.subhandling import (
execute,
)
from kopf.reactor.lifecycles import (
Expand Down Expand Up @@ -88,7 +92,6 @@
build_owner_reference,
)
from kopf.structs.callbacks import (
Logger,
not_,
all_,
any_,
Expand Down Expand Up @@ -119,8 +122,7 @@
ABSENT,
PRESENT,
)
from kopf.structs.handlers import (
ErrorsMode,
from kopf.reactor.causation import (
Reason,
)
from kopf.structs.ids import (
Expand Down
8 changes: 4 additions & 4 deletions kopf/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import aiohttp.web

from kopf.reactor import activities, lifecycles, registries
from kopf.structs import callbacks, configuration, ephemera, handlers, ids
from kopf.reactor import activities, causation, handling, lifecycles, registries
from kopf.structs import configuration, ephemera, ids

logger = logging.getLogger(__name__)

Expand All @@ -33,7 +33,7 @@ async def health_reporter(
is cancelled or failed). Once it will stop responding for any reason,
Kubernetes will assume the pod is not alive anymore, and will restart it.
"""
probing_container: MutableMapping[ids.HandlerId, callbacks.Result] = {}
probing_container: MutableMapping[ids.HandlerId, handling.Result] = {}
probing_timestamp: Optional[datetime.datetime] = None
probing_max_age = datetime.timedelta(seconds=10.0)
probing_lock = asyncio.Lock()
Expand All @@ -55,7 +55,7 @@ async def get_health(
lifecycle=lifecycles.all_at_once,
registry=registry,
settings=settings,
activity=handlers.Activity.PROBE,
activity=causation.Activity.PROBE,
indices=indices,
memo=memo,
)
Expand Down
54 changes: 27 additions & 27 deletions kopf/on.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def creation_handler(**kwargs):
# TODO: add cluster=True support (different API methods)
from typing import Any, Callable, Optional, Union

from kopf.reactor import handling, registries
from kopf.reactor import causation, handling, registries, subhandling
from kopf.structs import callbacks, dicts, filters, handlers, references, reviews

ActivityDecorator = Callable[[callbacks.ActivityFn], callbacks.ActivityFn]
Expand All @@ -30,7 +30,7 @@ def startup( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -45,7 +45,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.STARTUP,
activity=causation.Activity.STARTUP,
)
real_registry._activities.append(handler)
return fn
Expand All @@ -57,7 +57,7 @@ def cleanup( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -72,7 +72,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.CLEANUP,
activity=causation.Activity.CLEANUP,
)
real_registry._activities.append(handler)
return fn
Expand All @@ -84,7 +84,7 @@ def login( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -100,7 +100,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.AUTHENTICATION,
activity=causation.Activity.AUTHENTICATION,
)
real_registry._activities.append(handler)
return fn
Expand All @@ -112,7 +112,7 @@ def probe( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand All @@ -128,7 +128,7 @@ def decorator( # lgtm[py/similar-function]
handler = handlers.ActivityHandler(
fn=fn, id=real_id, param=param,
errors=errors, timeout=timeout, retries=retries, backoff=backoff,
activity=handlers.Activity.PROBE,
activity=causation.Activity.PROBE,
)
real_registry._activities.append(handler)
return fn
Expand Down Expand Up @@ -183,7 +183,7 @@ def decorator( # lgtm[py/similar-function]
errors=None, timeout=None, retries=None, backoff=None, # TODO: add some meaning later
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value,
reason=handlers.WebhookType.VALIDATING, operation=operation,
reason=causation.WebhookType.VALIDATING, operation=operation,
persistent=persistent, side_effects=side_effects, ignore_failures=ignore_failures,
)
real_registry._webhooks.append(handler)
Expand Down Expand Up @@ -239,7 +239,7 @@ def decorator( # lgtm[py/similar-function]
errors=None, timeout=None, retries=None, backoff=None, # TODO: add some meaning later
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value,
reason=handlers.WebhookType.MUTATING, operation=operation,
reason=causation.WebhookType.MUTATING, operation=operation,
persistent=persistent, side_effects=side_effects, ignore_failures=ignore_failures,
)
real_registry._webhooks.append(handler)
Expand All @@ -263,7 +263,7 @@ def resume( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -320,7 +320,7 @@ def create( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -353,7 +353,7 @@ def decorator( # lgtm[py/similar-function]
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value, old=None, new=None, field_needs_change=False,
initial=None, deleted=None, requires_finalizer=None,
reason=handlers.Reason.CREATE,
reason=causation.Reason.CREATE,
)
real_registry._changing.append(handler)
return fn
Expand All @@ -376,7 +376,7 @@ def update( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -411,7 +411,7 @@ def decorator( # lgtm[py/similar-function]
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value, old=old, new=new, field_needs_change=True,
initial=None, deleted=None, requires_finalizer=None,
reason=handlers.Reason.UPDATE,
reason=causation.Reason.UPDATE,
)
real_registry._changing.append(handler)
return fn
Expand All @@ -434,7 +434,7 @@ def delete( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -468,7 +468,7 @@ def decorator( # lgtm[py/similar-function]
selector=selector, labels=labels, annotations=annotations, when=when,
field=real_field, value=value, old=None, new=None, field_needs_change=False,
initial=None, deleted=None, requires_finalizer=bool(not optional),
reason=handlers.Reason.DELETE,
reason=causation.Reason.DELETE,
)
real_registry._changing.append(handler)
return fn
Expand All @@ -491,7 +491,7 @@ def field( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -549,7 +549,7 @@ def index( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -653,7 +653,7 @@ def daemon( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -715,7 +715,7 @@ def timer( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -764,7 +764,7 @@ def subhandler( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -814,7 +814,7 @@ def decorator( # lgtm[py/similar-function]
_warn_incompatible_parent_with_oldnew(parent_handler, old, new)
_warn_conflicting_values(field, value, old, new)
_verify_filters(labels, annotations)
real_registry = handling.subregistry_var.get()
real_registry = subhandling.subregistry_var.get()
real_field = dicts.parse_field(field) or None # to not store tuple() as a no-field case.
real_id = registries.generate_id(fn=fn, id=id,
prefix=parent_handler.id if parent_handler else None)
Expand All @@ -838,7 +838,7 @@ def register( # lgtm[py/similar-function]
# Handler's behaviour specification:
id: Optional[str] = None,
param: Optional[Any] = None,
errors: Optional[handlers.ErrorsMode] = None,
errors: Optional[handling.ErrorsMode] = None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
backoff: Optional[float] = None,
Expand Down Expand Up @@ -908,13 +908,13 @@ def _warn_conflicting_values(


def _warn_incompatible_parent_with_oldnew(
handler: handlers.BaseHandler,
handler: handling.Handler,
old: Any,
new: Any,
) -> None:
if old is not None or new is not None:
if isinstance(handler, handlers.ChangingHandler):
is_on_update = handler.reason == handlers.Reason.UPDATE
is_on_update = handler.reason == causation.Reason.UPDATE
is_on_field = handler.reason is None and not handler.initial
if not is_on_update and not is_on_field:
raise TypeError("Filters old=/new= can only be used in update handlers.")
15 changes: 7 additions & 8 deletions kopf/reactor/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

from kopf.reactor import causation, handling, lifecycles, registries
from kopf.storage import states
from kopf.structs import callbacks, configuration, credentials, ephemera, \
handlers as handlers_, ids, primitives
from kopf.structs import configuration, credentials, ephemera, ids, primitives

logger = logging.getLogger(__name__)

Expand All @@ -34,7 +33,7 @@ def __init__(
self,
msg: str,
*,
outcomes: Mapping[ids.HandlerId, states.HandlerOutcome],
outcomes: Mapping[ids.HandlerId, handling.Outcome],
) -> None:
super().__init__(msg)
self.outcomes = outcomes
Expand Down Expand Up @@ -83,7 +82,7 @@ async def authenticate(
lifecycle=lifecycles.all_at_once,
registry=registry,
settings=settings,
activity=handlers_.Activity.AUTHENTICATION,
activity=causation.Activity.AUTHENTICATION,
indices=indices,
memo=memo,
)
Expand All @@ -100,13 +99,13 @@ async def authenticate(

async def run_activity(
*,
lifecycle: lifecycles.LifeCycleFn,
lifecycle: handling.LifeCycleFn,
registry: registries.OperatorRegistry,
settings: configuration.OperatorSettings,
activity: handlers_.Activity,
activity: causation.Activity,
indices: ephemera.Indices,
memo: ephemera.AnyMemo,
) -> Mapping[ids.HandlerId, callbacks.Result]:
) -> Mapping[ids.HandlerId, handling.Result]:
logger = logging.getLogger(f'kopf.activities.{activity.value}')

# For the activity handlers, we have neither bodies, nor patches, just the state.
Expand All @@ -119,7 +118,7 @@ async def run_activity(
)
handlers = registry._activities.get_handlers(activity=activity)
state = states.State.from_scratch().with_handlers(handlers)
outcomes: MutableMapping[ids.HandlerId, states.HandlerOutcome] = {}
outcomes: MutableMapping[ids.HandlerId, handling.Outcome] = {}
while not state.done:
current_outcomes = await handling.execute_handlers_once(
lifecycle=lifecycle,
Expand Down
Loading