Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change error store to be part of configuration instead #39

Merged
merged 9 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/java/feast/core/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public ImportJobDefaults getImportJobDefaults(
@Value("${feast.jobs.runner}") String runner,
@Value("${feast.jobs.options}") String options,
@Value("${feast.jobs.executable}") String executable,
@Value("${feast.jobs.errorsStoreId}") String errorsStoreId) {
return new ImportJobDefaults(coreApiUri, runner, options, executable, errorsStoreId);
@Value("${feast.jobs.errorsStoreType}") String errorsStoreType,
@Value("${feast.jobs.errorsStoreOptions}") String errorsStoreOptions) {
return new ImportJobDefaults(
coreApiUri, runner, options, executable, errorsStoreType, errorsStoreOptions);
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/feast/core/config/ImportJobDefaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ImportJobDefaults {
private String runner;
private String importJobOptions;
private String executable;
private String errorsStoreId;
private String errorsStoreType;
private String errorsStoreOptions;
}

Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) {
commands.add(
option("importSpecBase64", Base64.getEncoder().encodeToString(importSpec.toByteArray())));
commands.add(option("coreApiUri", defaults.getCoreApiUri()));
commands.add(option("errorsStoreId", defaults.getErrorsStoreId()));
commands.add(option("errorsStoreType", defaults.getErrorsStoreType()));
commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions()));
options.forEach((k, v) -> commands.add(option(k, v)));
return new ProcessBuilder(commands);
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ feast.jobs.coreUri=${CORE_API_URI:localhost:8433}
feast.jobs.runner=${JOB_RUNNER:DirectRunner}
feast.jobs.options=${JOB_OPTIONS:{}}
feast.jobs.executable=${JOB_EXECUTABLE:feast-ingestion.jar}
feast.jobs.errorsStoreId=${JOB_ERRORS_STORE_ID:STDOUT}
feast.jobs.errorsStoreType=${JOB_ERRORS_STORE_TYPE:stdout}
feast.jobs.errorsStoreOptions=${JOB_ERRORS_STORE_OPTIONS:{}}

feast.jobs.dataflow.projectId = ${DATAFLOW_PROJECT_ID:}
feast.jobs.dataflow.location = ${DATAFLOW_LOCATION:}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void setUp() {
"DirectRunner",
"{\"key\":\"value\"}",
"ingestion.jar",
"STDOUT");
"STDOUT",
"{}");
}

@Test
Expand All @@ -77,7 +78,8 @@ public void shouldBuildProcessBuilderWithCorrectOptions() {
"--runner=DirectRunner",
"--importSpecBase64=CgRmaWxl",
"--coreApiUri=localhost:8080",
"--errorsStoreId=STDOUT",
"--errorsStoreType=STDOUT",
"--errorsStoreOptions={}",
"--key=value");
assertThat(pb.command(), equalTo(expected));
}
Expand Down
14 changes: 12 additions & 2 deletions ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import feast.storage.ErrorsStore;
import feast.storage.ServingStore;
import feast.storage.WarehouseStore;
import feast.storage.file.json.JsonFileStores;
import feast.storage.service.ErrorsStoreService;
import feast.storage.service.ServingStoreService;
import feast.storage.service.WarehouseStoreService;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

