Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example metric integration test #890

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e3cb524
Karapace metrics
libretto Jun 9, 2023
32ce060
Merge branch 'master' into karapace-metrics
libretto Jun 9, 2023
8dab84d
fixup issues
libretto Jun 10, 2023
2898e31
fixup issues
libretto Jun 10, 2023
c974579
fixup annotations issue
libretto Jun 12, 2023
7256f5d
fixup exception message
libretto Jun 12, 2023
ab6ae96
get rid of multiple instances of class
libretto Jun 12, 2023
733d1f2
fixup issue
libretto Jun 12, 2023
8751eea
change code to send raw data only
libretto Jun 16, 2023
53d3e4b
merge with master
libretto Jun 16, 2023
fedff8f
fixup
libretto Jun 22, 2023
31d16d4
Merge branch 'master' into karapace-metrics
libretto Jun 22, 2023
b70ae03
fixup code
libretto Jun 22, 2023
a0387a3
fixup
libretto Jun 22, 2023
358facc
fixup
libretto Jun 22, 2023
a064624
merge
libretto Jul 3, 2023
8533959
improve code by request
libretto Aug 8, 2023
ac48829
merge with main
libretto Aug 8, 2023
90e221c
add psutil typing support
libretto Aug 8, 2023
4c48576
fixup
libretto Aug 8, 2023
f9cb6d8
fixup
libretto Aug 8, 2023
765864b
Merge branch 'main' into karapace-metrics
libretto Aug 30, 2023
073aa16
merge with master
libretto Sep 2, 2023
0c73a1a
refactor
libretto Sep 2, 2023
c495c50
fixup
libretto Sep 2, 2023
6fb96d0
prometheus support
libretto Sep 6, 2023
1f6fce2
fixup requirements
libretto Sep 14, 2023
569d5ef
merge with master
libretto Dec 20, 2023
0386ca9
remove connections counter
libretto Dec 20, 2023
5f302a8
fixup lint
libretto Dec 20, 2023
c785e1a
skip the README.rst updates
libretto Dec 21, 2023
5da8ca8
fixup metrics stats usage
libretto Jan 10, 2024
6f2093e
Merge branch 'master' into prometheus2
libretto Jan 10, 2024
03f1189
merge with master
libretto Jan 25, 2024
2c06480
merge with master and fixup conflict
libretto Feb 13, 2024
7e71ecb
merge with master
libretto Feb 21, 2024
c66b48f
merge with master
libretto Apr 3, 2024
80d92c0
add unit tests
libretto May 22, 2024
dafa3f2
pylint fixes
libretto May 22, 2024
c7e6796
merge with main branch
libretto May 23, 2024
a687a33
fixup test issues
libretto May 30, 2024
8da1e33
fixup prometheus client issue
libretto May 30, 2024
2b019c8
improve tests
libretto May 31, 2024
880a233
example setup docker image and query of the metrics
eliax1996 Jun 3, 2024
2ab87d7
Merge branch 'main' into example-metric-integration-test
eliax1996 Jun 4, 2024
f14df72
WIP
eliax1996 Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions container/prometheus-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
global:
scrape_interval: 60s # How frequently to scrape targets by default.
scrape_timeout: 10s # How long until a scrape request times out.
evaluation_interval: 60s # How frequently to evaluate rules.

# A scrape configuration
scrape_configs:
- job_name: prometheus
honor_labels: true
honor_timestamps: true
scheme: http
scrape_interval: 60s
scrape_timeout: 55s
metrics_path: /metrics
static_configs:
- targets: ['localhost:9090']
9 changes: 8 additions & 1 deletion karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@
"registry_authfile": null,
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000
"session_timeout_ms": 10000,
"stats_service": "statsd",
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,

}
57 changes: 57 additions & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - basestats

Supports base class for statsd and prometheus protocols:

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from karapace.config import Config
from karapace.sentry import get_sentry_client
from typing import Final, Iterator

import time


class StatsClient(ABC):
@abstractmethod
def __init__(
self,
config: Config,
) -> None:
self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))

@contextmanager
def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]:
start_time = time.monotonic()
yield
self.timing(metric, time.monotonic() - start_time, tags)

@abstractmethod
def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
pass

@abstractmethod
def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
pass

@abstractmethod
def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
pass

def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
all_tags = {
"exception": ex.__class__.__name__,
"where": where,
}
all_tags.update(tags or {})
self.increase("exception", tags=all_tags)
scope_args = {**(tags or {}), "where": where}
self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args)

