Skip to content

Commit

Permalink
Merge branch 'main' into binaryCommands
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherDavenport authored Mar 10, 2022
2 parents 0d4512b + 34a7b40 commit 3297a19
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
15 changes: 14 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform)
.crossType(CrossType.Pure)
.in(file("core"))
.settings(yPartial)
.settings(yKindProjector)
.settings(
name := "rediculous",
mimaPreviousArtifacts := Set(), // Bincompat breaking till next release
Expand Down Expand Up @@ -87,4 +88,16 @@ lazy val yPartial =
if (scalaVersion.value.startsWith("2.12")) Seq("-Ypartial-unification")
else Seq()
}
)
)

lazy val yKindProjector =
Seq(
scalacOptions ++= {
if(scalaVersion.value.startsWith("3")) Seq("-Ykind-projector")
else Seq()
},
libraryDependencies ++= {
if(scalaVersion.value.startsWith("3")) Seq()
else Seq(compilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full))
}
)
18 changes: 9 additions & 9 deletions core/src/main/scala/io/chrisdavenport/rediculous/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ object Redis {
def unwrap[F[_], A](fa: Type[F, A]): Redis[F, A] =
fa.asInstanceOf[Redis[F, A]]

def parallel[F[_]]: ({ type M[A] = Redis[F, A] })#M ~> ({ type M[A] = Par[F, A] })#M = new ~>[({ type M[A] = Redis[F, A] })#M, ({ type M[A] = Par[F, A] })#M]{
def parallel[F[_]]: Redis[F, *] ~> Par[F, *] = new ~>[Redis[F, *], Par[F, *]]{
def apply[A](fa: Redis[F,A]): Par[F,A] = Par(fa)
}
def sequential[F[_]]: ({ type M[A] = Par[F, A] })#M ~> ({ type M[A] = Redis[F, A] })#M = new ~>[({ type M[A] = Par[F, A] })#M, ({ type M[A] = Redis[F, A] })#M]{
def sequential[F[_]]: Par[F, *] ~> Redis[F, *] = new ~>[Par[F, *], Redis[F, *]]{
def apply[A](fa: Par[F,A]): Redis[F,A] = unwrap(fa)
}

implicit def parApplicative[F[_]: Parallel: Monad]: Applicative[({ type M[A] = Par[F, A] })#M] = new Applicative[({ type M[A] = Par[F, A] })#M]{
implicit def parApplicative[F[_]: Parallel: Monad]: Applicative[Par[F, *]] = new Applicative[Par[F, *]]{
def ap[A, B](ff: Par[F,A => B])(fa: Par[F,A]): Par[F,B] = Par(Redis{
val kfx : Kleisli[F, RedisConnection[F], A => B] = Par.unwrap(ff).unRedis
val kfa : Kleisli[F, RedisConnection[F], A] = Par.unwrap(fa).unRedis
Expand All @@ -68,10 +68,10 @@ object Redis {

}

implicit def monad[F[_]: Monad]: Monad[({ type M[A] = Redis[F, A] })#M] = new Monad[({ type M[A] = Redis[F, A] })#M]{
implicit def monad[F[_]: Monad]: Monad[Redis[F, *]] = new Monad[Redis[F, *]]{

def tailRecM[A, B](a: A)(f: A => Redis[F,Either[A,B]]): Redis[F,B] = Redis(
Monad[({ type M[A] = Kleisli[F, RedisConnection[F], A]})#M].tailRecM[A, B](a)(f.andThen(_.unRedis.flatMap(fe => Kleisli.liftF(fe.pure[F]))))
Monad[Kleisli[F, RedisConnection[F], *]].tailRecM[A, B](a)(f.andThen(_.unRedis.flatMap(fe => Kleisli.liftF(fe.pure[F]))))
)

def flatMap[A, B](fa: Redis[F,A])(f: A => Redis[F,B]): Redis[F,B] = Redis(
Expand All @@ -84,17 +84,17 @@ object Redis {
)
}

implicit def parRedis[M[_]: Parallel: Concurrent]: Parallel[({ type L[A] = Redis[M, A] })#L] = new Parallel[({ type L[A] = Redis[M, A] })#L]{
implicit def parRedis[M[_]: Parallel: Concurrent]: Parallel[Redis[M, *]] = new Parallel[Redis[M, *]]{

type F[A] = Par[M, A]

def sequential: F ~> ({ type L[X] = Redis[M, X] })#L =
def sequential: F ~> Redis[M, *] =
Par.sequential[M]

def parallel: ({ type L[A] = Redis[M, A] })#L ~> F = Par.parallel[M]
def parallel: Redis[M, *] ~> F = Par.parallel[M]

def applicative: Applicative[F] = Par.parApplicative[M]

def monad: Monad[({ type L[A] = Redis[M, A] })#L] = Redis.monad[M]
def monad: Monad[Redis[M, *]] = Redis.monad[M]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object RedisConnection{
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk)

connection match {
case PooledConnection(pool) => Functor[({type M[A] = KeyPool[F, Unit, A]})#M].map(pool)(_._1).take(()).use{
case PooledConnection(pool) => Functor[KeyPool[F, Unit, *]].map(pool)(_._1).take(()).use{
m => withSocket(m.value).attempt.flatTap{
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
Expand Down Expand Up @@ -273,7 +273,7 @@ object RedisConnection{
Stream.fromQueueUnterminatedChunk(queue, chunkSizeLimit).chunks.map{chunk =>
val s = if (chunk.nonEmpty) {
Stream.eval(
Functor[({type M[A] = KeyPool[F, Unit, A]})#M].map(keypool)(_._1).take(()).attempt.use{
Functor[KeyPool[F, Unit, *]].map(keypool)(_._1).take(()).attempt.use{
case Right(m) =>
val out = chunk.map(_._2)
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
Expand All @@ -299,7 +299,7 @@ object RedisConnection{
.compile
.drain
.background
} yield Queued(queue, keypool.take(()).map(Functor[({type M[A] = Managed[F, A]})#M].map(_)(_._1)))
} yield Queued(queue, keypool.take(()).map(Functor[Managed[F, *]].map(_)(_._1)))
}
}

Expand Down Expand Up @@ -384,7 +384,7 @@ object RedisConnection{
).build

// Cluster Topology Acquisition and Management
sockets <- Resource.eval(keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[({ type M[A] = Redis[F, A] })#M].run(_)))
sockets <- Resource.eval(keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_)))
now <- Resource.eval(Temporal[F].realTime.map(_.toMillis))
refreshLock <- Resource.eval(Semaphore[F](1L))
refTopology <- Resource.eval(Ref[F].of((sockets, now)))
Expand All @@ -403,7 +403,7 @@ object RedisConnection{
case ((_, setAt), now) if setAt >= (now - cacheTopologySeconds.toMillis) => Applicative[F].unit
case ((l, _), _) =>
val nelActions: NonEmptyList[F[ClusterSlots]] = l.map{ case (host, port) =>
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[({ type M[A] = Redis[F, A] })#M].run(_))
keypool.take((host, port)).map(_.value._1).map(DirectConnection(_)).use(ClusterCommands.clusterslots[Redis[F, *]].run(_))
}
raceNThrowFirst(nelActions)
.flatMap(s => Clock[F].realTime.map(_.toMillis).flatMap(now => refTopology.set((s,now))))
Expand All @@ -422,7 +422,7 @@ object RedisConnection{
}.toSeq
).evalMap{
case (server, rest) =>
Functor[({type M[A] = KeyPool[F, (Host, Port), A]})#M].map(keypool)(_._1).take(server).attempt.use{
Functor[KeyPool[F, (Host, Port), *]].map(keypool)(_._1).take(server).attempt.use{
case Right(m) =>
val out = Chunk.seq(rest.map(_._5))
explicitPipelineRequest(m.value, out).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ object RedisCtx {
}
}

implicit def redis[F[_]: Concurrent]: RedisCtx[({ type M[A] = Redis[F, A] })#M] = new RedisCtx[({ type M[A] = Redis[F, A] })#M]{
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): Redis[F,A] =
implicit def redis[F[_]: Concurrent]: RedisCtx[Redis[F, *]] = new RedisCtx[Redis[F, *]]{
def keyedBV[A: RedisResult](key: ByteVector, command: NonEmptyList[ByteVector]): Redis[F,A] =
RedisConnection.runRequestTotal(command, Some(key))
def unkeyedBV[A: RedisResult](command: NonEmptyList[ByteVector]): Redis[F, A] =
RedisConnection.runRequestTotal(command, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object RedisPubSub {
val pSubPrefix = "ps:"

def publish(s: String, message: String): F[Int] =
RedisCtx[({type M[A] = Redis[F, A]})#M].unkeyed[Int](cats.data.NonEmptyList.of("PUBLISH", s, message)).run(connection)
RedisCtx[Redis[F, *]].unkeyed[Int](cats.data.NonEmptyList.of("PUBLISH", s, message)).run(connection)

def unsubscribeAll: F[Unit] = cbStorage.get.map(_.keys.toList).flatMap{list =>
val channelSubscriptions = list.collect{ case x if x.startsWith("c") => x.drop(3)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object RedisTransaction {
object RedisTxState {

implicit val m: Monad[RedisTxState] = new StackSafeMonad[RedisTxState]{
def pure[A](a: A): RedisTxState[A] = RedisTxState(Monad[({ type F[A] = State[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector]), A]})#F].pure(a))
def pure[A](a: A): RedisTxState[A] = RedisTxState(Monad[State[(Int, List[NonEmptyList[ByteVector]], Option[ByteVector]), *]].pure(a))
def flatMap[A, B](fa: RedisTxState[A])(f: A => RedisTxState[B]): RedisTxState[B] = RedisTxState(
fa.value.flatMap(f.andThen(_.value))
)
Expand All @@ -110,10 +110,10 @@ object RedisTransaction {
// Operations
// ----------
def watch[F[_]: Concurrent](keys: List[String]): Redis[F, Status] =
RedisCtx[({ type M[A] = Redis[F, A] })#M].unkeyed(NonEmptyList("WATCH", keys))
RedisCtx[Redis[F, *]].unkeyed(NonEmptyList("WATCH", keys))

def unwatch[F[_]: Concurrent]: Redis[F, Status] =
RedisCtx[({ type M[A] = Redis[F, A] })#M].unkeyed(NonEmptyList.of("UNWATCH"))
RedisCtx[Redis[F, *]].unkeyed(NonEmptyList.of("UNWATCH"))

def multiExec[F[_]] = new MultiExecPartiallyApplied[F]

Expand Down

0 comments on commit 3297a19

Please sign in to comment.