Skip to content

Commit

Permalink
API to clear scratch buffer (#700)
Browse files Browse the repository at this point in the history
When repeated large values are retrieved from custom commands, it is possible that the scratch buffer could overflow.
In such scenarios, the GetScratchBufferOffset and ResetScratchBuffer APIs could be invoked to free up the space occupied in the scratch buffer.
  • Loading branch information
yrajas authored Oct 16, 2024
1 parent 80fae44 commit 63133aa
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 0 deletions.
7 changes: 7 additions & 0 deletions libs/server/API/GarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ public ITsavoriteScanIterator<byte[], IGarnetObject> IterateObjectStore()
public GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter)
=> storageSession.ObjectScan(key, ref input, ref outputFooter, ref objectContext);

/// <inheritdoc />
public int GetScratchBufferOffset()
=> storageSession.scratchBufferManager.ScratchBufferOffset;

/// <inheritdoc />
public bool ResetScratchBuffer(int offset)
=> storageSession.scratchBufferManager.ResetScratchBuffer(offset);
#endregion
}
}
8 changes: 8 additions & 0 deletions libs/server/API/GarnetWatchApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,14 @@ public GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObje
return garnetApi.ObjectScan(key, ref input, ref outputFooter);
}

/// <inheritdoc />
public int GetScratchBufferOffset()
=> garnetApi.GetScratchBufferOffset();

/// <inheritdoc />
public bool ResetScratchBuffer(int offset)
=> garnetApi.ResetScratchBuffer(offset);

#endregion
}
}
13 changes: 13 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,19 @@ public bool IterateObjectStore<TScanFunctions>(ref TScanFunctions scanFunctions,
/// <param name="outputFooter"></param>
GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter);

/// <summary>
/// Retrieve the current scratch buffer offset.
/// </summary>
/// <returns>Current offset</returns>
int GetScratchBufferOffset();

/// <summary>
/// Resets the scratch buffer to the given offset.
/// </summary>
/// <param name="offset">Offset to reset to</param>
/// <returns>True if successful, else false</returns>
bool ResetScratchBuffer(int offset);

#endregion

}
Expand Down
17 changes: 17 additions & 0 deletions libs/server/ArgSlice/ScratchBufferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ internal sealed unsafe class ScratchBufferManager
/// </summary>
int scratchBufferOffset;

/// <summary>Current offset in scratch buffer</summary>
internal int ScratchBufferOffset => scratchBufferOffset;

public ScratchBufferManager()
{
}
Expand All @@ -55,6 +58,20 @@ public bool RewindScratchBuffer(ref ArgSlice slice)
return false;
}

/// <summary>
/// Resets scratch buffer offset to the specified offset.
/// </summary>
/// <param name="offset">Offset to reset to</param>
/// <returns>True if successful, else false</returns>
public bool ResetScratchBuffer(int offset)
{
if (offset < 0 || offset > scratchBufferOffset)
return false;

scratchBufferOffset = offset;
return true;
}

/// <summary>
/// Create ArgSlice in scratch buffer, from given ReadOnlySpan
/// </summary>
Expand Down
11 changes: 11 additions & 0 deletions libs/server/Custom/CustomFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ protected static unsafe void WriteError(ref (IMemoryOwner<byte>, int) output, Re
output.Item2 = len;
}

/// <summary>
/// Create output as error message, from given string
/// </summary>
protected static unsafe void WriteError(ref MemoryResult<byte> output, ReadOnlySpan<char> errorMessage)
{
var _output = (output.MemoryOwner, output.Length);
WriteError(ref _output, errorMessage);
output.MemoryOwner = _output.MemoryOwner;
output.Length = _output.Length;
}

/// <summary>
/// Get argument from input, at specified offset (starting from 0)
/// </summary>
Expand Down
165 changes: 165 additions & 0 deletions test/Garnet.test/RespCustomCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,116 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.server;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
using Tsavorite.core;

