Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Feast configuration #611

Merged
merged 17 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
<configuration>
<skip>false</skip>
</configuration>
<executions>
<execution>
<id>build-info</id>
<goals>
<goal>build-info</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Expand Down Expand Up @@ -207,5 +215,21 @@
<artifactId>jaxb-api</artifactId>
</dependency>

<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.0.Final</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>6.1.2.Final</version>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator-annotation-processor</artifactId>
<version>6.1.2.Final</version>
</dependency>

</dependencies>
</project>
188 changes: 173 additions & 15 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,211 @@
*/
package feast.core.config;

import java.util.Map;
import feast.core.config.FeastProperties.StreamProperties.FeatureStreamOptions;
import feast.core.validators.OneOfStrings;
import java.util.*;
import javax.annotation.PostConstruct;
import javax.validation.*;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import lombok.Getter;
import lombok.Setter;
import org.hibernate.validator.constraints.URL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.info.BuildProperties;

@Getter
@Setter
@ConfigurationProperties(prefix = "feast", ignoreInvalidFields = true)
public class FeastProperties {

private String version;
private JobProperties jobs;
/**
* Instantiates a new Feast properties.
*
* @param buildProperties Feast build properties
*/
@Autowired
public FeastProperties(BuildProperties buildProperties) {
setVersion(buildProperties.getVersion());
}

/**
 * Instantiates a new Feast properties without build information. No fields are assigned here, so
 * every property keeps its declared initial value until a setter is called.
 */
public FeastProperties() {}

/* Feast Core Build Version */
@NotBlank private String version = "unknown";

/* Population job properties */
@NotNull private JobProperties jobs;

@NotNull
/* Feast Kafka stream properties */
private StreamProperties stream;

/** Feast job properties. These properties are used for ingestion jobs. */
@Getter
@Setter
public static class JobProperties {

private String runner;
private Map<String, String> options;
@NotBlank
/* The active Apache Beam runner name. This name references one instance of the Runner class */
private String activeRunner;

/** List of configured job runners. */
private List<Runner> runners = new ArrayList<>();
zhilingc marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Looks up the configured {@link Runner} whose name equals the active runner name.
 *
 * @return the first configured runner whose name matches {@code activeRunner}
 * @throws RuntimeException if none of the configured runners has that name
 */
public Runner getActiveRunner() {
  return getRunners().stream()
      .filter(candidate -> activeRunner.equals(candidate.getName()))
      .findFirst()
      .orElseThrow(
          () ->
              new RuntimeException(
                  String.format(
                      "Active runner is misconfigured. Could not find runner: %s.",
                      activeRunner)));
}

/** A single configured job runner entry. */
@Getter
@Setter
public static class Runner {
  /** Name identifying this job runner. This must be unique. */
  String name;

  /** Job runner type as configured text. DirectRunner and DataflowRunner currently supported. */
  String type;

  /**
   * Configuration options for this job runner. See the following for options
   * https://api.docs.feast.dev/grpc/feast.core.pb.html#Runner
   */
  Map<String, String> options = new HashMap<>();

  /**
   * Converts the configured runner type text into its enum form.
   *
   * @return the job runner type as {@link feast.core.job.Runner}
   */
  public feast.core.job.Runner getType() {
    final String configuredType = this.type;
    return feast.core.job.Runner.fromString(configuredType);
  }
}

@NotNull
/* Population job metric properties */
private MetricsProperties metrics;
private JobUpdatesProperties updates;
}

@Getter
@Setter
public static class JobUpdatesProperties {
/* Timeout in seconds for each attempt to update or submit a new job to the runner */
@Positive private long jobUpdateTimeoutSeconds;

private long timeoutSeconds;
private long pollingIntervalMillis;
/* Job update polling interval in milliseconds. How frequently Feast will update running jobs. */
@Positive private long pollingIntervalMilliseconds;
}

/** Properties used to configure Feast's managed Kafka feature stream. */
@Getter
@Setter
public static class StreamProperties {

/* Feature stream type. Only "kafka" is supported. */
@OneOfStrings({"kafka"})
@NotBlank
private String type;
private Map<String, String> options;

/* Feature stream options */
@NotNull private FeatureStreamOptions options;

/** Feature stream options. Every field has a default, so an empty configuration is usable. */
@Getter
@Setter
public static class FeatureStreamOptions {

  /* Kafka topic to use for feature sets without source topics. Defaults to "feast-features". */
  @NotBlank private String topic = "feast-features";

  /**
   * Comma separated list of Kafka bootstrap servers. Used for feature sets without a defined
   * source. Defaults to "localhost:9092".
   */
  @NotBlank private String bootstrapServers = "localhost:9092";

  /* Replication factor (number of copies) of the managed feature stream Kafka topic. */
  @Positive private short replicationFactor = 1;

