Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CAP trace broken #575

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 61 additions & 33 deletions src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Licensed to the SkyAPM under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -25,6 +25,7 @@
using SkyApm.Tracing.Segments;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text.Json;
using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames;

Expand All @@ -35,7 +36,9 @@ namespace SkyApm.Diagnostics.CAP
/// </summary>
public class CapTracingDiagnosticProcessor : ITracingDiagnosticProcessor
{
private readonly ConcurrentDictionary<string, SegmentContext> _contexts = new ConcurrentDictionary<string, SegmentContext>();
private readonly ConcurrentDictionary<string, SegmentContext> _contexts =
new ConcurrentDictionary<string, SegmentContext>();

public string ListenerName => CapEvents.DiagnosticListenerName;

private const string OperateNamePrefix = "CAP/";
Expand All @@ -46,18 +49,21 @@ public class CapTracingDiagnosticProcessor : ITracingDiagnosticProcessor
private readonly IEntrySegmentContextAccessor _entrySegmentContextAccessor;
private readonly IExitSegmentContextAccessor _exitSegmentContextAccessor;
private readonly ILocalSegmentContextAccessor _localSegmentContextAccessor;
private readonly ICarrierPropagator _carrierPropagator;
private readonly TracingConfig _tracingConfig;

public CapTracingDiagnosticProcessor(ITracingContext tracingContext,
IEntrySegmentContextAccessor entrySegmentContextAccessor,
IExitSegmentContextAccessor exitSegmentContextAccessor,
ILocalSegmentContextAccessor localSegmentContextAccessor,
ICarrierPropagator carrierPropagator,
IConfigAccessor configAccessor)
{
_tracingContext = tracingContext;
_exitSegmentContextAccessor = exitSegmentContextAccessor;
_localSegmentContextAccessor = localSegmentContextAccessor;
_entrySegmentContextAccessor = entrySegmentContextAccessor;
_carrierPropagator = carrierPropagator;
_tracingConfig = configAccessor.Get<TracingConfig>();
}

Expand All @@ -82,8 +88,8 @@ public void AfterPublishStore([Object] CapEventDataPubStore eventData)

context.Span.AddLog(LogEvent.Event("Event Persistence End"));
context.Span.AddLog(LogEvent.Message($"CAP message persistence succeeded!{Environment.NewLine}" +
$"--> Spend Time: { eventData.ElapsedTimeMs }ms.{Environment.NewLine}" +
$"--> Message Id: { eventData.Message.GetId() } , Name: { eventData.Operation} "));
$"--> Spend Time: {eventData.ElapsedTimeMs}ms.{Environment.NewLine}" +
$"--> Message Id: {eventData.Message.GetId()} , Name: {eventData.Operation} "));

_tracingContext.Release(context);
}
Expand All @@ -97,7 +103,7 @@ public void ErrorPublishStore([Object] CapEventDataPubStore eventData)
context.Span.AddLog(LogEvent.Event("Event Persistence Error"));
context.Span.AddLog(LogEvent.Message($"CAP message persistence failed!{Environment.NewLine}" +
$"--> Message Info:{Environment.NewLine}" +
$"{ JsonSerializer.Serialize(eventData.Message)}"));
$"{JsonSerializer.Serialize(eventData.Message)}"));

