Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix create_dataset #208

Merged
merged 14 commits into from
Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .prow/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,38 @@ presubmits:
decorate: true
always_run: true
spec:
volumes:
- name: service-account
secret:
secretName: prow-service-account
containers:
- image: maven:3.6-jdk-8
volumeMounts:
- name: service-account
mountPath: /etc/service-account
readOnly: true
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/service-account/service-account.json
command: [".prow/scripts/run_unit_test.sh", "--component", "core"]

- name: unit-test-ingestion
decorate: true
always_run: true
spec:
volumes:
- name: service-account
secret:
secretName: prow-service-account
containers:
- image: maven:3.6-jdk-8
volumeMounts:
- name: service-account
mountPath: /etc/service-account
readOnly: true
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/service-account/service-account.json
command: [".prow/scripts/run_unit_test.sh", "--component", "ingestion"]

- name: unit-test-serving
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/java/feast/core/config/TrainingConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
import com.hubspot.jinjava.Jinjava;
import feast.core.config.StorageConfig.StorageSpecs;
import feast.core.dao.FeatureInfoRepository;
import feast.core.training.BigQueryTraningDatasetCreator;
import feast.core.training.BigQueryDatasetTemplater;
import feast.core.training.BigQueryTraningDatasetCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Clock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -31,18 +30,17 @@ public BigQueryDatasetTemplater getBigQueryTrainingDatasetTemplater(
Resource resource = new ClassPathResource("templates/bq_training.tmpl");
InputStream resourceInputStream = resource.getInputStream();
String tmpl = CharStreams.toString(new InputStreamReader(resourceInputStream, Charsets.UTF_8));
return new BigQueryDatasetTemplater(new Jinjava(), tmpl, storageSpecs.getWarehouseStorageSpec(),
featureInfoRepository);
return new BigQueryDatasetTemplater(
new Jinjava(), tmpl, storageSpecs.getWarehouseStorageSpec(), featureInfoRepository);
}

