Skip to content

Commit

Permalink
simplify config merging and add testing to detect divergence
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Oct 23, 2024
1 parent abce412 commit 84124b3
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 70 deletions.
141 changes: 83 additions & 58 deletions libs/cluster/Server/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#pragma warning restore IDE0005
using System.Runtime.CompilerServices;
using System.Diagnostics;
using Microsoft.Extensions.Logging;

namespace Garnet.cluster
{
Expand Down Expand Up @@ -873,94 +874,108 @@ public void LazyUpdateLocalReplicationOffset(long newReplicationOffset)
/// <summary>
/// Merging incoming configuration from gossip with local configuration copy.
/// </summary>
/// <param name="other">Incoming config object.</param>
/// <param name="senderConfig">Sender config object.</param>
/// <param name="workerBanList">Worker ban list used to prevent merging.</param>
/// <returns>Cluster config object.</returns>
public ClusterConfig Merge(ClusterConfig other, ConcurrentDictionary<string, long> workerBanList)
public ClusterConfig Merge(ClusterConfig senderConfig, ConcurrentDictionary<string, long> workerBanList)
{
var localId = LocalNodeId;
var newConfig = this;
for (ushort i = 1; i < other.NumWorkers + 1; i++)
for (ushort i = 1; i < senderConfig.NumWorkers + 1; i++)
{
//Do not update local node config
if (localId.Equals(other.workers[i].Nodeid, StringComparison.OrdinalIgnoreCase))
// Do not update local node config
if (localId.Equals(senderConfig.workers[i].Nodeid, StringComparison.OrdinalIgnoreCase))
continue;
//Skip any nodes scheduled for deletion
if (workerBanList.ContainsKey(other.workers[i].Nodeid))

// Skip any nodes scheduled for deletion
if (workerBanList.ContainsKey(senderConfig.workers[i].Nodeid))
continue;

newConfig = newConfig.InPlaceUpdateWorker(
other.workers[i].Nodeid,
other.workers[i].Address,
other.workers[i].Port,
other.workers[i].ConfigEpoch,
other.workers[i].Role,
other.workers[i].ReplicaOfNodeId,
other.workers[i].hostname,
other.slotMap,
i);
newConfig = newConfig.MergeWorkerInfo(senderConfig.workers[i]);
}
return newConfig;

return newConfig.MergeSlotMap(senderConfig);
}

private ClusterConfig InPlaceUpdateWorker(
string nodeId,
string address,
int port,
long configEpoch,
NodeRole role,
string replicaOfNodeId,
string hostname,
HashSlot[] inSlotMap,
ushort inWorkerId)
private ClusterConfig MergeWorkerInfo(Worker worker)
{
ushort workerId = 0;
// Find workerId offset from my local configuration
for (var i = 1; i < workers.Length; i++)
{
if (workers[i].Nodeid.Equals(nodeId, StringComparison.OrdinalIgnoreCase))
if (workers[i].Nodeid.Equals(worker.Nodeid, StringComparison.OrdinalIgnoreCase))
{
// Skip update if the received config epoch is less than or equal to the local worker epoch
// Update only if received config epoch is strictly greater
if (configEpoch <= workers[i].ConfigEpoch) return this;
if (worker.ConfigEpoch <= workers[i].ConfigEpoch) return this;
workerId = (ushort)i;
break;
}
}

// If we reached this point we can update the worker metadata
var newWorkers = workers;
// Check if we need to add worker to the known workers list
if (workerId == 0)
{
newWorkers = new Worker[workers.Length + 1];
workerId = (ushort)workers.Length;
Array.Copy(workers, newWorkers, workers.Length);
}
newWorkers[workerId].Address = address;
newWorkers[workerId].Port = port;
newWorkers[workerId].Nodeid = nodeId;
newWorkers[workerId].ConfigEpoch = configEpoch;
newWorkers[workerId].Role = role;
newWorkers[workerId].ReplicaOfNodeId = replicaOfNodeId;
newWorkers[workerId].hostname = hostname;

// Insert or update worker information
newWorkers[workerId].Address = worker.Address;
newWorkers[workerId].Port = worker.Port;
newWorkers[workerId].Nodeid = worker.Nodeid;
newWorkers[workerId].ConfigEpoch = worker.ConfigEpoch;
newWorkers[workerId].Role = worker.Role;
newWorkers[workerId].ReplicaOfNodeId = worker.ReplicaOfNodeId;
newWorkers[workerId].hostname = worker.hostname;

return new(slotMap, newWorkers);
}

public ClusterConfig MergeSlotMap(ClusterConfig senderConfig)
{
var senderSlotMap = senderConfig.slotMap;
var senderWorkerId = GetWorkerIdFromNodeId(senderConfig.LocalNodeId);

// Create a copy of the local slotMap
var newSlotMap = new HashSlot[MAX_HASH_SLOT_VALUE];
Array.Copy(slotMap, newSlotMap, inSlotMap.Length);
Array.Copy(slotMap, newSlotMap, senderSlotMap.Length);
for (var i = 0; i < MAX_HASH_SLOT_VALUE; i++)
{
var ownerId = newSlotMap[i].workerId;
// Update slot ownership if all of the below hold
// 1. Slot owned by the inWorker and is in stable state
// 2. ConfigEpoch of current owner is less than ConfigEpoch of node claiming ownership
if (inSlotMap[i].workerId == inWorkerId && inSlotMap[i]._state == SlotState.STABLE && newWorkers[ownerId].ConfigEpoch < configEpoch)
var currentOwnerId = newSlotMap[i].workerId;

// Process this slot information when it is in stable state
if (senderSlotMap[i]._state != SlotState.STABLE)
continue;

// Process this slot information when sender is claimant of this slot
if (senderSlotMap[i]._workerId != 1)
{
newSlotMap[i]._workerId = workerId;
newSlotMap[i]._state = SlotState.STABLE;
var currentOwnerNodeId = workers[currentOwnerId].Nodeid;
// Sender does not own the slot but the local node believes it does.
// This can happen if an epoch collision occurred at the sender and its epoch got bumped;
// in that case the slot state should be set to offline to give the actual owner the opportunity to claim the slot.
// Otherwise the sender will falsely remain the owner, its epoch will be greater than that of the new owner, and the new owner
// will not be able to claim the slot without outside intervention.
if (currentOwnerNodeId != null && currentOwnerNodeId.Equals(senderConfig.LocalNodeId, StringComparison.OrdinalIgnoreCase))
{
newSlotMap[i]._workerId = 0;
newSlotMap[i]._state = SlotState.OFFLINE;
}
continue;
}

// Skip this slot when the config epoch of the current owner is greater than or equal to the config epoch of the sender
if (workers[currentOwnerId].ConfigEpoch >= senderConfig.LocalNodeConfigEpoch)
continue;

// Update ownership of the slot to the sender
newSlotMap[i]._workerId = senderWorkerId;
newSlotMap[i]._state = SlotState.STABLE;
}

return new ClusterConfig(newSlotMap, newWorkers);
return new(newSlotMap, workers);
}

/// <summary>
Expand Down Expand Up @@ -1225,24 +1240,34 @@ public ClusterConfig BumpLocalNodeConfigEpoch()
/// <summary>
/// Check if sender has same local worker epoch as the receiver node and resolve collision.
/// </summary>
/// <param name="other">Incoming configuration object.</param>
/// <param name="senderConfig">Incoming configuration object.</param>
/// <returns>ClusterConfig object with updates.</returns>
public ClusterConfig HandleConfigEpochCollision(ClusterConfig other)
public ClusterConfig HandleConfigEpochCollision(ClusterConfig senderConfig, ILogger logger = null)
{
var localNodeConfigEpoch = LocalNodeConfigEpoch;
var senderConfigEpoch = senderConfig.LocalNodeConfigEpoch;

// If the incoming config epoch differs from the local one, or either node is not a primary, there is nothing to do
if (LocalNodeConfigEpoch != other.LocalNodeConfigEpoch || !IsPrimary || !other.IsPrimary)
if (localNodeConfigEpoch != senderConfigEpoch || !IsPrimary || !senderConfig.IsPrimary)
return this;

var remoteNodeId = other.LocalNodeId;
var senderNodeId = senderConfig.LocalNodeId;
var localNodeId = LocalNodeId;

// If the sender's node id is less than or equal to localNodeId do nothing
if (remoteNodeId.CompareTo(localNodeId) <= 0) return this;

var newWorkers = new Worker[workers.Length];
Array.Copy(workers, newWorkers, workers.Length);
newWorkers[1].ConfigEpoch++;
return new ClusterConfig(slotMap, newWorkers);
if (senderNodeId.CompareTo(localNodeId) <= 0) return this;

logger?.LogWarning("Epoch Collision {localNodeConfigEpoch} <> {senderConfigEpoch} [{LocalNodeIp}:{LocalNodePort},{localNodeId}] [{senderIp}:{senderPort},{senderNodeId}]",
localNodeConfigEpoch,
senderConfigEpoch,
LocalNodeIp,
LocalNodePort,
LocalNodeId.Substring(0, 8),
senderConfig.LocalNodeIp,
senderConfig.LocalNodePort,
senderConfig.LocalNodeId.Substring(0, 8));

return BumpLocalNodeConfigEpoch();
}
}
}
5 changes: 2 additions & 3 deletions libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ public bool TryPrepareSlotForOwnershipChange(int slot, string nodeid, out ReadOn
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}
logger?.LogWarning("Bumped Epoch ({LocalNodeConfigEpoch}) [{LocalIp}:{LocalPort},{LocalNodeId}]", currentConfig.LocalNodeConfigEpoch, currentConfig.LocalNodeIp, currentConfig.LocalNodePort, currentConfig.LocalNodeId);
FlushConfig();
logger?.LogTrace("[Processed] SetSlot NODE {slot} IMPORTED TO {nodeid}", slot, nodeid);
return true;
}
return true;
Expand Down Expand Up @@ -411,9 +411,8 @@ public bool TryPrepareSlotsForOwnershipChange(HashSet<int> slots, string nodeid,
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
break;
}

