Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful integration test 2 #5926

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pycloudlib.cloud import ImageType
from pycloudlib.lxd.instance import LXDInstance

import tests.integration_tests.reaper as reaper
from tests.integration_tests import integration_settings
from tests.integration_tests.clouds import (
AzureCloud,
Expand Down Expand Up @@ -75,6 +76,7 @@ def disable_subp_usage(request):


# Session-wide cloud handle; assigned in pytest_sessionstart.
_SESSION_CLOUD: IntegrationCloud
# Background instance reaper; created and started in pytest_sessionstart and
# stopped in pytest_sessionfinish.
# Both names are only annotated here (no value is bound), so reading either
# one before pytest_sessionstart has assigned it raises NameError.
_REAPER: reaper.Reaper


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -472,9 +474,12 @@ def _generate_profile_report() -> None:
def pytest_sessionstart(session) -> None:
"""do session setup"""
global _SESSION_CLOUD
global _REAPER
try:
_SESSION_CLOUD = get_session_cloud()
setup_image(_SESSION_CLOUD)
_REAPER = reaper.Reaper()
_REAPER.reaper_start()
except Exception as e:
if _SESSION_CLOUD:
# if a _SESSION_CLOUD was allocated, clean it up
Expand All @@ -489,6 +494,7 @@ def pytest_sessionstart(session) -> None:

def pytest_sessionfinish(session, exitstatus) -> None:
"""do session teardown"""
global _REAPER
try:
if integration_settings.INCLUDE_COVERAGE:
_generate_coverage_report()
Expand All @@ -504,6 +510,14 @@ def pytest_sessionfinish(session, exitstatus) -> None:
_SESSION_CLOUD.snapshot_id,
e,
)
try:
_REAPER.reaper_stop()
except Exception as e:
log.warning(
"Could not tear down instance reaper thread: %s(%s)",
type(e).__name__,
e,
)
try:
_SESSION_CLOUD.destroy()
except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pycloudlib.result import Result

from tests.helpers import cloud_init_project_dir
from tests.integration_tests import integration_settings
from tests.integration_tests import integration_settings, reaper
from tests.integration_tests.decorators import retry
from tests.integration_tests.util import ASSETS_DIR

Expand Down Expand Up @@ -330,6 +330,6 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
    """Tear the instance down on context exit unless it should be kept.

    When settings.KEEP_INSTANCE is set, the instance is left alone and its
    public IP is logged instead.
    """
    if not self.settings.KEEP_INSTANCE:
        # NOTE(review): this text comes from a rendered diff ("2 additions &
        # 2 deletions"), so self.destroy() is presumably the removed line
        # and reaper.reaper.reap(self) its replacement - confirm that only
        # one of them remains. With both present the instance is destroyed
        # here and then queued for destruction a second time.
        self.destroy()
        # NOTE(review): reap() only queues the instance on the module-level
        # reaper object; confirm that object's thread is actually started.
        reaper.reaper.reap(self)
    else:
        log.info("Keeping Instance, public ip: %s", self.ip())
194 changes: 194 additions & 0 deletions tests/integration_tests/reaper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Destroy instances in a background thread

interface:

Reaper.reaper_start() - spawns reaper thread
Reaper.reaper_stop() - join thread and report leaked instances
Reaper.reap(instance: IntegrationInstance) - queues instance for deletion