def close(self) -> None:
self.sentry_client.close()
12 changes: 12 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class Config(TypedDict):
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str
stats_service: str
metrics_extended: bool
statsd_host: str
statsd_port: int
prometheus_host: str | None
prometheus_port: int | None

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -150,6 +156,12 @@ class ConfigDefaults(Config, total=False):
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"stats_service": "statsd",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
103 changes: 103 additions & 0 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
karapace - metrics
Supports collection of system metrics
list of supported metrics:
connections-active - The number of active HTTP(S) connections to server.
Data collected inside aiohttp request handler.

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import threading


class MetricsException(Exception):
pass


class Singleton(type):
_instance: Singleton | None = None

def __call__(cls, *args: str, **kwargs: int) -> Singleton:
if cls._instance is None:
instance = super().__call__(*args, **kwargs)
cls._instance = instance
return cls._instance


class Metrics(metaclass=Singleton):
stats_client: StatsClient

def __init__(
self,
) -> None:
self.is_ready = False
self.lock = threading.Lock()
self.request_size_total = 0
self.request_count = 0

def setup(self, config: Config) -> None:
with self.lock:
if self.is_ready:
return

if not config.get("metrics_extended"):
return
stats_service = config.get("stats_service")
if stats_service == "statsd":
self.stats_client = StatsdClient(config=config)
elif stats_service == "prometheus":
self.stats_client = PrometheusClient(config=config)
else:
raise MetricsException('Config variable "stats_service" is not defined')
self.is_ready = True

def request(self, size: int) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("request-size-total", size)
self.stats_client.increase("request-count", 1)

def response(self, size: int) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("response-size-total", size)
self.stats_client.increase("response-count", 1)

def are_we_master(self, is_master: bool) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("master-slave-role", int(is_master))

def latency(self, latency_ms: float) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.timing("latency_ms", latency_ms)

def error(self) -> None:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("error-total", 1)

def cleanup(self) -> None:
if self.stats_client:
self.stats_client.close()
if not self.is_ready:
return
124 changes: 124 additions & 0 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""
karapace - prometheus

Supports telegraf's statsd protocol extension for 'key=value' tags:

https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from prometheus_client import Counter, Gauge, REGISTRY, Summary
from prometheus_client.exposition import make_wsgi_app
from socketserver import ThreadingMixIn
from typing import Final
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer

import logging
import socket
import threading

LOG = logging.getLogger(__name__)
HOST: Final = "127.0.0.1"
PORT: Final = 8005


class PrometheusException(Exception):
pass


class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
"""Thread per request HTTP server."""

# Make worker threads "fire and forget". Beginning with Python 3.7 this
# prevents a memory leak because ``ThreadingMixIn`` starts to gather all
# non-daemon threads in a list in order to join on them at server close.
daemon_threads = True


class _SilentHandler(WSGIRequestHandler):
"""WSGI handler that does not log requests."""

# pylint: disable=W0622
def log_message(self, format, *args):
"""Log nothing."""


def get_family(address, port):
infos = socket.getaddrinfo(address, port)
family, _, _, _, sockaddr = next(iter(infos))
return family, sockaddr[0]


class PrometheusClient(StatsClient):
server_is_active: bool = False

def __init__(self, config: Config) -> None:
super().__init__(config)
self.lock = threading.Lock()
self.httpd = None
self.thread = None
with self.lock:
_host = config.get("prometheus_host", None)
_port = config.get("prometheus_port", None)
if _host is None:
raise PrometheusException("prometheus_host host is undefined")
if _port is None:
raise PrometheusException("prometheus_host port is undefined")
if not PrometheusClient.server_is_active:
# We wrapped httpd server creation from prometheus client to allow stop this server"""
self.start_server(_host, _port)

PrometheusClient.server_is_active = True
else:
raise PrometheusException("Double instance of Prometheus interface")
self._gauge: dict[str, Gauge] = dict()
self._summary: dict[str, Summary] = dict()
self._counter: dict[str, Counter] = dict()

def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
m = self._gauge.get(metric)
if m is None:
m = Gauge(metric, metric)
self._gauge[metric] = m
m.set(value)

def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
m = self._counter.get(metric)
if m is None:
m = Counter(metric, metric)
self._counter[metric] = m
m.inc(inc_value)

def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
m = self._summary.get(metric)
if m is None:
m = Summary(metric, metric)
self._summary[metric] = m
m.observe(value)

def start_server(self, addr: str, port: int) -> None:
class TmpServer(ThreadingWSGIServer):
pass

TmpServer.address_family, addr = get_family(addr, port)
app = make_wsgi_app(REGISTRY)
self.httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler)
self.thread = threading.Thread(target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()

def stop_server(self) -> None:
self.httpd.shutdown()
self.httpd.server_close()
self.thread.join()

def close(self):
with self.lock:
if self.server_is_active:
self.stop_server()
PrometheusClient.server_is_active = False
Loading
Loading