Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add de-duplication of telemetry logs #4835

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ namespace Datadog.Trace.Logging.DirectSubmission.Formatting
internal static class EventIdHash
{
/// <summary>
/// Compute a 32-bit hash of the provided <paramref name="messageTemplate"/>. The
/// Compute a 32-bit hash of the provided <paramref name="messageTemplate"/>.
/// and optionally a <paramref name="stackTrace"/>. The
/// resulting hash value can be uses as an event id in lieu of transmitting the
/// full template string.
/// </summary>
/// <param name="messageTemplate">A message template.</param>
/// <param name="stackTrace">An optional stack trace to consider in the calculation.</param>
/// <returns>A 32-bit hash of the template.</returns>
public static uint Compute(string messageTemplate)
public static uint Compute(string messageTemplate, string? stackTrace = null)
{
if (messageTemplate == null)
{
Expand All @@ -54,6 +56,16 @@ public static uint Compute(string messageTemplate)
hash ^= (hash >> 6);
}

if (stackTrace is not null)
{
for (var i = 0; i < stackTrace.Length; ++i)
{
hash += stackTrace[i];
hash += (hash << 10);
hash ^= (hash >> 6);
}
}

hash += (hash << 3);
hash ^= (hash >> 11);
hash += (hash << 15);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using Datadog.Trace.ClrProfiler;
using Datadog.Trace.Telemetry;
using Datadog.Trace.Telemetry.Collectors;
using Datadog.Trace.Telemetry.DTOs;
using Datadog.Trace.Vendors.Serilog.Core;
using Datadog.Trace.Vendors.Serilog.Events;

Expand All @@ -33,12 +32,8 @@ public void Emit(LogEvent? logEvent)
}

// Note: we're using the raw message template here to remove any chance of including customer information
var telemetryLog = new LogMessageData(logEvent.MessageTemplate.Text, ToLogLevel(logEvent.Level), logEvent.Timestamp)
{
StackTrace = logEvent.Exception is { } ex ? ExceptionRedactor.Redact(ex) : null
};

_collector.EnqueueLog(telemetryLog);
var stackTrace = logEvent.Exception is { } ex ? ExceptionRedactor.Redact(ex) : null;
_collector.EnqueueLog(logEvent.MessageTemplate.Text, ToLogLevel(logEvent.Level), logEvent.Timestamp, stackTrace);
}

private static TelemetryLogLevel ToLogLevel(LogEventLevel logEventLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

#nullable enable

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Logging.DirectSubmission.Formatting;
using Datadog.Trace.Telemetry.DTOs;
using Datadog.Trace.Util;

Expand All @@ -26,15 +29,28 @@ internal class RedactedErrorLogCollector
private TaskCompletionSource<bool> _tcs = new(TaskOptions);
private bool _isEnabled = true;

private ConcurrentDictionary<uint, int> _logCounts = new();
private ConcurrentDictionary<uint, int> _logCountsReserve = new();

public List<List<LogMessageData>>? GetLogs()
{
// This method should only be called in a single-threaded loop
List<List<LogMessageData>>? batches = null;
var batchSize = 0;
List<LogMessageData>? logs = null;
var logCounts = Interlocked.Exchange(ref _logCounts, _logCountsReserve)!;

while (_queue.TryDequeue(out var log))
{
var logSize = log.GetApproximateSerializationSize();
// modify the message to add the final log count
var eventId = EventIdHash.Compute(log.Message, log.StackTrace);
if (logCounts.TryGetValue(eventId, out var count) && count > 1)
{
// TODO: Add the count to the message (not yet supported in Telemetry)
// log.InstanceCount = count - 1;
}

if (batchSize + logSize < MaximumBatchSizeBytes)
{
batchSize += log.GetApproximateSerializationSize();
Expand Down Expand Up @@ -62,14 +78,30 @@ internal class RedactedErrorLogCollector
batches.Add(logs);
}

logCounts.Clear();
_logCountsReserve = logCounts;

return batches;
}

public void EnqueueLog(LogMessageData log)
// For testing
internal void EnqueueLog(LogMessageData log)
=> EnqueueLog(log.Message, log.Level, DateTimeOffset.FromUnixTimeSeconds(log.TracerTime), log.StackTrace);

public void EnqueueLog(string message, TelemetryLogLevel logLevel, DateTimeOffset timestamp, string? stackTrace)
{
if (Volatile.Read(ref _isEnabled))
{
_queue.TryEnqueue(log);
var eventId = EventIdHash.Compute(message, stackTrace);
var logCounts = _logCounts;
var newCount = logCounts.AddOrUpdate(eventId, addValue: 1, updateValueFactory: static (_, prev) => prev + 1);
if (newCount != 1)
{
// already have this log, so don't enqueue it again
return;
}

_queue.TryEnqueue(new(message, logLevel, timestamp) { StackTrace = stackTrace });

if (_queue.Count > QueueSizeTrigger)
{
Expand Down
Loading
Loading