Skip to content

Commit

Permalink
Consolidate Cluster Slot Verification (microsoft#508)
Browse files Browse the repository at this point in the history
* update redirect test to reduce execution time

* enhance unit test fail message

* add cluster slot validation immediately after parsing completes

* remove unused slot verification

* consolidate NetworMultiKeySlotVerify into CanServerSlot

* clean unused unit test properties

* skip MIGRATE for slot verification

* fix edge cases for centralized slot verification

* use parseState.count in main store commands

* check for null in test name

* first attempt using keyspecs

* generalized keyspecs use

* fix readwrite bugs

* remove unused code

* fix slot verification test

* add missing assert fail
  • Loading branch information
vazois authored Aug 18, 2024
1 parent b7fe600 commit 0528f10
Show file tree
Hide file tree
Showing 17 changed files with 824 additions and 399 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Session/ClusterSlotVerify.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ ClusterSlotVerificationResult SingleKeyReadWriteSlotVerify(ref ClusterConfig con
var state = config.GetState(_slot);

// Redirect r/w requests towards primary
if (config.LocalNodeRole == NodeRole.REPLICA)
if (config.LocalNodeRole == NodeRole.REPLICA && !readWriteSession)
return new(SlotVerifiedState.MOVED, _slot);

if (IsLocal)
Expand Down
23 changes: 0 additions & 23 deletions libs/server/Resp/ArrayCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ private bool NetworkMGET<TGarnetApi>(ref TGarnetApi storageApi)

SpanByte input = default;

if (NetworkMultiKeySlotVerify(readOnly: true))
return true;

while (!RespWriteUtils.WriteArrayLength(parseState.count, ref dcurr, dend))
SendAndReset();

Expand Down Expand Up @@ -66,11 +63,6 @@ private bool NetworkMGET_SG<TGarnetApi>(ref TGarnetApi storageApi)
SpanByte input = default;
long ctx = default;

if (NetworkMultiKeySlotVerify(readOnly: true))
{
return true;
}

int firstPending = -1;
(GarnetStatus, SpanByteAndMemory)[] outputArr = null;

Expand Down Expand Up @@ -170,11 +162,6 @@ private bool NetworkMSET<TGarnetApi>(ref TGarnetApi storageApi)
{
Debug.Assert(parseState.count % 2 == 0);

if (NetworkMultiKeySlotVerify(readOnly: false, step: 2))
{
return true;
}

for (int c = 0; c < parseState.count; c += 2)
{
var key = parseState.GetArgSliceByRef(c).SpanByte;
Expand All @@ -192,11 +179,6 @@ private bool NetworkMSET<TGarnetApi>(ref TGarnetApi storageApi)
private bool NetworkMSETNX<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (NetworkMultiKeySlotVerify(readOnly: false))
{
return true;
}

byte* hPtr = stackalloc byte[RespInputHeader.Size];

bool anyValuesSet = false;
Expand Down Expand Up @@ -245,11 +227,6 @@ private bool NetworkMSETNX<TGarnetApi>(ref TGarnetApi storageApi)
private bool NetworkDEL<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
if (NetworkMultiKeySlotVerify(readOnly: false))
{
return true;
}

int keysDeleted = 0;
for (int c = 0; c < parseState.count; c++)
{
Expand Down
40 changes: 2 additions & 38 deletions libs/server/Resp/BasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ bool NetworkGET<TGarnetApi>(ref TGarnetApi storageApi)
return NetworkGETAsync(ref storageApi);

var key = parseState.GetArgSliceByRef(0).SpanByte;
if (NetworkMultiKeySlotVerify(readOnly: true))
return true;
var o = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
SpanByte input = default;
var status = storageApi.GET(ref key, ref input, ref o);
Expand Down Expand Up @@ -61,9 +59,6 @@ bool NetworkGETAsync<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
var key = parseState.GetArgSliceByRef(0).SpanByte;
if (NetworkMultiKeySlotVerify(readOnly: true))
return true;

// Optimistically ask storage to write output to network buffer
var o = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));

Expand Down Expand Up @@ -116,10 +111,6 @@ bool NetworkGET_SG<TGarnetApi>(ref TGarnetApi storageApi)
if (c > 0 && !ParseGETAndKey(ref key))
break;

// Cluster verification
if (NetworkMultiKeySlotVerify(readOnly: true))
continue;

// Store index in context, since completions are not in order
ctx = firstPending == -1 ? 0 : c - firstPending;

Expand Down Expand Up @@ -268,9 +259,6 @@ private bool NetworkSET<TGarnetApi>(ref TGarnetApi storageApi)
var key = parseState.GetArgSliceByRef(0).SpanByte;
var value = parseState.GetArgSliceByRef(1).SpanByte;

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 0, lastKey: -2))
return true;

var status = storageApi.SET(ref key, ref value);

while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
Expand All @@ -287,9 +275,6 @@ private bool NetworkSetRange<TGarnetApi>(ref TGarnetApi storageApi)
{
var key = parseState.GetArgSliceByRef(0);

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 0, lastKey: 0))
return true;

