Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow transaction without upstream offsets #883

Merged

Conversation

janstenpickle
Copy link
Contributor

Kafka supports performing transactions without sending upstream offsets. The use case for this is where one might want to atomically send multiple messages as the result of a single action that has no associated upstream messages.

I've just hacked this together so far, but if the maintainers agree with the approach I'll add tests and create a PR for the 3.x series also.

@bplommer
Copy link
Member

bplommer commented Mar 5, 2022

This looks good to me, except that the overloaded produce is source-breaking - hence one of the CI jobs failing on building the docs. Maybe call it produceWithoutOffsets or something like that?

What did you have in mind for a pull request against the 3.x branch? I wonder if we should change TransactionalProducerRecords to allow it to contain zero commutable offsets, and have ProducerRecords extend it (variance is confusing!) - then TransactionalKafkaProducer) could extend KafkaProducerand just have its input type widened fromProducerRecordstoTransactionalProducerRecords`. What do you think?

@janstenpickle
Copy link
Contributor Author

janstenpickle commented Mar 5, 2022

This looks good to me, except that the overloaded produce is source-breaking - hence one of the CI jobs failing on building the docs. Maybe call it produceWithoutOffsets or something like that?

That sounds good, I'll name the class accordingly as well.

What did you have in mind for a pull request against the 3.x branch? I wonder if we should change TransactionalProducerRecords to allow it to contain zero commutable offsets, and have ProducerRecords extend it (variance is confusing!) - then TransactionalKafkaProducer) could extend KafkaProducerand just have its input type widened fromProducerRecordstoTransactionalProducerRecords`. What do you think?

That might be tricky because TransactionalProducerRecords has a layer before ProducerRecords, which is CommittableProducerRecords. So ProducerRecords contains a chunk of ProducerRecord and TransactionalProducerRecords contains a chunk of CommittableProducerRecords, which in turn contains ProducerRecords. Meaning we probably can't do widening here without a bit of a rethink around the collections

@janstenpickle janstenpickle changed the title [WIP] Allow transaction without upstream offsets Allow transaction without upstream offsets Mar 5, 2022
@janstenpickle janstenpickle marked this pull request as ready for review March 5, 2022 16:25
Also reorder implementation
@bplommer
Copy link
Member

bplommer commented Mar 5, 2022

Unrelated to the changes in this PR, but it looks like there's nothing to stop concurrent transactional operations from interfering with each other - WithProducer should use a one-permit semaphore to prevent this. Or am I missing something? Cc @vlovgr

@janstenpickle
Copy link
Contributor Author

janstenpickle commented Mar 5, 2022

Unrelated to the changes in this PR, but it looks like there's nothing to stop concurrent transactional operations from interfering with each other - WithProducer should use a one-permit semaphore to prevent this. Or am I missing something? Cc @vlovgr

I'm not 100% certain how transactions are handled in the producer, but looking at this I'm certain you're right. I'd imagine no bug has been reported because most people probably use one producer per stream.

The reminds me of the issue I reported with Hotswap, that had been ported from FS2 to CE3. It was only ever being used by one stream in FS2 usages, so was never used in multi-threaded code.

@janstenpickle
Copy link
Contributor Author

Unrelated to the changes in this PR, but it looks like there's nothing to stop concurrent transactional operations from interfering with each other - WithProducer should use a one-permit semaphore to prevent this. Or am I missing something? Cc @vlovgr

Confirmed in a test, I'll raise a PR shortly:

image

@bplommer bplommer requested review from LMnet and vlovgr March 7, 2022 10:21
@janstenpickle
Copy link
Contributor Author

I've branched off this PR with the concurrency fix, I'll raise once this is merged: janstenpickle/fs2-kafka@no-offsets-transactional-producer...janstenpickle:threadsafe-transactions

@bplommer
Copy link
Member

bplommer commented Mar 7, 2022

The concurrency fix looks good - just a few things:

  • I think it would be cleaner to introduce the semaphore in WithProducer.scala rather than here, so that the need for a permit is encapsulated behind the withProducer call.
  • It looks like the test is just verifying that nothing crashes rather than actually checking what was produced - that's fine I think, but the naming is a bit misleading in that regard.
  • Since this is a bugfix, would you mind opening it against series/1.x and I'll merge it forward?

@bplommer bplommer merged commit 8f9c1e9 into fd4s:series/2.x Mar 7, 2022
@janstenpickle
Copy link
Contributor Author

Thanks @bplommer, I'll make the changes

@janstenpickle janstenpickle deleted the no-offsets-transactional-producer branch March 7, 2022 15:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants