
Commit

Merge pull request #8 from ChristopherDavenport/fibersOverPossibleDeadlock
ChristopherDavenport authored Aug 17, 2020
2 parents 12ba4e4 + 7030ada commit c5d2432
Showing 2 changed files with 44 additions and 22 deletions.
@@ -25,7 +25,7 @@ object RedisConnection{
) extends RedisConnection[F]
private case class DirectConnection[F[_]](socket: Socket[F]) extends RedisConnection[F]

private case class Cluster[F[_]](queue: Chunk[(Deferred[F, Either[Throwable, Resp]], Option[String], Option[(String, Int)], Int, Resp)] => F[Unit]) extends RedisConnection[F]
private case class Cluster[F[_]](queue: Queue[F, Chunk[(Deferred[F, Either[Throwable, Resp]], Option[String], Option[(String, Int)], Int, Resp)]]) extends RedisConnection[F]

// Guarantees With Socket That Each Call Receives a Response
// Chunk must be non-empty but to do so incurs a penalty
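
One of the core changes sits in the hunk above: Cluster now carries the fs2 Queue itself instead of a pre-applied enqueue function, so later code can choose per call site between enqueue1 (waits for space) and offer1 (returns whether the element was accepted). A minimal sketch of that distinction, with a simplified element type and a hypothetical submit helper that is not part of rediculous:

    import cats.Monad
    import cats.syntax.all._
    import fs2.Chunk
    import fs2.concurrent.Queue

    // Holding the Queue (rather than only `queue.enqueue1`) lets the caller decide:
    // enqueue1 semantically blocks when the bounded queue is full, offer1 does not.
    def submit[F[_]: Monad, A](q: Queue[F, Chunk[A]], c: Chunk[A], onFull: F[Unit]): F[Unit] =
      q.offer1(c).ifM(Monad[F].unit, onFull)
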
@@ -75,7 +75,7 @@ object RedisConnection{
}
}
case Cluster(queue) => chunk.traverse(resp => Deferred[F, Either[Throwable, Resp]].map((_, key, None, 0, resp))).flatMap{ c =>
queue(c).as {
queue.enqueue1(c).as {
c.traverse(_._1.get).flatMap(_.sequence.traverse(l => Sync[F].delay(l.toNel.getOrElse(throw RedisError.Generic("Rediculous: Impossible Return List was Empty but we guarantee output matches input"))))).rethrow
}
}
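
The Cluster branch above follows a Deferred-per-command rendezvous: each command is paired with a fresh Deferred, the chunk is handed to the queue, and the caller waits on the Deferreds that the background worker completes with the matching replies. A stripped-down sketch of the same pattern, with simplified request/response types rather than the library's actual signatures:

    import cats.effect.Concurrent
    import cats.effect.concurrent.Deferred
    import cats.syntax.all._
    import fs2.Chunk
    import fs2.concurrent.Queue

    // Pair each request with a Deferred, enqueue the chunk, then await the replies.
    def send[F[_]: Concurrent, Req, Rep](
      q: Queue[F, Chunk[(Deferred[F, Either[Throwable, Rep]], Req)]],
      reqs: Chunk[Req]
    ): F[Chunk[Either[Throwable, Rep]]] =
      reqs.traverse(r => Deferred[F, Either[Throwable, Rep]].map(d => (d, r))).flatMap { pairs =>
        q.enqueue1(pairs) >> pairs.traverse(_._1.get)
      }
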
@@ -197,8 +197,10 @@ object RedisConnection{
sockets <- Resource.liftF(keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)))
refTopology <- Resource.liftF(Ref[F].of(sockets))
queue <- Resource.liftF(Queue.bounded[F, Chunk[(Deferred[F, Either[Throwable,Resp]], Option[String], Option[(String, Int)], Int, Resp)]](maxQueued))
cluster = Cluster(queue.enqueue1)
refreshTopology = ClusterCommands.clusterslots[Redis[F, *]].run(cluster).flatMap(refTopology.set)
cluster = Cluster(queue)
refreshTopology = refTopology.get.flatMap(_.random).flatMap{ case (host, port) =>
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
}.flatMap(s => refTopology.set(s))
_ <-
queue.dequeue.chunks.map{chunkChunk =>
val chunk = chunkChunk.flatten
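
refreshTopology changes shape here: instead of running CLUSTER SLOTS through the cluster connection itself (which enqueues onto the same queue this worker drains), it reads the cached topology, picks a random known node, and queries it over a direct connection from the keypool. A rough sketch of that shape, with a stand-in Topology type and a fetchFrom function standing in for the keypool + clusterslots call:

    import cats.effect.Sync
    import cats.effect.concurrent.Ref
    import cats.syntax.all._

    final case class Topology(nodes: List[(String, Int)]) // stand-in for the real cluster-slots value

    // Assumes the cached topology always contains at least one node.
    def refreshTopology[F[_]: Sync](
      ref: Ref[F, Topology],
      fetchFrom: ((String, Int)) => F[Topology]
    ): F[Unit] =
      ref.get
        .flatMap(t => Sync[F].delay(scala.util.Random.shuffle(t.nodes).head)) // pick a random known node
        .flatMap(fetchFrom)
        .flatMap(ref.set)
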
@@ -219,24 +221,41 @@
}
}.flatMap{
case Right(n) =>
n.zipWithIndex.parTraverseN(10){ // Parallelized Due to Possible Holding Loops
n.zipWithIndex.traverse_{
case (ref, i) =>
val (toSet, key, server, retries, initialCommand) = rest(i)
val (toSet, key, _, retries, initialCommand) = rest(i)
ref match {
case Resp.Error(s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
refreshTopology >> cluster.queue(Chunk.singleton((toSet, key, extractServer(s).orElse(server), retries + 1, initialCommand))) // We only end up here max
case Resp.Error(s) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381
val serverRedirect = extractServer(s).orElse(server)
Deferred[F, Either[Throwable, Resp]].flatMap{d => // No One Cares About this Callback
val asking = (d, key, serverRedirect, 6, Resp.renderRequest(NonEmptyList.of("ASKING"))) // Never Repeat Asking
val repeat = (toSet, key, serverRedirect, retries + 1, initialCommand)
val chunk = Chunk(asking, repeat)
cluster.queue(chunk)
case e@Resp.Error(s) if (s.startsWith("MOVED") && retries <= 5) => // MOVED 1234-2020 127.0.0.1:6381
refreshTopology.attempt.void >>
// Offer To Have it reprocessed.
// If the queue is full return the error to the user
cluster.queue.offer1(Chunk.singleton((toSet, key, extractServer(s), retries + 1, initialCommand)))
.ifM(
Applicative[F].unit,
toSet.complete(Either.right(e))
)
case e@Resp.Error(s) if (s.startsWith("ASK") && retries <= 5) => // ASK 1234-2020 127.0.0.1:6381
val serverRedirect = extractServer(s)
serverRedirect match {
case s@Some(_) => // This is a Special One Off, Requires a Redirect
Deferred[F, Either[Throwable, Resp]].flatMap{d => // No One Cares About this Callback
val asking = (d, key, s, 6, Resp.renderRequest(NonEmptyList.of("ASKING"))) // Never Repeat Asking
val repeat = (toSet, key, s, retries + 1, initialCommand)
val chunk = Chunk(asking, repeat)
cluster.queue.offer1(chunk) // Offer To Have it reprocessed.
//If the queue is full return the error to the user
.ifM(
Applicative[F].unit,
toSet.complete(Either.right(e))
)
}
case None =>
toSet.complete(Either.right(e))
}
case otherwise =>
toSet.complete(Either.right(otherwise))
}
}.void
}
case e@Left(_) =>
rest.traverse_{ case (deff, _, _, _, _) => deff.complete(e.asInstanceOf[Either[Throwable, Resp]])}
}
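
The redirect handling above is where the possible deadlock lived: the old path re-enqueued MOVED/ASK retries with enqueue1 from inside the response handler, which can wait indefinitely when the queue is full (the "Possible Holding Loops" the removed parTraverseN comment worried about). The new code offers the retry with offer1 and, if the queue is full, completes the caller's Deferred with the original error instead. The core of that pattern in isolation, with hypothetical names and simplified types:

    import cats.Monad
    import cats.effect.concurrent.Deferred
    import cats.syntax.all._
    import fs2.Chunk
    import fs2.concurrent.Queue

    // Never block on enqueue1 from inside the queue's own worker: either the retry
    // fits in the queue, or the error goes straight back to the waiting caller.
    def resubmitOrFail[F[_]: Monad, A, Rep](
      q: Queue[F, Chunk[A]],
      retry: A,
      caller: Deferred[F, Either[Throwable, Rep]],
      error: Rep
    ): F[Unit] =
      q.offer1(Chunk.singleton(retry)).ifM(
        Monad[F].unit,
        caller.complete(Right(error))
      )
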
15 changes: 9 additions & 6 deletions examples/src/main/scala/ClusterExample.scala
@@ -5,7 +5,9 @@ import fs2.Stream
import fs2.io.tcp._
import scala.concurrent.duration._

// Send a Single Transaction to the Redis Server
// Mimics 150 req/s load with 15 operations per request.
// Completes 1,000,000 redis operations
// Completes in <8 s
object ClusterExample extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
@@ -14,7 +16,7 @@ object ClusterExample extends IOApp {
sg <- SocketGroup[IO](blocker)
// maxQueued: How many elements before new submissions semantically block.
// Default 1000 is good for small servers. But can easily take 100,000.
connection <- RedisConnection.cluster[IO](sg, "localhost", 30001, maxQueued = 10000)
connection <- RedisConnection.cluster[IO](sg, "localhost", 30001, maxQueued = 10000, workers = 2)
} yield connection

r.use {client =>
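
maxQueued in the snippet above is the bound handed to Queue.bounded in the first file: once that many chunks are waiting, further enqueue1 calls semantically block until the worker fibers drain them, while offer1 reports a full queue instead. A tiny standalone demonstration of that behaviour on a bounded fs2 queue (cats-effect 2 / fs2 2.x APIs, as used elsewhere in this diff):

    import cats.effect.{ContextShift, IO}
    import cats.syntax.all._
    import fs2.Chunk
    import fs2.concurrent.Queue
    import scala.concurrent.ExecutionContext

    object BoundedQueueDemo {
      implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

      // With a bound of 2, the first two chunks are accepted; offer1 on the third returns false.
      val demo: IO[Unit] =
        Queue.bounded[IO, Chunk[Int]](2).flatMap { q =>
          q.enqueue1(Chunk(1)) >> q.enqueue1(Chunk(2)) >>
            q.offer1(Chunk(3)).flatMap(accepted => IO(println(s"third chunk accepted: $accepted")))
        }
    }
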
@@ -28,14 +30,15 @@

val r = (keyed("foo"), keyed("bar"), keyed("baz")).parTupled

val r2= List.fill(10)(r.run(client)).parSequence.map{_.flatMap{
case ((_,_,_,_, _), (_,_,_,_, _),(_,_,_,_, _)) => List((), (), (), (), (), (), (), (), (), (), (), (), (), (), ())
}}
val r2= r.run(client).map{
case ((_,_,_,_, _), (_,_,_,_, _),(_,_,_,_, _)) => // 3 x 5
List.fill(15)(())
}

val now = IO(java.time.Instant.now)
(
now,
Stream(()).covary[IO].repeat.map(_ => Stream.evalSeq(r2)).parJoin(15).take(1000000).compile.drain,
Stream(()).covary[IO].repeat.map(_ => Stream.evalSeq(r2)).parJoin(150).take(1000000).compile.drain,
now
).mapN{
case (before, _, after) => (after.toEpochMilli() - before.toEpochMilli()).millis
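
Each r2 run yields 15 unit elements (3 keys × 5 operations), so take(1000000) stops after roughly 66,667 request batches, fanned out across 150 concurrent streams by parJoin(150). The timing bracket around it works because mapN on a tuple of IOs sequences them left to right; a standalone sketch of the same trick:

    import cats.effect.IO
    import cats.syntax.all._
    import scala.concurrent.duration._

    object Timing {
      // Capture Instant.now before and after the effect; mapN on IOs runs them in order.
      def timed[A](fa: IO[A]): IO[(A, FiniteDuration)] = {
        val now = IO(java.time.Instant.now)
        (now, fa, now).mapN { case (before, a, after) =>
          (a, (after.toEpochMilli() - before.toEpochMilli()).millis)
        }
      }
    }
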
