From a90bdac162330620b3e09b3dcf6b6b2db12739e4 Mon Sep 17 00:00:00 2001 From: Benjamin Wohlwend Date: Wed, 26 May 2021 18:06:28 +0200 Subject: [PATCH 1/2] Ensure that metrics are flushed before shutting down To ensure that the flushed metrics are actually sent, we need to make sure that the transport thread is shut down last. --- elasticapm/base.py | 10 +++++----- elasticapm/metrics/base_metrics.py | 2 ++ elasticapm/transport/base.py | 2 ++ elasticapm/utils/threading.py | 1 + tests/metrics/base_tests.py | 18 ++++++++++++++++++ 5 files changed, 28 insertions(+), 5 deletions(-) diff --git a/elasticapm/base.py b/elasticapm/base.py index 31828815f..cc3264bb7 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -213,11 +213,11 @@ def __init__(self, config=None, **inline): set_client(self) def start_threads(self): - with self._thread_starter_lock: - current_pid = os.getpid() - if self._pid != current_pid: + current_pid = os.getpid() + if self._pid != current_pid: + with self._thread_starter_lock: self.logger.debug("Detected PID change from %r to %r, starting threads", self._pid, current_pid) - for manager_type, manager in self._thread_managers.items(): + for manager_type, manager in sorted(self._thread_managers.items(), key=lambda item: item[1].priority): self.logger.debug("Starting %s thread", manager_type) manager.start_thread(pid=current_pid) self._pid = current_pid @@ -303,7 +303,7 @@ def end_transaction(self, name=None, result="", duration=None): def close(self): if self.config.enabled: with self._thread_starter_lock: - for _, manager in self._thread_managers.items(): + for _, manager in sorted(self._thread_managers.items(), key=lambda item: item[1].priority): manager.stop_thread() global CLIENT_SINGLETON CLIENT_SINGLETON = None diff --git a/elasticapm/metrics/base_metrics.py b/elasticapm/metrics/base_metrics.py index fe6b15e58..393d4c8be 100644 --- a/elasticapm/metrics/base_metrics.py +++ b/elasticapm/metrics/base_metrics.py @@ -103,6 +103,8 @@ def stop_thread(self): logger.debug("Cancelling collect timer") self._collect_timer.cancel() self._collect_timer = None + # collect one last time + self.collect() @property def collect_interval(self): diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index 7af72bcf3..625519fe0 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -33,6 +33,7 @@ import gzip import os import random +import sys import threading import time import timeit @@ -86,6 +87,7 @@ def __init__( self._closed = False self._processors = processors if processors is not None else [] super(Transport, self).__init__() + self.priority = sys.maxsize # ensure that the transport thread is always started/stopped last @property def _max_flush_time(self): diff --git a/elasticapm/utils/threading.py b/elasticapm/utils/threading.py index fd5139591..65b288978 100644 --- a/elasticapm/utils/threading.py +++ b/elasticapm/utils/threading.py @@ -91,6 +91,7 @@ def cancel(self): class ThreadManager(object): def __init__(self): self.pid = None + self.priority = 100 def start_thread(self, pid=None): if not pid: diff --git a/tests/metrics/base_tests.py b/tests/metrics/base_tests.py index 948b56016..36e4b3223 100644 --- a/tests/metrics/base_tests.py +++ b/tests/metrics/base_tests.py @@ -125,6 +125,24 @@ def target(): assert metricset.counter("x").val == expected +@pytest.mark.parametrize("sending_elasticapm_client", [{"metrics_interval": "30s"}], indirect=True) +def test_metrics_flushed_on_shutdown(sending_elasticapm_client): + # this is ugly, we need an API for this at some point... + metricset = MetricsSet(sending_elasticapm_client._metrics) + sending_elasticapm_client._metrics._metricsets["foo"] = metricset + metricset.counter("x").inc() + sending_elasticapm_client.close() + assert sending_elasticapm_client.httpserver.payloads + for item in sending_elasticapm_client.httpserver.payloads[0]: + try: + assert item["metricset"]["samples"]["x"]["value"] == 1 + break + except KeyError: + pass + else: + assert False, "no item found with matching dict path metricset.samples.x.value" + + @mock.patch("elasticapm.metrics.base_metrics.DISTINCT_LABEL_LIMIT", 3) def test_metric_limit(caplog, elasticapm_client): m = MetricsSet(MetricsRegistry(elasticapm_client)) From 41cec53a680cb46f39459966d84747c7e9628a28 Mon Sep 17 00:00:00 2001 From: Benjamin Wohlwend Date: Fri, 28 May 2021 10:48:34 +0200 Subject: [PATCH 2/2] use a variable name that doesn't clash with the thread priority concept --- elasticapm/base.py | 6 ++++-- elasticapm/transport/base.py | 2 +- elasticapm/utils/threading.py | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/elasticapm/base.py b/elasticapm/base.py index cc3264bb7..cbf25f4a4 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -217,7 +217,9 @@ def start_threads(self): if self._pid != current_pid: with self._thread_starter_lock: self.logger.debug("Detected PID change from %r to %r, starting threads", self._pid, current_pid) - for manager_type, manager in sorted(self._thread_managers.items(), key=lambda item: item[1].priority): + for manager_type, manager in sorted( + self._thread_managers.items(), key=lambda item: item[1].start_stop_order + ): self.logger.debug("Starting %s thread", manager_type) manager.start_thread(pid=current_pid) self._pid = current_pid @@ -303,7 +305,7 @@ def end_transaction(self, name=None, result="", duration=None): def close(self): if self.config.enabled: with self._thread_starter_lock: - for _, manager in sorted(self._thread_managers.items(), key=lambda item: item[1].priority): + for _, manager in sorted(self._thread_managers.items(), key=lambda item: item[1].start_stop_order): manager.stop_thread() global CLIENT_SINGLETON CLIENT_SINGLETON = None diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index 625519fe0..ad5187a89 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -87,7 +87,7 @@ def __init__( self._closed = False self._processors = processors if processors is not None else [] super(Transport, self).__init__() - self.priority = sys.maxsize # ensure that the transport thread is always started/stopped last + self.start_stop_order = sys.maxsize # ensure that the transport thread is always started/stopped last @property def _max_flush_time(self): diff --git a/elasticapm/utils/threading.py b/elasticapm/utils/threading.py index 65b288978..a36b91fda 100644 --- a/elasticapm/utils/threading.py +++ b/elasticapm/utils/threading.py @@ -91,7 +91,7 @@ def cancel(self): class ThreadManager(object): def __init__(self): self.pid = None - self.priority = 100 + self.start_stop_order = 100 def start_thread(self, pid=None): if not pid: