From 98add7f6e3ae0a6749029f50c5f31a7472c00fd2 Mon Sep 17 00:00:00 2001 From: Emor-nj Date: Thu, 7 Jul 2022 11:00:26 +0800 Subject: [PATCH 1/5] [Connector-V2]Add Hudi Source --- plugin-mapping.properties | 1 + pom.xml | 8 +- seatunnel-connectors-v2-dist/pom.xml | 5 + .../connector-hudi/pom.xml | 61 ++++++++ .../hudi/config/HudiSourceConfig.java | 34 ++++ .../hudi/exception/HudiPluginException.java | 29 ++++ .../seatunnel/hudi/source/HudiSource.java | 145 ++++++++++++++++++ .../hudi/source/HudiSourceReader.java | 141 +++++++++++++++++ .../hudi/source/HudiSourceSplit.java | 45 ++++++ .../source/HudiSourceSplitEnumerator.java | 144 +++++++++++++++++ .../hudi/source/HudiSourceState.java | 35 +++++ .../seatunnel/hudi/util/HudiUtil.java | 108 +++++++++++++ seatunnel-connectors-v2/pom.xml | 1 + tools/dependencies/known-dependencies.txt | 66 +++++++- 14 files changed, 821 insertions(+), 2 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-hudi/pom.xml create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java create mode 100644 seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java diff --git a/plugin-mapping.properties b/plugin-mapping.properties index bfd9bb70ccb..51b7b9c3ebf 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -105,3 +105,4 @@ seatunnel.sink.Jdbc = connector-jdbc seatunnel.sink.HdfsFile = connector-file-hadoop seatunnel.sink.LocalFile = connector-file-local seatunnel.source.Pulsar = connector-pulsar +seatunnel.source.Hudi = connector-hudi diff --git a/pom.xml b/pom.xml index 5ac90416a68..ba68322bcd2 100644 --- a/pom.xml +++ b/pom.xml @@ -109,7 +109,7 @@ 4.1.0 0.13.1 1.13.6 - 0.10.0 + 0.11.1 1.5.6 2.3.9 1.2 @@ -499,6 +499,12 @@ ${flink.version} + + org.apache.hudi + hudi-hadoop-mr-bundle + ${hudi.version} + + org.apache.hudi hudi-spark-bundle_${scala.binary.version} diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml index 4c5d65a82ef..fe37965a1b8 100644 --- a/seatunnel-connectors-v2-dist/pom.xml +++ b/seatunnel-connectors-v2-dist/pom.xml @@ -91,6 +91,11 @@ connector-file-local ${project.version} + + org.apache.seatunnel + connector-hudi + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml new file mode 100644 index 00000000000..26b3c7b7c0b --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/pom.xml @@ -0,0 +1,61 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-hudi + + + + + org.apache.seatunnel + seatunnel-hive-shade + ${project.version} + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + + org.apache.hudi + hudi-hadoop-mr-bundle + + + + org.apache.commons + commons-lang3 + + + + junit + junit + + + diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java new file mode 100644 index 00000000000..9f1b86e94a2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.config; + +public class HudiSourceConfig { + + public static final String TABLE_PATH = "table.path"; + + public static final String TABLE_TYPE = "table.type"; + + public static final String CONF_FILES = "conf.files"; + + public static final String USE_KERBEROS = "use.kerberos"; + + public static final String KERBEROS_PRINCIPAL = "kerberos.principal"; + + public static final String KERBEROS_PRINCIPAL_FILE = "kerberos.principal.file"; + +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java new file mode 100644 index 00000000000..6971e029e08 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/exception/HudiPluginException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.exception; + +public class HudiPluginException extends Exception{ + + public HudiPluginException(String message) { + super(message); + } + + public HudiPluginException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java new file mode 100644 index 00000000000..dbd68da7f50 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.source; + +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.CONF_FILES; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL_FILE; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_PATH; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelContext; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiPluginException; +import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSource.class) +public class HudiSource implements SeaTunnelSource { + + private SeaTunnelContext seaTunnelContext; + + private SeaTunnelRowType typeInfo; + + private String filePath; + + private String tablePath; + + private String confFiles; + + private boolean useKerberos = false; + + @Override + public String getPluginName() { + return "Hudi"; + } + + @Override + public void prepare(Config pluginConfig) { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TABLE_PATH, CONF_FILES); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + // default hudi table tupe is cow + // TODO: support hudi mor table + // TODO: support Incremental Query and Read Optimized Query + if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE))) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Do not support hudi mor table yet!"); + } + try { + this.confFiles = pluginConfig.getString(CONF_FILES); + this.tablePath = pluginConfig.getString(TABLE_PATH); + if (CheckConfigUtil.isValidParam(pluginConfig, USE_KERBEROS)) { + this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS); + if (this.useKerberos) { + CheckResult kerberosCheckResult = CheckConfigUtil.checkAllExists(pluginConfig, KERBEROS_PRINCIPAL, KERBEROS_PRINCIPAL_FILE); + if (!kerberosCheckResult.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + HudiUtil.initKerberosAuthentication(HudiUtil.getConfiguration(this.confFiles), pluginConfig.getString(KERBEROS_PRINCIPAL), pluginConfig.getString(KERBEROS_PRINCIPAL_FILE)); + } + } + this.filePath = HudiUtil.getParquetFileByPath(this.confFiles, tablePath); + if (this.filePath == null) { + throw new HudiPluginException(String.format("%s has no parquet file, please check!", tablePath)); + } + // should read from config or read from hudi metadata( wait catlog done) + this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles, this.filePath); + + } catch (HudiPluginException | IOException e) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Prepare HudiSource error.", e); + } + + } + + @Override + public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) { + this.seaTunnelContext = seaTunnelContext; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.typeInfo; + } + + @Override + public SourceReader createReader(SourceReader.Context readerContext) throws Exception { + return new HudiSourceReader(this.confFiles, readerContext, typeInfo); + } + + @Override + public Boundedness getBoundedness() { + // Only support Snapshot Query now. + // After support Incremental Query and Read Optimized Query, we should supoort UNBOUNDED. + // TODO: support UNBOUNDED + return Boundedness.BOUNDED; + } + + @Override + public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { + return new HudiSourceSplitEnumerator(enumeratorContext, tablePath, this.confFiles); + } + + @Override + public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, HudiSourceState checkpointState) throws Exception { + return new HudiSourceSplitEnumerator(enumeratorContext, tablePath, this.confFiles, checkpointState); + } + + @Override + public Serializer getEnumeratorStateSerializer() { + return new DefaultSerializer<>(); + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java new file mode 100644 index 00000000000..15127ae5ba1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.Set; + +public class HudiSourceReader implements SourceReader { + + private static final long THREAD_WAIT_TIME = 500L; + + private String confPaths; + + private Set sourceSplits; + + private final SourceReader.Context context; + + private SeaTunnelRowType seaTunnelRowType; + + public HudiSourceReader(String confPaths, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) { + this.confPaths = confPaths; + this.context = context; + this.sourceSplits = new HashSet<>(); + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public void open() { + } + + @Override + public void close() { + + } + + @Override + public void pollNext(Collector output) throws Exception { + if (sourceSplits.isEmpty()) { + Thread.sleep(THREAD_WAIT_TIME); + return; + } + Configuration configuration = HudiUtil.getConfiguration(this.confPaths); + JobConf jobConf = HudiUtil.toJobConf(configuration); + sourceSplits.forEach(source -> { + try { + HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat(); + RecordReader reader = inputFormat.getRecordReader(source.getInputSplit(), jobConf, Reporter.NULL); + ParquetHiveSerDe serde = new ParquetHiveSerDe(); + Properties properties = new Properties(); + List types = new ArrayList<>(); + for (SeaTunnelDataType type: seaTunnelRowType.getFieldTypes()) { + types.add(type.getSqlType().name()); + } + String columns = StringUtils.join(seaTunnelRowType.getFieldNames(), ","); + String columnTypes = StringUtils.join(types, ",").toLowerCase(Locale.ROOT); + properties.setProperty("columns", columns); + properties.setProperty("columns.types", columnTypes); + serde.initialize(jobConf, properties); + StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector(); + List fields = inspector.getAllStructFieldRefs(); + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + while (reader.next(key, value)) { + Object[] datas = new Object[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Object data = inspector.getStructFieldData(value, fields.get(i)); + if (null != data) { + datas[i] = String.valueOf(data); + } else { + datas[i] = null; + } + } + output.collect(new SeaTunnelRow(datas)); + } + reader.close(); + } catch (Exception e) { + throw new RuntimeException("Hudi source read error", e); + } + + }); + context.signalNoMoreElement(); + } + + @Override + public List snapshotState(long checkpointId) { + return new ArrayList<>(sourceSplits); + } + + @Override + public void addSplits(List splits) { + sourceSplits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java new file mode 100644 index 00000000000..b08f6f68e11 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.source; + +import org.apache.seatunnel.api.source.SourceSplit; + +import org.apache.hadoop.mapred.InputSplit; + +public class HudiSourceSplit implements SourceSplit { + + private static final long serialVersionUID = -1L; + + private String splitId; + + private InputSplit inputSplit; + + public HudiSourceSplit(String splitId, InputSplit inputSplit) { + this.splitId = splitId; + this.inputSplit = inputSplit; + } + + @Override + public String splitId() { + return this.splitId; + } + + public InputSplit getInputSplit() { + return this.inputSplit; + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java new file mode 100644 index 00000000000..42b072af6c1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.source; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class HudiSourceSplitEnumerator implements SourceSplitEnumerator { + + private final Context context; + private Set pendingSplit; + private Set assignedSplit; + private String tablePath; + private String confPaths; + + public HudiSourceSplitEnumerator(SourceSplitEnumerator.Context context, String tablePath, String confPaths) { + this.context = context; + this.tablePath = tablePath; + this.confPaths = confPaths; + } + + public HudiSourceSplitEnumerator(SourceSplitEnumerator.Context context, String tablePath, + String confPaths, + HudiSourceState sourceState) { + this(context, tablePath, confPaths); + this.assignedSplit = sourceState.getAssignedSplit(); + } + + @Override + public void open() { + this.assignedSplit = new HashSet<>(); + this.pendingSplit = new HashSet<>(); + } + + @Override + public void run() throws Exception { + pendingSplit = getHudiSplit(); + assignSplit(context.registeredReaders()); + } + + private Set getHudiSplit() throws IOException { + Set hudiSourceSplits = new HashSet<>(); + Path path = new Path(tablePath); + Configuration configuration = HudiUtil.getConfiguration(confPaths); + JobConf jobConf = HudiUtil.toJobConf(configuration); + FileInputFormat.setInputPaths(jobConf, path); + HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat(); + inputFormat.setConf(jobConf); + for (InputSplit split: inputFormat.getSplits(jobConf, 0)) { + hudiSourceSplits.add(new HudiSourceSplit(split.toString(), split)); + } + return hudiSourceSplits; + } + + @Override + public void close() throws IOException { + + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + if (!splits.isEmpty()) { + pendingSplit.addAll(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + + private void assignSplit(Collection taskIDList) { + Map> readySplit = new HashMap<>(Common.COLLECTION_SIZE); + for (int taskID : taskIDList) { + readySplit.computeIfAbsent(taskID, id -> new ArrayList<>()); + } + + pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), taskIDList.size())) + .add(s)); + readySplit.forEach(context::assignSplit); + assignedSplit.addAll(pendingSplit); + pendingSplit.clear(); + } + + private static int getSplitOwner(String tp, int numReaders) { + return tp.hashCode() % numReaders; + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplit.size(); + } + + @Override + public void registerReader(int subtaskId) { + if (!pendingSplit.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); + } + } + + @Override + public HudiSourceState snapshotState(long checkpointId) { + return new HudiSourceState(assignedSplit); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + + } + + @Override + public void handleSplitRequest(int subtaskId) { + + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java new file mode 100644 index 00000000000..6235ca694ae --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.source; + +import java.io.Serializable; +import java.util.Set; + +public class HudiSourceState implements Serializable { + + + private Set assignedSplit; + + public HudiSourceState(Set assignedSplit) { + this.assignedSplit = assignedSplit; + } + + public Set getAssignedSplit() { + return assignedSplit; + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java new file mode 100644 index 00000000000..346232b9534 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/util/HudiUtil.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.util; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiPluginException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.Arrays; + +public class HudiUtil { + + public static Configuration getConfiguration(String confPaths) { + Configuration configuration = new Configuration(); + Arrays.stream(confPaths.split(";")).forEach(file -> configuration.addResource(new Path(file))); + return configuration; + } + + public static String getParquetFileByPath(String confPaths, String path) throws IOException { + Configuration configuration = getConfiguration(confPaths); + FileSystem hdfs = FileSystem.get(configuration); + Path listFiles = new Path(path); + FileStatus[] stats = hdfs.listStatus(listFiles); + for (FileStatus fileStatus : stats) { + if (fileStatus.isDirectory()) { + String filePath = getParquetFileByPath(confPaths, fileStatus.getPath().toString()); + if (filePath == null) { + continue; + } else { + return filePath; + } + } + if (fileStatus.isFile()) { + if (fileStatus.getPath().toString().endsWith("parquet")) { + return fileStatus.getPath().toString(); + } + } + } + return null; + } + + public static SeaTunnelRowType getSeaTunnelRowTypeInfo(String confPaths, String path) throws HudiPluginException { + Configuration configuration = getConfiguration(confPaths); + Path dstDir = new Path(path); + ParquetMetadata footer; + try { + footer = ParquetFileReader.readFooter(configuration, dstDir, NO_FILTER); + } catch (IOException e) { + throw new HudiPluginException("Create ParquetMetadata Fail!", e); + } + MessageType schema = footer.getFileMetaData().getSchema(); + String[] fields = new String[schema.getFields().size()]; + SeaTunnelDataType[] types = new SeaTunnelDataType[schema.getFields().size()]; + + for (int i = 0; i < schema.getFields().size(); i++) { + fields[i] = schema.getFields().get(i).getName(); + types[i] = BasicType.STRING_TYPE; + } + return new SeaTunnelRowType(fields, types); + } + + public static JobConf toJobConf(Configuration conf) { + if (conf instanceof JobConf) { + return (JobConf) conf; + } + return new JobConf(conf); + } + + public static void initKerberosAuthentication(Configuration conf, String principal, String principalFile) throws HudiPluginException { + try { + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab(principal, principalFile); + } catch (IOException e) { + throw new HudiPluginException("Kerberos Authorized Fail!", e); + } + + } + +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 01431229a3e..516c86cfce5 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -42,6 +42,7 @@ connector-socket connector-hive connector-file + connector-hudi connector-assert diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 3ff6a42893b..cfa34a6128f 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -17,8 +17,10 @@ aggdesigner-algorithm-6.0.jar aggs-matrix-stats-client-6.3.1.jar aggs-matrix-stats-client-7.5.1.jar aircompressor-0.10.jar +aircompressor-0.15.jar airline-0.7.jar annotations-13.0.jar +annotations-17.0.0.jar ant-1.6.5.jar ant-1.9.1.jar ant-launcher-1.9.1.jar @@ -63,6 +65,7 @@ bcprov-jdk15on-1.68.jar bouncy-castle-bc-2.8.0-pkg.jar caffeine-2.8.0.jar caffeine-2.8.4.jar +caffeine-2.9.1.jar calcite-core-1.29.0.jar calcite-druid-1.29.0.jar calcite-linq4j-1.29.0.jar @@ -112,6 +115,7 @@ commons-logging-1.2.jar commons-math3-3.1.1.jar commons-math3-3.4.1.jar commons-math3-3.5.jar +commons-math3-3.6.1.jar commons-net-3.1.jar commons-net-3.6.jar commons-pool-1.6.jar @@ -137,6 +141,7 @@ derby-10.14.2.0.jar derbyclient-10.14.2.0.jar derbynet-10.14.2.0.jar disruptor-3.3.6.jar +disruptor-3.4.2.jar druid-aws-common-0.22.1.jar druid-console-0.22.1.jar druid-core-0.22.1.jar @@ -166,6 +171,7 @@ elasticsearch-x-content-6.3.1.jar elasticsearch-x-content-7.5.1.jar error_prone_annotations-2.3.4.jar error_prone_annotations-2.8.0.jar +error_prone_annotations-2.5.1.jar esri-geometry-api-2.2.0.jar extendedset-0.22.1.jar fastutil-6.5.6.jar @@ -187,6 +193,7 @@ flink-orc_2.11-1.13.6.jar flink-parquet_2.11-1.13.6.jar flink-shaded-hadoop-2-2.7.5-7.0.jar flink-statebackend-rocksdb_2.11-1.13.6.jar +fluent-hc-4.4.1.jar force-shading-1.13.6.jar frocksdbjni-5.17.2-ververica-2.1.jar geronimo-annotation_1.0_spec-1.1.1.jar @@ -208,70 +215,101 @@ guice-servlet-3.0.jar guice-servlet-4.0.jar guice-servlet-4.1.0.jar hadoop-annotations-2.6.5.jar +hadoop-annotations-2.10.0.jar hadoop-annotations-3.0.0.jar hadoop-auth-2.6.5.jar hadoop-auth-2.7.4.jar +hadoop-auth-2.10.0.jar hadoop-auth-3.0.0.jar hadoop-client-2.6.5.jar hadoop-client-3.0.0.jar hadoop-common-2.6.5.jar hadoop-common-2.7.7.jar +hadoop-common-2.10.0.jar hadoop-common-3.0.0.jar hadoop-distcp-2.7.4.jar +hadoop-distcp-2.10.0.jar hadoop-hdfs-2.6.5.jar hadoop-hdfs-2.7.4.jar +hadoop-hdfs-2.10.0.jar +hadoop-hdfs-client-2.10.0.jar hadoop-hdfs-client-3.0.0.jar hadoop-mapreduce-client-app-2.6.5.jar hadoop-mapreduce-client-common-2.6.5.jar hadoop-mapreduce-client-common-3.0.0.jar hadoop-mapreduce-client-core-2.6.5.jar hadoop-mapreduce-client-core-2.7.7.jar +hadoop-mapreduce-client-core-2.10.0.jar hadoop-mapreduce-client-core-3.0.0.jar hadoop-mapreduce-client-jobclient-2.6.5.jar hadoop-mapreduce-client-jobclient-3.0.0.jar hadoop-mapreduce-client-shuffle-2.6.5.jar hadoop-yarn-api-2.6.5.jar +hadoop-yarn-api-2.10.0.jar hadoop-yarn-api-3.0.0.jar hadoop-yarn-client-2.6.5.jar +hadoop-yarn-client-2.10.0.jar hadoop-yarn-client-3.0.0.jar hadoop-yarn-common-2.6.5.jar hadoop-yarn-common-2.7.7.jar +hadoop-yarn-common-2.10.0.jar hadoop-yarn-common-3.0.0.jar hadoop-yarn-server-common-2.6.5.jar hbase-annotations-2.0.0.jar +hbase-asyncfs-2.4.9.jar hbase-client-2.0.0.jar hbase-client-2.1.0.jar +hbase-client-2.4.9.jar hbase-common-2.0.0-tests.jar hbase-common-2.0.0.jar hbase-common-2.1.0.jar +hbase-common-2.4.9.jar hbase-hadoop-compat-2.0.0.jar hbase-hadoop-compat-2.1.0.jar +hbase-hadoop-compat-2.4.9.jar hbase-hadoop2-compat-2.0.0.jar hbase-hadoop2-compat-2.1.0.jar +hbase-hadoop2-compat-2.4.9.jar hbase-http-2.0.0.jar hbase-http-2.1.0.jar +hbase-http-2.4.9.jar +hbase-logging-2.4.9.jar hbase-mapreduce-2.0.0.jar hbase-mapreduce-2.1.0.jar hbase-metrics-2.0.0.jar hbase-metrics-2.1.0.jar +hbase-metrics-2.4.9.jar hbase-metrics-api-2.0.0.jar hbase-metrics-api-2.1.0.jar +hbase-metrics-api-2.4.9.jar hbase-procedure-2.0.0.jar hbase-procedure-2.1.0.jar +hbase-procedure-2.4.9.jar hbase-protocol-2.0.0.jar hbase-protocol-2.1.0.jar +hbase-protocol-2.4.9.jar hbase-protocol-shaded-2.0.0.jar hbase-protocol-shaded-2.1.0.jar +hbase-protocol-shaded-2.4.9.jar hbase-replication-2.0.0.jar hbase-replication-2.1.0.jar +hbase-replication-2.4.9.jar hbase-server-2.0.0.jar hbase-server-2.1.0.jar +hbase-server-2.4.9.jar +hbase-shaded-gson-3.5.1.jar +hbase-shaded-jersey-3.5.1.jar +hbase-shaded-jetty-3.5.1.jar hbase-shaded-miscellaneous-2.1.0.jar +hbase-shaded-miscellaneous-3.5.1.jar hbase-shaded-netty-2.1.0.jar +hbase-shaded-netty-3.5.1.jar hbase-shaded-protobuf-2.1.0.jar +hbase-shaded-protobuf-3.5.1.jar hbase-spark-1.0.0.jar hbase-zookeeper-2.0.0.jar hbase-zookeeper-2.1.0.jar +hbase-zookeeper-2.4.9.jar hibernate-validator-5.2.5.Final.jar hibernate-validator-6.2.2.Final.jar HikariCP-4.0.3.jar @@ -310,7 +348,10 @@ httpcore-4.4.4.jar httpcore-nio-4.4.4.jar httpmime-4.5.13.jar httpmime-4.5.2.jar -hudi-spark-bundle_2.11-0.10.0.jar +hudi-common-0.11.1.jar +hudi-hadoop-mr-0.11.1.jar +hudi-hadoop-mr-bundle-0.11.1.jar +hudi-spark-bundle_2.11-0.11.1.jar i18n-util-1.0.4.jar iceberg-api-0.13.1.jar iceberg-bundled-guava-0.13.1.jar @@ -359,6 +400,7 @@ jakarta.activation-api-1.2.1.jar jakarta.xml.bind-api-2.3.2.jar jakarta.activation-api-1.2.2.jar jakarta.annotation-api-1.3.5.jar +jakarta.inject-2.6.1.jar jakarta.servlet-api-4.0.4.jar jakarta.validation-api-2.0.2.jar jakarta.websocket-api-1.1.2.jar @@ -373,11 +415,13 @@ jasper-runtime-5.5.23.jar java-xmlbuilder-0.4.jar javassist-3.18.1-GA.jar javassist-3.20.0-GA.jar +javassist-3.25.0-GA.jar javax.activation-1.2.0.jar javax.activation-api-1.2.0.jar javax.annotation-api-1.2.jar javax.annotation-api-1.3.2.jar javax.el-3.0.0.jar +javax.el-3.0.1-b12.jar javax.el-api-3.0.0.jar javax.inject-1.jar javax.inject-2.4.0-b34.jar @@ -385,8 +429,10 @@ javax.inject-2.5.0-b32.jar javax.mail-1.5.6.jar javax.servlet-3.0.0.v201112011016.jar javax.servlet-api-3.1.0.jar +javax.servlet.jsp-2.3.2.jar javax.servlet.jsp-api-2.3.1.jar javax.ws.rs-api-2.0.1.jar +javax.ws.rs-api-2.1.1.jar jaxb-api-2.2.11.jar jaxb-api-2.2.2.jar jaxb-api-2.3.0.jar @@ -401,6 +447,7 @@ jcl-over-slf4j-1.7.16.jar jcl-over-slf4j-1.7.30.jar jcodings-1.0.18.jar jcodings-1.0.43.jar +jcodings-1.0.55.jar jcommander-1.81.jar jdbi-2.63.1.jar jedis-3.2.0.jar @@ -475,6 +522,7 @@ joda-time-2.9.9.jar joni-2.1.11.jar joni-2.1.2.jar joni-2.1.27.jar +joni-2.1.31.jar jopt-simple-5.0.2.jar jpam-1.1.jar jsch-0.1.54.jar @@ -580,6 +628,7 @@ memory-0.9.0.jar metrics-core-3.1.0.jar metrics-core-3.1.5.jar metrics-core-3.2.1.jar +metrics-core-3.2.6.jar metrics-core-4.0.0.jar metrics-json-3.1.0.jar metrics-json-3.1.5.jar @@ -605,6 +654,7 @@ netty-all-4.0.23.Final.jar netty-all-4.1.17.Final.jar netty-buffer-4.1.16.Final.jar netty-buffer-4.1.43.Final.jar +netty-buffer-4.1.45.Final.jar netty-buffer-4.1.68.Final.jar netty-codec-4.1.16.Final.jar netty-codec-4.1.43.Final.jar @@ -612,37 +662,47 @@ netty-codec-4.1.68.Final.jar netty-codec-dns-4.1.29.Final.jar netty-codec-http-4.1.16.Final.jar netty-codec-http-4.1.43.Final.jar +netty-codec-4.1.45.Final.jar netty-codec-http-4.1.68.Final.jar netty-codec-socks-4.1.29.Final.jar netty-common-4.1.16.Final.jar netty-common-4.1.43.Final.jar +netty-common-4.1.45.Final.jar netty-common-4.1.68.Final.jar netty-handler-4.1.16.Final.jar netty-handler-4.1.43.Final.jar +netty-handler-4.1.45.Final.jar netty-handler-4.1.68.Final.jar netty-handler-proxy-4.1.29.Final.jar netty-reactive-streams-2.0.0.jar netty-resolver-4.1.16.Final.jar netty-resolver-4.1.43.Final.jar +netty-resolver-4.1.45.Final.jar netty-resolver-4.1.68.Final.jar netty-resolver-dns-4.1.29.Final.jar netty-transport-4.1.16.Final.jar netty-transport-4.1.43.Final.jar +netty-transport-4.1.45.Final.jar netty-transport-4.1.68.Final.jar netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar netty-transport-native-unix-common-4.1.29.Final.jar +netty-transport-native-unix-common-4.1.45.Final.jar nimbus-jose-jwt-4.41.1.jar objenesis-2.5.1.jar okhttp-1.0.2.jar okhttp-2.4.0.jar +okhttp-2.7.5.jar okhttp-4.9.1.jar okio-1.4.0.jar +okio-1.6.0.jar okio-2.8.0.jar opencsv-2.3.jar opencsv-4.6.jar orc-core-1.5.6.jar +orc-core-1.6.0-nohive.jar orc-shims-1.5.2.jar orc-shims-1.5.6.jar +orc-shims-1.6.0.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar paranamer-2.3.jar @@ -661,6 +721,7 @@ parquet-format-structures-1.11.1.jar parquet-hadoop-1.10.0.jar parquet-hadoop-1.11.1.jar parquet-hadoop-bundle-1.8.1.jar +parquet-hadoop-bundle-1.10.1.jar parquet-jackson-1.10.0.jar parquet-jackson-1.11.1.jar percolator-client-6.3.1.jar @@ -689,6 +750,7 @@ resilience4j-bulkhead-1.3.1.jar resilience4j-core-1.3.1.jar retrofit-2.9.0.jar rhino-1.7.11.jar +rocksdbjni-5.17.2.jar scala-compiler-2.11.12.jar scala-library-2.11.12.jar scala-parser-combinators_2.11-1.0.4.jar @@ -801,7 +863,9 @@ zkclient-0.3.jar zookeeper-3.3.1.jar zookeeper-3.4.10.jar zookeeper-3.4.6.jar +zookeeper-3.5.7.jar zookeeper-3.5.9.jar +zookeeper-jute-3.5.7.jar zookeeper-jute-3.5.9.jar zstd-jni-1.3.3-1.jar zstd-jni-1.4.3-1.jar From d737a6b25e800457d6e812df891892b9bbd750ff Mon Sep 17 00:00:00 2001 From: Emor-nj Date: Thu, 7 Jul 2022 11:32:29 +0800 Subject: [PATCH 2/5] fix dependency licenses --- tools/dependencies/known-dependencies.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index cfa34a6128f..1ebbed6581d 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -685,6 +685,7 @@ netty-transport-4.1.43.Final.jar netty-transport-4.1.45.Final.jar netty-transport-4.1.68.Final.jar netty-transport-native-epoll-4.1.29.Final-linux-x86_64.jar +netty-transport-native-epoll-4.1.45.Final.jar netty-transport-native-unix-common-4.1.29.Final.jar netty-transport-native-unix-common-4.1.45.Final.jar nimbus-jose-jwt-4.41.1.jar From 11102a3000007432060821866f4632a06da0d455 Mon Sep 17 00:00:00 2001 From: Emor-nj Date: Fri, 8 Jul 2022 10:08:17 +0800 Subject: [PATCH 3/5] add hudi souce v2 doc --- docs/en/connector-v2/source/Hudi.md | 57 +++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 docs/en/connector-v2/source/Hudi.md diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md new file mode 100644 index 00000000000..e99db57796d --- /dev/null +++ b/docs/en/connector-v2/source/Hudi.md @@ -0,0 +1,57 @@ +# Hudi + +## Description + +Used to read data from Hudi. Currently, only supports hudi cow table and Snapshot Query with Batch Mode. + +## Options + +| name | type | required | default value | +|--------------------------|---------|----------|---------------| +| table.path | string | yes | - | +| table.type | string | yes | cow | +| conf.files | string | yes | - | +| use.kerberos | boolean | no | false | +| kerberos.principal | string | no | - | +| kerberos.principal.file | string | no | - | + +### table.path [string] + +`table.path` The hdfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. + +### table.type [string] + +`table.type` The type of hudi table. Now we only support 'cow', 'mor' is not support yet. + +### conf.files [string] + +`conf.files` The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'. + +### use.kerberos [boolean] + +`use.kerberos` Whether to enable Kerberos, default is false. + +### kerberos.principal [string] + +`kerberos.principal` When use kerberos, we should set kerberos princal such as 'test_user@YOUR_COMPANY.COM'. + +### kerberos.principal.file [string] + +`kerberos.principal.file` When use kerberos, we should set kerberos princal file such as '/home/test/test_user.keytab'. + +## Examples + +```hocon +source { + + Hudi { + table.path = "hdfs://nameserivce/data/hudi/hudi_table/" + table.type = "cow" + conf.files = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml" + use.kerberos = true + kerberos.principal = "test_user@YOUR_COMPANY.COM" + kerberos.principal.file = "/home/test/test_user.keytab" + } + +} +``` \ No newline at end of file From cb4f3c0b00befac2f3c32097c2c78a4230ec8b5b Mon Sep 17 00:00:00 2001 From: Emor-nj Date: Fri, 8 Jul 2022 10:10:27 +0800 Subject: [PATCH 4/5] fix doc --- docs/en/connector-v2/source/Hudi.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md index e99db57796d..13fb2551a88 100644 --- a/docs/en/connector-v2/source/Hudi.md +++ b/docs/en/connector-v2/source/Hudi.md @@ -9,7 +9,7 @@ Used to read data from Hudi. Currently, only supports hudi cow table and Snapsho | name | type | required | default value | |--------------------------|---------|----------|---------------| | table.path | string | yes | - | -| table.type | string | yes | cow | +| table.type | string | yes | - | | conf.files | string | yes | - | | use.kerberos | boolean | no | false | | kerberos.principal | string | no | - | From 70b464e6a64c2ea13188ed55843a587df295d05e Mon Sep 17 00:00:00 2001 From: Emor-nj Date: Fri, 8 Jul 2022 10:13:57 +0800 Subject: [PATCH 5/5] fix doc links checked --- docs/en/connector-v2/source/Hudi.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md index 13fb2551a88..4803b1f44f3 100644 --- a/docs/en/connector-v2/source/Hudi.md +++ b/docs/en/connector-v2/source/Hudi.md @@ -33,7 +33,7 @@ Used to read data from Hudi. Currently, only supports hudi cow table and Snapsho ### kerberos.principal [string] -`kerberos.principal` When use kerberos, we should set kerberos princal such as 'test_user@YOUR_COMPANY.COM'. +`kerberos.principal` When use kerberos, we should set kerberos princal such as 'test_user@xxx'. ### kerberos.principal.file [string] @@ -49,7 +49,7 @@ source { table.type = "cow" conf.files = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml" use.kerberos = true - kerberos.principal = "test_user@YOUR_COMPANY.COM" + kerberos.principal = "test_user@xxx" kerberos.principal.file = "/home/test/test_user.keytab" }