Guarantee correct ordering across partitions #18
Conversation
futures.values().forEach(g -> g.cancel(true));
if (isRetriableException(e)) {
  if (context != null) {
    context.timeout(config.getRetryBackoffMs());
should this be set once during initialization? I guess changing this config requires a restart anyway?
This has to be set every time since Kafka Connect resets it (otherwise you would just retry forever from this point onwards): https://github.com/apache/kafka/blob/9b468fb278701be836a2641650356907bf84860a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L330-L334
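For illustration, the retriable path ends up looking roughly like this. This is only a minimal sketch: `doWrite` and `retryBackoffMs` are placeholders, not the connector's actual names.

```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {

    // Placeholder for a backoff value that would normally come from the connector config.
    private final long retryBackoffMs = 5000L;

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            doWrite(records); // hypothetical helper that performs the actual writes
        } catch (RetriableException e) {
            // Re-apply the backoff on every retriable failure: WorkerSinkTask resets the
            // timeout after each put() attempt, so setting it once in start() would only
            // cover the first retry.
            if (context != null) {
                context.timeout(retryBackoffMs);
            }
            throw e;
        }
    }

    private void doWrite(Collection<SinkRecord> records) {
        // hypothetical write path that may throw RetriableException
    }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.0.0"; }
}
```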
ah got it - thanks for the code pointer!
f.get();
} catch (Exception e) {
  // Stop all tasks still executing since this put() will be retried anyway
  futures.values().forEach(g -> g.cancel(true));
for my own understanding - does cancelling cause the other tasks to throw ConnectException even if the current future throws RetriableException? Does that stop them from sending more requests?
This cancels the thread, which will throw an InterruptedException, but that's not handled anywhere. We also don't process the futures for any of the other writes once we see that one of them has failed, so even if they did throw an exception it wouldn't matter. What we're doing here is propagating the first error that we see. So if there is a combination of RetriableExceptions and ConnectExceptions across all the futures, just the first one will win.
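Roughly, the waiting logic amounts to something like the following "first error wins" sketch. The method name and the exception mapping here are assumptions for illustration, not the connector's exact code.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;

final class FirstErrorWins {

    // Waits on the per-partition write futures in order. The first failure cancels
    // everything still running and is the only exception propagated; the remaining
    // futures are never inspected, so whatever they would have thrown is ignored.
    static void awaitAll(Map<TopicPartition, Future<Void>> futures) {
        for (Future<Void> f : futures.values()) {
            try {
                f.get();
            } catch (Exception e) {
                // Best effort: interrupt everything still executing; put() will be retried anyway.
                futures.values().forEach(g -> g.cancel(true));
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
                Throwable cause = e instanceof ExecutionException ? e.getCause() : e;
                if (cause instanceof RetriableException) {
                    throw (RetriableException) cause;
                }
                throw new ConnectException(cause);
            }
        }
    }
}
```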
can we add a unit test to cover such a scenario?
looks great! A unit test for the failure scenario (if one doesn't already exist) would be good to add.
Previously, because calls to RocksetSinkTask::put() only registered async calls to Rockset, certain combinations of configured parallelism, topic partition count, and retries could lead to events for a partition being delivered out of order to Rockset's API.
This change fixes that and, as an added bonus, simplifies the connector logic and pushes the responsibility for retries up to Kafka Connect instead of handling it internally in the sink task (see the sketch below).
Also took the opportunity to remove some deprecated configuration options and bump the version to a new major version, 2.0.0.
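For illustration, the synchronous, per-partition shape of put() after this change looks roughly like the sketch below. The `RocksetClient` interface and class names here are hypothetical, not the connector's actual API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

class OrderedPutSketch {

    // Hypothetical blocking client: a retriable API failure surfaces as
    // org.apache.kafka.connect.errors.RetriableException, so Kafka Connect
    // re-delivers the same batch after the configured backoff.
    interface RocksetClient {
        void write(TopicPartition tp, List<SinkRecord> batch);
    }

    private final RocksetClient client;

    OrderedPutSketch(RocksetClient client) {
        this.client = client;
    }

    void put(Collection<SinkRecord> records) {
        // Group records by topic partition, preserving the order Connect handed them to us.
        Map<TopicPartition, List<SinkRecord>> batches = new LinkedHashMap<>();
        for (SinkRecord r : records) {
            batches.computeIfAbsent(new TopicPartition(r.topic(), r.kafkaPartition()),
                    tp -> new ArrayList<>()).add(r);
        }
        // Write each partition's batch with a blocking call before put() returns.
        // Nothing is left in flight, so a retried put() cannot race an older write
        // and deliver a partition's events out of order.
        for (Map.Entry<TopicPartition, List<SinkRecord>> e : batches.entrySet()) {
            client.write(e.getKey(), e.getValue());
        }
    }
}
```

Because every write completes (or throws) before put() returns, a RetriableException simply bubbles up and Kafka Connect takes care of the backoff and redelivery.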