Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#12: prefetch specs and validate on job expansion #15

Merged
merged 1 commit into from
Dec 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@
import feast.ingestion.config.ImportSpecSupplier;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.*;
import feast.ingestion.transform.ErrorsStoreTransform;
import feast.ingestion.transform.ReadFeaturesTransform;
import feast.ingestion.transform.ServingStoreTransform;
import feast.ingestion.transform.ToFeatureRowExtended;
import feast.ingestion.transform.ValidateTransform;
import feast.ingestion.transform.WarehouseStoreTransform;
import feast.ingestion.transform.fn.ConvertTypesDoFn;
import feast.ingestion.transform.fn.LoggerDoFn;
import feast.ingestion.transform.fn.RoundEventTimestampsDoFn;
import feast.ingestion.values.PFeatureRows;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Arrays;
import java.util.Random;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
Expand All @@ -60,9 +67,6 @@
import org.joda.time.Duration;
import org.slf4j.event.Level;

import java.util.Arrays;
import java.util.Random;

@Slf4j
public class ImportJob {
private static Random random = new Random(System.currentTimeMillis());
Expand Down Expand Up @@ -120,6 +124,13 @@ public static PipelineResult mainWithResult(String[] args) {
return job.run();
}

/**
 * Builds a job name of the form {@code feast-importjob-<epoch millis>-<7 hex chars>}.
 *
 * <p>The hex suffix is the first seven characters of the SHA-1 hex digest of seven bytes drawn
 * from the class-level {@code random}.
 *
 * @return the generated job name
 */
private static String generateName() {
  byte[] entropy = new byte[7];
  random.nextBytes(entropy);
  String digest = DigestUtils.sha1Hex(entropy);
  String suffix = digest.substring(0, 7);
  long nowMillis = DateTime.now().getMillis();
  return String.format("feast-importjob-%s-%s", nowMillis, suffix);
}

public void expand() {
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(
Expand All @@ -134,6 +145,8 @@ public void expand() {
// pass
}

specs.validate();

PCollection<FeatureRow> features = pipeline.apply("Read", readFeaturesTransform);
if (options.getLimit() != null && options.getLimit() > 0) {
features = features.apply(Sample.any(options.getLimit()));
Expand Down Expand Up @@ -195,13 +208,6 @@ public void logNRows(PFeatureRows pFeatureRows, String name, int limit) {
.apply("Log errors sample", ParDo.of(new LoggerDoFn(Level.ERROR, name + " ERRORS ")));
}

/**
 * Builds a job name of the form {@code feast-importjob-<epoch millis>-<7 hex chars>}.
 *
 * <p>The hex suffix is the first seven characters of the SHA-1 hex digest of seven bytes drawn
 * from the class-level {@code random}.
 *
 * @return the generated job name
 */
private static String generateName() {
  byte[] entropy = new byte[7];
  random.nextBytes(entropy);
  String digest = DigestUtils.sha1Hex(entropy);
  String suffix = digest.substring(0, 7);
  long nowMillis = DateTime.now().getMillis();
  return String.format("feast-importjob-%s-%s", nowMillis, suffix);
}

private String retrieveId(PipelineResult result) {
Class<? extends PipelineRunner<?>> runner = options.getRunner();
if (runner.isAssignableFrom(DataflowRunner.class)) {
Expand Down
19 changes: 11 additions & 8 deletions ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import feast.ingestion.model.Specs;
import feast.ingestion.model.SpecsImpl;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.service.CachedSpecService;
import feast.ingestion.service.CoreSpecService;
import feast.ingestion.service.FileSpecService;
import feast.ingestion.service.SpecService;
import feast.ingestion.service.SpecService.Builder;
import feast.ingestion.service.SpecService.UnsupportedBuilder;
import feast.specs.ImportSpecProto.ImportSpec;
Expand All @@ -37,6 +34,8 @@
import feast.storage.service.ErrorsStoreService;
import feast.storage.service.ServingStoreService;
import feast.storage.service.WarehouseStoreService;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
public class ImportJobModule extends AbstractModule {
Expand All @@ -54,23 +53,27 @@ protected void configure() {
bind(ImportJobOptions.class).toInstance(options);
bind(PipelineOptions.class).toInstance(options);
bind(ImportSpec.class).toInstance(importSpec);
bind(Specs.class).to(SpecsImpl.class);
}

/**
 * Selects the spec service builder from the job options.
 *
 * <p>The Core API URI option is checked first, then the spec file path option; when neither is
 * set an {@code UnsupportedBuilder} constructed with an explanatory message is returned.
 *
 * @param options import job options read for the Core API URI and the spec path
 * @return the builder matching the first option that is set
 */
// NOTE(review): this span looks like a rendered diff with old and new lines interleaved; the
// CachedSpecService-wrapped returns appear to be the removed lines and the unwrapped returns the
// added ones. Confirm against the merged file before treating this as compilable code.
@Provides
@Singleton
Builder provideSpecService(ImportJobOptions options) {
if (options.getCoreApiUri() != null) {
return new CachedSpecService.Builder(new CoreSpecService.Builder(options.getCoreApiUri()));
return new CoreSpecService.Builder(options.getCoreApiUri());
} else if (options.getCoreApiSpecPath() != null) {
return new CachedSpecService.Builder(
new FileSpecService.Builder(options.getCoreApiSpecPath()));
return new FileSpecService.Builder(options.getCoreApiSpecPath());
} else {
// NOTE(review): the message names "coreApiHost" and "specPath", while the getters checked above
// are getCoreApiUri and getCoreApiSpecPath. Confirm which option names users actually see.
return new UnsupportedBuilder(
"Cannot initialise spec service as coreApiHost or specPath was not set.");
}
}

/**
 * Provides the singleton {@link Specs} for this job, assembled via {@code Specs.of} from the job
 * name, the import spec and a spec service built from the injected builder.
 *
 * @param specService builder used to create the spec service handed to {@code Specs.of}
 * @return the specs instance returned by {@code Specs.of}
 */
@Provides
@Singleton
Specs provideSpecs(SpecService.Builder specService) {
  String jobName = options.getJobName();
  SpecService service = specService.build();
  return Specs.of(jobName, importSpec, service);
}

@Provides
@Singleton
List<WarehouseStore> provideWarehouseStores() {
Expand Down
124 changes: 112 additions & 12 deletions ingestion/src/main/java/feast/ingestion/model/Specs.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,127 @@

package feast.ingestion.model;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import feast.ingestion.service.SpecRetrievalException;
import com.google.common.base.Preconditions;
import feast.ingestion.service.SpecService;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.ImportSpecProto.Field;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import lombok.Builder;
import lombok.Getter;

@Builder
@Getter
public class Specs implements Serializable {
private String jobName;
private ImportSpec importSpec;
private Map<String, EntitySpec> entitySpecs;
private Map<String, FeatureSpec> featureSpecs;
private Map<String, StorageSpec> storageSpecs;
private transient SpecService specService;
private RuntimeException error;

/**
 * Assembles a {@code Specs} for the given job by fetching everything it needs up front.
 *
 * <p>Steps, in order: collect the non-empty feature ids from the import spec's schema fields and
 * fetch their feature specs; check that every fetched feature's entity is listed in the import
 * spec; fetch the entity specs for the listed entities; fetch all storage specs.
 *
 * <p>Any {@link RuntimeException} raised along the way (including a failed precondition) is not
 * propagated here. It is stored in the returned instance's {@code error} field, which
 * {@code validate()} rethrows.
 *
 * @param jobName name recorded on the resulting specs
 * @param importSpec import spec whose schema fields and entity list drive the lookups
 * @param specService service queried for feature, entity and storage specs
 * @return a populated {@code Specs}, or one carrying only the captured error
 */
public static Specs of(String jobName, ImportSpec importSpec, SpecService specService) {
  try {
    Specs.SpecsBuilder builder = Specs.builder().jobName(jobName).importSpec(importSpec);

    // Only schema fields that name a feature contribute an id.
    List<String> requestedFeatureIds = new ArrayList<>();
    for (Field schemaField : importSpec.getSchema().getFieldsList()) {
      if (schemaField.getFeatureId().isEmpty()) {
        continue;
      }
      requestedFeatureIds.add(schemaField.getFeatureId());
    }
    Map<String, FeatureSpec> fetchedFeatures = specService.getFeatureSpecs(requestedFeatureIds);
    builder.featureSpecs(fetchedFeatures);

    List<String> declaredEntities = importSpec.getEntitiesList();
    for (FeatureSpec fetched : fetchedFeatures.values()) {
      boolean entityDeclared = declaredEntities.contains(fetched.getEntity());
      Preconditions.checkArgument(
          entityDeclared,
          "Feature has entity not listed in import spec featureSpec=" + fetched.toString());
    }
    builder.entitySpecs(specService.getEntitySpecs(declaredEntities));

    builder.storageSpecs(specService.getAllStorageSpecs());

    return builder.build();
  } catch (RuntimeException failure) {
    // Defer the failure: the caller receives it when validate() runs.
    return Specs.builder().error(failure).build();
  }
}

public interface Specs extends Serializable {
FeatureSpec getFeatureSpec(String featureId);
/**
 * Checks that this instance was built without error and that its spec maps are consistent.
 *
 * <p>Rethrows the stored {@code error} if one is present. Otherwise verifies that every map key
 * equals the id (or name, for entities) of its value, and that each feature's entity, serving
 * store and warehouse store are present in the corresponding maps.
 *
 * @throws RuntimeException the stored error, when one is present
 * @throws IllegalArgumentException via {@code Preconditions.checkArgument} when a check fails
 */
// NOTE(review): this span looks like a rendered diff; the two bare method declarations inside
// the body (getFeatureSpecByServingStoreId, getEntitySpec) appear to be removed lines from the
// former interface. Confirm against the merged file.
public void validate() {
if (error != null) {
throw error;
}

List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) throws SpecRetrievalException;
// Sanity checks that our maps are built correctly
for (Entry<String, FeatureSpec> entry : featureSpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
}
for (Entry<String, EntitySpec> entry : entitySpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getName()));
}
for (Entry<String, StorageSpec> entry : storageSpecs.entrySet()) {
Preconditions.checkArgument(entry.getKey().equals(entry.getValue().getId()));
}

EntitySpec getEntitySpec(String entityName) throws SpecRetrievalException;
// Cross-reference checks: everything a feature points at must be present in the maps.
for (FeatureSpec featureSpec : featureSpecs.values()) {
// Check that feature has a matching entity
Preconditions.checkArgument(
entitySpecs.containsKey(featureSpec.getEntity()),
String.format(
"Feature %s references unknown entity %s",
featureSpec.getId(), featureSpec.getEntity()));
// Check that feature has a matching serving store
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getServing().getId()),
String.format(
"Feature %s references unknown serving store %s",
featureSpec.getId(), featureSpec.getDataStores().getServing().getId()));
// Check that feature has a matching warehouse store
Preconditions.checkArgument(
storageSpecs.containsKey(featureSpec.getDataStores().getWarehouse().getId()),
String.format(
"Feature %s references unknown warehouse store %s",
featureSpec.getId(), featureSpec.getDataStores().getWarehouse().getId()));
}
}

ImportSpec getImportSpec() throws SpecRetrievalException;
/**
 * Looks up a prefetched entity spec by name.
 *
 * @param entityName name of the entity to look up
 * @return the entity spec stored under {@code entityName}
 * @throws IllegalArgumentException if no entity spec is stored under that name
 */
public EntitySpec getEntitySpec(String entityName) {
  boolean known = entitySpecs.containsKey(entityName);
  // Template overload: the message is only formatted when the check fails.
  Preconditions.checkArgument(known, "Unknown entity %s, spec was not initialized", entityName);
  return entitySpecs.get(entityName);
}

Map<String, StorageSpec> getStorageSpecs() throws SpecRetrievalException;
/**
 * Looks up a prefetched feature spec by id.
 *
 * @param featureId id of the feature to look up
 * @return the feature spec stored under {@code featureId}
 * @throws IllegalArgumentException if no feature spec is stored under that id
 */
public FeatureSpec getFeatureSpec(String featureId) {
  boolean known = featureSpecs.containsKey(featureId);
  // Template overload: the message is only formatted when the check fails.
  Preconditions.checkArgument(known, "Unknown feature %s, spec was not initialized", featureId);
  return featureSpecs.get(featureId);
}

StorageSpec getStorageSpec(String storeId);
/**
 * Collects the prefetched feature specs whose serving store id equals the given store id.
 *
 * @param storeId serving store id to match
 * @return a new list of the matching feature specs; empty when none match
 */
public List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) {
  List<FeatureSpec> matches = new ArrayList<>();
  for (FeatureSpec candidate : featureSpecs.values()) {
    if (!candidate.getDataStores().getServing().getId().equals(storeId)) {
      continue;
    }
    matches.add(candidate);
  }
  return matches;
}

String getJobName();
/**
 * Looks up a prefetched storage spec by store id.
 *
 * @param storeId id of the store to look up
 * @return the storage spec stored under {@code storeId}
 * @throws IllegalArgumentException if no storage spec is stored under that id
 */
public StorageSpec getStorageSpec(String storeId) {
  boolean known = storageSpecs.containsKey(storeId);
  // Template overload: the message is only formatted when the check fails.
  Preconditions.checkArgument(known, "Unknown store %s, spec was not initialized", storeId);
  return storageSpecs.get(storeId);
}
}
103 changes: 0 additions & 103 deletions ingestion/src/main/java/feast/ingestion/model/SpecsImpl.java

This file was deleted.

Loading