Skip to content

Commit

Permalink
feature: 订阅并发场景下元数据查询优化 (closed TencentBlueKing#2034)
Browse files (browse the repository at this point in the history)
  • Loading branch information
ZhuoZhuoCrayon committed Dec 26, 2023
1 parent 35d1ad1 commit 1a83749
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 39 deletions.
3 changes: 2 additions & 1 deletion apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ def execute(self, data, parent_data):
]
models.SubscriptionInstanceStatusDetail.objects.bulk_create(to_be_created_sub_statuses)

self.set_current_id(subscription_instance_ids)
# TODO 鸡肋逻辑,待确认干掉无影响后完全删除
# self.set_current_id(subscription_instance_ids)
return self.run(self._execute, data, parent_data, common_data=common_data)

@translation.RespectsLanguage(get_language_func=get_language_func)
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def task_result(
instance_status_list = []
else:
instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(
instance_records, need_detail=need_detail
instance_records=instance_records, need_detail=need_detail
)

# 兼容第三方平台全部拉取,无需返回状态统计
Expand Down Expand Up @@ -409,7 +409,7 @@ def task_result_detail(self, instance_id: str, task_id_list: List[int] = None) -
return instance_status

instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(
[instance_record], need_detail=True
instance_records=[instance_record], need_detail=True
)
if not instance_status_list:
raise errors.SubscriptionInstanceRecordNotExist()
Expand Down
158 changes: 127 additions & 31 deletions apps/backend/subscription/steps/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
"""
import copy
import logging
import typing
from collections import ChainMap, OrderedDict, defaultdict
from typing import Any, Dict, List, Union

from django.db.models import Max, Subquery
from django.db.models import Max, Subquery, Value
from django.utils.translation import ugettext as _
from rest_framework import exceptions, serializers

Expand Down Expand Up @@ -166,7 +167,7 @@ def config_tmpl_obj_gby_os_key(self) -> Dict[str, List[models.PluginConfigTempla
for os_key in os_key_gby_config_tmpl_id[config_template.id]:
config_tmpl_obj_gby_os_key[os_key].append(config_template)

logger.info(f"{self.log_prefix} config_tmpl_obj_gby_os_key -> {config_tmpl_obj_gby_os_key}")
# logger.info(f"{self.log_prefix} config_tmpl_obj_gby_os_key -> {config_tmpl_obj_gby_os_key}")
setattr(self, "_config_tmpl_obj_gby_os_key", config_tmpl_obj_gby_os_key)
return self._config_tmpl_obj_gby_os_key

Expand All @@ -185,7 +186,7 @@ def os_key_pkg_map(self) -> Dict[str, models.Packages]:
)
)

logger.info(f"{self.log_prefix} os_key_pkg_map -> {os_cpu_pkg_map}")
# logger.info(f"{self.log_prefix} os_key_pkg_map -> {os_cpu_pkg_map}")
setattr(self, "_os_key_pkg_map", os_cpu_pkg_map)
return self._os_key_pkg_map

Expand All @@ -201,26 +202,12 @@ def os_key_params_map(self) -> Dict[str, Dict[str, Any]]:
setattr(self, "_os_key_params_map", os_cpu_params_map)
return self._os_key_params_map

def format2policy_config(self, original_config: Dict):
try:
format_result = self.validated_data(data=original_config, serializer=PolicyStepConfigSerializer)
except exceptions.ValidationError:
pass
else:
return format_result

validated_config = self.validated_data(data=original_config, serializer=PluginStepConfigSerializer)
plugin_name = validated_config["plugin_name"]
plugin_version = validated_config["plugin_version"]

try:
plugin_desc = models.GsePluginDesc.objects.get(name=plugin_name)
except models.GsePluginDesc.DoesNotExist:
raise errors.PluginValidationError(msg="插件 [{name}] 信息不存在".format(name=self.plugin_name))

def format2policy_packages_old(
self, plugin_id: int, plugin_name: str, plugin_version: str, config_templates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
latest_flag: str = "latest"
is_tag: bool = Tag.objects.filter(
target_id=plugin_desc.id, name=latest_flag, target_type=TargetType.PLUGIN.value
target_id=plugin_id, name=latest_flag, target_type=TargetType.PLUGIN.value
).exists()

if plugin_version != latest_flag or is_tag:
Expand All @@ -239,7 +226,7 @@ def format2policy_config(self, original_config: Dict):

latest_packages_version_set = set(packages.values_list("version", flat=True))
os_cpu__config_templates_map = defaultdict(list)
for template in validated_config["config_templates"]:
for template in config_templates:
is_main_template = template["is_main"]
if template["version"] != latest_flag or is_tag:
plugin_version_set = {plugin_version, "*"}
Expand All @@ -255,19 +242,19 @@ def format2policy_config(self, original_config: Dict):
.values("os", "cpu_arch")
.annotate(max_id=Max("id"))
)
config_templates = models.PluginConfigTemplate.objects.filter(
config_tmpl_objs = models.PluginConfigTemplate.objects.filter(
id__in=Subquery(config_templates_group_by_os_cpu.values("max_id"))
)

for config_template in config_templates:
os_cpu__config_templates_map[self.get_os_key(config_template.os, config_template.cpu_arch)].append(
for config_tmpl_obj in config_tmpl_objs:
os_cpu__config_templates_map[self.get_os_key(config_tmpl_obj.os, config_tmpl_obj.cpu_arch)].append(
{
"id": config_template.id,
"version": config_template.version,
"name": config_template.name,
"os": config_template.os,
"cpu_arch": config_template.cpu_arch,
"is_main": config_template.is_main,
"id": config_tmpl_obj.id,
"version": config_tmpl_obj.version,
"name": config_tmpl_obj.name,
"os": config_tmpl_obj.os,
"cpu_arch": config_tmpl_obj.cpu_arch,
"is_main": config_tmpl_obj.is_main,
}
)

Expand All @@ -284,6 +271,115 @@ def format2policy_config(self, original_config: Dict):
}
)

return policy_packages

def max_ids_by_key(self, contained_os_cpu_items: List[Dict[str, Any]]) -> List[int]:
    """
    Select the largest ``id`` per OS key from a list of records.

    Records are grouped by ``self.get_os_key(record["os"], record["cpu_arch"])``.
    A record only wins its group when its id is greater than the current best,
    which starts at 0, so ids that are not positive are never selected.

    :param contained_os_cpu_items: dicts that each carry ``id``, ``os`` and ``cpu_arch``
    :return: the winning id of every group, ordered by first appearance of the group
    """
    largest_id_by_key: Dict[str, int] = {}
    for record in contained_os_cpu_items:
        group_key = self.get_os_key(record["os"], record["cpu_arch"])
        record_id = record["id"]
        best_so_far = largest_id_by_key.get(group_key, 0)
        if best_so_far < record_id:
            largest_id_by_key[group_key] = record_id
    return [*largest_id_by_key.values()]

def format2policy_packages_new(
    self, plugin_id: int, plugin_name: str, plugin_version: str, config_templates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Build the per-package detail list of a policy step config.

    Packages are resolved first: either every package of ``plugin_name`` at
    exactly ``plugin_version``, or (when the version is the literal "latest"
    and no tag named "latest" exists for the plugin) the max-id package of
    each OS key. Then, for each requested config template, the max-id
    matching ``PluginConfigTemplate`` row per OS key is looked up and
    attached to the package sharing that OS key.

    :param plugin_id: id used to look up a "latest" tag targeting the plugin
    :param plugin_name: plugin (package project) name
    :param plugin_version: concrete version, or the literal "latest"
    :param config_templates: dicts that each carry ``name``, ``version`` and ``is_main``
    :return: one dict per resolved package with the keys ``id``, ``project``,
        ``version``, ``cpu_arch``, ``os`` and ``config_templates``
    :raises errors.PluginValidationError: when no package matches
    """
    latest_flag: str = "latest"
    is_tag: bool = Tag.objects.filter(
        target_id=plugin_id, name=latest_flag, target_type=TargetType.PLUGIN.value
    ).exists()

    if plugin_version != latest_flag or is_tag:
        # If "latest" is itself a tag, fall back to the exact-version lookup.
        packages = models.Packages.objects.filter(project=plugin_name, version=plugin_version)
    else:
        max_pkg_ids: List[int] = self.max_ids_by_key(
            list(models.Packages.objects.filter(project=plugin_name).values("id", "os", "cpu_arch"))
        )
        packages = models.Packages.objects.filter(id__in=max_pkg_ids)

    if not packages:
        raise errors.PluginValidationError(
            msg=_("插件包 [{name}-{version}] 不存在").format(name=plugin_name, version=plugin_version)
        )

    # Versions of the resolved packages plus the wildcard. This does not depend on
    # the template being processed, so it is resolved lazily on first use and then
    # reused instead of re-querying the database for every template.
    latest_plugin_version_set = None

    os_cpu__config_templates_map = defaultdict(list)
    for template in config_templates:
        is_main_template = template["is_main"]
        if template["version"] != latest_flag or is_tag:
            plugin_version_set = {plugin_version, "*"}
        else:
            if latest_plugin_version_set is None:
                latest_plugin_version_set = set(packages.values_list("version", flat=True)) | {"*"}
            plugin_version_set = latest_plugin_version_set

        # Step 1: fetch only the lightweight (id, os, cpu_arch) columns and pick the
        # max id per OS key in Python.
        max_config_tmpl_ids: List[int] = self.max_ids_by_key(
            list(
                models.PluginConfigTemplate.objects.filter(
                    name=template["name"],
                    plugin_name=plugin_name,
                    is_main=Value(1 if is_main_template else 0),
                    plugin_version__in=plugin_version_set,
                ).values("id", "os", "cpu_arch")
            )
        )
        # Step 2: load the remaining columns for the selected rows only.
        db_config_tmpl_infos = models.PluginConfigTemplate.objects.filter(id__in=max_config_tmpl_ids).values(
            "id", "version", "os", "cpu_arch"
        )
        for db_config_tmpl_info in db_config_tmpl_infos:
            os_key = self.get_os_key(db_config_tmpl_info["os"], db_config_tmpl_info["cpu_arch"])
            os_cpu__config_templates_map[os_key].append(
                {
                    "id": db_config_tmpl_info["id"],
                    "version": db_config_tmpl_info["version"],
                    "name": template["name"],
                    "os": db_config_tmpl_info["os"],
                    "cpu_arch": db_config_tmpl_info["cpu_arch"],
                    "is_main": is_main_template,
                }
            )

    policy_packages: List[Dict[str, Union[str, List[Dict[str, Any]]]]] = []
    for package in packages.values("id", "version", "cpu_arch", "os"):
        policy_packages.append(
            {
                "id": package["id"],
                "project": plugin_name,
                "version": package["version"],
                "cpu_arch": package["cpu_arch"],
                "os": package["os"],
                # Packages without a matching template get an empty list (defaultdict).
                "config_templates": os_cpu__config_templates_map[
                    self.get_os_key(package["os"], package["cpu_arch"])
                ],
            }
        )

    return policy_packages

def format2policy_config(self, original_config: Dict):
try:
format_result = self.validated_data(data=original_config, serializer=PolicyStepConfigSerializer)
except exceptions.ValidationError:
pass
else:
return format_result

validated_config = self.validated_data(data=original_config, serializer=PluginStepConfigSerializer)
plugin_name = validated_config["plugin_name"]
plugin_version = validated_config["plugin_version"]

try:
plugin_desc = models.GsePluginDesc.objects.get(name=plugin_name)
except models.GsePluginDesc.DoesNotExist:
raise errors.PluginValidationError(msg="插件 [{name}] 信息不存在".format(name=self.plugin_name))

policy_packages = self.format2policy_packages_new(
plugin_id=plugin_desc.id,
plugin_name=plugin_name,
plugin_version=plugin_version,
config_templates=validated_config["config_templates"],
)

policy_step_config = {**copy.deepcopy(validated_config), "details": policy_packages}

# 补充original_config中部分必要参数
Expand Down
32 changes: 30 additions & 2 deletions apps/backend/subscription/task_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

from apps.backend.subscription import tools
from apps.backend.utils import pipeline_parser
from apps.core.concurrent import controller
from apps.node_man import constants, models
from apps.utils import concurrent
from apps.utils.time_handler import strftime_local

logger = logging.getLogger("app")


# Process-wide cache of controller-wrapped callables, keyed by a descriptive name.
# Filled lazily by TaskResultTools.list_subscription_task_instance_status so the
# ConcurrentController wrapper is built once instead of on every call.
FUNC_CACHE = {}


class TaskResultTools:
@staticmethod
def list_pipeline_processes(pipeline_id: str) -> Dict[str, List[Dict]]:
Expand Down Expand Up @@ -89,7 +94,7 @@ def collect_status(steps: List[Dict[str, Any]], backtrace_steps=False) -> str:
return status

@classmethod
def list_subscription_task_instance_status(
def _list_subscription_task_instance_status(
cls, instance_records: List[models.SubscriptionInstanceRecord], need_detail=False
) -> List[Dict[str, Any]]:
if not instance_records:
Expand Down Expand Up @@ -136,6 +141,29 @@ def list_subscription_task_instance_status(
)
return instance_status_list

@classmethod
def list_subscription_task_instance_status(
    cls, instance_records: List[models.SubscriptionInstanceRecord], need_detail=False
) -> List[Dict[str, Any]]:
    """
    List the task status of the given instance records through a cached,
    controller-wrapped version of ``_list_subscription_task_instance_status``.

    The wrapper is built on first use and stored in ``FUNC_CACHE``; later calls
    reuse it.
    NOTE(review): ConcurrentController presumably splits ``_instance_records`` into
    batches (configured limit: 500), runs them via ``concurrent.batch_call`` and
    concatenates the results -- confirm against apps.core.concurrent.controller.
    NOTE(review): the cached wrapper closes over the ``cls`` of the first caller.

    :param instance_records: subscription instance records to query
    :param need_detail: forwarded unchanged to the underlying implementation
    :return: list of instance status dicts
    """
    cache_key = "list_subscription_task_instance_status__inner"

    batched_runner = FUNC_CACHE.get(cache_key)
    if batched_runner:
        # Fast path: the wrapper has already been built.
        return batched_runner(_instance_records=instance_records, _need_detail=need_detail)

    def _inner(
        _instance_records: List[models.SubscriptionInstanceRecord], _need_detail=False
    ) -> List[Dict[str, Any]]:
        return cls._list_subscription_task_instance_status(
            instance_records=_instance_records, need_detail=_need_detail
        )

    wrap_with_controller = controller.ConcurrentController(
        data_list_name="_instance_records",
        batch_call_func=concurrent.batch_call,
        extend_result=True,
        get_config_dict_func=lambda: {"limit": 500},
    )
    batched_runner = wrap_with_controller(_inner)
    FUNC_CACHE[cache_key] = batched_runner

    return batched_runner(_instance_records=instance_records, _need_detail=need_detail)

@classmethod
def get_subscription_task_instance_status(
cls,
Expand Down Expand Up @@ -290,7 +318,7 @@ def update_inst_record_status(
else:
old_instance_records.append(instance_record)

instance_status_list = TaskResultTools.list_subscription_task_instance_status(new_instance_records)
instance_status_list = TaskResultTools.list_subscription_task_instance_status(instance_records=new_instance_records)

_pipeline_parser = pipeline_parser.PipelineParser([r.pipeline_id for r in old_instance_records])
for instance_record in old_instance_records:
Expand Down
4 changes: 3 additions & 1 deletion apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,9 @@ def instance_status(self, request):
subscription_instance_record[instance_record.subscription_id][instance_record.instance_id] = instance_record
instance_records.append(instance_record)

instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(instance_records)
instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(
instance_records=instance_records
)
instance_status_map = {
instance_status["instance_id"]: instance_status for instance_status in instance_status_list
}
Expand Down
8 changes: 6 additions & 2 deletions apps/utils/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ def batch_call(
if not params_list:
return result

# 单个子任务,直接串行跑,减少上下文切换
if len(params_list) == 1:
return batch_call_serial(func, params_list, get_data, extend_result, interval, **kwargs)

if inspect.iscoroutinefunction(func):
func = async_to_sync(func)

with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex:
tasks = []
for idx, params in enumerate(params_list):
if idx != 0:
if idx != 0 and interval:
time.sleep(interval)
tasks.append(
ex.submit(translation.RespectsLanguage(language=get_language())(inject_request(func)), **params)
Expand Down Expand Up @@ -101,7 +105,7 @@ def batch_call_serial(

result = []
for idx, params in enumerate(params_list):
if idx != 0:
if idx != 0 and interval:
time.sleep(interval)
if extend_result:
result.extend(get_data(func(**params)))
Expand Down

0 comments on commit 1a83749

Please sign in to comment.