-
Notifications
You must be signed in to change notification settings - Fork 102
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge remote-tracking branch 'origin/series/2.x' into series/3.x
- Loading branch information
Showing
9 changed files
with
365 additions
and
133 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
74 changes: 74 additions & 0 deletions
74
modules/core/src/main/scala/fs2/kafka/internal/WithTransactionalProducer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/* | ||
* Copyright 2018-2022 OVO Energy Limited | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package fs2.kafka.internal | ||
|
||
import cats.effect.std.Semaphore | ||
import cats.effect.{Async, MonadCancelThrow, Resource} | ||
import cats.implicits._ | ||
import scala.jdk.DurationConverters._ | ||
import fs2.kafka.producer.MkProducer | ||
import fs2.kafka.{KafkaByteProducer, TransactionalProducerSettings} | ||
|
||
private[kafka] sealed abstract class WithTransactionalProducer[F[_]] { | ||
def apply[A](f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A]): F[A] | ||
|
||
def exclusiveAccess[A](f: (KafkaByteProducer, Blocking[F]) => F[A]): F[A] = apply { | ||
case (producer, blocking, exclusive) => exclusive(f(producer, blocking)) | ||
} | ||
|
||
def blocking[A](f: KafkaByteProducer => A): F[A] = apply { | ||
case (producer, blocking, _) => blocking(f(producer)) | ||
} | ||
} | ||
|
||
private[kafka] object WithTransactionalProducer { | ||
def apply[F[_], K, V]( | ||
mk: MkProducer[F], | ||
settings: TransactionalProducerSettings[F, K, V] | ||
)( | ||
implicit F: Async[F] | ||
): Resource[F, WithTransactionalProducer[F]] = | ||
Resource[F, WithTransactionalProducer[F]] { | ||
(mk(settings.producerSettings), Semaphore(1)).tupled.flatMap { | ||
case (producer, semaphore) => | ||
val blocking = settings.producerSettings.customBlockingContext | ||
.fold(Blocking.fromSync[F])(Blocking.fromExecutionContext) | ||
|
||
val withProducer = create(producer, blocking, semaphore) | ||
|
||
val initTransactions = withProducer.blocking { _.initTransactions() } | ||
|
||
/* | ||
Deliberately does not use the exclusive access functionality to close the producer. The close method on | ||
the underlying client waits until the buffer has been flushed to the broker or the timeout is exceeded. | ||
Because the transactional producer _always_ waits until the buffer is flushed and the transaction | ||
committed on the broker before proceeding, upon gaining exclusive access to the producer the buffer will | ||
always be empty. Therefore if we used exclusive access to close the underlying producer, the buffer | ||
would already be empty and the close timeout setting would be redundant. | ||
TLDR: not using exclusive access here preserves the behaviour of the underlying close method and timeout | ||
setting | ||
*/ | ||
val close = withProducer.blocking { | ||
_.close(settings.producerSettings.closeTimeout.toJava) | ||
} | ||
|
||
initTransactions.as((withProducer, close)) | ||
} | ||
} | ||
|
||
private def create[F[_]: MonadCancelThrow]( | ||
producer: KafkaByteProducer, | ||
_blocking: Blocking[F], | ||
transactionSemaphore: Semaphore[F] | ||
): WithTransactionalProducer[F] = new WithTransactionalProducer[F] { | ||
override def apply[A]( | ||
f: (KafkaByteProducer, Blocking[F], ExclusiveAccess[F, A]) => F[A] | ||
): F[A] = | ||
f(producer, _blocking, transactionSemaphore.permit.surround) | ||
} | ||
} |
12 changes: 12 additions & 0 deletions
12
modules/core/src/main/scala/fs2/kafka/internal/package.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright 2018-2022 OVO Energy Limited | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package fs2.kafka | ||
|
||
package object internal { | ||
private[kafka] type ExclusiveAccess[F[_], A] = F[A] => F[A] | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.