Replace Deferred with Promise in KafkaProducer, remove Dispatcher #587
Conversation
```diff
  ): ProducerRecord[K, V] => F[F[(ProducerRecord[K, V], RecordMetadata)]] =
    record =>
      asJavaRecord(keySerializer, valueSerializer, record).flatMap { javaRecord =>
-       Deferred[F, Either[Throwable, (ProducerRecord[K, V], RecordMetadata)]].flatMap { deferred =>
+       F.delay(Promise[(ProducerRecord[K, V], RecordMetadata)]()).flatMap { promise =>
```
I'm a bit concerned about this change, but my concern is mostly ideological. Why do we have to use `Promise` in place of `Deferred`? As conceived, `Deferred` should work as a pure functional `Promise`. Does this case show that cats-effect primitives are not strong enough to cover all impure code interop use cases?
> Does this case show that cats-effect primitives are not strong enough to cover all impure code interop use cases?
In some sense, maybe yes: we need a side effect to be performed by a piece of impure library code, so we have to provide a side-effecting callback.

My first idea was to add `unsafeComplete` to `Deferred`, initially by copy-pasting the implementation from cats-effect. That's straightforward to do, since the default implementation is just a functional wrapper around a Java `AtomicReference` that holds a bunch of impure callbacks. Then it was pointed out to me on Gitter that I could use `Promise` instead, and I did so since it seemed simpler. I'm completely open to revising that decision, but I feel like it doesn't have any real drawbacks given the very small scope in which it's being used.
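A minimal sketch of how that `Promise`-based interop can look, assuming `F: Async` and hypothetical names (`producer`, `javaRecord`, `record`) for the values in scope at the send site; the real fs2-kafka code may differ:

```scala
import scala.concurrent.Promise
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Sketch only: `producer`, `javaRecord` and `record` are assumed from context.
// The impure Kafka callback completes the Promise; Async[F].fromFuture lifts
// the resulting Future back into F, preserving the outer F[F[...]] shape
// (first effect: the send is scheduled; inner effect: the ack is awaited).
F.delay(Promise[(ProducerRecord[K, V], RecordMetadata)]()).flatMap { promise =>
  F.delay {
    producer.send(
      javaRecord,
      new Callback {
        // Invoked on a Kafka client thread: the only impurity we need.
        def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          if (exception == null) promise.success((record, metadata))
          else promise.failure(exception)
      }
    )
  }.as(F.fromFuture(F.delay(promise.future)))
}
```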
As for the previous `Dispatcher`-based version: wrapping a side effect in `Sync` and then immediately unwrapping it again with `Dispatcher` (which is what `Dispatcher.unsafeRunSync(deferred.complete(???))` effectively does) doesn't seem to provide any particular benefit, and it adds significant extra complication. In particular, it couples the creation effect of `KafkaProducer` to the effect in which it runs, since `Dispatcher` is (unavoidably) allocated as `Resource[F, Dispatcher[F]]`. (I'm shortly going to open a PR for dual-effect `KafkaProducer` constructors, which should make that clearer.)

Like I said, though, I'm open to alternatives, particularly to adding `unsafeComplete` to `Deferred`.
All that you said sounds reasonable. But in spite of that, we have to use an impure primitive to keep all the Kafka semantics in this part of the code. Moreover, this is not the first attempt: we tried `Async`, `Deferred`, and `ConcurrentEffect`, and now `Promise`. It looks like cats-effect is missing something that would be able to handle this case. Maybe this case should be examined more closely as a good corner case for cats-effect?
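For what it's worth, cats-effect 3 does expose one primitive aimed at exactly this kind of interop: `Async[F].async_` hands you an impure callback that can be passed straight into the Java client. A hedged sketch follows (`producer`, `javaRecord`, and `record` are assumed from the diff above); note that it collapses the two-stage `F[F[...]]` shape into a single effect, which may be why it wasn't a direct fit here:

```scala
// Sketch: single-stage send using Async[F].async_ instead of Promise/Deferred.
// `producer`, `javaRecord` and `record` are assumed from context.
F.async_[(ProducerRecord[K, V], RecordMetadata)] { cb =>
  producer.send(
    javaRecord,
    // Kafka's Callback is a SAM interface, so a lambda works here.
    (metadata: RecordMetadata, exception: Exception) =>
      if (exception == null) cb(Right((record, metadata)))
      else cb(Left(exception))
  )
  () // async_ expects Unit; this sketch does not support cancellation
}
```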
```diff
  ): ProducerRecord[K, V] => F[F[(ProducerRecord[K, V], RecordMetadata)]] =
    record =>
      asJavaRecord(keySerializer, valueSerializer, record).flatMap { javaRecord =>
-       Deferred[F, Either[Throwable, (ProducerRecord[K, V], RecordMetadata)]].flatMap { deferred =>
+       F.delay(Promise[(ProducerRecord[K, V], RecordMetadata)]()).flatMap { promise =>
+         F.blocking {
```
As far as I understand, in CE3 the code in this block will execute on the default blocking thread pool. But it should run on a dedicated thread pool, not the default one.
This is the producer, so the default blocking pool is fine. But this does make me notice a problem with #590: if the user provided a custom pool, then we should be using it.
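If a user-supplied pool (as in #590) is in play, one way to honour it would be to shift the send there explicitly rather than relying on `F.blocking`. A sketch, with `customBlockingEc` standing in for the hypothetical user-provided `ExecutionContext` and the other names assumed from the diff above:

```scala
import scala.concurrent.ExecutionContext

// Sketch: `customBlockingEc` is a hypothetical user-provided pool (see #590).
// evalOn (from cats.effect.syntax.all._) runs the wrapped effect on the
// given ExecutionContext instead of the default blocking pool.
def sendOn(customBlockingEc: ExecutionContext) =
  F.delay(producer.send(javaRecord, callback)).evalOn(customBlockingEc)
```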
Sounds good. We should fix this before the 2.0 final release.
Something like this will be needed for #553, since `Dispatcher` is allocated as `Resource[F, Dispatcher[F]]`. This change also removes a potential performance bottleneck (since `Dispatcher` does everything on a single fiber) and a potential source of bugs around the `Dispatcher` lifecycle (it cancels running fibers when the resource is released; I've seen some error logs in tests relating to this, though I couldn't track down the cause).
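To illustrate the coupling mentioned above: `Dispatcher` can only be obtained inside a `Resource` scope, so any constructor that needs one must thread that `Resource` through. A sketch under that assumption (using the cats-effect 3 `Dispatcher[F]` constructor of this era):

```scala
import cats.effect.std.Dispatcher

// Dispatcher lives only inside a Resource scope; when the scope is released,
// fibers it started are cancelled - the lifecycle hazard mentioned above.
Dispatcher[F].use { dispatcher =>
  F.delay {
    // Inside an impure Java callback one would write, e.g.:
    // dispatcher.unsafeRunSync(deferred.complete(result))
  }
}
```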