From 63133aa39406fc29a156d57e87d9863648626ed0 Mon Sep 17 00:00:00 2001 From: Yoganand Rajasekaran <60369795+yrajas@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:08:19 -0700 Subject: [PATCH] API to clear scratch buffer (#700) When repeated large values are retrieved from custom commands, it is possible that the scratch buffer could overflow. In such scenarios, the GetScratchBufferOffset and ResetScratchBuffer APIs could be invoked to free up the space occupied in the scratch buffer. --- libs/server/API/GarnetApi.cs | 7 + libs/server/API/GarnetWatchApi.cs | 8 + libs/server/API/IGarnetApi.cs | 13 ++ libs/server/ArgSlice/ScratchBufferManager.cs | 17 ++ libs/server/Custom/CustomFunctions.cs | 11 ++ test/Garnet.test/RespCustomCommandTests.cs | 165 +++++++++++++++++++ website/docs/extensions/procedure.md | 4 + website/docs/extensions/transactions.md | 5 + 8 files changed, 230 insertions(+) diff --git a/libs/server/API/GarnetApi.cs b/libs/server/API/GarnetApi.cs index ac364ad74e..a1e12775f0 100644 --- a/libs/server/API/GarnetApi.cs +++ b/libs/server/API/GarnetApi.cs @@ -397,6 +397,13 @@ public ITsavoriteScanIterator IterateObjectStore() public GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter) => storageSession.ObjectScan(key, ref input, ref outputFooter, ref objectContext); + /// + public int GetScratchBufferOffset() + => storageSession.scratchBufferManager.ScratchBufferOffset; + + /// + public bool ResetScratchBuffer(int offset) + => storageSession.scratchBufferManager.ResetScratchBuffer(offset); #endregion } } \ No newline at end of file diff --git a/libs/server/API/GarnetWatchApi.cs b/libs/server/API/GarnetWatchApi.cs index 8a4e6044b2..a3ddb19a2c 100644 --- a/libs/server/API/GarnetWatchApi.cs +++ b/libs/server/API/GarnetWatchApi.cs @@ -546,6 +546,14 @@ public GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObje return garnetApi.ObjectScan(key, ref input, ref outputFooter); } + /// + public int GetScratchBufferOffset() + => garnetApi.GetScratchBufferOffset(); + + /// + public bool ResetScratchBuffer(int offset) + => garnetApi.ResetScratchBuffer(offset); + #endregion } } \ No newline at end of file diff --git a/libs/server/API/IGarnetApi.cs b/libs/server/API/IGarnetApi.cs index 53af70fdb2..12fe0f29c5 100644 --- a/libs/server/API/IGarnetApi.cs +++ b/libs/server/API/IGarnetApi.cs @@ -1700,6 +1700,19 @@ public bool IterateObjectStore(ref TScanFunctions scanFunctions, /// GarnetStatus ObjectScan(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput outputFooter); + /// + /// Retrieve the current scratch buffer offset. + /// + /// Current offset + int GetScratchBufferOffset(); + + /// + /// Resets the scratch buffer to the given offset. + /// + /// Offset to reset to + /// True if successful, else false + bool ResetScratchBuffer(int offset); + #endregion } diff --git a/libs/server/ArgSlice/ScratchBufferManager.cs b/libs/server/ArgSlice/ScratchBufferManager.cs index a3b83d728f..68861d3985 100644 --- a/libs/server/ArgSlice/ScratchBufferManager.cs +++ b/libs/server/ArgSlice/ScratchBufferManager.cs @@ -31,6 +31,9 @@ internal sealed unsafe class ScratchBufferManager /// int scratchBufferOffset; + /// Current offset in scratch buffer + internal int ScratchBufferOffset => scratchBufferOffset; + public ScratchBufferManager() { } @@ -55,6 +58,20 @@ public bool RewindScratchBuffer(ref ArgSlice slice) return false; } + /// + /// Resets scratch buffer offset to the specified offset. + /// + /// Offset to reset to + /// True if successful, else false + public bool ResetScratchBuffer(int offset) + { + if (offset < 0 || offset > scratchBufferOffset) + return false; + + scratchBufferOffset = offset; + return true; + } + /// /// Create ArgSlice in scratch buffer, from given ReadOnlySpan /// diff --git a/libs/server/Custom/CustomFunctions.cs b/libs/server/Custom/CustomFunctions.cs index dd6842e64d..0c1f41998c 100644 --- a/libs/server/Custom/CustomFunctions.cs +++ b/libs/server/Custom/CustomFunctions.cs @@ -171,6 +171,17 @@ protected static unsafe void WriteError(ref (IMemoryOwner, int) output, Re output.Item2 = len; } + /// + /// Create output as error message, from given string + /// + protected static unsafe void WriteError(ref MemoryResult output, ReadOnlySpan errorMessage) + { + var _output = (output.MemoryOwner, output.Length); + WriteError(ref _output, errorMessage); + output.MemoryOwner = _output.MemoryOwner; + output.Length = _output.Length; + } + /// /// Get argument from input, at specified offset (starting from 0) /// diff --git a/test/Garnet.test/RespCustomCommandTests.cs b/test/Garnet.test/RespCustomCommandTests.cs index 5949a20ce0..ad59e7e7d9 100644 --- a/test/Garnet.test/RespCustomCommandTests.cs +++ b/test/Garnet.test/RespCustomCommandTests.cs @@ -10,13 +10,116 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Garnet.common; using Garnet.server; using NUnit.Framework; using NUnit.Framework.Legacy; using StackExchange.Redis; +using Tsavorite.core; namespace Garnet.test { + public class LargeGet : CustomProcedure + { + public override bool Execute(IGarnetApi garnetApi, ArgSlice input, ref MemoryResult output) + { + static bool ResetBuffer(IGarnetApi garnetApi, ref MemoryResult output, int buffOffset) + { + bool status = garnetApi.ResetScratchBuffer(buffOffset); + if (!status) + WriteError(ref output, "ERR ResetScratchBuffer failed"); + + return status; + } + + var offset = 0; + var key = GetNextArg(input, ref offset); + + var buffOffset = garnetApi.GetScratchBufferOffset(); + for (var i = 0; i < 120_000; i++) + { + garnetApi.GET(key, out var outval); + if (i % 100 == 0) + { + if (!ResetBuffer(garnetApi, ref output, buffOffset)) + return false; + } + } + + buffOffset = garnetApi.GetScratchBufferOffset(); + garnetApi.GET(key, out var outval1); + garnetApi.GET(key, out var outval2); + if (!ResetBuffer(garnetApi, ref output, buffOffset)) return false; + + buffOffset = garnetApi.GetScratchBufferOffset(); + var hashKey = GetNextArg(input, ref offset); + var field = GetNextArg(input, ref offset); + garnetApi.HashGet(hashKey, field, out var value); + if (!ResetBuffer(garnetApi, ref output, buffOffset)) return false; + + return true; + } + } + + public class LargeGetTxn : CustomTransactionProcedure + { + public override bool Prepare(TGarnetReadApi api, ArgSlice input) + { + int offset = 0; + AddKey(GetNextArg(input, ref offset), LockType.Shared, false); + return true; + } + + public override void Main(TGarnetApi garnetApi, ArgSlice input, ref MemoryResult output) + { + int offset = 0; + var key = GetNextArg(input, ref offset); + var buffOffset = garnetApi.GetScratchBufferOffset(); + for (int i = 0; i < 120_000; i++) + { + garnetApi.GET(key, out var outval); + if (i % 100 == 0) + { + if (!garnetApi.ResetScratchBuffer(buffOffset)) + { + WriteError(ref output, "ERR ResetScratchBuffer failed"); + return; + } + } + } + } + } + + public class OutOfOrderFreeBuffer : CustomProcedure + { + public override bool Execute(IGarnetApi garnetApi, ArgSlice input, ref MemoryResult output) + { + var offset = 0; + var key = GetNextArg(input, ref offset); + + var buffOffset1 = garnetApi.GetScratchBufferOffset(); + garnetApi.GET(key, out var outval1); + + var buffOffset2 = garnetApi.GetScratchBufferOffset(); + garnetApi.GET(key, out var outval2); + + if (!garnetApi.ResetScratchBuffer(buffOffset1)) + { + WriteError(ref output, "ERR ResetScratchBuffer failed"); + return false; + } + + // Previous reset call would have shrunk the buffer. This call should fail otherwise it will expand the buffer. + if (garnetApi.ResetScratchBuffer(buffOffset2)) + { + WriteError(ref output, "ERR ResetScratchBuffer shouldn't expand the buffer"); + return false; + } + + return true; + } + } + [TestFixture] public class RespCustomCommandTests { @@ -547,6 +650,68 @@ public void CustomCommandRegistrationTest() ClassicAssert.AreEqual("30", retValue.ToString()); } + [Test] + public void CustomProcedureFreeBufferTest() + { + server.Register.NewProcedure("LARGEGET", new LargeGet()); + var key = "key"; + var hashKey = "hashKey"; + var hashField = "field"; + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + byte[] value = new byte[10_000]; + db.StringSet(key, value); + db.HashSet(hashKey, [new HashEntry(hashField, value)]); + + try + { + var result = db.Execute("LARGEGET", key, hashKey, hashField); + ClassicAssert.AreEqual("OK", result.ToString()); + } + catch (RedisServerException rse) + { + ClassicAssert.Fail(rse.Message); + } + } + + [Test] + public void CustomTxnFreeBufferTest() + { + server.Register.NewTransactionProc("LARGEGETTXN", () => new LargeGetTxn()); + var key = "key"; + var hashKey = "hashKey"; + var hashField = "field"; + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + byte[] value = new byte[10_000]; + db.StringSet(key, value); + db.HashSet(hashKey, [new HashEntry(hashField, value)]); + + try + { + var result = db.Execute("LARGEGETTXN", key); + ClassicAssert.AreEqual("OK", result.ToString()); + } + catch (RedisServerException rse) + { + ClassicAssert.Fail(rse.Message); + } + } + + [Test] + public void CustomProcedureOutOfOrderFreeBufferTest() + { + server.Register.NewProcedure("OUTOFORDERFREE", new OutOfOrderFreeBuffer()); + var key = "key"; + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + byte[] value = new byte[10_000]; + db.StringSet(key, value); + + var result = db.Execute("OUTOFORDERFREE", key); + ClassicAssert.AreEqual("OK", result.ToString()); + } + private string[] CreateTestLibraries() { var runtimePath = RuntimeEnvironment.GetRuntimeDirectory(); diff --git a/website/docs/extensions/procedure.md b/website/docs/extensions/procedure.md index 70a13afd33..ba3178707b 100644 --- a/website/docs/extensions/procedure.md +++ b/website/docs/extensions/procedure.md @@ -24,6 +24,10 @@ Registering the custom procedure is done on the server-side by calling the method on the Garnet server object's `RegisterAPI` object with its name, an instance of the custom procedure class and optional commandInfo. +**NOTE** When invoking APIs on `IGarnetApi` multiple times with large outputs, it is possible to exhaust the internal buffer capacity. If such usage scenarios are expected, the buffer could be reset as described below. +* Retrieve the initial buffer offset using `IGarnetApi.GetScratchBufferOffset` +* Invoke necessary apis on `IGarnetApi` +* Reset the buffer back to where it was using `IGarnetApi.ResetScratchBuffer(offset)` :::tip As a reference of an implementation of a custom procedure, see the example in GarnetServer\Extensions\Sum.cs. diff --git a/website/docs/extensions/transactions.md b/website/docs/extensions/transactions.md index 5c91355926..c97576c91c 100644 --- a/website/docs/extensions/transactions.md +++ b/website/docs/extensions/transactions.md @@ -26,6 +26,11 @@ These are the helper methods for developing custom transactions. - `GetNextArg(ArgSlice input, ref int offset)` This method is used to retrieve the next argument from the input at the specified offset. It takes an ArgSlice parameter representing the input and a reference to an int offset. It returns an ArgSlice object representing the argument as a span. The method internally reads a pointer with a length header to extract the argument. These member functions provide utility and convenience methods for manipulating and working with the transaction data, scratch buffer, and input arguments within the CustomTransactionProcedure class. +**NOTE** When invoking APIs on `IGarnetApi` multiple times with large outputs, it is possible to exhaust the internal buffer capacity. If such usage scenarios are expected, the buffer could be reset as described below. +* Retrieve the initial buffer offset using `IGarnetApi.GetScratchBufferOffset` +* Invoke necessary apis on `IGarnetApi` +* Reset the buffer back to where it was using `IGarnetApi.ResetScratchBuffer(offset)` + Registering the custom transaction is done on the server-side by calling the `NewTransactionProc(string name, int numParams, Func proc)` method on the Garnet server object's `RegisterAPI` object with its name, number of parameters and a method that returns an instance of the custom transaction class.\ It is possible to register the custom transaction from the client-side as well (as an admin command, given that the code already resides on the server) by using the `REGISTER` command (see [Custom Commands](../dev/custom-commands.md)).