Skip to content

Commit

Permalink
Custom stored procedures with variable number of parameters (#369)
Browse files Browse the repository at this point in the history
* Support custom commands (stored procedures) with a variable number of parameters. This can be used to implement commands such as a custom MSET with expiration. Example of MSETPX is included in the PR.

* Add unit test

* Add sample for multi-get where values match a specified prefix.
  • Loading branch information
badrishc authored May 9, 2024
1 parent aef49e1 commit 8a1d5f8
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 7 deletions.
23 changes: 23 additions & 0 deletions libs/server/Custom/CustomFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Collections.Generic;
using Garnet.common;

namespace Garnet.server
Expand Down Expand Up @@ -76,6 +77,28 @@ protected static unsafe void WriteBulkStringArray(ref MemoryResult<byte> output,
}
}

/// <summary>
/// Create output as an array of bulk strings, from given array of ArgSlice values
/// </summary>
protected static unsafe void WriteBulkStringArray(ref MemoryResult<byte> output, List<ArgSlice> values)
{
int totalLen = 1 + NumUtils.NumDigits(values.Count) + 2;
for (int i = 0; i < values.Count; i++)
totalLen += RespWriteUtils.GetBulkStringLength(values[i].Length);

output.MemoryOwner?.Dispose();
output.MemoryOwner = MemoryPool.Rent(totalLen);
output.Length = totalLen;

fixed (byte* ptr = output.MemoryOwner.Memory.Span)
{
var curr = ptr;
RespWriteUtils.WriteArrayLength(values.Count, ref curr, ptr + totalLen);
for (int i = 0; i < values.Count; i++)
RespWriteUtils.WriteBulkString(values[i].Span, ref curr, ptr + totalLen);
}
}

/// <summary>
/// Create output as bulk string, from given Span
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, byte subcmd,
if (!success1) return false;
}

