Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom stored procedures with variable number of parameters #369

Merged
merged 6 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions libs/server/Custom/CustomFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Collections.Generic;
using Garnet.common;

namespace Garnet.server
Expand Down Expand Up @@ -76,6 +77,28 @@ protected static unsafe void WriteBulkStringArray(ref MemoryResult<byte> output,
}
}

/// <summary>
/// Create output as an array of bulk strings, from given array of ArgSlice values
/// </summary>
protected static unsafe void WriteBulkStringArray(ref MemoryResult<byte> output, List<ArgSlice> values)
{
int totalLen = 1 + NumUtils.NumDigits(values.Count) + 2;
for (int i = 0; i < values.Count; i++)
totalLen += RespWriteUtils.GetBulkStringLength(values[i].Length);

output.MemoryOwner?.Dispose();
output.MemoryOwner = MemoryPool.Rent(totalLen);
output.Length = totalLen;

fixed (byte* ptr = output.MemoryOwner.Memory.Span)
{
var curr = ptr;
RespWriteUtils.WriteArrayLength(values.Count, ref curr, ptr + totalLen);
for (int i = 0; i < values.Count; i++)
RespWriteUtils.WriteBulkString(values[i].Span, ref curr, ptr + totalLen);
}
}

/// <summary>
/// Create output as bulk string, from given Span
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, byte subcmd,
if (!success1) return false;
}

