Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New transaction #1560

Merged
merged 6 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions LiteDB.Stress/LiteDB.Stress.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\LiteDB\LiteDB.csproj" />
</ItemGroup>

</Project>
191 changes: 191 additions & 0 deletions LiteDB.Stress/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
using LiteDB;
using LiteDB.Engine;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace LiteDB.Stress
{
public class Program
{
private const string DB_FILENAME = ".\\stress-test.db";

private const int INITAL_INSERT_THREADS = 10;
private const int INSERT_SLEEP = 10;

private static ConcurrentDictionary<int, ThreadInfo> _threads = new ConcurrentDictionary<int, ThreadInfo>();
private static LiteEngine _engine;
private static Stopwatch _timer = Stopwatch.StartNew();
private static bool _running = true;

static void Main()
{
ClearFiles();

_engine = new LiteEngine(DB_FILENAME);

CreateThread("REPORT", 1_000, ReportThread);
CreateThread("CHECKPOINT", 5_000, () => _engine.Checkpoint());
CreateThread("COUNT_1", 2_000, () => _engine.Query("col1", new Query { Select = "{ c: COUNT(*) }" }).ToArray()[0]["c"]);
CreateThread("COUNT_2", 2_000, () => _engine.Query("col2", new Query { Select = "{ c: COUNT(*) }" }).ToArray()[0]["c"]);

CreateThread("ENSURE_INDEX", 25_000, () => _engine.EnsureIndex("col1", "idx_ts", "$.timespan", false));
CreateThread("DROP_INDEX", 60_000, () => _engine.DropIndex("col1", "idx_ts"));

CreateThread("DELETE_ALL", 10 * 60 * 1_000, () => _engine.DeleteMany("col1", "1=1"));

CreateThread("REBUILD", 11 * 60 * 1_000, () => _engine.Rebuild(new RebuildOptions()));

CreateThread("FILE_SIZE", 1_000, () => _engine.Query("$database", new Query { Select = "{ data: FORMAT(dataFileSize, 'n0'), log: FORMAT(logFileSize, 'n0') }" }).ToArray()[0]);

for (int i = 0; i < INITAL_INSERT_THREADS; ++i)
{
CreateThread("INSERT_1", INSERT_SLEEP, () => InsertThread("col1"));
}

CreateThread("INSERT_2", INSERT_SLEEP, () => InsertThread("col2"));
CreateThread("INSERT_2", INSERT_SLEEP, () => InsertThread("col2"));
CreateThread("INSERT_2", INSERT_SLEEP, () => InsertThread("col2"));

while (_running)
{
var key = Console.ReadKey(true).Key;

if (key == ConsoleKey.Spacebar)
{
CreateThread("INSERT_1", INSERT_SLEEP, () => InsertThread("col1"));
}

if (key == ConsoleKey.Enter)
{
foreach(var t in _threads.Values)
{
t.CancellationTokenSource.Cancel();
t.Thread.Join();
}
break;
}
}

Console.WriteLine("End;");
Console.ReadKey();
}

static void ClearFiles()
{
var searchPattern = Path.GetFileNameWithoutExtension(DB_FILENAME);
var filesToDelete = Directory.GetFiles(".", $"{searchPattern}*.db");

foreach (var deleteFile in filesToDelete)
{
Console.WriteLine($"Deleting {deleteFile}");
File.Delete(deleteFile);
}
}

private static void CreateThread(string name, int sleep, Action fn)
{
CreateThread(name, sleep, () => { fn(); return null; });
}

private static void CreateThread(string name, int sleep, Func<BsonValue> fn)
{
var thread = new Thread(() =>
{
while (_running)
{
var info = _threads[Thread.CurrentThread.ManagedThreadId];
info.CancellationTokenSource.Token.WaitHandle.WaitOne(sleep);
if (info.CancellationTokenSource.Token.IsCancellationRequested) break;
info.Elapsed.Restart();
info.Running = true;
try
{
info.Result = fn();
}
catch (Exception ex)
{
info.Exception = ex;

foreach(var t in _threads)
{
t.Value.CancellationTokenSource.Cancel();
t.Value.Thread.Join();
}
break;
}
info.Running = false;
info.Elapsed.Stop();
info.Counter++;
info.LastRun = DateTime.Now;
}
});

_threads[thread.ManagedThreadId] = new ThreadInfo
{
Name = name,
Thread = thread,
};

thread.Name = name;
thread.Start();
}

private static void InsertThread(string col)
{
_engine.Insert(col, new BsonDocument[]
{
new BsonDocument
{
["timespan"] = DateTime.Now
}
}, BsonAutoId.Int32);
}

private static void ReportThread()
{
  Console.Clear();
Console.WriteLine($"LiteDB Multithreaded: {_threads.Count}, running for {_timer.Elapsed}");
Console.WriteLine("Press <ENTER> to stop processing, <SPACE> to add insert thread");
Console.WriteLine();

foreach (var thread in _threads)
{
var howLong = DateTime.Now - thread.Value.LastRun;

var id = thread.Key.ToString("00");
var name = (thread.Value.Name + (thread.Value.Running ? "*" : "")).PadRight(13, ' ');
var counter = thread.Value.Counter.ToString().PadRight(5, ' ');
var timer = howLong.TotalSeconds > 60 ?
((int)howLong.TotalMinutes).ToString().PadLeft(2, ' ') + " minutes" :
((int)howLong.TotalSeconds).ToString().PadLeft(2, ' ') + " seconds";
var result = thread.Value.Result != null ? $"[{thread.Value.Result.ToString()}]" : "";
var running = thread.Value.Elapsed.Elapsed.TotalSeconds > 1 ?
$"<LAST RUN {(int)thread.Value.Elapsed.Elapsed.TotalSeconds}s> " :
"";
var ex = thread.Value.Exception != null ?
" ERROR: " + thread.Value.Exception.Message :
"";

Console.WriteLine($"{id}. {name} :: {counter} >> {timer} {running}{result}{ex}" );
}
}
}

class ThreadInfo
{
public string Name { get; set; }
public int Counter { get; set; } = 0;
public bool Running { get; set; } = false;
public Stopwatch Elapsed { get; } = new Stopwatch();
public DateTime LastRun { get; set; } = DateTime.Now;
public BsonValue Result { get; set; }
public Exception Exception { get; set; }
public Thread Thread { get; set; }
public CancellationTokenSource CancellationTokenSource { get; } = new CancellationTokenSource();
}
}
4 changes: 2 additions & 2 deletions LiteDB.Tests/Expressions/Expressions_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,11 @@ public void Expression_AndAlso_OrElse()