if (count != currentCustomTransaction.NumParams)
if (currentCustomTransaction.NumParams < int.MaxValue && count != currentCustomTransaction.NumParams)
{
while (!RespWriteUtils.WriteError($"ERR Invalid number of parameters to stored proc {currentCustomTransaction.nameStr}, expected {currentCustomTransaction.NumParams}, actual {count}", ref dcurr, dend))
SendAndReset();
Expand Down
21 changes: 15 additions & 6 deletions libs/server/Servers/RegisterApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public RegisterApi(GarnetProvider provider)
/// Register custom command with Garnet
/// </summary>
/// <param name="name">Name of command</param>
/// <param name="numParams">Numer of parameters (excluding the key, which is always the first parameter)</param>
/// <param name="numParams">Number of parameters (excluding the key, which is always the first parameter)</param>
/// <param name="type">Type of command (e.g., read)</param>
/// <param name="customFunctions">Custom functions for command logic</param>
/// <param name="commandInfo">RESP command info</param>
Expand All @@ -39,15 +39,24 @@ public int NewCommand(string name, int numParams, CommandType type, CustomRawStr
=> provider.StoreWrapper.customCommandManager.Register(name, numParams, type, customFunctions, commandInfo, expirationTicks);

/// <summary>
/// Register transaction procedure with Garnet
/// Register transaction procedure with Garnet, with a fixed number of parameters
/// </summary>
/// <param name="name"></param>
/// <param name="numParams"></param>
/// <param name="proc"></param>
/// <returns></returns>
/// <param name="name">Name of command</param>
/// <param name="numParams">Number of parameters</param>
/// <param name="proc">Custom stored procedure</param>
/// <returns>ID of the registered command</returns>
public int NewTransactionProc(string name, int numParams, Func<CustomTransactionProcedure> proc)
=> provider.StoreWrapper.customCommandManager.Register(name, numParams, proc);

/// <summary>
/// Register transaction procedure with Garnet, with a variable number of parameters
/// </summary>
/// <param name="name">Name of command</param>
/// <param name="proc">Custom stored procedure</param>
/// <returns>ID of the registered command</returns>
public int NewTransactionProc(string name, Func<CustomTransactionProcedure> proc)
=> provider.StoreWrapper.customCommandManager.Register(name, int.MaxValue, proc);

/// <summary>
/// Register object type with server
/// </summary>
Expand Down
62 changes: 62 additions & 0 deletions main/GarnetServer/Extensions/MGetIfPM.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using Garnet.common;
using Garnet.server;

namespace Garnet
{
/// <summary>
/// Functions to implement custom transaction MGETIFPM - get multiple keys whose values match with the given prefix
///
/// Format: MGETIFPM prefix key1 key2 ...
/// Output: array of matching key-value pairs
///
/// Description: Perform a non-transactional multi-get with value condition (prefix match) for the given set of keys
/// </summary>
sealed class MGetIfPM : CustomTransactionProcedure
{
/// <summary>
/// No transactional phase, skip Prepare
/// </summary>
public override bool Prepare<TGarnetReadApi>(TGarnetReadApi api, ArgSlice input)
=> false;

/// <summary>
/// Main will not be called because Prepare returns false
/// </summary>
public override void Main<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
=> throw new InvalidOperationException();

/// <summary>
/// Perform the MGETIFPM operation
/// </summary>
public override void Finalize<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
{
int offset = 0;

// Read prefix
var prefix = GetNextArg(input, ref offset);

// Read key, check condition, add to output
ArgSlice key;
List<ArgSlice> values = [];
while ((key = GetNextArg(input, ref offset)).Length > 0)
{
if (api.GET(key, out var value) == GarnetStatus.OK)
{
if (value.ReadOnlySpan.StartsWith(prefix.ReadOnlySpan))
{
values.Add(key);
values.Add(value);
}
}
}

// Return the matching key-value pairs as an array of bulk strings
WriteBulkStringArray(ref output, values);
}
}
}
51 changes: 51 additions & 0 deletions main/GarnetServer/Extensions/MSetPx.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using Garnet.common;
using Garnet.server;

namespace Garnet
{
/// <summary>
/// Functions to implement custom transaction MSETPX - set multiple keys with given expiration in milliseconds
///
/// Format: MSETPX 60000 key1 value1 key2 value2 ...
///
/// Description: Perform a non-transactional multi-set with expiry for the given set of key-value pairs
/// </summary>
sealed class MSetPxTxn : CustomTransactionProcedure
{
/// <summary>
/// No transactional phase, skip Prepare
/// </summary>
public override bool Prepare<TGarnetReadApi>(TGarnetReadApi api, ArgSlice input)
=> false;

/// <summary>
/// Main will not be called because Prepare returns false
/// </summary>
public override void Main<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
=> throw new InvalidOperationException();

/// <summary>
/// Perform the MSETPX operation
/// </summary>
public override void Finalize<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
{
int offset = 0;

// Read expiry
var expiryMs = GetNextArg(input, ref offset);

// Read and set key-value pairs with expiry
ArgSlice key, value;
while ((key = GetNextArg(input, ref offset)).Length > 0)
{
value = GetNextArg(input, ref offset);
api.SETEX(key, value, expiryMs);
}
WriteSimpleString(ref output, "OK");
}
}
}
6 changes: 6 additions & 0 deletions main/GarnetServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ static void RegisterExtensions(GarnetServer server)
// Register stored procedure to run a transactional command
server.Register.NewTransactionProc("READWRITETX", 3, () => new ReadWriteTxn());

// Register stored procedure to run a transactional command
server.Register.NewTransactionProc("MSETPX", () => new MSetPxTxn());

// Register stored procedure to run a transactional command
server.Register.NewTransactionProc("MGETIFPM", () => new MGetIfPM());

// Register stored procedure to run a non-transactional command
server.Register.NewTransactionProc("GETTWOKEYSNOTXN", 2, () => new GetTwoKeysNoTxn());

Expand Down
2 changes: 2 additions & 0 deletions test/Garnet.test/Garnet.test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

<ItemGroup>
<Compile Include="..\..\main\GarnetServer\Extensions\DeleteIfMatch.cs" Link="DeleteIfMatch.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MGetIfPM.cs" Link="MGetIfPM.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MSetPx.cs" Link="MSetPx.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictObject.cs" Link="MyDictObject.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\GetTwoKeysNoTxn.cs" Link="GetTwoKeysNoTxn.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\ReadWriteTxn.cs" Link="ReadWriteTxn.cs" />
Expand Down
110 changes: 110 additions & 0 deletions test/Garnet.test/RespTransactionProcTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,5 +363,115 @@ public void TransactionProcFinalizeTest()
Assert.AreEqual(value1, ((string[])result)[0]);
Assert.AreEqual(value2, ((string[])result)[1]);
}

[Test]
public void TransactionProcMSetPxTest()
{
server.Register.NewTransactionProc("MSETPX", () => new MSetPxTxn());

using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
const int NumKeys = 5;

var args = new string[1 + 2 * NumKeys];

// Set expiry to 2 seconds
args[0] = "2000";

// Set key-value pairs
for (int i = 0; i < NumKeys; i++)
{
args[2 * i + 1] = $"key{i}";
args[2 * i + 2] = $"value{i}";
}

// Execute transaction
var result = db.Execute("MSETPX", args);

// Verify transaction succeeded
Assert.AreEqual("OK", (string)result);

// Read keys to verify transaction succeeded
for (int i = 0; i < NumKeys; i++)
{
string key = $"key{i}";
string value = $"value{i}";
string retValue = db.StringGet(key);
Assert.AreEqual(value, retValue);
}

// Wait for keys to expire
Thread.Sleep(2100);

// Verify that keys have expired
for (int i = 0; i < NumKeys; i++)
{
string key = $"key{i}";
string retValue = db.StringGet(key);
Assert.IsNull(retValue);
}
}

[Test]
public void TransactionProcMGetIfPMTest()
{
server.Register.NewTransactionProc("MSETPX", () => new MSetPxTxn());
server.Register.NewTransactionProc("MGETIFPM", () => new MGetIfPM());

using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
const int NumKeys = 15;
const string prefix = "value1";

var args1 = new string[1 + 2 * NumKeys];

// Set expiry to 600 seconds
args1[0] = "600000";

// Set key-value pairs
for (int i = 0; i < NumKeys; i++)
{
args1[2 * i + 1] = $"key{i}";
args1[2 * i + 2] = $"value{i}";
}

// Execute transaction
var result1 = (string)db.Execute("MSETPX", args1);

// Verify transaction succeeded
Assert.AreEqual("OK", result1);

// Read keys to verify transaction succeeded
for (int i = 0; i < NumKeys; i++)
{
string key = $"key{i}";
string value = $"value{i}";
string retValue = db.StringGet(key);
Assert.AreEqual(value, retValue);
}

var args2 = new string[1 + NumKeys];

// Set prefix
args2[0] = prefix;

// Set keys
for (int i = 0; i < NumKeys; i++)
{
args2[i + 1] = $"key{i}";
}

// Execute transaction
var result2 = (string[])db.Execute("MGETIFPM", args2);

// Verify results
int expectedCount = NumKeys - 9; // only values with specified prefix
Assert.AreEqual(2 * expectedCount, result2.Length);
// Verify that keys have the correct prefix
for (int i = 0; i < expectedCount; i++)
{
Assert.AreEqual(prefix, result2[2 * i + 1].Substring(0, prefix.Length));
}
}
}
}

0 comments on commit 8a1d5f8

Please sign in to comment.