diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index f2c4bcae24e..6efe11bfa4e 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -26,6 +26,7 @@ Output data to Hbase | encoding | string | no | utf8 | | hbase_extra_config | string | no | - | | common-options | | no | - | +| ttl | long | no | - | ### zookeeper_quorum [string] @@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`], default `utf8` The extra configuration of hbase +### ttl [long] + +Hbase writes data TTL time, the default is based on the TTL set in the table, unit: milliseconds + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -114,6 +119,7 @@ Hbase { ``` + ### Multiple Table ```hocon @@ -185,6 +191,20 @@ sink { } ``` +## Writes To The Specified Column Family + +```hocon +Hbase { + zookeeper_quorum = "hbase_e2e:2181" + table = "assign_cf_table" + rowkey_column = ["id"] + family_name { + c_double = "cf1" + c_bigint = "cf2" + } +} +``` + ## Changelog ### next version diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index 871cad206c6..c46d83f4fb4 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -15,7 +15,7 @@ | 名称 | 类型 | 是否必须 | 默认值 | |--------------------|---------|------|-----------------| | zookeeper_quorum | string | yes | - | -| table | string | yes | - | +| table | string | no | - | | rowkey_column | list | yes | - | | family_name | config | yes | - | | rowkey_delimiter | string | no | "" | @@ -119,6 +119,78 @@ Hbase { ``` + +### 写入多表 + +```hocon +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "hbase_sink_1" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "hbase_sink_2" + fields { + name = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + Hbase { + zookeeper_quorum = "hbase:2181" + rowkey_column = ["name"] + family_name { + all_columns = info + } + } +} +``` + ## 写入指定列族 ```hocon diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index 1255fbc1b6c..84188f47a91 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -32,6 +32,7 @@ import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; +import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; @@ -60,6 +61,8 @@ public class HbaseParameters implements Serializable { private Map hbaseExtraConfig; + @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue(); + @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue(); @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue(); @@ -88,6 +91,7 @@ public static HbaseParameters buildWithConfig(ReadonlyConfig config) { String encoding = String.valueOf(config.get(ENCODING)); builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase())); builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG)); + builder.ttl(config.get(HBASE_TTL_CONFIG)); return builder.build(); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index a00f442dd0e..638c3570ad7 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -119,6 +119,9 @@ private Put convertRowToPut(SeaTunnelRow row) { timestamp = (Long) row.getField(versionColumnIndex); } Put put = new Put(rowkey, timestamp); + if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) { + put.setTTL(hbaseParameters.getTtl()); + } if (!hbaseParameters.isWalWrite()) { put.setDurability(Durability.SKIP_WAL); }