From 6b8f2e5a80aeded2e149201c4aee8efe721e4f03 Mon Sep 17 00:00:00 2001
From: Andrew Lock
Date: Tue, 7 Nov 2023 16:27:44 +0000
Subject: [PATCH] Add de-duplication of telemetry logs

---
 .../Formatting/EventIdHash.cs                 |  16 +-
 .../Logging/Internal/RedactedErrorLogSink.cs  |   9 +-
 .../Collectors/RedactedErrorLogCollector.cs   |  36 ++-
 .../RedactedErrorLogCollectorTests.cs         | 218 ++++++++++++++++--
 4 files changed, 254 insertions(+), 25 deletions(-)

diff --git a/tracer/src/Datadog.Trace/Logging/DirectSubmission/Formatting/EventIdHash.cs b/tracer/src/Datadog.Trace/Logging/DirectSubmission/Formatting/EventIdHash.cs
index d61c5529d33b..174fc89cc063 100644
--- a/tracer/src/Datadog.Trace/Logging/DirectSubmission/Formatting/EventIdHash.cs
+++ b/tracer/src/Datadog.Trace/Logging/DirectSubmission/Formatting/EventIdHash.cs
@@ -30,13 +30,15 @@ namespace Datadog.Trace.Logging.DirectSubmission.Formatting
     internal static class EventIdHash
     {
         /// <summary>
-        /// Compute a 32-bit hash of the provided <paramref name="messageTemplate"/>. The
+        /// Compute a 32-bit hash of the provided <paramref name="messageTemplate"/>
+        /// and optionally a <paramref name="stackTrace"/>. The
         /// resulting hash value can be used as an event id in lieu of transmitting the
         /// full template string.
        /// </summary>
         /// <param name="messageTemplate">A message template.</param>
+        /// <param name="stackTrace">An optional stack trace to consider in the calculation.</param>
         /// <returns>A 32-bit hash of the template.</returns>
-        public static uint Compute(string messageTemplate)
+        public static uint Compute(string messageTemplate, string? stackTrace = null)
         {
             if (messageTemplate == null)
             {
@@ -54,6 +56,16 @@ public static uint Compute(string messageTemplate)
                 hash ^= (hash >> 6);
             }
 
+            if (stackTrace is not null)
+            {
+                for (var i = 0; i < stackTrace.Length; ++i)
+                {
+                    hash += stackTrace[i];
+                    hash += (hash << 10);
+                    hash ^= (hash >> 6);
+                }
+            }
+
             hash += (hash << 3);
             hash ^= (hash >> 11);
             hash += (hash << 15);
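Note: the change to Compute extends the existing Jenkins-style "one-at-a-time" hash: the optional stack trace simply continues the same running mix that was applied to the message template, before the usual final avalanche steps. Two logs therefore share an event id only when both the template and the (redacted) stack trace match. A self-contained sketch of the resulting behaviour (ComputeSketch is an illustrative name, not part of the patch):

    // C# sketch of the one-at-a-time hash above (illustrative only)
    static uint ComputeSketch(string messageTemplate, string? stackTrace = null)
    {
        uint hash = 0;
        foreach (var c in messageTemplate)
        {
            hash += c;             // fold in each UTF-16 code unit
            hash += hash << 10;
            hash ^= hash >> 6;
        }

        if (stackTrace is not null)
        {
            foreach (var c in stackTrace)
            {
                hash += c;         // the stack trace continues the same running hash
                hash += hash << 10;
                hash ^= hash >> 6;
            }
        }

        hash += hash << 3;         // final avalanche
        hash ^= hash >> 11;
        hash += hash << 15;
        return hash;
    }

    // ComputeSketch("boom", "at A.B()") != ComputeSketch("boom", "at C.D()")

One minor quirk, since no separator is mixed in between the two inputs: pairs that concatenate to the same string (e.g. ("ab", "c") and ("a", "bc")) hash identically. In practice templates and stack traces are distinctive enough that this shouldn't matter.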
diff --git a/tracer/src/Datadog.Trace/Logging/Internal/RedactedErrorLogSink.cs b/tracer/src/Datadog.Trace/Logging/Internal/RedactedErrorLogSink.cs
index 27b6006d20a2..3ce778ed849b 100644
--- a/tracer/src/Datadog.Trace/Logging/Internal/RedactedErrorLogSink.cs
+++ b/tracer/src/Datadog.Trace/Logging/Internal/RedactedErrorLogSink.cs
@@ -9,7 +9,6 @@
 using Datadog.Trace.ClrProfiler;
 using Datadog.Trace.Telemetry;
 using Datadog.Trace.Telemetry.Collectors;
-using Datadog.Trace.Telemetry.DTOs;
 using Datadog.Trace.Vendors.Serilog.Core;
 using Datadog.Trace.Vendors.Serilog.Events;
 
@@ -33,12 +32,8 @@ public void Emit(LogEvent? logEvent)
             }
 
             // Note: we're using the raw message template here to remove any chance of including customer information
-            var telemetryLog = new LogMessageData(logEvent.MessageTemplate.Text, ToLogLevel(logEvent.Level), logEvent.Timestamp)
-            {
-                StackTrace = logEvent.Exception is { } ex ? ExceptionRedactor.Redact(ex) : null
-            };
-
-            _collector.EnqueueLog(telemetryLog);
+            var stackTrace = logEvent.Exception is { } ex ? ExceptionRedactor.Redact(ex) : null;
+            _collector.EnqueueLog(logEvent.MessageTemplate.Text, ToLogLevel(logEvent.Level), logEvent.Timestamp, stackTrace);
         }
 
         private static TelemetryLogLevel ToLogLevel(LogEventLevel logEventLevel)
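Note: the sink now forwards the raw components (template text, level, timestamp, redacted stack trace) rather than building a LogMessageData eagerly; as the collector change below shows, the DTO is only allocated for logs that survive de-duplication. A hedged usage sketch of the new overload:

    // Illustrative: repeated identical errors no longer consume queue slots
    collector.EnqueueLog("Connection failed", TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow, stackTrace: null);
    collector.EnqueueLog("Connection failed", TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow, stackTrace: null);
    // the second call only bumps the per-event-id count; nothing new is allocated or queued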
diff --git a/tracer/src/Datadog.Trace/Telemetry/Collectors/RedactedErrorLogCollector.cs b/tracer/src/Datadog.Trace/Telemetry/Collectors/RedactedErrorLogCollector.cs
index c6feccbe53fa..d5046d854103 100644
--- a/tracer/src/Datadog.Trace/Telemetry/Collectors/RedactedErrorLogCollector.cs
+++ b/tracer/src/Datadog.Trace/Telemetry/Collectors/RedactedErrorLogCollector.cs
@@ -5,9 +5,12 @@
 
 #nullable enable
 
+using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
+using Datadog.Trace.Logging.DirectSubmission.Formatting;
 using Datadog.Trace.Telemetry.DTOs;
 using Datadog.Trace.Util;
 
@@ -25,15 +28,28 @@ internal class RedactedErrorLogCollector
         private readonly BoundedConcurrentQueue<LogMessageData> _queue = new(MaximumQueueSize);
         private TaskCompletionSource<bool> _tcs = new(TaskOptions);
 
+        private ConcurrentDictionary<uint, int> _logCounts = new();
+        private ConcurrentDictionary<uint, int> _logCountsReserve = new();
+
         public List<List<LogMessageData>>? GetLogs()
         {
             // This method should only be called in a single-threaded loop
             List<List<LogMessageData>>? batches = null;
             var batchSize = 0;
             List<LogMessageData>? logs = null;
+            var logCounts = Interlocked.Exchange(ref _logCounts, _logCountsReserve)!;
+
             while (_queue.TryDequeue(out var log))
             {
                 var logSize = log.GetApproximateSerializationSize();
+                // modify the message to add the final log count
+                var eventId = EventIdHash.Compute(log.Message, log.StackTrace);
+                if (logCounts.TryGetValue(eventId, out var count) && count > 1)
+                {
+                    // TODO: Add the count to the message (not yet supported in Telemetry)
+                    // log.InstanceCount = count - 1;
+                }
+
                 if (batchSize + logSize < MaximumBatchSizeBytes)
                 {
                     batchSize += log.GetApproximateSerializationSize();
@@ -61,12 +77,28 @@ internal class RedactedErrorLogCollector
                 batches.Add(logs);
             }
 
+            logCounts.Clear();
+            _logCountsReserve = logCounts;
+
             return batches;
         }
 
-        public void EnqueueLog(LogMessageData log)
+        // For testing
+        internal void EnqueueLog(LogMessageData log)
+            => EnqueueLog(log.Message, log.Level, DateTimeOffset.FromUnixTimeSeconds(log.TracerTime), log.StackTrace);
+
+        public void EnqueueLog(string message, TelemetryLogLevel logLevel, DateTimeOffset timestamp, string? stackTrace)
         {
-            _queue.TryEnqueue(log);
+            var eventId = EventIdHash.Compute(message, stackTrace);
+            var logCounts = _logCounts;
+            var newCount = logCounts.AddOrUpdate(eventId, addValue: 1, updateValueFactory: static (_, prev) => prev + 1);
+            if (newCount != 1)
+            {
+                // already have this log, so don't enqueue it again
+                return;
+            }
+
+            _queue.TryEnqueue(new(message, logLevel, timestamp) { StackTrace = stackTrace });
             if (_queue.Count > QueueSizeTrigger && _tcs is { Task.IsCompleted: false } tcs)
             {
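Note: the de-duplication uses a double-buffered pair of count dictionaries. Writers bump a per-event-id counter and enqueue only the first occurrence; the single reader thread atomically swaps in the empty "reserve" dictionary, consults the swapped-out counts while draining the queue, then clears the old dictionary and parks it as the next reserve. A writer that races the swap may still bump a counter in the just-retired dictionary, so the counts are best-effort, which is acceptable here because they are only intended to annotate the kept message. A simplified, self-contained sketch of the pattern (names are illustrative, not part of the patch):

    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    class DedupCounter
    {
        private ConcurrentDictionary<uint, int> _counts = new();
        private ConcurrentDictionary<uint, int> _reserve = new();

        // Hot path, called from many threads: true only for the first occurrence
        public bool TryAdd(uint eventId)
            => _counts.AddOrUpdate(eventId, 1, static (_, prev) => prev + 1) == 1;

        // Called from the single reader thread while draining a batch
        public void Drain(Action<uint, int> onCount)
        {
            var snapshot = Interlocked.Exchange(ref _counts, _reserve);
            foreach (var pair in snapshot)
            {
                onCount(pair.Key, pair.Value); // e.g. attach "seen N times" to the kept log
            }

            snapshot.Clear();
            _reserve = snapshot; // recycle the dictionary for the next swap
        }
    }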
diff --git a/tracer/test/Datadog.Trace.Tests/Telemetry/Collectors/RedactedErrorLogCollectorTests.cs b/tracer/test/Datadog.Trace.Tests/Telemetry/Collectors/RedactedErrorLogCollectorTests.cs
index 9a330345ac51..6ba0b8bba38b 100644
--- a/tracer/test/Datadog.Trace.Tests/Telemetry/Collectors/RedactedErrorLogCollectorTests.cs
+++ b/tracer/test/Datadog.Trace.Tests/Telemetry/Collectors/RedactedErrorLogCollectorTests.cs
@@ -4,9 +4,9 @@
 // </copyright>
 
 using System;
-using System.IO;
 using System.Linq;
 using System.Text;
+using Datadog.Trace.Logging.Internal;
 using Datadog.Trace.Telemetry;
 using Datadog.Trace.Telemetry.Collectors;
 using Datadog.Trace.Telemetry.DTOs;
@@ -23,10 +23,10 @@ public void DoesNotQueueMoreThanMaximumQueueSize()
         {
             var collector = new RedactedErrorLogCollector();
             var messagesToSend = RedactedErrorLogCollector.MaximumQueueSize * 2;
-            while (messagesToSend > 0)
+            for (var i = messagesToSend; i > 0; i--)
             {
-                collector.EnqueueLog(new("Something", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
-                messagesToSend--;
+                // Make messages unique to avoid de-duplication
+                collector.EnqueueLog(new($"Something {i}", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
             }
 
             var logs = collector.GetLogs();
@@ -40,21 +40,19 @@ public void CompletesTaskWhenQueueSizeTriggerIsReached()
             var collector = new RedactedErrorLogCollector();
             var threshold = RedactedErrorLogCollector.QueueSizeTrigger;
             var task = collector.WaitForLogsAsync();
-            var messages = 0;
-            while (messages < threshold)
+            for (var i = 0; i < threshold; i++)
             {
-                collector.EnqueueLog(new("Something", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
-                messages++;
+                // make the messages unique to avoid de-duplication
+                collector.EnqueueLog(new($"Something {i}", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
                 task.IsCompleted.Should().BeFalse();
             }
 
             // sending one more message should complete the task
             // and should remain completed subsequently
-            messages = 0;
-            while (messages < 100)
+            for (var i = 0; i < 100; i++)
             {
-                collector.EnqueueLog(new("Something", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
-                messages++;
+                // make the messages unique to avoid de-duplication
+                collector.EnqueueLog(new($"Something {i + threshold}", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
                 task.IsCompleted.Should().BeTrue();
             }
         }
@@ -119,7 +117,8 @@ public void SeparatesLogsIntoBatches()
             var collector = new RedactedErrorLogCollector();
 
             // using a big string to make sure we don't exceed queue size
-            var log = new LogMessageData(RandomString(10_000), TelemetryLogLevel.WARN, DateTimeOffset.UtcNow);
+            var randomString = RandomString(10_000);
+            var log = new LogMessageData(randomString + "_", TelemetryLogLevel.WARN, DateTimeOffset.UtcNow);
             var logSize = log.GetApproximateSerializationSize();
             var logsPerBatch = RedactedErrorLogCollector.MaximumBatchSizeBytes / logSize;
@@ -130,7 +129,8 @@ public void SeparatesLogsIntoBatches()
 
             for (var i = 0; i < logsToSend; i++)
             {
-                collector.EnqueueLog(log);
+                // using a different log each time to avoid de-duplication
+                collector.EnqueueLog(new LogMessageData(randomString + i, TelemetryLogLevel.WARN, DateTimeOffset.UtcNow));
             }
 
             var logs = collector.GetLogs();
@@ -155,6 +155,196 @@ public void RejectsOverSizedLogs()
             logs.Should().BeNull();
         }
 
+        [Fact]
+        public void WhenDeDupeEnabled_OnlySendsSingleLog()
+        {
+            var collector = new RedactedErrorLogCollector();
+            const string message = "This is my message";
+
+            // Add multiple identical messages
+            for (var i = 0; i < 5; i++)
+            {
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow));
+            }
+
+            var logs = collector.GetLogs();
+
+            logs.Should()
+                .NotBeNull()
+                .And.ContainSingle()
+                .Which.Should()
+                .ContainSingle()
+                .Subject.Message.Should()
+                .Be(message); // TODO: Verify instance count
+        }
+
+        [Fact]
+        public void WhenDeDupeEnabled_AndIncludesExceptionFromSameLocation_OnlySendsSingleLog()
+        {
+            const string message = "This is my message";
+            var collector = new RedactedErrorLogCollector();
+
+            // Add multiple identical messages with exception from same location
+            for (var i = 0; i < 5; i++)
+            {
+                var ex = GetException1();
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow)
+                {
+                    StackTrace = ExceptionRedactor.Redact(ex)
+                });
+            }
+
+            var logs = collector.GetLogs();
+
+            logs.Should()
+                .NotBeNull()
+                .And.ContainSingle()
+                .Which.Should()
+                .ContainSingle()
+                .Subject.Message.Should()
+                .Be(message); // TODO: Verify instance count
+
+            Exception GetException1()
+            {
+                try
+                {
+                    throw new Exception(nameof(GetException1));
+                }
+                catch (Exception ex)
+                {
+                    return ex;
+                }
+            }
+        }
+
+        [Fact]
+        public void WhenDeDupeEnabled_AndIncludesExceptionFromDifferentLocation_SendsDifferentLog()
+        {
+            const string message = "This is my message";
+            var collector = new RedactedErrorLogCollector();
+
+            // Add multiple identical messages with exception from same location
+            for (var i = 0; i < 5; i++)
+            {
+                var ex = GetException1();
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow)
+                {
+                    StackTrace = ExceptionRedactor.Redact(ex)
+                });
+            }
+
+            // Add multiple identical messages with exception from different location
+            for (var i = 0; i < 5; i++)
+            {
+                var ex = GetException2();
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow)
+                {
+                    StackTrace = ExceptionRedactor.Redact(ex)
+                });
+            }
+
+            var batch = collector.GetLogs()
+                .Should()
+                .HaveCount(1)
+                .And.ContainSingle()
+                .Subject;
+
+            batch.Should().HaveCount(2);
+            batch[0].Message.Should().Be("This is my message");
+            batch[1].Message.Should().Be("This is my message");
+
+            Exception GetException1()
+            {
+                try
+                {
+                    throw new Exception(nameof(GetException1));
+                }
+                catch (Exception ex)
+                {
+                    return ex;
+                }
+            }
+
+            Exception GetException2()
+            {
+                try
+                {
+                    throw new Exception(nameof(GetException2));
+                }
+                catch (Exception ex)
+                {
+                    return ex;
+                }
+            }
+        }
+
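Note: the two helpers above yield different event ids because their redacted stack traces differ by the throwing method's frame; the exception message itself is never hashed (only the log message and stack trace feed EventIdHash, and, assuming ExceptionRedactor.Redact emits stack frames rather than exception messages, the message string cannot reach the hash). Roughly:

    // Illustrative: same template, different throw sites => different event ids
    var id1 = EventIdHash.Compute("This is my message", "  at ...Tests.GetException1()");
    var id2 = EventIdHash.Compute("This is my message", "  at ...Tests.GetException2()");
    // id1 != id2, so both logs survive de-duplication within the batch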
+        [Fact]
+        public void WhenDeDupeEnabled_AndSendsBatchInBetween_SendsSingleLogPerBatch()
+        {
+            const string message = "This is my message";
+            var collector = new RedactedErrorLogCollector();
+
+            // Add multiple identical messages
+            for (var i = 0; i < 5; i++)
+            {
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow));
+            }
+
+            var firstBatch = collector.GetLogs();
+
+            // Add multiple identical messages
+            for (var i = 0; i < 5; i++)
+            {
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow));
+            }
+
+            var secondBatch = collector.GetLogs();
+            var thirdBatch = collector.GetLogs();
+
+            firstBatch.Should()
+                .NotBeNull()
+                .And.ContainSingle()
+                .Which.Should()
+                .ContainSingle()
+                .Subject.Message.Should()
+                .Be(message);
+
+            secondBatch.Should()
+                .NotBeNull()
+                .And.ContainSingle()
+                .Which.Should()
+                .ContainSingle()
+                .Subject.Message.Should()
+                .Be(message);
+
+            thirdBatch.Should().BeNull();
+        }
+
+        [Fact]
+        public void WhenDeDupeEnabled_AndOverfills_GroupsAllIntoSingleBatch()
+        {
+            const string message = "This is my message";
+            var collector = new RedactedErrorLogCollector();
+            var messagesToSend = RedactedErrorLogCollector.MaximumQueueSize * 2;
+
+            // Add too many identical messages
+            for (var i = 0; i < messagesToSend; i++)
+            {
+                collector.EnqueueLog(new LogMessageData(message, TelemetryLogLevel.ERROR, DateTimeOffset.UtcNow));
+            }
+
+            var logs = collector.GetLogs();
+            collector.GetLogs().Should().BeNull();
+
+            logs.Should()
+                .NotBeNull()
+                .And.ContainSingle()
+                .Subject.Should()
+                .ContainSingle()
+                .Subject.Message.Should()
+                .Be(message);
+        }
+
         private static string RandomString(int length)
         {
             const string options = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ,!\"£$%^&*()[]{}<>`¬";
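Note: end to end, the expected consumer is a single flush loop that awaits WaitForLogsAsync() and then drains GetLogs(), which the comment in the collector marks as single-threaded-only. A hedged sketch of such a loop (FlushLoopAsync is illustrative; the real telemetry loop lives elsewhere in the tracer):

    using System.Threading;
    using System.Threading.Tasks;

    // Illustrative consumer, assuming one dedicated flush task
    static async Task FlushLoopAsync(RedactedErrorLogCollector collector, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await collector.WaitForLogsAsync();   // completes once QueueSizeTrigger is exceeded
            var batches = collector.GetLogs();    // each inner list stays under MaximumBatchSizeBytes
            if (batches is null)
            {
                continue;
            }

            foreach (var batch in batches)
            {
                // serialize `batch` and send it to the telemetry intake here
            }
        }
    }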