Skip to content

Commit

Permalink
Support async RESP responses to pending GET calls (due to disk read) (m…
Browse files Browse the repository at this point in the history
…icrosoft#387)

* Initial rough prototype checkin
* updates to logic
* update comment for spinLock
* fix formatting
* add comments
* Basic implementation of HELLO command
* Added command info for HELLO + added website docs
* update SE.Redis to latest, add HELLO unit test
* Add and use EqualsIgnoreCase
* use nameof(RespCommand.HELLO)
* support special pong in RESP2 subscribe
* only support RESP2 in this PR
* remove resp3 testcase
* make EqualsIgnoreCase extension method in utils
* improve the case-insensitive match test
* support push subscribe
* update version
* fix lowest flag
* fix info commands
* ensure full input received before processing array commands, if useAsync is true.
* async only works on RESP3 or later
* change respProtocolVersion to byte
* add comments
* protocol change is not allowed with pending async operations
* optimize sync case
* fix format
* fix push response header for async
* fix asyncStarted increment
* Added unit test, minor fixes
* Cancel and signal to ensure the async processor ends
  • Loading branch information
badrishc authored May 29, 2024
1 parent 72e7504 commit 53cff60
Show file tree
Hide file tree
Showing 38 changed files with 852 additions and 175 deletions.
60 changes: 39 additions & 21 deletions libs/common/LightClientRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,30 @@

namespace Garnet.common
{
public enum CountResponseType
{
Bytes,
Tokens,
Newlines
}

public unsafe class LightClientRequest : IDisposable
{
readonly LightClient client;
byte[] responseBuffer;
int bytesReceived = 0;
readonly bool countResponseLength;

/// <summary>
/// How to count the response length
/// </summary>
public CountResponseType countResponseType;

public string Address { get; set; }
public int Port { get; set; }

public LightClientRequest(string address, int port, int optType, LightClient.OnResponseDelegateUnsafe onReceive = null, SslClientAuthenticationOptions sslOptions = null, bool countResponseLength = false)
public LightClientRequest(string address, int port, int optType, LightClient.OnResponseDelegateUnsafe onReceive = null, SslClientAuthenticationOptions sslOptions = null, CountResponseType countResponseType = CountResponseType.Tokens)
{
this.countResponseLength = countResponseLength;
this.countResponseType = countResponseType;
client = new LightClient(address, port, optType, onReceive == null ? LightReceive : onReceive, sslOptions: sslOptions);
client.Connect();
Address = address;
Expand Down Expand Up @@ -103,26 +114,33 @@ public byte[] SendCommands(string cmd1, string cmd2, int numTokensCmd1 = 1, int
int count = 0;
for (int i = 0; i < bytesRead; i++)
{
if (countResponseLength)
{
count++;
}
else
switch (countResponseType)
{
// check for null value '$-1'
if (buf[i] == '$' && buf[i + 1] == '-' && buf[i + 2] == '1')
{
count++;
}
// check for error
else if (buf[i] == '-' && i == 0)
{
case CountResponseType.Bytes:
count++;
}
else if (buf[i] == '$' || buf[i] == '+' || buf[i] == ':' || buf[i] == '*')
{
count++;
}
break;
case CountResponseType.Newlines:
if (buf[i] == '\n')
count++;
break;
case CountResponseType.Tokens:
// check for null value '$-1'
if (buf[i] == '$' && buf[i + 1] == '-' && buf[i + 2] == '1')
{
count++;
}
// check for error
else if (buf[i] == '-' && i == 0)
{
count++;
}
else if (buf[i] == '$' || buf[i] == '+' || buf[i] == ':' || buf[i] == '*')
{
count++;
}
break;
default:
break;
}

// Appending value to accumulated buffer
Expand Down
49 changes: 49 additions & 0 deletions libs/common/Networking/GarnetTcpNetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public class GarnetTcpNetworkSender : NetworkSenderBase

readonly LimitedFixedBufferPool networkPool;

/// <summary>
/// NOTE: This variable should not be marked as readonly as it is a mutable struct
/// </summary>
SpinLock spinLock;

/// <summary>
///
/// </summary>
Expand All @@ -67,6 +72,7 @@ public GarnetTcpNetworkSender(
this.saeaStack = new(2 * ThrottleMax);
this.responseObject = null;
this.ThrottleMax = throttleMax;
this.spinLock = new();

var endpoint = socket.RemoteEndPoint as IPEndPoint;
if (endpoint != null)
Expand All @@ -79,6 +85,49 @@ public GarnetTcpNetworkSender(
/// <inheritdoc />
public override string RemoteEndpointName => remoteEndpoint;

/// <inheritdoc />
public override void Enter()
{
var lockTaken = false;
spinLock.Enter(ref lockTaken);
Debug.Assert(lockTaken);
}

/// <inheritdoc />
public override unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail)
{
var lockTaken = false;
spinLock.Enter(ref lockTaken);
Debug.Assert(lockTaken);
Debug.Assert(responseObject == null);
if (!saeaStack.TryPop(out responseObject, out bool disposed))
{
if (disposed)
ThrowDisposed();
responseObject = new GarnetSaeaBuffer(SeaaBuffer_Completed, networkPool);
}
head = responseObject.buffer.entryPtr;
tail = responseObject.buffer.entryPtr + responseObject.buffer.entry.Length;
}

/// <inheritdoc />
public override void Exit()
{
spinLock.Exit();
}

/// <inheritdoc />
public override unsafe void ExitAndReturnResponseObject()
{
if (responseObject != null)
{
ReturnBuffer(responseObject);
responseObject = null;
}
spinLock.Exit();
}


/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override void GetResponseObject()
Expand Down
22 changes: 22 additions & 0 deletions libs/common/Networking/INetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ public interface INetworkSender : IDisposable
/// </summary>
string RemoteEndpointName { get; }

/// <summary>
/// Enter exclusive use of network sender.
/// </summary>
void Enter();

/// <summary>
/// Enter exclusive use of network sender. Allocate and get response object pointers.
/// </summary>
/// <param name="head"></param>
/// <param name="tail"></param>
unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail);

/// <summary>
/// Exit exclusive use of network sender.
/// </summary>
void Exit();

/// <summary>
/// Exit exclusive use of network sender. Free response object.
/// </summary>
void ExitAndReturnResponseObject();

/// <summary>
/// Allocate a new response object
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions libs/common/Networking/NetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,28 @@ unsafe void ShiftTransportReceiveBuffer()
}
}

/// <inheritdoc />
public override void Enter()
=> networkSender.Enter();

/// <inheritdoc />
public override unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail)
{
networkSender.Enter();
head = transportSendBufferPtr;
tail = transportSendBufferPtr + transportSendBuffer.Length;
}

/// <inheritdoc />
public override void Exit()
=> networkSender.Exit();

/// <inheritdoc />
public override void ExitAndReturnResponseObject()
{
networkSender.Exit();
}

/// <inheritdoc />
public override void GetResponseObject() { }

Expand Down
12 changes: 12 additions & 0 deletions libs/common/Networking/NetworkSenderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ public NetworkSenderBase(int serverBufferSize)
/// <inheritdoc />
public abstract string RemoteEndpointName { get; }

/// <inheritdoc />
public abstract void Enter();

/// <inheritdoc />
public abstract unsafe void EnterAndGetResponseObject(out byte* head, out byte* tail);

/// <inheritdoc />
public abstract void Exit();

/// <inheritdoc />
public abstract void ExitAndReturnResponseObject();

/// <inheritdoc />
public abstract void GetResponseObject();

Expand Down
25 changes: 25 additions & 0 deletions libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,31 @@ public static bool ReadULongWithLengthHeader(out ulong number, ref byte* ptr, by
return true;
}

/// <summary>
/// Skip byte array with length header
/// </summary>
public static bool SkipByteArrayWithLengthHeader(ref byte* ptr, byte* end)
{
// Parse RESP string header
if (!ReadLengthHeader(out var length, ref ptr, end))
return false;

// Advance read pointer to the end of the array (including terminator)
var keyPtr = ptr;

ptr += length + 2;

if (ptr > end)
return false;

// Ensure terminator has been received
if (*(ushort*)(ptr - 2) != MemoryMarshal.Read<ushort>("\r\n"u8))
{
RespParsingException.ThrowUnexpectedToken(*(ptr - 2));
}
return true;
}

/// <summary>
/// Read byte array with length header
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ public static bool WriteMapLength(int len, ref byte* curr, byte* end)
return true;
}

/// <summary>
/// Write push type length
/// </summary>
public static bool WritePushLength(int len, ref byte* curr, byte* end)
{
int numDigits = NumUtils.NumDigits(len);
int totalLen = 1 + numDigits + 2;
if (totalLen > (int)(end - curr))
return false;
*curr++ = (byte)'>';
NumUtils.IntToBytes(len, numDigits, ref curr);
WriteNewline(ref curr);
return true;
}

/// <summary>
/// Write array length
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class GarnetServer : IDisposable
/// <summary>
/// Resp protocol version
/// </summary>
readonly string redisProtocolVersion = "6.2.11";
readonly string redisProtocolVersion = "7.2.5";

/// <summary>
/// Metrics API
Expand Down
3 changes: 3 additions & 0 deletions libs/server/API/GarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public GarnetStatus GET_WithPending(ref SpanByte key, ref SpanByte input, ref Sp
public bool GET_CompletePending((GarnetStatus, SpanByteAndMemory)[] outputArr, bool wait = false)
=> storageSession.GET_CompletePending(outputArr, wait, ref context);

public bool GET_CompletePending(out CompletedOutputIterator<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long> completedOutputs, bool wait)
=> storageSession.GET_CompletePending(out completedOutputs, wait, ref context);

/// <inheritdoc />
public unsafe GarnetStatus GETForMemoryResult(ArgSlice key, out MemoryResult<byte> value)
=> storageSession.GET(key, out value, ref context);
Expand Down
8 changes: 8 additions & 0 deletions libs/server/API/IGarnetAdvancedApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ public interface IGarnetAdvancedApi
/// <param name="wait"></param>
bool GET_CompletePending((GarnetStatus, SpanByteAndMemory)[] outputArr, bool wait = false);

/// <summary>
/// Complete pending read operations on main store
/// </summary>
/// <param name="completedOutputs"></param>
/// <param name="wait"></param>
/// <returns></returns>
bool GET_CompletePending(out CompletedOutputIterator<SpanByte, SpanByte, SpanByte, SpanByteAndMemory, long> completedOutputs, bool wait = false);

/// <summary>
/// RMW operation on main store
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/InputHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public struct RespInputHeader
/// Size of header
/// </summary>
public const int Size = 2;
internal const byte FlagMask = (byte)RespInputFlags.Deterministic - 1;
internal const byte FlagMask = (byte)RespInputFlags.SetGet - 1;

[FieldOffset(0)]
internal RespCommand cmd;
Expand Down
Loading

0 comments on commit 53cff60

Please sign in to comment.