Skip to content

Commit

Permalink
Merge pull request #125 from nolar/background-events
Browse files Browse the repository at this point in the history
Post k8s-events in the background
  • Loading branch information
Sergey Vasilyev authored Jul 3, 2019
2 parents b977f12 + aba10c7 commit bdea97b
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 92 deletions.
2 changes: 1 addition & 1 deletion kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
EventsConfig,
WorkersConfig
)
from kopf.events import (
from kopf.engines.posting import (
event,
info,
warn,
Expand Down
26 changes: 15 additions & 11 deletions kopf/clients/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,33 @@
import kubernetes.client.rest

from kopf import config
from kopf.structs import hierarchies

logger = logging.getLogger(__name__)

MAX_MESSAGE_LENGTH = 1024
CUT_MESSAGE_INFIX = '...'


async def post_event(*, obj, type, reason, message=''):
async def post_event(*, obj=None, ref=None, type, reason, message=''):
"""
Issue an event for the object.
This is where they can also be accumulated, aggregated, grouped,
and where the rate-limits should be maintained. It can (and should)
be done by the client library, as it is done in the Go client.
"""

# Object reference - similar to the owner reference, but different.
if obj is not None and ref is not None:
raise TypeError("Only one of obj= and ref= is allowed for a posted event. Got both.")
if obj is None and ref is None:
raise TypeError("One of obj= and ref= is required for a posted event. Got none.")
if ref is None:
ref = hierarchies.build_object_reference(obj)

now = datetime.datetime.utcnow()
namespace = obj['metadata']['namespace']
namespace = ref['namespace'] or 'default'

# Prevent a common case of event posting errors but shortening the message.
if len(message) > MAX_MESSAGE_LENGTH:
Expand All @@ -28,15 +41,6 @@ async def post_event(*, obj, type, reason, message=''):
suffix = message[-MAX_MESSAGE_LENGTH // 2 + (len(infix) - len(infix) // 2):]
message = f'{prefix}{infix}{suffix}'

# Object reference - similar to the owner reference, but different.
ref = dict(
apiVersion=obj['apiVersion'],
kind=obj['kind'],
name=obj['metadata']['name'],
uid=obj['metadata']['uid'],
namespace=obj['metadata']['namespace'],
)

meta = kubernetes.client.V1ObjectMeta(
namespace=namespace,
generate_name='kopf-event-',
Expand Down
88 changes: 88 additions & 0 deletions kopf/engines/posting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
All the functions to write the Kubernetes events for the Kubernetes objects.
They are used internally in the handling routines to show the progress,
and can be used directly from the handlers to add arbitrary custom events.
The actual k8s-event posting runs in the background,
and posts the k8s-events as soon as they are queued.
"""
import asyncio
import sys
from contextvars import ContextVar
from typing import Mapping, Text, NamedTuple

from kopf import config
from kopf.clients import events
from kopf.structs import dicts
from kopf.structs import hierarchies

event_queue_var: ContextVar[asyncio.Queue] = ContextVar('event_queue_var')


class K8sEvent(NamedTuple):
    """
    A single k8s-event to be posted, with all ref-information preserved.
    It can exist and be posted even after the object is garbage-collected.
    """
    # Object reference of the involved object — built eagerly by
    # hierarchies.build_object_reference(); presumably the usual
    # apiVersion/kind/name/uid/namespace mapping — confirm against that helper.
    ref: Mapping
    # K8s event type: 'Normal', 'Warning', or 'Error' (see info/warn/exception below).
    type: Text
    # Machine-readable short reason, e.g. 'Success'.
    reason: Text
    # Free-form human-readable message.
    message: Text


def event(objs, *, type, reason, message=''):
    """
    Enqueue a k8s-event for one object or for several objects at once.

    The event is not posted immediately: it is put on the per-task queue
    (``event_queue_var``) and posted in the background by :func:`poster`.
    The object reference is captured now, so the event can be posted even
    if the object itself is garbage-collected before the queue is drained.
    """
    queue = event_queue_var.get()
    for obj in dicts.walk(objs):
        ref = hierarchies.build_object_reference(obj)
        # Named distinctly: a local `event` would shadow this very function.
        k8s_event = K8sEvent(ref=ref, type=type, reason=reason, message=message)
        queue.put_nowait(k8s_event)


def info(obj, *, reason, message=''):
    """Enqueue a 'Normal' k8s-event, unless filtered out by the configured log-level."""
    if config.EventsConfig.events_loglevel <= config.LOGLEVEL_INFO:
        event(obj, type='Normal', reason=reason, message=message)


def warn(obj, *, reason, message=''):
    """Enqueue a 'Warning' k8s-event, unless filtered out by the configured log-level."""
    if config.EventsConfig.events_loglevel <= config.LOGLEVEL_WARNING:
        event(obj, type='Warning', reason=reason, message=message)


def exception(obj, *, reason='', message='', exc=None):
    """
    Enqueue an 'Error' k8s-event for the given (or the currently handled) exception.

    If ``exc`` is not passed, it is taken from the active ``except`` block.
    The reason defaults to the exception class name; the exception text is
    appended to the message when an exception is present.
    """
    if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR:
        return
    if exc is None:
        exc = sys.exc_info()[1]
    if not reason:
        reason = type(exc).__name__
    if exc:
        message = f'{message} {exc}' if message else f'{exc}'
    else:
        message = f'{message}'
    event(obj, type='Error', reason=reason, message=message)


async def poster(
        event_queue: asyncio.Queue,
):
    """
    Forever drain the queue and post each queued k8s-event.

    The queued items arrive from two sources: the logging system (with the
    reason/type fields already adapted to Kubernetes concepts) and explicit
    `kopf.event`-style calls (which set those fields directly). Either way,
    each item is forwarded to the K8s client as-is, with no extra processing.

    Defined in this module only to keep all forever-running tasks together.
    """
    while True:
        queued = await event_queue.get()
        await events.post_event(
            ref=queued.ref,
            type=queued.type,
            reason=queued.reason,
            message=queued.message,
        )
80 changes: 14 additions & 66 deletions kopf/events.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,20 @@
"""
All the functions to write the Kubernetes events on the Kubernetes objects.
They are used internally in the handling routine to show the progress,
and can be used directly from the handlers to add arbitrary custom events.
The events look like this:
kubectl describe -f myres.yaml
...
TODO
**THIS MODULE IS DEPRECATED AND WILL BE REMOVED.**
"""
import asyncio
import sys

from kopf import config
from kopf.clients import events


# TODO: rename it to kopf.log()? kopf.events.log()? kopf.events.warn()?
async def event_async(obj, *, type, reason, message=''):
    """
    Issue a k8s-event for the object, or for each object in a list/tuple.
    """
    targets = obj if isinstance(obj, (list, tuple)) else [obj]
    for target in targets:
        await events.post_event(obj=target, type=type, reason=reason, message=message)


# Shortcuts for the only two officially documented event types as of now.
# However, any arbitrary strings can be used as an event type to the base function.
async def info_async(obj, *, reason, message=''):
    """Issue a 'Normal' k8s-event, unless filtered out by the configured log-level."""
    if config.EventsConfig.events_loglevel <= config.LOGLEVEL_INFO:
        await event_async(obj, reason=reason, message=message, type='Normal')


async def warn_async(obj, *, reason, message=''):
    """Issue a 'Warning' k8s-event, unless filtered out by the configured log-level."""
    if config.EventsConfig.events_loglevel <= config.LOGLEVEL_WARNING:
        await event_async(obj, reason=reason, message=message, type='Warning')


async def exception_async(obj, *, reason='', message='', exc=None):
    """
    Issue an 'Error' k8s-event for the given (or the currently handled) exception.

    If ``exc`` is not passed, it is taken from the active ``except`` block.
    The reason defaults to the exception class name.
    """
    if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR:
        return

    if exc is None:
        _, exc, _ = sys.exc_info()
    reason = reason if reason else type(exc).__name__
    # Guard on both message AND exc: previously, a truthy message with no
    # active exception produced "... None" in the posted message. This matches
    # the message-building logic of kopf.engines.posting.exception().
    message = f'{message} {exc}' if message and exc else f'{exc}' if exc else f'{message}'
    await event_async(obj, reason=reason, message=message, type='Error')


# Next 4 funcs are just synchronous interface for async event functions.
def event(obj, *, type, reason, message=''):
    # BUG(review): asyncio.wait_for() merely builds an awaitable; without
    # awaiting or scheduling it on a loop, event_async() never actually runs.
    asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=None)


def info(obj, *, reason, message=''):
    # BUG(review): asyncio.wait_for() merely builds an awaitable; without
    # awaiting or scheduling it on a loop, info_async() never actually runs.
    asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=None)
import warnings

from kopf.engines.posting import (
event,
info,
warn,
exception,
)

def warn(obj, *, reason, message=''):
    # BUG(review): asyncio.wait_for() merely builds an awaitable; without
    # awaiting or scheduling it on a loop, warn_async() never actually runs.
    asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=None)
__all__ = ['event', 'info', 'warn', 'exception']


def exception(obj, *, reason='', message='', exc=None):
    # BUG(review): asyncio.wait_for() merely builds an awaitable; without
    # awaiting or scheduling it on a loop, exception_async() never actually runs.
    asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=None)
# Triggered on explicit `import kopf.events` (not imported this way normally).
warnings.warn(
"`kopf.events` is deprecated; "
"use `kopf` directly: e.g. `kopf.event(...)`.",
DeprecationWarning, stacklevel=0)
16 changes: 9 additions & 7 deletions kopf/reactor/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from contextvars import ContextVar
from typing import Optional, Callable, Iterable, Union, Collection

from kopf import events
from kopf.clients import patching
from kopf.engines import posting
from kopf.reactor import causation
from kopf.reactor import invocation
from kopf.reactor import registries
Expand Down Expand Up @@ -79,6 +79,7 @@ async def custom_object_handler(
resource: registries.Resource,
event: dict,
freeze: asyncio.Event,
event_queue: asyncio.Queue,
) -> None:
"""
Handle a single custom object low-level watch-event.
Expand All @@ -98,6 +99,7 @@ async def custom_object_handler(
namespace=body.get('metadata', {}).get('namespace', 'default'),
name=body.get('metadata', {}).get('name', body.get('metadata', {}).get('uid', None)),
))
posting.event_queue_var.set(event_queue) # till the end of this object's task.

# If the global freeze is set for the processing (i.e. other operator overrides), do nothing.
if freeze.is_set():
Expand Down Expand Up @@ -205,7 +207,7 @@ async def handle_cause(
done = False
else:
logger.info(f"All handlers succeeded for {title}.")
await events.info_async(cause.body, reason='Success', message=f"All handlers succeeded for {title}.")
posting.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.")
done = True
else:
skip = True
Expand Down Expand Up @@ -383,34 +385,34 @@ async def _execute(
# Definitely retriable error, no matter what is the error-reaction mode.
except HandlerRetryError as e:
logger.exception(f"Handler {handler.id!r} failed with a retry exception. Will retry.")
await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
handlers_left.append(handler)

# Definitely fatal error, no matter what is the error-reaction mode.
except HandlerFatalError as e:
logger.exception(f"Handler {handler.id!r} failed with a fatal exception. Will stop.")
await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# Regular errors behave as either retriable or fatal depending on the error-reaction mode.
except Exception as e:
if retry_on_errors:
logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.")
await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY)
handlers_left.append(handler)
else:
logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.")
await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# No errors means the handler should be excluded from future runs in this reaction cycle.
else:
logger.info(f"Handler {handler.id!r} succeeded.")
await events.info_async(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.")
posting.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.")
status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result)

# Provoke the retry of the handling cycle if there were any unfinished handlers,
Expand Down
10 changes: 10 additions & 0 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from kopf import config
from kopf.clients import watching
from kopf.engines import peering
from kopf.engines import posting
from kopf.reactor import handling
from kopf.reactor import lifecycles
from kopf.reactor import registries
Expand Down Expand Up @@ -174,9 +175,17 @@ def create_tasks(
# The freezer and the registry are scoped to this whole task-set, to sync them all.
lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle()
registry = registry if registry is not None else registries.get_default_registry()
event_queue = asyncio.Queue()
freeze = asyncio.Event()
tasks = []

# K8s-event posting. Events are queued in-memory and posted in the background.
# NB: currently, it is a global task, but can be made per-resource or per-object.
tasks.extend([
loop.create_task(posting.poster(
event_queue=event_queue)),
])

# Monitor the peers, unless explicitly disabled.
ourselves: Optional[peering.Peer] = peering.Peer.detect(
id=peering.detect_own_id(), priority=priority,
Expand Down Expand Up @@ -204,6 +213,7 @@ def create_tasks(
lifecycle=lifecycle,
registry=registry,
resource=resource,
event_queue=event_queue,
freeze=freeze))), # freeze is only checked
])

Expand Down
Loading

0 comments on commit bdea97b

Please sign in to comment.