context.Span.ErrorOccurred(eventData.Exception, _tracingConfig);
_tracingContext.Release(context);
Expand All @@ -108,19 +114,29 @@ public void BeforePublish([Object] CapEventDataPubSend eventData)
{
SegmentContext context = null;
var host = eventData.BrokerAddress.Endpoint?.Replace("-1", "5672");
if (_contexts.TryGetValue(eventData.TransportMessage.GetId(),out var ctx))
if (_contexts.TryGetValue(eventData.TransportMessage.GetId(), out var ctx))
{
_localSegmentContextAccessor.Context = ctx;
context = _tracingContext.CreateExitSegmentContext(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix,
host, new CapCarrierHeaderCollection(eventData.TransportMessage));

var header = new CapCarrierHeaderCollection(eventData.TransportMessage);
if (_entrySegmentContextAccessor.Context == null)
{
_carrierPropagator.Inject(ctx, header);
context = _tracingContext.CreateEntrySegmentContext(
OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix,
header);
}
else
{
context = _tracingContext.CreateExitSegmentContext(
OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix,
host, header);
_carrierPropagator.Inject(context, new CapCarrierHeaderCollection(eventData.TransportMessage));
}
}
else
{
// may be come from retry loop
var carrierHeader = new CapCarrierHeaderCollection(eventData.TransportMessage);
var eventName = OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix;
var operationName = OperateNamePrefix + eventName + ConsumerOperateNameSuffix;
var operationName = OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix;
context = _tracingContext.CreateEntrySegmentContext(operationName, carrierHeader);
}

Expand All @@ -137,13 +153,13 @@ public void BeforePublish([Object] CapEventDataPubSend eventData)
[DiagnosticName(CapEvents.AfterPublish)]
public void AfterPublish([Object] CapEventDataPubSend eventData)
{
var context = _exitSegmentContextAccessor.Context;
var context = _exitSegmentContextAccessor.Context ?? _entrySegmentContextAccessor.Context;
if (context == null) return;

context.Span.AddLog(LogEvent.Event("Event Publishing End"));
context.Span.AddLog(LogEvent.Message($"CAP message publishing succeeded!{Environment.NewLine}" +
$"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" +
$"--> Message Id: { eventData.TransportMessage.GetId() }, Name: {eventData.Operation}"));
$"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" +
$"--> Message Id: {eventData.TransportMessage.GetId()}, Name: {eventData.Operation}"));

_tracingContext.Release(context);

Expand All @@ -158,8 +174,8 @@ public void ErrorPublish([Object] CapEventDataPubSend eventData)

context.Span.AddLog(LogEvent.Event("Event Publishing Error"));
context.Span.AddLog(LogEvent.Message($"CAP message publishing failed!{Environment.NewLine}" +
$"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" +
$"--> Message Id: { eventData.TransportMessage.GetId() }, Name: {eventData.Operation}"));
$"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" +
$"--> Message Id: {eventData.TransportMessage.GetId()}, Name: {eventData.Operation}"));
context.Span.ErrorOccurred(eventData.Exception, _tracingConfig);

_tracingContext.Release(context);
Expand Down Expand Up @@ -194,8 +210,8 @@ public void CapAfterConsume([Object] CapEventDataSubStore eventData)

context.Span.AddLog(LogEvent.Event("Event Persistence End"));
context.Span.AddLog(LogEvent.Message($"CAP message persistence succeeded!{Environment.NewLine}" +
$"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" +
$"--> Message Id: { eventData.TransportMessage.GetId() }, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}"));
$"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" +
$"--> Message Id: {eventData.TransportMessage.GetId()}, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}"));

_tracingContext.Release(context);
}
Expand All @@ -208,8 +224,8 @@ public void CapErrorConsume([Object] CapEventDataSubStore eventData)

context.Span.AddLog(LogEvent.Event("Event Persistence Error"));
context.Span.AddLog(LogEvent.Message($"CAP message publishing failed! {Environment.NewLine}" +
$"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" +
$"--> Message Id: { eventData.TransportMessage.GetId() }, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}"));
$"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" +
$"--> Message Id: {eventData.TransportMessage.GetId()}, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}"));
context.Span.ErrorOccurred(eventData.Exception, _tracingConfig);

