From 32661adec24629e50a3416f37d6e37fd8ec6ef5b Mon Sep 17 00:00:00 2001 From: Eric Erhardt Date: Thu, 2 Nov 2023 17:14:32 -0500 Subject: [PATCH] Implement RabbitMQ logging (#662) (#665) * Implement RabbitMQ logging Add an EventSource listener to listen to the RabbitMQ event source which logs info, warn, and error messages. Forward these messages to an ILogger. Fix #564 * Stop coding like a dinosaur. --- .../AspireRabbitMQExtensions.cs | 6 +- .../ConfigurationSchema.json | 9 + .../RabbitMQEventSourceLogForwarder.cs | 218 ++++++++++++++++++ src/Components/Telemetry.md | 2 +- 4 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 src/Components/Aspire.RabbitMQ.Client/RabbitMQEventSourceLogForwarder.cs diff --git a/src/Components/Aspire.RabbitMQ.Client/AspireRabbitMQExtensions.cs b/src/Components/Aspire.RabbitMQ.Client/AspireRabbitMQExtensions.cs index b80b14816d..8dee478c60 100644 --- a/src/Components/Aspire.RabbitMQ.Client/AspireRabbitMQExtensions.cs +++ b/src/Components/Aspire.RabbitMQ.Client/AspireRabbitMQExtensions.cs @@ -76,7 +76,8 @@ private static void AddRabbitMQ( IConnectionFactory CreateConnectionFactory(IServiceProvider sp) { - var connectionString = settings.ConnectionString; + // ensure the log forwarder is initialized + sp.GetRequiredService().Start(); var factory = new ConnectionFactory(); @@ -84,6 +85,7 @@ IConnectionFactory CreateConnectionFactory(IServiceProvider sp) configurationOptionsSection.Bind(factory); // the connection string from settings should win over the one from the ConnectionFactory section + var connectionString = settings.ConnectionString; if (!string.IsNullOrEmpty(connectionString)) { factory.Uri = new(connectionString); @@ -105,6 +107,8 @@ IConnectionFactory CreateConnectionFactory(IServiceProvider sp) builder.Services.AddKeyedSingleton(serviceKey, (sp, key) => CreateConnection(sp.GetRequiredKeyedService(key), settings.MaxConnectRetryCount)); } + builder.Services.AddSingleton(); + if (settings.Tracing) { // Note that RabbitMQ.Client v6.6 doesn't have built-in support for tracing. See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1261 diff --git a/src/Components/Aspire.RabbitMQ.Client/ConfigurationSchema.json b/src/Components/Aspire.RabbitMQ.Client/ConfigurationSchema.json index fdcd2edc21..a62e7f896d 100644 --- a/src/Components/Aspire.RabbitMQ.Client/ConfigurationSchema.json +++ b/src/Components/Aspire.RabbitMQ.Client/ConfigurationSchema.json @@ -1,4 +1,13 @@ { + "definitions": { + "logLevel": { + "properties": { + "RabbitMQ.Client": { + "$ref": "#/definitions/logLevelThreshold" + } + } + } + }, "properties": { "Aspire": { "type": "object", diff --git a/src/Components/Aspire.RabbitMQ.Client/RabbitMQEventSourceLogForwarder.cs b/src/Components/Aspire.RabbitMQ.Client/RabbitMQEventSourceLogForwarder.cs new file mode 100644 index 0000000000..e7064f215a --- /dev/null +++ b/src/Components/Aspire.RabbitMQ.Client/RabbitMQEventSourceLogForwarder.cs @@ -0,0 +1,218 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections; +using System.Diagnostics; +using System.Diagnostics.Tracing; +using Microsoft.Extensions.Logging; + +namespace Aspire.RabbitMQ.Client; + +internal sealed class RabbitMQEventSourceLogForwarder : IDisposable +{ + private static readonly Func s_formatErrorEvent = FormatErrorEvent; + private static readonly Func s_formatEvent = FormatEvent; + + private readonly ILogger _logger; + private RabbitMQEventSourceListener? _listener; + + public RabbitMQEventSourceLogForwarder(ILoggerFactory loggerFactory) + { + _logger = loggerFactory.CreateLogger("RabbitMQ.Client"); + } + + /// + /// Initiates the log forwarding from the RabbitMQ event sources to a provided , call to stop forwarding. + /// + public void Start() + { + _listener ??= new RabbitMQEventSourceListener(LogEvent, EventLevel.Verbose); + } + + private void LogEvent(EventWrittenEventArgs eventData) + { + var level = MapLevel(eventData.Level); + var eventId = new EventId(eventData.EventId, eventData.EventName); + + // Special case the Error event so the Exception Details are written correctly + if (eventData.EventId == 3 && + eventData.EventName == "Error" && + eventData.PayloadNames?.Count == 2 && + eventData.Payload?.Count == 2 && + eventData.PayloadNames[0] == "message" && + eventData.PayloadNames[1] == "ex") + { + _logger.Log(level, eventId, new ErrorEventSourceEvent(eventData), null, s_formatErrorEvent); + } + else + { + Debug.Assert( + (eventData.EventId == 1 && eventData.EventName == "Info") || + (eventData.EventId == 2 && eventData.EventName == "Warn")); + + _logger.Log(level, eventId, new EventSourceEvent(eventData), null, s_formatEvent); + } + } + + private static string FormatErrorEvent(ErrorEventSourceEvent eventSourceEvent, Exception? ex) => + eventSourceEvent.EventData.Payload?[0]?.ToString() ?? ""; + + private static string FormatEvent(EventSourceEvent eventSourceEvent, Exception? ex) => + eventSourceEvent.EventData.Payload?[0]?.ToString() ?? ""; + + public void Dispose() => _listener?.Dispose(); + + private static LogLevel MapLevel(EventLevel level) => level switch + { + EventLevel.Critical => LogLevel.Critical, + EventLevel.Error => LogLevel.Error, + EventLevel.Informational => LogLevel.Information, + EventLevel.Verbose => LogLevel.Debug, + EventLevel.Warning => LogLevel.Warning, + EventLevel.LogAlways => LogLevel.Information, + _ => throw new ArgumentOutOfRangeException(nameof(level), level, null), + }; + + private readonly struct EventSourceEvent : IReadOnlyList> + { + public EventWrittenEventArgs EventData { get; } + + public EventSourceEvent(EventWrittenEventArgs eventData) + { + // only Info and Warn events are expected, which always have 'message' as the only payload + Debug.Assert(eventData.PayloadNames?.Count == 1 && eventData.PayloadNames[0] == "message"); + + EventData = eventData; + } + + public IEnumerator> GetEnumerator() + { + for (var i = 0; i < Count; i++) + { + yield return this[i]; + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public int Count => EventData.PayloadNames?.Count ?? 0; + + public KeyValuePair this[int index] => new(EventData.PayloadNames![index], EventData.Payload![index]); + } + + private readonly struct ErrorEventSourceEvent : IReadOnlyList> + { + public EventWrittenEventArgs EventData { get; } + + public ErrorEventSourceEvent(EventWrittenEventArgs eventData) + { + EventData = eventData; + } + + public IEnumerator> GetEnumerator() + { + for (var i = 0; i < Count; i++) + { + yield return this[i]; + } + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public int Count => 5; + + public KeyValuePair this[int index] + { + get + { + Debug.Assert(EventData.PayloadNames?.Count == 2 && EventData.Payload?.Count == 2); + Debug.Assert(EventData.PayloadNames[0] == "message"); + Debug.Assert(EventData.PayloadNames[1] == "ex"); + + ArgumentOutOfRangeException.ThrowIfGreaterThanOrEqual(index, 5); + + return index switch + { + 0 => new(EventData.PayloadNames[0], EventData.Payload[0]), + < 5 => GetExData(EventData, index), + _ => throw new UnreachableException() + }; + + static KeyValuePair GetExData(EventWrittenEventArgs eventData, int index) + { + Debug.Assert(index >= 1 && index <= 4); + Debug.Assert(eventData.Payload?.Count == 2); + var exData = eventData.Payload[1] as IDictionary; + Debug.Assert(exData is not null && exData.Count == 4); + + return index switch + { + 1 => new("ex.Type", exData["Type"]), + 2 => new("ex.Message", exData["Message"]), + 3 => new("ex.StackTrace", exData["StackTrace"]), + 4 => new("ex.InnerException", exData["InnerException"]), + _ => throw new UnreachableException() + }; + } + } + } + } + + /// + /// Implementation of that listens to events produced by the RabbitMQ.Client library. + /// + private sealed class RabbitMQEventSourceListener : EventListener + { + private readonly List _eventSources = new List(); + + private readonly Action _log; + private readonly EventLevel _level; + + public RabbitMQEventSourceListener(Action log, EventLevel level) + { + _log = log; + _level = level; + + foreach (EventSource eventSource in _eventSources) + { + OnEventSourceCreated(eventSource); + } + + _eventSources.Clear(); + } + + protected sealed override void OnEventSourceCreated(EventSource eventSource) + { + base.OnEventSourceCreated(eventSource); + + if (_log == null) + { + _eventSources.Add(eventSource); + } + + if (eventSource.Name == "rabbitmq-dotnet-client" || eventSource.Name == "rabbitmq-client") + { + EnableEvents(eventSource, _level); + } + } + + protected sealed override void OnEventWritten(EventWrittenEventArgs eventData) + { + // Workaround https://github.com/dotnet/corefx/issues/42600 + if (eventData.EventId == -1) + { + return; + } + + // There is a very tight race during the listener creation where EnableEvents was called + // and the thread producing events not observing the `_log` field assignment + _log?.Invoke(eventData); + } + } +} diff --git a/src/Components/Telemetry.md b/src/Components/Telemetry.md index 715f8d8a9f..b81469c780 100644 --- a/src/Components/Telemetry.md +++ b/src/Components/Telemetry.md @@ -191,7 +191,7 @@ Aspire.Npgsql.EntityFrameworkCore.PostgreSQL: Aspire.RabbitMQ.Client: - Log categories: - - TODO + - "RabbitMQ.Client" - Activity source names: - "Aspire.RabbitMQ.Client" - Metric names: