diff --git a/.prow/config.yaml b/.prow/config.yaml index 6bed56001a..fb0015e091 100644 --- a/.prow/config.yaml +++ b/.prow/config.yaml @@ -54,16 +54,38 @@ presubmits: decorate: true always_run: true spec: + volumes: + - name: service-account + secret: + secretName: prow-service-account containers: - image: maven:3.6-jdk-8 + volumeMounts: + - name: service-account + mountPath: /etc/service-account + readOnly: true + env: + - name: GOOGLE_APPLICATION_CREDENTIALS + value: /etc/service-account/service-account.json command: [".prow/scripts/run_unit_test.sh", "--component", "core"] - name: unit-test-ingestion decorate: true always_run: true spec: + volumes: + - name: service-account + secret: + secretName: prow-service-account containers: - image: maven:3.6-jdk-8 + volumeMounts: + - name: service-account + mountPath: /etc/service-account + readOnly: true + env: + - name: GOOGLE_APPLICATION_CREDENTIALS + value: /etc/service-account/service-account.json command: [".prow/scripts/run_unit_test.sh", "--component", "ingestion"] - name: unit-test-serving diff --git a/core/src/main/java/feast/core/config/TrainingConfig.java b/core/src/main/java/feast/core/config/TrainingConfig.java index 0e5ff3e3dd..6fdfe31b90 100644 --- a/core/src/main/java/feast/core/config/TrainingConfig.java +++ b/core/src/main/java/feast/core/config/TrainingConfig.java @@ -7,12 +7,11 @@ import com.hubspot.jinjava.Jinjava; import feast.core.config.StorageConfig.StorageSpecs; import feast.core.dao.FeatureInfoRepository; -import feast.core.training.BigQueryTraningDatasetCreator; import feast.core.training.BigQueryDatasetTemplater; +import feast.core.training.BigQueryTraningDatasetCreator; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.time.Clock; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -31,18 +30,17 @@ public BigQueryDatasetTemplater getBigQueryTrainingDatasetTemplater( Resource resource = new ClassPathResource("templates/bq_training.tmpl"); InputStream resourceInputStream = resource.getInputStream(); String tmpl = CharStreams.toString(new InputStreamReader(resourceInputStream, Charsets.UTF_8)); - return new BigQueryDatasetTemplater(new Jinjava(), tmpl, storageSpecs.getWarehouseStorageSpec(), - featureInfoRepository); + return new BigQueryDatasetTemplater( + new Jinjava(), tmpl, storageSpecs.getWarehouseStorageSpec(), featureInfoRepository); } @Bean public BigQueryTraningDatasetCreator getBigQueryTrainingDatasetCreator( - BigQueryDatasetTemplater templater, StorageSpecs storageSpecs, + BigQueryDatasetTemplater templater, + StorageSpecs storageSpecs, @Value("${feast.core.projectId}") String projectId, @Value("${feast.core.datasetPrefix}") String datasetPrefix) { BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService(); - Clock clock = Clock.systemUTC(); - return new BigQueryTraningDatasetCreator(templater, clock, - projectId, datasetPrefix); + return new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix); } } diff --git a/core/src/main/java/feast/core/training/BigQueryTraningDatasetCreator.java b/core/src/main/java/feast/core/training/BigQueryTraningDatasetCreator.java index 5b86928a78..5414364d40 100644 --- a/core/src/main/java/feast/core/training/BigQueryTraningDatasetCreator.java +++ b/core/src/main/java/feast/core/training/BigQueryTraningDatasetCreator.java @@ -21,16 +21,25 @@ import 
com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.JobException; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TableResult; import com.google.common.base.Strings; import com.google.protobuf.Timestamp; import feast.core.DatasetServiceProto.DatasetInfo; import feast.core.DatasetServiceProto.FeatureSet; import feast.core.exception.TrainingDatasetCreationException; -import java.time.Clock; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -38,29 +47,24 @@ public class BigQueryTraningDatasetCreator { private final BigQueryDatasetTemplater templater; private final DateTimeFormatter formatter; - private final Clock clock; private final String projectId; private final String datasetPrefix; private transient BigQuery bigQuery; - public BigQueryTraningDatasetCreator( BigQueryDatasetTemplater templater, - Clock clock, String projectId, String datasetPrefix) { - this(templater, clock, projectId, datasetPrefix, + this(templater, projectId, datasetPrefix, BigQueryOptions.newBuilder().setProjectId(projectId).build().getService()); } public BigQueryTraningDatasetCreator( BigQueryDatasetTemplater templater, - Clock clock, String projectId, String datasetPrefix, BigQuery bigQuery) { this.templater = templater; - this.clock = clock; this.formatter = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("UTC")); this.projectId = projectId; this.datasetPrefix = datasetPrefix; @@ -68,7 +72,8 @@ public BigQueryTraningDatasetCreator( } /** - * Create dataset for a feature set + * Create a training dataset for a feature set for features created between startDate (inclusive) + * and endDate (inclusive) * * @param featureSet feature set for which the training dataset should be created * @param startDate starting date of the training dataset (inclusive) @@ -85,58 +90,92 @@ public DatasetInfo createDataset( String namePrefix) { try { String query = templater.createQuery(featureSet, startDate, endDate, limit); - String tableName = createBqTableName(startDate, endDate, namePrefix); - String bqDatasetName = createBqDatasetName(featureSet.getEntityName()); - - createBqDatasetIfMissing(bqDatasetName); - - TableId destinationTable = - TableId.of(projectId, createBqDatasetName(featureSet.getEntityName()), tableName); - QueryJobConfiguration queryConfig = - QueryJobConfiguration.newBuilder(query) - .setAllowLargeResults(true) - .setDestinationTable(destinationTable) - .build(); - JobOption jobOption = JobOption.fields(); - bigQuery.query(queryConfig, jobOption); + String tableName = + createBqTableName(datasetPrefix, featureSet, startDate, endDate, namePrefix); + String tableDescription = createBqTableDescription(featureSet, startDate, endDate, query); + + Map options = templater.getStorageSpec().getOptionsMap(); + String bq_dataset = options.get("dataset"); + TableId destinationTableId = TableId.of(projectId, bq_dataset, tableName); + + // Create the BigQuery table that will store the training dataset if not exists + if (bigQuery.getTable(destinationTableId) == null) { + QueryJobConfiguration queryConfig = + 
QueryJobConfiguration.newBuilder(query) + .setAllowLargeResults(true) + .setDestinationTable(destinationTableId) + .build(); + JobOption jobOption = JobOption.fields(); + TableResult res = bigQuery.query(queryConfig, jobOption); + if (res != null) { + Table destinationTable = bigQuery.getTable(destinationTableId); + TableInfo tableInfo = + destinationTable.toBuilder().setDescription(tableDescription).build(); + bigQuery.update(tableInfo); + } + } + return DatasetInfo.newBuilder() - .setName(createTrainingDatasetName(namePrefix, featureSet.getEntityName(), tableName)) - .setTableUrl(toTableUrl(destinationTable)) + .setName(tableName) + .setTableUrl(toTableUrl(destinationTableId)) .build(); } catch (JobException e) { log.error("Failed creating training dataset", e); throw new TrainingDatasetCreationException("Failed creating training dataset", e); } catch (InterruptedException e) { log.error("Training dataset creation was interrupted", e); - throw new TrainingDatasetCreationException("Training dataset creation was interrupted", e); + throw new TrainingDatasetCreationException("Training dataset creation was interrupted", + e); } } - private void createBqDatasetIfMissing(String bqDatasetName) { - if (bigQuery.getDataset(bqDatasetName) != null) { - return; - } + private String createBqTableName( + String datasetPrefix, + FeatureSet featureSet, + Timestamp startDate, + Timestamp endDate, + String namePrefix) { - // create dataset - bigQuery.create(com.google.cloud.bigquery.DatasetInfo.of(bqDatasetName)); - } + List features = new ArrayList(featureSet.getFeatureIdsList()); + Collections.sort(features); + + String datasetId = String.format("%s_%s_%s", features, startDate, endDate); + StringBuilder hashText; + + // create hash from datasetId + try { + MessageDigest md = MessageDigest.getInstance("SHA-1"); + byte[] messageDigest = md.digest(datasetId.getBytes()); + BigInteger no = new BigInteger(1, messageDigest); + hashText = new StringBuilder(no.toString(16)); + while (hashText.length() < 32) { + hashText.insert(0, "0"); + } + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } - private String createBqTableName(Timestamp startDate, Timestamp endDate, String namePrefix) { - String currentTime = String.valueOf(clock.millis()); if (!Strings.isNullOrEmpty(namePrefix)) { // only alphanumeric and underscore are allowed namePrefix = namePrefix.replaceAll("[^a-zA-Z0-9_]", "_"); return String.format( - "%s_%s_%s_%s", - namePrefix, currentTime, formatTimestamp(startDate), formatTimestamp(endDate)); + "%s_%s_%s_%s", datasetPrefix, featureSet.getEntityName(), namePrefix, + hashText.toString()); } return String.format( - "%s_%s_%s", currentTime, formatTimestamp(startDate), formatTimestamp(endDate)); + "%s_%s_%s", datasetPrefix, featureSet.getEntityName(), hashText.toString()); } - private String createBqDatasetName(String entity) { - return String.format("%s_%s", datasetPrefix, entity); + private String createBqTableDescription( + FeatureSet featureSet, Timestamp startDate, Timestamp endDate, String query) { + return String.format( + "Feast Dataset for %s features.\nContains data from %s to %s.\n Last edited at %s.\n\n-----\n\n%s", + featureSet.getEntityName(), + formatTimestamp(startDate), + formatTimestamp(endDate), + Instant.now(), + query); } private String formatTimestamp(Timestamp timestamp) { @@ -148,11 +187,4 @@ private String toTableUrl(TableId tableId) { return String.format( "%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable()); } - - private String 
createTrainingDatasetName(String namePrefix, String entityName, String tableName) { - if (!Strings.isNullOrEmpty(namePrefix)) { - return tableName; - } - return String.format("%s_%s", entityName, tableName); - } } diff --git a/core/src/test/java/feast/core/training/BigQueryTraningDatasetCreatorTest.java b/core/src/test/java/feast/core/training/BigQueryTraningDatasetCreatorTest.java index b28835c39e..fff75eefae 100644 --- a/core/src/test/java/feast/core/training/BigQueryTraningDatasetCreatorTest.java +++ b/core/src/test/java/feast/core/training/BigQueryTraningDatasetCreatorTest.java @@ -16,13 +16,6 @@ */ package feast.core.training; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.google.cloud.bigquery.BigQuery; import com.google.protobuf.Timestamp; import com.google.protobuf.util.Timestamps; @@ -30,7 +23,6 @@ import feast.core.DatasetServiceProto.FeatureSet; import feast.core.storage.BigQueryStorageManager; import feast.specs.StorageSpecProto.StorageSpec; -import java.time.Clock; import java.time.Instant; import java.util.Arrays; import org.junit.Before; @@ -38,6 +30,21 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +// TODO: Should consider testing with "actual" BigQuery vs mocking it +// because the mocked BigQuery client is very basic and may miss important functionalities +// such as an actual table / dataset is actually created +// In the test method, should probably add a condition so that tests can be skipped if +// the user running the tests do not have permission to manage BigQuery (although ideally they should have) +// Example of adding the condition whether or not to accept the test result as valid: +// https://stackoverflow.com/questions/1689242/conditionally-ignoring-tests-in-junit-4 + public class BigQueryTraningDatasetCreatorTest { public static final String projectId = "the-project"; @@ -48,11 +55,9 @@ public class BigQueryTraningDatasetCreatorTest { private BigQueryDatasetTemplater templater; @Mock private BigQuery bq; - @Mock - private Clock clock; @Before - public void setUp() throws Exception { + public void setUp() { MockitoAnnotations.initMocks(this); when(templater.getStorageSpec()).thenReturn(StorageSpec.newBuilder() .setId("BIGQUERY1") @@ -60,14 +65,13 @@ public void setUp() throws Exception { .putOptions("project", "project") .putOptions("dataset", "dataset") .build()); - creator = new BigQueryTraningDatasetCreator(templater, clock, projectId, datasetPrefix, bq); + creator = new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix, bq); when(templater.createQuery( any(FeatureSet.class), any(Timestamp.class), any(Timestamp.class), anyLong())) .thenReturn("SELECT * FROM `project.dataset.table`"); } - @Test public void shouldCreateCorrectDatasetIfPrefixNotSpecified() { String entityName = "myentity"; @@ -85,14 +89,15 @@ public void shouldCreateCorrectDatasetIfPrefixNotSpecified() { long limit = 999; String namePrefix = ""; - DatasetInfo dsInfo = - creator.createDataset(featureSet, startDate, endDate, limit, namePrefix); - 
assertThat(dsInfo.getName(), equalTo("myentity_0_20180101_20190101")); + DatasetInfo dsInfo = creator.createDataset(featureSet, startDate, endDate, limit, namePrefix); + assertThat( + dsInfo.getName(), equalTo("feast_myentity_b0009f0f7df634ddc130571319e0deb9742eb1da")); assertThat( dsInfo.getTableUrl(), equalTo( String.format( - "%s.%s_%s.%s", projectId, datasetPrefix, entityName, "0_20180101_20190101"))); + "%s.dataset.%s_%s_%s", + projectId, datasetPrefix, entityName, "b0009f0f7df634ddc130571319e0deb9742eb1da"))); } @Test @@ -112,15 +117,20 @@ public void shouldCreateCorrectDatasetIfPrefixIsSpecified() { long limit = 999; String namePrefix = "mydataset"; - DatasetInfo dsInfo = - creator.createDataset(featureSet, startDate, endDate, limit, namePrefix); + DatasetInfo dsInfo = creator.createDataset(featureSet, startDate, endDate, limit, namePrefix); assertThat( dsInfo.getTableUrl(), equalTo( String.format( - "%s.%s_%s.%s", projectId, datasetPrefix, entityName, - "mydataset_0_20180101_20190101"))); - assertThat(dsInfo.getName(), equalTo("mydataset_0_20180101_20190101")); + "%s.dataset.%s_%s_%s_%s", + projectId, + datasetPrefix, + entityName, + namePrefix, + "b0009f0f7df634ddc130571319e0deb9742eb1da"))); + assertThat( + dsInfo.getName(), + equalTo("feast_myentity_mydataset_b0009f0f7df634ddc130571319e0deb9742eb1da")); } @Test diff --git a/sdk/python/examples/quickstart/Quickstart.ipynb b/sdk/python/examples/quickstart/Quickstart.ipynb index 6023d78a19..e915983ac5 100644 --- a/sdk/python/examples/quickstart/Quickstart.ipynb +++ b/sdk/python/examples/quickstart/Quickstart.ipynb @@ -2,15 +2,16 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": { - "collapsed": true + "pycharm": { + "is_executing": false + } }, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", - "\n", "from feast.sdk.resources.entity import Entity\n", "from feast.sdk.resources.storage import Storage\n", "from feast.sdk.resources.feature import Feature, Datastore, ValueType\n", @@ -18,7 +19,9 @@ "import feast.specs.FeatureSpec_pb2 as feature_pb\n", "\n", "from feast.sdk.importer import Importer\n", - "from feast.sdk.client import Client" + "from feast.sdk.client import Client\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\", \"Your application has authenticated using end user credentials\")" ] }, { @@ -30,11 +33,168 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": { - "collapsed": true + "pycharm": { + "is_executing": false + } }, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
<HTML table preview elided: the table markup was lost in extraction; the same five engineered-feature rows appear in the text/plain output below>\n",
" + ], + "text/plain": [ + " ride pickup_datetime log_trip_duration distance_haversine \\\n", + "0 id2875421 2016-03-14 17:24:55 6.122493 1.498521 \n", + "1 id2377394 2016-06-12 00:43:35 6.498282 1.805507 \n", + "2 id3858529 2016-01-19 11:35:24 7.661527 6.385098 \n", + "3 id3504673 2016-04-06 19:32:31 6.063785 1.485498 \n", + "4 id2181028 2016-03-26 13:30:55 6.077642 1.188588 \n", + "\n", + " distance_dummy_manhattan direction month day_of_month hour \\\n", + "0 1.735433 99.970196 3 14 17 \n", + "1 2.430506 -117.153768 6 12 0 \n", + "2 8.203575 -159.680165 1 19 11 \n", + "3 1.661331 -172.737700 4 6 19 \n", + "4 1.199457 179.473585 3 26 13 \n", + "\n", + " day_of_week vi_1 vi_2 sf_n sf_y \n", + "0 0 0 1 1 0 \n", + "1 6 1 0 1 0 \n", + "2 1 0 1 1 0 \n", + "3 2 0 1 1 0 \n", + "4 5 0 1 1 0 " + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "# Feature engineering steps \n", "## Referenced from https://www.kaggle.com/karelrv/nyct-from-a-to-z-with-xgboost-tutorial/notebook\n", @@ -61,7 +221,7 @@ " x = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(lng_delta_rad)\n", " return np.degrees(np.arctan2(y, x))\n", "\n", - "df = pd.read_csv('taxi_small.csv')\n", + "df = pd.read_csv('~/Workspace/feast/sdk/python/examples/quickstart/taxi_small.csv')\n", "df['pickup_datetime'] = pd.to_datetime(df.pickup_datetime)\n", "df['dropoff_datetime'] = pd.to_datetime(df.dropoff_datetime)\n", "df['log_trip_duration'] = np.log(df['trip_duration'].values + 1)\n", @@ -98,25 +258,206 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, + "execution_count": 3, + "metadata": {}, "outputs": [], "source": [ - "FEAST_CORE_URL = 'localhost:6565'\n", - "FEAST_SERVING_URL = 'localhost:6566'\n", - "STAGING_LOCATION = 'gs://feast-bucket/staging'" + "FEAST_CORE_URL = 'localhost:8433'\n", + "FEAST_SERVING_URL = 'feast-serving.sandbox.s.ds.golabs.io:8433'\n", + "STAGING_LOCATION = 'gs://zzz-bubub/'" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": { - "collapsed": true, "scrolled": true }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Successfully applied entity with name: ride\n", + "---\n", + "name: ride\n", + "description: nyc taxi dataset\n", + "\n", + "Successfully applied feature with id: ride.log_trip_duration\n", + "---\n", + "id: ride.log_trip_duration\n", + "name: log_trip_duration\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: DOUBLE\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.distance_haversine\n", + "---\n", + "id: ride.distance_haversine\n", + "name: distance_haversine\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: DOUBLE\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.distance_dummy_manhattan\n", + "---\n", + "id: ride.distance_dummy_manhattan\n", + "name: distance_dummy_manhattan\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: DOUBLE\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.direction\n", + "---\n", + "id: ride.direction\n", + "name: direction\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: DOUBLE\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: 
ride.month\n", + "---\n", + "id: ride.month\n", + "name: month\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT64\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.day_of_month\n", + "---\n", + "id: ride.day_of_month\n", + "name: day_of_month\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT64\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.hour\n", + "---\n", + "id: ride.hour\n", + "name: hour\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT64\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.day_of_week\n", + "---\n", + "id: ride.day_of_week\n", + "name: day_of_week\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT64\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.vi_1\n", + "---\n", + "id: ride.vi_1\n", + "name: vi_1\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT32\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.vi_2\n", + "---\n", + "id: ride.vi_2\n", + "name: vi_2\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT32\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.sf_n\n", + "---\n", + "id: ride.sf_n\n", + "name: sf_n\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT32\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Successfully applied feature with id: ride.sf_y\n", + "---\n", + "id: ride.sf_y\n", + "name: sf_y\n", + "owner: user@website.com\n", + "description: nyc taxi dataset\n", + "valueType: INT32\n", + "entity: ride\n", + "dataStores: {}\n", + "\n", + "Staging file to remote path gs://zzz-bubub//tmp_ride_1559191176607.csv\n", + "Submitting job with spec:\n", + " type: file.csv\n", + "sourceOptions:\n", + " path: gs://zzz-bubub//tmp_ride_1559191176607.csv\n", + "entities:\n", + "- ride\n", + "schema:\n", + " entityIdColumn: ride\n", + " fields:\n", + " - name: ride\n", + " - name: pickup_datetime\n", + " - featureId: ride.log_trip_duration\n", + " name: log_trip_duration\n", + " - featureId: ride.distance_haversine\n", + " name: distance_haversine\n", + " - featureId: ride.distance_dummy_manhattan\n", + " name: distance_dummy_manhattan\n", + " - featureId: ride.direction\n", + " name: direction\n", + " - featureId: ride.month\n", + " name: month\n", + " - featureId: ride.day_of_month\n", + " name: day_of_month\n", + " - featureId: ride.hour\n", + " name: hour\n", + " - featureId: ride.day_of_week\n", + " name: day_of_week\n", + " - featureId: ride.vi_1\n", + " name: vi_1\n", + " - featureId: ride.vi_2\n", + " name: vi_2\n", + " - featureId: ride.sf_n\n", + " name: sf_n\n", + " - featureId: ride.sf_y\n", + " name: sf_y\n", + " timestampColumn: pickup_datetime\n", + "\n" + ] + }, + { + "ename": "_Rendezvous", + "evalue": "<_Rendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: java.lang.RuntimeException: Could not submit job: \nOptional[Error: Unable to access jarfile 
feast-ingestion.jar]\"\n\tdebug_error_string = \"{\"created\":\"@1559191180.478136000\",\"description\":\"Error received from peer ipv6:[::1]:8433\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1041,\"grpc_message\":\"Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: java.lang.RuntimeException: Could not submit job: \\nOptional[Error: Unable to access jarfile feast-ingestion.jar]\",\"grpc_status\":13}\"\n>", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31m_Rendezvous\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 19\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 20\u001b[0m \u001b[0;31m# Ingest the feature data into the store\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 21\u001b[0;31m \u001b[0mfs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mimporter\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mapply_features\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mapply_entity\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", + "\u001b[0;32m~/Workspace/feast/sdk/python/feast/sdk/client.py\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self, importer, name_override, apply_entity, apply_features)\u001b[0m\n\u001b[1;32m 165\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Submitting job with spec:\\n {}\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mspec_to_yaml\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mimporter\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mspec\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 166\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_connect_core\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 167\u001b[0;31m \u001b[0mresponse\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_job_service_stub\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mSubmitJob\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrequest\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 168\u001b[0m \u001b[0mprint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Submitted job with id: {}\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresponse\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjobId\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 169\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresponse\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjobId\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/.pyenv/versions/3.6.2/Python.framework/Versions/3.6/lib/python3.6/site-packages/grpc/_channel.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, request, timeout, metadata, credentials, wait_for_ready)\u001b[0m\n\u001b[1;32m 560\u001b[0m state, call, = self._blocking(request, timeout, metadata, credentials,\n\u001b[1;32m 561\u001b[0m wait_for_ready)\n\u001b[0;32m--> 562\u001b[0;31m \u001b[0;32mreturn\u001b[0m 
\u001b[0m_end_unary_response_blocking\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mstate\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcall\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 563\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 564\u001b[0m def with_call(self,\n", + "\u001b[0;32m~/.pyenv/versions/3.6.2/Python.framework/Versions/3.6/lib/python3.6/site-packages/grpc/_channel.py\u001b[0m in \u001b[0;36m_end_unary_response_blocking\u001b[0;34m(state, call, with_call, deadline)\u001b[0m\n\u001b[1;32m 464\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mstate\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresponse\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 465\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 466\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0m_Rendezvous\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mstate\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdeadline\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 467\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 468\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;31m_Rendezvous\u001b[0m: <_Rendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: java.lang.RuntimeException: Could not submit job: \nOptional[Error: Unable to access jarfile feast-ingestion.jar]\"\n\tdebug_error_string = \"{\"created\":\"@1559191180.478136000\",\"description\":\"Error received from peer ipv6:[::1]:8433\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1041,\"grpc_message\":\"Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: feast.core.exception.JobExecutionException: Error running ingestion job: java.lang.RuntimeException: Could not submit job: \\nOptional[Error: Unable to access jarfile feast-ingestion.jar]\",\"grpc_status\":13}\"\n>" + ] + } + ], "source": [ "# Now that we have finished creating our features, we ingest them into feast\n", "\n", @@ -152,11 +493,33 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "metadata": { - "collapsed": true + "scrolled": true }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "creating training dataset for features: ['ride.log_trip_duration', 'ride.distance_haversine', 'ride.distance_dummy_manhattan', 'ride.month', 'ride.direction', 'ride.day_of_month', 'ride.hour', 'ride.day_of_week', 'ride.vi_1', 'ride.vi_2', 'ride.sf_n', 'ride.sf_y']\n" + ] + }, + { + "ename": "ValueError", + "evalue": "Core API URL not set. 
Either set the environment variable FEAST_CORE_URL or set it explicitly.", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)", + "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 14\u001b[0m \u001b[0;34m\"ride.sf_n\"\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 15\u001b[0m \"ride.sf_y\"])\n\u001b[0;32m---> 16\u001b[0;31m \u001b[0mdataset_info\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcreate_dataset\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfeature_set\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"2016-06-01\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"2016-08-01\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 17\u001b[0m \u001b[0mdataset\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdownload_dataset_to_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataset_info\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstaging_location\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mSTAGING_LOCATION\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 18\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/Workspace/feast/sdk/python/feast/sdk/client.py\u001b[0m in \u001b[0;36mcreate_dataset\u001b[0;34m(self, feature_set, start_date, end_date, limit, name_prefix)\u001b[0m\n\u001b[1;32m 205\u001b[0m \u001b[0;34m\"creating training dataset for features: \"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mstr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfeature_set\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfeatures\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 206\u001b[0m )\n\u001b[0;32m--> 207\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_connect_core\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 208\u001b[0m \u001b[0mresp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_dataset_service_stub\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mCreateDataset\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mreq\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 209\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/Workspace/feast/sdk/python/feast/sdk/client.py\u001b[0m in \u001b[0;36m_connect_core\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 298\u001b[0m \u001b[0;34m\"\"\"Connect to core api\"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 299\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__core_channel\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 300\u001b[0;31m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__core_channel\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mgrpc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0minsecure_channel\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcore_url\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 301\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_core_service_stub\u001b[0m \u001b[0;34m=\u001b[0m 
\u001b[0mCoreServiceStub\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__core_channel\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_job_service_stub\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mJobServiceStub\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__core_channel\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", + "\u001b[0;32m~/Workspace/feast/sdk/python/feast/sdk/client.py\u001b[0m in \u001b[0;36mcore_url\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[0;34m\"Core API URL not set. Either set the \"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 82\u001b[0m + \"environment variable {} or set it explicitly.\".format(\n\u001b[0;32m---> 83\u001b[0;31m \u001b[0mFEAST_CORE_URL_ENV_KEY\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 84\u001b[0m )\n\u001b[1;32m 85\u001b[0m )\n", + "\u001b[0;31mValueError\u001b[0m: Core API URL not set. Either set the environment variable FEAST_CORE_URL or set it explicitly." + ] + } + ], "source": [ "# Retrieving data: Training\n", "\n", @@ -164,8 +527,8 @@ " features=[\"ride.log_trip_duration\", \n", " \"ride.distance_haversine\",\n", " \"ride.distance_dummy_manhattan\",\n", - " \"ride.direction\",\n", " \"ride.month\",\n", + " \"ride.direction\",\n", " \"ride.day_of_month\",\n", " \"ride.hour\",\n", " \"ride.day_of_week\",\n", @@ -191,11 +554,105 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
<HTML table preview elided: the table markup was lost in extraction; the same serving dataframe rows appear in the text/plain output below>\n",
" + ], + "text/plain": [ + " ride ride.log_trip_duration ride.distance_haversine \\\n", + "0 id1244481 8.084254 17.988218 \n", + "1 id2875421 6.122493 1.498521 \n", + "\n", + " ride.distance_dummy_manhattan ride.direction ride.month \\\n", + "0 23.770274 114.118984 1 \n", + "1 1.735433 99.970196 3 \n", + "\n", + " ride.day_of_month ride.hour ride.day_of_week ride.vi_1 ride.vi_2 \\\n", + "0 15 13 4 0 1 \n", + "1 14 17 0 0 1 \n", + "\n", + " ride.sf_n ride.sf_y \n", + "0 1 0 \n", + "1 1 0 " + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "# Retrieving data: Serving\n", "\n", @@ -234,9 +691,9 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3", + "display_name": "PyCharm (feast-python-sdk)", "language": "python", - "name": "python3" + "name": "pycharm-e7e8e038" }, "language_info": { "codemirror_mode": { @@ -248,18 +705,18 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.6.2" }, "pycharm": { "stem_cell": { "cell_type": "raw", - "source": [], "metadata": { "collapsed": false - } + }, + "source": [] } } }, "nbformat": 4, "nbformat_minor": 2 -} \ No newline at end of file +} diff --git a/sdk/python/feast/sdk/client.py b/sdk/python/feast/sdk/client.py index 1d835d37b5..1d1fe0a93d 100644 --- a/sdk/python/feast/sdk/client.py +++ b/sdk/python/feast/sdk/client.py @@ -266,7 +266,7 @@ def download_dataset( str: path to the downloaded file """ return self._table_downloader.download_table_as_file( - dataset_info.table_id, dest, staging_location, file_type + dataset_info.full_table_id, dest, staging_location, file_type ) def download_dataset_to_df(self, dataset_info, staging_location): @@ -282,7 +282,7 @@ def download_dataset_to_df(self, dataset_info, staging_location): """ return self._table_downloader.download_table_as_df( - dataset_info.table_id, staging_location + dataset_info.full_table_id, staging_location ) def close(self): diff --git a/sdk/python/feast/sdk/resources/feature_set.py b/sdk/python/feast/sdk/resources/feature_set.py index 1895f06c48..7eb2870078 100644 --- a/sdk/python/feast/sdk/resources/feature_set.py +++ b/sdk/python/feast/sdk/resources/feature_set.py @@ -66,16 +66,16 @@ class FileType(object): class DatasetInfo: - def __init__(self, name, table_id): + def __init__(self, name, full_table_id): """ Create instance of DatasetInfo with a BigQuery table as its backing store. 
Args: name: (str) dataset name - table_id: (str) fully qualified table id + full_table_id: (str) fully qualified table id """ self._name = name - self._table_id = table_id + self._full_table_id = full_table_id @property def name(self): @@ -87,10 +87,10 @@ def name(self): return self._name @property - def table_id(self): + def full_table_id(self): """ Returns: fully qualified table id """ - return self._table_id + return self._full_table_id diff --git a/sdk/python/feast/sdk/utils/bq_util.py b/sdk/python/feast/sdk/utils/bq_util.py index ebb838502b..ebc4f24316 100644 --- a/sdk/python/feast/sdk/utils/bq_util.py +++ b/sdk/python/feast/sdk/utils/bq_util.py @@ -197,11 +197,11 @@ def bq(self): self._bq = BQClient() return self._bq - def download_table_as_file(self, table_id, dest, staging_location, file_type): + def download_table_as_file(self, full_table_id, dest, staging_location, file_type): """ Download a bigquery table as file Args: - table_id (str): fully qualified BigQuery table id + full_table_id (str): fully qualified BigQuery table id dest (str): destination filename staging_location (str): url to staging_location (currently support a folder in GCS) @@ -218,7 +218,7 @@ def download_table_as_file(self, table_id, dest, staging_location, file_type): job_config = ExtractJobConfig() job_config.destination_format = file_type - src_table = Table.from_string(table_id) + src_table = Table.from_string(full_table_id) job = self.bq.extract_table(src_table, staging_file_path, job_config=job_config) # await completion @@ -230,11 +230,11 @@ def download_table_as_file(self, table_id, dest, staging_location, file_type): blob.download_to_filename(dest) return dest - def download_table_as_df(self, table_id, staging_location): + def download_table_as_df(self, full_table_id, staging_location): """ Download a BigQuery table as Pandas Dataframe Args: - table_id (src) : fully qualified BigQuery table id + full_table_id (src) : fully qualified BigQuery table id staging_location: url to staging_location (currently support a folder in GCS) @@ -250,7 +250,7 @@ def download_table_as_df(self, table_id, staging_location): job_config = ExtractJobConfig() job_config.destination_format = DestinationFormat.CSV job = self.bq.extract_table( - Table.from_string(table_id), staging_file_path, job_config=job_config + Table.from_string(full_table_id), staging_file_path, job_config=job_config ) # await completion diff --git a/sdk/python/tests/data/austin_bikeshare.bikeshare_stations.avro b/sdk/python/tests/data/austin_bikeshare.bikeshare_stations.avro index 0a44e3a365..db7ab34417 100644 Binary files a/sdk/python/tests/data/austin_bikeshare.bikeshare_stations.avro and b/sdk/python/tests/data/austin_bikeshare.bikeshare_stations.avro differ diff --git a/sdk/python/tests/sdk/resources/test_feature_set.py b/sdk/python/tests/sdk/resources/test_feature_set.py index 3c6c97eba2..5972ecd0d1 100644 --- a/sdk/python/tests/sdk/resources/test_feature_set.py +++ b/sdk/python/tests/sdk/resources/test_feature_set.py @@ -38,7 +38,7 @@ def test_different_entity(self): class TestDatasetInfo(object): def test_creation(self): name = "dataset_name" - table_id = "gcp-project.dataset.table_name" - dataset = DatasetInfo(name, table_id) + full_table_id = "gcp-project.dataset.table_name" + dataset = DatasetInfo(name, full_table_id) assert dataset.name == name - assert dataset.table_id == table_id + assert dataset.full_table_id == full_table_id diff --git a/sdk/python/tests/sdk/test_client.py b/sdk/python/tests/sdk/test_client.py index 84db40bb63..54f4d2790c 
100644 --- a/sdk/python/tests/sdk/test_client.py +++ b/sdk/python/tests/sdk/test_client.py @@ -198,7 +198,7 @@ def test_create_dataset(self, client, mocker): ds = client.create_dataset(fs, start_date, end_date) assert "dataset_name" == ds.name - assert "project.dataset.table" == ds.table_id + assert "project.dataset.table" == ds.full_table_id mock_trn_stub.CreateDataset.assert_called_once_with( DatasetServiceTypes.CreateDatasetRequest( featureSet=fs.proto, @@ -229,7 +229,7 @@ def test_create_dataset_with_limit(self, client, mocker): ds = client.create_dataset(fs, start_date, end_date, limit=limit) assert "dataset_name" == ds.name - assert "project.dataset.table" == ds.table_id + assert "project.dataset.table" == ds.full_table_id mock_trn_stub.CreateDataset.assert_called_once_with( DatasetServiceTypes.CreateDatasetRequest( featureSet=fs.proto, @@ -263,7 +263,7 @@ def test_create_dataset_with_name_prefix(self, client, mocker): fs, start_date, end_date, limit=limit, name_prefix=name_prefix) assert "dataset_name" == ds.name - assert "project.dataset.table" == ds.table_id + assert "project.dataset.table" == ds.full_table_id mock_dssvc_stub.CreateDataset.assert_called_once_with( DatasetServiceTypes.CreateDatasetRequest( featureSet=fs.proto, @@ -427,9 +427,9 @@ def test_download_dataset_as_file(self, client, mocker): table_dlder, "download_table_as_file", return_value=destination) client._table_downloader = table_dlder - table_id = "project.dataset.table" + full_table_id = "project.dataset.table" staging_location = "gs://gcs_bucket/" - dataset = DatasetInfo("mydataset", table_id) + dataset = DatasetInfo("mydataset", full_table_id) result = client.download_dataset( dataset, @@ -439,7 +439,7 @@ def test_download_dataset_as_file(self, client, mocker): assert result == destination table_dlder.download_table_as_file.assert_called_once_with( - table_id, destination, staging_location, FileType.CSV) + full_table_id, destination, staging_location, FileType.CSV) def _create_query_features_response(self, entity_name, entities): response = QueryFeaturesResponse(entityName=entity_name) diff --git a/sdk/python/tests/sdk/utils/test_bq_utils.py b/sdk/python/tests/sdk/utils/test_bq_utils.py index 7fafb1af20..3fde64aee5 100644 --- a/sdk/python/tests/sdk/utils/test_bq_utils.py +++ b/sdk/python/tests/sdk/utils/test_bq_utils.py @@ -48,6 +48,23 @@ def test_get_table_name_not_bq(): with pytest.raises(ValueError, match="storage spec is not BigQuery storage spec"): get_table_name(feature_id, storage_spec) + +@pytest.mark.skipif( + os.getenv("SKIP_BIGQUERY_TEST") is not None, + reason="SKIP_BIGQUERY_TEST is set in the environment", +) +def test_query_to_dataframe(): + with open( + os.path.join(testdata_path, "austin_bikeshare.bikeshare_stations.avro"), "rb" + ) as expected_file: + avro_reader = fastavro.reader(expected_file) + expected = pd.DataFrame.from_records(avro_reader) + + query = "SELECT * FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations`" + actual = query_to_dataframe(query) + assert expected.equals(actual) + + @pytest.mark.skipif( os.getenv("SKIP_BIGQUERY_TEST") is not None, reason="SKIP_BIGQUERY_TEST is set in the environment", @@ -67,7 +84,7 @@ def test_download_table_as_df(self, mocker): staging_path = "gs://temp/" staging_file_name = "temp_0" - table_id = "project_id.dataset_id.table_id" + full_table_id = "project_id.dataset_id.table_id" table_dldr = TableDownloader() exp_staging_path = os.path.join(staging_path, staging_file_name) @@ -75,11 +92,11 @@ def test_download_table_as_df(self, 
mocker): table_dldr._bq = _Mock_BQ_Client() mocker.patch.object(table_dldr._bq, "extract_table", return_value=_Job()) - table_dldr.download_table_as_df(table_id, staging_location=staging_path) + table_dldr.download_table_as_df(full_table_id, staging_location=staging_path) assert len(table_dldr._bq.extract_table.call_args_list) == 1 args, kwargs = table_dldr._bq.extract_table.call_args_list[0] - assert args[0].full_table_id == Table.from_string(table_id).full_table_id + assert args[0].full_table_id == Table.from_string(full_table_id).full_table_id assert args[1] == exp_staging_path assert kwargs["job_config"].destination_format == "CSV" mocked_gcs_to_df.assert_called_once_with(exp_staging_path) @@ -97,25 +114,25 @@ def test_download_json(self, mocker): self._test_download_file(mocker, FileType.JSON) def test_download_invalid_staging_url(self): - table_id = "project_id.dataset_id.table_id" + full_table_id = "project_id.dataset_id.table_id" table_dldr = TableDownloader() with pytest.raises( ValueError, match="staging_uri must be a directory in " "GCS" ): table_dldr.download_table_as_file( - table_id, "/tmp/dst", "/local/directory", FileType.CSV + full_table_id, "/tmp/dst", "/local/directory", FileType.CSV ) with pytest.raises( ValueError, match="staging_uri must be a directory in " "GCS" ): - table_dldr.download_table_as_df(table_id, "/local/directory") + table_dldr.download_table_as_df(full_table_id, "/local/directory") def _test_download_file(self, mocker, type): staging_path = "gs://temp/" staging_file_name = "temp_0" dst_path = "/tmp/myfile.csv" - table_id = "project_id.dataset_id.table_id" + full_table_id = "project_id.dataset_id.table_id" table_dldr = TableDownloader() mock_blob = _Blob() @@ -128,13 +145,13 @@ def _test_download_file(self, mocker, type): ) table_dldr.download_table_as_file( - table_id, dst_path, staging_location=staging_path, file_type=type + full_table_id, dst_path, staging_location=staging_path, file_type=type ) exp_staging_path = os.path.join(staging_path, staging_file_name) assert len(table_dldr._bq.extract_table.call_args_list) == 1 args, kwargs = table_dldr._bq.extract_table.call_args_list[0] - assert args[0].full_table_id == Table.from_string(table_id).full_table_id + assert args[0].full_table_id == Table.from_string(full_table_id).full_table_id assert args[1] == exp_staging_path assert kwargs["job_config"].destination_format == str(type)
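
For reference, the table-naming scheme introduced in BigQueryTraningDatasetCreator above replaces the wall-clock timestamp with a content hash, so repeated createDataset calls for the same feature set, date range, and name prefix resolve to the same BigQuery table and the creation step can be skipped when the table already exists. The standalone Java sketch below mirrors createBqTableName under simplified assumptions: the class name TableNameSketch and the string-typed start/end dates are illustrative only — the real creator hashes the toString() form of a sorted feature-id list together with protobuf Timestamp objects, so the hashes printed here will not match production table names byte-for-byte.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TableNameSketch {

  // SHA-1 of the dataset id, rendered as hex and left-padded to at least 32
  // characters, mirroring the padding loop in createBqTableName.
  static String sha1Hex(String input) throws NoSuchAlgorithmException {
    MessageDigest md = MessageDigest.getInstance("SHA-1");
    byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
    StringBuilder hex = new StringBuilder(new BigInteger(1, digest).toString(16));
    while (hex.length() < 32) {
      hex.insert(0, "0");
    }
    return hex.toString();
  }

  static String tableName(
      String datasetPrefix, String entityName, String namePrefix,
      List<String> featureIds, String startDate, String endDate)
      throws NoSuchAlgorithmException {
    // Sort a copy of the feature ids so the hash does not depend on request order.
    List<String> features = new ArrayList<>(featureIds);
    Collections.sort(features);
    // The hash input is the sorted feature list plus the date range.
    String datasetId = String.format("%s_%s_%s", features, startDate, endDate);
    String hash = sha1Hex(datasetId);
    if (namePrefix != null && !namePrefix.isEmpty()) {
      // Only alphanumerics and underscores are allowed in the user-supplied prefix.
      namePrefix = namePrefix.replaceAll("[^a-zA-Z0-9_]", "_");
      return String.format("%s_%s_%s_%s", datasetPrefix, entityName, namePrefix, hash);
    }
    return String.format("%s_%s_%s", datasetPrefix, entityName, hash);
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    // Hypothetical inputs: calling this twice with the same arguments yields the
    // same name, which is what makes dataset creation idempotent.
    System.out.println(tableName(
        "feast", "myentity", "",
        Arrays.asList("myentity.feature2", "myentity.feature1"),
        "2018-01-01", "2019-01-01"));
  }
}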