  /* Number of Kafka partitions to use for the managed feature stream. */
  @Positive private int partitions = 1;
}
}

/** Feast population job metrics */
@Getter
@Setter
public static class MetricsProperties {

/* Population job metrics enabled */
private boolean enabled;

/* Metric type. Possible options: statsd */
@OneOfStrings({"statsd"})
@NotBlank
private String type;
private String host;
private int port;

/* Host of metric sink */
@URL private String host;

/* Port of metric sink */
@Positive private int port;
}

/**
 * Validates all FeastProperties. This method runs after properties have been initialized and
 * validates each properties class individually, in a fixed order: root fields, stream
 * properties, stream options, job properties and finally, only when metrics are enabled, the
 * metrics properties. Validation stops at the first class that reports violations.
 *
 * @throws ConstraintViolationException if any validated class has constraint violations
 */
@PostConstruct
public void validate() {
  // The factory is closed when validation finishes (or fails) so it does not leak resources.
  try (ValidatorFactory factory = Validation.buildDefaultValidatorFactory()) {
    Validator validator = factory.getValidator();

    // Root fields go first: the steps below dereference getStream() and getJobs(), which the
    // root-level @NotNull constraints guarantee to be present once this check has passed.
    throwOnViolations(validator, this);

    // Stream properties, then the nested stream options (@NotNull on StreamProperties).
    throwOnViolations(validator, getStream());
    throwOnViolations(validator, getStream().getOptions());

    // Job properties; this also guarantees getMetrics() is non-null for the check below.
    throwOnViolations(validator, getJobs());

    // Metrics settings are only required to be valid when metrics are switched on.
    // NOTE(review): getJobs().getUpdates() is never validated here, so the @Positive
    // constraints declared on JobUpdatesProperties are not enforced - confirm this is intended.
    if (getJobs().getMetrics().isEnabled()) {
      throwOnViolations(validator, getJobs().getMetrics());
    }
  }
}

/**
 * Validates a single object and fails if it has any constraint violations.
 *
 * @param validator validator used to check the object
 * @param target object to validate
 * @param <T> type of the validated object
 * @throws ConstraintViolationException carrying every violation found on {@code target}
 */
private static <T> void throwOnViolations(Validator validator, T target) {
  Set<ConstraintViolation<T>> violations = validator.validate(target);
  if (!violations.isEmpty()) {
    throw new ConstraintViolationException(violations);
  }
}
}
9 changes: 4 additions & 5 deletions core/src/main/java/feast/core/config/FeatureStreamConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
SourceType featureStreamType = SourceType.valueOf(streamProperties.getType().toUpperCase());
switch (featureStreamType) {
case KAFKA:
String bootstrapServers = streamProperties.getOptions().get("bootstrapServers");
String topicName = streamProperties.getOptions().get("topic");
String bootstrapServers = streamProperties.getOptions().getBootstrapServers();
String topicName = streamProperties.getOptions().getTopic();
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
map.put(
Expand All @@ -59,9 +59,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
NewTopic newTopic =
new NewTopic(
topicName,
Integer.valueOf(streamProperties.getOptions().getOrDefault("numPartitions", "1")),
Short.valueOf(
streamProperties.getOptions().getOrDefault("replicationFactor", "1")));
streamProperties.getOptions().getPartitions(),
streamProperties.getOptions().getReplicationFactor());
CreateTopicsResult createTopicsResult =
client.createTopics(Collections.singleton(newTopic));
try {
Expand Down
70 changes: 10 additions & 60 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,11 @@
*/
package feast.core.config;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.common.base.Strings;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.config.FeastProperties.JobUpdatesProperties;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -44,65 +33,26 @@
public class JobConfig {

/**
* Get a JobManager according to the runner type and dataflow configuration.
* Get a JobManager according to the runner type and Dataflow configuration.
*
* @param feastProperties feast config properties
*/
@Bean
@Autowired
public JobManager getJobManager(
FeastProperties feastProperties, DirectJobRegistry directJobRegistry) {
public JobManager getJobManager(FeastProperties feastProperties) {

JobProperties jobProperties = feastProperties.getJobs();
Runner runner = Runner.fromString(jobProperties.getRunner());
if (jobProperties.getOptions() == null) {
jobProperties.setOptions(new HashMap<>());
}
Map<String, String> jobOptions = jobProperties.getOptions();
switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(jobOptions.getOrDefault("region", null))
|| Strings.isNullOrEmpty(jobOptions.getOrDefault("project", null))) {
log.error("Project and location of the Dataflow runner is not configured");
throw new IllegalStateException(
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
Map<String, String> runnerConfigOptions = runner.getOptions();
FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();

return new DataflowJobManager(
dataflow, jobProperties.getOptions(), jobProperties.getMetrics());
} catch (IOException e) {
throw new IllegalStateException(
"Unable to find credential required for Dataflow monitoring API", e);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}
switch (runner.getType()) {
case DATAFLOW:
return new DataflowJobManager(runnerConfigOptions, metrics);
case DIRECT:
return new DirectRunnerJobManager(
jobProperties.getOptions(), directJobRegistry, jobProperties.getMetrics());
return new DirectRunnerJobManager(runnerConfigOptions, new DirectJobRegistry(), metrics);
default:
throw new IllegalArgumentException("Unsupported runner: " + jobProperties.getRunner());
throw new IllegalArgumentException("Unsupported runner: " + runner);
}
}

/** Get a direct job registry */
@Bean
public DirectJobRegistry directJobRegistry() {
return new DirectJobRegistry();
}

/** Extracts job update options from feast core options. */
@Bean
public JobUpdatesProperties jobUpdatesProperties(FeastProperties feastProperties) {
return feastProperties.getJobs().getUpdates();
}
}
Loading