diff --git a/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs b/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs index e6f4184c..e28f83b1 100644 --- a/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs +++ b/src/SkyApm.Diagnostics.CAP/CapDiagnosticProcessor.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed to the SkyAPM under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -25,6 +25,7 @@ using SkyApm.Tracing.Segments; using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Text.Json; using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames; @@ -35,7 +36,9 @@ namespace SkyApm.Diagnostics.CAP /// public class CapTracingDiagnosticProcessor : ITracingDiagnosticProcessor { - private readonly ConcurrentDictionary _contexts = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _contexts = + new ConcurrentDictionary(); + public string ListenerName => CapEvents.DiagnosticListenerName; private const string OperateNamePrefix = "CAP/"; @@ -46,18 +49,21 @@ public class CapTracingDiagnosticProcessor : ITracingDiagnosticProcessor private readonly IEntrySegmentContextAccessor _entrySegmentContextAccessor; private readonly IExitSegmentContextAccessor _exitSegmentContextAccessor; private readonly ILocalSegmentContextAccessor _localSegmentContextAccessor; + private readonly ICarrierPropagator _carrierPropagator; private readonly TracingConfig _tracingConfig; public CapTracingDiagnosticProcessor(ITracingContext tracingContext, IEntrySegmentContextAccessor entrySegmentContextAccessor, IExitSegmentContextAccessor exitSegmentContextAccessor, ILocalSegmentContextAccessor localSegmentContextAccessor, + ICarrierPropagator carrierPropagator, IConfigAccessor configAccessor) { _tracingContext = tracingContext; _exitSegmentContextAccessor = exitSegmentContextAccessor; _localSegmentContextAccessor = localSegmentContextAccessor; _entrySegmentContextAccessor = entrySegmentContextAccessor; + _carrierPropagator = carrierPropagator; _tracingConfig = configAccessor.Get(); } @@ -82,8 +88,8 @@ public void AfterPublishStore([Object] CapEventDataPubStore eventData) context.Span.AddLog(LogEvent.Event("Event Persistence End")); context.Span.AddLog(LogEvent.Message($"CAP message persistence succeeded!{Environment.NewLine}" + - $"--> Spend Time: { eventData.ElapsedTimeMs }ms.{Environment.NewLine}" + - $"--> Message Id: { eventData.Message.GetId() } , Name: { eventData.Operation} ")); + $"--> Spend Time: {eventData.ElapsedTimeMs}ms.{Environment.NewLine}" + + $"--> Message Id: {eventData.Message.GetId()} , Name: {eventData.Operation} ")); _tracingContext.Release(context); } @@ -97,7 +103,7 @@ public void ErrorPublishStore([Object] CapEventDataPubStore eventData) context.Span.AddLog(LogEvent.Event("Event Persistence Error")); context.Span.AddLog(LogEvent.Message($"CAP message persistence failed!{Environment.NewLine}" + $"--> Message Info:{Environment.NewLine}" + - $"{ JsonSerializer.Serialize(eventData.Message)}")); + $"{JsonSerializer.Serialize(eventData.Message)}")); context.Span.ErrorOccurred(eventData.Exception, _tracingConfig); _tracingContext.Release(context); @@ -108,19 +114,29 @@ public void BeforePublish([Object] CapEventDataPubSend eventData) { SegmentContext context = null; var host = eventData.BrokerAddress.Endpoint?.Replace("-1", "5672"); - if (_contexts.TryGetValue(eventData.TransportMessage.GetId(),out var ctx)) + if (_contexts.TryGetValue(eventData.TransportMessage.GetId(), out var ctx)) { - _localSegmentContextAccessor.Context = ctx; - context = _tracingContext.CreateExitSegmentContext(OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, - host, new CapCarrierHeaderCollection(eventData.TransportMessage)); - + var header = new CapCarrierHeaderCollection(eventData.TransportMessage); + if (_entrySegmentContextAccessor.Context == null) + { + _carrierPropagator.Inject(ctx, header); + context = _tracingContext.CreateEntrySegmentContext( + OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, + header); + } + else + { + context = _tracingContext.CreateExitSegmentContext( + OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, + host, header); + _carrierPropagator.Inject(context, new CapCarrierHeaderCollection(eventData.TransportMessage)); + } } else { // may be come from retry loop var carrierHeader = new CapCarrierHeaderCollection(eventData.TransportMessage); - var eventName = OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix; - var operationName = OperateNamePrefix + eventName + ConsumerOperateNameSuffix; + var operationName = OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix; context = _tracingContext.CreateEntrySegmentContext(operationName, carrierHeader); } @@ -137,13 +153,13 @@ public void BeforePublish([Object] CapEventDataPubSend eventData) [DiagnosticName(CapEvents.AfterPublish)] public void AfterPublish([Object] CapEventDataPubSend eventData) { - var context = _exitSegmentContextAccessor.Context; + var context = _exitSegmentContextAccessor.Context ?? _entrySegmentContextAccessor.Context; if (context == null) return; context.Span.AddLog(LogEvent.Event("Event Publishing End")); context.Span.AddLog(LogEvent.Message($"CAP message publishing succeeded!{Environment.NewLine}" + - $"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" + - $"--> Message Id: { eventData.TransportMessage.GetId() }, Name: {eventData.Operation}")); + $"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" + + $"--> Message Id: {eventData.TransportMessage.GetId()}, Name: {eventData.Operation}")); _tracingContext.Release(context); @@ -158,8 +174,8 @@ public void ErrorPublish([Object] CapEventDataPubSend eventData) context.Span.AddLog(LogEvent.Event("Event Publishing Error")); context.Span.AddLog(LogEvent.Message($"CAP message publishing failed!{Environment.NewLine}" + - $"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" + - $"--> Message Id: { eventData.TransportMessage.GetId() }, Name: {eventData.Operation}")); + $"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" + + $"--> Message Id: {eventData.TransportMessage.GetId()}, Name: {eventData.Operation}")); context.Span.ErrorOccurred(eventData.Exception, _tracingConfig); _tracingContext.Release(context); @@ -194,8 +210,8 @@ public void CapAfterConsume([Object] CapEventDataSubStore eventData) context.Span.AddLog(LogEvent.Event("Event Persistence End")); context.Span.AddLog(LogEvent.Message($"CAP message persistence succeeded!{Environment.NewLine}" + - $"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" + - $"--> Message Id: { eventData.TransportMessage.GetId() }, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}")); + $"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" + + $"--> Message Id: {eventData.TransportMessage.GetId()}, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}")); _tracingContext.Release(context); } @@ -208,8 +224,8 @@ public void CapErrorConsume([Object] CapEventDataSubStore eventData) context.Span.AddLog(LogEvent.Event("Event Persistence Error")); context.Span.AddLog(LogEvent.Message($"CAP message publishing failed! {Environment.NewLine}" + - $"--> Spend Time: { eventData.ElapsedTimeMs }ms. {Environment.NewLine}" + - $"--> Message Id: { eventData.TransportMessage.GetId() }, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}")); + $"--> Spend Time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" + + $"--> Message Id: {eventData.TransportMessage.GetId()}, Group: {eventData.TransportMessage.GetGroup()}, Name: {eventData.Operation}")); context.Span.ErrorOccurred(eventData.Exception, _tracingConfig); _tracingContext.Release(context); @@ -219,10 +235,19 @@ public void CapErrorConsume([Object] CapEventDataSubStore eventData) public void CapBeforeSubscriberInvoke([Object] CapEventDataSubExecute eventData) { SegmentContext context = null; - if (_contexts.TryGetValue(eventData.Message.GetId() + eventData.Message.GetGroup(),out var ctx)) + if (_contexts.TryGetValue(eventData.Message.GetId() + eventData.Message.GetGroup(), out var ctx)) { - _entrySegmentContextAccessor.Context = ctx; - context = _tracingContext.CreateLocalSegmentContext("Subscriber Invoke: " + eventData.MethodInfo.Name); + if (_entrySegmentContextAccessor.Context == null) + { + context = _tracingContext.CreateEntrySegmentContext( + "Subscriber Invoke: " + eventData.MethodInfo.Name, + new CapCarrierHeaderCollection(eventData.Message)); + } + else + { + context = _tracingContext.CreateLocalSegmentContext("Subscriber Invoke: " + + eventData.MethodInfo.Name); + } } else { @@ -236,8 +261,9 @@ public void CapBeforeSubscriberInvoke([Object] CapEventDataSubExecute eventData) context.Span.SpanLayer = SpanLayer.MQ; context.Span.Component = Components.CAP; context.Span.AddLog(LogEvent.Event("Subscriber Invoke Start")); - context.Span.AddLog(LogEvent.Message($"Begin invoke the subscriber: {eventData.MethodInfo} {Environment.NewLine}" + - $"--> Message Id: { eventData.Message.GetId()}, Group: {eventData.Message.GetGroup()}, Name: {eventData.Operation}")); + context.Span.AddLog(LogEvent.Message( + $"Begin invoke the subscriber: {eventData.MethodInfo} {Environment.NewLine}" + + $"--> Message Id: {eventData.Message.GetId()}, Group: {eventData.Message.GetGroup()}, Name: {eventData.Operation}")); _contexts[eventData.Message.GetId() + eventData.Message.GetGroup()] = context; } @@ -250,8 +276,9 @@ public void CapAfterSubscriberInvoke([Object] CapEventDataSubExecute eventData) context.Span.AddLog(LogEvent.Event("Subscriber Invoke End")); context.Span.AddLog(LogEvent.Message("Subscriber invoke succeeded!")); - context.Span.AddLog(LogEvent.Message($"Subscriber invoke spend time: { eventData.ElapsedTimeMs}ms. {Environment.NewLine}" + - $"--> Method Info: {eventData.MethodInfo}")); + context.Span.AddLog(LogEvent.Message( + $"Subscriber invoke spend time: {eventData.ElapsedTimeMs}ms. {Environment.NewLine}" + + $"--> Method Info: {eventData.MethodInfo}")); _tracingContext.Release(context); @@ -266,9 +293,9 @@ public void CapErrorSubscriberInvoke([Object] CapEventDataSubExecute eventData) context.Span.AddLog(LogEvent.Event("Subscriber Invoke Error")); context.Span.AddLog(LogEvent.Message($"Subscriber invoke failed! {Environment.NewLine}" + - $"--> Method Info: { eventData.MethodInfo} {Environment.NewLine}" + + $"--> Method Info: {eventData.MethodInfo} {Environment.NewLine}" + $"--> Message Info: {Environment.NewLine}" + - $"{ JsonSerializer.Serialize(eventData.Message)}")); + $"{JsonSerializer.Serialize(eventData.Message)}")); context.Span.ErrorOccurred(eventData.Exception, _tracingConfig); @@ -284,9 +311,9 @@ private StringOrIntValue GetComponent(BrokerAddress address, bool isPub) switch (address.Name) { case "RabbitMQ": - return 52; // "rabbitmq-producer"; + return 52; // "rabbitmq-producer"; case "Kafka": - return 40; //"kafka-producer"; + return 40; //"kafka-producer"; } } else @@ -299,7 +326,8 @@ private StringOrIntValue GetComponent(BrokerAddress address, bool isPub) return 41; // "kafka-consumer"; } } + return Components.CAP; } } -} \ No newline at end of file +}