diff --git a/build.sbt b/build.sbt
index 0ca809c38..78bdfa9ba 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,6 +18,8 @@ val vulcanVersion = "1.10.1"
val munitVersion = "0.7.29"
+val weaverVersion = "0.8.3"
+
val scala212 = "2.12.18"
val scala213 = "2.13.12"
@@ -111,12 +113,13 @@ lazy val docs = project
lazy val dependencySettings = Seq(
resolvers += "confluent".at("https://packages.confluent.io/maven/"),
libraryDependencies ++= Seq(
- "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
- "com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
- "org.typelevel" %% "discipline-scalatest" % disciplineVersion,
- "org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
- "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
- "ch.qos.logback" % "logback-classic" % logbackVersion
+ "com.disneystreaming" %% "weaver-cats" % weaverVersion,
+ "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
+ "com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
+ "org.typelevel" %% "discipline-scalatest" % disciplineVersion,
+ "org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
+ "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
+ "ch.qos.logback" % "logback-classic" % logbackVersion
).map(_ % Test),
libraryDependencies ++= {
if (scalaVersion.value.startsWith("3")) Nil
@@ -319,7 +322,8 @@ lazy val scalaSettings = Seq(
lazy val testSettings = Seq(
Test / logBuffered := false,
Test / parallelExecution := false,
- Test / testOptions += Tests.Argument("-oDF")
+ Test / testOptions += Tests.Argument("-oDF"),
+ testFrameworks += new TestFramework("weaver.framework.CatsEffect")
)
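+// With the weaver framework registered above, a minimal weaver-cats suite
+// (an illustrative sketch, not part of this build) could look like:
+//
+//   import cats.effect.IO
+//   import weaver.SimpleIOSuite
+//
+//   object ExampleSuite extends SimpleIOSuite {
+//     test("addition") {
+//       IO.pure(expect(1 + 1 == 2))
+//     }
+//   }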
def minorVersion(version: String): String = {
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
index 6a1f4fe02..3871b07ae 100644
--- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
+++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
@@ -16,6 +16,7 @@ import cats.effect.*
import cats.effect.std.*
import cats.effect.syntax.all.*
import cats.syntax.all.*
+import cats.Show
import fs2.kafka.*
import fs2.kafka.instances.*
import fs2.kafka.internal.converters.collection.*
@@ -609,6 +610,8 @@ private[kafka] object KafkaConsumerActor {
streaming = false
)
+ implicit def show[F[_], K, V]: Show[State[F, K, V]] = Show.fromToString
+
}
sealed abstract class FetchCompletedReason {
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala
index 782945a1d..afe3d4461 100644
--- a/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala
+++ b/modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala
@@ -10,6 +10,7 @@ import java.util.regex.Pattern
import scala.collection.immutable.SortedSet
+import cats.*
import cats.data.{Chain, NonEmptyList, NonEmptySet, NonEmptyVector}
import cats.syntax.all.*
import fs2.kafka.instances.*
@@ -31,15 +32,15 @@ sealed abstract private[kafka] class LogEntry {
private[kafka] object LogEntry {
- final case class SubscribedTopics[F[_]](
- topics: NonEmptyList[String],
- state: State[F, ?, ?]
+ final case class SubscribedTopics[G[_]: Reducible, State: Show](
+ topics: G[String],
+ state: State
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
- s"Consumer subscribed to topics [${topics.toList.mkString(", ")}]. Current state [$state]."
+ show"Consumer subscribed to topics [${topics.mkString_(", ")}]. Current state [$state]."
}
@@ -55,26 +56,26 @@ private[kafka] object LogEntry {
}
- final case class SubscribedPattern[F[_]](
+ final case class SubscribedPattern[State: Show](
pattern: Pattern,
- state: State[F, ?, ?]
+ state: State
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
- s"Consumer subscribed to pattern [$pattern]. Current state [$state]."
+ s"Consumer subscribed to pattern [$pattern]. Current state [${state.show}]."
}
- final case class Unsubscribed[F[_]](
- state: State[F, ?, ?]
+ final case class Unsubscribed[State: Show](
+ state: State
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
- s"Consumer unsubscribed from all partitions. Current state [$state]."
+ show"Consumer unsubscribed from all partitions. Current state [$state]."
}
@@ -103,27 +104,27 @@ private[kafka] object LogEntry {
}
- final case class AssignedPartitions[F[_]](
+ final case class AssignedPartitions[State: Show](
partitions: SortedSet[TopicPartition],
- state: State[F, ?, ?]
+ state: State
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
- s"Assigned partitions [${partitions.mkString(", ")}]. Current state [$state]."
+ s"Assigned partitions [${partitions.mkString(", ")}]. Current state [${state.show}]."
}
- final case class RevokedPartitions[F[_]](
+ final case class RevokedPartitions[State: Show](
partitions: SortedSet[TopicPartition],
- state: State[F, ?, ?]
+ state: State
) extends LogEntry {
override def level: LogLevel = Debug
override def message: String =
- s"Revoked partitions [${partitions.mkString(", ")}]. Current state [$state]."
+ s"Revoked partitions [${partitions.mkString(", ")}]. Current state [${state.show}]."
}
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/experimental/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/internal/experimental/KafkaConsumer.scala
new file mode 100644
index 000000000..d3fbccc21
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/internal/experimental/KafkaConsumer.scala
@@ -0,0 +1,461 @@
+/*
+ * Copyright 2018-2024 OVO Energy Limited
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package fs2.kafka.internal.experimental
+
+import java.time.Duration
+import java.util
+
+import scala.collection.immutable.SortedSet
+import scala.util.matching.Regex
+
+import cats.*
+import cats.data.*
+import cats.effect.*
+import cats.effect.std.*
+import cats.effect.syntax.all.*
+import cats.syntax.all.*
+import fs2.{Chunk, Stream}
+import fs2.concurrent.Topic
+import fs2.kafka.*
+import fs2.kafka.consumer.{KafkaCommit, KafkaConsume, KafkaSubscription, MkConsumer}
+import fs2.kafka.instances.*
+import fs2.kafka.internal.{LogEntry, Logging, WithConsumer}
+import fs2.kafka.internal.converters.collection.*
+import fs2.kafka.internal.experimental.KafkaConsumer.{Request, State, Status}
+import fs2.kafka.internal.syntax.*
+
+import org.apache.kafka.clients.consumer.{
+ ConsumerConfig,
+ ConsumerRebalanceListener,
+ OffsetAndMetadata
+}
+import org.apache.kafka.common.TopicPartition
+
+class KafkaConsumer[F[_], K, V](
+ settings: ConsumerSettings[F, K, V],
+ keyDeserializer: KeyDeserializer[F, K],
+ valueDeserializer: ValueDeserializer[F, V],
+ state: AtomicCell[F, State[F, K, V]],
+ withConsumer: WithConsumer[F],
+ requests: Queue[F, Request[F, K, V]],
+ assignments: Topic[F, Map[TopicPartition, PartitionStream[F, K, V]]],
+ dispatcher: Dispatcher[F]
+)(implicit F: Async[F], logging: Logging[F], jitter: Jitter[F])
+ extends KafkaConsume[F, K, V]
+ with KafkaSubscription[F]
+ with KafkaCommit[F] {
+
+ private[this] type ConsumerRecords =
+ Map[TopicPartition, Chunk[CommittableConsumerRecord[F, K, V]]]
+
+ private[this] val consumerGroupId: Option[String] =
+ settings.properties.get(ConsumerConfig.GROUP_ID_CONFIG)
+
+ private[this] val pollTimeout: Duration = settings.pollTimeout.toJava
+
+ private[this] val consumerRebalanceListener: ConsumerRebalanceListener =
+ new ConsumerRebalanceListener {
+
+ override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit =
+ exec(partitions)(revoked)
+
+ override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
+ exec(partitions)(assigned)
+
+ override def onPartitionsLost(partitions: util.Collection[TopicPartition]): Unit =
+ exec(partitions)(lost)
+
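+      // Kafka invokes these callbacks on the thread that called poll(), and they
+      // must complete before poll() returns, so the effect is run synchronously
+      // through the dispatcher.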
+ private def exec(
+ partitions: util.Collection[TopicPartition]
+ )(f: SortedSet[TopicPartition] => F[Unit]): Unit =
+ dispatcher.unsafeRunSync(f(partitions.toSortedSet))
+
+ }
+
+ override def subscribe[G[_]: Reducible](topics: G[String]): F[Unit] =
+ subscribe(
+ _.subscribe(
+ topics.toList.asJava,
+ consumerRebalanceListener
+ ),
+ LogEntry.SubscribedTopics(topics, _)
+ )
+
+ override def subscribe(regex: Regex): F[Unit] =
+ subscribe(
+ _.subscribe(
+ regex.pattern,
+ consumerRebalanceListener
+ ),
+ LogEntry.SubscribedPattern(regex.pattern, _)
+ )
+
+ private def subscribe(f: KafkaByteConsumer => Unit, log: State[F, K, V] => LogEntry): F[Unit] =
+ state
+ .evalUpdateAndGet { state =>
+ transitionTo(state, Status.Subscribed) {
+ withConsumer.blocking(f)
+ }
+ }
+ .log(log)
+
+ override def unsubscribe: F[Unit] =
+ state
+ .evalUpdateAndGet { state =>
+ withConsumer
+ .blocking(_.unsubscribe())
+ .productR(state.assigned.values.toList.traverse_(_.close))
+ .as(State.empty)
+ }
+ .log(LogEntry.Unsubscribed(_))
+
+ override def stream: Stream[F, CommittableConsumerRecord[F, K, V]] =
+ partitionedStream.parJoinUnbounded
+
+ override def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
+ partitionsMapStream.flatMap(partitionsMap => Stream.iterable(partitionsMap.values))
+
+ override def partitionsMapStream
+ : Stream[F, Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]] =
+ Stream
+ .resource(assignments.subscribeAwaitUnbounded)
+ .flatMap { subscription =>
+ subscription
+ .map(_.map { case (partition, stream) => partition -> stream.create })
+ .concurrently(
+ Stream(
+ Stream(Request.Poll).covary[F].repeat.debounce(settings.pollInterval),
+ Stream.fromQueueUnterminated(requests)
+ ).parJoin(2).foreach(handle)
+ )
+ }
+ .onFinalize(unsubscribe) // Ensure clean state after interruption/error
+
+ private[this] def handle(request: Request[F, K, V]): F[Unit] = request match {
+ case Request.Poll => handlePoll
+ case r: Request.Fetch[F, K, V] => handleFetch(r)
+ }
+
+ private[this] def handleFetch(request: Request.Fetch[F, K, V]): F[Unit] =
+ state.update { state =>
+ val partition = request.partition
+ val records = request.records
+ state.withFetch(partition, records)
+      // TODO Discard fetches for partitions that are not assigned (or are stopped)
+ }
+
+ private[this] val handlePoll: F[Unit] =
+ state
+ .get
+ .flatMap { state =>
+ withConsumer.blocking { consumer =>
+ val requested = state.fetches.keySet
+ val paused = consumer.paused.toSet
+ val assigned = consumer.assignment.toSet
+
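+        // Backpressure via pause/resume: resume partitions that now have an
+        // outstanding fetch, and pause assigned partitions without one so that
+        // poll() does not buffer records nobody has requested.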
+ val resume = requested & paused
+ val pause = assigned &~ (requested | paused)
+
+ if (resume.nonEmpty)
+ consumer.resume(resume.asJava)
+
+ if (pause.nonEmpty)
+ consumer.pause(pause.asJava)
+
+ val records = consumer.poll(pollTimeout)
+
+ val returned = records.partitions.toSet
+ val unsolicited = returned &~ requested
+
+ // Instead of storing unsolicited records, we reposition the fetch offset, effectively discarding
+ // the records. This should happen very rarely, only in the event of a rebalance.
+ unsolicited.foreach { partition =>
+ records
+ .records(partition)
+ .toList
+ .headOption
+ .foreach { record =>
+ consumer.seek(partition, record.offset)
+ }
+ }
+
+ (returned -- unsolicited)
+ .view
+ .map(partition => partition -> records.records(partition).toList)
+ .toList
+ }
+ }
+ .flatMap(committableConsumerRecords)
+ .flatMap(completeFetches)
+
+ private[this] def committableConsumerRecords(
+ records: List[(TopicPartition, List[KafkaByteConsumerRecord])]
+ ): F[List[(TopicPartition, Chunk[CommittableConsumerRecord[F, K, V]])]] =
+ records.traverse { case (partition, recs) =>
+ Chunk
+ .from(recs)
+ .traverse { rec =>
+ ConsumerRecord
+ .fromJava(rec, keyDeserializer, valueDeserializer)
+ .map(committableConsumerRecord(_, partition))
+ }
+ .fmap(partition -> _)
+ }
+
+ private[this] def committableConsumerRecord(
+ record: ConsumerRecord[K, V],
+ partition: TopicPartition
+ ): CommittableConsumerRecord[F, K, V] =
+ CommittableConsumerRecord(
+ record = record,
+ offset = CommittableOffset(
+ topicPartition = partition,
+ consumerGroupId = consumerGroupId,
+ offsetAndMetadata = new OffsetAndMetadata(
+ record.offset + 1L,
+ settings.recordMetadata(record)
+ ),
+ commit = commitAsync
+ )
+ )
+
+ private[this] def completeFetches(
+ records: List[(TopicPartition, Chunk[CommittableConsumerRecord[F, K, V]])]
+ ): F[Unit] =
+ state.update(state => state.completeFetches(records.toMap.lift))
+
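+  // Commits go through the consumer's asynchronous API: the Kafka callback
+  // completes the F.async, and the whole effect is bounded by commitTimeout.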
+ override def commitAsync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
+ commitWithRecovery(
+ offsets,
+ F.async { (cb: Either[Throwable, Unit] => Unit) =>
+ withConsumer
+ .blocking {
+ _.commitAsync(
+ offsets.asJava,
+ (_, exception) => cb(Option(exception).toLeft(()))
+ )
+ }
+ .handleErrorWith(e => F.delay(cb(Left(e))))
+ .as(Some(F.unit))
+ }
+ .timeoutTo(
+ settings.commitTimeout,
+ F.defer(F.raiseError {
+ CommitTimeoutException(
+ settings.commitTimeout,
+ offsets
+ )
+ })
+ )
+ )
+
+ override def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
+ commitWithRecovery(
+ offsets,
+ withConsumer.blocking(_.commitSync(offsets.asJava, settings.commitTimeout.toJava))
+ )
+
+ private[this] def commitWithRecovery(
+ offsets: Map[TopicPartition, OffsetAndMetadata],
+ commit: F[Unit]
+ ): F[Unit] =
+ commit.handleErrorWith(settings.commitRecovery.recoverCommitWith(offsets, commit))
+
+ override def stopConsuming: F[Unit] = unsubscribe // TODO Close `records` stream
+
+ private[this] def transitionTo[A](state: State[F, K, V], status: Status)(
+ f: F[A]
+ ): F[State[F, K, V]] =
+ if (state.status.isTransitionAllowed(status)) f.as(state.withStatus(status))
+ else
+ F.raiseError(
+ new IllegalStateException(s"Invalid consumer transition from ${state.status} to $status")
+ )
+
+ private[this] def assigned(assigned: SortedSet[TopicPartition]): F[Unit] =
+ state
+ .evalUpdateAndGet { state =>
+ assigned
+ .toList
+ .foldM(state) { (state, partition) =>
+ state.assigned.get(partition) match {
+ case Some(paused) if assigned(partition) => paused.resume.as(state)
+ case Some(_) => state.withoutAssignment(partition).pure
+ case None =>
+ PartitionStream(partition, requests).map(state.withAssignment(partition, _))
+ }
+ }
+ }
+ .flatTap { state =>
+ assignments
+ .publish1(state.assigned.filter { case (partition, _) => assigned(partition) })
+ .void
+ }
+ .log(LogEntry.AssignedPartitions(assigned, _))
+
+  /**
+   * Depending on the partition assignor in use, a rebalance either revokes all
+   * partitions before reassigning them (eager) or revokes only those that are
+   * reassigned to other consumers (cooperative). At this point we don't know
+   * whether a revoked partition will be reassigned to us again immediately, so we
+   * pause the affected partition streams preemptively instead of finishing them.
+   * @see
+   *   org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol
+   */
+ private[this] def revoked(revoked: SortedSet[TopicPartition]): F[Unit] =
+ state
+ .get
+ .flatTap { state =>
+ state.assigned.view.filterKeys(revoked).values.toList.traverse_(_.pause)
+ }
+ .log(LogEntry.RevokedPartitions(revoked, _))
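+  // For reference (an assumption, not part of this change), the cooperative
+  // protocol mentioned above would be selected with something like:
+  //   settings.withProperty(
+  //     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+  //     "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+  //   )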
+
+ private[this] def lost(lost: SortedSet[TopicPartition]): F[Unit] =
+ F.raiseError(new UnsupportedOperationException) // TODO
+
+}
+
+object KafkaConsumer {
+
+ sealed abstract class Request[-F[_], -K, -V]
+
+ object Request {
+
+    case object Poll extends Request[Any, Any, Any]
+
+ final case class Fetch[F[_], K, V](
+ partition: TopicPartition,
+ records: Either[Throwable, Chunk[CommittableConsumerRecord[F, K, V]]] => Unit
+ ) extends Request[F, K, V]
+
+ }
+
+ sealed trait Status { self =>
+
+    // TODO Allow subscribing multiple times?
+ def isTransitionAllowed(status: Status): Boolean =
+ (self, status) match {
+ case (Status.Unsubscribed, Status.Subscribed) => true
+ case (Status.Subscribed, Status.Streaming) => true
+ case (Status.Subscribed, Status.Unsubscribed) => true
+ case (Status.Streaming, Status.Unsubscribed) => true
+ case _ => false
+ }
+
+ }
+
+ object Status {
+
+ case object Unsubscribed extends Status
+ case object Subscribed extends Status
+ case object Streaming extends Status
+
+ implicit val eq: Eq[Status] = Eq.fromUniversalEquals
+
+ }
+
+ // TODO Prefetch
+ final case class State[F[_], K, V](
+ assigned: Map[TopicPartition, PartitionStream[F, K, V]],
+ fetches: Map[
+ TopicPartition,
+ Either[Throwable, Chunk[CommittableConsumerRecord[F, K, V]]] => Unit
+ ],
+ status: Status
+ ) {
+
+ def withAssignment(
+ partition: TopicPartition,
+ stream: PartitionStream[F, K, V]
+ ): State[F, K, V] =
+ if (assigned.contains(partition)) this else copy(assigned = assigned + (partition -> stream))
+
+ def withoutAssignment(partition: TopicPartition): State[F, K, V] =
+ if (!assigned.contains(partition)) this else copy(assigned = assigned - partition)
+
+ def withFetch(
+ partition: TopicPartition,
+ records: Either[Throwable, Chunk[CommittableConsumerRecord[F, K, V]]] => Unit
+ ): State[F, K, V] =
+ copy(fetches = fetches + (partition -> records))
+
+ def withoutFetches(partitions: Iterable[TopicPartition]): State[F, K, V] =
+ copy(fetches = fetches -- partitions)
+
+ def completeFetches(
+ f: TopicPartition => Option[Chunk[CommittableConsumerRecord[F, K, V]]]
+ ): State[F, K, V] = {
+ val completed = fetches
+ .toList
+ .mapFilter { case (partition, cb) =>
+ f(partition).map { records =>
+ cb(Right(records))
+ partition
+ }
+ }
+ if (completed.isEmpty) this else copy(fetches = fetches -- completed)
+ }
+
+ def withStatus(status: Status): State[F, K, V] =
+ if (this.status === status) this else copy(status = status)
+
+ }
+
+ object State {
+
+ def empty[F[_], K, V]: State[F, K, V] =
+ State(
+ assigned = Map.empty,
+ fetches = Map.empty,
+ status = Status.Unsubscribed
+ )
+
+ implicit def show[F[_], K, V]: Show[State[F, K, V]] = Show.fromToString
+
+ }
+
+ def stream[F[_]: Async: MkConsumer, K, V](
+ settings: ConsumerSettings[F, K, V]
+ ): Stream[F, KafkaConsumer[F, K, V]] =
+ Stream.resource(resource(settings))
+
+ def resource[F[_], K, V](
+ settings: ConsumerSettings[F, K, V]
+ )(implicit
+ F: Async[F],
+ mk: MkConsumer[F]
+ ): Resource[F, KafkaConsumer[F, K, V]] =
+ for {
+ keyDeserializer <- settings.keyDeserializer
+ valueDeserializer <- settings.valueDeserializer
+ case implicit0(logging: Logging[F]) <- Resource
+ .eval(Logging.default[F](new Object().hashCode))
+ case implicit0(jitter: Jitter[F]) <- Resource.eval(Jitter.default[F])
+ requests <- Resource.eval(Queue.unbounded[F, Request[F, K, V]])
+ dispatcher <- Dispatcher.sequential[F]
+ withConsumer <- WithConsumer(mk, settings)
+ state <- Resource.eval(AtomicCell[F].of(State.empty[F, K, V]))
+ assignments <- Resource.eval(Topic[F, Map[TopicPartition, PartitionStream[F, K, V]]])
+ consumer <- Resource.make(
+ F.pure(
+ new KafkaConsumer(
+ settings,
+ keyDeserializer,
+ valueDeserializer,
+ state,
+ withConsumer,
+ requests,
+ assignments,
+ dispatcher
+ )
+ )
+    )(_.stopConsuming) // TODO Test graceful resource release (when the stream is not interrupted directly)
+ } yield consumer
+
+}
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/experimental/PartitionStream.scala b/modules/core/src/main/scala/fs2/kafka/internal/experimental/PartitionStream.scala
new file mode 100644
index 000000000..623244bb5
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/internal/experimental/PartitionStream.scala
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2018-2024 OVO Energy Limited
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package fs2.kafka.internal.experimental
+
+import cats.*
+import cats.effect.*
+import cats.effect.std.*
+import cats.syntax.all.*
+import fs2.*
+import fs2.concurrent.*
+import fs2.kafka.internal.experimental.KafkaConsumer.Request
+import fs2.kafka.internal.experimental.PartitionStream.Status
+import fs2.kafka.CommittableConsumerRecord
+
+import org.apache.kafka.common.TopicPartition
+
+class PartitionStream[F[_], K, V](
+ partition: TopicPartition,
+ store: Request.Fetch[F, K, V] => F[Unit],
+ status: SignallingRef[F, Status]
+)(implicit F: Async[F]) {
+
+ private val pauseSignal = status.map(_ == Status.Paused)
+ private val interruptSignal = status.map(_ == Status.Stopped)
+
+ def create: Stream[F, CommittableConsumerRecord[F, K, V]] =
+ Stream.exec(transitionTo(Status.Running)) ++
+ status
+ .continuous
+ .takeWhile(_.isNotFinished)
+ .evalMap { _ =>
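+          // Each pull registers a Fetch request on the shared queue and suspends
+          // until handlePoll completes the callback with the next chunk.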
+ F.async[Chunk[CommittableConsumerRecord[F, K, V]]] { cb =>
+ store(Request.Fetch(partition, cb)).as(Some(F.unit))
+ }
+ }
+ .unchunks
+ .pauseWhen(pauseSignal)
+ .interruptWhen(interruptSignal)
+      .onFinalize(transitionTo(Status.Init))
+
+ def isInit: F[Boolean] = status.get.map(_ == Status.Init)
+
+ def pause: F[Unit] = transitionTo(Status.Paused)
+
+ def isPaused: F[Boolean] = status.get.map(_ == Status.Paused)
+
+ def resume: F[Unit] = transitionTo(Status.Running)
+
+ def stop: F[Unit] = transitionTo(Status.Stopped) *> status.waitUntil(_ == Status.Init)
+
+ def close: F[Unit] = transitionTo(Status.Closed) *> status.waitUntil(_ == Status.Init)
+
+ private def transitionTo(newState: Status): F[Unit] =
+ status
+ .get
+ .flatMap { currentState =>
+ if (currentState.isTransitionAllowed(newState)) status.set(newState)
+ else
+ F.raiseError(
+ new IllegalStateException(s"Invalid transition from $currentState to $newState")
+ )
+ }
+
+}
+
+object PartitionStream {
+
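+  // Lifecycle: Init -> Running <-> Paused; Running -> Closed; Paused -> Stopped;
+  // any status can fall back to Init (the stream finalizer resets it), and
+  // self-transitions are allowed as no-ops.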
+ sealed trait Status { self =>
+
+ def isTransitionAllowed(status: Status): Boolean =
+ (self, status) match {
+ case (Status.Init, Status.Running) => true
+ case (Status.Running, Status.Paused) => true
+ case (Status.Running, Status.Closed) => true
+ case (Status.Paused, Status.Running) => true
+ case (Status.Paused, Status.Stopped) => true
+ case (_, Status.Init) => true
+ case (x, y) if x === y => true
+ case _ => false
+ }
+
+ def isNotFinished: Boolean = self match {
+ case Status.Stopped | Status.Closed => false
+ case _ => true
+ }
+
+ }
+
+ object Status {
+
+ case object Init extends Status
+ case object Running extends Status
+ case object Paused extends Status
+ case object Stopped extends Status
+ case object Closed extends Status
+
+ implicit val eq: Eq[Status] = Eq.fromUniversalEquals
+
+ }
+
+ def apply[F[_]: Async, K, V](
+ partition: TopicPartition,
+ requests: Queue[F, Request[F, K, V]]
+ ): F[PartitionStream[F, K, V]] =
+ SignallingRef
+ .of[F, Status](Status.Init)
+ .map(state => new PartitionStream(partition, requests.offer, state))
+
+}
diff --git a/modules/core/src/test/resources/logback-test.xml b/modules/core/src/test/resources/logback-test.xml
index 840f42d31..6429b58ef 100644
--- a/modules/core/src/test/resources/logback-test.xml
+++ b/modules/core/src/test/resources/logback-test.xml
@@ -6,6 +6,8 @@
+
+
diff --git a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
index 22ddfb0a4..18cedac2e 100644
--- a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
@@ -12,13 +12,14 @@ import java.util.concurrent.CancellationException
import cats.effect.std.CountDownLatch
import cats.effect.unsafe.implicits.global
import cats.effect.IO
+import fs2.concurrent.Channel
import fs2.kafka.*
import fs2.kafka.internal.syntax.*
-import fs2.kafka.BaseSpec
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.KafkaFuture
+import scala.concurrent.duration.*
final class SyntaxSpec extends BaseSpec {
@@ -86,6 +87,17 @@ final class SyntaxSpec extends BaseSpec {
} yield ()
test.unsafeRunSync()
}
+
+    it("buffers Channel elements until the stream is consumed") {
+ (for {
+ channel <- Channel.unbounded[IO, String]
+        _ <- channel.send("hello")
+        _ <- channel.send("goodbye")
+ _ <- IO.sleep(5.seconds)
+ _ <- channel.stream.take(2).foreach(s => IO.println(s)).compile.drain.timeout(10.seconds)
+ } yield ()).unsafeRunSync()
+
+ }
}
}
diff --git a/modules/core/src/test/scala/fs2/kafka/internal/experimental/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/experimental/KafkaConsumerSpec.scala
new file mode 100644
index 000000000..58519504d
--- /dev/null
+++ b/modules/core/src/test/scala/fs2/kafka/internal/experimental/KafkaConsumerSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018-2024 OVO Energy Limited
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package fs2.kafka.internal.experimental
+
+import scala.concurrent.duration.*
+
+import cats.data.NonEmptyList
+import cats.effect.unsafe.implicits.global
+import cats.effect.IO
+import fs2.kafka.{BaseKafkaSpec, CommittableConsumerRecord}
+import fs2.Stream
+
+final class KafkaConsumerSpec extends BaseKafkaSpec {
+
+ type Consumer = KafkaConsumer[IO, String, String]
+
+ type ConsumerStream = Stream[IO, CommittableConsumerRecord[IO, String, String]]
+
+ describe("KafkaConsumer#stream") {
+ it("should consume all records with subscribe") {
+ withTopic { topic =>
+ createCustomTopic(topic, partitions = 3)
+
+        val produced = (0 until 5).map(n => s"key-$n" -> s"value-$n")
+ publishToKafka(topic, produced)
+
+ val consumed =
+ (for {
+ consumer <- KafkaConsumer.stream(consumerSettings[IO])
+ _ <- Stream.eval(consumer.subscribe(NonEmptyList.one(topic)))
+ committable <- consumer.records
+ } yield committable.record.key -> committable.record.value)
+ .interruptAfter(10.seconds)
+ .compile
+ .toVector
+ .unsafeRunSync()
+
+ consumed should contain theSameElementsAs produced
+ }
+ }
+ }
+
+}