// OK (true)
doc1["x"] = "12345";
var r1 = ex1.ExecuteScalar(doc1);
ex1.ExecuteScalar(doc1);

// KO (expected: false, actual: exception)
doc1["x"] = "123";
var r2 = ex1.ExecuteScalar(doc1);
ex1.ExecuteScalar(doc1);
}

[Fact]
Expand Down
4 changes: 2 additions & 2 deletions LiteDB.Tests/Internals/FreePage_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void FreeSlot_Insert()
e.BeginTrans();

// get transaction/snapshot "col1"
var t = e.GetMonitor().GetTransaction(false, out var isNew);
var t = e.GetMonitor().GetTransaction(false, false, out var isNew);
var s = t.CreateSnapshot(LockMode.Write, "col1", true);

e.Insert("col1", new BsonDocument[] {new BsonDocument {["n"] = new byte[200]}}, BsonAutoId.Int32);
Expand Down Expand Up @@ -96,7 +96,7 @@ public void FreeSlot_Delete()
e.BeginTrans();

// get transaction/snapshot "col1"
var t = e.GetMonitor().GetTransaction(false, out var isNew);
var t = e.GetMonitor().GetTransaction(false, false, out var isNew);
var s = t.CreateSnapshot(LockMode.Write, "col1", true);

