Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert internal timestamps to TZ-aware, treat user-provided TZ-naive ones as UTC #1068

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/authentication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ or an instance of :class:`kopf.ConnectionInfo`::
expiration=datetime.datetime(2099, 12, 31, 23, 59, 59),
)

Both TZ-naive & TZ-aware expiration times are supported.
The TZ-naive timestamps are always treated as UTC.

As with any other handlers, the login handler can be async if the network
communication is needed and async mode is supported::

Expand Down
2 changes: 1 addition & 1 deletion docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ is no need to retry over time, as it will not become better::
@kopf.on.create('kopfexamples')
def create_fn(spec, **_):
valid_until = datetime.datetime.fromisoformat(spec['validUntil'])
if valid_until <= datetime.datetime.utcnow():
if valid_until <= datetime.datetime.now(datetime.timezone.utc):
raise kopf.PermanentError("The object is not valid anymore.")

See also: :ref:`never-again-filters` to prevent handlers from being invoked
Expand Down
4 changes: 2 additions & 2 deletions docs/probing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ probing handlers:

@kopf.on.probe(id='now')
def get_current_timestamp(**kwargs):
return datetime.datetime.utcnow().isoformat()
return datetime.datetime.now(datetime.timezone.utc).isoformat()

@kopf.on.probe(id='random')
def get_random_value(**kwargs):
Expand All @@ -91,7 +91,7 @@ The handler results will be reported as the content of the liveness response:
.. code-block:: console

$ curl http://localhost:8080/healthz
{"now": "2019-11-07T18:03:52.513803", "random": 765846}
{"now": "2019-11-07T18:03:52.513803+00:00", "random": 765846}

.. note::
The liveness status report is simplistic and minimalistic at the moment.
Expand Down
2 changes: 1 addition & 1 deletion examples/13-hooks/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def login_fn(**kwargs):
certificate_path=cert.filename() if cert else None, # can be a temporary file
private_key_path=pkey.filename() if pkey else None, # can be a temporary file
default_namespace=config.namespace,
expiration=datetime.datetime.utcnow() + datetime.timedelta(seconds=30),
expiration=datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=30),
)


