diff --git a/changelog.md b/changelog.md index 453d7ee..6074f14 100644 --- a/changelog.md +++ b/changelog.md @@ -1,6 +1,7 @@ # Changelog ## Version 0.2 (working) +* Rework `Redis` -> `RedisClient` and make thread-safe * Update `SocketOptions` -> `SocketConfig` ## Version 0.1 (13-Apr-2021) diff --git a/src/redis/fan/RedisClient.fan b/src/redis/fan/RedisClient.fan new file mode 100644 index 0000000..9cea3f4 --- /dev/null +++ b/src/redis/fan/RedisClient.fan @@ -0,0 +1,129 @@ +// +// Copyright (c) 2023, Novant LLC +// Licensed under the MIT License +// +// History: +// 8 Jan 2023 Andy Frank Creation +// + +using concurrent +using inet + +************************************************************************** +** RedisClient +************************************************************************** + +** Redis client. +const class RedisClient +{ + ** Create a new client instance for given host and port. + new make(Str host, Int port := 6379) + { + this.host = host + this.port = port + } + + ** Host name of Redis server. + const Str host + + ** Port number of Redis server. + const Int port + + ** Close this client all connections if applicable. + Void close() + { + actor.pool.stop.join + } + + ** Convenience for 'invoke(["GET", key])'. + Str? get(Str key) + { + invoke(["GET", key]) + } + + ** Convenience for 'invoke(["SET", key, val])'. + Void set(Str key, Obj val) + { + invoke(["SET", key, val]) + } + + ** Convenience for 'invoke(["DEL", key])'. + Void del(Str key) + { + invoke(["DEL", key]) + } + + ** Invoke the given command and return response. + Obj? invoke(Obj[] args) + { + // NOTE: we use unsafe on returns since we can guaretee the + // reference is not touched again; we also use Unsafe for + // args for performance to avoid serialization; and in _most_ + // cases this should be fine; but it does create an edge case + + Unsafe u := actor.send(RMsg('v', args)).get + return u.val + } + + ** Pipeline multiple `invoke` requests and return batched results. + Obj?[] pipeline(Obj[] invokes) + { + // NOTE: we use unsafe on returns since we can guaretee the + // reference is not touched again; we also use Unsafe for + // args for performance to avoid serialization; and in _most_ + // cases this should be fine; but it does create an edge case + + Unsafe u := actor.send(RMsg('p', invokes)).get + return u.val + } + + ** Log instance. + internal const Log log := Log("redis", false) + + // Actor + private const ActorPool pool := ActorPool { name="RedisClient" } + private const Actor actor := Actor(pool) |msg| + { + RedisConn? c + try + { + c = Actor.locals["c"] + if (c == null) Actor.locals["c"] = c = RedisConn(host, port) + + RMsg m := msg + switch (m.op) + { + case 'v': return Unsafe(c.invoke(m.a)) + case 'p': return Unsafe(c.pipeline(m.a)) + default: throw ArgErr("Unknown op '${m.op.toChar}'") + } + } + catch (Err err) + { + // TODO: this could be smarter; only teardown for network errs? + log.err("Unexpected error", err) + c?.close + Actor.locals["c"] = null + throw err + } + return null + } +} + +************************************************************************** +** RMsg +************************************************************************** + +internal const class RMsg +{ + new make(Int op, Obj? a := null) + { + this.op = op + this.ua = Unsafe(a) + } + + const Int op + Obj? a() { ua.val } + + private const Unsafe? ua +} diff --git a/src/redis/fan/Redis.fan b/src/redis/fan/RedisConn.fan similarity index 70% rename from src/redis/fan/Redis.fan rename to src/redis/fan/RedisConn.fan index 0879ddc..f556201 100644 --- a/src/redis/fan/Redis.fan +++ b/src/redis/fan/RedisConn.fan @@ -1,27 +1,27 @@ // -// Copyright (c) 2021, Novant LLC +// Copyright (c) 2023, Novant LLC // Licensed under the MIT License // // History: -// 11 Apr 2021 Andy Frank Creation +// 8 Jan 2023 Andy Frank Creation // using inet ** -** Redis client. +** Redis client connection. ** -class Redis +internal class RedisConn { ** Open a new Redis API client connection to given host. - static new open(Str host, Int port := 6379) + static new open(Str host, Int port) { s := TcpSocket(SocketConfig { it.connectTimeout = 10sec it.receiveTimeout = 10sec }) s.connect(IpAddr(host), port) - return Redis(s) + return RedisConn(s) } ** Private ctor. @@ -30,15 +30,6 @@ class Redis this.socket = s } - ** Convenience for 'invoke(["GET", key])'. - Str? get(Str key) { invoke(["GET", key]) } - - ** Convenience for 'invoke(["SET", key, val])'. - Void set(Str key, Obj val) { invoke(["SET", key, val]) } - - ** Convenience for 'invoke(["DEL", key])'. - Void del(Str key) { invoke(["DEL", key]) } - ** Invoke the given command and return response. Obj? invoke(Obj[] args) { diff --git a/src/redis/test/RedisTest.fan b/src/redis/test/RedisTest.fan index 24b21d5..daaee37 100644 --- a/src/redis/test/RedisTest.fan +++ b/src/redis/test/RedisTest.fan @@ -34,7 +34,7 @@ class RedisTest : Test Void testBasics() { startServer - r := Redis.open(server.host, server.port) + r := RedisClient(server.host, server.port) verifyEq(r.get("foo"), null) r.set("foo", 5) verifyEq(r.get("foo"), "5") @@ -46,7 +46,7 @@ class RedisTest : Test Void testPipeline() { startServer - r := Redis.open(server.host, server.port) + r := RedisClient(server.host, server.port) v := r.pipeline([ ["GET", "foo"], ["SET", "foo", 5], diff --git a/src/redis/test/TestServer.fan b/src/redis/test/TestServer.fan index d026206..92ff5dc 100644 --- a/src/redis/test/TestServer.fan +++ b/src/redis/test/TestServer.fan @@ -34,5 +34,6 @@ internal class TestServer { if (proc != null) this.proc.kill.join } + private Process? proc } \ No newline at end of file