From 0a13531faa7703e4fe92ec3310cd7c5674de1bd4 Mon Sep 17 00:00:00 2001 From: Matt Johnson-Pint Date: Mon, 31 Oct 2022 07:31:12 -0700 Subject: [PATCH] Fix concurrency bug in caching transport (#2026) --- CHANGELOG.md | 6 ++++ src/Sentry/Internal/Http/CachingTransport.cs | 35 ++++++++++++------ test/Sentry.Tests/HubTests.cs | 36 +++++++++++++++++++ .../Internals/BackgroundWorkerTests.cs | 2 +- 4 files changed, 68 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b021ebc36..2d4b38eddb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Fixes + +- Fix concurrency bug in caching transport ([#2026](https://github.com/getsentry/sentry-dotnet/pull/2026)) + ## 3.23.0 ### Features diff --git a/src/Sentry/Internal/Http/CachingTransport.cs b/src/Sentry/Internal/Http/CachingTransport.cs index 18995e8c6a..2e817b666e 100644 --- a/src/Sentry/Internal/Http/CachingTransport.cs +++ b/src/Sentry/Internal/Http/CachingTransport.cs @@ -25,6 +25,9 @@ internal class CachingTransport : ITransport, IAsyncDisposable, IDisposable // Pre-released because the directory might already have files from previous sessions. private readonly Signal _workerSignal = new(true); + // Signal that ensures only one thread can process items from the cache at a time + private readonly Signal _processingSignal = new(true); + // Lock to synchronize file system operations inside the cache directory. // It's required because there are multiple threads that may attempt to both read // and write from/to the cache directory. @@ -137,7 +140,7 @@ private async Task CachedTransportBackgroundTaskAsync() try { await _workerSignal.WaitAsync(_workerCts.Token).ConfigureAwait(false); - _options.LogDebug("Worker signal triggered: flushing cached envelopes."); + _options.LogDebug("CachingTransport worker signal triggered."); await ProcessCacheAsync(_workerCts.Token).ConfigureAwait(false); } catch (OperationCanceledException) when (_workerCts.IsCancellationRequested) @@ -254,17 +257,29 @@ private IEnumerable GetCacheFilePaths() => private async Task ProcessCacheAsync(CancellationToken cancellation) { - // Signal that we can start waiting for _options.InitCacheFlushTimeout - _preInitCacheResetEvent?.Set(); + try + { + // Make sure we're the only thread processing items + await _processingSignal.WaitAsync(cancellation).ConfigureAwait(false); - // Process the cache - while (await TryPrepareNextCacheFileAsync(cancellation).ConfigureAwait(false) is { } file) + // Signal that we can start waiting for _options.InitCacheFlushTimeout + _preInitCacheResetEvent?.Set(); + + // Process the cache + _options.LogDebug("Flushing cached envelopes."); + while (await TryPrepareNextCacheFileAsync(cancellation).ConfigureAwait(false) is { } file) + { + await InnerProcessCacheAsync(file, cancellation).ConfigureAwait(false); + } + + // Signal that we can continue with initialization, if we're using _options.InitCacheFlushTimeout + _initCacheResetEvent?.Set(); + } + finally { - await InnerProcessCacheAsync(file, cancellation).ConfigureAwait(false); + // Release the signal so the next pass or another thread can enter + _processingSignal.Release(); } - - // Signal that we can continue with initialization, if we're using _options.InitCacheFlushTimeout - _initCacheResetEvent?.Set(); } private async Task InnerProcessCacheAsync(string file, CancellationToken cancellation) @@ -465,7 +480,7 @@ public Task StopWorkerAsync() public Task FlushAsync(CancellationToken cancellationToken = default) { - _options.LogDebug("External FlushAsync invocation: flushing cached envelopes."); + _options.LogDebug("CachingTransport received request to flush the cache."); return ProcessCacheAsync(cancellationToken); } diff --git a/test/Sentry.Tests/HubTests.cs b/test/Sentry.Tests/HubTests.cs index 881dfd406a..6a4413af11 100644 --- a/test/Sentry.Tests/HubTests.cs +++ b/test/Sentry.Tests/HubTests.cs @@ -1147,4 +1147,40 @@ public void CaptureEvent_MessageOnlyEvent_SpanLinkedToEventContext(SentryLevel l Assert.False(child.IsFinished); Assert.Null(child.Status); } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task FlushOnDispose_SendsEnvelope(bool cachingEnabled) + { + // Arrange + var fileSystem = new FakeFileSystem(); + using var cacheDirectory = new TempDirectory(fileSystem); + var transport = Substitute.For(); + + var options = new SentryOptions + { + Dsn = ValidDsn, + Transport = transport + }; + + if (cachingEnabled) + { + options.CacheDirectoryPath = cacheDirectory.Path; + options.FileSystem = fileSystem; + } + + // Act + // Disposing the hub should flush the client and send the envelope. + // If caching is enabled, it should flush the cache as well. + // Either way, the envelope should be sent. + using (var hub = new Hub(options)) + { + hub.CaptureEvent(new SentryEvent()); + } + + // Assert + await transport.Received(1) + .SendEnvelopeAsync(Arg.Any(), Arg.Any()); + } } diff --git a/test/Sentry.Tests/Internals/BackgroundWorkerTests.cs b/test/Sentry.Tests/Internals/BackgroundWorkerTests.cs index 1c571823dc..ea1fee7a18 100644 --- a/test/Sentry.Tests/Internals/BackgroundWorkerTests.cs +++ b/test/Sentry.Tests/Internals/BackgroundWorkerTests.cs @@ -488,6 +488,6 @@ public async Task FlushAsync_Calls_CachingTransport_FlushAsync() // Assert _fixture.Logger.Received(1) - .Log(SentryLevel.Debug, "External FlushAsync invocation: flushing cached envelopes."); + .Log(SentryLevel.Debug, "CachingTransport received request to flush the cache."); } }