if (!parseState.TryGetInt(1, out var offset))
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref dcurr, dend))
Expand Down Expand Up @@ -323,8 +308,6 @@ private bool NetworkGetRange<TGarnetApi>(ref TGarnetApi storageApi)
var key = parseState.GetArgSliceByRef(0);
var sbKey = key.SpanByte;

if (NetworkMultiKeySlotVerify(readOnly: true, firstKey: 0, lastKey: 0))
return true;

if (!parseState.TryGetInt(1, out var sliceStart) || !parseState.TryGetInt(2, out var sliceLength))
{
Expand Down Expand Up @@ -367,9 +350,6 @@ private bool NetworkSETEX<TGarnetApi>(bool highPrecision, ref TGarnetApi storage
{
var key = parseState.GetArgSliceByRef(0).SpanByte;

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 0, lastKey: 1))
return true;

if (!parseState.TryGetInt(1, out var expiry))
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref dcurr, dend))
Expand All @@ -385,7 +365,6 @@ private bool NetworkSETEX<TGarnetApi>(bool highPrecision, ref TGarnetApi storage
}

var val = parseState.GetArgSliceByRef(2).SpanByte;

var valPtr = val.ToPointer() - (sizeof(int) + sizeof(long));
var vSize = val.Length;

Expand Down Expand Up @@ -431,7 +410,7 @@ enum ExistOptions : byte
/// <summary>
/// SET EX NX
/// </summary>
private bool NetworkSETEXNX<TGarnetApi>(int count, ref TGarnetApi storageApi)
private bool NetworkSETEXNX<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
var key = parseState.GetArgSliceByRef(0);
Expand All @@ -449,7 +428,7 @@ private bool NetworkSETEXNX<TGarnetApi>(int count, ref TGarnetApi storageApi)
var tokenIdx = 2;
Span<byte> nextOpt = default;
var optUpperCased = false;
while (tokenIdx < count || optUpperCased)
while (tokenIdx < parseState.count || optUpperCased)
{
if (!optUpperCased)
{
Expand Down Expand Up @@ -556,11 +535,6 @@ private bool NetworkSETEXNX<TGarnetApi>(int count, ref TGarnetApi storageApi)
return true;
}

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 0, lastKey: 0))
{
return true;
}

// Make space for key header
var keyPtr = sbKey.ToPointer() - sizeof(int);

Expand Down Expand Up @@ -785,8 +759,6 @@ private bool NetworkIncrement<TGarnetApi>(RespCommand cmd, ref TGarnetApi storag
input = new ArgSlice(valPtr, vSize);
}

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 0, lastKey: 0))
return true;
Span<byte> outputBuffer = stackalloc byte[NumUtils.MaximumFormatInt64Length + 1];
var output = ArgSlice.FromPinnedSpan(outputBuffer);

Expand Down Expand Up @@ -820,10 +792,6 @@ private bool NetworkAppend<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
var sbKey = parseState.GetArgSliceByRef(0).SpanByte;

if (NetworkMultiKeySlotVerify(readOnly: false, firstKey: 0, lastKey: 0))
return true;

var sbVal = parseState.GetArgSliceByRef(1).SpanByte;

var keyPtr = sbKey.ToPointer() - sizeof(int);
Expand Down Expand Up @@ -959,10 +927,6 @@ private bool NetworkSTRLEN<TGarnetApi>(ref TGarnetApi storageApi)

//STRLEN key
var key = parseState.GetArgSliceByRef(0);

if (NetworkMultiKeySlotVerify(readOnly: true, firstKey: 0, lastKey: 0))
return true;

var status = storageApi.GET(key, out var value);

switch (status)
Expand Down
Loading

0 comments on commit 0528f10

Please sign in to comment.