Skip to content

Commit

Permalink
fix ttl and doc
Browse files Browse the repository at this point in the history
  • Loading branch information
BruceWong96 committed Aug 12, 2024
1 parent a91f7f0 commit 945b883
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 1 deletion.
20 changes: 20 additions & 0 deletions docs/en/connector-v2/sink/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Output data to Hbase
| encoding | string | no | utf8 |
| hbase_extra_config | string | no | - |
| common-options | | no | - |
| ttl | long | no | - |

### zookeeper_quorum [string]

Expand Down Expand Up @@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`], default `utf8`

The extra configuration of hbase

### ttl [long]

Hbase writes data TTL time, the default is based on the TTL set in the table, unit: milliseconds

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
Expand All @@ -114,6 +119,7 @@ Hbase {
```


### Multiple Table

```hocon
Expand Down Expand Up @@ -185,6 +191,20 @@ sink {
}
```

## Writes To The Specified Column Family

```hocon
Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "assign_cf_table"
rowkey_column = ["id"]
family_name {
c_double = "cf1"
c_bigint = "cf2"
}
}
```

## Changelog

### next version
Expand Down
74 changes: 73 additions & 1 deletion docs/zh/connector-v2/sink/Hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
| 名称 | 类型 | 是否必须 | 默认值 |
|--------------------|---------|------|-----------------|
| zookeeper_quorum | string | yes | - |
| table | string | yes | - |
| table | string | no | - |
| rowkey_column | list | yes | - |
| family_name | config | yes | - |
| rowkey_delimiter | string | no | "" |
Expand Down Expand Up @@ -119,6 +119,78 @@ Hbase {
```


### 写入多表

```hocon
env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
tables_configs = [
{
schema = {
table = "hbase_sink_1"
fields {
name = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
}
]
},
{
schema = {
table = "hbase_sink_2"
fields {
name = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
}
]
}
]
}
}
sink {
Hbase {
zookeeper_quorum = "hbase:2181"
rowkey_column = ["name"]
family_name {
all_columns = info
}
}
}
```

## 写入指定列族

```hocon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class HbaseParameters implements Serializable {

private Map<String, String> hbaseExtraConfig;

@Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();

@Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();

@Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
Expand Down Expand Up @@ -88,6 +91,7 @@ public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
String encoding = String.valueOf(config.get(ENCODING));
builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG));
builder.ttl(config.get(HBASE_TTL_CONFIG));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ private Put convertRowToPut(SeaTunnelRow row) {
timestamp = (Long) row.getField(versionColumnIndex);
}
Put put = new Put(rowkey, timestamp);
if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) {
put.setTTL(hbaseParameters.getTtl());
}
if (!hbaseParameters.isWalWrite()) {
put.setDurability(Durability.SKIP_WAL);
}
Expand Down

0 comments on commit 945b883

Please sign in to comment.