diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 88f0c1653..2b2e67041 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -21,13 +21,12 @@ internal class DiskWriterQueue : IDisposable private readonly ConcurrentQueue _queue = new ConcurrentQueue(); private readonly object _queueSync = new object(); - private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false); + private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent(); private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); public DiskWriterQueue(Stream stream) { _stream = stream; - _task = Task.Run(ExecuteQueue); } /// @@ -47,6 +46,11 @@ public void EnqueuePage(PageBuffer page) _queueIsEmpty.Reset(); _queue.Enqueue(page); _queueHasItems.Set(); + + if (_task == null) + { + _task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning); + } } } @@ -62,7 +66,7 @@ public void Wait() /// /// Execute all items in queue sync /// - private void ExecuteQueue() + private async Task ExecuteQueue() { while (true) { @@ -72,15 +76,16 @@ private void ExecuteQueue() } else { - _stream.FlushToDisk(); lock (_queueSync) { if (_queue.Count > 0) continue; _queueIsEmpty.Set(); + _queueHasItems.Reset(); + if (_shouldClose) return; } + _stream.FlushToDisk(); - _queueHasItems.Wait(); - if (_shouldClose) return; + await _queueHasItems.WaitAsync(); } } } @@ -105,6 +110,7 @@ public void Dispose() LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); _shouldClose = true; + _queueHasItems.Set(); // unblock the running loop in case there are no items // run all items in queue before dispose this.Wait(); diff --git a/LiteDB/Utils/AsyncManualResetEvent.cs b/LiteDB/Utils/AsyncManualResetEvent.cs new file mode 100644 index 000000000..0cbaf3421 --- /dev/null +++ b/LiteDB/Utils/AsyncManualResetEvent.cs @@ -0,0 +1,35 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace LiteDB +{ + /// + /// Async implementation of ManualResetEvent + /// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/ + /// + internal class AsyncManualResetEvent + { + private volatile TaskCompletionSource _tcs = new TaskCompletionSource(); + + public Task WaitAsync() + { + return _tcs.Task; + } + + public void Set() + { + _tcs.TrySetResult(true); + } + + public void Reset() + { + while (true) + { + var tcs = _tcs; + if (!tcs.Task.IsCompleted || + Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource(), tcs) == tcs) + return; + } + } + } +} \ No newline at end of file