Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow use of custom blocking context for Consumer and Producer #590

Merged
merged 1 commit into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -46,6 +47,24 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def valueDeserializer: F[Deserializer[F, V]]

/**
* A custom `ExecutionContext` to use for blocking Kafka operations. If not
* provided, a default single-threaded `ExecutionContext` will be created
* when creating a `KafkaConsumer` instance.
*/
def customBlockingContext: Option[ExecutionContext]

/**
* Returns a new [[ConsumerSettings]] instance with the
* specified [[ExecutionContext]] to use for blocking operations.
*
* Because the underlying Java consumer is not thread-safe,
* the ExecutionContext *must* be single-threaded. If in doubt,
* leave this unset so that a default single-threaded
* blocker will be provided.
*/
def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V]

/**
* Properties which can be provided when creating a Java `KafkaConsumer`
* instance. Numerous functions in [[ConsumerSettings]] add properties
Expand Down Expand Up @@ -378,6 +397,7 @@ object ConsumerSettings {
private[this] final case class ConsumerSettingsImpl[F[_], K, V](
override val keyDeserializer: F[Deserializer[F, K]],
override val valueDeserializer: F[Deserializer[F, V]],
override val customBlockingContext: Option[ExecutionContext],
override val properties: Map[String, String],
override val closeTimeout: FiniteDuration,
override val commitTimeout: FiniteDuration,
Expand All @@ -387,6 +407,8 @@ object ConsumerSettings {
override val recordMetadata: ConsumerRecord[K, V] => String,
override val maxPrefetchBatches: Int
) extends ConsumerSettings[F, K, V] {
override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
copy(customBlockingContext = Some(ec))

override def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
Expand Down Expand Up @@ -512,6 +534,7 @@ object ConsumerSettings {
valueDeserializer: F[Deserializer[F, V]]
): ConsumerSettings[F, K, V] =
ConsumerSettingsImpl(
customBlockingContext = None,
keyDeserializer = keyDeserializer,
valueDeserializer = valueDeserializer,
properties = Map(
Expand Down
21 changes: 21 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import cats.{Applicative, Show}
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.producer.ProducerConfig

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -37,6 +38,22 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def valueSerializer: F[Serializer[F, V]]

/**
* A custom [[ExecutionContext]] to use for blocking Kafka operations.
* If not provided, the default blocking ExecutionContext provided by
* [[cats.effect.Sync]] will be used.
*/
def customBlockingContext: Option[ExecutionContext]

/**
* Returns a new [[ProducerSettings]] instance with the
* specified [[ExecutionContext]] to use for blocking operations.
*
* If not provided, the default blocking ExecutionContext provided by
* [[cats.effect.Sync]] will be used. If in doubt, leave this unset.
*/
def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V]

