From b5321ff1d2aa60309d46286158c158ff67359ae3 Mon Sep 17 00:00:00 2001 From: lightzhao <40714172+lightzhao@users.noreply.github.com> Date: Mon, 14 Aug 2023 10:11:34 +0800 Subject: [PATCH] [Feature][Connector-v2][RedisSink]Support redis to set expiration time. (#4975) * Support redis to set expiration time. * Set redis expire default value. * add e2e test. * add e2e test. * modify config file name. --------- Co-authored-by: lightzhao --- docs/en/connector-v2/sink/Redis.md | 5 ++ .../seatunnel/redis/config/RedisConfig.java | 6 +++ .../seatunnel/redis/config/RedisDataType.java | 23 ++++++--- .../redis/config/RedisParameters.java | 4 ++ .../redis/sink/RedisSinkFactory.java | 3 +- .../seatunnel/redis/sink/RedisSinkWriter.java | 3 +- .../e2e/connector/redis/RedisIT.java | 11 ++++ .../test/resources/redis-to-redis-expire.conf | 50 +++++++++++++++++++ 8 files changed, 97 insertions(+), 8 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index fcface7da22..7d2ef237e1c 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -23,6 +23,7 @@ Used to write data to Redis. | mode | string | no | single | | nodes | list | yes when mode=cluster | - | | format | string | no | json | +| expire | long | no | -1 | | common-options | | no | - | ### host [string] @@ -120,6 +121,10 @@ Connector will generate data as the following and write it to redis: ``` +### expire [long] + +Set redis expiration time, the unit is second. The default value is -1, keys do not automatically expire by default. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index c777d237827..511cbe4aa99 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -102,6 +102,12 @@ public enum HashKeyParseMode { .withDescription( "hash key parse mode, support all or kv, default value is all"); + public static final Option EXPIRE = + Options.key("expire") + .longType() + .defaultValue(-1L) + .withDescription("Set redis expiration time."); + public enum Format { JSON, // TEXT will be supported later diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java index 64772b5381d..a315e0cdae0 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisDataType.java @@ -30,8 +30,9 @@ public enum RedisDataType { KEY { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.set(key, value); + expire(jedis, key, expire); } @Override @@ -41,9 +42,10 @@ public List get(Jedis jedis, String key) { }, HASH { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { Map fieldsMap = JsonUtils.toMap(value); jedis.hset(key, fieldsMap); + expire(jedis, key, expire); } @Override @@ -54,8 +56,9 @@ public List get(Jedis jedis, String key) { }, LIST { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.lpush(key, value); + expire(jedis, key, expire); } @Override @@ -65,8 +68,9 @@ public List get(Jedis jedis, String key) { }, SET { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.sadd(key, value); + expire(jedis, key, expire); } @Override @@ -77,8 +81,9 @@ public List get(Jedis jedis, String key) { }, ZSET { @Override - public void set(Jedis jedis, String key, String value) { + public void set(Jedis jedis, String key, String value, long expire) { jedis.zadd(key, 1, value); + expire(jedis, key, expire); } @Override @@ -91,7 +96,13 @@ public List get(Jedis jedis, String key) { return Collections.emptyList(); } - public void set(Jedis jedis, String key, String value) { + private static void expire(Jedis jedis, String key, long expire) { + if (expire > 0) { + jedis.expire(key, expire); + } + } + + public void set(Jedis jedis, String key, String value, long expire) { // do nothing } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index c8bb879d0f5..8954b4da2a1 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -47,6 +47,7 @@ public class RedisParameters implements Serializable { private RedisConfig.RedisMode mode; private RedisConfig.HashKeyParseMode hashKeyParseMode; private List redisNodes = Collections.emptyList(); + private long expire = RedisConfig.EXPIRE.defaultValue(); public void buildWithConfig(Config config) { // set host @@ -89,6 +90,9 @@ public void buildWithConfig(Config config) { if (config.hasPath(RedisConfig.KEY_PATTERN.key())) { this.keysPattern = config.getString(RedisConfig.KEY_PATTERN.key()); } + if (config.hasPath(RedisConfig.EXPIRE.key())) { + this.expire = config.getLong(RedisConfig.EXPIRE.key()); + } // set redis data type try { String dataType = config.getString(RedisConfig.DATA_TYPE.key()); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java index e68a893f79c..22ae1568740 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java @@ -41,7 +41,8 @@ public OptionRule optionRule() { RedisConfig.AUTH, RedisConfig.USER, RedisConfig.KEY_PATTERN, - RedisConfig.FORMAT) + RedisConfig.FORMAT, + RedisConfig.EXPIRE) .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) .build(); } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java index 657e3aaa565..80b1449b9d6 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java @@ -59,7 +59,8 @@ public void write(SeaTunnelRow element) throws IOException { } else { key = keyField; } - redisDataType.set(jedis, key, data); + long expire = redisParameters.getExpire(); + redisDataType.set(jedis, key, data, expire); } @Override diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index 808f6860337..bd4a9063ba1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -192,4 +192,15 @@ public void testRedis(TestContainer container) throws IOException, InterruptedEx jedis.del("key_list"); Assertions.assertEquals(0, jedis.llen("key_list")); } + + @TestTemplate + public void testRedisWithExpire(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis-expire.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(100, jedis.llen("key_list")); + // Clear data to prevent data duplication in the next TestContainer + Thread.sleep(60 * 1000); + Assertions.assertEquals(0, jedis.llen("key_list")); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf new file mode 100644 index 00000000000..4a42bd3a46a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-expire.conf @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = key + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "key_list" + data_type = list + expire = 30 + } +} \ No newline at end of file