Skip to content

Commit

Permalink
Change error store to be part of configuration instead (#39)
Browse files Browse the repository at this point in the history
* Change error store to be part of configuration instead

* Revert to provider providing list of error stores

* merge master, reformat, align mocks stores to lowercase type

* Remove extra parentheses

* Format code to follow style

* Format code to follow style
  • Loading branch information
zhilingc authored and feast-ci-bot committed Jan 9, 2019
1 parent f389983 commit 5ac4b9c
Show file tree
Hide file tree
Showing 19 changed files with 286 additions and 116 deletions.
6 changes: 4 additions & 2 deletions core/src/main/java/feast/core/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ public ImportJobDefaults getImportJobDefaults(
@Value("${feast.jobs.runner}") String runner,
@Value("${feast.jobs.options}") String options,
@Value("${feast.jobs.executable}") String executable,
@Value("${feast.jobs.errorsStoreId}") String errorsStoreId) {
return new ImportJobDefaults(coreApiUri, runner, options, executable, errorsStoreId);
@Value("${feast.jobs.errorsStoreType}") String errorsStoreType,
@Value("${feast.jobs.errorsStoreOptions}") String errorsStoreOptions) {
return new ImportJobDefaults(
coreApiUri, runner, options, executable, errorsStoreType, errorsStoreOptions);
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/feast/core/config/ImportJobDefaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ImportJobDefaults {
private String runner;
private String importJobOptions;
private String executable;
private String errorsStoreId;
private String errorsStoreType;
private String errorsStoreOptions;
}

31 changes: 13 additions & 18 deletions core/src/main/java/feast/core/service/JobExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,26 @@
import feast.core.model.JobStatus;
import feast.core.util.TypeConversion;
import feast.specs.ImportSpecProto.ImportSpec;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class JobExecutionService {
private static final int SLEEP_MS = 10;
private static final Pattern JOB_EXT_ID_PREFIX_REGEX = Pattern.compile(".*FeastImportJobId:.*");

public static final String JOB_PREFIX_DEFAULT = "feastimport";

private static final int SLEEP_MS = 10;
private static final Pattern JOB_EXT_ID_PREFIX_REGEX = Pattern.compile(".*FeastImportJobId:.*");
private JobInfoRepository jobInfoRepository;
private ImportJobDefaults defaults;

Expand Down Expand Up @@ -106,9 +108,6 @@ public SubmitImportJobResponse submitJob(ImportSpec importSpec, String jobPrefix

/**
* Update a given job's status
*
* @param jobId
* @param status
*/
public void updateJobStatus(String jobId, JobStatus status) {
Optional<JobInfo> jobRecordOptional = jobInfoRepository.findById(jobId);
Expand All @@ -121,9 +120,6 @@ public void updateJobStatus(String jobId, JobStatus status) {

/**
* Update a given job's external id
*
* @param jobId
* @param jobExtId
*/
public void updateJobExtId(String jobId, String jobExtId) {
Optional<JobInfo> jobRecordOptional = jobInfoRepository.findById(jobId);
Expand All @@ -137,8 +133,6 @@ public void updateJobExtId(String jobId, String jobExtId) {
/**
* Builds the command to execute the ingestion job
*
* @param importSpec
* @param jobId
* @return configured ProcessBuilder
*/
public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) {
Expand All @@ -153,7 +147,8 @@ public ProcessBuilder getProcessBuilder(ImportSpec importSpec, String jobId) {
commands.add(
option("importSpecBase64", Base64.getEncoder().encodeToString(importSpec.toByteArray())));
commands.add(option("coreApiUri", defaults.getCoreApiUri()));
commands.add(option("errorsStoreId", defaults.getErrorsStoreId()));
commands.add(option("errorsStoreType", defaults.getErrorsStoreType()));
commands.add(option("errorsStoreOptions", defaults.getErrorsStoreOptions()));
options.forEach((k, v) -> commands.add(option(k, v)));
return new ProcessBuilder(commands);
}
Expand All @@ -170,7 +165,7 @@ private String option(String key, String value) {
*/
public String runProcess(Process p) {
try (BufferedReader outputStream =
new BufferedReader(new InputStreamReader(p.getInputStream()));
new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader errorsStream =
new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String extId = "";
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ feast.jobs.coreUri=${CORE_API_URI:localhost:8433}
feast.jobs.runner=${JOB_RUNNER:DirectRunner}
feast.jobs.options=${JOB_OPTIONS:{}}
feast.jobs.executable=${JOB_EXECUTABLE:feast-ingestion.jar}
feast.jobs.errorsStoreId=${JOB_ERRORS_STORE_ID:STDOUT}
feast.jobs.errorsStoreType=${JOB_ERRORS_STORE_TYPE:stdout}
feast.jobs.errorsStoreOptions=${JOB_ERRORS_STORE_OPTIONS:{}}

feast.jobs.dataflow.projectId = ${DATAFLOW_PROJECT_ID:}
feast.jobs.dataflow.location = ${DATAFLOW_LOCATION:}
Expand Down
41 changes: 22 additions & 19 deletions core/src/test/java/feast/core/service/JobExecutionServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,25 @@

package feast.core.service;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import com.google.common.collect.Lists;
import feast.core.config.ImportJobDefaults;
import feast.core.dao.JobInfoRepository;
import feast.core.model.JobInfo;
import feast.core.model.JobStatus;
import feast.specs.ImportSpecProto.ImportSpec;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -31,25 +44,13 @@
import org.mockito.Mock;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.mockito.internal.verification.VerificationModeFactory.times;

public class JobExecutionServiceTest {
private ImportJobDefaults defaults;
@Mock JobInfoRepository jobInfoRepository;

@Rule public final ExpectedException expectedException = ExpectedException.none();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
@Mock
JobInfoRepository jobInfoRepository;
private ImportJobDefaults defaults;

@Before
public void setUp() {
Expand All @@ -60,7 +61,8 @@ public void setUp() {
"DirectRunner",
"{\"key\":\"value\"}",
"ingestion.jar",
"STDOUT");
"STDOUT",
"{}");
}

@Test
Expand All @@ -77,7 +79,8 @@ public void shouldBuildProcessBuilderWithCorrectOptions() {
"--runner=DirectRunner",
"--importSpecBase64=CgRmaWxl",
"--coreApiUri=localhost:8080",
"--errorsStoreId=STDOUT",
"--errorsStoreType=STDOUT",
"--errorsStoreOptions={}",
"--key=value");
assertThat(pb.command(), equalTo(expected));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
/**
* An ImportJobModule is a Guice module for creating dependency injection bindings.
*/
public class ImportJobModule extends AbstractModule {

private final ImportJobOptions options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.options.Validation.Required;

public interface ImportJobOptions extends PipelineOptions {

@Description("Import spec yaml file path")
@Required(groups = {"importSpec"})
String getImportSpecYamlFile();
Expand Down Expand Up @@ -72,14 +73,23 @@ public interface ImportJobOptions extends PipelineOptions {
void setLimit(Long value);

@Description(
"Set a store id to store errors in, if your data input is **very** small, you can use STDOUT"
+ " or STDERR as the store id, otherwise it must match an associated storage spec")
String getErrorsStoreId();
"Set an errors store type. One of: [stderr, stdout, file.json]. Note that you should not use "
+ "stderr/stdout in production unless your data volume is extremely small.")
String getErrorsStoreType();

void setErrorsStoreType(String value);

void setErrorsStoreId(String value);
/**
 * Errors store options, supplied as a JSON string of key-values.
 *
 * @return the configured options string; defaults to {@code "{}"} when the option is not set
 */
@Description(
    "Provide errors store options as a json string containing key-values. Options required "
        + "depend on the type of store set.")
@Default.String("{}")
String getErrorsStoreOptions();

void setErrorsStoreOptions(String value);

@AutoService(PipelineOptionsRegistrar.class)
class ImportJobOptionsRegistrar implements PipelineOptionsRegistrar {

@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return Collections.singleton(ImportJobOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@

package feast.ingestion.transform;

import static com.google.common.base.Preconditions.checkNotNull;
import static feast.ingestion.util.JsonUtil.convertJsonStringToMap;

import com.google.inject.Inject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import feast.ingestion.model.Specs;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.transform.FeatureIO.Write;
Expand All @@ -33,54 +28,60 @@
import feast.storage.ErrorsStore;
import feast.storage.noop.NoOpIO;
import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.event.Level;

@Slf4j
public class ErrorsStoreTransform extends FeatureIO.Write {
public static final String STDERR_STORE_ID = "STDERR";
public static final String STDOUT_STORE_ID = "STDOUT";

private String errorsStoreId;
private List<ErrorsStore> stores;
public static final String ERRORS_STORE_STDERR = "stderr";
public static final String ERRORS_STORE_STDOUT = "stdout";
public static final String ERRORS_STORE_JSON = "file.json";

private String errorsStoreType;
private StorageSpec errorsStoreSpec;
private ErrorsStore errorsStore;
private Specs specs;

@Inject
public ErrorsStoreTransform(ImportJobOptions options, List<ErrorsStore> stores, Specs specs) {
this.errorsStoreId = options.getErrorsStoreId();
this.stores = stores;
public ErrorsStoreTransform(
ImportJobOptions options, Specs specs, List<ErrorsStore> errorsStores) {
this.specs = specs;
this.errorsStoreType = options.getErrorsStoreType();

for (ErrorsStore errorsStore : errorsStores) {
if (errorsStore.getType().equals(errorsStoreType)) {
this.errorsStore = errorsStore;
}
}

this.errorsStoreSpec =
StorageSpec.newBuilder()
.setType(errorsStoreType)
.putAllOptions(convertJsonStringToMap(options.getErrorsStoreOptions()))
.build();
}

@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
if (errorsStoreId == null) {
log.warn("No errorsStoreId specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}

if (errorsStoreId.equals(STDOUT_STORE_ID)) {
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
} else if (errorsStoreId.equals(STDERR_STORE_ID)) {
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
} else {
StorageSpec storageSpec = specs.getStorageSpec(errorsStoreId);
storageSpec =
checkNotNull(
storageSpec,
String.format("errorsStoreId=%s not found in storage specs", errorsStoreId));
Write write = null;
for (ErrorsStore errorsStore : stores) {
if (errorsStore.getType().equals(storageSpec.getType())) {
write = errorsStore.create(storageSpec, specs);
switch (errorsStoreType) {
case ERRORS_STORE_STDOUT:
input.apply("Log errors to STDOUT", ParDo.of(new LoggerDoFn(Level.INFO)));
break;
case ERRORS_STORE_STDERR:
input.apply("Log errors to STDERR", ParDo.of(new LoggerDoFn(Level.ERROR)));
break;
default:
if (errorsStore == null) {
log.warn("No valid errors store specified, errors will be discarded");
return input.apply(new NoOpIO.Write());
}
}
write =
checkNotNull(
write,
"No errors storage factory found for errorsStoreId=%s with type=%s",
errorsStoreId,
storageSpec.getType());
return input.apply(write);
Write write = errorsStore.create(this.errorsStoreSpec, specs);
return input.apply(write);
}
return PDone.in(input.getPipeline());
}
Expand Down
27 changes: 27 additions & 0 deletions ingestion/src/main/java/feast/ingestion/util/JsonUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package feast.ingestion.util;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;

/**
 * Static helpers for converting JSON strings into Java collections.
 */
public class JsonUtil {

  // Shared parser instance; it is never reassigned, hence final.
  private static final Gson gson = new Gson();

  // Generic target type Map<String, String>, computed once instead of on every call.
  private static final Type STRING_MAP_TYPE = new TypeToken<Map<String, String>>() {
  }.getType();

  /**
   * Unmarshals a given json string to map.
   *
   * <p>A {@code null}, empty, whitespace-only or {@code "{}"} input, as well as input that Gson
   * parses to {@code null} (such as the JSON literal {@code null}), yields an empty map so that
   * callers never receive {@code null}.
   *
   * @param jsonString valid json formatted string, may be {@code null}
   * @return map of keys to values in json, never {@code null}
   * @throws com.google.gson.JsonSyntaxException if the string is not a valid json object of
   *     string values
   */
  public static Map<String, String> convertJsonStringToMap(String jsonString) {
    if (jsonString == null) {
      return Collections.emptyMap();
    }
    String trimmed = jsonString.trim();
    if (trimmed.isEmpty() || trimmed.equals("{}")) {
      return Collections.emptyMap();
    }
    Map<String, String> parsed = gson.fromJson(trimmed, STRING_MAP_TYPE);
    if (parsed == null) {
      // Gson returns null rather than throwing for the JSON literal "null".
      return Collections.emptyMap();
    }
    return parsed;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import feast.storage.ErrorsStore;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import feast.storage.ErrorsStore;

@Slf4j
public class ErrorsStoreService {

private static ServiceLoader<ErrorsStore> serviceLoader = ServiceLoader.load(ErrorsStore.class);
private static List<ErrorsStore> manuallyRegistered = new ArrayList<>();

Expand Down
Loading

0 comments on commit 5ac4b9c

Please sign in to comment.