Skip to content

Commit

Permalink
Improvements in command handling #2 (microsoft#486)
Browse files Browse the repository at this point in the history
* partial checkin

* nit

* nit

* wip

* wip

* Fixing non-determinism + refactoring HashGet

* dotnet format

* Fixed some API calls

* dotnet format

* wip

* small fix

* Removing unused method

* wip - refactoring ProcessAdminCommands

* Undoing changes to RandomUtils

* Continued refactoring of AdminCommands

* Added "TryGetInt" and "TryGetLong" to parse state api

* dotnet format

* wip

* format

* wip

* wip

* wip

* cleanup

* wip

* wip

* format

* wip

* cont

* bugfix

* bugfix

* Few small fixes

* format

* bugfix

* Added range validation to SessionParseState read methods

---------

Co-authored-by: Badrish Chandramouli <badrishc@microsoft.com>
  • Loading branch information
TalZaccai and badrishc authored Jul 19, 2024
1 parent 7ce336a commit 35d99dd
Show file tree
Hide file tree
Showing 32 changed files with 3,166 additions and 3,269 deletions.
54 changes: 54 additions & 0 deletions benchmark/BDN.benchmark/Resp/RespParseStress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public unsafe class RespParseStress
byte[] zAddRemRequestBuffer;
byte* zAddRemRequestBufferPointer;

static ReadOnlySpan<byte> LPUSHPOP => "*3\r\n$5\r\nLPUSH\r\n$1\r\nd\r\n$1\r\ne\r\n*2\r\n$4\r\nLPOP\r\n$1\r\nd\r\n"u8;
byte[] lPushPopRequestBuffer;
byte* lPushPopRequestBufferPointer;

static ReadOnlySpan<byte> SADDREM => "*3\r\n$4\r\nSADD\r\n$1\r\ne\r\n$1\r\na\r\n*3\r\n$4\r\nSREM\r\n$1\r\ne\r\n$1\r\na\r\n"u8;
byte[] sAddRemRequestBuffer;
byte* sAddRemRequestBufferPointer;

static ReadOnlySpan<byte> HSETDEL => "*4\r\n$4\r\nHSET\r\n$1\r\nf\r\n$1\r\na\r\n$1\r\na\r\n*3\r\n$4\r\nHDEL\r\n$1\r\nf\r\n$1\r\na\r\n"u8;
byte[] hSetDelRequestBuffer;
byte* hSetDelRequestBufferPointer;

[GlobalSetup]
public void GlobalSetup()
{
Expand Down Expand Up @@ -74,8 +86,32 @@ public void GlobalSetup()
for (int i = 0; i < batchSize; i++)
ZADDREM.CopyTo(new Span<byte>(zAddRemRequestBuffer).Slice(i * ZADDREM.Length));

lPushPopRequestBuffer = GC.AllocateArray<byte>(LPUSHPOP.Length * batchSize, pinned: true);
lPushPopRequestBufferPointer = (byte*)Unsafe.AsPointer(ref lPushPopRequestBuffer[0]);
for (int i = 0; i < batchSize; i++)
LPUSHPOP.CopyTo(new Span<byte>(lPushPopRequestBuffer).Slice(i * LPUSHPOP.Length));

sAddRemRequestBuffer = GC.AllocateArray<byte>(SADDREM.Length * batchSize, pinned: true);
sAddRemRequestBufferPointer = (byte*)Unsafe.AsPointer(ref sAddRemRequestBuffer[0]);
for (int i = 0; i < batchSize; i++)
SADDREM.CopyTo(new Span<byte>(sAddRemRequestBuffer).Slice(i * SADDREM.Length));

hSetDelRequestBuffer = GC.AllocateArray<byte>(HSETDEL.Length * batchSize, pinned: true);
hSetDelRequestBufferPointer = (byte*)Unsafe.AsPointer(ref hSetDelRequestBuffer[0]);
for (int i = 0; i < batchSize; i++)
HSETDEL.CopyTo(new Span<byte>(hSetDelRequestBuffer).Slice(i * HSETDEL.Length));

// Pre-populate sorted set with a single element to avoid repeatedly emptying it during the benchmark
SlowConsumeMessage("*4\r\n$4\r\nZADD\r\n$1\r\nc\r\n$1\r\n1\r\n$1\r\nd\r\n"u8);

// Pre-populate list with a single element to avoid repeatedly emptying it during the benchmark
SlowConsumeMessage("*3\r\n$4\r\nLPUSH\r\n$1\r\nd\r\n$1\r\nf\r\n"u8);

// Pre-populate set with a single element to avoid repeatedly emptying it during the benchmark
SlowConsumeMessage("*3\r\n$4\r\nSADD\r\n$1\r\ne\r\n$1\r\nb\r\n"u8);

// Pre-populate hash with a single element to avoid repeatedly emptying it during the benchmark
SlowConsumeMessage("*3\r\n$4\r\nHSET\r\n$1\r\nf\r\n$1\r\nb\r\n$1\r\nb\r\n"u8);
}