reaper_start() / reaper_stop() - must be called only once
"""

import logging
import queue
import threading
import warnings
from typing import Final, List, Optional

from tests.integration_tests.instances import IntegrationInstance

LOG = logging.getLogger()


class Reaper:
    """Destroy integration-test instances on a background thread.

    Instances handed to reap() are queued and destroyed by the thread that
    reaper_start() spawns. An instance whose destroy() raises is recorded
    in undead_ledger and retried on every later pass until reaper_stop()
    is called; whatever is still in the ledger at that point is reported
    with a warning.

    reap() only queues: nothing is destroyed unless reaper_start() has
    been called on the same Reaper object.
    """

    def __init__(self, timeout: float = 30.0):
        """Create an idle reaper.

        :param timeout: seconds the reaper thread sleeps between passes
            when there is no new work; each timeout triggers a retry of
            the instances in undead_ledger.
        """
        self.timeout = timeout

        # Condition used to wake the reaper thread early. Notified by
        # reap() (new work) and reaper_stop() (shutdown). The thread only
        # holds the underlying lock while it is checking for work or
        # waiting, never while it is destroying instances.
        self.wake_reaper: Final[threading.Condition] = threading.Condition()

        # Set once by reaper_stop() to tell the loop to tear down.
        self.exit_reaper: Final[threading.Event] = threading.Event()

        # List of instances which temporarily escaped death.
        # The primary purpose of the reaper is to coax these instances
        # towards eventual demise and report their insubordination on
        # shutdown.
        self.undead_ledger: Final[List[IntegrationInstance]] = []

        # Queue of newly reaped instances
        self.reaped_instances: Final[queue.Queue[IntegrationInstance]] = (
            queue.Queue()
        )

        # Thread handle used to re-join the thread; None until
        # reaper_start() is called so reaper_stop() is always safe.
        self.reaper_thread: Optional[threading.Thread] = None

    def reap(self, instance: "IntegrationInstance"):
        """Submit an instance to the reaper thread for destruction.

        An instance that is passed to the reaper must not be used again. It
        may not be dead yet, but it has no place among the living.
        """
        LOG.info("Reaper: receiving %s", instance.instance.id)

        # Enqueue before notifying so the reaper always sees the work when
        # it re-checks under the condition's lock.
        self.reaped_instances.put(instance)
        with self.wake_reaper:
            self.wake_reaper.notify()
            LOG.info("Reaper: awakened to reap")

    def reaper_start(self):
        """Spawn the reaper background thread."""
        LOG.info("Reaper: starting")
        self.reaper_thread = threading.Thread(
            target=self._reaper_loop, name="reaper"
        )
        self.reaper_thread.start()

    def reaper_stop(self):
        """Stop the reaper background thread and wait for completion.

        Safe to call even if reaper_start() was never called; in that case
        there is no thread to join.
        """
        LOG.info("Reaper: stopping")
        # Set the flag before notifying so the loop observes it on wake-up.
        self.exit_reaper.set()
        with self.wake_reaper:
            self.wake_reaper.notify()
            LOG.info("Reaper: awakened to reap")
        if self.reaper_thread is not None:
            self.reaper_thread.join()
        LOG.info("Reaper: stopped")

    def _destroy(self, instance: "IntegrationInstance") -> bool:
        """Destroy an instance; return True on success, False on error."""
        try:
            LOG.info("Reaper: destroying %s", instance.instance.id)
            instance.destroy()
            return True
        except Exception as e:
            # Best effort: a failed teardown is retried later, not raised.
            LOG.warning(
                "Error while tearing down instance %s: %s ", instance, e
            )
            return False

    def _has_work(self) -> bool:
        """Return True when the loop should run a pass without sleeping."""
        return self.exit_reaper.is_set() or not self.reaped_instances.empty()

    def _reaper_loop(self) -> None:
        """Manage all instances that have been reaped (thread target).

        tasks:
        - destroy newly reaped instances
        - manage a ledger of undead instances
        - periodically attempt to kill undead instances
        - die when instructed to
        - ensure that every reaped instance is destroyed at least once
          before reaper dies
        """
        LOG.info("Reaper: exalted in life, to assist others in death")
        while True:
            # Nap until woken or timeout. wait_for() checks the predicate
            # before sleeping, so work queued (or exit requested) while the
            # previous pass was running is not missed.
            with self.wake_reaper:
                self.wake_reaper.wait_for(
                    self._has_work, timeout=self.timeout
                )
            if self._do_reap():
                break
        LOG.info("Reaper: exited")

    def _do_reap(self) -> bool:
        """Do a single pass of the reaper loop.

        :return: True if the loop should exit, False otherwise.
        """
        new_undead_instances: List["IntegrationInstance"] = []

        # first destroy all newly reaped instances
        while True:
            try:
                instance = self.reaped_instances.get_nowait()
            except queue.Empty:
                break
            if self._destroy(instance):
                LOG.info("Reaper: destroyed %s", instance.instance.id)
            else:
                LOG.warning(
                    "Reaper: failed to destroy %s",
                    instance.instance.id,
                )
                # failure to delete, add to the ledger
                new_undead_instances.append(instance)

        # every instance has tried at least once and the reaper has been
        # instructed to tear down - so do it
        if self.exit_reaper.is_set():
            self._update_undead_ledger(new_undead_instances)
            if not self.reaped_instances.empty():
                # race: an instance was added to the queue after iteration
                # completed. Run another pass to destroy it.
                return False
            LOG.info("Reaper: exiting")
            if self.undead_ledger:
                # undead instances exist - unclean teardown
                LOG.info(
                    "Reaper: the faults of incompetent abilities will be "
                    "consigned to oblivion, as myself must soon be to the "
                    "mansions of rest."
                )
                warnings.warn(f"Test instance(s) leaked: {self.undead_ledger}")
            else:
                LOG.info("Reaper: duties complete, my turn to rest")
            return True

        # attempt to destroy all instances which previously refused to
        # destroy. Survivors are collected into a new list instead of
        # removing from the ledger while iterating it, which would skip
        # entries.
        still_undead: List["IntegrationInstance"] = []
        for instance in self.undead_ledger:
            if self._destroy(instance):
                LOG.info("Reaper: destroyed %s (undead)", instance.instance.id)
            else:
                still_undead.append(instance)
        self.undead_ledger[:] = still_undead
        self._update_undead_ledger(new_undead_instances)
        return False

    def _update_undead_ledger(
        self, new_undead_instances: List["IntegrationInstance"]
    ) -> None:
        """Append newly undead instances to the ledger and log them."""
        if not new_undead_instances:
            return
        if self.undead_ledger:
            LOG.info(
                "Reaper: instance(s) not ready to die %s, will now join "
                "the ranks of the undead: %s",
                new_undead_instances,
                self.undead_ledger,
            )
        else:
            LOG.info(
                "Reaper: instance(s) not ready to die %s",
                new_undead_instances,
            )
        self.undead_ledger.extend(new_undead_instances)


# Module-level Reaper instance; instances.py in this change hands instances
# to it via reaper.reaper.reap(self).
# NOTE(review): the conftest.py hunk in this change creates and starts a
# separate reaper.Reaper() (_REAPER). Nothing visible here calls
# reaper_start() on this module-level instance, so instances queued on it
# may never be destroyed - confirm which instance is meant to be shared.
reaper = Reaper()
Loading
Loading