Skip to content

Commit

Permalink
Enforce memory size constraint during recovery (microsoft#276)
Browse files Browse the repository at this point in the history
* Initial draft.

* Additiional changes.

* Fixed setting head address.

* Cleanup.

* Updates to free previously allocated pages that could violate size constraint.
Added a wait for page flush before reading into the same slot.

* Moved to a batch read approach.

* Added logger to test instance of GarnetServer.

* Cleanup.

* Fix read page range up to snapshotendpage.

* Resolved PR comments.

* Added benchmark.
Resolved PR comments.

* Initial draft.

* Additiional changes.

* Fixed setting head address.

* Cleanup.

* Updates to free previously allocated pages that could violate size constraint.
Added a wait for page flush before reading into the same slot.

* Moved to a batch read approach.

* Added logger to test instance of GarnetServer.

* Cleanup.

* Fix read page range up to snapshotendpage.

* Resolved PR comments.

* Added benchmark.
Resolved PR comments.

* Fixed formatting.

* Opt out of test logger by default

---------

Co-authored-by: Yoganand Rajasekaran <yrajas@ntdev.microsoft.com>
Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
3 people authored Apr 24, 2024
1 parent dc7b517 commit 41e9ebd
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 84 deletions.
69 changes: 69 additions & 0 deletions benchmark/BDN.benchmark/RecoveryBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Columns;
using BenchmarkDotNet.Configs;
using Embedded.perftest;
using Garnet.server;

namespace BDN.benchmark
{
public class CustomConfig : ManualConfig
{
public CustomConfig()
{
AddColumn(StatisticColumn.Mean);
AddColumn(StatisticColumn.StdDev);
AddColumn(StatisticColumn.Median);
AddColumn(StatisticColumn.P90);
AddColumn(StatisticColumn.P95);
}
}

[Config(typeof(CustomConfig))]
public class RecoveryBenchmark
{
[ParamsSource(nameof(CommandLineArgsProvider))]
public string LogDir { get; set; }

public IEnumerable<string> CommandLineArgsProvider()
{
// Return the command line arguments as an enumerable
return Environment.GetCommandLineArgs().Skip(1);
}

[Params("100m")]
public string MemorySize { get; set; }

EmbeddedRespServer server;

[IterationSetup]
public void Setup()
{
Console.WriteLine($"LogDir: {LogDir}");
server = new EmbeddedRespServer(new GarnetServerOptions()
{
EnableStorageTier = true,
LogDir = LogDir,
CheckpointDir = LogDir,
IndexSize = "1m",
DisableObjects = true,
MemorySize = MemorySize,
PageSize = "32k",
});
}

[IterationCleanup]
public void Cleanup()
{
server.Dispose();
}

[Benchmark]
public void Recover()
{
server.StoreWrapper.RecoverCheckpoint();
}
}
}
21 changes: 21 additions & 0 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,11 @@ public virtual void Dispose()
/// </summary>
public int AllocatedPageCount;

/// <summary>
/// Max number of pages that have been allocated at any point in time
/// </summary>
public int MaxAllocatedPageCount;

/// <summary>
/// Maximum possible number of empty pages in circular buffer
/// </summary>
Expand Down Expand Up @@ -1116,6 +1121,22 @@ public int EmptyPageCount
/// </summary>
internal abstract void DeleteFromMemory();

/// <summary>
/// Increments AllocatedPageCount
/// Update MaxAllocatedPageCount, if a higher number of pages have been allocated.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void IncrementAllocatedPageCount()
{
var newAllocatedPageCount = Interlocked.Increment(ref AllocatedPageCount);
var currMaxAllocatedPageCount = MaxAllocatedPageCount;
while (currMaxAllocatedPageCount < newAllocatedPageCount)
{
if (Interlocked.CompareExchange(ref MaxAllocatedPageCount, newAllocatedPageCount, currMaxAllocatedPageCount) == currMaxAllocatedPageCount)
return;
currMaxAllocatedPageCount = MaxAllocatedPageCount;
}
}

/// <summary>
/// Segment size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public override void Dispose()
/// <param name="index"></param>
internal override void AllocatePage(int index)
{
Interlocked.Increment(ref AllocatedPageCount);
IncrementAllocatedPageCount();

if (overflowPagePool.TryGet(out var item))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ internal override void AllocatePage(int index)

internal Record<Key, Value>[] AllocatePage()
{
Interlocked.Increment(ref AllocatedPageCount);
IncrementAllocatedPageCount();

if (overflowPagePool.TryGet(out var item))
return item;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public override void Dispose()
/// <param name="index"></param>
internal override void AllocatePage(int index)
{
Interlocked.Increment(ref AllocatedPageCount);
IncrementAllocatedPageCount();

if (overflowPagePool.TryGet(out var item))
{
Expand Down
207 changes: 127 additions & 80 deletions libs/storage/Tsavorite/cs/src/core/Index/Recovery/Recovery.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public partial class TsavoriteKV<Key, Value> : TsavoriteBase, IDisposable
/// </summary>
public long EntryCount => GetEntryCount();

/// <summary>
/// Maximum number of memory pages ever allocated
/// </summary>
public long MaxAllocatedPageCount => hlog.MaxAllocatedPageCount;

/// <summary>
/// Size of index in #cache lines (64 bytes each)
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions playground/Embedded.perftest/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory
/// </summary>
public new void Dispose() => base.Dispose();

public StoreWrapper StoreWrapper => storeWrapper;

/// <summary>
/// Return a RESP session to this server
/// </summary>
Expand Down
114 changes: 114 additions & 0 deletions test/Garnet.test/RespAdminCommandsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,120 @@ public void SeSaveRecoverObjectTest()
Assert.AreEqual(ldata, returnedData);
}
}
[Test]
[TestCase(63, 15, 1)]
[TestCase(63, 1, 1)]
[TestCase(16, 16, 1)]
[TestCase(5, 64, 1)]
public void SeSaveRecoverMultipleObjectsTest(int memorySize, int recoveryMemorySize, int pageSize)
{
string sizeToString(int size) => size + "k";

server.Dispose();
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true, MemorySize: sizeToString(memorySize), PageSize: sizeToString(pageSize));
server.Start();

var ldata = new RedisValue[] { "a", "b", "c", "d" };
var ldataArr = ldata.Select(x => x).Reverse().ToArray();
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)))
{
var db = redis.GetDatabase(0);
for (int i = 0; i < 3000; i++)
{
var key = $"SeSaveRecoverTestKey{i:0000}";
db.ListLeftPush(key, ldata);
var retval = db.ListRange(key);
Assert.AreEqual(ldataArr, retval, $"key {key}");
}

// Issue and wait for DB save
var server = redis.GetServer($"{TestUtils.Address}:{TestUtils.Port}");
server.Save(SaveType.BackgroundSave);
while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks) Thread.Sleep(10);
}

