Skip to content

Commit

Permalink
improve efficiency of reflector
Browse files Browse the repository at this point in the history
The reflector performs a _list_and_update every watch timeout (default
10s) requesting most recent data from the apiserver via the default
resource_version="".

This requires the apiserver to retrieve the latest data from the backing
store (etcd) which can be very expensive in large clusters.
For example if omit_namespace is set the full cluster pod listings
require several seconds retrieve all data from the etcd in clusters with
more then 10000 pods. The label selector filtering is only done in the
apiserver.

As the _list_and_update is used for watches the listing should set
the resource version and resource version match to tell the apiserver
that its locally cached data is sufficient.
The initial listing will fetch any data the apiserver currently has to
prepopulate the watch, the watch itself will then fetch whatever is
remaining from the etcd.

The subsequent relistings after the watch timeouts use their last
received resource version in the same way.

For details see https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions

Closes gh-753
  • Loading branch information
juliantaylor committed Jul 25, 2023
1 parent def501f commit e740189
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions kubespawner/reflector.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def __init__(self, *args, **kwargs):

self.watch_task = None

async def _list_and_update(self):
async def _list_and_update(self, resource_version=None):
"""
Update current list of resources by doing a full fetch.
Expand All @@ -221,6 +221,9 @@ async def _list_and_update(self):
_request_timeout=self.request_timeout,
_preload_content=False,
)
if resource_version is not None:
kwargs["resource_version"] = resource_version
kwargs["resource_version_match"] = "NotOlderThan"
if not self.omit_namespace:
kwargs["namespace"] = self.namespace

Expand Down Expand Up @@ -264,6 +267,11 @@ async def _watch_and_update(self):
selectors.append("field selector=%r" % self.field_selector)
log_selector = ', '.join(selectors)

# fetch Any (=api-server cached) data from apiserver on initial fetch
# see https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions
# for more information
resource_version = "0"

cur_delay = 0.1

if self.omit_namespace:
Expand All @@ -282,7 +290,7 @@ async def _watch_and_update(self):
start = time.monotonic()
w = watch.Watch()
try:
resource_version = await self._list_and_update()
resource_version = await self._list_and_update(resource_version)
watch_args = {
"label_selector": self.label_selector,
"field_selector": self.field_selector,
Expand Down Expand Up @@ -325,6 +333,7 @@ async def _watch_and_update(self):
else:
# This is an atomic operation on the dictionary!
self.resources[ref_key] = resource
resource_version = resource["metadata"]["resourceVersion"]
if self._stopping:
self.log.info("%s watcher stopped: inner", self.kind)
break
Expand All @@ -346,6 +355,9 @@ async def _watch_and_update(self):
self.log.debug("Cancelled watching %s", self.kind)
raise
except Exception:
# ensure we request a valid resource version on retry,
# needed on 410 Gone errors
resource_version = "0"
cur_delay = cur_delay * 2
if cur_delay > 30:
self.log.exception("Watching resources never recovered, giving up")
Expand Down

0 comments on commit e740189

Please sign in to comment.