logger?.LogWarning("Bumped Epoch ({LocalNodeConfigEpoch}) [{LocalIp}:{LocalPort},{LocalNodeId}]", currentConfig.LocalNodeConfigEpoch, currentConfig.LocalNodeIp, currentConfig.LocalNodePort, currentConfig.LocalNodeId.Substring(0, 8));
FlushConfig();
logger?.LogTrace("[Processed] SetSlotsRange {slot} IMPORTED TO {endpoint}", GetRange([.. slots]), nodeid);
return true;
}

Expand Down
8 changes: 4 additions & 4 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,22 @@ void TryStartGossipTasks()
/// <summary>
/// Merge incoming config to evolve local version
/// </summary>
public bool TryMerge(ClusterConfig other)
public bool TryMerge(ClusterConfig senderConfig)
{
try
{
activeMergeLock.ReadLock();
if (workerBanList.ContainsKey(other.LocalNodeId))
if (workerBanList.ContainsKey(senderConfig.LocalNodeId))
{
logger?.LogTrace("Cannot merge node <{nodeid}> because still in ban list", other.LocalNodeId);
logger?.LogTrace("Cannot merge node <{nodeid}> because still in ban list", senderConfig.LocalNodeId);
return false;
}

while (true)
{
var current = currentConfig;
var currentCopy = current.Copy();
var next = currentCopy.Merge(other, workerBanList).HandleConfigEpochCollision(other);
var next = currentCopy.Merge(senderConfig, workerBanList).HandleConfigEpochCollision(senderConfig, logger);
if (currentCopy == next) return false;
if (Interlocked.CompareExchange(ref currentConfig, next, current) == current)
break;
Expand Down
Loading

0 comments on commit 84124b3

Please sign in to comment.