diff --git a/apps/backend/constants.py b/apps/backend/constants.py
index 5a3305cac..e923d4b76 100644
--- a/apps/backend/constants.py
+++ b/apps/backend/constants.py
@@ -160,3 +160,7 @@ def needs_batch_request(self) -> bool:
             return False
 
         return True
+
+
+DEFAULT_ALIVE_TIME = 30
+DEFAULT_CLEAN_RECORD_LIMIT = 5000
diff --git a/apps/backend/periodic_tasks/__init__.py b/apps/backend/periodic_tasks/__init__.py
index d55003dcc..2e55ed9e6 100644
--- a/apps/backend/periodic_tasks/__init__.py
+++ b/apps/backend/periodic_tasks/__init__.py
@@ -14,5 +14,8 @@
 from .calculate_statistics import calculate_statistics  # noqa
 from .check_zombie_sub_inst_record import check_zombie_sub_inst_record  # noqa
 from .clean_subscription_data import clean_subscription_data  # noqa
+from .clean_subscription_instance_record_data import (  # noqa
+    clean_subscription_instance_record_data,
+)
 from .collect_auto_trigger_job import collect_auto_trigger_job  # noqa
 from .update_subscription_instances import update_subscription_instances  # noqa
diff --git a/apps/backend/periodic_tasks/clean_subscription_data.py b/apps/backend/periodic_tasks/clean_subscription_data.py
index b1b585861..6bb7e8ac6 100644
--- a/apps/backend/periodic_tasks/clean_subscription_data.py
+++ b/apps/backend/periodic_tasks/clean_subscription_data.py
@@ -15,12 +15,11 @@
 from django.db import connection
 from django.utils import timezone
 
+from apps.backend.constants import DEFAULT_ALIVE_TIME, DEFAULT_CLEAN_RECORD_LIMIT
 from apps.node_man import constants, models
 from apps.utils.time_handler import strftime_local
 from common.log import logger
 
-DEFAULT_ALIVE_TIME = 30
-DEFAULT_CLEAN_RECORD_LIMIT = 5000
 SUBSCRIPTION_INSTANCE_DETAIL_TABLE = "node_man_subscriptioninstancestatusdetail"
 JOB_SUB_INSTANCE_MAP_TABLE = "node_man_jobsubscriptioninstancemap"
 
diff --git a/apps/backend/periodic_tasks/clean_subscription_instance_record_data.py b/apps/backend/periodic_tasks/clean_subscription_instance_record_data.py
new file mode 100644
index 000000000..9c821525f
--- /dev/null
+++ b/apps/backend/periodic_tasks/clean_subscription_instance_record_data.py
@@ -0,0 +1,189 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+
+from typing import Dict, Union
+
+from celery.schedules import crontab
+from celery.task import periodic_task
+from django.db import connection
+from django.utils import timezone
+
+from apps.backend.constants import DEFAULT_ALIVE_TIME, DEFAULT_CLEAN_RECORD_LIMIT
+from apps.node_man import models
+from apps.prometheus import metrics
+from apps.prometheus.helper import observe
+from apps.utils.time_handler import strftime_local
+from common.log import logger
+
+SUBSCRIPTION_INSTANCE_RECORD_TABLE = "node_man_subscriptioninstancerecord"
+SUBSCRIPTION_TASK_TABLE = "node_man_subscriptiontask"
+PIPELINE_TREE_TABLE = "node_man_pipelinetree"
+
+
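+# Periodic cleanup task: every 5 minutes it removes expired, non-latest subscription
+# instance records (policy subscriptions are excluded), together with the related
+# subscription tasks and pipeline trees.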
+""" + +from typing import Dict, Union + +from celery.schedules import crontab +from celery.task import periodic_task +from django.db import connection +from django.utils import timezone + +from apps.backend.constants import DEFAULT_ALIVE_TIME, DEFAULT_CLEAN_RECORD_LIMIT +from apps.node_man import models +from apps.prometheus import metrics +from apps.prometheus.helper import observe +from apps.utils.time_handler import strftime_local +from common.log import logger + +SUBSCRIPTION_INSTANCE_RECORD_TABLE = "node_man_subscriptioninstancerecord" +SUBSCRIPTION_TASK_TABLE = "node_man_subscriptiontask" +PIPELINE_TREE_TABLE = "node_man_pipelinetree" + + +@periodic_task( + queue="default", + options={"queue": "default"}, + run_every=crontab(minute="*/5"), +) +def clean_subscription_instance_record_data(): + # 清理数据开关以及相关配置 + enable_clean_subscription_data, limit, alive_days = get_configuration() + + if not enable_clean_subscription_data: + logger.info("clean_subscription_data is not enable, delete subscription data will be skipped") + return + + logger.info( + f"periodic_task -> clean_subscription_instance_record_data, " + f"start to clean subscription instance record and pipeline tree data," + f" alive_days: {alive_days}, limit: {limit}" + ) + + with connection.cursor() as cursor: + policy_subscription_ids = get_policy_subscription_ids(cursor) + + need_clean_task_ids = get_need_clean_task_ids(cursor, policy_subscription_ids, alive_days, limit) + + if not need_clean_task_ids: + logger.info( + "need clean instance records_ids is empty, " "delete subscription instance record data will be skipped" + ) + return + + need_clean_pipeline_tree_ids = get_need_clean_pipeline_tree_ids(cursor, need_clean_task_ids) + + # 删除 subscription instance record 表数据 + delete_subscription_instance_record_sql = generate_delete_sql( + SUBSCRIPTION_INSTANCE_RECORD_TABLE, + {"field": "subscription_id", "value": policy_subscription_ids, "reverse": True}, + alive_days, + {"field": "is_latest", "value": 0}, + limit, + ) + delete_data( + cursor, + SUBSCRIPTION_INSTANCE_RECORD_TABLE, + delete_subscription_instance_record_sql, + need_clean_task_ids, + ) + + # 删除 subscription task 表数据 + delete_subscription_task_sql = generate_delete_sql( + SUBSCRIPTION_TASK_TABLE, + {"field": "id", "value": need_clean_task_ids}, + ) + delete_data( + cursor, + SUBSCRIPTION_TASK_TABLE, + delete_subscription_task_sql, + need_clean_task_ids, + ) + + if need_clean_pipeline_tree_ids: + # 删除 pipeline tree 表数据 + delete_pipeline_tree_sql = generate_delete_sql( + PIPELINE_TREE_TABLE, + {"field": "id", "value": need_clean_pipeline_tree_ids}, + ) + delete_data(cursor, PIPELINE_TREE_TABLE, delete_pipeline_tree_sql, need_clean_pipeline_tree_ids) + + +def log_sql(sql_str: str): + logger.info( + f"periodic_task -> clean_subscription_instance_record_data, time -> {strftime_local(timezone.now())}, " + f"start to execute sql -> [{sql_str}]" + ) + + +def generate_scope_query(fields_name: str, scopes: list) -> str: + if not scopes: + return "" + + if len(scopes) == 1: + return f"{fields_name}='{scopes[0]}'" + + return f"{fields_name} in {tuple(scopes)}" + + +def get_configuration(): + """获取 settings 配置数据""" + clean_subscription_data_map: Dict[str, Union[int, str, bool]] = ( + models.GlobalSettings.get_config(models.GlobalSettings.KeyEnum.CLEAN_SUBSCRIPTION_DATA_MAP.value) or {} + ) + enable_clean_subscription_data: bool = clean_subscription_data_map.get("enable_clean_subscription_data", True) + limit: int = clean_subscription_data_map.get("limit", 
+def generate_scope_query(fields_name: str, scopes: list) -> str:
+    if not scopes:
+        return ""
+
+    if len(scopes) == 1:
+        return f"{fields_name}='{scopes[0]}'"
+
+    return f"{fields_name} in {tuple(scopes)}"
+
+
+def get_configuration():
+    """Fetch the cleanup configuration from GlobalSettings"""
+    clean_subscription_data_map: Dict[str, Union[int, str, bool]] = (
+        models.GlobalSettings.get_config(models.GlobalSettings.KeyEnum.CLEAN_SUBSCRIPTION_DATA_MAP.value) or {}
+    )
+    enable_clean_subscription_data: bool = clean_subscription_data_map.get("enable_clean_subscription_data", True)
+    limit: int = clean_subscription_data_map.get("limit", DEFAULT_CLEAN_RECORD_LIMIT)
+    alive_days: int = clean_subscription_data_map.get("alive_days", DEFAULT_ALIVE_TIME)
+    return enable_clean_subscription_data, limit, alive_days
+
+
+def get_policy_subscription_ids(cursor):
+    """Fetch the IDs of policy-type subscriptions"""
+    select_policy_subscription_sql: str = (
+        f"SELECT id FROM node_man_subscription " f"WHERE category = '{models.Subscription.CategoryType.POLICY}';"
+    )
+    cursor.execute(select_policy_subscription_sql)
+    return [row[0] for row in cursor.fetchall()]
+
+
+def get_need_clean_task_ids(cursor, policy_subscription_ids: list, alive_days, limit):
+    """Fetch the task_ids whose instance records need cleaning"""
+    if policy_subscription_ids:
+        select_clean_records_sql: str = (
+            f"SELECT DISTINCT task_id FROM {SUBSCRIPTION_INSTANCE_RECORD_TABLE} "
+            f"WHERE NOT({generate_scope_query('subscription_id', policy_subscription_ids)}) "
+            f"AND create_time < DATE_SUB(NOW(), INTERVAL {alive_days} DAY) AND is_latest = 0 LIMIT {limit};"
+        )
+    else:
+        select_clean_records_sql: str = (
+            f"SELECT DISTINCT task_id FROM {SUBSCRIPTION_INSTANCE_RECORD_TABLE} "
+            f"WHERE create_time < DATE_SUB(NOW(), INTERVAL {alive_days} DAY) AND is_latest = 0 LIMIT {limit};"
+        )
+    cursor.execute(select_clean_records_sql)
+    return [row[0] for row in cursor.fetchall()]
+
+
+def get_need_clean_pipeline_tree_ids(cursor, need_clean_task_ids: list):
+    """Fetch the pipeline_tree_ids that need cleaning"""
+    select_pipeline_ids_sql: str = (
+        f"SELECT pipeline_id from {SUBSCRIPTION_TASK_TABLE} "
+        f"WHERE {generate_scope_query('id', need_clean_task_ids)} "
+        f"AND pipeline_id <> '';"
+    )
+    cursor.execute(select_pipeline_ids_sql)
+    return [row[0] for row in cursor.fetchall()]
+
+
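+# Assemble a bounded DELETE statement: the scope can be negated via "reverse", and the
+# age filter, extra equality condition and LIMIT clause are only appended when the
+# corresponding argument is provided.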
+def generate_delete_sql(
+    table_name: str,
+    scope: Dict[str, Union[str, bool, list]],
+    time_cond: int = None,
+    additional_cond: dict = None,
+    limit: int = None,
+):
+    """Generate the DELETE statement"""
+    scope_query: str = generate_scope_query(scope["field"], scope["value"]) if scope else ""
+    if scope_query and scope.get("reverse"):
+        scope_query: str = f"NOT({scope_query})"
+
+    time_query: str = f" AND create_time < DATE_SUB(NOW(), INTERVAL {time_cond} DAY)" if time_cond else ""
+    additional_query: str = f" AND {additional_cond['field']} = {additional_cond['value']}" if additional_cond else ""
+    limit_query: str = f" LIMIT {limit}" if limit else ""
+
+    delete_sql: str = f"DELETE FROM {table_name} WHERE {scope_query}{time_query}{additional_query}{limit_query};"
+    return delete_sql
+
+
+def delete_data(cursor, table_name: str, sql: str, ids: list):
+    """Delete data and record metrics"""
+    log_sql(sql)
+    with observe(metrics.app_clean_subscription_instance_records_seconds, sql=sql):
+        cursor.execute(sql)
+    metrics.app_clean_subscription_instance_records_total.labels(table_name=table_name).inc(len(ids))
diff --git a/apps/backend/tests/periodic_tasks/test_clean_subscription_instance_record_data.py b/apps/backend/tests/periodic_tasks/test_clean_subscription_instance_record_data.py
new file mode 100644
index 000000000..657087e46
--- /dev/null
+++ b/apps/backend/tests/periodic_tasks/test_clean_subscription_instance_record_data.py
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
+Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+from datetime import timedelta
+from unittest.mock import patch
+
+from django.utils import timezone
+
+from apps.backend.periodic_tasks import clean_subscription_instance_record_data
+from apps.node_man import models
+from apps.utils.unittest.testcase import CustomBaseTestCase
+
+
+class SubscriptionInstanceRecordCleanTestCase(CustomBaseTestCase):
+    def setUp(self):
+        super().setUp()
+        self.init_db()
+
+    @classmethod
+    def init_db(cls):
+        cls.create_subscription_data()
+        pipeline_ids = cls.create_subscription_task_data()
+        cls.create_node_man_pipelinetree_data(pipeline_ids)
+        cls.create_subscription_instance_record_data()
+
+    @classmethod
+    def create_subscription_data(cls):
+        models.Subscription.objects.create(
+            id=2,
+            name="policy_subscription",
+            object_type=models.Subscription.ObjectType.HOST,
+            node_type=models.Subscription.NodeType.INSTANCE,
+            from_system="blueking",
+            creator="admin",
+            category=models.Subscription.CategoryType.POLICY,
+        )
+
+    @classmethod
+    def create_subscription_task_data(cls):
+        subscription_tasks = list()
+
+        default_clean_pipeline_id = "e785e7129bda4ab49b451ed45d93ec32"
+        policy_pipeline_id = "4411e2d3958d46ce8e8268513c9c9f50"
+        latest_pipeline_id = "1acf69efba324e4ca97bf90d7a93c6ec"
+        config_clean_pipeline_id = "62c321effcdd4e2d85408f9eb6ad05c8"
+
+        subscription_tasks.append(
+            models.SubscriptionTask(
+                id=1,
+                subscription_id=1,
+                scope={},
+                actions={},
+                pipeline_id=default_clean_pipeline_id,
+            )
+        )
+        subscription_tasks.append(
+            models.SubscriptionTask(
+                id=2,
+                subscription_id=2,
+                scope={},
+                actions={},
+                pipeline_id=policy_pipeline_id,
+            )
+        )
+        subscription_tasks.append(
+            models.SubscriptionTask(
+                id=3,
+                subscription_id=3,
+                scope={},
+                actions={},
+                pipeline_id=latest_pipeline_id,
+            )
+        )
+        subscription_tasks.append(
+            models.SubscriptionTask(
+                id=4,
+                subscription_id=4,
+                scope={},
+                actions={},
+                pipeline_id=config_clean_pipeline_id,
+            )
+        )
+
+        models.SubscriptionTask.objects.bulk_create(subscription_tasks)
+
+        return [default_clean_pipeline_id, policy_pipeline_id, latest_pipeline_id, config_clean_pipeline_id]
+
+    @classmethod
+    def create_subscription_instance_record_data(cls):
+        with patch("django.utils.timezone.now", return_value=cls.generate_dateTime_field_value(40)):
+            subscription_instance_records = list()
+
+            subscription_instance_records.append(
+                models.SubscriptionInstanceRecord(
+                    id=1,
+                    task_id=1,
+                    subscription_id=1,
+                    instance_id="",
+                    instance_info={},
+                    steps={},
+                    is_latest=False,
+                )
+            )
+            subscription_instance_records.append(
+                models.SubscriptionInstanceRecord(
+                    id=2,
+                    task_id=2,
+                    subscription_id=2,
+                    instance_id="",
+                    instance_info={},
+                    steps={},
+                    is_latest=False,
+                )
+            )
+            subscription_instance_records.append(
+                models.SubscriptionInstanceRecord(
+                    id=3,
+                    task_id=3,
+                    subscription_id=3,
+                    instance_id="",
+                    instance_info={},
+                    steps={},
+                    is_latest=True,
+                )
+            )
+
+            models.SubscriptionInstanceRecord.objects.bulk_create(subscription_instance_records)
+
+        with patch("django.utils.timezone.now", return_value=cls.generate_dateTime_field_value(20)):
+            models.SubscriptionInstanceRecord.objects.create(
+                id=4,
+                task_id=4,
+                subscription_id=4,
+                instance_id="",
+                instance_info={},
+                steps={},
+                is_latest=False,
+            )
+
+    @classmethod
+    def create_node_man_pipelinetree_data(cls, pipeline_ids):
+        pipeline_trees = list()
+
+        for pipeline_id in pipeline_ids:
+            pipeline_trees.append(
+                models.PipelineTree(
+                    id=pipeline_id,
+                    tree={},
+                )
+            )
+
+        models.PipelineTree.objects.bulk_create(pipeline_trees)
+
+    @classmethod
+    def generate_dateTime_field_value(cls, days):
+        delta = timedelta(days=days)
+        now = timezone.now()
+        return now - delta
+
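+    # Fixtures: task 2 belongs to a POLICY subscription, record 3 is marked is_latest and
+    # record 4 is only 20 days old, so the first run (default 30-day retention) removes
+    # only task 1; lowering alive_days to 10 then removes task 4 as well.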
+    def test_sub_instance_record(self):
+        self.assertEqual(models.SubscriptionTask.objects.count(), 4)
+        self.assertEqual(models.SubscriptionInstanceRecord.objects.count(), 4)
+        self.assertEqual(models.PipelineTree.objects.count(), 4)
+
+        clean_subscription_instance_record_data()
+
+        self.assertEqual(models.SubscriptionTask.objects.count(), 3)
+        self.assertEqual(models.SubscriptionInstanceRecord.objects.count(), 3)
+        self.assertEqual(models.PipelineTree.objects.count(), 3)
+
+        # Manually configured cleanup settings
+        sub_clean_map = {
+            "enable_clean_subscription_data": True,
+            "alive_days": 10,
+            "limit": 10,
+        }
+
+        models.GlobalSettings.set_config(models.GlobalSettings.KeyEnum.CLEAN_SUBSCRIPTION_DATA_MAP.value, sub_clean_map)
+
+        clean_subscription_instance_record_data()
+
+        self.assertEqual(models.SubscriptionTask.objects.count(), 2)
+        self.assertEqual(models.SubscriptionInstanceRecord.objects.count(), 2)
+        self.assertEqual(models.PipelineTree.objects.count(), 2)
diff --git a/apps/prometheus/metrics.py b/apps/prometheus/metrics.py
index 021f4cc9c..4deab23ab 100644
--- a/apps/prometheus/metrics.py
+++ b/apps/prometheus/metrics.py
@@ -269,3 +269,16 @@ def get_histogram_buckets_from_env(env_name):
     documentation="Cumulative count of resource watch biz events per bk_biz_id.",
     labelnames=["bk_biz_id"],
 )
+
+app_clean_subscription_instance_records_total = Counter(
+    name="app_clean_subscription_instance_records_total",
+    documentation="Cumulative count of rows deleted by the subscription instance record cleanup, per table.",
+    labelnames=["table_name"],
+)
+
+app_clean_subscription_instance_records_seconds = Histogram(
+    name="app_clean_subscription_instance_records_seconds",
+    documentation="Histogram of subscription instance record cleanup durations, per SQL statement.",
+    buckets=get_histogram_buckets_from_env("BKAPP_MONITOR_METRICS_CLEAN_BUCKETS"),
+    labelnames=["sql"],
+)