From cc884b4480c50555d8a9f28e0629d138d0d9bd36 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Wed, 8 Jan 2020 16:00:40 +0800 Subject: [PATCH] Fix null pointer exception in Dataflow Runner due to unserializable backoff (#417) --- .../java/feast/core/service/SpecService.java | 4 +- .../java/feast/core/util/PackageUtil.java | 6 +- .../ingestion/transform/WriteToStore.java | 15 +- .../transform/fn/ValidateFeatureRowDoFn.java | 7 +- .../java/feast/retry/BackOffExecutor.java | 66 +++++--- .../src/main/java/feast/retry/Retriable.java | 24 ++- .../store/serving/redis/RedisCustomIO.java | 117 ++++++++------- .../transform/ValidateFeatureRowsTest.java | 14 +- .../serving/redis/RedisCustomIOTest.java | 141 ++++++++++-------- 9 files changed, 228 insertions(+), 166 deletions(-) diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 1d6ce16de5..129fa68a82 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -143,8 +143,8 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) { * possible if a project name is not set explicitly * *
The version field can be one of - '*' - This will match all versions - 'latest' - This will - * match the latest feature set version - '<number>' - This will match a specific feature set - * version. This property can only be set if both the feature set name and project name are + * match the latest feature set version - '<number>' - This will match a specific feature + * set version. This property can only be set if both the feature set name and project name are * explicitly set. * * @param filter filter containing the desired featureSet name and version filter diff --git a/core/src/main/java/feast/core/util/PackageUtil.java b/core/src/main/java/feast/core/util/PackageUtil.java index 20b2310644..99c5d73ba7 100644 --- a/core/src/main/java/feast/core/util/PackageUtil.java +++ b/core/src/main/java/feast/core/util/PackageUtil.java @@ -44,9 +44,9 @@ public class PackageUtil { * points to the resource location. Note that the extraction process can take several minutes to * complete. * - *
One use case of this function is to detect the class path of resources to stage when - * using Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be - * handled by default in Apache Beam. + *
One use case of this function is to detect the class path of resources to stage when using + * Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be handled by + * default in Apache Beam. * *
*
diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
index 778540595a..b7901c2f90 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java
@@ -89,15 +89,14 @@ public PDone expand(PCollection input) {
switch (storeType) {
case REDIS:
RedisConfig redisConfig = getStore().getRedisConfig();
- PCollection redisWriteResult = input
- .apply(
- "FeatureRowToRedisMutation",
- ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
- .apply(
- "WriteRedisMutationToRedis",
- RedisCustomIO.write(redisConfig));
+ PCollection redisWriteResult =
+ input
+ .apply(
+ "FeatureRowToRedisMutation",
+ ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
+ .apply("WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig));
if (options.getDeadLetterTableSpec() != null) {
- redisWriteResult.apply(
+ redisWriteResult.apply(
WriteFailedElementToBigQuery.newBuilder()
.setTableSpec(options.getDeadLetterTableSpec())
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java
index 7d61a62f3f..c31d3c535e 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java
@@ -24,10 +24,8 @@
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;
@@ -111,10 +109,7 @@ public void processElement(ProcessContext context) {
}
context.output(getFailureTag(), failedElement.build());
} else {
- featureRow = featureRow.toBuilder()
- .clearFields()
- .addAllFields(fields)
- .build();
+ featureRow = featureRow.toBuilder().clearFields().addAllFields(fields).build();
context.output(getSuccessTag(), featureRow);
}
}
diff --git a/ingestion/src/main/java/feast/retry/BackOffExecutor.java b/ingestion/src/main/java/feast/retry/BackOffExecutor.java
index 7e38a3cf70..344c65ac42 100644
--- a/ingestion/src/main/java/feast/retry/BackOffExecutor.java
+++ b/ingestion/src/main/java/feast/retry/BackOffExecutor.java
@@ -1,38 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package feast.retry;
+import java.io.Serializable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;
-import java.io.IOException;
-import java.io.Serializable;
-
public class BackOffExecutor implements Serializable {
- private static FluentBackoff backoff;
+ private final Integer maxRetries;
+ private final Duration initialBackOff;
- public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
- backoff = FluentBackoff.DEFAULT
- .withMaxRetries(maxRetries)
- .withInitialBackoff(initialBackOff);
- }
+ public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
+ this.maxRetries = maxRetries;
+ this.initialBackOff = initialBackOff;
+ }
+
+ public void execute(Retriable retriable) throws Exception {
+ FluentBackoff backoff =
+ FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
+ execute(retriable, backoff);
+ }
- public void execute(Retriable retriable) throws Exception {
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff = backoff.backoff();
- while(true) {
- try {
- retriable.execute();
- break;
- } catch (Exception e) {
- if(retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
- retriable.cleanUpAfterFailure();
- } else {
- throw e;
- }
- }
+ private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backOff = backoff.backoff();
+ while (true) {
+ try {
+ retriable.execute();
+ break;
+ } catch (Exception e) {
+ if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
+ retriable.cleanUpAfterFailure();
+ } else {
+ throw e;
}
+ }
}
+ }
}
diff --git a/ingestion/src/main/java/feast/retry/Retriable.java b/ingestion/src/main/java/feast/retry/Retriable.java
index 8fd76fedbb..0a788fcdd6 100644
--- a/ingestion/src/main/java/feast/retry/Retriable.java
+++ b/ingestion/src/main/java/feast/retry/Retriable.java
@@ -1,7 +1,25 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package feast.retry;
public interface Retriable {
- void execute();
- Boolean isExceptionRetriable(Exception e);
- void cleanUpAfterFailure();
+ void execute();
+
+ Boolean isExceptionRetriable(Exception e);
+
+ void cleanUpAfterFailure();
}
diff --git a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
index 20afc43d76..8c142b66c9 100644
--- a/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
+++ b/ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
@@ -20,6 +20,9 @@
import feast.ingestion.values.FailedElement;
import feast.retry.BackOffExecutor;
import feast.retry.Retriable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@@ -38,10 +41,6 @@
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisConnectionException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class RedisCustomIO {
private static final int DEFAULT_BATCH_SIZE = 1000;
@@ -164,7 +163,8 @@ public void setScore(@Nullable Long score) {
}
/** ServingStoreWrite data to a Redis server. */
- public static class Write extends PTransform, PCollection> {
+ public static class Write
+ extends PTransform, PCollection> {
private WriteDoFn dofn;
@@ -202,9 +202,10 @@ public static class WriteDoFn extends DoFn {
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
this.host = redisConfig.getHost();
this.port = redisConfig.getPort();
- long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
- this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
- Duration.millis(backoffMs));
+ long backoffMs =
+ redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
+ this.backOffExecutor =
+ new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
}
public WriteDoFn withBatchSize(int batchSize) {
@@ -233,47 +234,50 @@ public void startBundle() {
}
private void executeBatch() throws Exception {
- backOffExecutor.execute(new Retriable() {
- @Override
- public void execute() {
- pipeline.multi();
- mutations.forEach(mutation -> {
- writeRecord(mutation);
- if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
- pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+ backOffExecutor.execute(
+ new Retriable() {
+ @Override
+ public void execute() {
+ pipeline.multi();
+ mutations.forEach(
+ mutation -> {
+ writeRecord(mutation);
+ if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
+ pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
+ }
+ });
+ pipeline.exec();
+ pipeline.sync();
+ mutations.clear();
}
- });
- pipeline.exec();
- pipeline.sync();
- mutations.clear();
- }
- @Override
- public Boolean isExceptionRetriable(Exception e) {
- return e instanceof JedisConnectionException;
- }
+ @Override
+ public Boolean isExceptionRetriable(Exception e) {
+ return e instanceof JedisConnectionException;
+ }
- @Override
- public void cleanUpAfterFailure() {
- try {
- pipeline.close();
- } catch (IOException e) {
- log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
- }
- jedis = new Jedis(host, port, timeout);
- pipeline = jedis.pipelined();
- }
- });
+ @Override
+ public void cleanUpAfterFailure() {
+ try {
+ pipeline.close();
+ } catch (IOException e) {
+ log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
+ }
+ jedis = new Jedis(host, port, timeout);
+ pipeline = jedis.pipelined();
+ }
+ });
}
- private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
+ private FailedElement toFailedElement(
+ RedisMutation mutation, Exception exception, String jobName) {
return FailedElement.newBuilder()
- .setJobName(jobName)
- .setTransformName("RedisCustomIO")
- .setPayload(mutation.getValue().toString())
- .setErrorMessage(exception.getMessage())
- .setStackTrace(ExceptionUtils.getStackTrace(exception))
- .build();
+ .setJobName(jobName)
+ .setTransformName("RedisCustomIO")
+ .setPayload(mutation.getValue().toString())
+ .setErrorMessage(exception.getMessage())
+ .setStackTrace(ExceptionUtils.getStackTrace(exception))
+ .build();
}
@ProcessElement
@@ -284,11 +288,12 @@ public void processElement(ProcessContext context) {
try {
executeBatch();
} catch (Exception e) {
- mutations.forEach(failedMutation -> {
- FailedElement failedElement = toFailedElement(
- failedMutation, e, context.getPipelineOptions().getJobName());
- context.output(failedElement);
- });
+ mutations.forEach(
+ failedMutation -> {
+ FailedElement failedElement =
+ toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
+ context.output(failedElement);
+ });
mutations.clear();
}
}
@@ -315,16 +320,18 @@ private Response> writeRecord(RedisMutation mutation) {
}
@FinishBundle
- public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
- if(mutations.size() > 0) {
+ public void finishBundle(FinishBundleContext context)
+ throws IOException, InterruptedException {
+ if (mutations.size() > 0) {
try {
executeBatch();
} catch (Exception e) {
- mutations.forEach(failedMutation -> {
- FailedElement failedElement = toFailedElement(
- failedMutation, e, context.getPipelineOptions().getJobName());
- context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
- });
+ mutations.forEach(
+ failedMutation -> {
+ FailedElement failedElement =
+ toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
+ context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
+ });
mutations.clear();
}
}
diff --git a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java
index aca3956387..5c9860ed97 100644
--- a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java
+++ b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java
@@ -180,12 +180,14 @@ public void shouldExcludeUnregisteredFields() {
FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
expected.add(randomRow);
- input.add(randomRow.toBuilder()
- .addFields(Field.newBuilder()
- .setName("extra")
- .setValue(Value.newBuilder().setStringVal("hello")))
- .build()
- );
+ input.add(
+ randomRow
+ .toBuilder()
+ .addFields(
+ Field.newBuilder()
+ .setName("extra")
+ .setValue(Value.newBuilder().setStringVal("hello")))
+ .build());
PCollectionTuple output =
p.apply(Create.of(input))
diff --git a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java
index 94167059b4..fc17f6207f 100644
--- a/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java
+++ b/ingestion/src/test/java/feast/store/serving/redis/RedisCustomIOTest.java
@@ -16,12 +16,24 @@
*/
package feast.store.serving.redis;
+import static feast.test.TestUtil.field;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
import feast.core.StoreProto;
import feast.storage.RedisProto.RedisKey;
import feast.store.serving.redis.RedisCustomIO.Method;
import feast.store.serving.redis.RedisCustomIO.RedisMutation;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.ValueProto.ValueType.Enum;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -35,29 +47,14 @@
import redis.embedded.Redis;
import redis.embedded.RedisServer;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static feast.test.TestUtil.field;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
public class RedisCustomIOTest {
- @Rule
- public transient TestPipeline p = TestPipeline.create();
+ @Rule public transient TestPipeline p = TestPipeline.create();
private static String REDIS_HOST = "localhost";
private static int REDIS_PORT = 51234;
private Redis redis;
private Jedis jedis;
-
@Before
public void setUp() throws IOException {
redis = new RedisServer(REDIS_PORT);
@@ -72,10 +69,8 @@ public void teardown() {
@Test
public void shouldWriteToRedis() {
- StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder()
- .setHost(REDIS_HOST)
- .setPort(REDIS_PORT)
- .build();
+ StoreProto.Store.RedisConfig redisConfig =
+ StoreProto.Store.RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build();
HashMap kvs = new LinkedHashMap<>();
kvs.put(
RedisKey.newBuilder()
@@ -110,8 +105,7 @@ public void shouldWriteToRedis() {
null))
.collect(Collectors.toList());
- p.apply(Create.of(featureRowWrites))
- .apply(RedisCustomIO.write(redisConfig));
+ p.apply(Create.of(featureRowWrites)).apply(RedisCustomIO.write(redisConfig));
p.run();
kvs.forEach(
@@ -123,68 +117,95 @@ public void shouldWriteToRedis() {
@Test(timeout = 10000)
public void shouldRetryFailConnection() throws InterruptedException {
- StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder()
+ StoreProto.Store.RedisConfig redisConfig =
+ StoreProto.Store.RedisConfig.newBuilder()
.setHost(REDIS_HOST)
.setPort(REDIS_PORT)
.setMaxRetries(4)
.setInitialBackoffMs(2000)
.build();
HashMap kvs = new LinkedHashMap<>();
- kvs.put(RedisKey.newBuilder().setFeatureSet("fs:1")
- .addEntities(field("entity", 1, Enum.INT64)).build(),
- FeatureRow.newBuilder().setFeatureSet("fs:1")
- .addFields(field("entity", 1, Enum.INT64))
- .addFields(field("feature", "one", Enum.STRING)).build());
-
- List featureRowWrites = kvs.entrySet().stream()
- .map(kv -> new RedisMutation(Method.SET, kv.getKey().toByteArray(),
- kv.getValue().toByteArray(),
- null, null)
- )
+ kvs.put(
+ RedisKey.newBuilder()
+ .setFeatureSet("fs:1")
+ .addEntities(field("entity", 1, Enum.INT64))
+ .build(),
+ FeatureRow.newBuilder()
+ .setFeatureSet("fs:1")
+ .addFields(field("entity", 1, Enum.INT64))
+ .addFields(field("feature", "one", Enum.STRING))
+ .build());
+
+ List featureRowWrites =
+ kvs.entrySet().stream()
+ .map(
+ kv ->
+ new RedisMutation(
+ Method.SET,
+ kv.getKey().toByteArray(),
+ kv.getValue().toByteArray(),
+ null,
+ null))
.collect(Collectors.toList());
- PCollection failedElementCount = p.apply(Create.of(featureRowWrites))
- .apply(RedisCustomIO.write(redisConfig))
- .apply(Count.globally());
+ PCollection failedElementCount =
+ p.apply(Create.of(featureRowWrites))
+ .apply(RedisCustomIO.write(redisConfig))
+ .apply(Count.globally());
redis.stop();
final ScheduledThreadPoolExecutor redisRestartExecutor = new ScheduledThreadPoolExecutor(1);
- ScheduledFuture> scheduledRedisRestart = redisRestartExecutor.schedule(() -> {
- redis.start();
- }, 3, TimeUnit.SECONDS);
+ ScheduledFuture> scheduledRedisRestart =
+ redisRestartExecutor.schedule(
+ () -> {
+ redis.start();
+ },
+ 3,
+ TimeUnit.SECONDS);
PAssert.that(failedElementCount).containsInAnyOrder(0L);
p.run();
scheduledRedisRestart.cancel(true);
- kvs.forEach((key, value) -> {
- byte[] actual = jedis.get(key.toByteArray());
- assertThat(actual, equalTo(value.toByteArray()));
- });
+ kvs.forEach(
+ (key, value) -> {
+ byte[] actual = jedis.get(key.toByteArray());
+ assertThat(actual, equalTo(value.toByteArray()));
+ });
}
@Test
public void shouldProduceFailedElementIfRetryExceeded() {
- StoreProto.Store.RedisConfig redisConfig = StoreProto.Store.RedisConfig.newBuilder()
- .setHost(REDIS_HOST)
- .setPort(REDIS_PORT)
- .build();
+ StoreProto.Store.RedisConfig redisConfig =
+ StoreProto.Store.RedisConfig.newBuilder().setHost(REDIS_HOST).setPort(REDIS_PORT).build();
HashMap kvs = new LinkedHashMap<>();
- kvs.put(RedisKey.newBuilder().setFeatureSet("fs:1")
- .addEntities(field("entity", 1, Enum.INT64)).build(),
- FeatureRow.newBuilder().setFeatureSet("fs:1")
+ kvs.put(
+ RedisKey.newBuilder()
+ .setFeatureSet("fs:1")
+ .addEntities(field("entity", 1, Enum.INT64))
+ .build(),
+ FeatureRow.newBuilder()
+ .setFeatureSet("fs:1")
.addFields(field("entity", 1, Enum.INT64))
- .addFields(field("feature", "one", Enum.STRING)).build());
+ .addFields(field("feature", "one", Enum.STRING))
+ .build());
- List featureRowWrites = kvs.entrySet().stream()
- .map(kv -> new RedisMutation(Method.SET, kv.getKey().toByteArray(),
- kv.getValue().toByteArray(),
- null, null)
- ).collect(Collectors.toList());
+ List featureRowWrites =
+ kvs.entrySet().stream()
+ .map(
+ kv ->
+ new RedisMutation(
+ Method.SET,
+ kv.getKey().toByteArray(),
+ kv.getValue().toByteArray(),
+ null,
+ null))
+ .collect(Collectors.toList());
- PCollection failedElementCount = p.apply(Create.of(featureRowWrites))
- .apply(RedisCustomIO.write(redisConfig))
- .apply(Count.globally());
+ PCollection failedElementCount =
+ p.apply(Create.of(featureRowWrites))
+ .apply(RedisCustomIO.write(redisConfig))
+ .apply(Count.globally());
redis.stop();
PAssert.that(failedElementCount).containsInAnyOrder(1L);