server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, lowMemory: true, MemorySize: sizeToString(recoveryMemorySize), PageSize: sizeToString(pageSize));
server.Start();

Assert.LessOrEqual(server.Provider.StoreWrapper.objectStore.MaxAllocatedPageCount, (recoveryMemorySize / pageSize) + 1);
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)))
{
var db = redis.GetDatabase(0);
for (int i = 0; i < 3000; i++)
{
var key = $"SeSaveRecoverTestKey{i:0000}";
var returnedData = db.ListRange(key);
Assert.AreEqual(ldataArr, returnedData, $"key {key}");
}
}
}

[Test]
[TestCase("63k", "15k")]
[TestCase("63k", "3k")]
[TestCase("63k", "1k")]
[TestCase("8k", "5k")]
[TestCase("16k", "16k")]
[TestCase("5k", "8k")]
[TestCase("5k", "64k")]
public void SeSaveRecoverMultipleKeysTest(string memorySize, string recoveryMemorySize)
{
bool disableObj = true;

server.Dispose();
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, lowMemory: true, MemorySize: memorySize, PageSize: "1k", enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)))
{
var db = redis.GetDatabase(0);
for (int i = 0; i < 1000; i++)
{
db.StringSet($"SeSaveRecoverTestKey{i:0000}", $"SeSaveRecoverTestValue");
}

for (int i = 0; i < 1000; i++)
{
var recoveredValue = db.StringGet($"SeSaveRecoverTestKey{i:0000}");
Assert.AreEqual("SeSaveRecoverTestValue", recoveredValue.ToString());
}

var inforesult = db.Execute("INFO");

// Issue and wait for DB save
var server = redis.GetServer($"{TestUtils.Address}:{TestUtils.Port}");
server.Save(SaveType.BackgroundSave);
while (server.LastSave().Ticks == DateTimeOffset.FromUnixTimeSeconds(0).Ticks) Thread.Sleep(10);

for (int i = 1000; i < 2000; i++)
{
db.StringSet($"SeSaveRecoverTestKey{i:0000}", $"SeSaveRecoverTestValue");
}

for (int i = 1000; i < 2000; i++)
{
var recoveredValue = db.StringGet($"SeSaveRecoverTestKey{i:0000}");
Assert.AreEqual("SeSaveRecoverTestValue", recoveredValue.ToString());
}

db.Execute("COMMITAOF");
}

server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, DisableObjects: disableObj, tryRecover: true, lowMemory: true, MemorySize: recoveryMemorySize, PageSize: "1k", enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig(allowAdmin: true)))
{
var db = redis.GetDatabase(0);
for (int i = 0; i < 2000; i++)
{
var recoveredValue = db.StringGet($"SeSaveRecoverTestKey{i:0000}");
Assert.AreEqual("SeSaveRecoverTestValue", recoveredValue.ToString(), $"Key SeSaveRecoverTestKey{i:0000}");
}
}
}

[Test]
public void SeAofRecoverTest()
Expand Down
20 changes: 19 additions & 1 deletion test/Garnet.test/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ internal static class TestUtils
/// </summary>
public static int Port = 33278;

/// <summary>
/// Whether to use a test progress logger
/// </summary>
static readonly bool useTestLogger = false;

internal static string AzureTestContainer
{
get
Expand Down Expand Up @@ -197,7 +202,20 @@ public static GarnetServer CreateGarnetServer(
opts.PageSize = opts.ObjectStorePageSize = PageSize == default ? "512" : PageSize;
}

return new GarnetServer(opts);
if (useTestLogger)
{
var loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddProvider(new NUnitLoggerProvider(TestContext.Progress, "GarnetServer", null, false, false, LogLevel.Trace));
builder.SetMinimumLevel(LogLevel.Trace);
});

return new GarnetServer(opts, loggerFactory);
}
else
{
return new GarnetServer(opts);
}
}

/// <summary>
Expand Down

0 comments on commit 41e9ebd

Please sign in to comment.