Skip to content

Commit

Permalink
Merging
Browse files Browse the repository at this point in the history
  • Loading branch information
mbdavid committed Feb 13, 2024
2 parents a726348 + 31e0ff7 commit 04e0eb8
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 74 deletions.
4 changes: 2 additions & 2 deletions LiteDB/Document/BsonValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ internal virtual int GetBytesCount(bool recalc)
case BsonType.Double: return 8;
case BsonType.Decimal: return 16;

case BsonType.String: return Encoding.UTF8.GetByteCount(this.AsString);
case BsonType.String: return StringEncoding.UTF8.GetByteCount(this.AsString);

case BsonType.Binary: return this.AsBinary.Length;
case BsonType.ObjectId: return 12;
Expand All @@ -674,7 +674,7 @@ protected int GetBytesCountElement(string key, BsonValue value)

return
1 + // element type
Encoding.UTF8.GetByteCount(key) + // CString
StringEncoding.UTF8.GetByteCount(key) + // CString
1 + // CString \0
value.GetBytesCount(true) +
(variant ? 5 : 0); // bytes.Length + 0x??
Expand Down
2 changes: 0 additions & 2 deletions LiteDB/Engine/Disk/DiskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ public int WriteAsync(IEnumerable<PageBuffer> pages)
count++;
}

_queue.Value.Run();

return count;
}

Expand Down
89 changes: 38 additions & 51 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;
Expand All @@ -18,11 +17,12 @@ internal class DiskWriterQueue : IDisposable

// async thread controls
private Task _task;
private bool _shouldClose = false;

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();

private bool _aborted = false;
private int _running = 0;
private readonly object _queueSync = new object();
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

public DiskWriterQueue(Stream stream)
{
Expand All @@ -42,27 +42,15 @@ public void EnqueuePage(PageBuffer page)
{
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

if (_aborted) return;

_queue.Enqueue(page);
}

/// <summary>
/// If queue contains pages and are not running, starts run queue again now
/// </summary>
public void Run()
{
lock (_queue)
lock (_queueSync)
{
if (_queue.Count == 0 || _aborted) return;
_queueIsEmpty.Reset();
_queue.Enqueue(page);
_queueHasItems.Set();

var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 0)
if (_task == null)
{
// schedule a new thread to process the pages in the queue.
// https://blog.stephencleary.com/2013/08/startnew-is-dangerous.html
_task = Task.Run(ExecuteQueue);
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
}
}
}
Expand All @@ -72,16 +60,7 @@ public void Run()
/// </summary>
public void Wait()
{
lock (_queue)
{
if (_task != null)
{
_task.Wait();
}

Run();
}

_queueIsEmpty.Wait();
ENSURE(_queue.Count == 0, "queue should be empty after wait() call");
}

Expand All @@ -98,37 +77,40 @@ public void Abort()
/// <summary>
/// Execute all items in queue sync
/// </summary>
private void ExecuteQueue()
private async Task ExecuteQueue()
{
do
while (true)
{
if (_queue.TryDequeue(out var page))
{
WritePageToStream(page);
}

while (page == null)
else
{
_stream.FlushToDisk();
Volatile.Write(ref _running, 0);

if (!_queue.Any()) return;

// Another item was added to the queue after we detected it was empty.
var oldValue = Interlocked.CompareExchange(ref _running, 1, 0);

if (oldValue == 1)
lock (_queueSync)
{
// A new thread was already scheduled for execution, this thread can return.
return;
if (_queue.Count > 0) continue;
_queueIsEmpty.Set();
_queueHasItems.Reset();
if (_shouldClose) return;
}
TryFlushStream();

// This thread will continue to process the queue as a new thread was not scheduled.
_queue.TryDequeue(out page);
WritePageToStream(page);
await _queueHasItems.WaitAsync();
}
}
}

} while (true);
private void TryFlushStream()
{
try
{
_stream.FlushToDisk();
}
catch (IOException)
{
// Disk is probably full. This may be unrecoverable problem but until we have enough space in the buffer we may be ok.
}
}

private void WritePageToStream(PageBuffer page)
Expand All @@ -154,8 +136,13 @@ public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items

// run all items in queue before dispose
this.Wait();
_task?.Wait();
_task = null;
}
}
}
8 changes: 4 additions & 4 deletions LiteDB/Engine/Disk/Serializer/BufferReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public string ReadString(int count)
// if fits in current segment, use inner array - otherwise copy from multiples segments
if (_currentPosition + count <= _current.Count)
{
value = Encoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);
value = StringEncoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);

this.MoveForward(count);
}
Expand All @@ -165,7 +165,7 @@ public string ReadString(int count)

this.Read(buffer, 0, count);

value = Encoding.UTF8.GetString(buffer, 0, count);
value = StringEncoding.UTF8.GetString(buffer, 0, count);

BufferPool.Return(buffer);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ public string ReadCString()

this.MoveForward(1); // +1 to '\0'

