Fence writer zombies (breaking change) #255
base: main
Conversation
Force-pushed from 36f5b85 to 0e62a0d
- send(events, offsets, new ConsumerGroupMetadata(config.controlGroupId()));
- send(ImmutableList.of(), offsets, new ConsumerGroupMetadata(config.connectGroupId()));
+ send(events, offsets, consumerGroupMetadata);
Notice how we now commit offsets against only one consumer group: the connect-<connector-name> consumer group.
We no longer commit source topic offsets to the config.controlGroupId, and to be honest I don't understand why we ever did, since we could have always taken this approach (irrespective of zombie fencing). cc @bryanck if you can shed any light on why this was necessary in the past, or whether it was just an oversight.
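For context, the fencing relies on each transactional offset commit carrying the consumer group's generation, so the coordinator can reject commits from a superseded (zombie) task. A minimal, self-contained sketch of that idea follows; the Coordinator class and its method names are illustrative stand-ins, not Kafka APIs (the real mechanism passes ConsumerGroupMetadata to sendOffsetsToTransaction):

```java
import java.util.HashMap;
import java.util.Map;

public class FencingDemo {
    // Toy model of generation-based zombie fencing: each commit carries the
    // generation it was issued under; commits from an older generation fail.
    static class Coordinator {
        private final Map<String, Integer> currentGeneration = new HashMap<>();

        // A rebalance (e.g. a task restart) bumps the group's generation,
        // invalidating any still-running older member.
        int rebalance(String groupId) {
            return currentGeneration.merge(groupId, 1, Integer::sum);
        }

        // A commit succeeds only if the caller's generation is still current.
        boolean commitOffsets(String groupId, int memberGeneration) {
            return currentGeneration.getOrDefault(groupId, 0) == memberGeneration;
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        int gen1 = c.rebalance("connect-my-connector"); // original task, generation 1
        int gen2 = c.rebalance("connect-my-connector"); // restarted task, generation 2
        System.out.println(c.commitOffsets("connect-my-connector", gen1)); // zombie: rejected
        System.out.println(c.commitOffsets("connect-my-connector", gen2)); // live task: accepted
    }
}
```

This is why committing against the single connect-<connector-name> group matters: that is the group whose generation the Connect framework actually advances on rebalance.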
try {
  groupMetadata = KafkaUtils.consumerGroupMetadata(context);
} catch (IllegalArgumentException e) {
  LOG.warn("Could not extract ConsumerGroupMetadata from consumer inside Kafka Connect, falling back to simple ConsumerGroupMetadata which can result in duplicates from zombie tasks");
  groupMetadata = new ConsumerGroupMetadata(config.connectGroupId());
}
We fetch the consumer group metadata via reflection from inside the Kafka Connect framework. This is technically unsafe, since we are relying on private implementation details. Hence I also implemented a fallback to a simple ConsumerGroupMetadata, which is basically what we were doing previously and does not do zombie fencing.
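The reflective extraction with a graceful fallback can be sketched as below. This is a self-contained illustration only: FakeTaskContext and the consumerGroupId field are hypothetical stand-ins for the private Kafka Connect internals, not the actual classes touched by this PR.

```java
import java.lang.reflect.Field;

public class GroupMetadataExtractor {
    // Hypothetical stand-in for Kafka Connect's WorkerSinkTaskContext, which
    // keeps the information we want in a private field.
    static class FakeTaskContext {
        private final String consumerGroupId = "connect-my-connector";
    }

    // Try to read the private field via reflection; if the internal layout is
    // not what we expect (e.g. a different Connect version), degrade to the
    // caller-supplied fallback instead of failing the task.
    static String extractGroupId(Object context, String fallback) {
        try {
            Field f = context.getClass().getDeclaredField("consumerGroupId");
            f.setAccessible(true);
            return (String) f.get(context);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Private internals changed: fall back, losing zombie fencing
            // but keeping the connector running.
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(extractGroupId(new FakeTaskContext(), "connect-fallback")); // connect-my-connector
        System.out.println(extractGroupId(new Object(), "connect-fallback"));          // connect-fallback
    }
}
```

The design trade-off debated below is exactly this catch block: fall back silently (with a warning) versus fail fast when the reflective access breaks.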
Why not just fail?
private static final String WorkerSinkTaskContextClassName =
    WorkerSinkTaskContext.class.getName();

@SuppressWarnings("unchecked")
Worth a comment here about using reflection to reach into very specific implementation details, but otherwise 👍
@@ -170,6 +172,105 @@
from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified, config files
are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are
applied. When merging these, the order of precedence is sink config > config dir > classpath.

# Upgrade
Nice docs. Thanks for looking out for the users.
Force-pushed from 0e62a0d to 682a5ad
Out of scope for this PR: Coordinator zombie fencing