Expand Down
8 changes: 4 additions & 4 deletions kopf/_cogs/clients/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def post_event(
suffix = message[-MAX_MESSAGE_LENGTH // 2 + (len(infix) - len(infix) // 2):]
message = f'{prefix}{infix}{suffix}'

now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
body = {
'metadata': {
'namespace': namespace,
Expand All @@ -67,9 +67,9 @@ async def post_event(

'involvedObject': full_ref,

'firstTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...`
'lastTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events`
'eventTime': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z'
'firstTimestamp': now.isoformat(), # seen in `kubectl describe ...`
'lastTimestamp': now.isoformat(), # seen in `kubectl get events`
'eventTime': now.isoformat(),
}

try:
Expand Down
39 changes: 23 additions & 16 deletions kopf/_cogs/structs/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
private_key_data: Optional[bytes] = None
default_namespace: Optional[str] = None # used for cluster objects' k8s-events.
priority: int = 0
expiration: Optional[datetime.datetime] = None # TZ-naive, the same as utcnow()
expiration: Optional[datetime.datetime] = None # TZ-aware or TZ-naive (implies UTC)


_T = TypeVar('_T', bound=object)
Expand Down Expand Up @@ -118,7 +118,7 @@
self._current = {}
self._invalid = collections.defaultdict(list)
self._lock = asyncio.Lock()
self._next_expiration = datetime.datetime.max
self._next_expiration: Optional[datetime.datetime] = None

if __src is not None:
self._update_converted(__src)
Expand Down Expand Up @@ -229,13 +229,19 @@
Unlike invalidation, the expired credentials are not remembered
and not blocked from reappearing.
"""
now = datetime.datetime.utcnow()
if now >= self._next_expiration: # quick & lockless for speed: it is done on every API call
now = datetime.datetime.now(datetime.timezone.utc)

# Quick & lockless for speed: it is done on every API call, we have no time for locks.
if self._next_expiration is not None and now >= self._next_expiration:
async with self._lock:
for key, item in list(self._current.items()):
if item.info.expiration is not None and now >= item.info.expiration:
await self._flush_caches(item)
del self._current[key]
expiration = item.info.expiration
if expiration is not None:
if expiration.tzinfo is None:
expiration = expiration.replace(tzinfo=datetime.timezone.utc)

Check warning on line 241 in kopf/_cogs/structs/credentials.py

View check run for this annotation

Codecov / codecov/patch

kopf/_cogs/structs/credentials.py#L241

Added line #L241 was not covered by tests
if now >= expiration:
await self._flush_caches(item)
del self._current[key]
self._update_expiration()
need_reauth = not self._current # i.e. nothing is left at all

Expand Down Expand Up @@ -315,11 +321,12 @@
await self._ready.turn_to(True)

def is_empty(self) -> bool:
now = datetime.datetime.utcnow()
return all(
item.info.expiration is not None and now >= item.info.expiration # i.e. expired
for key, item in self._current.items()
)
now = datetime.datetime.now(datetime.timezone.utc)
expirations = [
dt if dt is None or dt.tzinfo is not None else dt.replace(tzinfo=datetime.timezone.utc)
for dt in (item.info.expiration for item in self._current.values())
]
return all(dt is not None and now >= dt for dt in expirations) # i.e. expired

async def wait_for_readiness(self) -> None:
await self._ready.wait_for(True)
Expand Down Expand Up @@ -381,8 +388,8 @@

def _update_expiration(self) -> None:
expirations = [
item.info.expiration
for item in self._current.values()
if item.info.expiration is not None
dt if dt.tzinfo is not None else dt.replace(tzinfo=datetime.timezone.utc)
for dt in (item.info.expiration for item in self._current.values())
if dt is not None
]
self._next_expiration = min(expirations + [datetime.datetime.max])
self._next_expiration = min(expirations) if expirations else None
2 changes: 1 addition & 1 deletion kopf/_core/actions/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def apply(
logger.debug(f"Sleeping was interrupted by new changes, {unslept_delay} seconds left.")
else:
# Any unique always-changing value will work; not necessary a timestamp.
value = datetime.datetime.utcnow().isoformat()
value = datetime.datetime.now(datetime.timezone.utc).isoformat()
touch = patches.Patch()
settings.persistence.progress_storage.touch(body=body, patch=touch, value=value)
await patch_and_check(
Expand Down
6 changes: 3 additions & 3 deletions kopf/_core/actions/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def finished(self) -> bool:
@property
def sleeping(self) -> bool:
ts = self.delayed
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
return not self.finished and ts is not None and ts > now

@property
Expand All @@ -122,7 +122,7 @@ def awakened(self) -> bool:

@property
def runtime(self) -> datetime.timedelta:
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
return now - (self.started if self.started else now)


Expand Down Expand Up @@ -277,7 +277,7 @@ async def execute_handler_once(
handler=handler,
cause=cause,
retry=state.retries,
started=state.started or datetime.datetime.utcnow(), # "or" is for type-checking.
started=state.started or datetime.datetime.now(datetime.timezone.utc), # "or" is for type-checking.
runtime=state.runtime,
settings=settings,
lifecycle=lifecycle, # just a default for the sub-handlers, not used directly.
Expand Down
42 changes: 19 additions & 23 deletions kopf/_core/actions/progression.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from typing import Any, Collection, Dict, Iterable, Iterator, \
Mapping, NamedTuple, Optional, overload

import iso8601

from kopf._cogs.configs import progress
from kopf._cogs.structs import bodies, ids, patches
from kopf._core.actions import execution
Expand Down Expand Up @@ -54,17 +56,17 @@ class HandlerState(execution.HandlerState):
def from_scratch(cls, *, purpose: Optional[str] = None) -> "HandlerState":
return cls(
active=True,
started=datetime.datetime.utcnow(),
started=datetime.datetime.now(datetime.timezone.utc),
purpose=purpose,
)

@classmethod
def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState":
return cls(
active=False,
started=_datetime_fromisoformat(__d.get('started')) or datetime.datetime.utcnow(),
stopped=_datetime_fromisoformat(__d.get('stopped')),
delayed=_datetime_fromisoformat(__d.get('delayed')),
started=_parse_iso8601(__d.get('started')) or datetime.datetime.now(datetime.timezone.utc),
stopped=_parse_iso8601(__d.get('stopped')),
delayed=_parse_iso8601(__d.get('delayed')),
purpose=__d.get('purpose') if __d.get('purpose') else None,
retries=__d.get('retries') or 0,
success=__d.get('success') or False,
Expand All @@ -76,9 +78,9 @@ def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState":

def for_storage(self) -> progress.ProgressRecord:
return progress.ProgressRecord(
started=None if self.started is None else _datetime_toisoformat(self.started),
stopped=None if self.stopped is None else _datetime_toisoformat(self.stopped),
delayed=None if self.delayed is None else _datetime_toisoformat(self.delayed),
started=None if self.started is None else _format_iso8601(self.started),
stopped=None if self.stopped is None else _format_iso8601(self.stopped),
delayed=None if self.delayed is None else _format_iso8601(self.delayed),
purpose=None if self.purpose is None else str(self.purpose),
retries=None if self.retries is None else int(self.retries),
success=None if self.success is None else bool(self.success),
Expand All @@ -104,7 +106,7 @@ def with_outcome(
self,
outcome: execution.Outcome,
) -> "HandlerState":
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
cls = type(self)
return cls(
active=self.active,
Expand Down Expand Up @@ -313,7 +315,7 @@ def delays(self) -> Collection[float]:
processing routine, based on all delays of different origin:
e.g. postponed daemons, stopping daemons, temporarily failed handlers.
"""
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
return [
max(0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0
for handler_state in self._states.values()
Expand Down Expand Up @@ -355,30 +357,24 @@ def deliver_results(


@overload
def _datetime_toisoformat(val: None) -> None: ...
def _format_iso8601(val: None) -> None: ...
Dismissed Show dismissed Hide dismissed


@overload
def _datetime_toisoformat(val: datetime.datetime) -> str: ...
def _format_iso8601(val: datetime.datetime) -> str: ...
Dismissed Show dismissed Hide dismissed


def _datetime_toisoformat(val: Optional[datetime.datetime]) -> Optional[str]:
if val is None:
return None
else:
return val.isoformat(timespec='microseconds')
def _format_iso8601(val: Optional[datetime.datetime]) -> Optional[str]:
return None if val is None else val.isoformat(timespec='microseconds')


@overload
def _datetime_fromisoformat(val: None) -> None: ...
def _parse_iso8601(val: None) -> None: ...
Dismissed Show dismissed Hide dismissed


@overload
def _datetime_fromisoformat(val: str) -> datetime.datetime: ...
def _parse_iso8601(val: str) -> datetime.datetime: ...
Dismissed Show dismissed Hide dismissed


def _datetime_fromisoformat(val: Optional[str]) -> Optional[datetime.datetime]:
if val is None:
return None
else:
return datetime.datetime.fromisoformat(val)
def _parse_iso8601(val: Optional[str]) -> Optional[datetime.datetime]:
return None if val is None else iso8601.parse_date(val) # always TZ-aware
9 changes: 4 additions & 5 deletions kopf/_core/engines/peering.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ def __init__(
self.priority = priority
self.lifetime = datetime.timedelta(seconds=int(lifetime))
self.lastseen = (iso8601.parse_date(lastseen) if lastseen is not None else
datetime.datetime.utcnow())
self.lastseen = self.lastseen.replace(tzinfo=None) # only the naive utc -- for comparison
datetime.datetime.now(datetime.timezone.utc))
self.deadline = self.lastseen + self.lifetime
self.is_dead = self.deadline <= datetime.datetime.utcnow()
self.is_dead = self.deadline <= datetime.datetime.now(datetime.timezone.utc)

def __repr__(self) -> str:
clsname = self.__class__.__name__
Expand Down Expand Up @@ -149,7 +148,7 @@ async def process_peering_event(
# are expected to expire, and force the immediate re-evaluation by a certain change of self.
# This incurs an extra PATCH request besides usual keepalives, but in the complete silence
# from other peers that existed a moment earlier, this should not be a problem.
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
delays = [(peer.deadline - now).total_seconds() for peer in same_peers + prio_peers]
unslept = await aiotime.sleep(delays, wakeup=stream_pressure)
if unslept is None and delays:
Expand Down Expand Up @@ -279,7 +278,7 @@ def detect_own_id(*, manual: bool) -> Identity:

user = getpass.getuser()
host = hostnames.get_descriptive_hostname()
now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
now = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d%H%M%S")
rnd = ''.join(random.choices('abcdefhijklmnopqrstuvwxyz0123456789', k=3))
return Identity(f'{user}@{host}' if manual else f'{user}@{host}/{now}/{rnd}')

Expand Down
6 changes: 3 additions & 3 deletions kopf/_core/engines/probing.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ async def get_health(

# Recollect the data on-demand, and only if is is older that a reasonable caching period.
# Protect against multiple parallel requests performing the same heavy activity.
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
if probing_timestamp is None or now - probing_timestamp >= probing_max_age:
async with probing_lock:
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
if probing_timestamp is None or now - probing_timestamp >= probing_max_age:

activity_results = await activities.run_activity(
Expand All @@ -64,7 +64,7 @@ async def get_health(
)
probing_container.clear()
probing_container.update(activity_results)
probing_timestamp = datetime.datetime.utcnow()
probing_timestamp = datetime.datetime.now(datetime.timezone.utc)

return aiohttp.web.json_response(probing_container)

Expand Down
9 changes: 5 additions & 4 deletions tests/authentication/test_vault.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime

import freezegun
import iso8601
import pytest

from kopf._cogs.structs.credentials import ConnectionInfo, LoginError, Vault, VaultKey
Expand Down Expand Up @@ -55,7 +56,7 @@ async def test_yielding_after_population(mocker):

@freezegun.freeze_time('2020-01-01T00:00:00')
async def test_yielding_items_before_expiration(mocker):
future = datetime.datetime(2020, 1, 1, 0, 0, 0, 1)
future = iso8601.parse_date('2020-01-01T00:00:00.000001')
key1 = VaultKey('some-key')
info1 = ConnectionInfo(server='https://expected/', expiration=future)
vault = Vault()
Expand All @@ -74,8 +75,8 @@ async def test_yielding_items_before_expiration(mocker):
@pytest.mark.parametrize('delta', [0, 1])
@freezegun.freeze_time('2020-01-01T00:00:00')
async def test_yielding_ignores_expired_items(mocker, delta):
future = datetime.datetime(2020, 1, 1, 0, 0, 0, 1)
past = datetime.datetime(2020, 1, 1) - datetime.timedelta(microseconds=delta)
future = iso8601.parse_date('2020-01-01T00:00:00.000001')
past = iso8601.parse_date('2020-01-01') - datetime.timedelta(microseconds=delta)
key1 = VaultKey('some-key')
key2 = VaultKey('other-key')
info1 = ConnectionInfo(server='https://expected/', expiration=past)
Expand All @@ -96,7 +97,7 @@ async def test_yielding_ignores_expired_items(mocker, delta):
@pytest.mark.parametrize('delta', [0, 1])
@freezegun.freeze_time('2020-01-01T00:00:00')
async def test_yielding_when_everything_is_expired(mocker, delta):
past = datetime.datetime(2020, 1, 1) - datetime.timedelta(microseconds=delta)
past = iso8601.parse_date('2020-01-01') - datetime.timedelta(microseconds=delta)
key1 = VaultKey('some-key')
info1 = ConnectionInfo(server='https://expected/', expiration=past)
vault = Vault()
Expand Down
2 changes: 1 addition & 1 deletion tests/handling/daemons/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def frozen_time():
A helper to simulate time movements to step over long sleeps/timeouts.
"""
# TODO LATER: Either freezegun should support the system clock, or find something else.
with freezegun.freeze_time("2020-01-01 00:00:00") as frozen:
with freezegun.freeze_time("2020-01-01T00:00:00") as frozen:
# Use freezegun-supported time instead of system clocks -- for testing purposes only.
# NB: Patch strictly after the time is frozen -- to use fake_time(), not real time().
with patch('time.monotonic', time.time), patch('time.perf_counter', time.time):
Expand Down
Loading
Loading