Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query and dispatch ProfileTaskCommand #127

Merged
merged 11 commits into from
Jul 12, 2021
2 changes: 2 additions & 0 deletions docs/EnvVars.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ Environment Variable | Description | Default
| `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
Humbertzhang marked this conversation as resolved.
Show resolved Hide resolved
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
27 changes: 24 additions & 3 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@
from typing import TYPE_CHECKING

from skywalking import config, plugins, loggings
from skywalking.loggings import logger
from skywalking.agent.protocol import Protocol
from skywalking.command import command_service
from skywalking.config import profile_active, profile_task_query_interval
from skywalking.loggings import logger

if TYPE_CHECKING:
from skywalking.trace.context import Segment


__started = False
__protocol = Protocol() # type: Protocol
__heartbeat_thread = __report_thread = __queue = __finished = None
__heartbeat_thread = __report_thread = __query_profile_thread = __command_dispatch_thread = __queue = __finished = None


def __heartbeat():
Expand All @@ -50,16 +52,35 @@ def __report():
__finished.wait(1)


def __query_profile_command():
while not __finished.is_set():
if connected():
__protocol.query_profile_commands()

__finished.wait(profile_task_query_interval)


def __command_dispatch():
# command dispatch will stuck when there are no commands
command_service.dispatch()


def __init_threading():
global __heartbeat_thread, __report_thread, __queue, __finished
global __heartbeat_thread, __report_thread, __query_profile_thread, __command_dispatch_thread, __queue, __finished

__queue = Queue(maxsize=10000)
__finished = Event()
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
__query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
__command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True)

__heartbeat_thread.start()
__report_thread.start()
__command_dispatch_thread.start()

if profile_active:
__query_profile_thread.start()


def __init():
Expand Down
3 changes: 3 additions & 0 deletions skywalking/agent/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ def heartbeat(self):

def report(self, queue: Queue, block: bool = True):
raise NotImplementedError()

def query_profile_commands(self):
pass
14 changes: 10 additions & 4 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@
#

import logging
from skywalking.loggings import logger
import traceback
from queue import Queue, Empty, Full
from time import time

import grpc
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference

from skywalking import config
from skywalking.agent import Protocol
from skywalking.agent.protocol.interceptors import header_adder_interceptor
from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService, \
GrpcProfileTaskChannelService
from skywalking.loggings import logger
from skywalking.trace.segment import Segment


Expand All @@ -44,6 +45,7 @@ def __init__(self):
self.channel.subscribe(self._cb, try_to_connect=True)
self.service_management = GrpcServiceManagementClient(self.channel)
self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
self.profile_query = GrpcProfileTaskChannelService(self.channel)

def _cb(self, state):
logger.debug('grpc channel connectivity changed, [%s -> %s]', self.state, state)
Expand All @@ -54,6 +56,10 @@ def _cb(self, state):
except grpc.RpcError:
self.on_error()

def query_profile_commands(self):
logger.debug("query profile commands")
self.profile_query.do_query()

def heartbeat(self):
try:
self.service_management.send_heart_beat()
Expand Down
5 changes: 5 additions & 0 deletions skywalking/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ def send_heart_beat(self):
class TraceSegmentReportService(object):
def report(self, generator):
raise NotImplementedError()


class ProfileTaskChannelService(object):
def do_query(self):
raise NotImplementedError()
23 changes: 22 additions & 1 deletion skywalking/client/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
import grpc

from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub

from skywalking.command import command_service
from skywalking.profile import profile_task_execution_service


class GrpcServiceManagementClient(ServiceManagementClient):
def __init__(self, channel: grpc.Channel):
Expand Down Expand Up @@ -56,3 +61,19 @@ def __init__(self, channel: grpc.Channel):

def report(self, generator):
self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)


class GrpcProfileTaskChannelService(ProfileTaskChannelService):
def __init__(self, channel: grpc.Channel):
self.task_stub = ProfileTaskStub(channel)

def do_query(self):

query = ProfileTaskCommandQuery(
service=config.service_name,
serviceInstance=config.service_instance,
lastCommandTime=profile_task_execution_service.get_last_command_create_time()
)

commands = self.task_stub.getProfileTaskCommands(query)
command_service.receive_command(commands)
20 changes: 20 additions & 0 deletions skywalking/command/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from skywalking.command.command_service import CommandService

command_service = CommandService()
24 changes: 24 additions & 0 deletions skywalking/command/base_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


class BaseCommand:
def __init__(self,
command: str = "",
serial_number: str = ""):
self.command = command # type: str
self.serial_number = serial_number # type: str
122 changes: 122 additions & 0 deletions skywalking/command/command_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import queue
from collections import deque

from skywalking.protocol.common.Common_pb2 import Commands, Command

from skywalking.command.base_command import BaseCommand
from skywalking.command.executors import noop_command_executor_instance
from skywalking.command.executors.profile_task_command_executor import ProfileTaskCommandExecutor
from skywalking.command.profile_task_command import ProfileTaskCommand
from skywalking.loggings import logger


class CommandService:

def __init__(self):
self.__commands = queue.Queue() # type: queue.Queue
# don't execute same command twice
self.__command_serial_number_cache = CommandSerialNumberCache()

def dispatch(self):
while True:
# block until a command is available
command = self.__commands.get() # type: BaseCommand
if not self.__is_command_executed(command):
command_executor_service.execute(command)
self.__command_serial_number_cache.add(command.serial_number)

def __is_command_executed(self, command: BaseCommand):
return self.__command_serial_number_cache.contains(command.serial_number)

def receive_command(self, commands: Commands):
for command in commands.commands:
try:
base_command = CommandDeserializer.deserialize(command)
logger.debug("Received command [{%s} {%s}]", base_command.command, base_command.serial_number)

if self.__is_command_executed(base_command):
logger.warning("Command[{%s}] is executed, ignored.", base_command.command)
continue

try:
self.__commands.put(base_command)
except queue.Full:
logger.warning("Command[{%s}, {%s}] cannot add to command list. because the command list is full.",
base_command.command, base_command.serial_number)
except UnsupportedCommandException as e:
logger.warning("Received unsupported command[{%s}].", e.command.command)


class CommandSerialNumberCache:

def __init__(self, maxlen=64):
self.queue = deque(maxlen=maxlen)

def add(self, number: str):
# Once a bounded length deque is full, when new items are added,
# a corresponding number of items are discarded from the opposite end.
self.queue.append(number)

def contains(self, number: str) -> bool:
try:
_ = self.queue.index(number)
return True
except ValueError:
return False


class CommandExecutorService:
"""
route commands to appropriate executor
"""

def __init__(self):
self.__command_executor_map = {ProfileTaskCommand.NAME: ProfileTaskCommandExecutor()}

def execute(self, command: BaseCommand):
self.__executor_for_command(command).execute(command)

def __executor_for_command(self, command: BaseCommand):
executor = self.__command_executor_map.get(command.command)
if not executor:
return noop_command_executor_instance
return executor


class CommandDeserializer:

@staticmethod
def deserialize(command: Command) -> BaseCommand:
command_name = command.command

if ProfileTaskCommand.NAME == command_name:
return ProfileTaskCommand.deserialize(command)
else:
raise UnsupportedCommandException(command)


class UnsupportedCommandException(Exception):

def __init__(self, command):
self.command = command


# init
command_executor_service = CommandExecutorService()
20 changes: 20 additions & 0 deletions skywalking/command/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from skywalking.command.executors.noop_command_executor import NoopCommandExecutor

noop_command_executor_instance = NoopCommandExecutor()
21 changes: 21 additions & 0 deletions skywalking/command/executors/command_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


class CommandExecutor:
def execute(self, command):
raise NotImplementedError()
Loading