From 62f944a2bcbeed2cc291b7582737d8da5444019c Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Thu, 25 Jan 2024 09:07:58 +0100 Subject: [PATCH 1/5] Fix all kind of typos --- LiteDB/Engine/Engine/Upgrade.cs | 4 ++-- LiteDB/Engine/Pages/HeaderPage.cs | 2 +- LiteDB/Engine/Services/TransactionMonitor.cs | 2 +- LiteDB/Engine/Services/TransactionService.cs | 4 ++-- LiteDB/Utils/FileHelper.cs | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/LiteDB/Engine/Engine/Upgrade.cs b/LiteDB/Engine/Engine/Upgrade.cs index e19f17e68..4ec8a01aa 100644 --- a/LiteDB/Engine/Engine/Upgrade.cs +++ b/LiteDB/Engine/Engine/Upgrade.cs @@ -25,9 +25,9 @@ public static bool Upgrade(string filename, string password = null, Collation co Collation = collation }; - var backup = FileHelper.GetSufixFile(filename, "-backup", true); + var backup = FileHelper.GetSuffixFile(filename, "-backup", true); - settings.Filename = FileHelper.GetSufixFile(filename, "-temp", true); + settings.Filename = FileHelper.GetSuffixFile(filename, "-temp", true); var buffer = new byte[PAGE_SIZE * 2]; IFileReader reader; diff --git a/LiteDB/Engine/Pages/HeaderPage.cs b/LiteDB/Engine/Pages/HeaderPage.cs index 5addbbc1e..40efc95bd 100644 --- a/LiteDB/Engine/Pages/HeaderPage.cs +++ b/LiteDB/Engine/Pages/HeaderPage.cs @@ -11,7 +11,7 @@ namespace LiteDB.Engine { /// /// Header page represent first page on datafile. Engine contains a single instance of HeaderPage and all changes - /// must be syncornized (using lock). + /// must be synchronized (using lock). 
/// internal class HeaderPage : BasePage { diff --git a/LiteDB/Engine/Services/TransactionMonitor.cs b/LiteDB/Engine/Services/TransactionMonitor.cs index 97a850701..ebfee2ce3 100644 --- a/LiteDB/Engine/Services/TransactionMonitor.cs +++ b/LiteDB/Engine/Services/TransactionMonitor.cs @@ -171,7 +171,7 @@ private int GetInitialSize() { var sum = 0; - // if there is no avaiable pages, reduce all open transactions + // if there is no available pages, reduce all open transactions foreach (var trans in _transactions.Values) { //TODO: revisar estas contas, o reduce tem que fechar 1000 diff --git a/LiteDB/Engine/Services/TransactionService.cs b/LiteDB/Engine/Services/TransactionService.cs index 0a03e5837..7256254c5 100644 --- a/LiteDB/Engine/Services/TransactionService.cs +++ b/LiteDB/Engine/Services/TransactionService.cs @@ -266,7 +266,7 @@ public void Commit() } } - // dispose all snapshosts + // dispose all snapshots foreach (var snapshot in _snapshots.Values) { snapshot.Dispose(); @@ -291,7 +291,7 @@ public void Rollback() this.ReturnNewPages(); } - // dispose all snaphosts + // dispose all snapshots foreach (var snapshot in _snapshots.Values) { // but first, if writable, discard changes diff --git a/LiteDB/Utils/FileHelper.cs b/LiteDB/Utils/FileHelper.cs index 81bc3c72e..8ce72dd17 100644 --- a/LiteDB/Utils/FileHelper.cs +++ b/LiteDB/Utils/FileHelper.cs @@ -16,7 +16,7 @@ internal static class FileHelper /// /// Create a temp filename based on original filename - checks if file exists (if exists, append counter number) /// - public static string GetSufixFile(string filename, string suffix = "-temp", bool checkIfExists = true) + public static string GetSuffixFile(string filename, string suffix = "-temp", bool checkIfExists = true) { var count = 0; var temp = Path.Combine(Path.GetDirectoryName(filename), @@ -37,12 +37,12 @@ public static string GetSufixFile(string filename, string suffix = "-temp", bool /// /// Get LOG file based on data file /// - public static string 
GetLogFile(string filename) => GetSufixFile(filename, "-log", false); + public static string GetLogFile(string filename) => GetSuffixFile(filename, "-log", false); /// /// Get TEMP file based on data file /// - public static string GetTempFile(string filename) => GetSufixFile(filename, "-tmp", false); + public static string GetTempFile(string filename) => GetSuffixFile(filename, "-tmp", false); /// /// Test if file are used by any process From 33f85d53048c8a746df79cbfc999aedfa85464c3 Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Thu, 25 Jan 2024 11:19:11 +0100 Subject: [PATCH 2/5] Simplified DiskWriterQueue with blocking concurrency --- LiteDB/Engine/Disk/DiskService.cs | 2 - LiteDB/Engine/Disk/DiskWriterQueue.cs | 73 ++++++++------------------- 2 files changed, 22 insertions(+), 53 deletions(-) diff --git a/LiteDB/Engine/Disk/DiskService.cs b/LiteDB/Engine/Disk/DiskService.cs index ad2bc8344..de6d0b503 100644 --- a/LiteDB/Engine/Disk/DiskService.cs +++ b/LiteDB/Engine/Disk/DiskService.cs @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable pages) count++; } - _queue.Value.Run(); - return count; } diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 0592cc6b9..88f0c1653 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Concurrent; using System.IO; -using System.Linq; using System.Threading; using System.Threading.Tasks; using static LiteDB.Constants; @@ -18,14 +17,17 @@ internal class DiskWriterQueue : IDisposable // async thread controls private Task _task; + private bool _shouldClose = false; private readonly ConcurrentQueue _queue = new ConcurrentQueue(); - - private int _running = 0; + private readonly object _queueSync = new object(); + private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false); + private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); 
public DiskWriterQueue(Stream stream) { _stream = stream; + _task = Task.Run(ExecuteQueue); } /// @@ -40,27 +42,11 @@ public DiskWriterQueue(Stream stream) public void EnqueuePage(PageBuffer page) { ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file"); - - _queue.Enqueue(page); - } - - /// - /// If queue contains pages and are not running, starts run queue again now - /// - public void Run() - { - lock (_queue) + lock (_queueSync) { - if (_queue.Count == 0) return; - - var oldValue = Interlocked.CompareExchange(ref _running, 1, 0); - - if (oldValue == 0) - { - // Schedule a new thread to process the pages in the queue. - // https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html - _task = Task.Run(ExecuteQueue); - } + _queueIsEmpty.Reset(); + _queue.Enqueue(page); + _queueHasItems.Set(); } } @@ -69,16 +55,7 @@ public void Run() /// public void Wait() { - lock (_queue) - { - if (_task != null) - { - _task.Wait(); - } - - Run(); - } - + _queueIsEmpty.Wait(); ENSURE(_queue.Count == 0, "queue should be empty after wait() call"); } @@ -87,35 +64,25 @@ public void Wait() /// private void ExecuteQueue() { - do + while (true) { if (_queue.TryDequeue(out var page)) { WritePageToStream(page); } - - while (page == null) + else { _stream.FlushToDisk(); - Volatile.Write(ref _running, 0); - - if (!_queue.Any()) return; - - // Another item was added to the queue after we detected it was empty. - var oldValue = Interlocked.CompareExchange(ref _running, 1, 0); - - if (oldValue == 1) + lock (_queueSync) { - // A new thread was already scheduled for execution, this thread can return. - return; + if (_queue.Count > 0) continue; + _queueIsEmpty.Set(); } - // This thread will continue to process the queue as a new thread was not scheduled. 
- _queue.TryDequeue(out page); - WritePageToStream(page); + _queueHasItems.Wait(); + if (_shouldClose) return; } - - } while (true); + } } private void WritePageToStream(PageBuffer page) @@ -137,8 +104,12 @@ public void Dispose() { LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); + _shouldClose = true; + // run all items in queue before dispose this.Wait(); + _task?.Wait(); + _task = null; } } } \ No newline at end of file From f21cd8405eda7c7b59b018c9a541b0c3605eb5da Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Fri, 26 Jan 2024 10:06:08 +0100 Subject: [PATCH 3/5] Async DiskWriterQueue implementation --- LiteDB/Engine/Disk/DiskWriterQueue.cs | 18 +++++++++----- LiteDB/Utils/AsyncManualResetEvent.cs | 35 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 LiteDB/Utils/AsyncManualResetEvent.cs diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 88f0c1653..2b2e67041 100644 --- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -21,13 +21,12 @@ internal class DiskWriterQueue : IDisposable private readonly ConcurrentQueue _queue = new ConcurrentQueue(); private readonly object _queueSync = new object(); - private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim(false); + private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent(); private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true); public DiskWriterQueue(Stream stream) { _stream = stream; - _task = Task.Run(ExecuteQueue); } /// @@ -47,6 +46,11 @@ public void EnqueuePage(PageBuffer page) _queueIsEmpty.Reset(); _queue.Enqueue(page); _queueHasItems.Set(); + + if (_task == null) + { + _task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning); + } } } @@ -62,7 +66,7 @@ public void Wait() /// /// Execute all items in queue sync /// - private void ExecuteQueue() + 
private async Task ExecuteQueue() { while (true) { @@ -72,15 +76,16 @@ private void ExecuteQueue() } else { - _stream.FlushToDisk(); lock (_queueSync) { if (_queue.Count > 0) continue; _queueIsEmpty.Set(); + _queueHasItems.Reset(); + if (_shouldClose) return; } + _stream.FlushToDisk(); - _queueHasItems.Wait(); - if (_shouldClose) return; + await _queueHasItems.WaitAsync(); } } } @@ -105,6 +110,7 @@ public void Dispose() LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK"); _shouldClose = true; + _queueHasItems.Set(); // unblock the running loop in case there are no items // run all items in queue before dispose this.Wait(); diff --git a/LiteDB/Utils/AsyncManualResetEvent.cs b/LiteDB/Utils/AsyncManualResetEvent.cs new file mode 100644 index 000000000..0cbaf3421 --- /dev/null +++ b/LiteDB/Utils/AsyncManualResetEvent.cs @@ -0,0 +1,35 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace LiteDB +{ + /// + /// Async implementation of ManualResetEvent + /// https://devblogs.microsoft.com/pfxteam/building-async-coordination-primitives-part-1-asyncmanualresetevent/ + /// + internal class AsyncManualResetEvent + { + private volatile TaskCompletionSource _tcs = new TaskCompletionSource(); + + public Task WaitAsync() + { + return _tcs.Task; + } + + public void Set() + { + _tcs.TrySetResult(true); + } + + public void Reset() + { + while (true) + { + var tcs = _tcs; + if (!tcs.Task.IsCompleted || + Interlocked.CompareExchange(ref _tcs, new TaskCompletionSource(), tcs) == tcs) + return; + } + } + } +} \ No newline at end of file From fab1a407a1764be7c2675319d3ece97dbaee222c Mon Sep 17 00:00:00 2001 From: Lubomir Tetak Date: Thu, 25 Jan 2024 11:19:11 +0100 Subject: [PATCH 4/5] DiskWriterQueue resiliency --- LiteDB/Engine/Disk/DiskWriterQueue.cs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/LiteDB/Engine/Disk/DiskWriterQueue.cs b/LiteDB/Engine/Disk/DiskWriterQueue.cs index 2b2e67041..3f7826ddd 100644 
--- a/LiteDB/Engine/Disk/DiskWriterQueue.cs +++ b/LiteDB/Engine/Disk/DiskWriterQueue.cs @@ -83,13 +83,25 @@ private async Task ExecuteQueue() _queueHasItems.Reset(); if (_shouldClose) return; } - _stream.FlushToDisk(); + TryFlushStream(); await _queueHasItems.WaitAsync(); } } } + private void TryFlushStream() + { + try + { + _stream.FlushToDisk(); + } + catch (IOException) + { + // Disk is probably full. This may be an unrecoverable problem, but as long as there is enough space in the buffer we may be OK. + } + } + private void WritePageToStream(PageBuffer page) { if (page == null) return; From 289e9b1a5c0c4506668e8edb587ac9eef5e1016b Mon Sep 17 00:00:00 2001 From: anatawa12 Date: Thu, 1 Feb 2024 16:01:36 +0900 Subject: [PATCH 5/5] chore: throw exception when encounter unpaired surrogate instead of replace with U+FFFD --- LiteDB/Document/BsonValue.cs | 4 ++-- LiteDB/Engine/Disk/Serializer/BufferReader.cs | 8 ++++---- LiteDB/Engine/Disk/Serializer/BufferWriter.cs | 12 ++++++------ LiteDB/Engine/Structures/CollectionIndex.cs | 4 ++-- LiteDB/Utils/Encoding.cs | 12 ++++++++++++ 5 files changed, 26 insertions(+), 14 deletions(-) create mode 100644 LiteDB/Utils/Encoding.cs diff --git a/LiteDB/Document/BsonValue.cs b/LiteDB/Document/BsonValue.cs index ad4261b1a..572e549a9 100644 --- a/LiteDB/Document/BsonValue.cs +++ b/LiteDB/Document/BsonValue.cs @@ -648,7 +648,7 @@ internal virtual int GetBytesCount(bool recalc) case BsonType.Double: return 8; case BsonType.Decimal: return 16; - case BsonType.String: return Encoding.UTF8.GetByteCount(this.AsString); + case BsonType.String: return StringEncoding.UTF8.GetByteCount(this.AsString); case BsonType.Binary: return this.AsBinary.Length; case BsonType.ObjectId: return 12; @@ -674,7 +674,7 @@ protected int GetBytesCountElement(string key, BsonValue value) return 1 + // element type - Encoding.UTF8.GetByteCount(key) + // CString + StringEncoding.UTF8.GetByteCount(key) + // CString 1 + // CString \0 value.GetBytesCount(true) + (variant ? 
5 : 0); // bytes.Length + 0x?? diff --git a/LiteDB/Engine/Disk/Serializer/BufferReader.cs b/LiteDB/Engine/Disk/Serializer/BufferReader.cs index 255ca4cb8..0118763bb 100644 --- a/LiteDB/Engine/Disk/Serializer/BufferReader.cs +++ b/LiteDB/Engine/Disk/Serializer/BufferReader.cs @@ -154,7 +154,7 @@ public string ReadString(int count) // if fits in current segment, use inner array - otherwise copy from multiples segments if (_currentPosition + count <= _current.Count) { - value = Encoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count); + value = StringEncoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count); this.MoveForward(count); } @@ -165,7 +165,7 @@ public string ReadString(int count) this.Read(buffer, 0, count); - value = Encoding.UTF8.GetString(buffer, 0, count); + value = StringEncoding.UTF8.GetString(buffer, 0, count); BufferPool.Return(buffer); } @@ -204,7 +204,7 @@ public string ReadCString() this.MoveForward(1); // +1 to '\0' - return Encoding.UTF8.GetString(mem.ToArray()); + return StringEncoding.UTF8.GetString(mem.ToArray()); } } } @@ -220,7 +220,7 @@ private bool TryReadCStringCurrentSegment(out string value) { if (_current[pos] == 0x00) { - value = Encoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count); + value = StringEncoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count); this.MoveForward(count + 1); // +1 means '\0' return true; } diff --git a/LiteDB/Engine/Disk/Serializer/BufferWriter.cs b/LiteDB/Engine/Disk/Serializer/BufferWriter.cs index 093aa66b2..caa43bf3e 100644 --- a/LiteDB/Engine/Disk/Serializer/BufferWriter.cs +++ b/LiteDB/Engine/Disk/Serializer/BufferWriter.cs @@ -152,13 +152,13 @@ public void WriteCString(string value) { if (value.IndexOf('\0') > -1) throw LiteException.InvalidNullCharInString(); - var bytesCount = Encoding.UTF8.GetByteCount(value); + var bytesCount = StringEncoding.UTF8.GetByteCount(value); var available = 
_current.Count - _currentPosition; // avaiable in current segment // can write direct in current segment (use < because need +1 \0) if (bytesCount < available) { - Encoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition); + StringEncoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition); _current[_currentPosition + bytesCount] = 0x00; @@ -168,7 +168,7 @@ public void WriteCString(string value) { var buffer = BufferPool.Rent(bytesCount); - Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0); + StringEncoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0); this.Write(buffer, 0, bytesCount); @@ -186,7 +186,7 @@ public void WriteCString(string value) /// public void WriteString(string value, bool specs) { - var count = Encoding.UTF8.GetByteCount(value); + var count = StringEncoding.UTF8.GetByteCount(value); if (specs) { @@ -195,7 +195,7 @@ public void WriteString(string value, bool specs) if (count <= _current.Count - _currentPosition) { - Encoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition); + StringEncoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition); this.MoveForward(count); } @@ -204,7 +204,7 @@ public void WriteString(string value, bool specs) // rent a buffer to be re-usable var buffer = BufferPool.Rent(count); - Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0); + StringEncoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0); this.Write(buffer, 0, count); diff --git a/LiteDB/Engine/Structures/CollectionIndex.cs b/LiteDB/Engine/Structures/CollectionIndex.cs index fce53b032..6ea3a0316 100644 --- a/LiteDB/Engine/Structures/CollectionIndex.cs +++ b/LiteDB/Engine/Structures/CollectionIndex.cs @@ -121,8 +121,8 @@ public static int GetLength(string name, string expr) return 1 + // Slot 1 + // IndexType - Encoding.UTF8.GetByteCount(name) + 1 + // Name + \0 - 
Encoding.UTF8.GetByteCount(expr) + 1 + // Expression + \0 + StringEncoding.UTF8.GetByteCount(name) + 1 + // Name + \0 + StringEncoding.UTF8.GetByteCount(expr) + 1 + // Expression + \0 1 + // Unique PageAddress.SIZE + // Head PageAddress.SIZE + // Tail diff --git a/LiteDB/Utils/Encoding.cs b/LiteDB/Utils/Encoding.cs new file mode 100644 index 000000000..ce28bea2f --- /dev/null +++ b/LiteDB/Utils/Encoding.cs @@ -0,0 +1,12 @@ +using System.Text; + +namespace LiteDB +{ + internal class StringEncoding + { + // The default Encoding.UTF8 replaces an unpaired surrogate with U+FFFD, which is not suitable for a database, + // so we use new UTF8Encoding(false, true) to throw an exception when an unpaired surrogate is found + //public static System.Text.Encoding UTF8 = new UTF8Encoding(false, true); + public static Encoding UTF8 = new UTF8Encoding(false, true); + } +}