namespace Garnet.test
{
public class LargeGet : CustomProcedure
{
public override bool Execute(IGarnetApi garnetApi, ArgSlice input, ref MemoryResult<byte> output)
{
static bool ResetBuffer(IGarnetApi garnetApi, ref MemoryResult<byte> output, int buffOffset)
{
bool status = garnetApi.ResetScratchBuffer(buffOffset);
if (!status)
WriteError(ref output, "ERR ResetScratchBuffer failed");

return status;
}

var offset = 0;
var key = GetNextArg(input, ref offset);

var buffOffset = garnetApi.GetScratchBufferOffset();
for (var i = 0; i < 120_000; i++)
{
garnetApi.GET(key, out var outval);
if (i % 100 == 0)
{
if (!ResetBuffer(garnetApi, ref output, buffOffset))
return false;
}
}

buffOffset = garnetApi.GetScratchBufferOffset();
garnetApi.GET(key, out var outval1);
garnetApi.GET(key, out var outval2);
if (!ResetBuffer(garnetApi, ref output, buffOffset)) return false;

buffOffset = garnetApi.GetScratchBufferOffset();
var hashKey = GetNextArg(input, ref offset);
var field = GetNextArg(input, ref offset);
garnetApi.HashGet(hashKey, field, out var value);
if (!ResetBuffer(garnetApi, ref output, buffOffset)) return false;

return true;
}
}

public class LargeGetTxn : CustomTransactionProcedure
{
public override bool Prepare<TGarnetReadApi>(TGarnetReadApi api, ArgSlice input)
{
int offset = 0;
AddKey(GetNextArg(input, ref offset), LockType.Shared, false);
return true;
}

public override void Main<TGarnetApi>(TGarnetApi garnetApi, ArgSlice input, ref MemoryResult<byte> output)
{
int offset = 0;
var key = GetNextArg(input, ref offset);
var buffOffset = garnetApi.GetScratchBufferOffset();
for (int i = 0; i < 120_000; i++)
{
garnetApi.GET(key, out var outval);
if (i % 100 == 0)
{
if (!garnetApi.ResetScratchBuffer(buffOffset))
{
WriteError(ref output, "ERR ResetScratchBuffer failed");
return;
}
}
}
}
}

public class OutOfOrderFreeBuffer : CustomProcedure
{
public override bool Execute(IGarnetApi garnetApi, ArgSlice input, ref MemoryResult<byte> output)
{
var offset = 0;
var key = GetNextArg(input, ref offset);

var buffOffset1 = garnetApi.GetScratchBufferOffset();
garnetApi.GET(key, out var outval1);

var buffOffset2 = garnetApi.GetScratchBufferOffset();
garnetApi.GET(key, out var outval2);

if (!garnetApi.ResetScratchBuffer(buffOffset1))
{
WriteError(ref output, "ERR ResetScratchBuffer failed");
return false;
}

// Previous reset call would have shrunk the buffer. This call should fail otherwise it will expand the buffer.
if (garnetApi.ResetScratchBuffer(buffOffset2))
{
WriteError(ref output, "ERR ResetScratchBuffer shouldn't expand the buffer");
return false;
}

return true;
}
}

[TestFixture]
public class RespCustomCommandTests
{
Expand Down Expand Up @@ -547,6 +650,68 @@ public void CustomCommandRegistrationTest()
ClassicAssert.AreEqual("30", retValue.ToString());
}

[Test]
public void CustomProcedureFreeBufferTest()
{
server.Register.NewProcedure("LARGEGET", new LargeGet());
var key = "key";
var hashKey = "hashKey";
var hashField = "field";
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
byte[] value = new byte[10_000];
db.StringSet(key, value);
db.HashSet(hashKey, [new HashEntry(hashField, value)]);

try
{
var result = db.Execute("LARGEGET", key, hashKey, hashField);
ClassicAssert.AreEqual("OK", result.ToString());
}
catch (RedisServerException rse)
{
ClassicAssert.Fail(rse.Message);
}
}

[Test]
public void CustomTxnFreeBufferTest()
{
server.Register.NewTransactionProc("LARGEGETTXN", () => new LargeGetTxn());
var key = "key";
var hashKey = "hashKey";
var hashField = "field";
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
byte[] value = new byte[10_000];
db.StringSet(key, value);
db.HashSet(hashKey, [new HashEntry(hashField, value)]);

try
{
var result = db.Execute("LARGEGETTXN", key);
ClassicAssert.AreEqual("OK", result.ToString());
}
catch (RedisServerException rse)
{
ClassicAssert.Fail(rse.Message);
}
}

[Test]
public void CustomProcedureOutOfOrderFreeBufferTest()
{
server.Register.NewProcedure("OUTOFORDERFREE", new OutOfOrderFreeBuffer());
var key = "key";
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var db = redis.GetDatabase(0);
byte[] value = new byte[10_000];
db.StringSet(key, value);

var result = db.Execute("OUTOFORDERFREE", key);
ClassicAssert.AreEqual("OK", result.ToString());
}

private string[] CreateTestLibraries()
{
var runtimePath = RuntimeEnvironment.GetRuntimeDirectory();
Expand Down
4 changes: 4 additions & 0 deletions website/docs/extensions/procedure.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ Registering the custom procedure is done on the server-side by calling the

method on the Garnet server object's `RegisterAPI` object with its name, an instance of the custom procedure class and optional commandInfo.

**NOTE** When invoking APIs on `IGarnetApi` multiple times with large outputs, it is possible to exhaust the internal buffer capacity. If such usage scenarios are expected, the buffer could be reset as described below.
* Retrieve the initial buffer offset using `IGarnetApi.GetScratchBufferOffset`
* Invoke necessary apis on `IGarnetApi`
* Reset the buffer back to where it was using `IGarnetApi.ResetScratchBuffer(offset)`

:::tip
As a reference of an implementation of a custom procedure, see the example in GarnetServer\Extensions\Sum.cs.
5 changes: 5 additions & 0 deletions website/docs/extensions/transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ These are the helper methods for developing custom transactions.
- `GetNextArg(ArgSlice input, ref int offset)` This method is used to retrieve the next argument from the input at the specified offset. It takes an ArgSlice parameter representing the input and a reference to an int offset. It returns an ArgSlice object representing the argument as a span. The method internally reads a pointer with a length header to extract the argument.
These member functions provide utility and convenience methods for manipulating and working with the transaction data, scratch buffer, and input arguments within the CustomTransactionProcedure class.

**NOTE** When invoking APIs on `IGarnetApi` multiple times with large outputs, it is possible to exhaust the internal buffer capacity. If such usage scenarios are expected, the buffer could be reset as described below.
* Retrieve the initial buffer offset using `IGarnetApi.GetScratchBufferOffset`
* Invoke necessary apis on `IGarnetApi`
* Reset the buffer back to where it was using `IGarnetApi.ResetScratchBuffer(offset)`

Registering the custom transaction is done on the server-side by calling the `NewTransactionProc(string name, int numParams, Func<CustomTransactionProcedure> proc)` method on the Garnet server object's `RegisterAPI` object with its name, number of parameters and a method that returns an instance of the custom transaction class.\
It is possible to register the custom transaction from the client-side as well (as an admin command, given that the code already resides on the server) by using the `REGISTER` command (see [Custom Commands](../dev/custom-commands.md)).

Expand Down

0 comments on commit 63133aa

Please sign in to comment.