Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[logs] Fix pooling issues when wrapping batch export processor #5255

Merged
19 changes: 16 additions & 3 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ public void Dispose()
T item = this.circularBuffer.Read();
if (typeof(T) == typeof(LogRecord))
{
LogRecordSharedPool.Current.Return((LogRecord)(object)item);
var logRecord = (LogRecord)(object)item;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}
}
}
}
Expand Down Expand Up @@ -134,7 +138,11 @@ public struct Enumerator : IEnumerator<T>

if (currentItem != null)
{
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
var logRecord = (LogRecord)(object)currentItem;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}
}

if (circularBuffer!.RemovedCount < enumerator.targetCount)
Expand Down Expand Up @@ -215,7 +223,12 @@ public void Dispose()
var currentItem = this.current;
if (currentItem != null)
{
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
var logRecord = (LogRecord)(object)currentItem;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}

this.current = null;
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
state for cumulative temporality.
[#5230](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5230)

* Fixed an issue causing `LogRecord`s to be incorrectly reused when wrapping an
vishweshbankwar marked this conversation as resolved.
Show resolved Hide resolved
instance of `BatchLogRecordExportProcessor` inside another
`BaseProcessor<LogRecord>`.
[#5255](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5255)

## 1.7.0

Released 2023-Dec-08
Expand Down
24 changes: 19 additions & 5 deletions src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,27 @@ public override void OnEnd(LogRecord data)
// happen here.
Debug.Assert(data != null, "LogRecord was null.");

data!.Buffer();
switch (data!.Source)
{
case LogRecord.LogRecordSource.FromSharedPool:
data.Buffer();
data.AddReference();
if (!this.TryExport(data))
{
LogRecordSharedPool.Current.Return(data);
}

data.AddReference();
break;
case LogRecord.LogRecordSource.CreatedManually:
data.Buffer();
this.TryExport(data);
break;
default:
Debug.Assert(data.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "LogRecord source was something unexpected");

if (!this.TryExport(data))
{
LogRecordSharedPool.Current.Return(data);
// Note: If we are using ThreadStatic pool we make a copy of the record.
this.TryExport(data.Copy());
break;
}
}
}
19 changes: 19 additions & 0 deletions src/OpenTelemetry/Logs/LogRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class LogRecord
internal IReadOnlyList<KeyValuePair<string, object?>>? AttributeData;
internal List<KeyValuePair<string, object?>>? AttributeStorage;
internal List<object?>? ScopeStorage;
internal LogRecordSource Source = LogRecordSource.CreatedManually;
internal int PoolReferenceCount = int.MaxValue;

private static readonly Action<object?, List<object?>> AddScopeToBufferedList = (object? scope, List<object?> state) =>
Expand Down Expand Up @@ -80,6 +81,24 @@ internal LogRecord(
}
}

internal enum LogRecordSource
{
/// <summary>
/// A <see cref="LogRecord"/> created manually.
/// </summary>
CreatedManually,

/// <summary>
/// A <see cref="LogRecord"/> rented from the <see cref="LogRecordThreadStaticPool"/>.
/// </summary>
FromThreadStaticPool,

/// <summary>
/// A <see cref="LogRecord"/> rented from the <see cref="LogRecordSharedPool"/>.
/// </summary>
FromSharedPool,
}

/// <summary>
/// Gets or sets the log timestamp.
/// </summary>
Expand Down
11 changes: 9 additions & 2 deletions src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using OpenTelemetry.Internal;

Expand All @@ -17,7 +18,7 @@ internal sealed class LogRecordSharedPool : ILogRecordPool
private long rentIndex;
private long returnIndex;

public LogRecordSharedPool(int capacity)
private LogRecordSharedPool(int capacity)
{
this.Capacity = capacity;
this.pool = new LogRecord?[capacity];
Expand Down Expand Up @@ -54,18 +55,24 @@ public LogRecord Rent()
continue;
}

Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool");
logRecord.ResetReferenceCount();
return logRecord;
}
}

var newLogRecord = new LogRecord();
var newLogRecord = new LogRecord()
{
Source = LogRecord.LogRecordSource.FromSharedPool,
};
newLogRecord.ResetReferenceCount();
return newLogRecord;
}

