Skip to content

Commit

Permalink
Fix concurrency bug in caching transport (#2026)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjohnsonpint authored Oct 31, 2022
1 parent ff194db commit 0a13531
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Fixes

- Fix concurrency bug in caching transport ([#2026](https://github.com/getsentry/sentry-dotnet/pull/2026))

## 3.23.0

### Features
Expand Down
35 changes: 25 additions & 10 deletions src/Sentry/Internal/Http/CachingTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ internal class CachingTransport : ITransport, IAsyncDisposable, IDisposable
// Pre-released because the directory might already have files from previous sessions.
private readonly Signal _workerSignal = new(true);

// Signal that ensures only one thread can process items from the cache at a time
private readonly Signal _processingSignal = new(true);

// Lock to synchronize file system operations inside the cache directory.
// It's required because there are multiple threads that may attempt to both read
// and write from/to the cache directory.
Expand Down Expand Up @@ -137,7 +140,7 @@ private async Task CachedTransportBackgroundTaskAsync()
try
{
await _workerSignal.WaitAsync(_workerCts.Token).ConfigureAwait(false);
_options.LogDebug("Worker signal triggered: flushing cached envelopes.");
_options.LogDebug("CachingTransport worker signal triggered.");
await ProcessCacheAsync(_workerCts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (_workerCts.IsCancellationRequested)
Expand Down Expand Up @@ -254,17 +257,29 @@ private IEnumerable<string> GetCacheFilePaths() =>

private async Task ProcessCacheAsync(CancellationToken cancellation)
{
// Signal that we can start waiting for _options.InitCacheFlushTimeout
_preInitCacheResetEvent?.Set();
try
{
// Make sure we're the only thread processing items
await _processingSignal.WaitAsync(cancellation).ConfigureAwait(false);

// Process the cache
while (await TryPrepareNextCacheFileAsync(cancellation).ConfigureAwait(false) is { } file)
// Signal that we can start waiting for _options.InitCacheFlushTimeout
_preInitCacheResetEvent?.Set();

// Process the cache
_options.LogDebug("Flushing cached envelopes.");
while (await TryPrepareNextCacheFileAsync(cancellation).ConfigureAwait(false) is { } file)
{
await InnerProcessCacheAsync(file, cancellation).ConfigureAwait(false);
}

// Signal that we can continue with initialization, if we're using _options.InitCacheFlushTimeout
_initCacheResetEvent?.Set();
}
finally
{
await InnerProcessCacheAsync(file, cancellation).ConfigureAwait(false);
// Release the signal so the next pass or another thread can enter
_processingSignal.Release();
}

// Signal that we can continue with initialization, if we're using _options.InitCacheFlushTimeout
_initCacheResetEvent?.Set();
}

private async Task InnerProcessCacheAsync(string file, CancellationToken cancellation)
Expand Down Expand Up @@ -465,7 +480,7 @@ public Task StopWorkerAsync()

public Task FlushAsync(CancellationToken cancellationToken = default)
{
_options.LogDebug("External FlushAsync invocation: flushing cached envelopes.");
_options.LogDebug("CachingTransport received request to flush the cache.");
return ProcessCacheAsync(cancellationToken);
}

Expand Down
36 changes: 36 additions & 0 deletions test/Sentry.Tests/HubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,4 +1147,40 @@ public void CaptureEvent_MessageOnlyEvent_SpanLinkedToEventContext(SentryLevel l
Assert.False(child.IsFinished);
Assert.Null(child.Status);
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task FlushOnDispose_SendsEnvelope(bool cachingEnabled)
{
// Arrange
var fileSystem = new FakeFileSystem();
using var cacheDirectory = new TempDirectory(fileSystem);
var transport = Substitute.For<ITransport>();

var options = new SentryOptions
{
Dsn = ValidDsn,
Transport = transport
};

if (cachingEnabled)
{
options.CacheDirectoryPath = cacheDirectory.Path;
options.FileSystem = fileSystem;
}

// Act
// Disposing the hub should flush the client and send the envelope.
// If caching is enabled, it should flush the cache as well.
// Either way, the envelope should be sent.
using (var hub = new Hub(options))
{
hub.CaptureEvent(new SentryEvent());
}

// Assert
await transport.Received(1)
.SendEnvelopeAsync(Arg.Any<Envelope>(), Arg.Any<CancellationToken>());
}
}
2 changes: 1 addition & 1 deletion test/Sentry.Tests/Internals/BackgroundWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,6 @@ public async Task FlushAsync_Calls_CachingTransport_FlushAsync()

// Assert
_fixture.Logger.Received(1)
.Log(SentryLevel.Debug, "External FlushAsync invocation: flushing cached envelopes.");
.Log(SentryLevel.Debug, "CachingTransport received request to flush the cache.");
}
}

0 comments on commit 0a13531

Please sign in to comment.