Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parallelization in Redis Sink #866

Merged
merged 4 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion infra/scripts/setup-common-functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ start_feast_serving() {

if [ -n "$1" ]; then
echo "Custom Spring application.yml location provided: $1"
export CONFIG_ARG="--spring.config.location=file://$1"
export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1"
fi

nohup java -jar serving/target/feast-serving-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-serving-online.log &
Expand Down
1 change: 1 addition & 0 deletions infra/scripts/test-end-to-end-redis-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ feast:
config:
# Connection string specifies the IP and ports of Redis instances in Redis cluster
connection_string: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005"
flush_frequency_seconds: 1
# Subscriptions indicate which feature sets need to be retrieved and used to populate this store
subscriptions:
# Wildcards match all options. No filtering is done.
Expand Down
16 changes: 15 additions & 1 deletion infra/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,21 @@ if [[ ${ENABLE_AUTH} = "True" ]];
start_feast_core
fi

start_feast_serving
cat <<EOF > /tmp/serving.warehouse.application.yml
feast:
stores:
- name: online
type: REDIS
config:
host: localhost
port: 6379
flush_frequency_seconds: 1
subscriptions:
- name: "*"
project: "*"
EOF

start_feast_serving /tmp/serving.warehouse.application.yml
install_python_with_miniconda_and_feast_sdk

print_banner "Running end-to-end tests with pytest at 'tests/e2e'"
Expand Down
4 changes: 4 additions & 0 deletions protos/feast/core/Store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ message Store {
int32 initial_backoff_ms = 3;
// Optional. Maximum total number of retries for connecting to Redis. Default to zero retries.
int32 max_retries = 4;
// Optional. How often to flush data to Redis, in seconds.
int32 flush_frequency_seconds = 5;
}

message BigQueryConfig {
Expand All @@ -129,6 +131,8 @@ message Store {
string connection_string = 1;
int32 initial_backoff_ms = 2;
int32 max_retries = 3;
// Optional. How often to flush data to Redis, in seconds.
int32 flush_frequency_seconds = 4;
}

message Subscription {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@
import org.apache.beam.sdk.values.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisCustomIO {

private static final int DEFAULT_BATCH_SIZE = 1000;
private static final int DEFAULT_TIMEOUT = 2000;

private static TupleTag<FeatureRow> successfulInsertsTag =
new TupleTag<FeatureRow>("successfulInserts") {};
private static TupleTag<FailedElement> failedInsertsTupleTag =
Expand All @@ -69,7 +67,7 @@ public static class Write extends PTransform<PCollection<FeatureRow>, WriteResul
private PCollectionView<Map<String, Iterable<FeatureSetSpec>>> featureSetSpecs;
private RedisIngestionClient redisIngestionClient;
private int batchSize;
private int timeout;
private Duration flushFrequency;

public Write(
RedisIngestionClient redisIngestionClient,
Expand All @@ -83,23 +81,28 @@ public Write withBatchSize(int batchSize) {
return this;
}

public Write withTimeout(int timeout) {
this.timeout = timeout;
public Write withFlushFrequency(Duration frequency) {
this.flushFrequency = frequency;
return this;
}

@Override
public WriteResult expand(PCollection<FeatureRow> input) {
PCollectionTuple redisWrite =
input
.apply("FixedFlushWindow", Window.<FeatureRow>into(FixedWindows.of(flushFrequency)))
.apply(
"CollectBatchBeforeWrite",
Window.<FeatureRow>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(batchSize)))
.discardingFiredPanes())
.apply("AttachSingletonKey", WithKeys.of((Void) null))
.apply("GroupOntoSingleton", GroupByKey.create())
"AttachFeatureReferenceKey",
ParDo.of(
new DoFn<FeatureRow, KV<String, FeatureRow>>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(KV.of(c.element().getFeatureSet(), c.element()));
woop marked this conversation as resolved.
Show resolved Hide resolved
}
}))
.apply("IntoBatches", GroupIntoBatches.ofSize(batchSize))
.apply("ExtractResultValues", Values.create())
.apply("GlobalWindow", Window.<Iterable<FeatureRow>>into(new GlobalWindows()))
.apply(
ParDo.of(new WriteDoFn(redisIngestionClient, featureSetSpecs))
.withOutputTags(successfulInsertsTag, TupleTagList.of(failedInsertsTupleTag))
Expand All @@ -112,8 +115,6 @@ public WriteResult expand(PCollection<FeatureRow> input) {

public static class WriteDoFn extends DoFn<Iterable<FeatureRow>, FeatureRow> {
private PCollectionView<Map<String, Iterable<FeatureSetSpec>>> featureSetSpecsView;
private int batchSize = DEFAULT_BATCH_SIZE;
private int timeout = DEFAULT_TIMEOUT;
private RedisIngestionClient redisIngestionClient;

WriteDoFn(
Expand All @@ -124,20 +125,6 @@ public static class WriteDoFn extends DoFn<Iterable<FeatureRow>, FeatureRow> {
this.featureSetSpecsView = featureSetSpecsView;
}

public WriteDoFn withBatchSize(int batchSize) {
if (batchSize > 0) {
this.batchSize = batchSize;
}
return this;
}

public WriteDoFn withTimeout(int timeout) {
if (timeout > 0) {
this.timeout = timeout;
}
return this;
}

@Setup
public void setup() {
this.redisIngestionClient.setup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

@AutoValue
public abstract class RedisFeatureSink implements FeatureSink {
private static final int DEFAULT_BATCH_SIZE = 10000;
private static final int DEFAULT_FREQUENCY_SECONDS = 30;

/**
* Initialize a {@link RedisFeatureSink.Builder} from a {@link StoreProto.Store.RedisConfig}.
Expand Down Expand Up @@ -112,12 +115,28 @@ public PCollection<FeatureSetReference> prepareWrite(

@Override
public PTransform<PCollection<FeatureRow>, WriteResult> writer() {
int flushFrequencySeconds = DEFAULT_FREQUENCY_SECONDS;

if (getRedisClusterConfig() != null) {

if (getRedisClusterConfig().getFlushFrequencySeconds() > 0) {
flushFrequencySeconds = getRedisClusterConfig().getFlushFrequencySeconds();
}

return new RedisCustomIO.Write(
new RedisClusterIngestionClient(getRedisClusterConfig()), getSpecsView());
new RedisClusterIngestionClient(getRedisClusterConfig()), getSpecsView())
.withFlushFrequency(Duration.standardSeconds(flushFrequencySeconds))
.withBatchSize(DEFAULT_BATCH_SIZE);

} else if (getRedisConfig() != null) {
if (getRedisConfig().getFlushFrequencySeconds() > 0) {
flushFrequencySeconds = getRedisConfig().getFlushFrequencySeconds();
}

return new RedisCustomIO.Write(
new RedisStandaloneIngestionClient(getRedisConfig()), getSpecsView());
new RedisStandaloneIngestionClient(getRedisConfig()), getSpecsView())
.withFlushFrequency(Duration.standardSeconds(flushFrequencySeconds))
.withBatchSize(DEFAULT_BATCH_SIZE);
} else {
throw new RuntimeException(
"At least one RedisConfig or RedisClusterConfig must be provided to Redis Sink");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.Value;
import feast.proto.types.ValueProto.ValueType.Enum;
import feast.storage.api.writer.FailedElement;
import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
Expand Down Expand Up @@ -221,7 +220,6 @@ public void shouldRetryFailConnection() throws InterruptedException {
p.apply(Create.of(featureRows))
.apply(redisClusterFeatureSink.writer())
.getFailedInserts()
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
.apply(Count.globally());

redisCluster.stop();
Expand Down Expand Up @@ -283,7 +281,6 @@ public void shouldProduceFailedElementIfRetryExceeded() {
p.apply(Create.of(featureRows))
.apply("modifiedSink", redisClusterFeatureSink.writer())
.getFailedInserts()
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
.apply(Count.globally());

PAssert.that(failedElementCount).containsInAnyOrder(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.Value;
import feast.proto.types.ValueProto.ValueType.Enum;
import feast.storage.api.writer.FailedElement;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
Expand All @@ -49,9 +48,6 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -212,7 +208,6 @@ public void shouldRetryFailConnection() throws InterruptedException {
p.apply(Create.of(featureRows))
.apply(redisFeatureSink.writer())
.getFailedInserts()
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
.apply(Count.globally());

redis.stop();
Expand Down Expand Up @@ -271,7 +266,6 @@ public void shouldProduceFailedElementIfRetryExceeded() {
p.apply(Create.of(featureRows))
.apply(redisFeatureSink.writer())
.getFailedInserts()
.apply(Window.<FailedElement>into(new GlobalWindows()).triggering(Never.ever()))
.apply(Count.globally());

redis.stop();
Expand Down