Skip to content

Commit

Permalink
Merge pull request #1111 from sergio-margale/producer_partition_for_topic
Browse files Browse the repository at this point in the history

Add partitionFor for given topic with KafkaProducer
  • Loading branch information
bplommer authored Dec 5, 2022
2 parents 5c0b459 + 08d3fff commit c35f421
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,16 @@ ThisBuild / mimaBinaryIssueFilters ++= {

// package-private
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.kafka.KafkaProducer.from"),

// sealed
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
ProblemFilters.exclude[FinalMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("fs2.kafka.KafkaProducerConnection.withSerializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.withSerializers"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.partitionsFor"),

// private
ProblemFilters.exclude[Problem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.*")
Expand Down
33 changes: 24 additions & 9 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@

package fs2.kafka

import cats.{Apply, Functor}
import cats.effect._
import cats.syntax.all._
import cats.Apply
import fs2._
import fs2.{Chunk, _}
import fs2.kafka.internal._
import fs2.kafka.producer.MkProducer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.{Metric, MetricName}
import fs2.Chunk
import cats.Functor
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo}

import scala.annotation.nowarn
import scala.concurrent.Promise
Expand Down Expand Up @@ -122,6 +120,20 @@ object KafkaProducer {
def metrics: F[Map[MetricName, Metric]]
}

/**
* [[KafkaProducer.PartitionsFor]] extends [[KafkaProducer.Metrics]] to provide
* access to the underlying producer partitions.
*
* @tparam F the effect type
* @tparam K the key type produced by this producer
* @tparam V the value type produced by this producer
*/
abstract class PartitionsFor[F[_], K, V] extends KafkaProducer.Metrics[F, K, V] {

/**
* Returns partition metadata for the given topic.
*
* @param topic the topic for which to fetch partition metadata
* @see org.apache.kafka.clients.producer.KafkaProducer#partitionsFor
*/
def partitionsFor(topic: String): F[List[PartitionInfo]]
}

/**
* Creates a new [[KafkaProducer]] in the `Resource` context,
* using the specified [[ProducerSettings]]. Note that there
Expand All @@ -135,15 +147,15 @@ object KafkaProducer {
*/
def resource[F[_], K, V](
settings: ProducerSettings[F, K, V]
)(implicit F: Async[F], mk: MkProducer[F]): Resource[F, KafkaProducer.Metrics[F, K, V]] =
)(implicit F: Async[F], mk: MkProducer[F]): Resource[F, KafkaProducer.PartitionsFor[F, K, V]] =
KafkaProducerConnection.resource(settings)(F, mk).evalMap(_.withSerializersFrom(settings))

private[kafka] def from[F[_], K, V](
connection: KafkaProducerConnection[F],
keySerializer: Serializer[F, K],
valueSerializer: Serializer[F, V]
): KafkaProducer.Metrics[F, K, V] =
new KafkaProducer.Metrics[F, K, V] {
): KafkaProducer.PartitionsFor[F, K, V] =
new KafkaProducer.PartitionsFor[F, K, V] {
override def produce[P](
records: ProducerRecords[P, K, V]
): F[F[ProducerResult[P, K, V]]] =
Expand All @@ -154,6 +166,9 @@ object KafkaProducer {

override def toString: String =
"KafkaProducer$" + System.identityHashCode(this)

override def partitionsFor(topic: String): F[List[PartitionInfo]] =
connection.partitionsFor(topic)
}

/**
Expand All @@ -169,7 +184,7 @@ object KafkaProducer {
*/
def stream[F[_], K, V](
settings: ProducerSettings[F, K, V]
)(implicit F: Async[F], mk: MkProducer[F]): Stream[F, KafkaProducer.Metrics[F, K, V]] =
)(implicit F: Async[F], mk: MkProducer[F]): Stream[F, KafkaProducer.PartitionsFor[F, K, V]] =
Stream.resource(KafkaProducer.resource(settings)(F, mk))

private[kafka] def produce[F[_]: Async, P, K, V](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import fs2._
import fs2.kafka.internal._
import fs2.kafka.internal.converters.collection._
import fs2.kafka.producer.MkProducer
import org.apache.kafka.common.{Metric, MetricName, PartitionInfo}

import scala.annotation.nowarn
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.Metric

/**
* [[KafkaProducerConnection]] represents a connection to a Kafka broker
Expand All @@ -41,7 +40,7 @@ sealed abstract class KafkaProducerConnection[F[_]] {
def withSerializers[K, V](
keySerializer: Serializer[F, K],
valueSerializer: Serializer[F, V]
): KafkaProducer.Metrics[F, K, V]
): KafkaProducer.PartitionsFor[F, K, V]

/**
* Creates a new [[KafkaProducer]] in the `F` context,
Expand All @@ -53,7 +52,11 @@ sealed abstract class KafkaProducerConnection[F[_]] {
*/
def withSerializersFrom[K, V](
settings: ProducerSettings[F, K, V]
): F[KafkaProducer.Metrics[F, K, V]]
): F[KafkaProducer.PartitionsFor[F, K, V]]

/**
* Returns partition metadata for the given topic, using the
* underlying producer of this connection.
*
* @param topic the topic for which to fetch partition metadata
* @see org.apache.kafka.clients.producer.KafkaProducer#partitionsFor
*/
def partitionsFor(
topic: String
): F[List[PartitionInfo]]
}

object KafkaProducerConnection {
Expand Down Expand Up @@ -132,13 +135,16 @@ object KafkaProducerConnection {
override def withSerializers[K, V](
keySerializer: Serializer[G, K],
valueSerializer: Serializer[G, V]
): KafkaProducer.Metrics[G, K, V] =
): KafkaProducer.PartitionsFor[G, K, V] =
KafkaProducer.from(this, keySerializer, valueSerializer)

override def withSerializersFrom[K, V](
settings: ProducerSettings[G, K, V]
): G[KafkaProducer.Metrics[G, K, V]] =
): G[KafkaProducer.PartitionsFor[G, K, V]] =
(settings.keySerializer, settings.valueSerializer).mapN(withSerializers)

// Delegates to the underlying Java producer's partitionsFor, run via
// withProducer.blocking since the Java client call may block; the returned
// Java list is converted to an immutable Scala List.
override def partitionsFor(topic: String): G[List[PartitionInfo]] =
withProducer.blocking { _.partitionsFor(topic).asScala.toList }
}
}

Expand Down
22 changes: 22 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,26 @@ final class KafkaProducerSpec extends BaseKafkaSpec {
assert(res.nonEmpty)
}
}

describe("KafkaProducer#partitionsFor") {
  it("should correctly return partitions for topic") {
    withTopic { topic =>
      // Create a topic with three partitions and verify that the producer
      // reports exactly those partitions for exactly that topic.
      val expectedPartitions = List(0, 1, 2)

      createCustomTopic(topic, partitions = expectedPartitions.size)

      val result =
        KafkaProducer
          .stream(producerSettings[IO])
          .evalMap(_.partitionsFor(topic))
          .compile
          .lastOrError
          .unsafeRunSync()

      result.map(_.partition()) should contain theSameElementsAs expectedPartitions
      result.map(_.topic()).toSet should contain theSameElementsAs Set(topic)
    }
  }
}
}

0 comments on commit c35f421

Please sign in to comment.