Skip to content

Commit

Permalink
Add de-duplication of telemetry logs
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewlock committed Nov 13, 2023
1 parent 0072c39 commit 7f0f0cc
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ namespace Datadog.Trace.Logging.DirectSubmission.Formatting
internal static class EventIdHash
{
/// <summary>
/// Compute a 32-bit hash of the provided <paramref name="messageTemplate"/>. The
/// Compute a 32-bit hash of the provided <paramref name="messageTemplate"/>.
/// and optionally a <paramref name="stackTrace"/>. The
/// resulting hash value can be uses as an event id in lieu of transmitting the
/// full template string.
/// </summary>
/// <param name="messageTemplate">A message template.</param>
/// <param name="stackTrace">An optional stack trace to consider in the calculation.</param>
/// <returns>A 32-bit hash of the template.</returns>
public static uint Compute(string messageTemplate)
public static uint Compute(string messageTemplate, string? stackTrace = null)
{
if (messageTemplate == null)
{
Expand All @@ -54,6 +56,16 @@ public static uint Compute(string messageTemplate)
hash ^= (hash >> 6);
}

if (stackTrace is not null)
{
for (var i = 0; i < stackTrace.Length; ++i)
{
hash += stackTrace[i];
hash += (hash << 10);
hash ^= (hash >> 6);
}
}

hash += (hash << 3);
hash ^= (hash >> 11);
hash += (hash << 15);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using Datadog.Trace.ClrProfiler;
using Datadog.Trace.Telemetry;
using Datadog.Trace.Telemetry.Collectors;
using Datadog.Trace.Telemetry.DTOs;
using Datadog.Trace.Vendors.Serilog.Core;
using Datadog.Trace.Vendors.Serilog.Events;

Expand All @@ -33,12 +32,8 @@ public void Emit(LogEvent? logEvent)
}

// Note: we're using the raw message template here to remove any chance of including customer information
var telemetryLog = new LogMessageData(logEvent.MessageTemplate.Text, ToLogLevel(logEvent.Level), logEvent.Timestamp)
{
StackTrace = logEvent.Exception is { } ex ? ExceptionRedactor.Redact(ex) : null
};

_collector.EnqueueLog(telemetryLog);
var stackTrace = logEvent.Exception is { } ex ? ExceptionRedactor.Redact(ex) : null;
_collector.EnqueueLog(logEvent.MessageTemplate.Text, ToLogLevel(logEvent.Level), logEvent.Timestamp, stackTrace);
}

private static TelemetryLogLevel ToLogLevel(LogEventLevel logEventLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

#nullable enable

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Logging.DirectSubmission.Formatting;
using Datadog.Trace.Telemetry.DTOs;
using Datadog.Trace.Util;

Expand All @@ -26,15 +29,28 @@ internal class RedactedErrorLogCollector
private TaskCompletionSource<bool> _tcs = new(TaskOptions);
private bool _isEnabled = true;

private ConcurrentDictionary<uint, int> _logCounts = new();
private ConcurrentDictionary<uint, int> _logCountsReserve = new();

public List<List<LogMessageData>>? GetLogs()
{
// This method should only be called in a single-threaded loop
List<List<LogMessageData>>? batches = null;
var batchSize = 0;
List<LogMessageData>? logs = null;
var logCounts = Interlocked.Exchange(ref _logCounts, _logCountsReserve)!;

while (_queue.TryDequeue(out var log))
{
var logSize = log.GetApproximateSerializationSize();
// modify the message to add the final log count
var eventId = EventIdHash.Compute(log.Message, log.StackTrace);
if (logCounts.TryGetValue(eventId, out var count) && count > 1)
{
// TODO: Add the count to the message (not yet supported in Telemetry)
// log.InstanceCount = count - 1;
}

if (batchSize + logSize < MaximumBatchSizeBytes)
{
batchSize += log.GetApproximateSerializationSize();
Expand Down Expand Up @@ -62,14 +78,30 @@ internal class RedactedErrorLogCollector
batches.Add(logs);
}

logCounts.Clear();
_logCountsReserve = logCounts;

return batches;
}

public void EnqueueLog(LogMessageData log)
// For testing
internal void EnqueueLog(LogMessageData log)
=> EnqueueLog(log.Message, log.Level, DateTimeOffset.FromUnixTimeSeconds(log.TracerTime), log.StackTrace);

public void EnqueueLog(string message, TelemetryLogLevel logLevel, DateTimeOffset timestamp, string? stackTrace)
{
if (Volatile.Read(ref _isEnabled))
{
_queue.TryEnqueue(log);
var eventId = EventIdHash.Compute(message, stackTrace);
var logCounts = _logCounts;
var newCount = logCounts.AddOrUpdate(eventId, addValue: 1, updateValueFactory: static (_, prev) => prev + 1);
if (newCount != 1)
{
// already have this log, so don't enqueue it again
return;
}

_queue.TryEnqueue(new(message, logLevel, timestamp) { StackTrace = stackTrace });

if (_queue.Count > QueueSizeTrigger)
{
Expand Down
Loading

0 comments on commit 7f0f0cc

Please sign in to comment.