Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Profile function #155

Merged
merged 12 commits into from
Aug 22, 2021
6 changes: 5 additions & 1 deletion docs/EnvVars.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ Environment Variable | Description | Default
| `SW_KAFKA_REPORTER_TOPIC_LOG` | Specifying Kafka topic name for Log data. | `skywalking-logs` |
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `True` |
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
| `SW_AGENT_PROFILE_MAX_PARALLEL` | The number of parallel monitor segment count. | `5` |
| `SW_AGENT_PROFILE_DURATION` | The maximum monitor segment time(minutes), if current segment monitor time out of limit, then stop it. | `10` |
| `SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH` | The number of max dump thread stack depth | `500` |
| `SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE` | The number of snapshot transport to backend buffer size | `50` |
| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
| `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
Expand Down
51 changes: 44 additions & 7 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
from skywalking import loggings
from skywalking.agent.protocol import Protocol
from skywalking.command import command_service
from skywalking.config import profile_active, profile_task_query_interval
from skywalking.loggings import logger
from skywalking import profile
from skywalking.profile.profile_task import ProfileTask
from skywalking.profile.snapshot import TracingThreadSnapshot

if TYPE_CHECKING:
from skywalking.trace.context import Segment

__started = False
__protocol = Protocol() # type: Protocol
__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
= __queue = __log_queue = __finished = None
= __send_profile_thread = __queue = __log_queue = __snapshot_queue = __finished = None


def __heartbeat():
Expand Down Expand Up @@ -69,14 +71,24 @@ def __report_log():
__finished.wait(0)


def __send_profile_snapshot():
while not __finished.is_set():
try:
__protocol.send_snapshot(__snapshot_queue)
except Exception as exc:
logger.error(str(exc))

__finished.wait(0.5)


def __query_profile_command():
while not __finished.is_set():
try:
__protocol.query_profile_commands()
except Exception as exc:
logger.error(str(exc))

__finished.wait(profile_task_query_interval)
__finished.wait(config.get_profile_task_interval)


def __command_dispatch():
Expand All @@ -86,13 +98,12 @@ def __command_dispatch():

def __init_threading():
global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
__command_dispatch_thread, __queue, __log_queue, __finished
__command_dispatch_thread, __send_profile_thread, __queue, __log_queue, __snapshot_queue, __finished

__queue = Queue(maxsize=config.max_buffer_size)
__finished = Event()
__heartbeat_thread = Thread(name='HeartbeatThread', target=__heartbeat, daemon=True)
__report_thread = Thread(name='ReportThread', target=__report, daemon=True)
__query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
__command_dispatch_thread = Thread(name="CommandDispatchThread", target=__command_dispatch, daemon=True)

__heartbeat_thread.start()
Expand All @@ -104,13 +115,18 @@ def __init_threading():
__log_report_thread = Thread(name='LogReportThread', target=__report_log, daemon=True)
__log_report_thread.start()

if profile_active:
if config.profile_active:
__snapshot_queue = Queue(maxsize=config.profile_snapshot_transport_buffer_size)

__query_profile_thread = Thread(name='QueryProfileCommandThread', target=__query_profile_command, daemon=True)
__query_profile_thread.start()

__send_profile_thread = Thread(name='SendProfileSnapShotThread', target=__send_profile_snapshot, daemon=True)
__send_profile_thread.start()


def __init():
global __protocol

if config.protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
__protocol = GrpcProtocol()
Expand All @@ -132,9 +148,15 @@ def __init():
def __fini():
__protocol.report(__queue, False)
__queue.join()

if config.log_reporter_active:
__protocol.report_log(__log_queue, False)
__log_queue.join()

if config.profile_active:
__protocol.send_snapshot(__snapshot_queue, False)
__snapshot_queue.join()

__finished.set()


Expand Down Expand Up @@ -175,6 +197,7 @@ def start():

loggings.init()
config.finalize()
profile.init()

__init()

Expand Down Expand Up @@ -210,3 +233,17 @@ def archive_log(log_data: 'LogData'):
__log_queue.put(log_data, block=False)
except Full:
logger.warning('the queue is full, the log will be abandoned')


def add_profiling_snapshot(snapshot: TracingThreadSnapshot):
try:
__snapshot_queue.put(snapshot)
except Full:
logger.warning('the snapshot queue is full, the snapshot will be abandoned')


def notify_profile_finish(task: ProfileTask):
try:
__protocol.notify_profile_task_finish(task)
except Exception as e:
logger.error("notify profile task finish to backend fail. " + str(e))
6 changes: 6 additions & 0 deletions skywalking/agent/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ def report_log(self, queue: Queue, block: bool = True):

def query_profile_commands(self):
pass

def send_snapshot(self, queue: Queue, block: bool = True):
pass

def notify_profile_task_finish(self, task):
pass
40 changes: 38 additions & 2 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
from skywalking.protocol.logging.Logging_pb2 import LogData
from skywalking.protocol.profile.Profile_pb2 import ThreadSnapshot, ThreadStack

from skywalking import config
from skywalking.agent import Protocol
Expand All @@ -32,6 +33,8 @@
GrpcProfileTaskChannelService, GrpcLogDataReportService
from skywalking.loggings import logger
from skywalking.trace.segment import Segment
from skywalking.profile.snapshot import TracingThreadSnapshot
from skywalking.profile.profile_task import ProfileTask


class GrpcProtocol(Protocol):
Expand All @@ -52,7 +55,7 @@ def __init__(self):
self.channel.subscribe(self._cb, try_to_connect=True)
self.service_management = GrpcServiceManagementClient(self.channel)
self.traces_reporter = GrpcTraceSegmentReportService(self.channel)
self.profile_query = GrpcProfileTaskChannelService(self.channel)
self.profile_channel = GrpcProfileTaskChannelService(self.channel)
self.log_reporter = GrpcLogDataReportService(self.channel)

def _cb(self, state):
Expand All @@ -61,7 +64,10 @@ def _cb(self, state):

def query_profile_commands(self):
logger.debug("query profile commands")
self.profile_query.do_query()
self.profile_channel.do_query()

def notify_profile_task_finish(self, task: ProfileTask):
self.profile_channel.finish(task)

def heartbeat(self):
try:
Expand Down Expand Up @@ -163,3 +169,33 @@ def generator():
self.log_reporter.report(generator())
except grpc.RpcError:
self.on_error()

def send_snapshot(self, queue: Queue, block: bool = True):
start = time()

def generator():
while True:
try:
timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
if timeout <= 0:
return
snapshot = queue.get(block=block, timeout=timeout) # type: TracingThreadSnapshot
except Empty:
return

queue.task_done()

transform_snapshot = ThreadSnapshot(
taskId=str(snapshot.task_id),
traceSegmentId=str(snapshot.trace_segment_id),
time=int(snapshot.time),
sequence=int(snapshot.sequence),
stack=ThreadStack(codeSignatures=snapshot.stack_list)
)

yield transform_snapshot

try:
self.profile_channel.send(generator())
except grpc.RpcError:
self.on_error()
3 changes: 3 additions & 0 deletions skywalking/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ def report(self, generator):
class ProfileTaskChannelService(object):
def do_query(self):
raise NotImplementedError()

def send(self, generator):
raise NotImplementedError()
20 changes: 16 additions & 4 deletions skywalking/client/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
import grpc
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery, ProfileTaskFinishReport
from skywalking.protocol.logging.Logging_pb2_grpc import LogReportServiceStub
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub
from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub

from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService, \
LogDataReportService
from skywalking.command import command_service
from skywalking.loggings import logger
from skywalking.profile import profile_task_execution_service
from skywalking.profile.profile_task import ProfileTask


class GrpcServiceManagementClient(ServiceManagementClient):
Expand Down Expand Up @@ -73,7 +74,7 @@ def report(self, generator):

class GrpcProfileTaskChannelService(ProfileTaskChannelService):
def __init__(self, channel: grpc.Channel):
self.task_stub = ProfileTaskStub(channel)
self.profile_stub = ProfileTaskStub(channel)

def do_query(self):

Expand All @@ -83,5 +84,16 @@ def do_query(self):
lastCommandTime=profile_task_execution_service.get_last_command_create_time()
)

commands = self.task_stub.getProfileTaskCommands(query)
commands = self.profile_stub.getProfileTaskCommands(query)
command_service.receive_command(commands)

def send(self, generator):
self.profile_stub.collectSnapshot(generator)

def finish(self, task: ProfileTask):
finish_report = ProfileTaskFinishReport(
service=config.service_name,
serviceInstance=config.service_instance,
taskId=task.task_id
)
self.profile_stub.reportTaskFinish(finish_report)
20 changes: 10 additions & 10 deletions skywalking/command/command_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,38 @@
class CommandService:

def __init__(self):
self.__commands = queue.Queue() # type: queue.Queue
self._commands = queue.Queue() # type: queue.Queue
# don't execute same command twice
self.__command_serial_number_cache = CommandSerialNumberCache()
self._command_serial_number_cache = CommandSerialNumberCache()

def dispatch(self):
while True:
# block until a command is available
command = self.__commands.get() # type: BaseCommand
command = self._commands.get() # type: BaseCommand
if not self.__is_command_executed(command):
command_executor_service.execute(command)
self.__command_serial_number_cache.add(command.serial_number)
self._command_serial_number_cache.add(command.serial_number)

def __is_command_executed(self, command: BaseCommand):
return self.__command_serial_number_cache.contains(command.serial_number)
return self._command_serial_number_cache.contains(command.serial_number)

def receive_command(self, commands: Commands):
for command in commands.commands:
try:
base_command = CommandDeserializer.deserialize(command)
logger.debug("Received command [{%s} {%s}]", base_command.command, base_command.serial_number)
logger.debug("received command [{%s} {%s}]", base_command.command, base_command.serial_number)

if self.__is_command_executed(base_command):
logger.warning("Command[{%s}] is executed, ignored.", base_command.command)
logger.warning("command[{%s}] is executed, ignored.", base_command.command)
continue

try:
self.__commands.put(base_command)
self._commands.put(base_command)
except queue.Full:
logger.warning("Command[{%s}, {%s}] cannot add to command list. because the command list is full.",
logger.warning("command[{%s}, {%s}] cannot add to command list. because the command list is full.",
base_command.command, base_command.serial_number)
except UnsupportedCommandException as e:
logger.warning("Received unsupported command[{%s}].", e.command.command)
logger.warning("received unsupported command[{%s}].", e.command.command)


class CommandSerialNumberCache:
Expand Down
7 changes: 2 additions & 5 deletions skywalking/command/executors/profile_task_command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@

from skywalking.command.executors.command_executor import CommandExecutor
from skywalking.command.profile_task_command import ProfileTaskCommand
from skywalking.loggings import logger
from skywalking.profile import profile_task_execution_service
from skywalking import profile
from skywalking.profile.profile_task import ProfileTask


class ProfileTaskCommandExecutor(CommandExecutor):

def execute(self, command: ProfileTaskCommand):
logger.debug("ProfileTaskCommandExecutor start execute ProfileTaskCommand [{%s}]", command.serial_number)

profile_task = ProfileTask(task_id=command.task_id,
first_span_op_name=command.endpoint_name,
duration=command.duration,
Expand All @@ -36,4 +33,4 @@ def execute(self, command: ProfileTaskCommand):
start_time=command.start_time,
create_time=command.create_time)

profile_task_execution_service.add_profile_task(profile_task)
profile.profile_task_execution_service.add_profile_task(profile_task)
12 changes: 9 additions & 3 deletions skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,15 @@
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments" # type: str
kafka_topic_log = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or "skywalking-logs" # type: str
celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False # type: bool
profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')

# profile configs
get_profile_task_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20') # type: int
profile_active = False if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'False' else True # type: bool
profile_max_parallel = int(os.getenv("SW_AGENT_PROFILE_MAX_PARALLEL") or '5') # type: int
profile_duration = int(os.getenv('SW_AGENT_PROFILE_DURATION') or '10') # type: int
profile_dump_max_stack_depth = int(os.getenv('SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH') or '500') # type: int
profile_snapshot_transport_buffer_size = int(os.getenv('SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE') or '50')

# NOTE - Log reporting requires a separate channel, will merge in the future.
log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
Expand Down
12 changes: 10 additions & 2 deletions skywalking/profile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
# limitations under the License.
#

from skywalking.profile.profile_task_execution_service import ProfileTaskExecutionService
profile_task_execution_service = None

profile_task_execution_service = ProfileTaskExecutionService()

def init():
from skywalking.profile.profile_service import ProfileTaskExecutionService

global profile_task_execution_service
if profile_task_execution_service:
return

profile_task_execution_service = ProfileTaskExecutionService()
Loading