Skip to content

Commit

Permalink
Rework Redis -> RedisClient and make thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
afrankvt committed Jan 8, 2024
1 parent 0c1555e commit ead928d
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Version 0.2 (working)
* Rework `Redis` -> `RedisClient` and make thread-safe
* Update `SocketOptions` -> `SocketConfig`

## Version 0.1 (13-Apr-2021)
Expand Down
129 changes: 129 additions & 0 deletions src/redis/fan/RedisClient.fan
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// Copyright (c) 2023, Novant LLC
// Licensed under the MIT License
//
// History:
// 8 Jan 2023 Andy Frank Creation
//

using concurrent
using inet

**************************************************************************
** RedisClient
**************************************************************************

** Redis client.
const class RedisClient
{
** Create a new client instance for given host and port.
new make(Str host, Int port := 6379)
{
this.host = host
this.port = port
}

** Host name of Redis server.
const Str host

** Port number of Redis server.
const Int port

** Close this client all connections if applicable.
Void close()
{
actor.pool.stop.join
}

** Convenience for 'invoke(["GET", key])'.
Str? get(Str key)
{
invoke(["GET", key])
}

** Convenience for 'invoke(["SET", key, val])'.
Void set(Str key, Obj val)
{
invoke(["SET", key, val])
}

** Convenience for 'invoke(["DEL", key])'.
Void del(Str key)
{
invoke(["DEL", key])
}

** Invoke the given command and return response.
Obj? invoke(Obj[] args)
{
// NOTE: we use unsafe on returns since we can guaretee the
// reference is not touched again; we also use Unsafe for
// args for performance to avoid serialization; and in _most_
// cases this should be fine; but it does create an edge case

Unsafe u := actor.send(RMsg('v', args)).get
return u.val
}

** Pipeline multiple `invoke` requests and return batched results.
Obj?[] pipeline(Obj[] invokes)
{
// NOTE: we use unsafe on returns since we can guaretee the
// reference is not touched again; we also use Unsafe for
// args for performance to avoid serialization; and in _most_
// cases this should be fine; but it does create an edge case

Unsafe u := actor.send(RMsg('p', invokes)).get
return u.val
}

** Log instance.
internal const Log log := Log("redis", false)

// Actor
private const ActorPool pool := ActorPool { name="RedisClient" }
private const Actor actor := Actor(pool) |msg|
{
RedisConn? c
try
{
c = Actor.locals["c"]
if (c == null) Actor.locals["c"] = c = RedisConn(host, port)

RMsg m := msg
switch (m.op)
{
case 'v': return Unsafe(c.invoke(m.a))
case 'p': return Unsafe(c.pipeline(m.a))
default: throw ArgErr("Unknown op '${m.op.toChar}'")
}
}
catch (Err err)
{
// TODO: this could be smarter; only teardown for network errs?
log.err("Unexpected error", err)
c?.close
Actor.locals["c"] = null
throw err
}
return null
}
}

**************************************************************************
** RMsg
**************************************************************************

internal const class RMsg
{
new make(Int op, Obj? a := null)
{
this.op = op
this.ua = Unsafe(a)
}

const Int op
Obj? a() { ua.val }

private const Unsafe? ua
}
21 changes: 6 additions & 15 deletions src/redis/fan/Redis.fan → src/redis/fan/RedisConn.fan
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
//
// Copyright (c) 2021, Novant LLC
// Copyright (c) 2023, Novant LLC
// Licensed under the MIT License
//
// History:
// 11 Apr 2021 Andy Frank Creation
// 8 Jan 2023 Andy Frank Creation
//

using inet

**
** Redis client.
** Redis client connection.
**
class Redis
internal class RedisConn
{
** Open a new Redis API client connection to given host.
static new open(Str host, Int port := 6379)
static new open(Str host, Int port)
{
s := TcpSocket(SocketConfig {
it.connectTimeout = 10sec
it.receiveTimeout = 10sec
})
s.connect(IpAddr(host), port)
return Redis(s)
return RedisConn(s)
}

** Private ctor.
Expand All @@ -30,15 +30,6 @@ class Redis
this.socket = s
}

** Convenience for 'invoke(["GET", key])'.
Str? get(Str key) { invoke(["GET", key]) }

** Convenience for 'invoke(["SET", key, val])'.
Void set(Str key, Obj val) { invoke(["SET", key, val]) }

** Convenience for 'invoke(["DEL", key])'.
Void del(Str key) { invoke(["DEL", key]) }

** Invoke the given command and return response.
Obj? invoke(Obj[] args)
{
Expand Down
4 changes: 2 additions & 2 deletions src/redis/test/RedisTest.fan
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RedisTest : Test
Void testBasics()
{
startServer
r := Redis.open(server.host, server.port)
r := RedisClient(server.host, server.port)
verifyEq(r.get("foo"), null)
r.set("foo", 5)
verifyEq(r.get("foo"), "5")
Expand All @@ -46,7 +46,7 @@ class RedisTest : Test
Void testPipeline()
{
startServer
r := Redis.open(server.host, server.port)
r := RedisClient(server.host, server.port)
v := r.pipeline([
["GET", "foo"],
["SET", "foo", 5],
Expand Down
1 change: 1 addition & 0 deletions src/redis/test/TestServer.fan
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ internal class TestServer
{
if (proc != null) this.proc.kill.join
}

private Process? proc
}

0 comments on commit ead928d

Please sign in to comment.