[Feature][Kudu] Refactor Kudu functionality and Sink support CDC data. #5437
docs/en/connector-v2/sink/Kudu.md
## Key features

- [x] [batch](../../concept/connector-v2-features.md)
It seems that you have added the features of Source to Sink's documentation
| TIMESTAMP | UNIXTIME_MICROS |
| BYTES | BINARY |

## Sink Options
The formatting here has some errors:
| UNIXTIME_MICROS | TIMESTAMP |
| BINARY | BYTES |

## Source Options
kudu_masters = "kudu-master-cdc:7051"
table_name = "kudu_sink_table"
}
}
```
Please add an example with Kerberos enabled.
source_table_name = "kudu"
kudu_masters = "kudu-master:7051"
table_name = "kudu_sink_table"
}
```
Please add an example with Kerberos enabled here as well.
parameters[i] = new Long[] {start, end};
start = end + 1;
private void addPendingSplit(Collection<KuduSourceSplit> splits) {
int readerCount = enumeratorContext.currentParallelism();
This needs `synchronized (stateLock)` too.
@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to KuduSourceSplitEnumerator.", subtaskId);
if (!pendingSplits.isEmpty()) {
This needs `synchronized (stateLock)` too.
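To illustrate the two locking comments above, here is a minimal sketch of an enumerator whose pending-split map is only touched while holding `stateLock`. The class `LockedEnumeratorSketch`, the use of `String` in place of `KuduSourceSplit`, and the hash-based owner assignment are all assumptions made for illustration; this is not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the split enumerator; not the SeaTunnel class. */
class LockedEnumeratorSketch {
    private final Object stateLock = new Object();
    // Splits waiting to be handed out, keyed by the reader that owns them.
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();
    private final int parallelism;

    LockedEnumeratorSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Queues splits; the map is mutated only while holding stateLock. */
    void addPendingSplit(Collection<String> splits) {
        synchronized (stateLock) {
            for (String split : splits) {
                // Assumed assignment rule: hash of the split modulo parallelism.
                int owner = Math.floorMod(split.hashCode(), parallelism);
                pendingSplits.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
            }
        }
    }

    /** Hands a newly registered reader its queued splits under the same lock. */
    List<String> registerReader(int subtaskId) {
        synchronized (stateLock) {
            List<String> assigned = pendingSplits.remove(subtaskId);
            return assigned == null ? new ArrayList<>() : assigned;
        }
    }

    /** Number of splits still waiting for a reader. */
    int pendingCount() {
        synchronized (stateLock) {
            return pendingSplits.values().stream().mapToInt(List::size).sum();
        }
    }
}
```

Every method that reads or writes `pendingSplits` takes the same lock, so a checkpoint that also synchronizes on `stateLock` cannot observe a half-updated map.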
}

@Override
public KuduSourceState snapshotState(long checkpointId) throws Exception {
return null;
synchronized (stateLock) {
return new KuduSourceState(pendingSplits);
Suggested change:
- return new KuduSourceState(pendingSplits);
+ return new KuduSourceState(new HashMap<>(pendingSplits));
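The reason for the copy can be shown with a small sketch. `SnapshotCopySketch` and its methods are hypothetical names, and `String` stands in for the split type; the point is only that a snapshot sharing the live map changes when the enumerator keeps working, while a copy taken under the lock does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of aliased versus copied snapshot state. */
class SnapshotCopySketch {
    private final Object stateLock = new Object();
    private final Map<Integer, List<String>> pendingSplits = new HashMap<>();

    void add(int reader, String split) {
        synchronized (stateLock) {
            pendingSplits.computeIfAbsent(reader, k -> new ArrayList<>()).add(split);
        }
    }

    /** Simulates the enumerator handing out everything after the checkpoint. */
    void assignAll() {
        synchronized (stateLock) {
            pendingSplits.clear();
        }
    }

    /** Returns the live map itself: later mutations leak into the snapshot. */
    Map<Integer, List<String>> snapshotAliased() {
        synchronized (stateLock) {
            return pendingSplits;
        }
    }

    /** Returns a copy of the map, so later map mutations do not affect it. */
    Map<Integer, List<String>> snapshotCopied() {
        synchronized (stateLock) {
            return new HashMap<>(pendingSplits);
        }
    }
}
```

Note that `new HashMap<>(pendingSplits)` is a shallow copy: the map entries are independent, but the list values are still shared. If the lists themselves are mutated in place after a checkpoint, a deeper copy would be needed.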
}
Configuration conf = new Configuration();
conf.set(HADOOP_AUTH_KEY, KRB);
UserGroupInformation.setConfiguration(conf);
`UserGroupInformation` relies on static state. If more than one place in the same JVM calls `UserGroupInformation.setConfiguration(conf);`, the calls will overwrite each other's configuration. Creating a new UserGroupInformation object is the more recommended approach.
docs/en/connector-v2/source/Kudu.md
## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
From the code, it seems that stream mode is not supported.
}

private static UserGroupInformation loginAndReturnUgi(CommonConfig config) throws IOException {
if (StringUtils.isBlank(config.getPrincipal()) || StringUtils.isBlank(config.getKeytab())) {
Wrapping the login in `synchronized (UserGroupInformation.class) { }` can fix the UGI overwrite problem.
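The idea can be sketched without Hadoop on the classpath. In the block below, `GlobalAuth` is a hypothetical stand-in for a class with JVM-wide static configuration (it is not `UserGroupInformation`, and its methods are invented for illustration). Setting the configuration and logging in happen inside one `synchronized` block on the class object, so another thread cannot change the configuration between the two steps.

```java
import java.util.concurrent.atomic.AtomicInteger;

class StaticConfigLockSketch {
    /** Hypothetical stand-in for a class holding JVM-wide static configuration. */
    static final class GlobalAuth {
        private static String configuration = "simple";

        static void setConfiguration(String mode) {
            configuration = mode;
        }

        static String login(String principal) {
            return principal + " via " + configuration;
        }
    }

    /** Sets the static configuration and logs in as one atomic step. */
    static String loginWith(String mode, String principal) {
        synchronized (GlobalAuth.class) {
            GlobalAuth.setConfiguration(mode);
            return GlobalAuth.login(principal);
        }
    }

    /** Runs two competing callers and counts logins that saw the wrong mode. */
    static int countMismatches(int rounds) {
        AtomicInteger mismatches = new AtomicInteger();
        Thread kerberosCaller = new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                if (!loginWith("kerberos", "alice").equals("alice via kerberos")) {
                    mismatches.incrementAndGet();
                }
            }
        });
        Thread simpleCaller = new Thread(() -> {
            for (int i = 0; i < rounds; i++) {
                if (!loginWith("simple", "bob").equals("bob via simple")) {
                    mismatches.incrementAndGet();
                }
            }
        });
        kerberosCaller.start();
        simpleCaller.start();
        try {
            kerberosCaller.join();
            simpleCaller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return mismatches.get();
    }
}
```

This only protects callers that go through the same lock. Code elsewhere in the JVM that sets the static configuration without synchronizing on the class can still overwrite it.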
type = {EngineType.FLINK, EngineType.SPARK},
disabledReason = "Currently SPARK and FLINK do not support cdc")
Remove FLINK?
Suggested change:
- type = {EngineType.FLINK, EngineType.SPARK},
- disabledReason = "Currently SPARK and FLINK do not support cdc")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
.untilAsserted(
() -> {
System.out.println(readData(KUDU_SINK_TABLE).size());
Assertions.assertEquals(readData(KUDU_SINK_TABLE).size(), 2);
Could this also assert the rows and field data, not only the row count?
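One possible shape for such a check, as a plain-Java sketch: compare the full set of rows, field by field, instead of only the size. `RowAssertionSketch` and `assertSameRows` are hypothetical names, and modelling a row as `List<Object>` is an assumption, since the return type of `readData` is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper for comparing table contents in a test. */
class RowAssertionSketch {
    /** Fails unless both sides contain the same rows, ignoring row order. */
    static void assertSameRows(List<List<Object>> expected, List<List<Object>> actual) {
        List<String> e = normalize(expected);
        List<String> a = normalize(actual);
        if (!e.equals(a)) {
            throw new AssertionError("rows differ: expected " + e + " but got " + a);
        }
    }

    // Renders each row as text and sorts, so scan order does not matter.
    // Caveat: text rendering hides type differences such as Integer 1 vs Long 1.
    private static List<String> normalize(List<List<Object>> rows) {
        List<String> out = new ArrayList<>();
        for (List<Object> row : rows) {
            out.add(String.valueOf(row));
        }
        out.sort(Comparator.naturalOrder());
        return out;
    }
}
```

A check like this catches a sink that writes the right number of rows with wrong or missing field values, which a size-only assertion would let through.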
LGTM
LGTM once CI completes.
#5240
1. Refactored the Kudu tablet splitting process.
2. Adjusted the mapping of error types.
3. Sink now supports CDC (Change Data Capture) data.
4. Added Kerberos support.
5. Added e2e tests.