From 7e20120ceebd69ee8d2e3fd44356fe54438df036 Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 11:09:45 +0800 Subject: [PATCH 1/9] add profile command receive from oapserver and commandservice --- skywalking/agent/__init__.py | 19 +++ skywalking/agent/protocol/__init__.py | 3 + skywalking/agent/protocol/grpc.py | 6 +- skywalking/client/__init__.py | 5 + skywalking/client/grpc.py | 23 +++- skywalking/command/__init__.py | 4 + skywalking/command/base_command.py | 7 ++ skywalking/command/command_service.py | 109 ++++++++++++++++++ skywalking/command/executors/__init__.py | 3 + .../command/executors/command_executor.py | 7 ++ .../executors/noop_command_executor.py | 13 +++ .../profile_task_command_executor.py | 23 ++++ skywalking/command/profile_task_command.py | 68 +++++++++++ skywalking/profile/__init__.py | 3 + skywalking/profile/profile_constants.py | 9 ++ skywalking/profile/profile_task.py | 23 ++++ .../profile/profile_task_execution_service.py | 84 ++++++++++++++ 17 files changed, 407 insertions(+), 2 deletions(-) create mode 100644 skywalking/command/__init__.py create mode 100644 skywalking/command/base_command.py create mode 100644 skywalking/command/command_service.py create mode 100644 skywalking/command/executors/__init__.py create mode 100644 skywalking/command/executors/command_executor.py create mode 100644 skywalking/command/executors/noop_command_executor.py create mode 100644 skywalking/command/executors/profile_task_command_executor.py create mode 100644 skywalking/command/profile_task_command.py create mode 100644 skywalking/profile/__init__.py create mode 100644 skywalking/profile/profile_constants.py create mode 100644 skywalking/profile/profile_task.py create mode 100644 skywalking/profile/profile_task_execution_service.py diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 356a28a3..faabb909 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -24,6 +24,8 @@ from skywalking.loggings import logger from skywalking.agent.protocol import Protocol +from skywalking.command import command_service + if TYPE_CHECKING: from skywalking.trace.context import Segment @@ -44,8 +46,23 @@ def __report(): __finished.wait(1) +def __query(): + while not __finished.is_set(): + if connected(): + __protocol.query_commands() + + __finished.wait(10) + + +def __command_dispatch(): + # command dispatch will stuck when there are no commands + command_service.dispatch() + + __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True) __report_thread = Thread(name='ReportThread', target=__report, daemon=True) +__query_thread = Thread(name='QueryCommandThread', target=__query, daemon=True) +__command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True) __queue = Queue(maxsize=10000) __finished = Event() __protocol = Protocol() # type: Protocol @@ -91,6 +108,8 @@ def start(): __init() __heartbeat_thread.start() __report_thread.start() + __query_thread.start() + __command_dispatch_thread.start() atexit.register(__fini) diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py index 0f6e62e5..bb770fcd 100644 --- a/skywalking/agent/protocol/__init__.py +++ b/skywalking/agent/protocol/__init__.py @@ -28,3 +28,6 @@ def heartbeat(self): def report(self, queue: Queue, block: bool = True): raise NotImplementedError() + + def query_commands(self): + raise NotImplementedError() diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 3f2c6381..107195dd 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -26,7 +26,7 @@ from skywalking import config from skywalking.agent import Protocol from skywalking.agent.protocol.interceptors import header_adder_interceptor -from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService +from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService, GrpcProfileTaskChannelService from skywalking.protocol.common.Common_pb2 import KeyStringValuePair from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference from skywalking.trace.segment import Segment @@ -44,6 +44,7 @@ def __init__(self): self.channel.subscribe(self._cb, try_to_connect=True) self.service_management = GrpcServiceManagementClient(self.channel) self.traces_reporter = GrpcTraceSegmentReportService(self.channel) + self.profile_query = GrpcProfileTaskChannelService(self.channel) def _cb(self, state): logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state) @@ -54,6 +55,9 @@ def _cb(self, state): except grpc.RpcError: self.on_error() + def query_commands(self): + self.profile_query.do_query() + def heartbeat(self): try: self.service_management.send_heart_beat() diff --git a/skywalking/client/__init__.py b/skywalking/client/__init__.py index 163c9ea2..3cd175f2 100644 --- a/skywalking/client/__init__.py +++ b/skywalking/client/__init__.py @@ -27,3 +27,8 @@ def send_heart_beat(self): class TraceSegmentReportService(object): def report(self, generator): raise NotImplementedError() + + +class ProfileTaskChannelService(object): + def do_query(self): + raise NotImplementedError() diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py index ef0f02d4..55451704 100644 --- a/skywalking/client/grpc.py +++ b/skywalking/client/grpc.py @@ -20,12 +20,17 @@ import grpc from skywalking import config -from skywalking.client import ServiceManagementClient, TraceSegmentReportService +from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService from skywalking.protocol.common.Common_pb2 import KeyStringValuePair from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub +from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub +from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub +from skywalking.command import command_service +from skywalking.profile import profile_task_execution_service + class GrpcServiceManagementClient(ServiceManagementClient): def __init__(self, channel: grpc.Channel): @@ -56,3 +61,19 @@ def __init__(self, channel: grpc.Channel): def report(self, generator): self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT) + + +class GrpcProfileTaskChannelService(ProfileTaskChannelService): + def __init__(self, channel: grpc.Channel): + self.task_stub = ProfileTaskStub(channel) + + def do_query(self): + + query = ProfileTaskCommandQuery( + service=config.service_name, + serviceInstance=config.service_instance, + lastCommandTime=profile_task_execution_service.get_last_command_create_time() + ) + + commands = self.task_stub.getProfileTaskCommands(query) + command_service.receive_command(commands) diff --git a/skywalking/command/__init__.py b/skywalking/command/__init__.py new file mode 100644 index 00000000..722077c0 --- /dev/null +++ b/skywalking/command/__init__.py @@ -0,0 +1,4 @@ +from skywalking.command.command_service import CommandService + +command_service = CommandService() + diff --git a/skywalking/command/base_command.py b/skywalking/command/base_command.py new file mode 100644 index 00000000..7f2890ac --- /dev/null +++ b/skywalking/command/base_command.py @@ -0,0 +1,7 @@ + +class BaseCommand: + def __init__(self, + command: str = None, + serial_number: str = None): + self.command = command # type: str + self.serial_number = serial_number # type: str diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py new file mode 100644 index 00000000..c1771eba --- /dev/null +++ b/skywalking/command/command_service.py @@ -0,0 +1,109 @@ +from collections import deque +import queue +from typing import List + +from skywalking.loggings import logger +from skywalking.command.base_command import BaseCommand +from skywalking.command.profile_task_command import ProfileTaskCommand +from skywalking.command.executors.command_executor import CommandExecutor +from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor +from skywalking.command.executors import noop_command_executor_instance + +from skywalking.protocol.common.Common_pb2 import Commands, Command + + +class CommandService: + + def __init__(self): + self.__commands = queue.Queue() # type: queue.Queue + # don't execute same command twice + self.__command_serial_number_cache = CommandSerialNumberCache() + + def dispatch(self): + while True: + command = self.__commands.get() # type: BaseCommand + logger.debug("dispatch command: %s", command) + + if not self.__is_command_executed(command): + command_executor_service.execute(command) + self.__command_serial_number_cache.add(command.serial_number) + + def __is_command_executed(self, command: BaseCommand): + return self.__command_serial_number_cache.contains(command.serial_number) + + def receive_command(self, commands: Commands): + for command in commands.commands: + try: + base_command = CommandDeserializer.deserialize(command) + logger.debug("Received command [{%s} {%s}]", base_command.command, base_command.serial_number) + + if self.__is_command_executed(base_command): + logger.warning("Command[{%s}] is executed, ignored.", base_command.command) + continue + + try: + self.__commands.put(base_command) + except queue.Full: + logger.warning("Command[{%s}, {%s}] cannot add to command list. because the command list is full.", + base_command.command, base_command.serial_number) + except UnsupportedCommandException as e: + logger.warning("Received unsupported command[{%s}].", e.command.command) + + +class CommandSerialNumberCache: + + def __init__(self, maxlen=64): + self.queue = deque(maxlen=maxlen) + self.max_capacity = maxlen + + def add(self, number: str): + if len(self.queue) >= self.max_capacity: + self.queue.pop() + self.queue.append(number) + + def contains(self, number: str) -> bool: + try: + _ = self.queue.index(number) + return True + except ValueError: + return False + + +class CommandExecutorService: + """ + route commands to appropriate executor + """ + + def __init__(self): + self.__command_executor_map = {ProfileTaskCommand.NAME: ProfileTaskCommandExecutor()} + + def execute(self, command: BaseCommand): + self.__executor_for_command(command).execute(command) + + def __executor_for_command(self, command: BaseCommand): + executor = self.__command_executor_map.get(command.command) + if not executor: + return noop_command_executor_instance + return executor + + +class CommandDeserializer: + + @staticmethod + def deserialize(command: Command) -> BaseCommand: + command_name = command.command + + if ProfileTaskCommand.NAME == command_name: + return ProfileTaskCommand.deserialize(command) + else: + raise UnsupportedCommandException(command) + + +class UnsupportedCommandException(Exception): + + def __init__(self, command): + self.command = command + + +# init +command_executor_service = CommandExecutorService() diff --git a/skywalking/command/executors/__init__.py b/skywalking/command/executors/__init__.py new file mode 100644 index 00000000..209c28eb --- /dev/null +++ b/skywalking/command/executors/__init__.py @@ -0,0 +1,3 @@ +from skywalking.command.executors.noop_command_executor import NoopCommandExecutor + +noop_command_executor_instance = NoopCommandExecutor() diff --git a/skywalking/command/executors/command_executor.py b/skywalking/command/executors/command_executor.py new file mode 100644 index 00000000..ee2c017e --- /dev/null +++ b/skywalking/command/executors/command_executor.py @@ -0,0 +1,7 @@ +from skywalking.command.base_command import BaseCommand + + +class CommandExecutor: + def execute(self, command: BaseCommand): + raise NotImplementedError() + diff --git a/skywalking/command/executors/noop_command_executor.py b/skywalking/command/executors/noop_command_executor.py new file mode 100644 index 00000000..14b7e9cc --- /dev/null +++ b/skywalking/command/executors/noop_command_executor.py @@ -0,0 +1,13 @@ +from skywalking.command.executors.command_executor import CommandExecutor +from skywalking.command.base_command import BaseCommand + + +class NoopCommandExecutor(CommandExecutor): + def __init__(self): + pass + + def execute(self, command: BaseCommand): + pass + + + diff --git a/skywalking/command/executors/profile_task_command_executor.py b/skywalking/command/executors/profile_task_command_executor.py new file mode 100644 index 00000000..55612ff7 --- /dev/null +++ b/skywalking/command/executors/profile_task_command_executor.py @@ -0,0 +1,23 @@ +from skywalking.command.executors.command_executor import CommandExecutor +from skywalking.command.profile_task_command import ProfileTaskCommand +from skywalking.loggings import logger +from skywalking.profile import profile_task_execution_service +from skywalking.profile.profile_task import ProfileTask + + +class ProfileTaskCommandExecutor(CommandExecutor): + + def execute(self, command: ProfileTaskCommand): + logger.debug("start execute ProfileTaskCommand [{%s}]", command.serial_number) + + profile_task = ProfileTask(task_id=command.task_id, + first_span_op_name=command.endpoint_name, + duration=command.duration, + min_duration_threshold=command.min_duration_threshold, + thread_dump_period=command.dump_period, + max_sampling_count=command.max_sampling_count, + start_time=command.start_time, + create_time=command.create_time) + + profile_task_execution_service.add_profile_task(profile_task) + diff --git a/skywalking/command/profile_task_command.py b/skywalking/command/profile_task_command.py new file mode 100644 index 00000000..99f4661f --- /dev/null +++ b/skywalking/command/profile_task_command.py @@ -0,0 +1,68 @@ +from skywalking.utils.lang import tostring +from skywalking.command.base_command import BaseCommand +from skywalking.protocol.common.Common_pb2 import Command + + +@tostring +class ProfileTaskCommand(BaseCommand): + NAME = "ProfileTaskQuery" + + def __init__(self, + serial_number: str = None, + task_id: str = None, + endpoint_name: str = None, + duration: int = None, + min_duration_threshold: int = None, + dump_period: int = None, + max_sampling_count: int = None, + start_time: int = None, + create_time: int = None): + + BaseCommand.__init__(self, self.NAME, serial_number) + + self.task_id = task_id # type: str + self.endpoint_name = endpoint_name # type: str + self.duration = duration # type: int + self.min_duration_threshold = min_duration_threshold # type: int + self.dump_period = dump_period # type: int + self.max_sampling_count = max_sampling_count # type: int + self.start_time = start_time # type: int + self.create_time = create_time # type: int + + @staticmethod + def deserialize(command: Command): + serial_number = None + task_id = None + endpoint_name = None + duration = None + min_duration_threshold = None + dump_period = None + max_sampling_count = None + start_time = None + create_time = None + + for pair in command.args: + if pair.key == "SerialNumber": + serial_number = pair.value + elif pair.key == "EndpointName": + endpoint_name = pair.value + elif pair.key == "TaskId": + task_id = pair.value + elif pair.key == "Duration": + duration = pair.value + elif pair.key == "MinDurationThreshold": + min_duration_threshold = pair.value + elif pair.key == "DumpPeriod": + dump_period = pair.value + elif pair.key == "MaxSamplingCount": + max_sampling_count = pair.value + elif pair.key == "StartTime": + start_time = pair.value + elif pair.key == "CreateTime": + create_time = pair.value + + return ProfileTaskCommand(serial_number=serial_number, task_id=task_id, + endpoint_name=endpoint_name, duration=duration, + min_duration_threshold=min_duration_threshold, dump_period=dump_period, + max_sampling_count=max_sampling_count, start_time=start_time, + create_time=create_time) diff --git a/skywalking/profile/__init__.py b/skywalking/profile/__init__.py new file mode 100644 index 00000000..92a6ec74 --- /dev/null +++ b/skywalking/profile/__init__.py @@ -0,0 +1,3 @@ +from .profile_task_execution_service import ProfileTaskExecutionService + +profile_task_execution_service = ProfileTaskExecutionService() diff --git a/skywalking/profile/profile_constants.py b/skywalking/profile/profile_constants.py new file mode 100644 index 00000000..309d7bcf --- /dev/null +++ b/skywalking/profile/profile_constants.py @@ -0,0 +1,9 @@ +class ProfileConstants: + # Monitor duration must greater than 1 minutes + TASK_DURATION_MIN_MINUTE = 1 + # The duration of the monitoring task cannot be greater than 15 minutes + TASK_DURATION_MAX_MINUTE = 15 + # Dump period must be greater than or equals 10 milliseconds + TASK_DUMP_PERIOD_MIN_MILLIS = 10 + # Max sampling count must less than 10 + TASK_MAX_SAMPLING_COUNT = 10 diff --git a/skywalking/profile/profile_task.py b/skywalking/profile/profile_task.py new file mode 100644 index 00000000..7f91a36e --- /dev/null +++ b/skywalking/profile/profile_task.py @@ -0,0 +1,23 @@ +from skywalking.utils.lang import tostring + + +@tostring +class ProfileTask: + + def __init__(self, + task_id: str = None, + first_span_op_name: str = None, + duration: int = None, + min_duration_threshold: int = None, + thread_dump_period: int = None, + max_sampling_count: int = None, + start_time: int = None, + create_time: int = None): + self.task_id = str(task_id) # type: str + self.first_span_op_name = str(first_span_op_name) # type: str + self.duration = int(duration) # type: int + self.min_duration_threshold = int(min_duration_threshold) # type: int + self.thread_dump_period = int(thread_dump_period) # type: int + self.max_sampling_count = int(max_sampling_count) # type: int + self.start_time = int(start_time) # type: int + self.create_time = int(create_time) # type: int diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py new file mode 100644 index 00000000..54089679 --- /dev/null +++ b/skywalking/profile/profile_task_execution_service.py @@ -0,0 +1,84 @@ +from skywalking.profile.profile_task import ProfileTask +from skywalking.profile.profile_constants import ProfileConstants +from collections import deque +from skywalking.loggings import logger + + +class ProfileTaskExecutionService: + MINUTE_TO_MILLS = 60000 + + def __init__(self): + self.__profile_task_list = deque() # type: deque + self.__last_command_create_time = -1 # type: int + + def get_last_command_create_time(self) -> int: + return self.__last_command_create_time + + def add_profile_task(self, task: ProfileTask): + # update last command create time, which will be used in command query + if task.create_time > self.__last_command_create_time: + self.__last_command_create_time = task.create_time + + # check profile task object + result = self.__check_profile_task(task) + if not result.success: + logger.warning("check command error, cannot process this profile task. reason: %s", result.error_reason) + return + + # add task to list + self.__profile_task_list.append(task) + + class CheckResult: + def __init__(self, success: bool, error_reason: str): + self.success = success # type: bool + self.error_reason = error_reason # type: str + + def __check_profile_task(self, task: ProfileTask) -> CheckResult: + try: + # endpoint name + if len(task.first_span_op_name) == 0: + return self.CheckResult(False, "endpoint name [{}] error, " + "should be str and not empty".format(task.first_span_op_name)) + # duration + if task.duration < ProfileConstants.TASK_DURATION_MIN_MINUTE: + return self.CheckResult(False, "monitor duration must greater" + " than {} minutes".format(ProfileConstants.TASK_DURATION_MIN_MINUTE)) + if task.duration > ProfileConstants.TASK_DURATION_MAX_MINUTE: + return self.CheckResult(False, "monitor duration must less" + " than {} minutes".format(ProfileConstants.TASK_DURATION_MAX_MINUTE)) + # min duration threshold + if task.min_duration_threshold < 0: + return self.CheckResult(False, "min duration threshold must greater than or equals zero") + + # dump period + if task.thread_dump_period < ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS: + return self.CheckResult(False, "dump period must be greater than or equals to {}" + " milliseconds".format(ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS)) + + # max sampling count + if task.max_sampling_count <= 0: + return self.CheckResult(False, "max sampling count must greater than zero") + if task.max_sampling_count >= ProfileConstants.TASK_MAX_SAMPLING_COUNT: + return self.CheckResult(False, "max sampling count must less" + " than {}".format(ProfileConstants.TASK_MAX_SAMPLING_COUNT)) + + # check task queue + task_finish_time = self.__cal_profile_task_finish_time(task) + + for profile_task in self.__profile_task_list: # type: ProfileTask + # if the end time of the task to be added is during the execution of any data, means is a error data + if task.start_time <= task_finish_time <= self.__cal_profile_task_finish_time(profile_task): + return self.CheckResult(False, "there already have processing task in time range, " + "could not add a new task again. processing task monitor " + "endpoint name: {}".format(profile_task.first_span_op_name)) + + return self.CheckResult(True, "") + + except TypeError as e: + print(e) + return self.CheckResult(False, "ProfileTask attributes has type error") + + def __cal_profile_task_finish_time(self, task: ProfileTask) -> int: + return task.start_time + task.duration * self.MINUTE_TO_MILLS + + From 339e95925d04ee2b360fc6c133c96f8bebd07db9 Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 15:01:08 +0800 Subject: [PATCH 2/9] format profile log --- skywalking/command/command_service.py | 2 -- skywalking/command/executors/profile_task_command_executor.py | 2 +- skywalking/profile/profile_task_execution_service.py | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py index c1771eba..34231ef9 100644 --- a/skywalking/command/command_service.py +++ b/skywalking/command/command_service.py @@ -22,8 +22,6 @@ def __init__(self): def dispatch(self): while True: command = self.__commands.get() # type: BaseCommand - logger.debug("dispatch command: %s", command) - if not self.__is_command_executed(command): command_executor_service.execute(command) self.__command_serial_number_cache.add(command.serial_number) diff --git a/skywalking/command/executors/profile_task_command_executor.py b/skywalking/command/executors/profile_task_command_executor.py index 55612ff7..b1c2b10f 100644 --- a/skywalking/command/executors/profile_task_command_executor.py +++ b/skywalking/command/executors/profile_task_command_executor.py @@ -8,7 +8,7 @@ class ProfileTaskCommandExecutor(CommandExecutor): def execute(self, command: ProfileTaskCommand): - logger.debug("start execute ProfileTaskCommand [{%s}]", command.serial_number) + logger.debug("ProfileTaskCommandExecutor start execute ProfileTaskCommand [{%s}]", command.serial_number) profile_task = ProfileTask(task_id=command.task_id, first_span_op_name=command.endpoint_name, diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py index 54089679..c68a2890 100644 --- a/skywalking/profile/profile_task_execution_service.py +++ b/skywalking/profile/profile_task_execution_service.py @@ -75,7 +75,6 @@ def __check_profile_task(self, task: ProfileTask) -> CheckResult: return self.CheckResult(True, "") except TypeError as e: - print(e) return self.CheckResult(False, "ProfileTask attributes has type error") def __cal_profile_task_finish_time(self, task: ProfileTask) -> int: From 41bb5ed751cd509bbd942472727716f8554233f1 Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 16:41:48 +0800 Subject: [PATCH 3/9] add configs for profile --- skywalking/agent/__init__.py | 17 ++++++++++------- skywalking/agent/protocol/__init__.py | 2 +- skywalking/agent/protocol/grpc.py | 3 ++- skywalking/command/__init__.py | 1 - skywalking/command/command_service.py | 3 +-- skywalking/config.py | 3 +++ skywalking/profile/__init__.py | 2 +- 7 files changed, 18 insertions(+), 13 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 7d1dc118..3917c07a 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -26,6 +26,7 @@ from skywalking.agent.protocol import Protocol from skywalking.command import command_service +from skywalking.config import profile_active, profile_task_query_interval if TYPE_CHECKING: from skywalking.trace.context import Segment @@ -33,7 +34,7 @@ __started = False __protocol = Protocol() # type: Protocol -__heartbeat_thread = __report_thread = __query_thread = __command_dispatch_thread = __queue = __finished = None +__heartbeat_thread = __report_thread = __query_profile_thread = __command_dispatch_thread = __queue = __finished = None def __heartbeat(): @@ -52,12 +53,12 @@ def __report(): __finished.wait(1) -def __query_command(): +def __query_profile_command(): while not __finished.is_set(): if connected(): - __protocol.query_commands() + __protocol.query_profile_commands() - __finished.wait(10) + __finished.wait(profile_task_query_interval) def __command_dispatch(): @@ -66,20 +67,22 @@ def __command_dispatch(): def __init_threading(): - global __heartbeat_thread, __report_thread, __query_thread, __command_dispatch_thread, __queue, __finished + global __heartbeat_thread, __report_thread, __query_profile_thread, __command_dispatch_thread, __queue, __finished __queue = Queue(maxsize=10000) __finished = Event() __heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True) __report_thread = Thread(name='ReportThread', target=__report, daemon=True) - __query_thread = Thread(name='QueryCommandThread', target=__query_command, daemon=True) + __query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True) __command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True) __heartbeat_thread.start() __report_thread.start() - __query_thread.start() __command_dispatch_thread.start() + if profile_active: + __query_profile_thread.start() + def __init(): global __protocol diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py index d26eeb9a..f47720d9 100644 --- a/skywalking/agent/protocol/__init__.py +++ b/skywalking/agent/protocol/__init__.py @@ -38,5 +38,5 @@ def heartbeat(self): def report(self, queue: Queue, block: bool = True): raise NotImplementedError() - def query_commands(self): + def query_profile_commands(self): raise NotImplementedError() diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 107195dd..4be57ff6 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -55,7 +55,8 @@ def _cb(self, state): except grpc.RpcError: self.on_error() - def query_commands(self): + def query_profile_commands(self): + logger.debug("query profile commands") self.profile_query.do_query() def heartbeat(self): diff --git a/skywalking/command/__init__.py b/skywalking/command/__init__.py index 722077c0..e623cbc8 100644 --- a/skywalking/command/__init__.py +++ b/skywalking/command/__init__.py @@ -1,4 +1,3 @@ from skywalking.command.command_service import CommandService command_service = CommandService() - diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py index 34231ef9..dc544496 100644 --- a/skywalking/command/command_service.py +++ b/skywalking/command/command_service.py @@ -1,11 +1,9 @@ from collections import deque import queue -from typing import List from skywalking.loggings import logger from skywalking.command.base_command import BaseCommand from skywalking.command.profile_task_command import ProfileTaskCommand -from skywalking.command.executors.command_executor import CommandExecutor from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor from skywalking.command.executors import noop_command_executor_instance @@ -21,6 +19,7 @@ def __init__(self): def dispatch(self): while True: + # block until a command is available command = self.__commands.get() # type: BaseCommand if not self.__is_command_executed(command): command_executor_service.execute(command) diff --git a/skywalking/config.py b/skywalking/config.py index b1ff2e43..4ea20673 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -60,6 +60,9 @@ kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements" # type: str kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512') +profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \ + os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False # type: bool +profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20') def init( diff --git a/skywalking/profile/__init__.py b/skywalking/profile/__init__.py index 92a6ec74..c4ffeca7 100644 --- a/skywalking/profile/__init__.py +++ b/skywalking/profile/__init__.py @@ -1,3 +1,3 @@ -from .profile_task_execution_service import ProfileTaskExecutionService +from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService profile_task_execution_service = ProfileTaskExecutionService() From bd88da5bc58cab49097756521612c633265b4cbf Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 16:52:42 +0800 Subject: [PATCH 4/9] polish codes --- skywalking/agent/__init__.py | 3 +- skywalking/agent/protocol/grpc.py | 9 +++--- skywalking/command/__init__.py | 17 +++++++++++ skywalking/command/base_command.py | 17 +++++++++++ skywalking/command/command_service.py | 29 +++++++++++++++---- skywalking/command/executors/__init__.py | 17 +++++++++++ .../command/executors/command_executor.py | 18 +++++++++++- .../executors/noop_command_executor.py | 22 +++++++++++--- .../profile_task_command_executor.py | 18 +++++++++++- skywalking/command/profile_task_command.py | 22 ++++++++++++-- skywalking/profile/__init__.py | 17 +++++++++++ skywalking/profile/profile_constants.py | 18 ++++++++++++ skywalking/profile/profile_task.py | 17 +++++++++++ .../profile/profile_task_execution_service.py | 26 +++++++++++++---- 14 files changed, 225 insertions(+), 25 deletions(-) diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index 3917c07a..9ec960a9 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -22,11 +22,10 @@ from typing import TYPE_CHECKING from skywalking import config, plugins, loggings -from skywalking.loggings import logger from skywalking.agent.protocol import Protocol - from skywalking.command import command_service from skywalking.config import profile_active, profile_task_query_interval +from skywalking.loggings import logger if TYPE_CHECKING: from skywalking.trace.context import Segment diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py index 4be57ff6..6c4e353f 100644 --- a/skywalking/agent/protocol/grpc.py +++ b/skywalking/agent/protocol/grpc.py @@ -16,19 +16,20 @@ # import logging -from skywalking.loggings import logger import traceback from queue import Queue, Empty, Full from time import time import grpc +from skywalking.protocol.common.Common_pb2 import KeyStringValuePair +from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference from skywalking import config from skywalking.agent import Protocol from skywalking.agent.protocol.interceptors import header_adder_interceptor -from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService, GrpcProfileTaskChannelService -from skywalking.protocol.common.Common_pb2 import KeyStringValuePair -from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference +from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService, \ + GrpcProfileTaskChannelService +from skywalking.loggings import logger from skywalking.trace.segment import Segment diff --git a/skywalking/command/__init__.py b/skywalking/command/__init__.py index e623cbc8..1b98e386 100644 --- a/skywalking/command/__init__.py +++ b/skywalking/command/__init__.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.command.command_service import CommandService command_service = CommandService() diff --git a/skywalking/command/base_command.py b/skywalking/command/base_command.py index 7f2890ac..95d991ab 100644 --- a/skywalking/command/base_command.py +++ b/skywalking/command/base_command.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + class BaseCommand: def __init__(self, diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py index dc544496..3be74b78 100644 --- a/skywalking/command/command_service.py +++ b/skywalking/command/command_service.py @@ -1,13 +1,30 @@ -from collections import deque +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + import queue +from collections import deque + +from skywalking.protocol.common.Common_pb2 import Commands, Command -from skywalking.loggings import logger from skywalking.command.base_command import BaseCommand -from skywalking.command.profile_task_command import ProfileTaskCommand -from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor from skywalking.command.executors import noop_command_executor_instance - -from skywalking.protocol.common.Common_pb2 import Commands, Command +from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor +from skywalking.command.profile_task_command import ProfileTaskCommand +from skywalking.loggings import logger class CommandService: diff --git a/skywalking/command/executors/__init__.py b/skywalking/command/executors/__init__.py index 209c28eb..a3f690b7 100644 --- a/skywalking/command/executors/__init__.py +++ b/skywalking/command/executors/__init__.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.command.executors.noop_command_executor import NoopCommandExecutor noop_command_executor_instance = NoopCommandExecutor() diff --git a/skywalking/command/executors/command_executor.py b/skywalking/command/executors/command_executor.py index ee2c017e..dc2b73d4 100644 --- a/skywalking/command/executors/command_executor.py +++ b/skywalking/command/executors/command_executor.py @@ -1,7 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.command.base_command import BaseCommand class CommandExecutor: def execute(self, command: BaseCommand): raise NotImplementedError() - diff --git a/skywalking/command/executors/noop_command_executor.py b/skywalking/command/executors/noop_command_executor.py index 14b7e9cc..dbc97719 100644 --- a/skywalking/command/executors/noop_command_executor.py +++ b/skywalking/command/executors/noop_command_executor.py @@ -1,5 +1,22 @@ -from skywalking.command.executors.command_executor import CommandExecutor +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.command.base_command import BaseCommand +from skywalking.command.executors.command_executor import CommandExecutor class NoopCommandExecutor(CommandExecutor): @@ -8,6 +25,3 @@ def __init__(self): def execute(self, command: BaseCommand): pass - - - diff --git a/skywalking/command/executors/profile_task_command_executor.py b/skywalking/command/executors/profile_task_command_executor.py index b1c2b10f..53fa5aa1 100644 --- a/skywalking/command/executors/profile_task_command_executor.py +++ b/skywalking/command/executors/profile_task_command_executor.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.command.executors.command_executor import CommandExecutor from skywalking.command.profile_task_command import ProfileTaskCommand from skywalking.loggings import logger @@ -20,4 +37,3 @@ def execute(self, command: ProfileTaskCommand): create_time=command.create_time) profile_task_execution_service.add_profile_task(profile_task) - diff --git a/skywalking/command/profile_task_command.py b/skywalking/command/profile_task_command.py index 99f4661f..579529ae 100644 --- a/skywalking/command/profile_task_command.py +++ b/skywalking/command/profile_task_command.py @@ -1,7 +1,25 @@ -from skywalking.utils.lang import tostring -from skywalking.command.base_command import BaseCommand +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.protocol.common.Common_pb2 import Command +from skywalking.command.base_command import BaseCommand +from skywalking.utils.lang import tostring + @tostring class ProfileTaskCommand(BaseCommand): diff --git a/skywalking/profile/__init__.py b/skywalking/profile/__init__.py index c4ffeca7..fdf00ab3 100644 --- a/skywalking/profile/__init__.py +++ b/skywalking/profile/__init__.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService profile_task_execution_service = ProfileTaskExecutionService() diff --git a/skywalking/profile/profile_constants.py b/skywalking/profile/profile_constants.py index 309d7bcf..2a94669c 100644 --- a/skywalking/profile/profile_constants.py +++ b/skywalking/profile/profile_constants.py @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + class ProfileConstants: # Monitor duration must greater than 1 minutes TASK_DURATION_MIN_MINUTE = 1 diff --git a/skywalking/profile/profile_task.py b/skywalking/profile/profile_task.py index 7f91a36e..3dda3c60 100644 --- a/skywalking/profile/profile_task.py +++ b/skywalking/profile/profile_task.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from skywalking.utils.lang import tostring diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py index c68a2890..d9cfb586 100644 --- a/skywalking/profile/profile_task_execution_service.py +++ b/skywalking/profile/profile_task_execution_service.py @@ -1,7 +1,25 @@ -from skywalking.profile.profile_task import ProfileTask -from skywalking.profile.profile_constants import ProfileConstants +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from collections import deque + from skywalking.loggings import logger +from skywalking.profile.profile_constants import ProfileConstants +from skywalking.profile.profile_task import ProfileTask class ProfileTaskExecutionService: @@ -74,10 +92,8 @@ def __check_profile_task(self, task: ProfileTask) -> CheckResult: return self.CheckResult(True, "") - except TypeError as e: + except TypeError: return self.CheckResult(False, "ProfileTask attributes has type error") def __cal_profile_task_finish_time(self, task: ProfileTask) -> int: return task.start_time + task.duration * self.MINUTE_TO_MILLS - - From 5a748f59b425adb2ec0f8c9d9b0f2c498ae767fe Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 17:05:16 +0800 Subject: [PATCH 5/9] update EnvVars doc --- docs/EnvVars.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/EnvVars.md b/docs/EnvVars.md index 46754332..c669a7f7 100644 --- a/docs/EnvVars.md +++ b/docs/EnvVars.md @@ -27,3 +27,5 @@ Environment Variable | Description | Default | `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` | | `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset | | `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` | +| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` | +| `SW_PROFILE_TASK_QUERY_INTERVAL` | Sniffer get profile task list interval. | `20` | From a4ae2b048aed915c6982c591a8a7b547af45f525 Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 18:41:01 +0800 Subject: [PATCH 6/9] replace deque with Queue --- .../profile/profile_task_execution_service.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py index d9cfb586..b363b895 100644 --- a/skywalking/profile/profile_task_execution_service.py +++ b/skywalking/profile/profile_task_execution_service.py @@ -15,8 +15,7 @@ # limitations under the License. # -from collections import deque - +from queue import Queue from skywalking.loggings import logger from skywalking.profile.profile_constants import ProfileConstants from skywalking.profile.profile_task import ProfileTask @@ -26,7 +25,8 @@ class ProfileTaskExecutionService: MINUTE_TO_MILLS = 60000 def __init__(self): - self.__profile_task_list = deque() # type: deque + # Queue is thread safe + self.__profile_task_list = Queue() # type: Queue self.__last_command_create_time = -1 # type: int def get_last_command_create_time(self) -> int: @@ -44,7 +44,7 @@ def add_profile_task(self, task: ProfileTask): return # add task to list - self.__profile_task_list.append(task) + self.__profile_task_list.put(task) class CheckResult: def __init__(self, success: bool, error_reason: str): @@ -83,12 +83,14 @@ def __check_profile_task(self, task: ProfileTask) -> CheckResult: # check task queue task_finish_time = self.__cal_profile_task_finish_time(task) - for profile_task in self.__profile_task_list: # type: ProfileTask - # if the end time of the task to be added is during the execution of any data, means is a error data - if task.start_time <= task_finish_time <= self.__cal_profile_task_finish_time(profile_task): - return self.CheckResult(False, "there already have processing task in time range, " - "could not add a new task again. processing task monitor " - "endpoint name: {}".format(profile_task.first_span_op_name)) + # lock the self.__profile_task_list.queue when check the item in it, avoid concurrency errors + with self.__profile_task_list.mutex: + for profile_task in self.__profile_task_list.queue: # type: ProfileTask + # if the end time of the task to be added is during the execution of any data, means is a error data + if task.start_time <= task_finish_time <= self.__cal_profile_task_finish_time(profile_task): + return self.CheckResult(False, "there already have processing task in time range, " + "could not add a new task again. processing task monitor " + "endpoint name: {}".format(profile_task.first_span_op_name)) return self.CheckResult(True, "") From c33a4954e27a8f2f16ca92f641fe088176b7f9d0 Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 19:14:09 +0800 Subject: [PATCH 7/9] polish CommandSerialNumberCache's deque code --- skywalking/command/command_service.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/skywalking/command/command_service.py b/skywalking/command/command_service.py index 3be74b78..e1d4bbf6 100644 --- a/skywalking/command/command_service.py +++ b/skywalking/command/command_service.py @@ -68,11 +68,10 @@ class CommandSerialNumberCache: def __init__(self, maxlen=64): self.queue = deque(maxlen=maxlen) - self.max_capacity = maxlen def add(self, number: str): - if len(self.queue) >= self.max_capacity: - self.queue.pop() + # Once a bounded length deque is full, when new items are added, + # a corresponding number of items are discarded from the opposite end. self.queue.append(number) def contains(self, number: str) -> bool: From 60f2970c0d560eb85efa4316d6a4d3c40d8e4540 Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 19:40:42 +0800 Subject: [PATCH 8/9] modify protocol --- skywalking/agent/protocol/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skywalking/agent/protocol/__init__.py b/skywalking/agent/protocol/__init__.py index f47720d9..38f84826 100644 --- a/skywalking/agent/protocol/__init__.py +++ b/skywalking/agent/protocol/__init__.py @@ -39,4 +39,4 @@ def report(self, queue: Queue, block: bool = True): raise NotImplementedError() def query_profile_commands(self): - raise NotImplementedError() + pass From 08805d955b9fa8edfcb2d34410f7d1b55def9a0f Mon Sep 17 00:00:00 2001 From: HumbertZhang <504490160@qq.com> Date: Sun, 11 Jul 2021 20:11:50 +0800 Subject: [PATCH 9/9] modify according to code reviews and ci --- docs/EnvVars.md | 2 +- skywalking/command/base_command.py | 4 ++-- .../command/executors/command_executor.py | 4 +--- skywalking/command/profile_task_command.py | 18 +++++++++--------- skywalking/profile/profile_task.py | 16 ++++++++-------- .../profile/profile_task_execution_service.py | 4 ++-- 6 files changed, 23 insertions(+), 25 deletions(-) diff --git a/docs/EnvVars.md b/docs/EnvVars.md index c669a7f7..0484a998 100644 --- a/docs/EnvVars.md +++ b/docs/EnvVars.md @@ -28,4 +28,4 @@ Environment Variable | Description | Default | `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset | | `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` | | `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` | -| `SW_PROFILE_TASK_QUERY_INTERVAL` | Sniffer get profile task list interval. | `20` | +| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` | diff --git a/skywalking/command/base_command.py b/skywalking/command/base_command.py index 95d991ab..ab761502 100644 --- a/skywalking/command/base_command.py +++ b/skywalking/command/base_command.py @@ -18,7 +18,7 @@ class BaseCommand: def __init__(self, - command: str = None, - serial_number: str = None): + command: str = "", + serial_number: str = ""): self.command = command # type: str self.serial_number = serial_number # type: str diff --git a/skywalking/command/executors/command_executor.py b/skywalking/command/executors/command_executor.py index dc2b73d4..09c181d1 100644 --- a/skywalking/command/executors/command_executor.py +++ b/skywalking/command/executors/command_executor.py @@ -15,9 +15,7 @@ # limitations under the License. # -from skywalking.command.base_command import BaseCommand - class CommandExecutor: - def execute(self, command: BaseCommand): + def execute(self, command): raise NotImplementedError() diff --git a/skywalking/command/profile_task_command.py b/skywalking/command/profile_task_command.py index 579529ae..88607a0e 100644 --- a/skywalking/command/profile_task_command.py +++ b/skywalking/command/profile_task_command.py @@ -26,15 +26,15 @@ class ProfileTaskCommand(BaseCommand): NAME = "ProfileTaskQuery" def __init__(self, - serial_number: str = None, - task_id: str = None, - endpoint_name: str = None, - duration: int = None, - min_duration_threshold: int = None, - dump_period: int = None, - max_sampling_count: int = None, - start_time: int = None, - create_time: int = None): + serial_number: str = "", + task_id: str = "", + endpoint_name: str = "", + duration: int = -1, + min_duration_threshold: int = -1, + dump_period: int = -1, + max_sampling_count: int = -1, + start_time: int = -1, + create_time: int = -1): BaseCommand.__init__(self, self.NAME, serial_number) diff --git a/skywalking/profile/profile_task.py b/skywalking/profile/profile_task.py index 3dda3c60..a11df519 100644 --- a/skywalking/profile/profile_task.py +++ b/skywalking/profile/profile_task.py @@ -22,14 +22,14 @@ class ProfileTask: def __init__(self, - task_id: str = None, - first_span_op_name: str = None, - duration: int = None, - min_duration_threshold: int = None, - thread_dump_period: int = None, - max_sampling_count: int = None, - start_time: int = None, - create_time: int = None): + task_id: str = "", + first_span_op_name: str = "", + duration: int = -1, + min_duration_threshold: int = -1, + thread_dump_period: int = -1, + max_sampling_count: int = -1, + start_time: int = -1, + create_time: int = -1): self.task_id = str(task_id) # type: str self.first_span_op_name = str(first_span_op_name) # type: str self.duration = int(duration) # type: int diff --git a/skywalking/profile/profile_task_execution_service.py b/skywalking/profile/profile_task_execution_service.py index b363b895..35ab65d0 100644 --- a/skywalking/profile/profile_task_execution_service.py +++ b/skywalking/profile/profile_task_execution_service.py @@ -22,7 +22,7 @@ class ProfileTaskExecutionService: - MINUTE_TO_MILLS = 60000 + MINUTE_TO_MILLIS = 60000 def __init__(self): # Queue is thread safe @@ -98,4 +98,4 @@ def __check_profile_task(self, task: ProfileTask) -> CheckResult: return self.CheckResult(False, "ProfileTask attributes has type error") def __cal_profile_task_finish_time(self, task: ProfileTask) -> int: - return task.start_time + task.duration * self.MINUTE_TO_MILLS + return task.start_time + task.duration * self.MINUTE_TO_MILLIS