import static feast.ingestion.transform.ErrorsStoreTransform.ERRORS_STORE_JSON;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
public class ImportJobModule extends AbstractModule {

Expand All @@ -48,11 +51,18 @@ public ImportJobModule(ImportJobOptions options, ImportSpec importSpec) {
this.importSpec = importSpec;
}

/**
 * Registers the JSON file errors store with {@link ErrorsStoreService} when the configured
 * errors store type is {@code ERRORS_STORE_JSON}. For any other type nothing is registered.
 */
private void configureErrorsStore() {
  // Constant-first comparison so an unset (null) errors store type does not throw a
  // NullPointerException here. NOTE(review): no @Default is visible on getErrorsStoreType(),
  // so it can presumably be null when the option is not supplied -- confirm.
  if (ERRORS_STORE_JSON.equals(options.getErrorsStoreType())) {
    ErrorsStoreService.register(new JsonFileStores.JsonFileErrorsStore());
  }
}

@Override
protected void configure() {
bind(ImportJobOptions.class).toInstance(options);
bind(PipelineOptions.class).toInstance(options);
bind(ImportSpec.class).toInstance(importSpec);
configureErrorsStore();
}

@Provides
Expand Down Expand Up @@ -88,7 +98,7 @@ List<ServingStore> provideServingStores() {

@Provides
@Singleton
List<ErrorsStore> provideErrorsStores() {
return ErrorsStoreService.getAll();
ErrorsStore provideErrorsStores() {
zhilingc marked this conversation as resolved.
Show resolved Hide resolved
return ErrorsStoreService.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ public interface ImportJobOptions extends PipelineOptions {
void setLimit(Long value);

@Description(
"Set a store id to store errors in, if your data input is **very** small, you can use STDOUT"
+ " or STDERR as the store id, otherwise it must match an associated storage spec")
String getErrorsStoreId();
"Set an errors store type. One of: [STDERR, STDOUT, JSON]. Note that you should not use "
zhilingc marked this conversation as resolved.
Show resolved Hide resolved
+ "STDERR/STDOUT in production unless your data volume is extremely small.")
String getErrorsStoreType();

void setErrorsStoreId(String value);
void setErrorsStoreType(String value);

/**
 * Errors store options as a JSON string of key-values. Defaults to {@code "{}"}.
 */
@Description(
    "Provide errors store options as a json string containing key-values. Options required "
        + "depend on the type of store set.")
@Default.String("{}")
String getErrorsStoreOptions();

void setErrorsStoreOptions(String value);

@AutoService(PipelineOptionsRegistrar.class)
class ImportJobOptionsRegistrar implements PipelineOptionsRegistrar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@

package feast.ingestion.transform;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.inject.Inject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.FeatureIO.Write;
Expand All @@ -33,54 +26,59 @@
import feast.storage.ErrorsStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.event.Level;

import javax.annotation.Nullable;

import static feast.ingestion.util.JsonUtil.convertJsonStringToMap;

@Slf4j
public class ErrorsStoreTransform extends FeatureIO.Write {
public static final String STDERR_STORE_ID = "STDERR";
public static final String STDOUT_STORE_ID = "STDOUT";
public static final String ERRORS_STORE_STDERR = "stderr";
public static final String ERRORS_STORE_STDOUT = "stdout";
public static final String ERRORS_STORE_JSON = "file.json";

private String errorsStoreId;
private List<ErrorsStore> stores;
private String errorsStoreType;
private StorageSpec errorsStoreSpec;
private ErrorsStore errorsStore;
private Specs specs;

@Inject
public ErrorsStoreTransform(ImportJobOptions options, List<ErrorsStore> stores, Specs specs) {
this.errorsStoreId = options.getErrorsStoreId();
this.stores = stores;
public ErrorsStoreTransform(
ImportJobOptions options, Specs specs, @Nullable ErrorsStore errorsStore) {
this.specs = specs;
this.errorsStoreType = options.getErrorsStoreType();
this.errorsStore = errorsStore;
StorageSpec.Builder errorsStoreBuilder = StorageSpec.newBuilder().setType(errorsStoreType);

switch (errorsStoreType) {
case ERRORS_STORE_JSON:
errorsStoreBuilder.putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions()));
default:
this.errorsStoreSpec = errorsStoreBuilder.build();
}
}

@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
if (errorsStoreId == null) {
log.warn("No errorsStoreId specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}

if (errorsStoreId.equals(STDOUT_STORE_ID)) {
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
} else if (errorsStoreId.equals(STDERR_STORE_ID)) {
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
} else {
StorageSpec storageSpec = specs.getStorageSpec(errorsStoreId);
storageSpec =
checkNotNull(
storageSpec,
String.format("errorsStoreId=%s not found in storage specs", errorsStoreId));
Write write = null;
for (ErrorsStore errorsStore : stores) {
if (errorsStore.getType().equals(storageSpec.getType())) {
write = errorsStore.create(storageSpec, specs);
switch (errorsStoreType) {
case ERRORS_STORE_STDOUT:
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
break;
case ERRORS_STORE_STDERR:
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
break;
default:
if (errorsStore == null) {
log.warn("No valid errors store specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}
}
write =
checkNotNull(
write,
"No errors storage factory found for errorsStoreId=%s with type=%s",
errorsStoreId,
storageSpec.getType());
return input.apply(write);
Write write = errorsStore.create(this.errorsStoreSpec, specs);
return input.apply(write);
}
return PDone.in(input.getPipeline());
}
Expand Down
25 changes: 25 additions & 0 deletions ingestion/src/main/java/feast/ingestion/util/JsonUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package feast.ingestion.util;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;

/** Static helpers for working with JSON strings. */
public class JsonUtil {
  private static final Gson gson = new Gson();

  /** Target type for Gson: a map of string keys to string values. Computed once and reused. */
  private static final Type STRING_MAP_TYPE = new TypeToken<Map<String, String>>() {}.getType();

  /**
   * Unmarshals a given json string to map.
   *
   * <p>A {@code null}, blank, or empty-object ({@code "{}"}) input yields an empty map, as does
   * any input that Gson parses to {@code null} (for example the JSON literal {@code null}). This
   * method never returns {@code null}.
   *
   * @param jsonString valid json formatted string, may be {@code null}
   * @return map of keys to values in json; empty if there is nothing to parse
   */
  public static Map<String, String> convertJsonStringToMap(String jsonString) {
    if (jsonString == null) {
      return Collections.emptyMap();
    }
    // Trim first so padded input such as " {} " or "   " takes the same shortcut as "{}" / "".
    String trimmed = jsonString.trim();
    if (trimmed.isEmpty() || trimmed.equals("{}")) {
      return Collections.emptyMap();
    }
    Map<String, String> parsed = gson.fromJson(trimmed, STRING_MAP_TYPE);
    // Gson returns null for inputs like the literal "null"; keep the non-null contract.
    if (parsed == null) {
      return Collections.emptyMap();
    }
    return parsed;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,18 @@

package feast.storage.service;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import feast.storage.ErrorsStore;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ErrorsStoreService {
private static ServiceLoader<ErrorsStore> serviceLoader = ServiceLoader.load(ErrorsStore.class);
private static List<ErrorsStore> manuallyRegistered = new ArrayList<>();

static {
for (ErrorsStore store : getAll()) {
log.info("ErrorsStore type found: " + store.getType());
}
}

public static List<ErrorsStore> getAll() {
return Lists.newArrayList(
Iterators.concat(manuallyRegistered.iterator(), serviceLoader.iterator()));
}
private static ErrorsStore errorsStore;

/** Get store of the given subclass. */
public static <T extends ErrorsStore> T get(Class<T> clazz) {
for (ErrorsStore store : getAll()) {
if (clazz.isInstance(store)) {
//noinspection unchecked
return (T) store;
}
}
return null;
public static ErrorsStore get() {
return errorsStore;
}

public static void register(ErrorsStore store) {
manuallyRegistered.add(store);
errorsStore = store;
zhilingc marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading