Skip to content

Commit

Permalink
feat: instance_status 接口优化 (closed TencentBlueKing#2149)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon authored and wyyalt committed Mar 27, 2024
1 parent e5b5681 commit 213ca9c
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 96 deletions.
105 changes: 105 additions & 0 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
from apps.backend.subscription import errors, task_tools, tasks, tools
from apps.backend.subscription.errors import InstanceTaskIsRunning
from apps.backend.utils.pipeline_parser import PipelineParser
from apps.core.concurrent import controller
from apps.node_man import constants, models
from apps.utils import concurrent
from apps.utils.basic import filter_values
from pipeline.engine.models import PipelineProcess
from pipeline.service import task_service
Expand Down Expand Up @@ -554,3 +556,106 @@ def statistic(subscription_id_list: List[int]) -> List[Dict]:
sub_statistic_list.append(sub_statistic)

return sub_statistic_list + hit_sub_statistic_list

@staticmethod
@controller.ConcurrentController(
data_list_name="subscription_id_list",
batch_call_func=concurrent.batch_call,
extend_result=True,
get_config_dict_func=lambda: {"limit": 5},
)
def instance_status(subscription_id_list: List[int], show_task_detail: bool) -> List[Dict[str, Any]]:

subscriptions = models.Subscription.objects.filter(id__in=subscription_id_list)

# 查出所有HostStatus
instance_host_statuses = defaultdict(list)
for host_status in models.ProcessStatus.objects.filter(source_id__in=subscription_id_list).only(
"name", "status", "version", "group_id"
):
instance_host_statuses[host_status.group_id].append(host_status)

# 查出所有InstanceRecord
subscription_instance_record: Dict[int, Dict[str, models.SubscriptionInstanceRecord]] = defaultdict(dict)
instance_records = []
for instance_record in models.SubscriptionInstanceRecord.objects.filter(
subscription_id__in=subscription_id_list, is_latest=True
):
subscription_instance_record[instance_record.subscription_id][instance_record.instance_id] = instance_record
instance_records.append(instance_record)

instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(
instance_records=instance_records
)
instance_status_map = {
instance_status["instance_id"]: instance_status for instance_status in instance_status_list
}
running_records = {}
# 更新每条record的status字段
for subscription_id, id_record_map in subscription_instance_record.items():
for instance_id, record in id_record_map.items():
# 注入 status 属性。查不到执行记录的,默认设为 PENDING
record.status = instance_status_map.get(instance_id, {"status": "PENDING"})["status"]
if record.status in ["PENDING", "RUNNING"]:
# 如果实例正在执行,则记下它对应的ID
running_records[record.task_id] = record

# 查出正在运行实例对应的订阅任务,并建立record到task的映射关系
subscription_tasks = models.SubscriptionTask.objects.filter(id__in=list(running_records.keys())).only(
"id", "is_auto_trigger"
)

record_tasks = {}
for task in subscription_tasks:
record = running_records[task.id]
record_tasks[record.id] = task

result = []
for subscription in subscriptions:
subscription_result = []
current_instances = tools.get_instances_by_scope(
subscription.scope, get_cache=True, source="instance_status"
)

# 对于每个instance,通过group_id找到其对应的host_status
for instance_id in current_instances:
if instance_id in subscription_instance_record[subscription.id]:
instance_record = subscription_instance_record[subscription.id][instance_id]
group_id = tools.create_group_id(subscription, instance_record.instance_info)

# 检查该实例是否有正在执行的任务
try:
related_task = record_tasks[instance_record.id]
running_task = {
"id": related_task.id,
"is_auto_trigger": related_task.is_auto_trigger,
}
except KeyError:
running_task = None

instance_result = {
"instance_id": instance_id,
"status": instance_record.status,
"create_time": instance_record.create_time,
"host_statuses": [],
"instance_info": instance_record.simple_instance_info(),
"running_task": running_task,
"last_task": {"id": instance_record.task_id},
}

if show_task_detail:
# 展示任务详情
instance_status = instance_status_map[instance_id]
instance_status.pop("instance_info", None)
instance_status.pop("task_id", None)
instance_status.pop("instance_id", None)
instance_result["last_task"].update(instance_status)

for host_status in instance_host_statuses[group_id]:
instance_result["host_statuses"].append(
{"name": host_status.name, "status": host_status.status, "version": host_status.version}
)
subscription_result.append(instance_result)
result.append({"subscription_id": subscription.id, "instances": subscription_result})

return result
100 changes: 5 additions & 95 deletions apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import logging
import operator
from collections import defaultdict
from dataclasses import asdict
from functools import cmp_to_key, reduce
from typing import Any, Dict, List, Set
Expand All @@ -31,7 +30,7 @@
from apps.backend.agent.tools import gen_commands
from apps.backend.constants import SubscriptionSwithBizAction
from apps.backend.serializers import response
from apps.backend.subscription import errors, serializers, task_tools, tasks, tools
from apps.backend.subscription import errors, serializers, tasks, tools
from apps.backend.subscription.errors import InstanceTaskIsRunning
from apps.backend.subscription.handler import SubscriptionHandler
from apps.backend.subscription.steps.agent_adapter.adapter import AgentStepAdapter
Expand Down Expand Up @@ -436,100 +435,11 @@ def instance_status(self, request):
@apiGroup subscription
"""
params = self.validated_data

subscriptions = models.Subscription.objects.filter(id__in=params["subscription_id_list"])

# 查出所有HostStatus
instance_host_statuses = defaultdict(list)
for host_status in models.ProcessStatus.objects.filter(source_id__in=params["subscription_id_list"]).only(
"name", "status", "version", "group_id"
):
instance_host_statuses[host_status.group_id].append(host_status)

# 查出所有InstanceRecord
subscription_instance_record: Dict[int, Dict[str, models.SubscriptionInstanceRecord]] = defaultdict(dict)
instance_records = []
for instance_record in models.SubscriptionInstanceRecord.objects.filter(
subscription_id__in=params["subscription_id_list"], is_latest=True
):
subscription_instance_record[instance_record.subscription_id][instance_record.instance_id] = instance_record
instance_records.append(instance_record)

instance_status_list = task_tools.TaskResultTools.list_subscription_task_instance_status(
instance_records=instance_records
)
instance_status_map = {
instance_status["instance_id"]: instance_status for instance_status in instance_status_list
}
running_records = {}
# 更新每条record的status字段
for subscription_id, id_record_map in subscription_instance_record.items():
for instance_id, record in id_record_map.items():
# 注入 status 属性。查不到执行记录的,默认设为 PENDING
record.status = instance_status_map.get(instance_id, {"status": "PENDING"})["status"]
if record.status in ["PENDING", "RUNNING"]:
# 如果实例正在执行,则记下它对应的ID
running_records[record.task_id] = record

# 查出正在运行实例对应的订阅任务,并建立record到task的映射关系
subscription_tasks = models.SubscriptionTask.objects.filter(id__in=list(running_records.keys())).only(
"id", "is_auto_trigger"
)

record_tasks = {}
for task in subscription_tasks:
record = running_records[task.id]
record_tasks[record.id] = task

result = []
for subscription in subscriptions:
subscription_result = []
current_instances = tools.get_instances_by_scope(
subscription.scope, get_cache=True, source="instance_status"
return Response(
SubscriptionHandler.instance_status(
subscription_id_list=params["subscription_id_list"], show_task_detail=params["show_task_detail"]
)

# 对于每个instance,通过group_id找到其对应的host_status
for instance_id in current_instances:
if instance_id in subscription_instance_record[subscription.id]:
instance_record = subscription_instance_record[subscription.id][instance_id]
group_id = tools.create_group_id(subscription, instance_record.instance_info)

# 检查该实例是否有正在执行的任务
try:
related_task = record_tasks[instance_record.id]
running_task = {
"id": related_task.id,
"is_auto_trigger": related_task.is_auto_trigger,
}
except KeyError:
running_task = None

instance_result = {
"instance_id": instance_id,
"status": instance_record.status,
"create_time": instance_record.create_time,
"host_statuses": [],
"instance_info": instance_record.simple_instance_info(),
"running_task": running_task,
"last_task": {"id": instance_record.task_id},
}

if params["show_task_detail"]:
# 展示任务详情
instance_status = instance_status_map[instance_id]
instance_status.pop("instance_info", None)
instance_status.pop("task_id", None)
instance_status.pop("instance_id", None)
instance_result["last_task"].update(instance_status)

for host_status in instance_host_statuses[group_id]:
instance_result["host_statuses"].append(
{"name": host_status.name, "status": host_status.status, "version": host_status.version}
)
subscription_result.append(instance_result)

result.append({"subscription_id": subscription.id, "instances": subscription_result})
return Response(result)
)

@swagger_auto_schema(operation_summary="订阅启停", tags=SUBSCRIPTION_VIEW_TAGS)
@action(detail=False, methods=["POST"], serializer_class=serializers.SwitchSubscriptionSerializer)
Expand Down
3 changes: 3 additions & 0 deletions apps/node_man/periodic_tasks/sync_cmdb_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def query_biz_hosts(bk_biz_id: int, bk_host_ids: typing.List[int]) -> typing.Lis
:param bk_host_ids: 主机ID 列表
:return: 主机列表
"""
if not bk_host_ids:
return []

query_params = {
"fields": constants.CC_HOST_FIELDS,
"host_property_filter": {
Expand Down
2 changes: 1 addition & 1 deletion script_tools/agent_tools/agent2/setup_agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ remove_agent () {
stop_agent

log remove_agent - "trying to remove old agent directory(${AGENT_SETUP_PATH}/${AGENT_CLEAN_UP_DIRS[@]})"
cd "${AGENT_SETUP_PATH}"
cd "${AGENT_SETUP_PATH}" || return 0
for file in `lsattr -R |egrep "i-" |awk '{print $NF}'`;do echo "--- $file" && chattr -i $file ;done
cd -

Expand Down

0 comments on commit 213ca9c

Please sign in to comment.