Skip to content

Commit

Permalink
[Feature][Connector-v2][RedisSink]Support redis to set expiration tim…
Browse files Browse the repository at this point in the history
…e. (#4975)

* Support redis to set expiration time.

* Set redis expire default value.

* add e2e test.

* add e2e test.

* modify config file name.

---------

Co-authored-by: lightzhao <zhaolianyong777@gmail.com>
  • Loading branch information
lightzhao and lightzhao authored Aug 14, 2023
1 parent ad4f1fc commit b5321ff
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 8 deletions.
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Used to write data to Redis.
| mode | string | no | single |
| nodes | list | yes when mode=cluster | - |
| format | string | no | json |
| expire | long | no | -1 |
| common-options | | no | - |

### host [string]
Expand Down Expand Up @@ -120,6 +121,10 @@ Connector will generate data as the following and write it to redis:

```

### expire [long]

Set redis expiration time, the unit is second. The default value is -1, keys do not automatically expire by default.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public enum HashKeyParseMode {
.withDescription(
"hash key parse mode, support all or kv, default value is all");

public static final Option<Long> EXPIRE =
Options.key("expire")
.longType()
.defaultValue(-1L)
.withDescription("Set redis expiration time.");

public enum Format {
JSON,
// TEXT will be supported later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
public enum RedisDataType {
KEY {
@Override
public void set(Jedis jedis, String key, String value) {
public void set(Jedis jedis, String key, String value, long expire) {
jedis.set(key, value);
expire(jedis, key, expire);
}

@Override
Expand All @@ -41,9 +42,10 @@ public List<String> get(Jedis jedis, String key) {
},
HASH {
@Override
public void set(Jedis jedis, String key, String value) {
public void set(Jedis jedis, String key, String value, long expire) {
Map<String, String> fieldsMap = JsonUtils.toMap(value);
jedis.hset(key, fieldsMap);
expire(jedis, key, expire);
}

@Override
Expand All @@ -54,8 +56,9 @@ public List<String> get(Jedis jedis, String key) {
},
LIST {
@Override
public void set(Jedis jedis, String key, String value) {
public void set(Jedis jedis, String key, String value, long expire) {
jedis.lpush(key, value);
expire(jedis, key, expire);
}

@Override
Expand All @@ -65,8 +68,9 @@ public List<String> get(Jedis jedis, String key) {
},
SET {
@Override
public void set(Jedis jedis, String key, String value) {
public void set(Jedis jedis, String key, String value, long expire) {
jedis.sadd(key, value);
expire(jedis, key, expire);
}

@Override
Expand All @@ -77,8 +81,9 @@ public List<String> get(Jedis jedis, String key) {
},
ZSET {
@Override
public void set(Jedis jedis, String key, String value) {
public void set(Jedis jedis, String key, String value, long expire) {
jedis.zadd(key, 1, value);
expire(jedis, key, expire);
}

@Override
Expand All @@ -91,7 +96,13 @@ public List<String> get(Jedis jedis, String key) {
return Collections.emptyList();
}

public void set(Jedis jedis, String key, String value) {
private static void expire(Jedis jedis, String key, long expire) {
if (expire > 0) {
jedis.expire(key, expire);
}
}

public void set(Jedis jedis, String key, String value, long expire) {
// do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class RedisParameters implements Serializable {
private RedisConfig.RedisMode mode;
private RedisConfig.HashKeyParseMode hashKeyParseMode;
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisConfig.EXPIRE.defaultValue();

public void buildWithConfig(Config config) {
// set host
Expand Down Expand Up @@ -89,6 +90,9 @@ public void buildWithConfig(Config config) {
if (config.hasPath(RedisConfig.KEY_PATTERN.key())) {
this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key());
}
if (config.hasPath(RedisConfig.EXPIRE.key())) {
this.expire = config.getLong(RedisConfig.EXPIRE.key());
}
// set redis data type
try {
String dataType = config.getString(RedisConfig.DATA_TYPE.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public OptionRule optionRule() {
RedisConfig.AUTH,
RedisConfig.USER,
RedisConfig.KEY_PATTERN,
RedisConfig.FORMAT)
RedisConfig.FORMAT,
RedisConfig.EXPIRE)
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void write(SeaTunnelRow element) throws IOException {
} else {
key = keyField;
}
redisDataType.set(jedis, key, data);
long expire = redisParameters.getExpire();
redisDataType.set(jedis, key, data, expire);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,15 @@ public void testRedis(TestContainer container) throws IOException, InterruptedEx
jedis.del("key_list");
Assertions.assertEquals(0, jedis.llen("key_list"));
}

@TestTemplate
public void testRedisWithExpire(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/redis-to-redis-expire.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(100, jedis.llen("key_list"));
// Clear data to prevent data duplication in the next TestContainer
Thread.sleep(60 * 1000);
Assertions.assertEquals(0, jedis.llen("key_list"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
execution.parallelism = 1
job.mode = "BATCH"
shade.identifier = "base64"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
keys = "key_test*"
data_type = key
}
}

sink {
Redis {
host = "redis-e2e"
port = 6379
auth = "U2VhVHVubmVs"
key = "key_list"
data_type = list
expire = 30
}
}

0 comments on commit b5321ff

Please sign in to comment.