@Bean
public BigQueryTraningDatasetCreator getBigQueryTrainingDatasetCreator(
BigQueryDatasetTemplater templater, StorageSpecs storageSpecs,
BigQueryDatasetTemplater templater,
StorageSpecs storageSpecs,
@Value("${feast.core.projectId}") String projectId,
@Value("${feast.core.datasetPrefix}") String datasetPrefix) {
BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();
Clock clock = Clock.systemUTC();
return new BigQueryTraningDatasetCreator(templater, clock,
projectId, datasetPrefix);
return new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,54 +21,59 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.JobException;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.common.base.Strings;
import com.google.protobuf.Timestamp;
import feast.core.DatasetServiceProto.DatasetInfo;
import feast.core.DatasetServiceProto.FeatureSet;
import feast.core.exception.TrainingDatasetCreationException;
import java.time.Clock;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BigQueryTraningDatasetCreator {

private final BigQueryDatasetTemplater templater;
private final DateTimeFormatter formatter;
private final Clock clock;
private final String projectId;
private final String datasetPrefix;
private transient BigQuery bigQuery;


public BigQueryTraningDatasetCreator(
BigQueryDatasetTemplater templater,
Clock clock,
String projectId,
String datasetPrefix) {
this(templater, clock, projectId, datasetPrefix,
this(templater, projectId, datasetPrefix,
BigQueryOptions.newBuilder().setProjectId(projectId).build().getService());
}

public BigQueryTraningDatasetCreator(
BigQueryDatasetTemplater templater,
Clock clock,
String projectId,
String datasetPrefix,
BigQuery bigQuery) {
this.templater = templater;
this.clock = clock;
this.formatter = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("UTC"));
this.projectId = projectId;
this.datasetPrefix = datasetPrefix;
this.bigQuery = bigQuery;
}

/**
* Create dataset for a feature set
* Create a training dataset for a feature set for features created between startDate (inclusive)
* and endDate (inclusive)
*
* @param featureSet feature set for which the training dataset should be created
* @param startDate starting date of the training dataset (inclusive)
Expand All @@ -85,58 +90,92 @@ public DatasetInfo createDataset(
String namePrefix) {
try {
String query = templater.createQuery(featureSet, startDate, endDate, limit);
String tableName = createBqTableName(startDate, endDate, namePrefix);
String bqDatasetName = createBqDatasetName(featureSet.getEntityName());

createBqDatasetIfMissing(bqDatasetName);

TableId destinationTable =
TableId.of(projectId, createBqDatasetName(featureSet.getEntityName()), tableName);
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setAllowLargeResults(true)
.setDestinationTable(destinationTable)
.build();
JobOption jobOption = JobOption.fields();
bigQuery.query(queryConfig, jobOption);
String tableName =
createBqTableName(datasetPrefix, featureSet, startDate, endDate, namePrefix);
String tableDescription = createBqTableDescription(featureSet, startDate, endDate, query);

Map<String, String> options = templater.getStorageSpec().getOptionsMap();
String bq_dataset = options.get("dataset");
TableId destinationTableId = TableId.of(projectId, bq_dataset, tableName);

// Create the BigQuery table that will store the training dataset if not exists
if (bigQuery.getTable(destinationTableId) == null) {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setAllowLargeResults(true)
.setDestinationTable(destinationTableId)
.build();
JobOption jobOption = JobOption.fields();
TableResult res = bigQuery.query(queryConfig, jobOption);
if (res != null) {
Table destinationTable = bigQuery.getTable(destinationTableId);
TableInfo tableInfo =
destinationTable.toBuilder().setDescription(tableDescription).build();
bigQuery.update(tableInfo);
}
}

return DatasetInfo.newBuilder()
.setName(createTrainingDatasetName(namePrefix, featureSet.getEntityName(), tableName))
.setTableUrl(toTableUrl(destinationTable))
.setName(tableName)
.setTableUrl(toTableUrl(destinationTableId))
.build();
} catch (JobException e) {
log.error("Failed creating training dataset", e);
throw new TrainingDatasetCreationException("Failed creating training dataset", e);
} catch (InterruptedException e) {
log.error("Training dataset creation was interrupted", e);
throw new TrainingDatasetCreationException("Training dataset creation was interrupted", e);
throw new TrainingDatasetCreationException("Training dataset creation was interrupted",
e);
}
}

private void createBqDatasetIfMissing(String bqDatasetName) {
if (bigQuery.getDataset(bqDatasetName) != null) {
return;
}
private String createBqTableName(
String datasetPrefix,
FeatureSet featureSet,
Timestamp startDate,
Timestamp endDate,
String namePrefix) {

// create dataset
bigQuery.create(com.google.cloud.bigquery.DatasetInfo.of(bqDatasetName));
}
List<String> features = new ArrayList(featureSet.getFeatureIdsList());
Collections.sort(features);

String datasetId = String.format("%s_%s_%s", features, startDate, endDate);
StringBuilder hashText;

// create hash from datasetId
try {
MessageDigest md = MessageDigest.getInstance("SHA-1");
byte[] messageDigest = md.digest(datasetId.getBytes());
BigInteger no = new BigInteger(1, messageDigest);
hashText = new StringBuilder(no.toString(16));
while (hashText.length() < 32) {
hashText.insert(0, "0");
}
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}

private String createBqTableName(Timestamp startDate, Timestamp endDate, String namePrefix) {
String currentTime = String.valueOf(clock.millis());
if (!Strings.isNullOrEmpty(namePrefix)) {
// only alphanumeric and underscore are allowed
namePrefix = namePrefix.replaceAll("[^a-zA-Z0-9_]", "_");
return String.format(
"%s_%s_%s_%s",
namePrefix, currentTime, formatTimestamp(startDate), formatTimestamp(endDate));
"%s_%s_%s_%s", datasetPrefix, featureSet.getEntityName(), namePrefix,
hashText.toString());
}

return String.format(
"%s_%s_%s", currentTime, formatTimestamp(startDate), formatTimestamp(endDate));
"%s_%s_%s", datasetPrefix, featureSet.getEntityName(), hashText.toString());
}

private String createBqDatasetName(String entity) {
return String.format("%s_%s", datasetPrefix, entity);
private String createBqTableDescription(
FeatureSet featureSet, Timestamp startDate, Timestamp endDate, String query) {
return String.format(
"Feast Dataset for %s features.\nContains data from %s to %s.\n Last edited at %s.\n\n-----\n\n%s",
featureSet.getEntityName(),
formatTimestamp(startDate),
formatTimestamp(endDate),
Instant.now(),
query);
}

private String formatTimestamp(Timestamp timestamp) {
Expand All @@ -148,11 +187,4 @@ private String toTableUrl(TableId tableId) {
return String.format(
"%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
}

private String createTrainingDatasetName(String namePrefix, String entityName, String tableName) {
if (!Strings.isNullOrEmpty(namePrefix)) {
return tableName;
}
return String.format("%s_%s", entityName, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,35 @@
*/
package feast.core.training;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.bigquery.BigQuery;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import feast.core.DatasetServiceProto.DatasetInfo;
import feast.core.DatasetServiceProto.FeatureSet;
import feast.core.storage.BigQueryStorageManager;
import feast.specs.StorageSpecProto.StorageSpec;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// TODO: Should consider testing with "actual" BigQuery vs mocking it
// because the mocked BigQuery client is very basic and may miss important functionality,
// such as whether an actual table / dataset is really created
// In the test method, should probably add a condition so that tests can be skipped if
// the user running the tests does not have permission to manage BigQuery (although ideally they should have)
// Example of adding the condition whether or not to accept the test result as valid:
// https://stackoverflow.com/questions/1689242/conditionally-ignoring-tests-in-junit-4

public class BigQueryTraningDatasetCreatorTest {

public static final String projectId = "the-project";
Expand All @@ -48,26 +55,23 @@ public class BigQueryTraningDatasetCreatorTest {
private BigQueryDatasetTemplater templater;
@Mock
private BigQuery bq;
@Mock
private Clock clock;

@Before
public void setUp() throws Exception {
public void setUp() {
MockitoAnnotations.initMocks(this);
when(templater.getStorageSpec()).thenReturn(StorageSpec.newBuilder()
.setId("BIGQUERY1")
.setType(BigQueryStorageManager.TYPE)
.putOptions("project", "project")
.putOptions("dataset", "dataset")
.build());
creator = new BigQueryTraningDatasetCreator(templater, clock, projectId, datasetPrefix, bq);
creator = new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix, bq);

when(templater.createQuery(
any(FeatureSet.class), any(Timestamp.class), any(Timestamp.class), anyLong()))
.thenReturn("SELECT * FROM `project.dataset.table`");
}


@Test
public void shouldCreateCorrectDatasetIfPrefixNotSpecified() {
String entityName = "myentity";
Expand All @@ -85,14 +89,15 @@ public void shouldCreateCorrectDatasetIfPrefixNotSpecified() {
long limit = 999;
String namePrefix = "";

DatasetInfo dsInfo =
creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
assertThat(dsInfo.getName(), equalTo("myentity_0_20180101_20190101"));
DatasetInfo dsInfo = creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
assertThat(
dsInfo.getName(), equalTo("feast_myentity_b0009f0f7df634ddc130571319e0deb9742eb1da"));
assertThat(
dsInfo.getTableUrl(),
equalTo(
String.format(
"%s.%s_%s.%s", projectId, datasetPrefix, entityName, "0_20180101_20190101")));
"%s.dataset.%s_%s_%s",
projectId, datasetPrefix, entityName, "b0009f0f7df634ddc130571319e0deb9742eb1da")));
}

@Test
Expand All @@ -112,15 +117,20 @@ public void shouldCreateCorrectDatasetIfPrefixIsSpecified() {
long limit = 999;
String namePrefix = "mydataset";

DatasetInfo dsInfo =
creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
DatasetInfo dsInfo = creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
assertThat(
dsInfo.getTableUrl(),
equalTo(
String.format(
"%s.%s_%s.%s", projectId, datasetPrefix, entityName,
"mydataset_0_20180101_20190101")));
assertThat(dsInfo.getName(), equalTo("mydataset_0_20180101_20190101"));
"%s.dataset.%s_%s_%s_%s",
projectId,
datasetPrefix,
entityName,
namePrefix,
"b0009f0f7df634ddc130571319e0deb9742eb1da")));
assertThat(
dsInfo.getName(),
equalTo("feast_myentity_mydataset_b0009f0f7df634ddc130571319e0deb9742eb1da"));
}

@Test
Expand Down
Loading