From 74bff10c5beafa4ada2e71c785991a63ed98fbe6 Mon Sep 17 00:00:00 2001 From: Pradithya Aria Date: Wed, 30 Oct 2019 11:04:46 +0800 Subject: [PATCH] Remove redis transaction --- .../feast/store/serving/redis/RedisCustomIO.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java index 4dc53467c7..6884b86b61 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java +++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java @@ -138,12 +138,16 @@ public static class WriteDoFn extends DoFn { } public WriteDoFn withBatchSize(int batchSize) { - if (batchSize > 0) this.batchSize = batchSize; + if (batchSize > 0) { + this.batchSize = batchSize; + } return this; } public WriteDoFn withTimeout(int timeout) { - if (timeout > 0) this.timeout = timeout; + if (timeout > 0) { + this.timeout = timeout; + } return this; } @@ -155,7 +159,6 @@ public void setup() { @StartBundle public void startBundle() { pipeline = jedis.pipelined(); - pipeline.multi(); batchCount = 0; } @@ -168,9 +171,7 @@ public void processElement(ProcessContext context) { } batchCount++; if (batchCount >= batchSize) { - pipeline.exec(); pipeline.sync(); - pipeline.multi(); batchCount = 0; } } @@ -197,10 +198,7 @@ private Response writeRecord(RedisMutation mutation) { @FinishBundle public void finishBundle() { - if (pipeline.isInMulti()) { - pipeline.exec(); - pipeline.sync(); - } + pipeline.sync(); batchCount = 0; }