/**
* Properties which can be provided when creating a Java `KafkaProducer`
* instance. Numerous functions in [[ProducerSettings]] add properties
Expand Down Expand Up @@ -220,10 +237,13 @@ object ProducerSettings {
private[this] final case class ProducerSettingsImpl[F[_], K, V](
override val keySerializer: F[Serializer[F, K]],
override val valueSerializer: F[Serializer[F, V]],
override val customBlockingContext: Option[ExecutionContext],
override val properties: Map[String, String],
override val closeTimeout: FiniteDuration,
override val parallelism: Int
) extends ProducerSettings[F, K, V] {
override def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] =
copy(customBlockingContext = Some(ec))

override def withBootstrapServers(bootstrapServers: String): ProducerSettings[F, K, V] =
withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
Expand Down Expand Up @@ -298,6 +318,7 @@ object ProducerSettings {
ProducerSettingsImpl(
keySerializer = keySerializer,
valueSerializer = valueSerializer,
customBlockingContext = None,
properties = Map(
ProducerConfig.RETRIES_CONFIG -> "0"
),
Expand Down
29 changes: 12 additions & 17 deletions modules/core/src/main/scala/fs2/kafka/internal/Blocking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ private[kafka] trait Blocking[F[_]] {
}

private[kafka] object Blocking {
def apply[F[_]: Sync]: Blocking[F] = new Blocking[F] {
def fromSync[F[_]: Sync]: Blocking[F] = new Blocking[F] {
override def apply[A](a: => A): F[A] = Sync[F].blocking(a)
}

def fromExecutionContext[F[_]](ec: ExecutionContext)(implicit F: Async[F]): Blocking[F] =
new Blocking[F] {
def apply[A](a: => A): F[A] = F.delay(a).evalOn(ec)
}

def singleThreaded[F[_]](name: String)(implicit F: Async[F]): Resource[F, Blocking[F]] =
Resource {
F.delay {
val executor =
Resource
.make(
F.delay(
Executors.newSingleThreadExecutor(
(runnable: Runnable) => {
val thread = new Thread(runnable)
Expand All @@ -33,17 +38,7 @@ private[kafka] object Blocking {
thread
}
)

val ec = ExecutionContext.fromExecutor(executor)

val blocking: Blocking[F] = new Blocking[F] {
def apply[A](a: => A): F[A] = F.delay(a).evalOn(ec)
}

val shutdown =
F.delay(executor.shutdown())

(blocking, shutdown)
}
}
)
)(ex => F.delay(ex.shutdown()))
.map(ex => fromExecutionContext(ExecutionContext.fromExecutor(ex)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ private[kafka] object WithConsumer {
def apply[F[_]: Async, K, V](
mk: MkConsumer[F],
settings: ConsumerSettings[F, K, V]
): Resource[F, WithConsumer[F]] =
Blocking.singleThreaded[F]("fs2-kafka-consumer").flatMap { b =>
): Resource[F, WithConsumer[F]] = {
val blocking: Resource[F, Blocking[F]] = settings.customBlockingContext match {
case None => Blocking.singleThreaded[F]("fs2-kafka-consumer")
case Some(ec) => Resource.pure(Blocking.fromExecutionContext(ec))
}

blocking.flatMap { b =>
Resource.make {
mk(settings).map { consumer =>
new WithConsumer[F] {
Expand All @@ -31,4 +36,5 @@ private[kafka] object WithConsumer {
}
}(_.blocking { _.close(settings.closeTimeout.asJava) })
}
}
}
18 changes: 12 additions & 6 deletions modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
package fs2.kafka.internal

import fs2.kafka.producer.MkProducer
import cats.effect.{Resource, Sync}
import cats.effect.{Async, Resource}
import cats.implicits._
import fs2.kafka.{KafkaByteProducer, ProducerSettings, TransactionalProducerSettings}
import fs2.kafka.internal.syntax._
Expand All @@ -25,21 +25,27 @@ private[kafka] object WithProducer {
mk: MkProducer[F],
settings: ProducerSettings[F, K, V]
)(
implicit F: Sync[F]
): Resource[F, WithProducer[F]] =
implicit F: Async[F]
): Resource[F, WithProducer[F]] = {
val blocking =
settings.customBlockingContext.fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)
Resource.make(
mk(settings).map(create(_, Blocking[F]))
mk(settings).map(create(_, blocking))
)(_.blocking { _.close(settings.closeTimeout.asJava) })
}

def apply[F[_], K, V](
mk: MkProducer[F],
settings: TransactionalProducerSettings[F, K, V]
)(
implicit F: Sync[F]
implicit F: Async[F]
): Resource[F, WithProducer[F]] =
Resource[F, WithProducer[F]] {
mk(settings.producerSettings).flatMap { producer =>
val withProducer = create(producer, Blocking[F])
val blocking = settings.producerSettings.customBlockingContext
.fold(Blocking.fromSync[F])(Blocking.fromExecutionContext)

val withProducer = create(producer, blocking)

val initTransactions = withProducer.blocking { _.initTransactions() }

Expand Down
11 changes: 11 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/ConsumerSettingsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,21 @@ import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import org.apache.kafka.clients.consumer.ConsumerConfig

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

final class ConsumerSettingsSpec extends BaseSpec {
describe("ConsumerSettings") {
it("should be able to set a custom blocking context") {
assert {
settings.customBlockingContext.isEmpty &&
settings.withCustomBlockingContext(ExecutionContext.global).customBlockingContext === Some(
ExecutionContext.global
)
}
}

it("should provide withBootstrapServers") {
assert {
settings
Expand Down
12 changes: 12 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/ProducerSettingsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.implicits._
import org.apache.kafka.clients.producer.ProducerConfig

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

final class ProducerSettingsSpec extends BaseSpec {
Expand Down Expand Up @@ -159,6 +161,16 @@ final class ProducerSettingsSpec extends BaseSpec {
ProducerSettings[IO, Int, String].valueSerializer.unsafeRunSync() shouldBe serializerInstance
ProducerSettings[IO, String, String]
}

it("should be able to set a custom blocking context") {
assert {
settings.customBlockingContext.isEmpty &&
settings.withCustomBlockingContext(ExecutionContext.global).customBlockingContext === Some(
ExecutionContext.global
)
}
}

}

val settings = ProducerSettings[IO, String, String]
Expand Down