diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index 0f808f5e52c..3ceba0982d4 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -116,6 +116,79 @@ Hbase { all_columns = seatunnel } } + +``` + +### Multiple Table + +```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181" + table = "${table_name}" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} ``` ## Writes To The Specified Column Family diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index edc9e48510e..f028a8c93ee 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -119,6 +119,78 @@ Hbase { ``` +### 写入多表 + +```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181" + table = "${table_name}" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} +``` + ## 写入指定列族 ```hocon diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index c25f04b3753..4d020700ad6 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.common.config.TypesafeConfigUtils; import lombok.Builder; @@ -80,44 +81,25 @@ public class HbaseParameters implements Serializable { @Builder.Default private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue(); - public static HbaseParameters buildWithSinkConfig(Config pluginConfig) { + public static HbaseParameters buildWithConfig(ReadonlyConfig config) { HbaseParametersBuilder builder = HbaseParameters.builder(); // required parameters - builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key())); - builder.table(pluginConfig.getString(TABLE.key())); - builder.rowkeyColumns(pluginConfig.getStringList(ROWKEY_COLUMNS.key())); - builder.familyNames( - TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key()))); - - // optional parameters - if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) { - builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key())); - } - if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) { - builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key())); - } - if (pluginConfig.hasPath(VERSION_COLUMN.key())) { - builder.versionColumn(pluginConfig.getString(VERSION_COLUMN.key())); - } - if (pluginConfig.hasPath(NULL_MODE.key())) { - String nullMode = pluginConfig.getString(NULL_MODE.key()); - builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase())); - } - if (pluginConfig.hasPath(WAL_WRITE.key())) { - builder.walWrite(pluginConfig.getBoolean(WAL_WRITE.key())); - } - if (pluginConfig.hasPath(WRITE_BUFFER_SIZE.key())) { - builder.writeBufferSize(pluginConfig.getInt(WRITE_BUFFER_SIZE.key())); - } - if (pluginConfig.hasPath(ENCODING.key())) { - String encoding = pluginConfig.getString(ENCODING.key()); - builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); - } - if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) { - Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key()); - builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig)); - } + builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM)); + builder.rowkeyColumns(config.get(ROWKEY_COLUMNS)); + builder.familyNames(config.get(FAMILY_NAME)); + + builder.table(config.get(TABLE)); + builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER)); + builder.versionColumn(config.get(VERSION_COLUMN)); + String nullMode = String.valueOf(config.get(NULL_MODE)); + builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase())); + builder.walWrite(config.get(WAL_WRITE)); + builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE)); + String encoding = String.valueOf(config.get(ENCODING)); + builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); + builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG)); + builder.ttl(config.get(HBASE_TTL_CONFIG)); return builder.build(); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java index 4f7b929223f..0c592dd65a0 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java @@ -19,33 +19,20 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; -import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException; - -import com.google.auto.service.AutoService; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM; - -@AutoService(SeaTunnelSink.class) -public class HbaseSink extends AbstractSimpleSink { +public class HbaseSink extends AbstractSimpleSink + implements SupportMultiTableSink { private Config pluginConfig; @@ -62,34 +49,9 @@ public String getPluginName() { return HbaseSinkFactory.IDENTIFIER; } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - this.pluginConfig = pluginConfig; - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - ZOOKEEPER_QUORUM.key(), - TABLE.key(), - ROWKEY_COLUMNS.key(), - FAMILY_NAME.key()); - if (!result.isSuccess()) { - throw new HbaseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig); - if (hbaseParameters.getFamilyNames().size() == 0) { - throw new HbaseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - "The corresponding field options should be configured and should not be empty Refer to the hbase sink document"); - } - } - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public HbaseSink(HbaseParameters hbaseParameters, CatalogTable catalogTable) { + this.hbaseParameters = hbaseParameters; + this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); for (String rowkeyColumn : hbaseParameters.getRowkeyColumns()) { this.rowkeyColumnIndexes.add(seaTunnelRowType.indexOf(rowkeyColumn)); } @@ -99,8 +61,7 @@ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { + public HbaseSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new HbaseSinkWriter( seaTunnelRowType, hbaseParameters, rowkeyColumnIndexes, versionColumnIndex); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java index 3038473c4ed..1bbeb43f4e3 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java @@ -18,8 +18,13 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; import com.google.auto.service.AutoService; @@ -50,6 +55,7 @@ public OptionRule optionRule() { return OptionRule.builder() .required(ZOOKEEPER_QUORUM, TABLE, ROWKEY_COLUMNS, FAMILY_NAME) .optional( + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA, ROWKEY_DELIMITER, VERSION_COLUMN, NULL_MODE, @@ -59,4 +65,11 @@ public OptionRule optionRule() { HBASE_EXTRA_CONFIG) .build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + HbaseParameters hbaseParameters = HbaseParameters.buildWithConfig(context.getOptions()); + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new HbaseSink(hbaseParameters, catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 7683d6aab0b..e1e312d3057 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.sink; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -41,11 +42,11 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; -public class HbaseSinkWriter extends AbstractSinkWriter { +public class HbaseSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { private static final String ALL_COLUMNS = "all_columns"; @@ -63,7 +64,7 @@ public class HbaseSinkWriter extends AbstractSinkWriter { private final int versionColumnIndex; - private String writeAllColumnFamily; + private String defaultFamilyName = "value"; public HbaseSinkWriter( SeaTunnelRowType seaTunnelRowType, @@ -77,7 +78,8 @@ public HbaseSinkWriter( this.versionColumnIndex = versionColumnIndex; if (hbaseParameters.getFamilyNames().size() == 1) { - this.writeAllColumnFamily = hbaseParameters.getFamilyNames().get(ALL_COLUMNS); + defaultFamilyName = + hbaseParameters.getFamilyNames().getOrDefault(ALL_COLUMNS, defaultFamilyName); } // initialize hbase configuration @@ -132,14 +134,8 @@ private Put convertRowToPut(SeaTunnelRow row) { .collect(Collectors.toList()); for (Integer writeColumnIndex : writeColumnIndexes) { String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex); - // This is the family of columns that we define to be written through the.conf file - Map configurationFamilyNames = hbaseParameters.getFamilyNames(); String familyName = - configurationFamilyNames.getOrDefault(fieldName, writeAllColumnFamily); - if (!configurationFamilyNames.containsKey(ALL_COLUMNS) - && !configurationFamilyNames.containsKey(fieldName)) { - continue; - } + hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName); byte[] bytes = convertColumnToBytes(row, writeColumnIndex); if (bytes != null) { put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(fieldName), bytes); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java index 85ceef92353..fe736f965ef 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java @@ -56,8 +56,13 @@ public class HbaseIT extends TestSuiteBase implements TestResource { private static final String TABLE_NAME = "seatunnel_test"; + private static final String ASSIGN_CF_TABLE_NAME = "assign_cf_table"; + private static final String MULTI_TABLE_ONE_NAME = "hbase_sink_1"; + + private static final String MULTI_TABLE_TWO_NAME = "hbase_sink_2"; + private static final String FAMILY_NAME = "info"; private Connection hbaseConnection; @@ -77,9 +82,14 @@ public void startUp() throws Exception { // Create table for hbase sink test log.info("initial"); hbaseCluster.createTable(TABLE_NAME, Arrays.asList(FAMILY_NAME)); + // Create table for hbase assign cf table sink test hbaseCluster.createTable(ASSIGN_CF_TABLE_NAME, Arrays.asList("cf1", "cf2")); table = TableName.valueOf(TABLE_NAME); tableAssign = TableName.valueOf(ASSIGN_CF_TABLE_NAME); + + // Create table for hbase multi-table sink test + hbaseCluster.createTable(MULTI_TABLE_ONE_NAME, Arrays.asList(FAMILY_NAME)); + hbaseCluster.createTable(MULTI_TABLE_TWO_NAME, Arrays.asList(FAMILY_NAME)); } @AfterAll @@ -93,7 +103,11 @@ public void tearDown() throws Exception { @TestTemplate public void testHbaseSink(TestContainer container) throws IOException, InterruptedException { - fakeToHbase(container); + deleteData(table); + Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf"); + Assertions.assertEquals(0, sinkExecResult.getExitCode()); + ArrayList results = readData(table); + Assertions.assertEquals(results.size(), 5); Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert.conf"); Assertions.assertEquals(0, sourceExecResult.getExitCode()); } @@ -166,6 +180,25 @@ public void testHbaseSinkAssignCfSink(TestContainer container) Assertions.assertEquals(cf2Count, 5); } + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = "Currently FLINK does not support multiple table write") + public void testHbaseMultiTableSink(TestContainer container) + throws IOException, InterruptedException { + TableName multiTable1 = TableName.valueOf(MULTI_TABLE_ONE_NAME); + TableName multiTable2 = TableName.valueOf(MULTI_TABLE_TWO_NAME); + deleteData(multiTable1); + deleteData(multiTable2); + Container.ExecResult sinkExecResult = + container.executeJob("/fake-to-hbase-with-multipletable.conf"); + Assertions.assertEquals(0, sinkExecResult.getExitCode()); + ArrayList results = readData(multiTable1); + Assertions.assertEquals(results.size(), 1); + results = readData(multiTable2); + Assertions.assertEquals(results.size(), 1); + } + @TestTemplate public void testHbaseSourceWithBatchQuery(TestContainer container) throws IOException, InterruptedException { @@ -200,4 +233,16 @@ private void deleteData(TableName table) throws IOException { hbaseTable.delete(deleteRow); } } + + public ArrayList readData(TableName table) throws IOException { + Table hbaseTable = hbaseConnection.getTable(table); + Scan scan = new Scan(); + ResultScanner scanner = hbaseTable.getScanner(scan); + ArrayList results = new ArrayList<>(); + for (Result result : scanner) { + results.add(result); + } + scanner.close(); + return results; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf new file mode 100644 index 00000000000..8972bf13249 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/fake-to-hbase-with-multipletable.conf @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "${table_name}" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf new file mode 100644 index 00000000000..6dc7530b4bd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-to-assert-with-multipletable.conf @@ -0,0 +1,129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "seatunnel_test" + query_columns=["rowkey", "info:age", "info:c_double", "info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"] + schema = { + columns = [ + { + name = rowkey + type = string + }, + { + name = "info:age" + type = int + }, + { + name = "info:c_double" + type = double + }, + { + name = "info:c_boolean" + type = boolean + }, + { + name = "info:c_bigint" + type = bigint + }, + { + name = "info:c_smallint" + type = smallint + }, + { + name = "info:c_tinyint" + type = tinyint + }, + { + name = "info:c_float" + type = float + } + ] + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 11 + }, + { + rule_type = MIN_ROW + rule_value = 11 + } + ], + field_rules = [ + { + field_name = rowkey + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:c_boolean" + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:c_double" + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:c_bigint" + field_type = bigint + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = "info:age" + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file