_tracingContext.Release(context);
Expand All @@ -219,10 +235,19 @@ public void CapErrorConsume([Object] CapEventDataSubStore eventData)
public void CapBeforeSubscriberInvoke([Object] CapEventDataSubExecute eventData)
{
SegmentContext context = null;
if (_contexts.TryGetValue(eventData.Message.GetId() + eventData.Message.GetGroup(),out var ctx))
if (_contexts.TryGetValue(eventData.Message.GetId() + eventData.Message.GetGroup(), out var ctx))
{
_entrySegmentContextAccessor.Context = ctx;
context = _tracingContext.CreateLocalSegmentContext("Subscriber Invoke: " + eventData.MethodInfo.Name);
if (_entrySegmentContextAccessor.Context == null)
{
context = _tracingContext.CreateEntrySegmentContext(
"Subscriber Invoke: " + eventData.MethodInfo.Name,
new CapCarrierHeaderCollection(eventData.Message));
}
else
{
context = _tracingContext.CreateLocalSegmentContext("Subscriber Invoke: " +
eventData.MethodInfo.Name);
}
}
else
{
Expand All @@ -236,8 +261,9 @@ public void CapBeforeSubscriberInvoke([Object] CapEventDataSubExecute eventData)
context.Span.SpanLayer = SpanLayer.MQ;
context.Span.Component = Components.CAP;
context.Span.AddLog(LogEvent.Event("Subscriber Invoke Start"));
context.Span.AddLog(LogEvent.Message($"Begin invoke the subscriber: {eventData.MethodInfo} {Environment.NewLine}" +
$"--> Message Id: { eventData.Message.GetId()}, Group: {eventData.Message.GetGroup()}, Name: {eventData.Operation}"));
context.Span.AddLog(LogEvent.Message(
$"Begin invoke the subscriber: {eventData.MethodInfo} {Environment.NewLine}" +
$"--> Message Id: {eventData.Message.GetId()}, Group: {eventData.Message.GetGroup()}, Name: {eventData.Operation}"));

_contexts[eventData.Message.GetId() + eventData.Message.GetGroup()] = context;
}
Expand All @@ -250,8 +276,9 @@ public void CapAfterSubscriberInvoke([Object] CapEventDataSubExecute eventData)

context.Span.AddLog(LogEvent.Event("Subscriber Invoke End"));
context.Span.AddLog(LogEvent.Message("Subscriber invoke succeeded!"));
context.Span.AddLog(LogEvent.Message($"Subscriber invoke spend time: { eventData.ElapsedTimeMs}ms. {Environment.NewLine}" +
$"--> Method Info: {eventData.MethodInfo}"));
context.Span.AddLog(LogEvent.Message(
$"Subscriber invoke spend time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" +
$"--> Method Info: {eventData.MethodInfo}"));

_tracingContext.Release(context);

Expand All @@ -266,9 +293,9 @@ public void CapErrorSubscriberInvoke([Object] CapEventDataSubExecute eventData)

context.Span.AddLog(LogEvent.Event("Subscriber Invoke Error"));
context.Span.AddLog(LogEvent.Message($"Subscriber invoke failed! {Environment.NewLine}" +
$"--> Method Info: { eventData.MethodInfo} {Environment.NewLine}" +
$"--> Method Info: {eventData.MethodInfo} {Environment.NewLine}" +
$"--> Message Info: {Environment.NewLine}" +
$"{ JsonSerializer.Serialize(eventData.Message)}"));
$"{JsonSerializer.Serialize(eventData.Message)}"));

context.Span.ErrorOccurred(eventData.Exception, _tracingConfig);

Expand All @@ -284,9 +311,9 @@ private StringOrIntValue GetComponent(BrokerAddress address, bool isPub)
switch (address.Name)
{
case "RabbitMQ":
return 52; // "rabbitmq-producer";
return 52; // "rabbitmq-producer";
case "Kafka":
return 40; //"kafka-producer";
return 40; //"kafka-producer";
}
}
else
Expand All @@ -299,7 +326,8 @@ private StringOrIntValue GetComponent(BrokerAddress address, bool isPub)
return 41; // "kafka-consumer";
}
}

return Components.CAP;
}
}
}
}
Loading