public void Return(LogRecord logRecord)
{
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool");

if (logRecord.RemoveReference() != 0)
{
return;
Expand Down
14 changes: 12 additions & 2 deletions src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;

namespace OpenTelemetry.Logs;

internal sealed class LogRecordThreadStaticPool : ILogRecordPool
Expand All @@ -19,15 +21,23 @@ public LogRecord Rent()
var logRecord = Storage;
if (logRecord != null)
{
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool");
Storage = null;
return logRecord;
}
else
{
logRecord = new()
{
Source = LogRecord.LogRecordSource.FromThreadStaticPool,
};
}

return new();
return logRecord;
}

public void Return(LogRecord logRecord)
{
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool");
if (Storage == null)
{
LogRecordPoolHelper.Clear(logRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public void StateValuesAndScopeBufferingTest()

using var scope = scopeProvider.Push(exportedItems);

var logRecord = new LogRecord();
var pool = LogRecordSharedPool.Current;

var logRecord = pool.Rent();

var state = new LogRecordTest.DisposingState("Hello world");

Expand Down Expand Up @@ -60,6 +62,7 @@ public void StateValuesAndScopeBufferingTest()
processor.Shutdown();

Assert.Single(exportedItems);
Assert.Same(logRecord, exportedItems[0]);
}

[Fact]
Expand All @@ -74,14 +77,19 @@ public void StateBufferingTest()
using var processor = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));

var logRecord = new LogRecord();
var pool = LogRecordSharedPool.Current;

var logRecord = pool.Rent();

var state = new LogRecordTest.DisposingState("Hello world");
logRecord.State = state;

processor.OnEnd(logRecord);
processor.Shutdown();

Assert.Single(exportedItems);
Assert.Same(logRecord, exportedItems[0]);

state.Dispose();

Assert.Throws<ObjectDisposedException>(() =>
Expand All @@ -93,5 +101,41 @@ public void StateBufferingTest()
}
});
}

[Fact]
public void CopyMadeWhenLogRecordIsFromThreadStaticPoolTest()
{
List<LogRecord> exportedItems = new();

using var processor = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));

var pool = LogRecordThreadStaticPool.Instance;

var logRecord = pool.Rent();

processor.OnEnd(logRecord);
processor.Shutdown();

Assert.Single(exportedItems);
Assert.NotSame(logRecord, exportedItems[0]);
}

[Fact]
public void LogRecordAddedToBatchIfNotFromAnyPoolTest()
{
List<LogRecord> exportedItems = new();

using var processor = new BatchLogRecordExportProcessor(
new InMemoryExporter<LogRecord>(exportedItems));

var logRecord = new LogRecord();

processor.OnEnd(logRecord);
processor.Shutdown();

Assert.Single(exportedItems);
Assert.Same(logRecord, exportedItems[0]);
}
}
#endif
4 changes: 2 additions & 2 deletions test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void RentReturnTests()
Assert.Equal(1, pool.Count);

// Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue.
LogRecord manualRecord = new();
LogRecord manualRecord = new() { Source = LogRecord.LogRecordSource.FromSharedPool };
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount);
pool.Return(manualRecord);

Expand Down Expand Up @@ -163,7 +163,7 @@ public async Task ExportTest(bool warmup)
{
for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++)
{
pool.Return(new LogRecord { PoolReferenceCount = 1 });
pool.Return(new LogRecord { Source = LogRecord.LogRecordSource.FromSharedPool, PoolReferenceCount = 1 });
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public void RentReturnTests()
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);

LogRecordThreadStaticPool.Instance.Return(new());
LogRecordThreadStaticPool.Instance.Return(new() { Source = LogRecord.LogRecordSource.FromThreadStaticPool });
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);

LogRecordThreadStaticPool.Storage = null;

var manual = new LogRecord();
var manual = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool };
LogRecordThreadStaticPool.Instance.Return(manual);
Assert.NotNull(LogRecordThreadStaticPool.Storage);
Assert.Equal(manual, LogRecordThreadStaticPool.Storage);
Expand Down
Loading