Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
aartigao committed Mar 28, 2024
1 parent 06c249c commit 5167376
Show file tree
Hide file tree
Showing 8 changed files with 668 additions and 24 deletions.
18 changes: 11 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ val vulcanVersion = "1.10.1"

val munitVersion = "0.7.29"

val weaverVersion = "0.8.3"

val scala212 = "2.12.18"

val scala213 = "2.13.12"
Expand Down Expand Up @@ -111,12 +113,13 @@ lazy val docs = project
lazy val dependencySettings = Seq(
resolvers += "confluent".at("https://packages.confluent.io/maven/"),
libraryDependencies ++= Seq(
"com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
"com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
"org.typelevel" %% "discipline-scalatest" % disciplineVersion,
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
"ch.qos.logback" % "logback-classic" % logbackVersion
"com.disneystreaming" %% "weaver-cats" % weaverVersion,
"com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
"com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
"org.typelevel" %% "discipline-scalatest" % disciplineVersion,
"org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
"org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
"ch.qos.logback" % "logback-classic" % logbackVersion
).map(_ % Test),
libraryDependencies ++= {
if (scalaVersion.value.startsWith("3")) Nil
Expand Down Expand Up @@ -319,7 +322,8 @@ lazy val scalaSettings = Seq(
lazy val testSettings = Seq(
Test / logBuffered := false,
Test / parallelExecution := false,
Test / testOptions += Tests.Argument("-oDF")
Test / testOptions += Tests.Argument("-oDF"),
testFrameworks += new TestFramework("weaver.framework.CatsEffect")
)

def minorVersion(version: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import cats.effect.*
import cats.effect.std.*
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.Show
import fs2.kafka.*
import fs2.kafka.instances.*
import fs2.kafka.internal.converters.collection.*
Expand Down Expand Up @@ -609,6 +610,8 @@ private[kafka] object KafkaConsumerActor {
streaming = false
)

implicit def show[F[_], K, V]: Show[State[F, K, V]] = Show.fromToString

}

sealed abstract class FetchCompletedReason {
Expand Down
33 changes: 17 additions & 16 deletions modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import java.util.regex.Pattern

import scala.collection.immutable.SortedSet

import cats.*
import cats.data.{Chain, NonEmptyList, NonEmptySet, NonEmptyVector}
import cats.syntax.all.*
import fs2.kafka.instances.*
Expand All @@ -31,15 +32,15 @@ sealed abstract private[kafka] class LogEntry {

private[kafka] object LogEntry {

final case class SubscribedTopics[F[_]](
topics: NonEmptyList[String],
state: State[F, ?, ?]
final case class SubscribedTopics[G[_]: Reducible, State: Show](
topics: G[String],
state: State
) extends LogEntry {

override def level: LogLevel = Debug

override def message: String =
s"Consumer subscribed to topics [${topics.toList.mkString(", ")}]. Current state [$state]."
show"Consumer subscribed to topics [${topics.mkString_(", ")}]. Current state [$state]."

}

Expand All @@ -55,26 +56,26 @@ private[kafka] object LogEntry {

}

final case class SubscribedPattern[F[_]](
final case class SubscribedPattern[State: Show](
pattern: Pattern,
state: State[F, ?, ?]
state: State
) extends LogEntry {

override def level: LogLevel = Debug

override def message: String =
s"Consumer subscribed to pattern [$pattern]. Current state [$state]."
s"Consumer subscribed to pattern [$pattern]. Current state [${state.show}]."

}

final case class Unsubscribed[F[_]](
state: State[F, ?, ?]
final case class Unsubscribed[State: Show](
state: State
) extends LogEntry {

override def level: LogLevel = Debug

override def message: String =
s"Consumer unsubscribed from all partitions. Current state [$state]."
show"Consumer unsubscribed from all partitions. Current state [$state]."

}

Expand Down Expand Up @@ -103,27 +104,27 @@ private[kafka] object LogEntry {

}

final case class AssignedPartitions[F[_]](
final case class AssignedPartitions[State: Show](
partitions: SortedSet[TopicPartition],
state: State[F, ?, ?]
state: State
) extends LogEntry {

override def level: LogLevel = Debug

override def message: String =
s"Assigned partitions [${partitions.mkString(", ")}]. Current state [$state]."
s"Assigned partitions [${partitions.mkString(", ")}]. Current state [${state.show}]."

}

final case class RevokedPartitions[F[_]](
final case class RevokedPartitions[State: Show](
partitions: SortedSet[TopicPartition],
state: State[F, ?, ?]
state: State
) extends LogEntry {

override def level: LogLevel = Debug

override def message: String =
s"Revoked partitions [${partitions.mkString(", ")}]. Current state [$state]."
s"Revoked partitions [${partitions.mkString(", ")}]. Current state [${state.show}]."

}

Expand Down
Loading

0 comments on commit 5167376

Please sign in to comment.