// first page
Expand Down
8 changes: 7 additions & 1 deletion LiteDB.sln
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiteDB.Tests", "LiteDB.Test
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiteDB.Shell", "LiteDB.Shell\LiteDB.Shell.csproj", "{99887C89-CAE4-4A8D-AC4B-87E28B9B1F87}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LiteDB.Benchmarks", "LiteDB.Benchmarks\LiteDB.Benchmarks.csproj", "{DF9C82C1-446F-458A-AA50-78E58BA17273}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiteDB.Benchmarks", "LiteDB.Benchmarks\LiteDB.Benchmarks.csproj", "{DF9C82C1-446F-458A-AA50-78E58BA17273}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LiteDB.Stress", "LiteDB.Stress\LiteDB.Stress.csproj", "{FFBC5669-DA32-4907-8793-7B414279DA3B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand All @@ -32,6 +34,10 @@ Global
{DF9C82C1-446F-458A-AA50-78E58BA17273}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DF9C82C1-446F-458A-AA50-78E58BA17273}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DF9C82C1-446F-458A-AA50-78E58BA17273}.Release|Any CPU.Build.0 = Release|Any CPU
{FFBC5669-DA32-4907-8793-7B414279DA3B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FFBC5669-DA32-4907-8793-7B414279DA3B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FFBC5669-DA32-4907-8793-7B414279DA3B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FFBC5669-DA32-4907-8793-7B414279DA3B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
31 changes: 16 additions & 15 deletions LiteDB/Engine/Disk/DiskService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,27 @@ public DiskReader GetReader()
/// <summary>
/// When a page are requested as Writable but not saved in disk, must be discard before release
/// </summary>
public void DiscardPages(IEnumerable<PageBuffer> pages, bool isDirty)
public void DiscardDirtyPages(IEnumerable<PageBuffer> pages)
{
if (isDirty == false)
// only for ROLLBACK action
foreach (var page in pages)
{
foreach (var page in pages)
{
// if page was not modified, try move to readable list
if (_cache.TryMoveToReadable(page) == false)
{
// if already in readable list, just discard
_cache.DiscardPage(page);
}
}
// complete discard page and content
_cache.DiscardPage(page);
}
else
}

/// <summary>
/// Discard pages that contains valid data and was not modified
/// </summary>
public void DiscardCleanPages(IEnumerable<PageBuffer> pages)
{
foreach (var page in pages)
{
// only for ROLLBACK action
foreach (var page in pages)
// if page was not modified, try move to readable list
if (_cache.TryMoveToReadable(page) == false)
{
// complete discard page and content
// if already in readable list, just discard
_cache.DiscardPage(page);
}
}
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Disk/DiskWriterQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void ExecuteQueue()
// after this I will have 100% sure data are safe on log file
_stream.FlushToDisk();
}
catch (IOException ex)
catch (IOException)
{
//TODO: notify database to stop working (throw error in all operations)
}
Expand Down
57 changes: 27 additions & 30 deletions LiteDB/Engine/Engine/Delete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,48 +59,45 @@ public int DeleteMany(string collection, BsonExpression predicate)
if (collection.IsNullOrWhiteSpace()) throw new ArgumentNullException(nameof(collection));
if (predicate == null) throw new ArgumentNullException(nameof(predicate));

return this.AutoTransaction(transaction =>
// do optimization for when using "_id = value" key
if (predicate.Type == BsonExpressionType.Equal &&
predicate.Left.Type == BsonExpressionType.Path &&
predicate.Left.Source == "$._id" &&
predicate.Right.IsValue)
{
// do optimization for when using "_id = value" key
if (predicate.Type == BsonExpressionType.Equal &&
predicate.Left.Type == BsonExpressionType.Path &&
predicate.Left.Source == "$._id" &&
predicate.Right.IsValue)
{
predicate.Parameters.CopyTo(predicate.Right.Parameters);
predicate.Parameters.CopyTo(predicate.Right.Parameters);

var id = predicate.Right.Execute(_header.Pragmas.Collation).First();
var id = predicate.Right.Execute(_header.Pragmas.Collation).First();

return this.Delete(collection, new BsonValue[] { id });
}
else
return this.Delete(collection, new BsonValue[] { id });
}
else
{
IEnumerable<BsonValue> getIds()
{
IEnumerable<BsonValue> getIds()
{
// this is intresting: if _id returns an document (like in FileStorage) you can't run direct _id
// field because "reader.Current" will return _id document - but not - { _id: [document] }
// create inner document to ensure _id will be a document
var query = new Query { Select = "{ i: _id }", ForUpdate = true };
// this is intresting: if _id returns an document (like in FileStorage) you can't run direct _id
// field because "reader.Current" will return _id document - but not - { _id: [document] }
// create inner document to ensure _id will be a document
var query = new Query { Select = "{ i: _id }", ForUpdate = true };

query.Where.Add(predicate);
query.Where.Add(predicate);

using (var reader = this.Query(collection, query))
using (var reader = this.Query(collection, query))
{
while (reader.Read())
{
while (reader.Read())
{
var value = reader.Current["i"];
var value = reader.Current["i"];

if (value != BsonValue.Null)
{
yield return value;
}
if (value != BsonValue.Null)
{
yield return value;
}
}
}

return this.Delete(collection, getIds());
}
});

return this.Delete(collection, getIds());
}
}
}
}
3 changes: 2 additions & 1 deletion LiteDB/Engine/Engine/Query.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace LiteDB.Engine
public partial class LiteEngine
{
/// <summary>
/// Run query over collection using a query definition
/// Run query over collection using a query definition.
/// Returns a new IBsonDataReader that run and return first document result (open transaction)
/// </summary>
public IBsonDataReader Query(string collection, Query query)
{
Expand Down
Loading