[GlobalCleanup]
Expand Down Expand Up @@ -115,6 +151,24 @@ public void ZAddRem()
_ = session.TryConsumeMessages(zAddRemRequestBufferPointer, zAddRemRequestBuffer.Length);
}

[Benchmark]
public void LPushPop()
{
_ = session.TryConsumeMessages(lPushPopRequestBufferPointer, lPushPopRequestBuffer.Length);
}

[Benchmark]
public void SAddRem()
{
_ = session.TryConsumeMessages(sAddRemRequestBufferPointer, sAddRemRequestBuffer.Length);
}

[Benchmark]
public void HSetDel()
{
_ = session.TryConsumeMessages(hSetDelRequestBufferPointer, hSetDelRequestBuffer.Length);
}

private void SlowConsumeMessage(ReadOnlySpan<byte> message)
{
var buffer = GC.AllocateArray<byte>(message.Length, pinned: true);
Expand Down
2 changes: 1 addition & 1 deletion libs/common/AsciiUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void ToUpperInPlace(Span<byte> command)

/// <inheritdoc cref="EqualsUpperCaseSpanIgnoringCase(ReadOnlySpan{byte}, ReadOnlySpan{byte})"/>
public static bool EqualsUpperCaseSpanIgnoringCase(this Span<byte> left, ReadOnlySpan<byte> right)
=> EqualsUpperCaseSpanIgnoringCase(left, right);
=> EqualsUpperCaseSpanIgnoringCase((ReadOnlySpan<byte>)left, right);

/// <summary>
/// Check if two byte spans are equal, where right is an all-upper-case span, ignoring case if there are ASCII bytes.
Expand Down
2 changes: 1 addition & 1 deletion libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private static bool TryReadUlong(ref byte* ptr, byte* end, out ulong value, out
/// a valid integer or the end of the string was reached before finishing parsing.
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulong bytesRead)
public static bool TryReadLong(ref byte* ptr, byte* end, out long value, out ulong bytesRead)
{
bytesRead = 0;
value = 0;
Expand Down
41 changes: 19 additions & 22 deletions libs/server/Custom/CustomRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,33 @@ public bool RunTransactionProc(byte id, ArgSlice input, ref MemoryResult<byte> o
private bool TryCustomCommand<TGarnetApi>(byte* ptr, byte* end, RespCommand cmd, long expirationTicks, CommandType type, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetAdvancedApi
{
byte* keyPtr = null, inputPtr = null;
int ksize = 0, isize = 0;
var sbKey = parseState.GetArgSliceByRef(0).SpanByte;
var keyPtr = sbKey.ToPointer();
var kSize = sbKey.Length;

if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, recvBufferPtr + bytesRead))
return false;
ptr = keyPtr + kSize + 2;

int metadataSize = 8;
var metadataSize = 8;
if (expirationTicks == 0) metadataSize = 0;

// Move key back if needed
if (metadataSize > 0)
{
Buffer.MemoryCopy(keyPtr, keyPtr - metadataSize, ksize, ksize);
Buffer.MemoryCopy(keyPtr, keyPtr - metadataSize, kSize, kSize);
keyPtr -= metadataSize;
}

// write key header size
keyPtr -= sizeof(int);
*(int*)keyPtr = ksize;
*(int*)keyPtr = kSize;

inputPtr = ptr;
isize = (int)(end - ptr);
var inputPtr = ptr;
var iSize = (int)(end - ptr);

inputPtr -= RespInputHeader.Size; // input header
inputPtr -= metadataSize; // metadata header

var input = new SpanByte(metadataSize + RespInputHeader.Size + isize, (nint)inputPtr);
var input = new SpanByte(metadataSize + RespInputHeader.Size + iSize, (nint)inputPtr);

((RespInputHeader*)(inputPtr + metadataSize))->cmd = cmd;
((RespInputHeader*)(inputPtr + metadataSize))->flags = 0;
Expand All @@ -99,7 +99,7 @@ private bool TryCustomCommand<TGarnetApi>(byte* ptr, byte* end, RespCommand cmd,
else if (expirationTicks > 0)
input.ExtraMetadata = DateTimeOffset.UtcNow.Ticks + expirationTicks;

SpanByteAndMemory output = new SpanByteAndMemory(null);
var output = new SpanByteAndMemory(null);
GarnetStatus status;
if (type == CommandType.ReadModifyWrite)
{
Expand Down Expand Up @@ -142,27 +142,24 @@ private bool TryCustomCommand<TGarnetApi>(byte* ptr, byte* end, RespCommand cmd,
private bool TryCustomObjectCommand<TGarnetApi>(byte* ptr, byte* end, RespCommand cmd, byte subid, CommandType type, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetAdvancedApi
{
byte* keyPtr = null, inputPtr = null;
int ksize = 0, isize = 0;
var sbKey = parseState.GetArgSliceByRef(0).SpanByte;
var keyBytes = sbKey.ToByteArray();

if (!RespReadUtils.ReadPtrWithLengthHeader(ref keyPtr, ref ksize, ref ptr, recvBufferPtr + bytesRead))
return false;
ptr = sbKey.ToPointer() + sbKey.Length + 2;

byte[] key = new Span<byte>(keyPtr, ksize).ToArray();

inputPtr = ptr;
isize = (int)(end - ptr);
var inputPtr = ptr;
var iSize = (int)(end - ptr);
inputPtr -= sizeof(int);
inputPtr -= RespInputHeader.Size;
*(int*)inputPtr = RespInputHeader.Size + isize;
*(int*)inputPtr = RespInputHeader.Size + iSize;
((RespInputHeader*)(inputPtr + sizeof(int)))->cmd = cmd;
((RespInputHeader*)(inputPtr + sizeof(int)))->SubId = subid;

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
GarnetStatus status;
if (type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_ObjectStore(ref key, ref Unsafe.AsRef<SpanByte>(inputPtr), ref output);
status = storageApi.RMW_ObjectStore(ref keyBytes, ref Unsafe.AsRef<SpanByte>(inputPtr), ref output);
Debug.Assert(!output.spanByteAndMemory.IsSpanByte);

switch (status)
Expand All @@ -182,7 +179,7 @@ private bool TryCustomObjectCommand<TGarnetApi>(byte* ptr, byte* end, RespComman
}
else
{
status = storageApi.Read_ObjectStore(ref key, ref Unsafe.AsRef<SpanByte>(inputPtr), ref output);
status = storageApi.Read_ObjectStore(ref keyBytes, ref Unsafe.AsRef<SpanByte>(inputPtr), ref output);
Debug.Assert(!output.spanByteAndMemory.IsSpanByte);

switch (status)
Expand Down
8 changes: 2 additions & 6 deletions libs/server/Metrics/Info/InfoCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Garnet.server
{
internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
private bool ProcessInfoCommand(int count)
private bool NetworkINFO(int count)
{
HashSet<InfoMetricsType> sections = null;
bool invalid = false;
Expand All @@ -19,14 +19,11 @@ private bool ProcessInfoCommand(int count)
string invalidSection = null;
if (count > 0)
{
var ptr = recvBufferPtr + readHead;
sections = new HashSet<InfoMetricsType>();
for (int i = 0; i < count; i++)
{
if (!RespReadUtils.ReadStringWithLengthHeader(out var section, ref ptr, recvBufferPtr + bytesRead))
return false;
var section = parseState.GetString(i).ToUpper();

section = section.ToUpper();
switch (section)
{
case InfoHelp.RESET:
Expand All @@ -50,7 +47,6 @@ private bool ProcessInfoCommand(int count)
break;
}
}
readHead = (int)(ptr - recvBufferPtr);
}

if (invalid)
Expand Down
14 changes: 2 additions & 12 deletions libs/server/Metrics/Latency/RespLatencyCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ private bool NetworkLatencyHelp(int count)
SendAndReset();
}

var ptr = recvBufferPtr + readHead;
readHead = (int)(ptr - recvBufferPtr);
List<string> latencyCommands = RespLatencyHelp.GetLatencyCommands();
while (!RespWriteUtils.WriteArrayLength(latencyCommands.Count, ref dcurr, dend))
SendAndReset();
Expand All @@ -46,7 +44,6 @@ private bool NetworkLatencyHelp(int count)
/// <returns>true if parsing succeeded correctly, false if not all tokens could be consumed and further processing is necessary.</returns>
private bool NetworkLatencyHistogram(int count)
{
var ptr = recvBufferPtr + readHead;
HashSet<LatencyMetricsType> events = null;
bool invalid = false;
string invalidEvent = null;
Expand All @@ -55,8 +52,7 @@ private bool NetworkLatencyHistogram(int count)
events = new();
for (int i = 0; i < count; i++)
{
if (!RespReadUtils.ReadStringWithLengthHeader(out var eventStr, ref ptr, recvBufferPtr + bytesRead))
return false;
var eventStr = parseState.GetString(i);

if (Enum.TryParse(eventStr, ignoreCase: true, out LatencyMetricsType eventType))
{
Expand Down Expand Up @@ -87,8 +83,6 @@ private bool NetworkLatencyHistogram(int count)
SendAndReset();
}

readHead = (int)(ptr - recvBufferPtr);

return true;
}

Expand All @@ -100,16 +94,14 @@ private bool NetworkLatencyHistogram(int count)
private bool NetworkLatencyReset(int count)
{
HashSet<LatencyMetricsType> events = null;
var ptr = recvBufferPtr + readHead;
bool invalid = false;
string invalidEvent = null;
if (count > 0)
{
events = new();
for (int i = 0; i < count; i++)
{
if (!RespReadUtils.ReadStringWithLengthHeader(out var eventStr, ref ptr, recvBufferPtr + bytesRead))
return false;
var eventStr = parseState.GetString(i);

if (Enum.TryParse(eventStr, ignoreCase: true, out LatencyMetricsType eventType))
{
Expand Down Expand Up @@ -144,8 +136,6 @@ private bool NetworkLatencyReset(int count)
SendAndReset();
}

readHead = (int)(ptr - recvBufferPtr);

return true;
}
}
Expand Down
70 changes: 27 additions & 43 deletions libs/server/Objects/SortedSet/SortedSetObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -789,67 +789,51 @@ private void GetRank(byte* input, int length, ref SpanByteAndMemory output, bool
byte* ptr = output.SpanByte.ToPointer();
var curr = ptr;
var end = curr + output.Length;
var error = false;

ObjectOutputHeader _output = default;
try
{
if (!RespReadUtils.ReadByteArrayWithLengthHeader(out var member, ref input_currptr, input + length))
return;

if (_input->arg1 == 3) // ZRANK key member WITHSCORE
if (_input->arg2 == 1) // ZRANK key member WITHSCORE
{
if (!RespReadUtils.TrySliceWithLengthHeader(out var token, ref input_currptr, input + length))
return;
withScore = true;
}

if (token.EqualsUpperCaseSpanIgnoringCase("WITHSCORE"u8))
if (!sortedSetDict.TryGetValue(member, out var score))
{
while (!RespWriteUtils.WriteNull(ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
else
{
var rank = 0;
foreach (var item in sortedSet)
{
withScore = true;
if (item.Item2.SequenceEqual(member))
break;
rank++;
}
else

if (!ascending)
rank = sortedSet.Count - rank - 1;

if (withScore)
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR, ref curr, end))
while (!RespWriteUtils.WriteArrayLength(2, ref curr, end)) // rank and score
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
error = true;
}
}

if (!error)
{
if (!sortedSetDict.TryGetValue(member, out var score))
{
while (!RespWriteUtils.WriteNull(ref curr, end))
while (!RespWriteUtils.WriteInteger(rank, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

while (!RespWriteUtils.TryWriteDoubleBulkString(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
else
{
var rank = 0;
foreach (var item in sortedSet)
{
if (item.Item2.SequenceEqual(member))
break;
rank++;
}

if (!ascending)
rank = sortedSet.Count - rank - 1;

if (withScore)
{
while (!RespWriteUtils.WriteArrayLength(2, ref curr, end)) // rank and score
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

while (!RespWriteUtils.WriteInteger(rank, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);

while (!RespWriteUtils.TryWriteDoubleBulkString(score, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
else
{
while (!RespWriteUtils.WriteInteger(rank, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
while (!RespWriteUtils.WriteInteger(rank, ref curr, end))
ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
}
}

Expand Down
Loading

0 comments on commit 35d99dd

Please sign in to comment.