return Encoding.UTF8.GetString(mem.ToArray());
return StringEncoding.UTF8.GetString(mem.ToArray());
}
}
}
Expand All @@ -220,7 +220,7 @@ private bool TryReadCStringCurrentSegment(out string value)
{
if (_current[pos] == 0x00)
{
value = Encoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);
value = StringEncoding.UTF8.GetString(_current.Array, _current.Offset + _currentPosition, count);
this.MoveForward(count + 1); // +1 means '\0'
return true;
}
Expand Down
12 changes: 6 additions & 6 deletions LiteDB/Engine/Disk/Serializer/BufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ public void WriteCString(string value)
{
if (value.IndexOf('\0') > -1) throw LiteException.InvalidNullCharInString();

var bytesCount = Encoding.UTF8.GetByteCount(value);
var bytesCount = StringEncoding.UTF8.GetByteCount(value);
var available = _current.Count - _currentPosition; // avaiable in current segment

// can write direct in current segment (use < because need +1 \0)
if (bytesCount < available)
{
Encoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);

_current[_currentPosition + bytesCount] = 0x00;

Expand All @@ -168,7 +168,7 @@ public void WriteCString(string value)
{
var buffer = BufferPool.Rent(bytesCount);

Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);

this.Write(buffer, 0, bytesCount);

Expand All @@ -186,7 +186,7 @@ public void WriteCString(string value)
/// </summary>
public void WriteString(string value, bool specs)
{
var count = Encoding.UTF8.GetByteCount(value);
var count = StringEncoding.UTF8.GetByteCount(value);

if (specs)
{
Expand All @@ -195,7 +195,7 @@ public void WriteString(string value, bool specs)

if (count <= _current.Count - _currentPosition)
{
Encoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, _current.Array, _current.Offset + _currentPosition);

this.MoveForward(count);
}
Expand All @@ -204,7 +204,7 @@ public void WriteString(string value, bool specs)
// rent a buffer to be re-usable
var buffer = BufferPool.Rent(count);

Encoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);
StringEncoding.UTF8.GetBytes(value, 0, value.Length, buffer, 0);

this.Write(buffer, 0, count);

Expand Down
86 changes: 86 additions & 0 deletions LiteDB/Engine/Engine/Upgrade.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using static LiteDB.Constants;

namespace LiteDB.Engine
{
public partial class LiteEngine
{
/// <summary>
/// Upgrade old version of LiteDB into new LiteDB file structure. Returns true if database was completed converted
/// If database already in current version just return false
/// </summary>
public static bool Upgrade(string filename, string password = null, Collation collation = null)
{
if (filename.IsNullOrWhiteSpace()) throw new ArgumentNullException(nameof(filename));
if (!File.Exists(filename)) return false;

var settings = new EngineSettings
{
Filename = filename,
Password = password,
Collation = collation
};

var backup = FileHelper.GetSuffixFile(filename, "-backup", true);

settings.Filename = FileHelper.GetSuffixFile(filename, "-temp", true);

var buffer = new byte[PAGE_SIZE * 2];
IFileReader reader;

using (var stream = new FileStream(filename, FileMode.Open, FileAccess.Read))
{
// read first 16k
stream.Read(buffer, 0, buffer.Length);

// checks if v8 plain data or encrypted (first byte = 1)
if ((Encoding.UTF8.GetString(buffer, HeaderPage.P_HEADER_INFO, HeaderPage.HEADER_INFO.Length) == HeaderPage.HEADER_INFO &&
buffer[HeaderPage.P_FILE_VERSION] == HeaderPage.FILE_VERSION) ||
buffer[0] == 1)
{
return false;
}

// checks if v7 (plain or encrypted)
if (Encoding.UTF8.GetString(buffer, 25, HeaderPage.HEADER_INFO.Length) == HeaderPage.HEADER_INFO &&
buffer[52] == 7)
{
reader = new FileReaderV7(stream, password);
}
else
{
throw new LiteException(0, "Invalid data file format to upgrade");
}

using (var engine = new LiteEngine(settings))
{
// copy all database to new Log file with NO checkpoint during all rebuild
engine.Pragma(Pragmas.CHECKPOINT, 0);

engine.RebuildContent(reader);

// after rebuild, copy log bytes into data file
engine.Checkpoint();

// re-enable auto-checkpoint pragma
engine.Pragma(Pragmas.CHECKPOINT, 1000);

// copy userVersion from old datafile
engine.Pragma("USER_VERSION", (reader as FileReaderV7).UserVersion);
}
}

// rename source filename to backup name
File.Move(filename, backup);

// rename temp file into filename
File.Move(settings.Filename, filename);

return true;
}
}
}
2 changes: 1 addition & 1 deletion LiteDB/Engine/Pages/HeaderPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace LiteDB.Engine
{
/// <summary>
/// Header page represent first page on datafile. Engine contains a single instance of HeaderPage and all changes
/// must be syncornized (using lock).
/// must be synchronized (using lock).
/// </summary>
internal class HeaderPage : BasePage
{
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Services/TransactionMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private int GetInitialSize()
{
var sum = 0;

// if there is no avaiable pages, reduce all open transactions
// if there is no available pages, reduce all open transactions
foreach (var trans in _transactions.Values)
{
//TODO: revisar estas contas, o reduce tem que fechar 1000
Expand Down
4 changes: 2 additions & 2 deletions LiteDB/Engine/Services/TransactionService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void Commit()
}
}

// dispose all snapshosts
// dispose all snapshots
foreach (var snapshot in _snapshots.Values)
{
snapshot.Dispose();
Expand All @@ -291,7 +291,7 @@ public void Rollback()
this.ReturnNewPages();
}

// dispose all snaphosts
// dispose all snapshots
foreach (var snapshot in _snapshots.Values)
{
// but first, if writable, discard changes
Expand Down
Loading

0 comments on commit 04e0eb8

Please sign in to comment.