Skip to content

Commit

Permalink
[feature][connector-v2-hbase-sink] Support Connector v2 HBase sink TT…
Browse files Browse the repository at this point in the history
…L data writing (apache#7116)
  • Loading branch information
zhangshenghang authored and chaorongzhi committed Aug 21, 2024
1 parent 42b9d77 commit 67c1d81
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Output data to Hbase
| encoding | string | no | utf8 |
| hbase_extra_config | string | no | - |
| common-options | | no | - |
| ttl | long | no | - |

### zookeeper_quorum [string]

Expand Down Expand Up @@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`], default `utf8`

The extra configuration of hbase

### ttl [long]

Hbase writes data TTL time, the default is based on the TTL set in the table, unit: milliseconds

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
Expand Down
5 changes: 5 additions & 0 deletions docs/zh/connector-v2/sink/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
| encoding | string | no | utf8 |
| hbase_extra_config | string | no | - |
| common-options | | no | - |
| ttl | long | no | - |

### zookeeper_quorum [string]

Expand Down Expand Up @@ -95,6 +96,10 @@ hbase 客户端的写入缓冲区大小,默认 8 * 1024 * 1024

hbase扩展配置

### ttl [long]

hbase 写入数据 TTL 时间,默认以表设置的TTL为准,单位毫秒

### 常见选项

Sink 插件常用参数,详见 Sink 常用选项 [Sink Common Options](common-options.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public class HbaseConfig {
.noDefaultValue()
.withDescription("Hbase extra config");

public static final Option<Long> HBASE_TTL_CONFIG =
Options.key("ttl")
.longType()
.defaultValue(-1L)
.withDescription(
"The expiration time configuration for writing hbase data. The default value is -1, indicating no expiration time.");

public enum NullMode {
SKIP,
EMPTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
Expand Down Expand Up @@ -59,6 +60,8 @@ public class HbaseParameters implements Serializable {

private Map<String, String> hbaseExtraConfig;

@Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();

@Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();

@Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
Expand All @@ -80,6 +83,9 @@ public static HbaseParameters buildWithConfig(Config pluginConfig) {
TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));

// optional parameters
if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) {
builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key()));
}
if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ private Put convertRowToPut(SeaTunnelRow row) {
timestamp = (Long) row.getField(versionColumnIndex);
}
Put put = new Put(rowkey, timestamp);
if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) {
put.setTTL(hbaseParameters.getTtl());
}
if (!hbaseParameters.isWalWrite()) {
put.setDurability(Durability.SKIP_WAL);
}
Expand Down

0 comments on commit 67c1d81

Please sign in to comment.