Skip to content

Commit

Permalink
Migration Addition Logging & Fixes (#596)
Browse files Browse the repository at this point in the history
* add more descriptive log messages

* playground migration proj

* add slots option bench

* add more logging messages for migration

* remove counting migrate set since we expect the complete packet

* propagate error correctly at SendMigrate

* remove unused code

* augment playground options

* verbose trace message for keys exchanged

* simplify log messages

* fix sln and log message

* use expression initialization

* verbose log message

* nit

* add verbose log messages

* simplify SetClusterMigrate

* remove newline

* add importing logging to target node

* remove unused code

* remove unused member

* use logger source generator for new log messages

* add logging frequency

* cleanup migration implementation

* update logging
  • Loading branch information
vazois authored Aug 21, 2024
1 parent 9da8c90 commit 0f32b57
Show file tree
Hide file tree
Showing 26 changed files with 861 additions and 320 deletions.
10 changes: 10 additions & 0 deletions Garnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SampleModule", "playground\SampleModule\SampleModule.csproj", "{A8CA619E-8F13-4EF8-943F-2D5E3FEBFB3F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GarnetJSON", "playground\GarnetJSON\GarnetJSON.csproj", "{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MigrateBench", "playground\MigrateBench\MigrateBench.csproj", "{6B66B394-E410-4B61-9A5A-1595FF6F5E08}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -290,6 +291,14 @@ Global
{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F}.Release|Any CPU.Build.0 = Release|Any CPU
{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F}.Release|x64.ActiveCfg = Release|Any CPU
{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F}.Release|x64.Build.0 = Release|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Debug|x64.ActiveCfg = Debug|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Debug|x64.Build.0 = Debug|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|Any CPU.Build.0 = Release|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|x64.ActiveCfg = Release|Any CPU
{6B66B394-E410-4B61-9A5A-1595FF6F5E08}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -320,6 +329,7 @@ Global
{9BE474A2-1547-43AC-B4F2-FB48A01FA995} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{A8CA619E-8F13-4EF8-943F-2D5E3FEBFB3F} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{2C8F1F5D-31E5-4D00-A46E-F3B1D9BC098F} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{6B66B394-E410-4B61-9A5A-1595FF6F5E08} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2C02C405-4798-41CA-AF98-61EDFEF6772E}
Expand Down
238 changes: 120 additions & 118 deletions libs/client/ClientSession/GarnetClientSessionMigrationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.client
Expand All @@ -24,6 +25,11 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
static ReadOnlySpan<byte> DELKEY => "DELKEY"u8;
static ReadOnlySpan<byte> GETKVPAIRINSLOT => "GETKVPAIRINSLOT"u8;

static ReadOnlySpan<byte> MAIN_STORE => "SSTORE"u8;
static ReadOnlySpan<byte> OBJECT_STORE => "OSTORE"u8;
static ReadOnlySpan<byte> T => "T"u8;
static ReadOnlySpan<byte> F => "F"u8;

/// <summary>
/// Send AUTH command to target node to authenticate connection.
/// </summary>
Expand Down Expand Up @@ -161,19 +167,54 @@ public Task<string> SetSlotRange(Memory<byte> state, string nodeid, List<(int, i
}

/// <summary>
/// MIGRATE data internal command
/// Check if migrate command parameters need to be initialized
/// </summary>
public bool InitMigrateCommand => curr == null;

/// <summary>
/// Number of bytes kept free at the front of the migrate payload region so that the
/// largest possible RESP bulk string length header (for a payload of up to bufferSize bytes)
/// and the leading key count can be filled in after the payload has been written.
/// </summary>
int ExtraSpace
{
    get
    {
        const int bulkStringMarker = 1;   // $
        const int lineTerminator = 2;     // \r\n
        const int keyCountPrefix = 4;     // 4-byte int keyCount written at the start of the payload

        // bufferSizeDigits: number of digits in the maximum possible length (written zero padded)
        return bulkStringMarker + bufferSizeDigits + lineTerminator + keyCountPrefix;
    }
}

bool isMainStore;
byte* curr, head;
int keyCount;
TaskCompletionSource<string> currTcsMigrate = null;

/// <summary>
/// Flush the client buffer and reset the state used to build a migrate command:
/// the pending completion source, the write cursors and the key counter.
/// </summary>
/// <param name="migrateProgressFreq">
/// Minimum interval between migrate progress log messages.
/// When left at its default value, a 5 second interval is used.
/// </param>
public void InitMigrateBuffer(TimeSpan migrateProgressFreq = default)
{
    Flush();
    currTcsMigrate = null;
    curr = head = null;
    keyCount = 0;

    // Compare the argument against default(TimeSpan). A bare `default` used as the condition
    // is the boolean literal false, which would always skip the 5 second fallback.
    this.migrateProgressFreq = migrateProgressFreq == default ? TimeSpan.FromSeconds(5) : migrateProgressFreq;
}

/// <summary>
/// Write parameters of CLUSTER MIGRATE directly to the client buffer
/// </summary>
/// <param name="sourceNodeId"></param>
/// <param name="replaceOption"></param>
/// <param name="storeType"></param>
/// <param name="data"></param>
/// <returns></returns>
public Task<string> MigrateData(string sourceNodeId, Memory<byte> replaceOption, Memory<byte> storeType, Memory<byte> data)
/// <param name="replace"></param>
/// <param name="isMainStore"></param>
public void SetClusterMigrate(string sourceNodeId, bool replace, bool isMainStore)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(tcs);
byte* curr = offset;
int arraySize = 6;
currTcsMigrate = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsMigrate);
curr = offset;
this.isMainStore = isMainStore;
var storeType = isMainStore ? MAIN_STORE : OBJECT_STORE;
var replaceOption = replace ? T : F;

var arraySize = 6;

while (!RespWriteUtils.WriteArrayLength(arraySize, ref curr, end))
{
Expand All @@ -182,103 +223,108 @@ public Task<string> MigrateData(string sourceNodeId, Memory<byte> replaceOption,
}
offset = curr;

//1
// 1
while (!RespWriteUtils.WriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//2
// 2
while (!RespWriteUtils.WriteBulkString(MIGRATE, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//3
// 3
while (!RespWriteUtils.WriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//4
while (!RespWriteUtils.WriteBulkString(replaceOption.Span, ref curr, end))
// 4
while (!RespWriteUtils.WriteBulkString(replaceOption, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//5
while (!RespWriteUtils.WriteBulkString(storeType.Span, ref curr, end))
// 5
while (!RespWriteUtils.WriteBulkString(storeType, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//6
while (!RespWriteUtils.WriteBulkString(data.Span, ref curr, end))
// 6
// Reserve space for the bulk string header + final newline
while (ExtraSpace + 2 > (int)(end - curr))
{
Flush();
curr = offset;
}
offset = curr;

Flush();
Interlocked.Increment(ref numCommands);
return tcs.Task;
head = curr;
curr += ExtraSpace;
}

/// <summary>
/// Check if migrate command parameters need to be initialized
/// Send key value pair and reset migrate buffers
/// </summary>
public bool InitMigrateCommand => curr == null;
public Task<string> SendAndResetMigrate()
{
if (keyCount == 0) return null;

/// <summary>
/// Getter to compute how much space to leave at the front of the buffer
/// in order to write the maximum possible RESP length header (of length bufferSize)
/// </summary>
int ExtraSpace =>
1 // $
+ bufferSizeDigits // Number of digits in maximum possible length (will be written with zero padding)
+ 2 // \r\n
+ 4; // We write a 4-byte int keyCount at the start of the payload
Debug.Assert(end - curr >= 2);
*curr++ = (byte)'\r';
*curr++ = (byte)'\n';

byte* curr, head;
int keyCount;
TaskCompletionSource<string> currTcsMigrate = null;
// Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
var size = (int)(curr - 2 - head - (ExtraSpace - 4));
TrackMigrateProgress(keyCount, size);
var success = RespWriteUtils.WritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
Debug.Assert(success);

/// <summary>
/// Flush and initialize buffers/parameters used for migrate command
/// </summary>
public void InitMigrateBuffer()
{
// Number of key value pairs in payload
*(int*)head = keyCount;

// Reset offset and flush buffer
offset = curr;
Flush();
Interlocked.Increment(ref numCommands);

// Return outstanding task and reset current tcs
var task = currTcsMigrate.Task;
currTcsMigrate = null;
curr = head = null;
keyCount = 0;
return task;
}

/// <summary>
/// Send key value pair and reset migrate buffers
/// Signal completion of migration by sending an empty payload
/// </summary>
/// <param name="sourceNodeId"></param>
/// <param name="replace"></param>
/// <param name="isMainStore"></param>
/// <returns></returns>
public Task<string> SendAndResetMigrate()
public Task<string> CompleteMigrate(string sourceNodeId, bool replace, bool isMainStore)
{
if (keyCount == 0) return null;
SetClusterMigrate(sourceNodeId, replace, isMainStore);

Debug.Assert(end - curr >= 2);
*curr++ = (byte)'\r';
*curr++ = (byte)'\n';

// Payload format = [$length\r\n][number of keys (4 bytes)][raw key value pairs]\r\n
int size = (int)(curr - 2 - head - (ExtraSpace - 4));
var size = (int)(curr - 2 - head - (ExtraSpace - 4));
TrackMigrateProgress(keyCount, size, completed: true);
var success = RespWriteUtils.WritePaddedBulkStringLength(size, ExtraSpace - 4, ref head, end);
Debug.Assert(success);

Expand Down Expand Up @@ -345,77 +391,6 @@ public bool TryWriteKeyValueByteArray(byte[] key, byte[] value, long expiration,
return true;
}

/// <summary>
/// Write parameters of CLUSTER MIGRATE directly to the client buffer.
/// Only the first five elements of the six element command are written here; for the
/// sixth element (the payload) space is reserved for its header and the write cursor is
/// left positioned after that reserved region.
/// </summary>
/// <param name="sourceNodeId">Node id written as the third element of the command</param>
/// <param name="replaceOption">Bytes written as the fourth element of the command</param>
/// <param name="storeType">Bytes written as the fifth element of the command</param>
public void SetClusterMigrate(string sourceNodeId, Memory<byte> replaceOption, Memory<byte> storeType)
{
// Create the completion source for this migrate command and queue it on tcsQueue
currTcsMigrate = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(currTcsMigrate);
// Start writing at the current buffer offset
curr = offset;
// Array header announces 6 elements: the 5 written below plus the reserved payload
int arraySize = 6;

// Each write below follows the same pattern: if the write does not succeed,
// flush the buffer, restart from offset and retry; on success commit curr to offset.
while (!RespWriteUtils.WriteArrayLength(arraySize, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 1: CLUSTER
while (!RespWriteUtils.WriteDirect(CLUSTER, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 2: MIGRATE
while (!RespWriteUtils.WriteBulkString(MIGRATE, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 3: source node id
while (!RespWriteUtils.WriteAsciiBulkString(sourceNodeId, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 4: replace option
while (!RespWriteUtils.WriteBulkString(replaceOption.Span, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 5: store type
while (!RespWriteUtils.WriteBulkString(storeType.Span, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

// 6: payload
// Reserve space for the bulk string header + final newline
while (ExtraSpace + 2 > (int)(end - curr))
{
Flush();
curr = offset;
}
// head marks where the reserved header region begins; curr skips past it.
// Note that offset is not advanced here, unlike the steps above.
head = curr;
curr += ExtraSpace;
}

private bool WriteSerializedSpanByte(ref SpanByte key, ref SpanByte value)
{
// We include space for newline at the end, to be added before sending
Expand Down Expand Up @@ -462,5 +437,32 @@ private bool WriteSerializedKeyValueByteArray(byte[] key, byte[] value, long exp

return true;
}

long lastLog = 0;
long totalKeyCount = 0;
long totalPayloadSize = 0;
TimeSpan migrateProgressFreq;

/// <summary>
/// Accumulate migrate statistics and emit a trace level progress message.
/// A message is written when the migration is marked completed, when nothing has been
/// logged yet, or when at least migrateProgressFreq has elapsed since the last message.
/// </summary>
/// <param name="keyCount">Number of keys added to the running total</param>
/// <param name="size">Payload size in bytes added to the running total</param>
/// <param name="completed">True to force a final message tagged COMPLETED</param>
private void TrackMigrateProgress(int keyCount, int size, bool completed = false)
{
    totalKeyCount += keyCount;
    totalPayloadSize += size;

    // Stopwatch timestamps are expressed in units of 1/Stopwatch.Frequency seconds, which is not
    // guaranteed to match the 100ns ticks used by TimeSpan. Scale the difference before converting,
    // otherwise the elapsed time is overestimated on platforms with a higher resolution counter.
    var elapsedStopwatchTicks = Stopwatch.GetTimestamp() - lastLog;
    var duration = new TimeSpan((long)(elapsedStopwatchTicks * ((double)TimeSpan.TicksPerSecond / Stopwatch.Frequency)));

    if (completed || lastLog == 0 || duration >= migrateProgressFreq)
    {
        logger?.LogTrace("[{op}]: isMainStore:({storeType}) totalKeyCount:({totalKeyCount}), totalPayloadSize:({totalPayloadSize} KB)",
            completed ? "COMPLETED" : "MIGRATING",
            isMainStore,
            totalKeyCount.ToString("N0"),
            ((long)((double)totalPayloadSize / 1024)).ToString("N0"));
        lastLog = Stopwatch.GetTimestamp();
    }
}
}
}
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public string GetInfo()
return ClusterInfo;
}

private static string GetRange(int[] slots)
public static string GetRange(int[] slots)
{
var range = "> ";
var start = slots[0];
Expand Down
Loading

0 comments on commit 0f32b57

Please sign in to comment.