if (count != currentCustomTransaction.NumParams)
if (currentCustomTransaction.NumParams < int.MaxValue && count != currentCustomTransaction.NumParams)
{
while (!RespWriteUtils.WriteError($"ERR Invalid number of parameters to stored proc {currentCustomTransaction.nameStr}, expected {currentCustomTransaction.NumParams}, actual {count}", ref dcurr, dend))
SendAndReset();
Expand Down
21 changes: 15 additions & 6 deletions libs/server/Servers/RegisterApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public RegisterApi(GarnetProvider provider)
/// Register custom command with Garnet
/// </summary>
/// <param name="name">Name of command</param>
/// <param name="numParams">Numer of parameters (excluding the key, which is always the first parameter)</param>
/// <param name="numParams">Number of parameters (excluding the key, which is always the first parameter)</param>
/// <param name="type">Type of command (e.g., read)</param>
/// <param name="customFunctions">Custom functions for command logic</param>
/// <param name="commandInfo">RESP command info</param>
Expand All @@ -39,15 +39,24 @@ public int NewCommand(string name, int numParams, CommandType type, CustomRawStr
=> provider.StoreWrapper.customCommandManager.Register(name, numParams, type, customFunctions, commandInfo, expirationTicks);

/// <summary>
/// Register transaction procedure with Garnet
/// Register transaction procedure with Garnet, with a fixed number of parameters
/// </summary>
/// <param name="name"></param>
/// <param name="numParams"></param>
/// <param name="proc"></param>
/// <returns></returns>
/// <param name="name">Name of command</param>
/// <param name="numParams">Number of parameters</param>
/// <param name="proc">Custom stored procedure</param>
/// <returns>ID of the registered command</returns>
public int NewTransactionProc(string name, int numParams, Func<CustomTransactionProcedure> proc)
=> provider.StoreWrapper.customCommandManager.Register(name, numParams, proc);

/// <summary>
/// Register transaction procedure with Garnet, with a variable number of parameters
/// </summary>
/// <param name="name">Name of command</param>
/// <param name="proc">Custom stored procedure</param>
/// <returns>ID of the registered command</returns>
public int NewTransactionProc(string name, Func<CustomTransactionProcedure> proc)
=> provider.StoreWrapper.customCommandManager.Register(name, int.MaxValue, proc);

/// <summary>
/// Register object type with server
/// </summary>
Expand Down
62 changes: 62 additions & 0 deletions main/GarnetServer/Extensions/MGetIfPM.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using Garnet.common;
using Garnet.server;

namespace Garnet
{
/// <summary>
/// Functions to implement custom transaction MGETIFPM - get multiple keys whose values match with the given prefix
///
/// Format: MGETIFPM prefix key1 key2 ...
/// Output: array of matching key-value pairs
///
/// Description: Perform a non-transactional multi-get with value condition (prefix match) for the given set of keys
/// </summary>
sealed class MGetIfPM : CustomTransactionProcedure
{
/// <summary>
/// No transactional phase, skip Prepare
/// </summary>
public override bool Prepare<TGarnetReadApi>(TGarnetReadApi api, ArgSlice input)
=> false;

/// <summary>
/// Main will not be called because Prepare returns false
/// </summary>
public override void Main<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
=> throw new InvalidOperationException();

/// <summary>
/// Perform the MGETIFPM operation
/// </summary>
public override void Finalize<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
{
int offset = 0;

// Read prefix
var prefix = GetNextArg(input, ref offset);

// Read key, check condition, add to output
ArgSlice key;
List<ArgSlice> values = [];
while ((key = GetNextArg(input, ref offset)).Length > 0)
{
if (api.GET(key, out var value) == GarnetStatus.OK)
{
if (value.ReadOnlySpan.StartsWith(prefix.ReadOnlySpan))
{
values.Add(key);
values.Add(value);
}
}
}

// Return the matching key-value pairs as an array of bulk strings
WriteBulkStringArray(ref output, values);
}
}
}
51 changes: 51 additions & 0 deletions main/GarnetServer/Extensions/MSetPx.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using Garnet.common;
using Garnet.server;

namespace Garnet
{
/// <summary>
/// Functions to implement custom transaction MSETPX - set multiple keys with given expiration in milliseconds
///
/// Format: MSETPX 60000 key1 value1 key2 value2 ...
///
/// Description: Perform a non-transactional multi-set with expiry for the given set of key-value pairs
/// </summary>
sealed class MSetPxTxn : CustomTransactionProcedure
{
/// <summary>
/// No transactional phase, skip Prepare
/// </summary>
public override bool Prepare<TGarnetReadApi>(TGarnetReadApi api, ArgSlice input)
=> false;

/// <summary>
/// Main will not be called because Prepare returns false
/// </summary>
public override void Main<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
=> throw new InvalidOperationException();

/// <summary>
/// Perform the MSETPX operation
/// </summary>
public override void Finalize<TGarnetApi>(TGarnetApi api, ArgSlice input, ref MemoryResult<byte> output)
{
int offset = 0;

// Read expiry
var expiryMs = GetNextArg(input, ref offset);

// Read and set key-value pairs with expiry
ArgSlice key, value;
while ((key = GetNextArg(input, ref offset)).Length > 0)
{
value = GetNextArg(input, ref offset);
api.SETEX(key, value, expiryMs);
}
WriteSimpleString(ref output, "OK");
}
}
}
6 changes: 6 additions & 0 deletions main/GarnetServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ static void RegisterExtensions(GarnetServer server)
// Register stored procedure to run a transactional command
server.Register.NewTransactionProc("READWRITETX", 3, () => new ReadWriteTxn());

// Register stored procedure to run a transactional command
server.Register.NewTransactionProc("MSETPX", () => new MSetPxTxn());

// Register stored procedure to run a transactional command
server.Register.NewTransactionProc("MGETIFPM", () => new MGetIfPM());

// Register stored procedure to run a non-transactional command
server.Register.NewTransactionProc("GETTWOKEYSNOTXN", 2, () => new GetTwoKeysNoTxn());

Expand Down
2 changes: 2 additions & 0 deletions test/Garnet.test/Garnet.test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

<ItemGroup>
<Compile Include="..\..\main\GarnetServer\Extensions\DeleteIfMatch.cs" Link="DeleteIfMatch.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MGetIfPM.cs" Link="MGetIfPM.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MSetPx.cs" Link="MSetPx.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictObject.cs" Link="MyDictObject.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\GetTwoKeysNoTxn.cs" Link="GetTwoKeysNoTxn.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\ReadWriteTxn.cs" Link="ReadWriteTxn.cs" />
Expand Down
110 changes: 110 additions & 0 deletions test/Garnet.test/RespTransactionProcTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,5 +363,115 @@ public void TransactionProcFinalizeTest()
Assert.AreEqual(value1, ((string[])result)[0]);
Assert.AreEqual(value2, ((string[])result)[1]);
}

[Test]
public void TransactionProcMSetPxTest()
{
server.Register.NewTransactionProc("MSETPX", () => new MSetPxTxn());

using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
const int NumKeys = 5;

var args = new string[1 + 2 * NumKeys];

// Set expiry to 2 seconds
args[0] = "2000";

// Set key-value pairs
for (int i = 0; i < NumKeys; i++)
{
args[2 * i + 1] = $"key{i}";
args[2 * i + 2] = $"value{i}";
}

// Execute transaction
var result = db.Execute("MSETPX", args);

// Verify transaction succeeded
Assert.AreEqual("OK", (string)result);

// Read keys to verify transaction succeeded
for (int i = 0; i < NumKeys; i++)
{
string key = $"key{i}";
string value = $"value{i}";
string retValue = db.StringGet(key);
Assert.AreEqual(value, retValue);
}

// Wait for keys to expire
Thread.Sleep(2100);

// Verify that keys have expired
for (int i = 0; i < NumKeys; i++)
{
string key = $"key{i}";
string retValue = db.StringGet(key);
Assert.IsNull(retValue);
}
}

[Test]
public void TransactionProcMGetIfPMTest()
{
server.Register.NewTransactionProc("MSETPX", () => new MSetPxTxn());
server.Register.NewTransactionProc("MGETIFPM", () => new MGetIfPM());

using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
const int NumKeys = 15;
const string prefix = "value1";

var args1 = new string[1 + 2 * NumKeys];

// Set expiry to 600 seconds
args1[0] = "600000";

// Set key-value pairs
for (int i = 0; i < NumKeys; i++)
{
args1[2 * i + 1] = $"key{i}";
args1[2 * i + 2] = $"value{i}";
}

// Execute transaction
var result1 = (string)db.Execute("MSETPX", args1);

// Verify transaction succeeded
Assert.AreEqual("OK", result1);

// Read keys to verify transaction succeeded
for (int i = 0; i < NumKeys; i++)
{
string key = $"key{i}";
string value = $"value{i}";
string retValue = db.StringGet(key);
Assert.AreEqual(value, retValue);
}

var args2 = new string[1 + NumKeys];

// Set prefix
args2[0] = prefix;

// Set keys
for (int i = 0; i < NumKeys; i++)
{
args2[i + 1] = $"key{i}";
}

// Execute transaction
var result2 = (string[])db.Execute("MGETIFPM", args2);

// Verify results
int expectedCount = NumKeys - 9; // only values with specified prefix
Assert.AreEqual(2 * expectedCount, result2.Length);
// Verify that keys have the correct prefix
for (int i = 0; i < expectedCount; i++)
{
Assert.AreEqual(prefix, result2[2 * i + 1].Substring(0, prefix.Length));
}
}
}
}