Skip to content

Commit

Permalink
Misc Fixes (#615)
Browse files Browse the repository at this point in the history
* fix bug setslotsrange command

* fix bug with flushall command in cluster

* batch reset slot state

* code cleanup

* augment migrate bench

* add logging at InitializeKV

* addressing comments

---------

Co-authored-by: Tal Zaccai <talzacc@microsoft.com>
  • Loading branch information
vazois and TalZaccai authored Aug 30, 2024
1 parent 68a7a85 commit 0d03980
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 24 deletions.
1 change: 1 addition & 0 deletions libs/cluster/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOTNOTMIGRATING => "ERR slot state not set to MIGRATING state"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_FAILEDTOADDKEY => "ERR Failed to add key for migration tracking"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_PARSING => "ERR Parsing error"u8;
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOT_STATE => "ERR Invalid slot state"u8;

/// <summary>
/// Simple error respone strings, i.e. these are of the form "-errorString\r\n"
Expand Down
15 changes: 15 additions & 0 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,21 @@ public ClusterConfig UpdateMultiSlotState(HashSet<int> slots, int workerId, Slot
return new ClusterConfig(newSlotMap, workers);
}

public ClusterConfig ResetMultiSlotState(HashSet<int> slots)
{
var newSlotMap = new HashSlot[MAX_HASH_SLOT_VALUE];
Array.Copy(slotMap, newSlotMap, slotMap.Length);

foreach (ushort slot in slots)
{
var slotState = GetState(slot);
var workerId = slotState == SlotState.MIGRATING ? 1 : GetWorkerIdFromSlot(slot);
newSlotMap[slot]._workerId = (ushort)workerId;
newSlotMap[slot]._state = SlotState.STABLE;
}
return new ClusterConfig(newSlotMap, workers);
}

/// <summary>
/// Update config epoch for worker in new version of config.
/// </summary>
Expand Down
12 changes: 8 additions & 4 deletions libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public bool TryPrepareSlotsForOwnershipChange(HashSet<int> slots, string nodeid,
/// Reset slot state to <see cref="SlotState.STABLE"/>
/// </summary>
/// <param name="slot">Slot id to reset state</param>
public void ResetSlotState(int slot)
public void TryResetSlotState(int slot)
{
var current = currentConfig;
var slotState = current.GetState((ushort)slot);
Expand All @@ -445,12 +445,16 @@ public void ResetSlotState(int slot)
/// Reset local slot state to <see cref="SlotState.STABLE"/>
/// </summary>
/// <param name="slots">Slot list</param>
public void ResetSlotsState(HashSet<int> slots)
public void TryResetSlotState(HashSet<int> slots)
{
foreach (var slot in slots)
while (true)
{
ResetSlotState(slot);
var current = currentConfig;
var newConfig = currentConfig.ResetMultiSlotState(slots);
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
FlushConfig();
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/Migration/MigrateSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ internal MigrateSession(
this._timeout = TimeSpan.FromMilliseconds(_timeout);
this._sslots = _slots;
this._slotRanges = GetRanges();
this._keys = keys == null ? new MigratingKeysWorkingSet() : keys;
this._keys = keys ?? new MigratingKeysWorkingSet();
this.transferOption = transferOption;

if (clusterProvider != null)
Expand Down Expand Up @@ -266,7 +266,7 @@ public bool TrySetSlotRanges(string nodeid, MigrateState state)
/// <summary>
/// Reset local slot state
/// </summary>
public void ResetLocalSlot() => clusterProvider.clusterManager.ResetSlotsState(_sslots);
public void ResetLocalSlot() => clusterProvider.clusterManager.TryResetSlotState(_sslots);

/// <summary>
/// Prepare remote node for importing
Expand Down
6 changes: 4 additions & 2 deletions libs/cluster/Server/Migration/MigrateSessionSlots.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public bool MigrateSlotsDriver()
{
logger?.LogTrace("Initializing MainStore Iterator");
var storeTailAddress = clusterProvider.storeWrapper.store.Log.TailAddress;
MigrationKeyIterationFunctions.MainStoreGetKeysInSlots mainStoreGetKeysInSlots = new(this, _sslots, bufferSize: 1 << clusterProvider.serverOptions.PageSizeBits());
var bufferSize = 1 << clusterProvider.serverOptions.PageSizeBits();
MigrationKeyIterationFunctions.MainStoreGetKeysInSlots mainStoreGetKeysInSlots = new(this, _sslots, bufferSize: bufferSize);

logger?.LogTrace("Begin MainStore Iteration");
while (true)
Expand Down Expand Up @@ -50,7 +51,8 @@ public bool MigrateSlotsDriver()
{
logger?.LogTrace("Initializing ObjectStore Iterator");
var objectStoreTailAddress = clusterProvider.storeWrapper.objectStore.Log.TailAddress;
MigrationKeyIterationFunctions.ObjectStoreGetKeysInSlots objectStoreGetKeysInSlots = new(this, _sslots, bufferSize: 1 << clusterProvider.serverOptions.ObjectStorePageSizeBits());
var objectBufferSize = 1 << clusterProvider.serverOptions.ObjectStorePageSizeBits();
MigrationKeyIterationFunctions.ObjectStoreGetKeysInSlots objectStoreGetKeysInSlots = new(this, _sslots, bufferSize: objectBufferSize);

logger?.LogTrace("Begin ObjectStore Iteration");
while (true)
Expand Down
17 changes: 12 additions & 5 deletions libs/cluster/Session/RespClusterSlotManagementCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ private bool NetworkClusterSetSlot(out bool invalidParameters)
{
case SlotState.STABLE:
setSlotsSucceeded = true;
clusterProvider.clusterManager.ResetSlotState(slot);
clusterProvider.clusterManager.TryResetSlotState(slot);
break;
case SlotState.IMPORTING:
setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotForImport(slot, nodeId, out errorMessage);
Expand Down Expand Up @@ -524,21 +524,28 @@ private bool NetworkClusterSetSlotsRange(out bool invalidParameters)
var subcommand = parseState.GetString(0);

// Try parse slot state
if (!Enum.TryParse(subcommand, out SlotState slotState))
if (!Enum.TryParse(subcommand, ignoreCase: true, out SlotState slotState))
{
// Log error for invalid slot state option
logger?.LogError("The given input '{input}' is not a valid slot state option.", subcommand);
slotState = SlotState.INVALID;
}

if (slotState == SlotState.INVALID)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SLOT_STATE, ref dcurr, dend))
SendAndReset();
return true;
}

// Extract nodeid for operations other than stable
if (slotState != SlotState.STABLE && slotState != SlotState.INVALID)
if (slotState is not SlotState.STABLE and not SlotState.INVALID)
{
nodeId = parseState.GetString(1);
}

// Try to parse slot ranges. The parsing may give errorMessage even if the TryParseSlots returns true.
var slotsParsed = TryParseSlots(2, out var slots, out var errorMessage, range: true);
var slotsParsed = TryParseSlots(slotState == SlotState.STABLE ? 1 : 2, out var slots, out var errorMessage, range: true);
if (!slotsParsed)
{
while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend))
Expand All @@ -552,7 +559,7 @@ private bool NetworkClusterSetSlotsRange(out bool invalidParameters)
{
case SlotState.STABLE:
setSlotsSucceeded = true;
clusterProvider.clusterManager.ResetSlotsState(slots);
clusterProvider.clusterManager.TryResetSlotState(slots);
break;
case SlotState.IMPORTING:
setSlotsSucceeded = clusterProvider.clusterManager.TryPrepareSlotsForImport(slots, nodeId, out errorMessage);
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void InitializeServer()

private void CreateMainStore(IClusterFactory clusterFactory, out string checkpointDir)
{
kvSettings = opts.GetSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [main]"), out logFactory);
kvSettings = opts.GetSettings(loggerFactory, out logFactory);

checkpointDir = opts.CheckpointDir ?? opts.LogDir;

Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ public static bool IsDataCommand(this RespCommand cmd)
RespCommand.DBSIZE => false,
RespCommand.MEMORY_USAGE => false,
RespCommand.FLUSHDB => false,
RespCommand.FLUSHALL => false,
_ => cmd >= FirstReadCommand() && cmd <= LastWriteCommand()
};
}
Expand Down
13 changes: 10 additions & 3 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,13 @@ public GarnetServerOptions(ILogger logger = null) : base(logger)
}

/// <summary>
/// Get KVSettings for the main store log
/// Get main store settings
/// </summary>
public KVSettings<SpanByte, SpanByte> GetSettings(ILogger logger, out INamedDeviceFactory logFactory)
/// <param name="loggerFactory">Logger factory for debugging and error tracing</param>
/// <param name="logFactory">Tsavorite Log factory instance</param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public KVSettings<SpanByte, SpanByte> GetSettings(ILoggerFactory loggerFactory, out INamedDeviceFactory logFactory)
{
if (MutablePercent is < 10 or > 95)
throw new Exception("MutablePercent must be between 10 and 95");
Expand All @@ -382,8 +386,11 @@ public KVSettings<SpanByte, SpanByte> GetSettings(ILogger logger, out INamedDevi
IndexSize = indexCacheLines * 64L,
PreallocateLog = false,
MutableFraction = MutablePercent / 100.0,
PageSize = 1L << PageSizeBits()
PageSize = 1L << PageSizeBits(),
loggerFactory = loggerFactory,
logger = loggerFactory?.CreateLogger("TsavoriteKV [main]")
};

logger?.LogInformation("[Store] Using page size of {PageSize}", PrettySize(kvSettings.PageSize));

kvSettings.MemorySize = 1L << MemorySizeBits(MemorySize, PageSize, out var storeEmptyPageCount);
Expand Down
4 changes: 2 additions & 2 deletions libs/storage/Tsavorite/cs/src/core/Index/Common/KVSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public sealed class KVSettings<TKey, TValue> : IDisposable
/// </summary>
public KVSettings() { }

internal readonly ILoggerFactory loggerFactory;
internal readonly ILogger logger;
public ILoggerFactory loggerFactory;
public ILogger logger;

/// <summary>
/// Create default configuration backed by local storage at given base directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ internal void Initialize(int version, long size, int sector_size)
long aligned_size_bytes = sector_size +
((size_bytes + (sector_size - 1)) & ~(sector_size - 1));

logger?.LogTrace("KV Initialize size:{size}, sizeBytes:{sizeBytes} sectorSize:{sectorSize} alignedSizeBytes:{alignedSizeBytes}", size, size_bytes, sector_size, aligned_size_bytes);
//Over-allocate and align the table to the cacheline
state[version].size = size;
state[version].size_mask = size - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public TsavoriteKVIterator(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator
IndexSize = KVSettings<TKey, TValue>.SetIndexSizeFromCacheLines(store.IndexSize),
LogDevice = new NullDevice(),
ObjectLogDevice = new NullDevice(),
MutableFraction = 1
MutableFraction = 1,
loggerFactory = loggerFactory
};

tempKv = new TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator>(tempKVSettings, store.storeFunctions, store.allocatorFactory);
Expand Down
32 changes: 28 additions & 4 deletions playground/MigrateBench/MigrateRequest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
using System.Diagnostics;
using System.Net;
using Garnet.client;
using Microsoft.Extensions.Logging;

Expand All @@ -27,6 +28,12 @@ public class MigrateRequest
GarnetClientSession sourceNode;
GarnetClientSession targetNode;

string targetNodeId;
string sourceNodeId;

IPEndPoint sourceNodeEndpoint;
IPEndPoint targetNodeEndpoint;

public MigrateRequest(Options opts, ILogger logger = null)
{
this.opts = opts;
Expand All @@ -49,6 +56,23 @@ public void Run()
{
sourceNode.Connect();
targetNode.Connect();

sourceNodeId = sourceNode.ExecuteAsync(["CLUSTER", "MYID"]).GetAwaiter().GetResult();
targetNodeId = targetNode.ExecuteAsync(["CLUSTER", "MYID"]).GetAwaiter().GetResult();

var endpoint = sourceNode.ExecuteAsync(["CLUSTER", "ENDPOINT", sourceNodeId]).GetAwaiter().GetResult();
if (!IPEndPoint.TryParse(endpoint, out sourceNodeEndpoint))
{
logger?.LogError("ERR Source Endpoint ({endpoint}) is not valid!", endpoint);
return;
}
endpoint = targetNode.ExecuteAsync(["CLUSTER", "ENDPOINT", targetNodeId]).GetAwaiter().GetResult();
if (!IPEndPoint.TryParse(endpoint, out targetNodeEndpoint))
{
logger?.LogError("ERR Target Endpoint ({endpoint}) is not valid!", endpoint);
return;
}

var sourceNodeKeys = dbsize(ref sourceNode);
var targetNodeKeys = dbsize(ref targetNode);
logger?.LogInformation("SourceNode: {endpoint} KeyCount: {keys}", opts.SourceEndpoint, sourceNodeKeys);
Expand Down Expand Up @@ -101,8 +125,8 @@ private void MigrateSlotRanges()
return;
}

// migrate 192.168.1.20 7001 "" 0 5000 SLOTSRANGE 1000 7000
ICollection<string> migrate = ["MIGRATE", targetAddress, targetPort.ToString(), "", "0", timeout.ToString(), "REPLACE", "SLOTSRANGE"];
// migrate 192.168.1.20 7001 "" 0 5000 SLOTSRANGE 1000 7000
ICollection<string> migrate = ["MIGRATE", targetNodeEndpoint.Address.ToString(), targetNodeEndpoint.Port.ToString(), "", "0", timeout.ToString(), "REPLACE", "SLOTSRANGE"];
foreach (var slot in slots)
migrate.Add(slot.ToString());

Expand Down Expand Up @@ -149,7 +173,7 @@ private void MigrateSlots()
_slots.Add(j);
}

ICollection<string> migrate = ["MIGRATE", targetAddress, targetPort.ToString(), "", "0", timeout.ToString(), "REPLACE", "SLOTS"];
ICollection<string> migrate = ["MIGRATE", targetNodeEndpoint.Address.ToString(), targetNodeEndpoint.Port.ToString(), "", "0", timeout.ToString(), "REPLACE", "SLOTS"];
foreach (var slot in _slots)
migrate.Add(slot.ToString());

Expand Down Expand Up @@ -218,7 +242,7 @@ private void MigrateKeys()
resp = sourceNode.ExecuteAsync(countkeysinslot).GetAwaiter().GetResult();
var keys = sourceNode.ExecuteForArrayAsync(getkeysinslot).GetAwaiter().GetResult();

ICollection<string> migrate = ["MIGRATE", targetAddress, targetPort.ToString(), "", "0", timeout.ToString(), "REPLACE", "KEYS"];
ICollection<string> migrate = ["MIGRATE", targetNodeEndpoint.Address.ToString(), targetNodeEndpoint.Port.ToString(), "", "0", timeout.ToString(), "REPLACE", "KEYS"];
foreach (var key in keys)
migrate.Add(key);

Expand Down

0 comments on commit 0d03980

Please sign in to comment.