From a902ba99eb340567b53f74a9e22c416d1e7a67fa Mon Sep 17 00:00:00 2001 From: Ori Hoch Date: Sun, 28 Apr 2024 13:26:34 +0300 Subject: [PATCH] add kafka streamer --- cwm_worker_operator/cli.py | 11 +- cwm_worker_operator/common.py | 9 ++ cwm_worker_operator/config.py | 14 ++ cwm_worker_operator/deployments_manager.py | 1 - cwm_worker_operator/domains_config.py | 12 ++ cwm_worker_operator/kafka_streamer.py | 143 +++++++++++++++++++++ requirements.txt | 1 + 7 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 cwm_worker_operator/kafka_streamer.py diff --git a/cwm_worker_operator/cli.py b/cwm_worker_operator/cli.py index 9ae12c3..2878f34 100644 --- a/cwm_worker_operator/cli.py +++ b/cwm_worker_operator/cli.py @@ -69,6 +69,14 @@ def _callback(*args, **kwargs): ], 'help': 'Check and update a single worker, used by workers-checker to run async operations'} }}, {'name': 'throttler'}, + { + 'name': 'kafka_streamer', + 'extra_params': [ + click.Option(['--topic']), + click.Option(['--no-kafka-commit'], is_flag=True), + click.Option(['--no-kafka-delete'], is_flag=True), + ] + }, ]: try: daemon_module = importlib.import_module('cwm_worker_operator.{}'.format(daemon['name'].replace('-', '_'))) @@ -81,7 +89,8 @@ def _callback(*args, **kwargs): name='start_daemon', callback=daemon_module.start_daemon, params=[ - *([click.Option(['--once'], is_flag=True)] if daemon.get('with_once') != False else []) + *([click.Option(['--once'], is_flag=True)] if daemon.get('with_once') != False else []), + *(daemon['extra_params'] if 'extra_params' in daemon else []), ] ), **{ diff --git a/cwm_worker_operator/common.py b/cwm_worker_operator/common.py index 741497c..9f65ff9 100644 --- a/cwm_worker_operator/common.py +++ b/cwm_worker_operator/common.py @@ -66,6 +66,14 @@ def get_worker_id_from_namespace_name(namespace_name): return worker_id +@lru_cache(maxsize=9999) +def get_namespace_name_from_bucket_name(bucket_name): + if is_worker_namespace(bucket_name): + return bucket_name + else: + return None + + def is_worker_namespace(namespace_name): return ( namespace_name.startswith('cwm-worker-') @@ -75,6 +83,7 @@ def is_worker_namespace(namespace_name): ] ) + def is_hostnames_match(full_hostname, partial_hostname): if full_hostname.lower() == partial_hostname.lower(): return True diff --git a/cwm_worker_operator/config.py b/cwm_worker_operator/config.py index 75be71e..afb1b0f 100644 --- a/cwm_worker_operator/config.py +++ b/cwm_worker_operator/config.py @@ -154,3 +154,17 @@ MINIO_TENANT_ENDPOINT = os.environ.get('MINIO_TENANT_ENDPOINT') MINIO_TENANT_ADMIN_USER = os.environ.get('MINIO_TENANT_ADMIN_USER') MINIO_TENANT_ADMIN_PASSWORD = os.environ.get('MINIO_TENANT_ADMIN_PASSWORD') + +KAFKA_STREAMER_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS') +KAFKA_STREAMER_TOPIC = os.environ.get('KAFKA_STREAMER_TOPIC') or 'minio-tenant-main-audit-logs' +KAFKA_STREAMER_POD_NAMESPACE = os.environ.get('KAFKA_MINIO_AUDIT_POD_NAMESPACE') or 'strimzi' +KAFKA_STREAMER_POD_NAME = os.environ.get('KAFKA_MINIO_AUDIT_POD_NAME') or 'minio-audit-kafka-0' +KAFKA_STREAMER_OPERATOR_GROUP_ID = os.environ.get('KAFKA_STREAMER_OPERATOR_GROUP_ID') or f'cwm_worker_operator_{KAFKA_STREAMER_TOPIC}' +KAFKA_STREAMER_CONSUMER_CONFIG = json.loads(os.environ.get('KAFKA_STREAMER_CONSUMER_CONFIG_JSON') or '''{ + "auto.offset.reset": "earliest", + "enable.auto.commit": false, + "api.version.request.timeout.ms": 25000 +}''') +KAFKA_STREAMER_POLL_TIME_SECONDS = int(os.environ.get('KAFKA_STREAMER_POLL_TIME_SECONDS') or '60') 
+KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS = float(os.environ.get('KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS') or '1.0')
+KAFKA_STREAMER_SLEEP_TIME_BETWEEN_ITERATIONS_SECONDS = int(os.environ.get('KAFKA_STREAMER_SLEEP_TIME_BETWEEN_ITERATIONS_SECONDS') or '1')
diff --git a/cwm_worker_operator/deployments_manager.py b/cwm_worker_operator/deployments_manager.py
index 402f8ed..324ac67 100644
--- a/cwm_worker_operator/deployments_manager.py
+++ b/cwm_worker_operator/deployments_manager.py
@@ -275,7 +275,6 @@ def deploy_preprocess_specs(self, specs):
             res[key] = True
         return res
 
-
     def deploy_minio(self, deployment_config, dry_run=False):
         username = deployment_config['cwm-worker-deployment']['namespace']
         password = deployment_config['minio']['access_key']
diff --git a/cwm_worker_operator/domains_config.py b/cwm_worker_operator/domains_config.py
index 80c1e0b..59eee22 100644
--- a/cwm_worker_operator/domains_config.py
+++ b/cwm_worker_operator/domains_config.py
@@ -756,6 +756,15 @@ def get_deployment_api_metrics(self, namespace_name):
             for key in r.keys(base_key + "*")
         }
 
+    def update_deployment_api_metrics(self, namespace_name, data):
+        from .kafka_streamer import DEPLOYMENT_API_METRICS_BASE_DATA  # deferred import to avoid a circular import
+        assert data.keys() == DEPLOYMENT_API_METRICS_BASE_DATA.keys()
+        base_key = "{}:".format(self.keys.deployment_api_metric._(namespace_name))
+        with self.keys.deployment_api_metric.get_redis() as r:
+            for metric_key, value in data.items():
+                key = f'{base_key}{metric_key}'
+                r.incrby(key, value)
+
     def set_worker_aggregated_metrics(self, worker_id, agg_metrics):
         self.keys.worker_aggregated_metrics.set(worker_id, json.dumps(agg_metrics))
 
@@ -786,6 +795,9 @@ def get_deployment_last_action(self, namespace_name):
                 latest_value = value
         return latest_value if latest_value else None
 
+    def set_deployment_last_action(self, namespace_name):
+        self.keys.deployment_last_action.set(namespace_name, common.now().strftime("%Y%m%dT%H%M%S"))
+
     def get_key_summary_single_multi_domain(self, r, key_name, key, max_keys_per_summary, is_api=False):
         if isinstance(key, DomainsConfigKeyStatic):
             match = key._()
diff --git a/cwm_worker_operator/kafka_streamer.py b/cwm_worker_operator/kafka_streamer.py
new file mode 100644
index 0000000..b7a2851
--- /dev/null
+++ b/cwm_worker_operator/kafka_streamer.py
@@ -0,0 +1,143 @@
+"""
+Streams / aggregates data from a Kafka topic.
+This daemon can run multiple instances in parallel, each instance handling a different topic.
+""" +import os +import json +import subprocess +from textwrap import dedent + +from confluent_kafka import Consumer + +from cwm_worker_operator.daemon import Daemon +from cwm_worker_operator import config, common, logs +from cwm_worker_operator.domains_config import DomainsConfig + + +MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC = 'minio-tenant-main-audit-logs' +DEPLOYMENT_API_METRICS_BASE_DATA = { + 'bytes_in': 0, + 'bytes_out': 0, + 'num_requests_in': 0, + 'num_requests_out': 0, + 'num_requests_misc': 0, +} + + +def get_request_type(name): + if name in ['WebUpload', 'PutObject', 'DeleteObject']: + return 'in' + elif name in ['WebDownload', 'GetObject']: + return 'out' + else: + return 'misc' + + +def process_minio_tenant_main_audit_logs(data, agg_data): + data_api = data.get('api', {}) + bucket = data_api.get('bucket') or None + if bucket: + namespace_name = common.get_namespace_name_from_bucket_name(bucket) + if namespace_name: + if namespace_name not in agg_data: + logs.debug(f"process_minio_tenant_main_audit_logs: {namespace_name}", 8) + agg_data[namespace_name] = DEPLOYMENT_API_METRICS_BASE_DATA.copy() + logs.debug('process_minio_tenant_main_audit_logs', 10, data_api=data_api) + tx = data_api.get('tx') or 0 + rx = data_api.get('rx') or 0 + agg_data[namespace_name][f'bytes_in'] += rx + agg_data[namespace_name][f'bytes_out'] += tx + request_type = get_request_type(data_api.get('name')) + agg_data[namespace_name][f'num_requests_{request_type}'] += 1 + + +def commit_minio_tenant_main_audit_logs(domains_config, agg_data): + logs.debug(f"commit_minio_tenant_main_audit_logs: {agg_data}", 8) + for namespace_name, data in agg_data.items(): + domains_config.update_deployment_api_metrics(namespace_name, data) + domains_config.set_deployment_last_action(namespace_name) + + +def process_data(topic, data, agg_data): + if topic == MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC: + process_minio_tenant_main_audit_logs(data, agg_data) + else: + raise NotImplementedError(f"topic {topic} is not supported") + + +def commit(topic, consumer, domains_config, agg_data, no_kafka_commit=False): + if topic == MINIO_TENANT_MAIN_AUDIT_LOGS_TOPIC: + commit_minio_tenant_main_audit_logs(domains_config, agg_data) + else: + raise NotImplementedError(f"topic {topic} is not supported") + if not no_kafka_commit: + consumer.commit() + + +def delete_records(topic, latest_partition_offset): + partitions = [ + {'topic': topic, 'partition': p, 'offset': o} + for p, o in latest_partition_offset.items() + ] + if len(partitions) > 0: + offset_json = json.dumps({'partitions': partitions, 'version': 1}) + logs.debug(f"Deleting records: {offset_json}", 8) + subprocess.check_call([ + 'kubectl', 'exec', '-n', config.KAFKA_STREAMER_POD_NAMESPACE, config.KAFKA_STREAMER_POD_NAME, '--', 'bash', '-c', dedent(f''' + TMPFILE=$(mktemp) &&\ + echo '{offset_json}' > $TMPFILE &&\ + bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file $TMPFILE &&\ + rm $TMPFILE + ''').strip() + ], env={**os.environ, 'DEBUG': ''}) + + +def run_single_iteration(domains_config: DomainsConfig, topic=None, no_kafka_commit=False, no_kafka_delete=False, **_): + start_time = common.now() + if not topic: + topic = config.KAFKA_STREAMER_TOPIC + assert topic, "topic is required" + logs.debug(f"running iteration for topic: {topic}", 8) + consumer = Consumer({ + 'bootstrap.servers': config.KAFKA_STREAMER_BOOTSTRAP_SERVERS, + 'group.id': config.KAFKA_STREAMER_OPERATOR_GROUP_ID, + **config.KAFKA_STREAMER_CONSUMER_CONFIG + }) + consumer.subscribe([topic]) + 
+    latest_partition_offset = {}
+    try:
+        agg_data = {}
+        while (common.now() - start_time).total_seconds() < config.KAFKA_STREAMER_POLL_TIME_SECONDS:
+            msg = consumer.poll(timeout=config.KAFKA_STREAMER_CONSUMER_POLL_TIMEOUT_SECONDS)
+            if msg is None:
+                # logs.debug("Waiting for messages...", 10)
+                pass
+            elif msg.error():
+                raise Exception(f"Message ERROR: {msg.error()}")
+            else:
+                offset = msg.offset()
+                partition = msg.partition()
+                latest_partition_offset[partition] = offset
+                data = json.loads(msg.value())
+                process_data(topic, data, agg_data)
+        commit(topic, consumer, domains_config, agg_data, no_kafka_commit=no_kafka_commit)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        consumer.close()
+    if not no_kafka_delete:
+        delete_records(topic, latest_partition_offset)
+
+
+def start_daemon(once=False, domains_config=None, topic=None, no_kafka_commit=False, no_kafka_delete=False):
+    assert topic, "topic is required"
+    Daemon(
+        name=f"kafka_streamer_{topic}",
+        sleep_time_between_iterations_seconds=config.KAFKA_STREAMER_SLEEP_TIME_BETWEEN_ITERATIONS_SECONDS,
+        domains_config=domains_config,
+        run_single_iteration_callback=run_single_iteration,
+        run_single_iteration_extra_kwargs={'topic': topic, 'no_kafka_commit': no_kafka_commit, 'no_kafka_delete': no_kafka_delete},
+    ).start(
+        once=once,
+        with_prometheus=False,
+    )
diff --git a/requirements.txt b/requirements.txt
index 059a5cf..43dda05 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,3 +8,4 @@ uvicorn[standard]==0.27.0.post1
 fastapi==0.109.1
 gunicorn==21.2.0
 minio==7.2.3
+confluent-kafka==2.3.0
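
For illustration, a minimal sketch of the aggregation performed by process_minio_tenant_main_audit_logs above. The audit-log record is hand-written for the example; the bucket name and byte counts are assumptions, not captured MinIO output:

from cwm_worker_operator.kafka_streamer import process_minio_tenant_main_audit_logs

agg_data = {}
record = {
    'api': {
        'bucket': 'cwm-worker-example1',  # hypothetical bucket, assumed to look like a worker namespace
        'name': 'GetObject',              # mapped by get_request_type to 'out'
        'rx': 150,                        # bytes received by the server -> bytes_in
        'tx': 4096,                       # bytes sent by the server -> bytes_out
    },
}
process_minio_tenant_main_audit_logs(record, agg_data)
# agg_data == {'cwm-worker-example1': {'bytes_in': 150, 'bytes_out': 4096,
#              'num_requests_in': 0, 'num_requests_out': 1, 'num_requests_misc': 0}}

Each message consumed during the poll window is folded into agg_data this way; commit() then flushes the per-namespace counters to Redis via update_deployment_api_metrics and refreshes the namespace's last-action timestamp.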
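
delete_records hands the offsets observed during the iteration to kafka-delete-records.sh, which advances each partition's log start offset to the given offset, i.e. it deletes records up to but not including that offset. A sketch of the payload it builds, using hypothetical partition offsets:

import json

latest_partition_offset = {0: 1299, 1: 872}  # hypothetical partition -> last consumed offset
payload = {
    'partitions': [
        {'topic': 'minio-tenant-main-audit-logs', 'partition': p, 'offset': o}
        for p, o in latest_partition_offset.items()
    ],
    'version': 1,
}
print(json.dumps(payload))
# {"partitions": [{"topic": "minio-tenant-main-audit-logs", "partition": 0, "offset": 1299},
#                 {"topic": "minio-tenant-main-audit-logs", "partition": 1, "offset": 872}], "version": 1}

Because deletion stops short of the given offset, the last consumed record in each partition is retained until a later iteration observes a higher offset; the committed consumer-group offsets ensure it is not processed twice.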