Skip to content

Commit

Permalink
Fix null pointer exception in Dataflow Runner due to unserializable b…
Browse files Browse the repository at this point in the history
…ackoff (#417)
  • Loading branch information
khorshuheng authored and Shu Heng committed Jan 8, 2020
1 parent ec6b26b commit cc884b4
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 166 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/service/SpecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public GetFeatureSetResponse getFeatureSet(GetFeatureSetRequest request) {
* possible if a project name is not set explicitly
*
* <p>The version field can be one of - '*' - This will match all versions - 'latest' - This will
* match the latest feature set version - '&lt;number&gt;' - This will match a specific feature set
* version. This property can only be set if both the feature set name and project name are
* match the latest feature set version - '&lt;number&gt;' - This will match a specific feature
* set version. This property can only be set if both the feature set name and project name are
* explicitly set.
*
* @param filter filter containing the desired featureSet name and version filter
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/feast/core/util/PackageUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class PackageUtil {
* points to the resource location. Note that the extraction process can take several minutes to
* complete.
*
* <p>One use case of this function is to detect the class path of resources to stage when
* using Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be
* handled by default in Apache Beam.
* <p>One use case of this function is to detect the class path of resources to stage when using
* Dataflow runner. The resource URL however is in "jar:file:" format, which cannot be handled by
* default in Apache Beam.
*
* <pre>
* <code>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,14 @@ public PDone expand(PCollection<FeatureRow> input) {
switch (storeType) {
case REDIS:
RedisConfig redisConfig = getStore().getRedisConfig();
PCollection<FailedElement> redisWriteResult = input
.apply(
"FeatureRowToRedisMutation",
ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
.apply(
"WriteRedisMutationToRedis",
RedisCustomIO.write(redisConfig));
PCollection<FailedElement> redisWriteResult =
input
.apply(
"FeatureRowToRedisMutation",
ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
.apply("WriteRedisMutationToRedis", RedisCustomIO.write(redisConfig));
if (options.getDeadLetterTableSpec() != null) {
redisWriteResult.apply(
redisWriteResult.apply(
WriteFailedElementToBigQuery.newBuilder()
.setTableSpec(options.getDeadLetterTableSpec())
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import feast.types.FieldProto;
import feast.types.ValueProto.Value.ValCase;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

Expand Down Expand Up @@ -111,10 +109,7 @@ public void processElement(ProcessContext context) {
}
context.output(getFailureTag(), failedElement.build());
} else {
featureRow = featureRow.toBuilder()
.clearFields()
.addAllFields(fields)
.build();
featureRow = featureRow.toBuilder().clearFields().addAllFields(fields).build();
context.output(getSuccessTag(), featureRow);
}
}
Expand Down
66 changes: 43 additions & 23 deletions ingestion/src/main/java/feast/retry/BackOffExecutor.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.retry;

import java.io.Serializable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

import java.io.IOException;
import java.io.Serializable;

public class BackOffExecutor implements Serializable {

private static FluentBackoff backoff;
private final Integer maxRetries;
private final Duration initialBackOff;

public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
backoff = FluentBackoff.DEFAULT
.withMaxRetries(maxRetries)
.withInitialBackoff(initialBackOff);
}
public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
this.maxRetries = maxRetries;
this.initialBackOff = initialBackOff;
}

public void execute(Retriable retriable) throws Exception {
FluentBackoff backoff =
FluentBackoff.DEFAULT.withMaxRetries(maxRetries).withInitialBackoff(initialBackOff);
execute(retriable, backoff);
}

public void execute(Retriable retriable) throws Exception {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff = backoff.backoff();
while(true) {
try {
retriable.execute();
break;
} catch (Exception e) {
if(retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
retriable.cleanUpAfterFailure();
} else {
throw e;
}
}
private void execute(Retriable retriable, FluentBackoff backoff) throws Exception {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff = backoff.backoff();
while (true) {
try {
retriable.execute();
break;
} catch (Exception e) {
if (retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
retriable.cleanUpAfterFailure();
} else {
throw e;
}
}
}
}
}
24 changes: 21 additions & 3 deletions ingestion/src/main/java/feast/retry/Retriable.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.retry;

public interface Retriable {
void execute();
Boolean isExceptionRetriable(Exception e);
void cleanUpAfterFailure();
void execute();

Boolean isExceptionRetriable(Exception e);

void cleanUpAfterFailure();
}
117 changes: 62 additions & 55 deletions ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import feast.ingestion.values.FailedElement;
import feast.retry.BackOffExecutor;
import feast.retry.Retriable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
Expand All @@ -38,10 +41,6 @@
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RedisCustomIO {

private static final int DEFAULT_BATCH_SIZE = 1000;
Expand Down Expand Up @@ -164,7 +163,8 @@ public void setScore(@Nullable Long score) {
}

/** ServingStoreWrite data to a Redis server. */
public static class Write extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
public static class Write
extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {

private WriteDoFn dofn;

Expand Down Expand Up @@ -202,9 +202,10 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
this.host = redisConfig.getHost();
this.port = redisConfig.getPort();
long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
Duration.millis(backoffMs));
long backoffMs =
redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
this.backOffExecutor =
new BackOffExecutor(redisConfig.getMaxRetries(), Duration.millis(backoffMs));
}

public WriteDoFn withBatchSize(int batchSize) {
Expand Down Expand Up @@ -233,47 +234,50 @@ public void startBundle() {
}

private void executeBatch() throws Exception {
backOffExecutor.execute(new Retriable() {
@Override
public void execute() {
pipeline.multi();
mutations.forEach(mutation -> {
writeRecord(mutation);
if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
backOffExecutor.execute(
new Retriable() {
@Override
public void execute() {
pipeline.multi();
mutations.forEach(
mutation -> {
writeRecord(mutation);
if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
}
});
pipeline.exec();
pipeline.sync();
mutations.clear();
}
});
pipeline.exec();
pipeline.sync();
mutations.clear();
}

@Override
public Boolean isExceptionRetriable(Exception e) {
return e instanceof JedisConnectionException;
}
@Override
public Boolean isExceptionRetriable(Exception e) {
return e instanceof JedisConnectionException;
}

@Override
public void cleanUpAfterFailure() {
try {
pipeline.close();
} catch (IOException e) {
log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
}
jedis = new Jedis(host, port, timeout);
pipeline = jedis.pipelined();
}
});
@Override
public void cleanUpAfterFailure() {
try {
pipeline.close();
} catch (IOException e) {
log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
}
jedis = new Jedis(host, port, timeout);
pipeline = jedis.pipelined();
}
});
}

private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
private FailedElement toFailedElement(
RedisMutation mutation, Exception exception, String jobName) {
return FailedElement.newBuilder()
.setJobName(jobName)
.setTransformName("RedisCustomIO")
.setPayload(mutation.getValue().toString())
.setErrorMessage(exception.getMessage())
.setStackTrace(ExceptionUtils.getStackTrace(exception))
.build();
.setJobName(jobName)
.setTransformName("RedisCustomIO")
.setPayload(mutation.getValue().toString())
.setErrorMessage(exception.getMessage())
.setStackTrace(ExceptionUtils.getStackTrace(exception))
.build();
}

@ProcessElement
Expand All @@ -284,11 +288,12 @@ public void processElement(ProcessContext context) {
try {
executeBatch();
} catch (Exception e) {
mutations.forEach(failedMutation -> {
FailedElement failedElement = toFailedElement(
failedMutation, e, context.getPipelineOptions().getJobName());
context.output(failedElement);
});
mutations.forEach(
failedMutation -> {
FailedElement failedElement =
toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
context.output(failedElement);
});
mutations.clear();
}
}
Expand All @@ -315,16 +320,18 @@ private Response<?> writeRecord(RedisMutation mutation) {
}

@FinishBundle
public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
if(mutations.size() > 0) {
public void finishBundle(FinishBundleContext context)
throws IOException, InterruptedException {
if (mutations.size() > 0) {
try {
executeBatch();
} catch (Exception e) {
mutations.forEach(failedMutation -> {
FailedElement failedElement = toFailedElement(
failedMutation, e, context.getPipelineOptions().getJobName());
context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
});
mutations.forEach(
failedMutation -> {
FailedElement failedElement =
toFailedElement(failedMutation, e, context.getPipelineOptions().getJobName());
context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
});
mutations.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,14 @@ public void shouldExcludeUnregisteredFields() {

FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1);
expected.add(randomRow);
input.add(randomRow.toBuilder()
.addFields(Field.newBuilder()
.setName("extra")
.setValue(Value.newBuilder().setStringVal("hello")))
.build()
);
input.add(
randomRow
.toBuilder()
.addFields(
Field.newBuilder()
.setName("extra")
.setValue(Value.newBuilder().setStringVal("hello")))
.build());

PCollectionTuple output =
p.apply(Create.of(input))
Expand Down
Loading

0 comments on commit cc884b4

Please sign in to comment.