-
Notifications
You must be signed in to change notification settings - Fork 102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Threadsafe transactions #888
Threadsafe transactions #888
Conversation
Introduces a semaphore around the transactional produce `WithProducer` implementation. This is because only one transaction may be started per producer at any one time, so produce requests must be queued.
modules/core/src/main/scala/fs2/kafka/internal/WithProducer.scala
Outdated
Show resolved
Hide resolved
Adds separate `WithProducer` implementation that uses a semaphore for guaranteeing exclusive access to the producer for concurrent transactional operations.
val close = withProducer.exclusiveAccess { | ||
case (producer, blocking) => | ||
blocking(producer.close(settings.producerSettings.closeTimeout.asJava)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realised that closing the producer should probably have exclusive access too!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure whether it's necessary but definitely shouldn't do any harm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think you're right it might not be necessary, because the producer will wait for any in flight records to be flushed or the provided timeout to expire. From the perspective of not letting any other records get added to the producer during shutdown this is quite nice, but this shouldn't happen thanks to Resource
, unless someone has done something silly like leak the producer out of the resource.
It also changes the behaviour of the closeTimeout
setting for the transactional producer; we're using the semaphore to sequence transactions where we wait for the client buffer to flush and transaction comitted to the broker, the closeTimeout
effectively becomes redundant when we use the semaphore with close
because the buffer will always be empty. Given this do you think it's better to remove the exclusiveAccess
call here?
Add comment explaining why exclusive access there is bad
@bplommer it's probably not a problem because no one has reported any issues with concurrent access, but this change will make concurrent access to a single producer effectively single-threaded. I wonder if it's worth updating the documentation noting this, maybe even providing guidance on pooling producers? |
@janstenpickle that does sound worthwhile - I'm happy to merge this PR without it and am unlikely to get to it myself in the near future, but if you want to add that change here or in a follow-up PR (or just open an issue) you're more than welcome to 😅 |
Yes, of course. Happy to add the warning, I haven't experimented with pooled transactional producers myself, so probably won't add anything other than a vague recommendation for now. As you said elsewhere pooling functionality something that could be added to |
Addresses a bug with concurrent transactional producer access discovered in #883.
Introduces a semaphore around the transactional produce
WithProducer
implementation. This is because only one transaction may be started per producer at any one time, so produce requests must be queued.