diff --git a/README.md b/README.md index cd9297850a8..221bde33e38 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,46 @@ If you are not running this client on Google Compute Engine, you need a Google D * If you want to use a new service account, click on **Create new client ID**. After the account is created, you will be prompted to download the JSON key file that the library uses to authorize your requests. * If you want to generate a new key for an existing service account, click on **Generate new JSON key** and download the JSON key file. +## Google BigQuery + +Analyze Big Data in the cloud with [Google BigQuery][cloud-bigquery] ([docs][cloud-bigquery-docs]) . Run fast, SQL-like queries against multi-terabyte datasets in seconds. Scalable and easy to use, BigQuery gives you real-time insights about your data. + +See the [gcloud-node BigQuery API documentation][gcloud-bigquery-docs] to learn how to access your BigQuery datasets using this library. + +```js +var gcloud = require('gcloud'); +var bigquery; + +// From Google Compute Engine: +bigquery = gcloud.bigquery({ + projectId: 'my-project' +}); + +// Or from elsewhere: +bigquery = gcloud.bigquery({ + projectId: 'my-project', + keyFilename: '/path/to/keyfile.json' +}); + +// Access an existing dataset. +var schoolsDataset = bigquery.dataset('schools'); + +// Import data into a dataset. +schoolsDataset.import('/local/file.json', function(err, job) {}); + +// Get results from a query job. +bigquery.job('job-id').getQueryResults(function(err, rows, nextQuery) {}); + +// Get the same results as a readable stream. +bigquery.job('job-id') + .getQueryResults() + .pipe(require('through2').obj(function(row, enc, next) { + this.push(row.address + '\n'); + next(); + })) + .pipe(process.stdout); +``` + ## Google Cloud Datastore [Google Cloud Datastore][cloud-datastore] ([docs][cloud-datastore-docs]) is a fully managed, schemaless database for storing non-relational data. Cloud Datastore automatically scales with your users and supports ACID transactions, high availability of reads and writes, strong consistency for reads and ancestor queries, and eventual consistency for all other queries. @@ -165,6 +205,7 @@ Apache 2.0 - See [COPYING](COPYING) for more information. [gcloud-homepage]: https://googlecloudplatform.github.io/gcloud-node [gcloud-docs]: https://googlecloudplatform.github.io/gcloud-node/#/docs +[gcloud-bigquery-docs]: https://googlecloudplatform.github.io/gcloud-node/#/docs/bigquery [gcloud-datastore-docs]: https://googlecloudplatform.github.io/gcloud-node/#/docs/datastore [gcloud-pubsub-docs]: https://googlecloudplatform.github.io/gcloud-node/#/docs/pubsub [gcloud-storage-docs]: https://googlecloudplatform.github.io/gcloud-node/#/docs/storage @@ -175,6 +216,9 @@ Apache 2.0 - See [COPYING](COPYING) for more information. 
[googleapis]: https://github.com/google/google-api-nodejs-client +[cloud-bigquery]: https://cloud.google.com/bigquery +[cloud-bigquery-docs]: https://cloud.google.com/bigquery/what-is-bigquery + [cloud-datastore]: https://cloud.google.com/products/cloud-datastore [cloud-datastore-docs]: https://developers.google.com/datastore [cloud-datastore-activation]: https://developers.google.com/datastore/docs/activate diff --git a/docs/components/docs/docs-values.js b/docs/components/docs/docs-values.js index 6c3cecc214e..4767cc9c66f 100644 --- a/docs/components/docs/docs-values.js +++ b/docs/components/docs/docs-values.js @@ -20,6 +20,25 @@ angular.module('gcloud.docs') _url: '{baseUrl}' }, + bigquery: { + title: 'BigQuery', + _url: '{baseUrl}/bigquery', + pages: [ + { + title: 'Dataset', + url: '/dataset' + }, + { + title: 'Job', + url: '/job' + }, + { + title: 'Table', + url: '/table' + } + ] + }, + datastore: { title: 'Datastore', _url: '{baseUrl}/datastore', @@ -114,6 +133,8 @@ angular.module('gcloud.docs') '<0.9.0': ['storage'], // introduce new storage api. - '>=0.9.0': ['storageWithFiles'] + '>=0.9.0': ['storageWithFiles'], + + '>=0.10.0': ['bigquery'] } }); diff --git a/docs/components/docs/docs.html b/docs/components/docs/docs.html index 574f9728c30..1ab9f8ade1b 100644 --- a/docs/components/docs/docs.html +++ b/docs/components/docs/docs.html @@ -58,6 +58,18 @@


+
+

BigQuery Overview

+

+ The object returned from gcloud.bigquery gives you complete access to and control of your BigQuery datasets. You can work with existing datasets using the dataset method, or create new ones with createDataset. +

+
+var bigquery = gcloud.bigquery();
+

+ Follow along with the examples below to see how to query your datasets, create tables, import data from your Cloud Storage buckets, and more. +

+
+

Datastore Overview

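Taken together, the overview above describes a flow like the following sketch, using only methods introduced in this patch; the dataset name, table schema, and query are illustrative:

```js
var gcloud = require('gcloud');

var bigquery = gcloud.bigquery({
  projectId: 'my-project',
  keyFilename: '/path/to/keyfile.json'
});

// Use an existing dataset, or create one with `bigquery.createDataset`.
var schools = bigquery.dataset('schools');

// Create a table from a comma-separated "name:type" schema string.
schools.createTable({
  id: 'institutions',
  schema: 'name:string, city:string, enrollment:integer'
}, function(err, table) {
  if (err) {
    return;
  }

  // Start an asynchronous load job from a local CSV file.
  table.import('./institutions.csv', function(err, job) {});
});

// Queries run through a dataset are scoped to it via `defaultDataset`.
var sql = 'SELECT name FROM institutions LIMIT 10';

schools.query(sql, function(err, rows, nextQuery) {
  // `nextQuery` is non-null when more results can be fetched.
});
```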
diff --git a/docs/components/docs/docs.js b/docs/components/docs/docs.js index 61efbdba830..b5c48b3ad0b 100644 --- a/docs/components/docs/docs.js +++ b/docs/components/docs/docs.js @@ -28,6 +28,7 @@ angular matches.push(formatHtml(detectLinks(detectModules( block.trim() .replace(/\/\/-*\s*/g, '\n') + .replace(/(https*:)\W*/g, '$1//') .replace(/\n\n/g, '\n') .replace(/(\w)\n(\w)/g, '$1 $2') .replace(/\n\n/g, '

') @@ -46,12 +47,24 @@ angular } function detectLinks(str) { var regex = { + normal: /{@link ([^}]*)}/g, + normalWithTitle: /\[([^\]]*)]{@link ([^}]*)}/g, withCode: /{@linkcode ([^<]*)<\/a>/g, withTitle: /\[([^\]]*)]{@link [^}]*}<\/a>/g, withoutTitle: /{@link [^}]*}<\/a>/g }; var a = document.createElement('a'); return str + .replace(regex.normalWithTitle, function(match, title, link) { + a.href = link; + a.innerText = title; + return a.outerHTML; + }) + .replace(regex.normal, function(match, link) { + a.href = link; + a.innerText = link; + return a.outerHTML; + }) .replace(regex.withTitle, function(match, title, link) { a.href = link; a.innerText = title; @@ -90,7 +103,15 @@ angular }); } function reduceModules(acc, type, index, types) { - var CUSTOM_TYPES = ['query', 'dataset', 'transaction', 'bucket', 'file']; + var CUSTOM_TYPES = [ + 'query', + 'dataset', + 'transaction', + 'bucket', + 'file', + 'job', + 'table' + ]; type = type.replace('=', ''); if (CUSTOM_TYPES.indexOf(type.toLowerCase()) > -1) { if (types[index - 1]) { @@ -123,7 +144,7 @@ angular }) .map(function(tag) { tag.description = $sce.trustAsHtml( - formatHtml(tag.description.replace(/^- /, ''))); + formatHtml(detectLinks(tag.description.replace(/^- /, '')))); tag.types = $sce.trustAsHtml(tag.types.reduceRight( reduceModules, []).join(', ')); tag.optional = tag.types.toString().indexOf('=') > -1; @@ -216,7 +237,7 @@ angular } function compareMethods(a, b) { - return a.constructor ? -1: a.name > b.name; + return a.constructor ? -1: a.name > b.name ? 1 : -1; } function getLinks($route, getLinks) { diff --git a/docs/json/master/bigquery/.gitkeep b/docs/json/master/bigquery/.gitkeep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/lib/bigquery/dataset.js b/lib/bigquery/dataset.js new file mode 100644 index 00000000000..7788343e9c5 --- /dev/null +++ b/lib/bigquery/dataset.js @@ -0,0 +1,290 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! + * @module bigquery/dataset + */ + +'use strict'; + +var extend = require('extend'); + +/** + * @type {module:bigquery/table} + * @private + */ +var Table = require('./table.js'); + +/** + * @type {module:common/util} + * @private + */ +var util = require('../common/util.js'); + +/*! Developer Documentation + * + * @param {module:bigquery} bigQuery - BigQuery instance. + * @param {string} id - The ID of the Dataset. + */ +/** + * Interact with your BigQuery dataset. Create a Dataset instance with + * {module:bigquery#createDataset} or {module:bigquery#dataset}. + * + * @alias module:bigquery/dataset + * @constructor + */ +function Dataset(bigQuery, id) { + this.bigQuery = bigQuery; + this.id = id; + this.metadata = {}; +} + +/** + * Create a table given a tableId or configuration object. + * + * @param {object} options - Table id or configuration object. + * @param {string} options.id - The id of the table. + * @param {string|object} options.schema - A comma-separated list of name:type + * pairs. 
Valid types are "string", "integer", "float", "boolean", and + * "timestamp". If the type is omitted, it is assumed to be "string". + * Example: "name:string, age:integer". Schemas can also be specified as a + * JSON array of fields, which allows for nested and repeated fields. See + * a [Table resource]{@link http://goo.gl/sl8Dmg} for more detailed + * information. + * @param {function} callback - The callback function. + * + * @example + * var tableConfig = { + * id: 'institution_data', + * + * // From the data.gov CSV dataset (http://goo.gl/kSE7z6): + * schema: 'UNITID,INSTNM,ADDR,CITY,STABBR,ZIP,FIPS,OBEREG,CHFNM,...' + * }; + * + * dataset.createTable(tableConfig, function(err, table) {}); + */ +Dataset.prototype.createTable = function(options, callback) { + var that = this; + + extend(true, options, { + tableReference: { + datasetId: this.id, + projectId: this.bigQuery.projectId, + tableId: options.id + } + }); + + if (util.is(options.schema, 'string')) { + options.schema = Table.createSchemaFromString_(options.schema); + } + + delete options.id; + + this.makeReq_('POST', '/tables', null, options, function(err, resp) { + if (err) { + callback(err); + return; + } + + var table = that.table(resp.tableReference.tableId); + table.metadata = resp; + + callback(null, table); + }); +}; + +/** + * Delete the dataset. + * + * @param {object=} options - The configuration object. + * @param {boolean} options.force - Force delete dataset and all tables. + * Default: false. + * @param {function} callback - The callback function. + * + * @example + * //- + * // Delete the dataset, only if it does not have any tables. + * //- + * dataset.delete(function(err) {}); + * + * //- + * // Delete the dataset and any tables it contains. + * //- + * dataset.delete({ force: true }, function(err) {}); + */ +Dataset.prototype.delete = function(options, callback) { + if (!callback) { + callback = options; + options = {}; + } + + var query = { + deleteContents: !!options.force + }; + + this.makeReq_('DELETE', '', query, null, callback); +}; + +/** + * Get the metadata for the Dataset. + * + * @param {function} callback - The callback function. + * + * @example + * dataset.getMetadata(function(err, metadata) {}); + */ +Dataset.prototype.getMetadata = function(callback) { + var that = this; + this.makeReq_('GET', '', null, null, function(err, resp) { + if (err) { + callback(err); + return; + } + + that.metadata = resp; + + callback(null, that.metadata); + }); +}; + +/** + * Get a list of tables. + * + * @param {object=} query - Configuration object. + * @param {number} query.maxResults - Maximum number of results to return. + * @param {string} query.pageToken - Token returned from a previous call, to + * request the next page of results. + * @param {function} callback - The callback function. + * + * @example + * dataset.getTables(function(err, tables, nextQuery) { + * // If `nextQuery` is non-null, there are more results to fetch. 
+ * }); + */ +Dataset.prototype.getTables = function(query, callback) { + var that = this; + + if (util.is(query, 'function')) { + callback = query; + query = {}; + } + + query = query || {}; + + this.makeReq_('GET', '/tables', query, null, function(err, resp) { + if (err) { + callback(err); + return; + } + + var nextQuery = null; + + if (resp.nextPageToken) { + nextQuery = extend({}, query, { + pageToken: resp.nextPageToken + }); + } + + var tables = (resp.tables || []).map(function(tableObject) { + var table = that.table(tableObject.id); + table.metadata = tableObject; + return table; + }); + + callback(null, tables, nextQuery); + }); +}; + +/** + * Run a query scoped to your dataset. + * + * See {module:bigquery#query} for full documentation of this method. + */ +Dataset.prototype.query = function(options, callback) { + if (util.is(options, 'string')) { + options = { + query: options + }; + } + + options = extend(true, {}, options, { + defaultDataset: { + datasetId: this.id + } + }); + + return this.bigQuery.query(options, callback); +}; + +/** + * Sets the metadata of the Dataset object. + * + * @param {object} metadata - Metadata to save on the Dataset. + * @param {function} callback - The callback function. + * + * @example + * var metadata = { + * description: 'Information for every institution in the 2013 IPEDS universe' + * }; + * + * dataset.setMetadata(metadata, function(err) {}); + */ +Dataset.prototype.setMetadata = function(metadata, callback) { + var that = this; + + this.makeReq_('PUT', '', null, metadata, function(err, resp) { + if (err) { + callback(err); + return; + } + + that.metadata = resp; + + callback(null, that.metadata); + }); +}; + +/** + * Return a new instance of reference to an existing Table object. + * + * @param {string} id - The ID of the table. + * @return {module:bigquery/table} + * + * @example + * var institutions = dataset.table('institution_data'); + */ +Dataset.prototype.table = function(id) { + return new Table(this, id); +}; + +/** + * Pass through this request to BigQuery's request handler, first prepending the + * path with the dataset. + * + * @private + * + * @param {string} method - Action. + * @param {string} path - Request path. + * @param {*} query - Request query object. + * @param {*} body - Request body contents. + * @param {function} callback - The callback function. + */ +Dataset.prototype.makeReq_ = function(method, path, query, body, callback) { + path = '/datasets/' + this.id + path; + this.bigQuery.makeReq_(method, path, query, body, callback); +}; + +module.exports = Dataset; diff --git a/lib/bigquery/index.js b/lib/bigquery/index.js new file mode 100644 index 00000000000..3afdec68604 --- /dev/null +++ b/lib/bigquery/index.js @@ -0,0 +1,573 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! 
+ * @module bigquery + */ + +'use strict'; + +var extend = require('extend'); +var streamEvents = require('stream-events'); +var through = require('through2'); + +/** + * @type module:common/connection + * @private + */ +var conn = require('../common/connection.js'); + +/** + * @type module:bigquery/dataset + * @private + */ +var Dataset = require('./dataset.js'); + +/** + * @type module:bigquery/job + * @private + */ +var Job = require('./job.js'); + +/** + * @type module:bigquery/table + * @private + */ +var Table = require('./table.js'); + +/** + * @type module:common/util + * @private + */ +var util = require('../common/util.js'); + +/** + * @const {string} Base URL for the BigQuery API. + * @private + */ +var BIGQUERY_BASE_URL = 'https://www.googleapis.com/bigquery/v2/projects/'; + +/** + * Required scopes for Google Cloud BigQuery API. + * + * @const {array} + * @private + */ +var SCOPES = ['https://www.googleapis.com/auth/bigquery']; + +/** + * The example below will demonstrate the different usage patterns your app may + * need to support to retrieve a BigQuery object. + * + * @alias module:bigquery + * @constructor + * + * @example + * var gcloud = require('gcloud'); + * + * // Providing configuration details up-front. + * var myProject = gcloud({ + * keyFilename: '/path/to/keyfile.json', + * projectId: 'my-project' + * }); + * + * var bigquery = myProject.bigquery(); + * + * + * // Overriding default configuration details. + * var anotherBigQueryInstance = myProject.bigquery({ + * keyFilename: '/path/to/another/keyfile.json' + * }); + * + * + * // Not using a default configuration. + * var myOtherProject = gcloud.bigquery({ + * keyFilename: '/path/to/keyfile.json', + * projectId: 'my-project' + * }); + * + * //- + * // In the following examples from this page and the other modules (Dataset, + * // Table, etc.), we are going to be using a dataset from + * // [data.gov]{@link http://goo.gl/f2SXcb} of higher education institutions. + * // + * // We will create a table with the correct schema, import the public CSV file + * // into that table, and query it for data. + * //- + */ +function BigQuery(options) { + if (!(this instanceof BigQuery)) { + return new BigQuery(options); + } + + options = options || {}; + + this.connection_ = new conn.Connection({ + credentials: options.credentials, + keyFilename: options.keyFilename, + scopes: SCOPES + }); + + this.projectId = options.projectId; +} + +/** + * Create a dataset. + * + * @param {string} id - ID of the dataset to create. + * @param {function} callback - The callback function. + * + * @example + * bigquery.createDataset('higher_education', function(err, dataset) {}); + */ +BigQuery.prototype.createDataset = function(id, callback) { + var that = this; + + var body = { + datasetReference: { + datasetId: id + } + }; + + this.makeReq_('POST', '/datasets', null, body, function(err, resp) { + if (err) { + callback(err); + return; + } + + var dataset = that.dataset(id); + dataset.metadata = resp; + + callback(null, dataset); + }); +}; + +/** + * Create a reference to an existing dataset. + * + * @param {string} id - ID of the dataset. + * @return {module:bigquery/dataset} + * + * @example + * var dataset = bigquery.dataset('higher_education'); + */ +BigQuery.prototype.dataset = function(id) { + return new Dataset(this, id); +}; + +/** + * List all or some of the datasets in your project. + * + * @param {object=} query - Configuration object. + * @param {boolean} query.all - List all datasets, including hidden ones. 
+ * @param {number} query.maxResults - Maximum number of results to return. + * @param {string} query.pageToken - Token returned from a previous call, to + * request the next page of results. + * @param {function} callback - The callback function. + * + * @example + * bigquery.getDatasets(function(err, datasets, nextQuery) { + * // If `nextQuery` is non-null, there are more results to fetch. + * }); + */ +BigQuery.prototype.getDatasets = function(query, callback) { + var that = this; + + if (util.is(query, 'function')) { + callback = query; + query = {}; + } + + query = query || {}; + + this.makeReq_('GET', '/datasets', query, null, function(err, resp) { + if (err) { + callback(err); + return; + } + + var nextQuery = null; + + if (resp.nextPageToken) { + nextQuery = extend({}, query, { + pageToken: resp.nextPageToken + }); + } + + var datasets = (resp.datasets || []).map(function(dataset) { + var ds = that.dataset(dataset.datasetReference.datasetId); + ds.metadata = dataset; + return ds; + }); + + callback(null, datasets, nextQuery); + }); +}; + +/** + * Get all of the jobs from your project. + * + * @param {object=} options - Configuration object. + * @param {boolean=} options.allUsers - Display jobs owned by all users in the + * project. + * @param {number=} options.maxResults - Maximum number of results to return. + * @param {string=} options.pageToken - Token returned from a previous call, to + * request the next page of results. + * @param {string=} options.projection - Restrict information returned to a set + * of selected fields. Acceptable values are "full", for all job data, and + * "minimal", to not include the job configuration. + * @param {string=} options.stateFilter - Filter for job state. Acceptable + * values are "done", "pending", and "running". + * @param {function} callback - The callback function. + * + * @example + * bigquery.getJobs(function(err, jobs, nextQuery) { + * // If `nextQuery` is non-null, there are more results to fetch. + * }); + */ +BigQuery.prototype.getJobs = function(options, callback) { + var that = this; + + if (util.is(options, 'function')) { + callback = options; + options = {}; + } + + options = options || {}; + + this.makeReq_('GET', '/jobs', options, null, function(err, resp) { + if (err) { + callback(err); + return; + } + + var nextQuery = null; + + if (resp.nextPageToken) { + nextQuery = extend({}, options, { + pageToken: resp.nextPageToken + }); + } + + var jobs = (resp.jobs || []).map(function(jobObject) { + var job = that.job(jobObject.id); + job.metadata = jobObject; + return job; + }); + + callback(null, jobs, nextQuery); + }); +}; + +/** + * Create a reference to an existing Job. + * + * @param {string} id - ID of the job. + * @return {module:bigquery/job} + * + * @example + * var myExistingJob = bigquery.job('job-id'); + */ +BigQuery.prototype.job = function(id) { + return new Job(this, id); +}; + +/*! Developer Documentation + * + * The `query` method is dual-purpose, like the use cases for a query. + * Sometimes, a user will want to fetch results from their table in a serial + * manner (get results -> more results exist? -> get more results, repeat.) -- + * other times, a user may want to wave their hands at making repeated calls to + * get all of the rows, instead using a stream. 
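+ *
+ * The serial case reduces to a self-recursive callback, sketched here
+ * (this mirrors the public `query` example further down):
+ *
+ *     bigquery.query(options, function onPage(err, rows, nextQuery) {
+ *       if (nextQuery) {
+ *         // More results exist; feed the returned query back in.
+ *         bigquery.query(nextQuery, onPage);
+ *       }
+ *     });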
+ * + * A couple different libraries are used to cover the stream case: + * + * var stream = streamEvents(through2.obj()); + * + * - streamEvents - https://github.com/stephenplusplus/stream-events + * This library enables us to wait until our stream is being asked for + * data, before making any API calls. It is possible a user will get a + * stream, then not end up running it - or, it will be run later, at a + * time when the token returned from the API call could have expired. + * Using this library ensures we wait until the last possible chance to + * get that token. + * + * - through2 - https://github.com/rvagg/through2 + * This is a popular library for how simple it makes dealing with the + * complicated Node.js Streams API. We're creating an object stream, as + * the data we are receiving from the API are rows of JSON data. + */ +/** + * Run a query scoped to your project. + * + * If you provide a callback, this method returns the results from your query to + * it. When querying large sets of data, it is possible your results won't be + * returned all at once. In those cases, you will receive a third argument to + * your callback that can be passed back to this method to return more results. + * + * See the examples below for such a workflow. + * + * This method also runs as a readable stream if you do not provide a callback. + * In cases where more results exist, this will automatically run the subsequent + * queries for you, pushing each row to the stream. + * + * @param {string|object} options - A string SQL query or configuration object. + * @param {number} options.maxResults - Maximum number of results to read. + * @param {string} options.query - A query string, following the BigQuery query + * syntax, of the query to execute. + * @param {number} options.timeoutMs - How long to wait for the query to + * complete, in milliseconds, before returning. Default is to return + * immediately. If the timeout passes before the job completes, the request + * will fail with a `TIMEOUT` error. + * @param {function=} callback - The callback function. If you intend to + * continuously run this query until all results are in as part of a stream, + * do not pass a callback. + * + * @example + * var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100'; + * + * //- + * // You can run a query against your data in a serial manner. + * //- + * bigquery.query(query, function(err, rows, nextQuery) { + * if (nextQuery) { + * bigquery.query(nextQuery, function(err, rows, nextQuery) {}); + * } + * }); + * + * //- + * // You can also use the `query` method as a readable object stream by + * // omitting the callback. + * //- + * var through2 = require('through2'); + * + * bigquery.query(query) + * .pipe(through2.obj(function(row, enc, next) { + * this.push(row.url += '?trackingCode=AZ19b\n'); + * next(); + * })) + * .pipe(process.stdout); + */ +BigQuery.prototype.query = function(options, callback) { + var that = this; + var stream; + + if (util.is(options, 'string')) { + options = { + query: options + }; + } + + options = options || {}; + + if (!util.is(callback, 'function')) { + stream = streamEvents(through.obj()); + stream.once('reading', runQuery); + return stream; + } else { + callback = callback || util.noop; + runQuery(); + } + + function runQuery() { + if (options.job) { + that.makeReq_( + 'GET', '/queries/' + options.job.id, options, null, responseHandler); + } else { + // Create a job. 
+ that.makeReq_('POST', '/queries', null, options, responseHandler); + } + + function responseHandler(err, resp) { + if (err) { + onComplete(err); + return; + } + + var job = that.job(resp.jobReference.jobId); + var nextQuery = null; + var rows = Table.mergeSchemaWithRows_(resp.schema, resp.rows || []); + + if (resp.jobComplete === false) { + // Query is still running. + nextQuery = extend({ + job: job + }, options); + } else if (resp.pageToken) { + // More results exist. + nextQuery = extend({ + job: job + }, options, { + pageToken: resp.pageToken + }); + } + + onComplete(null, rows, nextQuery); + } + + function onComplete(err, rows, nextQuery) { + if (err) { + if (stream) { + stream.emit('error', err); + stream.end(); + } else { + callback(err); + } + return; + } + + if (stream) { + rows.forEach(function(row) { + stream.push(row); + }); + + if (nextQuery) { + that.query(nextQuery, onComplete); + } else { + stream.end(); + } + } else { + callback(null, rows, nextQuery); + } + } + } +}; + +/** + * Run a query as a job. No results are immediately returned. Instead, your + * callback will be executed with a {module:bigquery/job} object that you must + * ping for the results. See the Job documentation for explanations of how to + * check on the status of the job. + * + * @param {object|string} options - The configuration object. This must be in + * the format of the [`configuration.query`]{@link http://goo.gl/wRpHvR} + * property of a Jobs resource. If a string is provided, this is used as the + * query string, and all other options are defaulted. + * @param {module:bigquery/table=} options.destination - The table to save the + * query's results to. If omitted, a new table will be created. + * @param {string} options.query - A query string, following the BigQuery query + * syntax, of the query to execute. + * @param {function} callback - The callback function. + * + * @throws {Error} If a query is not specified. + * @throws {Error} If a Table is not provided as a destination. + * + * @example + * var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100'; + * + * //- + * // You may pass only a query string, having a new table created to store the + * // results of the query. + * //- + * bigquery.startQuery(query, function(err, job) {}); + * + * //- + * // You can also control the destination table by providing a + * // {module:bigquery/table} object. + * //- + * bigquery.startQuery({ + * destination: bigquery.dataset('higher_education').table('institutions'), + * query: query + * }, function(err, job) {}); + * + * //- + * // After you have run `startQuery`, your query will execute in a job. Your + * // callback is executed with a {module:bigquery/job} object so that you may + * // check for the results. 
+ * //- + * bigquery.startQuery(query, function(err, job) { + * if (!err) { + * job.getQueryResults(function(err, rows) {}); + * } + * }); + */ +BigQuery.prototype.startQuery = function(options, callback) { + var that = this; + + if (util.is(options, 'string')) { + options = { + query: options + }; + } + + options = options || {}; + + if (!options.query) { + throw new Error('A SQL query string is required.'); + } + + var defaults = {}; + + if (options.destination) { + if (!(options.destination instanceof Table)) { + throw new Error('Destination must be a Table object.'); + } + defaults.destinationTable = { + datasetId: options.destination.dataset.id, + projectId: options.destination.dataset.bigQuery.projectId, + tableId: options.destination.id + }; + delete options.destination; + } + + var body = { + configuration: { + query: extend(true, defaults, options) + } + }; + + this.makeReq_('POST', '/jobs', null, body, function(err, resp) { + if (err) { + callback(err); + return; + } + + var job = that.job(resp.jobReference.jobId); + job.metadata = resp; + + callback(null, job); + }); +}; + +/** + * Make a new request object from the provided arguments and wrap the callback + * to intercept non-successful responses. + * + * @private + * + * @param {string} method - Action. + * @param {string} path - Request path. + * @param {*} query - Request query object. + * @param {*} body - Request body contents. + * @param {function} callback - The callback function. + */ +BigQuery.prototype.makeReq_ = function(method, path, query, body, callback) { + var reqOpts = { + method: method, + qs: query, + uri: BIGQUERY_BASE_URL + this.projectId + path + }; + + if (body) { + reqOpts.json = body; + } + + this.connection_.req(reqOpts, function(err, res, body) { + util.handleResp(err, res, body, callback); + }); +}; + +module.exports = BigQuery; diff --git a/lib/bigquery/job.js b/lib/bigquery/job.js new file mode 100644 index 00000000000..117924830fb --- /dev/null +++ b/lib/bigquery/job.js @@ -0,0 +1,139 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! + * @module bigquery/job + */ + +'use strict'; + +/** + * @type {module:common/util} + * @private + */ +var util = require('../common/util'); + +/*! Developer Documentation + * + * @param {module:bigquery} bigQuery - BigQuery instance. + * @param {string} id - The ID of the table. + */ +/** + * Job objects are returned from various places in the BigQuery API: + * + * - {module:bigquery#getJobs} + * - {module:bigquery#job} + * - {module:bigquery#query} + * - {module:bigquery#startJob} + * - {module:bigquery/table#copy} + * - {module:bigquery/table#createWriteStream} + * - {module:bigquery/table#export} + * - {module:bigquery/table#import} + * + * They can be used to check the status of a running job or fetching the results + * of a previously-executed one. 
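+ *
+ * For example, to check on a previously-run job (a sketch; the job ID is
+ * illustrative):
+ *
+ *     var job = bigquery.job('job-id');
+ *
+ *     job.getMetadata(function(err, metadata) {
+ *       // e.g. metadata.status.state === 'DONE'
+ *     });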
+ * + * @alias module:bigquery/job + * @constructor + */ +function Job(bigQuery, id) { + this.bigQuery = bigQuery; + this.id = id; + this.metadata = {}; +} + +/** + * Get the metadata of the job. This will mostly be useful for checking the + * status of a previously-run job. + * + * @param {function} callback - The callback function. + * + * @example + * job.getMetadata(function(err, metadata) {}); + */ +Job.prototype.getMetadata = function(callback) { + var that = this; + + var path = '/jobs/' + this.id; + + this.bigQuery.makeReq_('GET', path, null, null, function(err, resp) { + if (err) { + callback(err); + return; + } + + that.metadata = resp; + + callback(null, that.metadata); + }); +}; + +/** + * Get the results of a job. + * + * @param {object=} options - Configuration object. + * @param {number} options.maxResults - Maximum number of results to read. + * @param {string} options.pageToken - Page token, returned by a previous call, + * to request the next page of results. Note: This is automatically added to + * the `nextQuery` argument of your callback. + * @param {number} options.startIndex - Zero-based index of the starting row. + * @param {number} options.timeoutMs - How long to wait for the query to + * complete, in milliseconds, before returning. Default is to return + * immediately. If the timeout passes before the job completes, the request + * will fail with a `TIMEOUT` error. + * @param {function=} callback - The callback function. If you intend to + * continuously run this query until all results are in as part of a stream, + * do not pass a callback. + * + * @example + * //- + * // Use the default options to get the results of a query. + * //- + * job.getQueryResults(function(err, rows, nextQuery) {}); + * + * //- + * // Customize the results you want to fetch. + * //- + * var options = { + * maxResults: 100 + * }; + * + * job.getQueryResults(options, function(err, rows, nextQuery) {}); + * + * //- + * // Consume the results from the query as a readable stream. + * //- + * var through2 = require('through2'); + * + * job.getQueryResults(options) + * .pipe(through2.obj(function (row, enc, next) { + * this.push(JSON.stringify(row) + '\n'); + * })) + * .pipe(fs.createWriteStream('./backup.json')); + */ +Job.prototype.getQueryResults = function(options, callback) { + if (util.is(options, 'function')) { + callback = options; + options = {}; + } + + options = options || {}; + options.job = this; + + return this.bigQuery.query(options, callback); +}; + +module.exports = Job; diff --git a/lib/bigquery/table.js b/lib/bigquery/table.js new file mode 100644 index 00000000000..906e06cb3ff --- /dev/null +++ b/lib/bigquery/table.js @@ -0,0 +1,837 @@ +/*! + * Copyright 2014 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*! 
+ * @module bigquery/table + */ + +'use strict'; + +var crypto = require('crypto'); +var duplexify = require('duplexify'); +var extend = require('extend'); +var fs = require('fs'); +var path = require('path'); +var streamEvents = require('stream-events'); +var through = require('through2'); + +/** + * @type {module:storage/file} + * @private + */ +var File = require('../storage/file'); + +/** + * @type {module:common/util} + * @private + */ +var util = require('../common/util'); + +/*! Developer Documentation + * + * @param {module:bigquery/dataset} dataset - Dataset instance. + * @param {string} id - The ID of the table. + */ +/** + * Table objects are returned by methods such as + * {module:bigquery/dataset#table}, {module:bigquery/dataset#createTable}, and + * {module:bigquery/dataset#getTables}. + * + * @alias module:bigquery/table + * @constructor + */ +function Table(dataset, id) { + this.bigQuery = dataset.bigQuery; + this.dataset = dataset; + this.id = id; + this.metadata = {}; +} + +/** + * Convert a comma-separated name:type string to a table schema object. + * + * @static + * @private + * + * @param {string} str - Comma-separated schema string. + * @return {object} Table schema in the format the API expects. + */ +Table.createSchemaFromString_ = function(str) { + return str.split(/\s*,\s*/).reduce(function(acc, pair) { + acc.fields.push({ + name: pair.split(':')[0], + type: pair.split(':')[1] || 'string' + }); + + return acc; + }, { + fields: [] + }); +}; + +/** + * Merge a rowset returned from the API with a table schema. + * + * @static + * @private + * + * @param {object} schema + * @param {array} rows + * @return {array} Fields using their matching names from the table's schema. + */ +Table.mergeSchemaWithRows_ = function(schema, rows) { + return rows.map(mergeSchema).map(flattenRows); + + function mergeSchema(row) { + return row.f.map(function(field, index) { + var fieldObject = {}; + fieldObject[schema.fields[index].name] = field.v; + return fieldObject; + }); + } + + function flattenRows(rows) { + return rows.reduce(function(acc, row) { + var key = Object.keys(row)[0]; + acc[key] = row[key]; + return acc; + }, {}); + } +}; + +/** + * Copy data from one table to another, optionally creating that table. + * + * @param {module:bigquery/table} destination - The destination table. + * @param {object=} metadata - Metadata to set with the copy operation. The + * metadata object should be in the format of the + * [`configuration.copy`]{@link http://goo.gl/dKWIyS} property of a Jobs + * resource. + * @param {function} callback - The callback function. + * + * @throws {Error} If a destination other than a Table object is provided. + * + * @example + * table.copy(dataset.table('institution_data'), function(err, job) {}); + * + * //- + * // See the [`configuration.copy`]{@link http://goo.gl/dKWIyS} object for all + * // available options. 
+ * //- + * var metadata = { + * createDisposition: 'CREATE_NEVER', + * writeDisposition: 'WRITE_TRUNCATE' + * }; + * + * table.copy(options, metadata, function(err, job) {}); + */ +Table.prototype.copy = function(destination, metadata, callback) { + var that = this; + + if (!(destination instanceof Table)) { + throw new Error('Destination must be a Table object.'); + } + + if (util.is(metadata, 'function')) { + callback = metadata; + metadata = {}; + } + + var body = { + configuration: { + copy: extend(true, metadata || {}, { + destinationTable: { + datasetId: destination.dataset.id, + projectId: destination.bigQuery.projectId, + tableId: destination.id + }, + sourceTable: { + datasetId: this.dataset.id, + projectId: this.bigQuery.projectId, + tableId: this.id + } + }) + } + }; + + this.bigQuery.makeReq_('POST', '/jobs', null, body, function(err, resp) { + if (err) { + callback(err); + return; + } + + var job = that.bigQuery.job(resp.jobReference.jobId); + job.metadata = resp; + + callback(null, job); + }); +}; + +/** + * Create a readable stream of the rows of data in your table. + * + * @return {ReadStream} + * + * @example + * var through2 = require('through2'); + * + * table.createReadStream() + * .pipe(through2.obj(function(row, enc, next) { + * this.push(JSON.stringify(row) + '\n'); + * })) + * .pipe(fs.createWriteStream('./institutions.json')); + */ +Table.prototype.createReadStream = function() { + var that = this; + + var stream = streamEvents(through.obj()); + stream.once('reading', function() { + that.getRows(handleResponse); + }); + return stream; + + function handleResponse(err, rows, nextQuery) { + if (err) { + stream.emit('error', err); + stream.end(); + return; + } + + rows.forEach(function(row) { + stream.push(row); + }); + + if (nextQuery) { + that.getRows(nextQuery, handleResponse); + } else { + stream.end(); + } + } +}; + +/** + * Load data into your table from a readable stream of JSON or CSV-formatted + * data. + * + * @param {string|object=} metadata - Metadata to set with the load operation. + * The metadata object should be in the format of the + * [`configuration.load`]{@link http://goo.gl/BVcXk4} property of a Jobs + * resource. If a string is given, it will be used as the filetype. + * @return {WriteStream} + * + * @throws {Error} If source format isn't recognized. + * + * @example + * //- + * // Load data from a CSV file. + * //- + * var request = require('request'); + * + * var csvUrl = 'http://goo.gl/kSE7z6'; + * + * var metadata = { + * allowJaggedRows: true, + * skipLeadingRows: 1 + * }; + * + * request.get(csvUrl) + * .pipe(table.createWriteStream(metadata)) + * .on('complete', function(job) { + * // job is a Job object, which you can use to check the status of the load + * // operation. + * job.getMetadata(function(err, metadata) { + * // metadata.status + * }); + * }); + * + * //- + * // Load data from a JSON file. 
+ * //- + * var fs = require('fs'); + * + * fs.createReadStream('./institutions.json') + * .pipe(table.createWriteStream('json')) + * .on('complete', function(job) {}); + */ +Table.prototype.createWriteStream = function(metadata) { + var that = this; + + metadata = metadata || {}; + + var fileTypeMap = { + csv: 'CSV', + json: 'NEWLINE_DELIMITED_JSON' + }; + var fileTypes = Object.keys(fileTypeMap).map(function(key) { + return fileTypeMap[key]; + }); + + if (util.is(metadata, 'string')) { + metadata = { + sourceFormat: fileTypeMap[metadata.toLowerCase()] + }; + } + + if (util.is(metadata.schema, 'string')) { + metadata.schema = Table.createSchemaFromString_(metadata.schema); + } + + extend(true, metadata, { + destinationTable: { + projectId: that.bigQuery.projectId, + datasetId: that.dataset.id, + tableId: that.id + } + }); + + if (metadata.hasOwnProperty('sourceFormat') && + fileTypes.indexOf(metadata.sourceFormat) < 0) { + throw new Error('Source format not recognized: ' + metadata.sourceFormat); + } + + var dup = streamEvents(duplexify()); + + dup.once('writing', function() { + util.makeWritableStream(dup, { + connection: that.bigQuery.connection_, + metadata: { + configuration: { + load: metadata + } + }, + request: { + uri: util.format('{base}/{projectId}/jobs', { + base: 'https://www.googleapis.com/upload/bigquery/v2/projects', + projectId: that.bigQuery.projectId + }) + } + }, function(data) { + var job = that.bigQuery.job(data.jobReference.jobId); + job.metadata = data; + + dup.emit('complete', job); + dup.end(); + }); + }); + + return dup; +}; + +/** + * Delete a table and all its data. + * + * @param {function} callback - The callback function. + * + * @example + * table.delete(function(err) {}); + */ +Table.prototype.delete = function(callback) { + this.makeReq_('DELETE', '', null, null, callback); +}; + +/** + * Export table to Google Cloud Storage. + * + * @param {module:storage/file|module:storage/file[]} destination - Where the + * file should be exported to. + * @param {object=} options - The configuration object. + * @param {string} options.format - The format to export the data in. Allowed + * options are "CSV", "JSON", or "AVRO". Default: "CSV". + * @param {boolean} options.gzip - Specify if you would like the file compressed + * with GZIP. Default: false. + * @param {function} callback - The callback function. + * + * @throws {Error} If destination isn't a File object. + * @throws {Error} If destination format isn't recongized. + * + * @example + * var exportedFile = storage.bucket('institutions').file('2014.csv'); + * + * //- + * // To use the default options, just pass a {module:storage/file} object. + * // + * // Note: The exported format type will be inferred by the file's extension. + * // If you wish to override this, or provide an array of destination files, + * // you must provide an `options` object. + * //- + * table.export(exportedFile, function(err, job) {}); + * + * //- + * // If you need more customization, pass an `options` object. + * //- + * var options = { + * format: 'json', + * gzip: true + * }; + * + * table.export(exportedFile, options, function(err, job) {}); + * + * //- + * // You can also specify multiple destination files. 
+ * //- + * table.export([ + * storage.bucket('institutions').file('2014.json'), + * storage.bucket('institutions-copy').file('2014.json') + * ], options, function(err, job) {}); + */ +Table.prototype.export = function(destination, options, callback) { + var that = this; + + if (util.is(options, 'function')) { + callback = options; + options = {}; + } + + var formats = { + avro: 'AVRO', + csv: 'CSV', + json: 'NEWLINE_DELIMITED_JSON' + }; + + extend(true, options, { + destinationUris: util.arrayize(destination).map(function(dest) { + if (!(dest instanceof File)) { + throw new Error('Destination must be a File object.'); + } + + // If no explicit format was provided, attempt to find a match from the + // file's extension. If no match, don't set, and default upstream to CSV. + var format = path.extname(dest.name).substr(1).toLowerCase(); + if (!options.destinationFormat && !options.format && formats[format]) { + options.destinationFormat = formats[format]; + } + + return 'gs://' + dest.bucket.name + '/' + dest.name; + }) + }); + + if (options.format) { + options.format = options.format.toLowerCase(); + + if (formats[options.format]) { + options.destinationFormat = formats[options.format]; + delete options.format; + } else { + throw new Error('Destination format not recognized: ' + options.format); + } + } + + if (options.gzip) { + options.compression = 'GZIP'; + delete options.gzip; + } + + var body = { + configuration: { + extract: extend(true, options, { + sourceTable: { + datasetId: this.dataset.id, + projectId: this.bigQuery.projectId, + tableId: this.id + } + }) + } + }; + + this.bigQuery.makeReq_('POST', '/jobs', null, body, function(err, resp) { + if (err) { + callback(err); + return; + } + + var job = that.bigQuery.job(resp.jobReference.jobId); + job.metadata = resp; + + callback(null, job); + }); +}; + +/** + * Return the metadata associated with the Table. + * + * @param {function} callback - The callback function. + * + * @example + * table.getMetadata(function(err, metadata) {}); + */ +Table.prototype.getMetadata = function(callback) { + var that = this; + + this.makeReq_('GET', '', null, null, function(err, resp) { + if (err) { + callback(err); + return; + } + + that.metadata = resp; + + callback(null, that.metadata); + }); +}; + +/** + * Retrieves table data from a specified set of rows. The rows are returned to + * your callback as an array of objects matching your table's schema. + * + * @param {object=} options - The configuration object. + * @param {number} options.maxResults - Maximum number of results to return. + * @param {function} callback - The callback function. + * + * @example + * var options = { + * maxResults: 100 + * }; + * + * table.getRows(options, function(err, rows, nextQuery) { + * // If `nextQuery` is non-null, there are more results to fetch. + * if (nextQuery) { + * table.getRows(nextQuery, function(err, rows, nextQuery) {}); + * } + * });` + */ +Table.prototype.getRows = function(options, callback) { + var that = this; + + if (util.is(options, 'function')) { + callback = options; + options = {}; + } + + callback = callback || util.noop; + + this.makeReq_('GET', '/data', options, null, function(err, resp) { + if (err) { + onComplete(err); + return; + } + + var nextQuery = null; + + if (resp.pageToken) { + nextQuery = extend({}, options, { + pageToken: resp.pageToken + }); + } + + if (resp.rows && resp.rows.length > 0 && !that.metadata.schema) { + // We don't know the schema for this table yet. Do a quick stat. 
+ that.getMetadata(function(err) { + if (err) { + onComplete(err); + return; + } + + onComplete(null, resp.rows, nextQuery); + }); + + return; + } + + onComplete(null, resp.rows, nextQuery); + }); + + function onComplete(err, rows, nextQuery) { + if (err) { + callback(err); + return; + } + + rows = Table.mergeSchemaWithRows_(that.metadata.schema, rows || []); + + callback(null, rows, nextQuery); + } +}; + +/** + * Load data from a local file or Storage file ({module:storage/file}). + * + * By loading data this way, you create a load job that will run your data load + * asynchronously. If you would like instantaneous access to your data, insert + * it using {module:bigquery/table#insert}. + * + * Note: Only JSON and CSV source files are supported. The file type will be + * inferred by the given file's extension. If you wish to override this, you + * must provide a `metadata` object. + * + * @param {string|module:storage/file} source - The source file to import. + * @param {object=} metadata - Metadata to set with the load operation. The + * metadata object should be in the format of the + * [`configuration.load`]{@link http://goo.gl/BVcXk4} property of a Jobs + * resource. + * @param {function} callback - The callback function. + * + * @throws {Error} If the source isn't a string file name or a File instance. + * + * @example + * //- + * // Load data from a local file. + * //- + * table.import('./institutions.csv', function(err, job) {}); + * + * //- + * // You may also pass in metadata in the format of a Jobs resource. See + * // {@link http://goo.gl/BVcXk4} for a full list of supported values. + * //- + * var metadata = { + * encoding: 'ISO-8859-1', + * sourceFormat: 'JSON' + * }; + * + * table.import('./institutions.csv', metadata, function(err, job) {}); + * + * //- + * // Load data from a file in your Storage bucket. + * //- + * var data = storage.bucket('institutions').file('data.csv'); + * table.import(data, function(err, job) {}); + * + * //- + * // Load data from multiple files in your Storage bucket(s). + * //- + * table.import([ + * storage.bucket('institutions').file('2011.csv'), + * storage.bucket('institutions').file('2012.csv') + * ], function(err, job) {}); + */ +Table.prototype.import = function(source, metadata, callback) { + var that = this; + + if (util.is(metadata, 'function')) { + callback = metadata; + metadata = {}; + } + + callback = callback || util.noop; + metadata = metadata || {}; + + var formats = { + csv: 'CSV', + json: 'NEWLINE_DELIMITED_JSON' + }; + + if (util.is(source, 'string')) { + // A path to a file was given. If a sourceFormat wasn't specified, try to + // find a match from the file's extension. + var format = formats[path.extname(source).substr(1).toLowerCase()]; + if (!metadata.sourceFormat && format) { + metadata.sourceFormat = format; + } + + // Read the file into a new write stream. + return fs.createReadStream(source) + .pipe(this.createWriteStream(metadata)) + .on('error', callback) + .on('complete', callback); + } + + var body = { + configuration: { + load: { + destinationTable: { + projectId: this.bigQuery.projectId, + datasetId: this.dataset.id, + tableId: this.id + } + } + } + }; + + extend(true, body.configuration.load, metadata, { + sourceUris: util.arrayize(source).map(function(src) { + if (!(src instanceof File)) { + throw new Error('Source must be a File object.'); + } + + // If no explicit format was provided, attempt to find a match from + // the file's extension. If no match, don't set, and default upstream + // to CSV. 
+ var format = formats[path.extname(src.name).substr(1).toLowerCase()]; + if (!metadata.sourceFormat && format) { + body.configuration.load.sourceFormat = format; + } + + return 'gs://' + src.bucket.name + '/' + src.name; + }) + }); + + this.bigQuery.makeReq_('POST', '/jobs', null, body, function(err, resp) { + if (err) { + callback(err); + return; + } + + var job = that.bigQuery.job(resp.jobReference.jobId); + job.metadata = resp; + + callback(null, job); + }); +}; + +/** + * Stream data into BigQuery one record at a time without running a load job. + * + * There are more strict quota limits using this method so it is highly + * recommended that you load data into BigQuery using + * {module:bigquery/table#import} instead. + * + * @param {object|object[]} rows - The rows to insert into the table. + * @param {function} callback - The callback function. + * + * @example + * //- + * // Insert a single row. + * //- + * table.insert({ + * INSTNM: 'Motion Picture Institute of Michigan', + * CITY: 'Troy', + * STABBR: 'MI' + * }, insertHandler); + * + * //- + * // Insert multiple rows at a time. + * //- + * var rows = [ + * { + * INSTNM: 'Motion Picture Institute of Michigan', + * CITY: 'Troy', + * STABBR: 'MI' + * }, + * // ... + * ]; + * + * table.insert(rows, insertHandler); + * + * //- + * // Handling the response. + * //- + * function insertHandler(err, insertErrors) { + * // err (object): + * // An API error occurred. + * + * // insertErrors (object[]): + * // If populated, some rows failed to insert, while others may have + * // succeeded. + * // + * // insertErrors[].row (original individual row object passed to `insert`) + * // insertErrors[].errors[].reason + * // insertErrors[].errors[].message + * + * // See https://developers.google.com/bigquery/troubleshooting-errors for + * // recommendations on handling errors. + * } + */ +Table.prototype.insert = function(rows, callback) { + var body = { + rows: util.arrayize(rows || []).map(function(row) { + var rowObject = {}; + // Use the stringified contents of the row as a unique insert ID. + var md5 = crypto.createHash('md5'); + md5.update(JSON.stringify(row)); + rowObject.insertId = md5.digest('hex'); + rowObject.json = row; + return rowObject; + }) + }; + + this.makeReq_('POST', '/insertAll', null, body, function(err, resp) { + if (err) { + callback(err); + return; + } + + var failedToInsert = (resp.insertErrors || []).map(function(insertError) { + return { + error: insertError.errors.map(function(error) { + return { + message: error.message, + reason: error.reason + }; + }), + row: body.rows[insertError.index].json + }; + }); + + callback(null, failedToInsert); + }); +}; + +/** + * Run a query scoped to your dataset. + * + * See {module:bigquery#query} for full documentation of this method. + */ +Table.prototype.query = function(query, callback) { + return this.dataset.query(query, callback); +}; + +/** + * Set the metadata on the table. + * + * @param {object} metadata - The metadata key/value object to set. + * @param {string} metadata.description - A user-friendly description of the + * table. + * @param {string} metadata.name - A descriptive name for the table. + * @param {string|object} metadata.schema - A comma-separated list of name:type + * pairs. Valid types are "string", "integer", "float", "boolean", and + * "timestamp". If the type is omitted, it is assumed to be "string". + * Example: "name:string, age:integer". Schemas can also be specified as a + * JSON array of fields, which allows for nested and repeated fields. 
See + * a [Table resource]{@link http://goo.gl/sl8Dmg} for more detailed + * information. + * @param {function} callback - The callback function. + */ +Table.prototype.setMetadata = function(metadata, callback) { + var that = this; + + if (metadata.name) { + metadata.friendlyName = metadata.name; + delete metadata.name; + } + + if (util.is(metadata.schema, 'string')) { + metadata.schema = Table.createSchemaFromString_(metadata.schema); + } + + this.makeReq_('PUT', '', null, metadata, function(err, resp) { + if (err) { + callback(err); + return; + } + + that.metadata = resp; + + callback(null, that.metadata); + }); +}; + +/** + * Pass through this request to BigQuery's request handler, first prepending the + * path with the dataset. + * + * @private + * + * @param {string} method - Action. + * @param {string} path - Request path. + * @param {*} query - Request query object. + * @param {*} body - Request body contents. + * @param {function} callback - The callback function. + */ +Table.prototype.makeReq_ = function(method, path, query, body, callback) { + path = '/tables/' + this.id + path; + this.dataset.makeReq_(method, path, query, body, callback); +}; + +module.exports = Table; diff --git a/lib/common/util.js b/lib/common/util.js index ec27a6d09e7..3853a641c89 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -23,6 +23,7 @@ var extend = require('extend'); var util = require('util'); +var uuid = require('node-uuid'); /** * Extend a global configuration object with user options provided at the time @@ -225,3 +226,98 @@ function toArray(object) { } module.exports.toArray = toArray; + +/** + * Take a Duplexify stream, fetch an authorized connection header, and create an + * outgoing writable stream. + * + * @param {Duplexify} dup - Duplexify stream. + * @param {object} options - Configuration object. + * @param {module:common/connection} options.connection - A connection instance, + * used to get a token with and send the request through. + * @param {object} options.metadata - Metadata to send at the head of the + * request. + * @param {object} options.request - Request object, in the format of a standard + * Node.js http.request() object. + * @param {string=} options.request.method - Default: "POST". + * @param {string=} options.request.qs.uploadType - Default: "multipart". + * @param {string=} options.streamContentType - Default: + * "application/octet-stream". + * @param {function} onComplete - Callback, executed after the writable Request + * stream has completed. + */ +function makeWritableStream(dup, options, onComplete) { + onComplete = onComplete || noop; + + var boundary = uuid.v4(); + + var defaults = { + method: 'POST', + qs: { + uploadType: 'multipart' + } + }; + + // Extend the provided request object with common defaults. + // + // `options.request` takes precedence over the defaults, but not over the + // headers, as these set up the boundary. + var request = extend(true, defaults, options.request, { + headers: { + 'Content-Type': 'multipart/related; boundary="' + boundary + '"' + } + }); + + // With the provided connection, be sure we have a valid token before + // attempting to create a request. + options.connection.createAuthorizedReq(request, function(err, req) { + if (err) { + dup.emit('error', err); + return; + } + + var streamType = options.streamContentType || 'application/octet-stream'; + + var stream = options.connection.requester(req); + stream.callback = noop; + + // Write the metadata to the request. 
+    stream.write('--' + boundary + '\n');
+    stream.write('Content-Type: application/json\n\n');
+    stream.write(JSON.stringify(options.metadata));
+    stream.write('\n\n');
+    stream.write('--' + boundary + '\n');
+    stream.write('Content-Type: ' + streamType + '\n\n');
+
+    // Overwrite the `end` function, so we can close the boundary.
+    var oldEndFn = stream.end;
+    stream.end = function(data, encoding, callback) {
+      data = (data || '') + '\n--' + boundary + '--\n';
+      stream.write(data, encoding, callback);
+      oldEndFn.apply(this);
+    };
+
+    // When the request is complete, parse it. If everything went well, pass the
+    // parsed response data to the callback handler.
+    stream.on('complete', function(res) {
+      handleResp(null, res, res.body, function(err, data) {
+        if (err) {
+          dup.emit('error', err);
+          dup.end();
+          return;
+        }
+        onComplete(data);
+      });
+    });
+
+    // We have a writable stream - tell Duplexify about it, so it can resume
+    // processing incoming data.
+    dup.setWritable(stream);
+
+    // Keep part of the stream open to keep Request from closing the connection.
+    // Reference: http://goo.gl/zZVSif.
+    dup.pipe(stream);
+  });
+}
+
+module.exports.makeWritableStream = makeWritableStream;
diff --git a/lib/index.js b/lib/index.js
index 8e068270c0a..06cfdf3d680 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -21,10 +21,10 @@
 'use strict';
 
 /**
- * @type module:common/util
+ * @type {module:bigquery}
  * @private
  */
-var util = require('./common/util.js');
+var BigQuery = require('./bigquery');
 
 /**
  * @type {module:datastore}
@@ -45,7 +45,7 @@ var PubSub = require('./pubsub');
 var Storage = require('./storage');
 
 /**
- * @type {module:common/util}
+ * @type module:common/util
  * @private
  */
 var util = require('./common/util.js');
@@ -104,6 +104,10 @@ var util = require('./common/util.js');
  */
 function gcloud(config) {
   return {
+    bigquery: function(options) {
+      options = options || {};
+      return new BigQuery(util.extendGlobalConfig(config, options));
+    },
     datastore: new Datastore(config),
     pubsub: function(options) {
       options = options || {};
@@ -116,6 +120,25 @@ function gcloud(config) {
   };
 }
 
+/**
+ * Analyze Big Data in the cloud with
+ * [Google BigQuery]{@link https://cloud.google.com/bigquery}. Run fast,
+ * SQL-like queries against multi-terabyte datasets in seconds. Scalable and
+ * easy to use, BigQuery gives you real-time insights about your data.
+ *
+ * @type {module:bigquery}
+ *
+ * @return {module:bigquery}
+ *
+ * @example
+ * var gcloud = require('gcloud');
+ * var bigquery = gcloud.bigquery({
+ *   projectId: 'project-id',
+ *   keyFilename: '/path/to/keyfile.json'
+ * });
+ */
+gcloud.bigquery = BigQuery;
+
 /**
  * [Google Cloud Datastore]{@link https://developers.google.com/datastore/} is
  * a fully managed, schemaless database for storing non-relational data. Use
@@ -129,13 +152,10 @@ function gcloud(config) {
  * @example
  * var gcloud = require('gcloud');
  * var datastore = gcloud.datastore;
- *
- * // datastore:
- * //   {
- * //     Dataset: function() {},
- * //     double: function() {},
- * //     int: function() {}
- * //   }
+ * var dataset = datastore.dataset({
+ *   projectId: 'project-id',
+ *   keyFilename: '/path/to/keyfile.json'
+ * });
  */
 gcloud.datastore = Datastore;
@@ -159,10 +179,9 @@ gcloud.datastore = Datastore;
  *
  * @example
  * var gcloud = require('gcloud');
- *
  * var pubsub = gcloud.pubsub({
- *   projectId: YOUR_PROJECT_ID,
- *   keyFilename: '/path/to/the/key.json'
+ *   projectId: 'project-id',
+ *   keyFilename: '/path/to/keyfile.json'
  * });
  */
 gcloud.pubsub = function(config) {
@@ -181,12 +200,10 @@ gcloud.pubsub = function(config) {
  *
  * @example
  * var gcloud = require('gcloud');
- * var storage = gcloud.storage;
- *
- * // storage:
- * //   {
- * //     bucket: function() {}
- * //   }
+ * var storage = gcloud.storage({
+ *   projectId: 'project-id',
+ *   keyFilename: '/path/to/keyfile.json'
+ * });
  */
 gcloud.storage = Storage;
diff --git a/package.json b/package.json
index 1b9b57161d3..7ab2d98a362 100644
--- a/package.json
+++ b/package.json
@@ -49,7 +49,9 @@
     "gapitoken": "^0.1.3",
     "node-uuid": "^1.4.1",
     "protobufjs": "^3.4.0",
-    "request": "^2.39.0"
+    "request": "^2.39.0",
+    "stream-events": "^1.0.1",
+    "through2": "^0.6.3"
   },
   "devDependencies": {
     "async": "^0.9.0",
@@ -65,10 +67,10 @@
   "scripts": {
     "docs": "./scripts/docs.sh",
     "lint": "jshint lib/ regression/ test/",
-    "test": "mocha --recursive --reporter spec",
-    "regression-test": "mocha regression/datastore.js regression/pubsub.js regression/storage.js --reporter spec --timeout 15000",
-    "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 15000 test/* regression/datastore.js regression/pubsub.js regression/storage.js",
-    "coveralls": "istanbul cover -x 'regression/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/datastore.js regression/pubsub.js regression/storage.js -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage"
+    "test": "mocha --recursive",
+    "regression-test": "mocha regression/* --timeout 15000",
+    "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 15000 test/* regression/*",
+    "coveralls": "istanbul cover -x 'regression/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/* -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage"
   },
   "license": "Apache 2"
 }
diff --git a/regression/bigquery.js b/regression/bigquery.js
new file mode 100644
index 00000000000..d484c376068
--- /dev/null
+++ b/regression/bigquery.js
@@ -0,0 +1,240 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, before */
+
+'use strict';
+
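+// These tests run against live APIs. They assume the environment variables
+// consumed by regression/env.js (e.g. GCLOUD_TESTS_PROJECT_ID) are set.
+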
+var assert = require('assert');
+var async = require('async');
+var Dataset = require('../lib/bigquery/dataset');
+var env = require('./env');
+var fs = require('fs');
+var Job = require('../lib/bigquery/job');
+
+var gcloud = require('../lib')(env);
+var bigquery = gcloud.bigquery();
+var bucket = gcloud.storage().bucket();
+
+describe('BigQuery', function() {
+  var DATASET_ID = 'testDatasetId';
+  var dataset;
+  var TABLE_ID = 'myKittens';
+  var table;
+
+  var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100';
+
+  before(function(done) {
+    async.series([
+      function(next) {
+        // Delete the test dataset, if it exists.
+        bigquery.dataset(DATASET_ID).delete({ force: true }, function() {
+          next();
+        });
+      },
+
+      // Create the test dataset.
+      function(next) {
+        bigquery.createDataset(DATASET_ID, function(err, ds) {
+          if (err) {
+            next(err);
+            return;
+          }
+
+          dataset = ds;
+          next();
+        });
+      },
+
+      // Delete the test table, if it exists.
+      function(next) {
+        dataset.table(TABLE_ID).delete(function() {
+          next();
+        });
+      },
+
+      // Create the test table.
+      function(next) {
+        dataset.createTable({
+          id: TABLE_ID,
+          schema: 'id:integer,breed,name,dob:timestamp'
+        }, function(err, t) {
+          if (err) {
+            next(err);
+            return;
+          }
+
+          table = t;
+          next();
+        });
+      },
+
+      // Create a Bucket, if necessary.
+      function(next) {
+        bucket.getMetadata(function(err) {
+          if (!err) {
+            next();
+            return;
+          }
+
+          gcloud.storage().createBucket(bucket.name, function(err, b) {
+            if (err) {
+              next(err);
+              return;
+            }
+
+            bucket = b;
+            next();
+          });
+        });
+      }
+    ], done);
+  });
+
+  it('should get a list of datasets', function(done) {
+    bigquery.getDatasets(function(err, datasets) {
+      assert(datasets.length > 0);
+      assert(datasets[0] instanceof Dataset);
+      done();
+    });
+  });
+
+  it('should run a query job, then get results', function(done) {
+    bigquery.startQuery(query, function(err, job) {
+      assert.ifError(err);
+      assert(job instanceof Job);
+
+      job.getQueryResults(function(err, rows) {
+        assert.equal(rows.length, 100);
+        assert.equal(typeof rows[0].url, 'string');
+        done();
+      });
+    });
+  });
+
+  it('should query as a stream', function(done) {
+    var rowsEmitted = 0;
+
+    bigquery.query(query)
+      .on('data', function(row) {
+        rowsEmitted++;
+        assert.equal(typeof row.url, 'string');
+      })
+      .on('error', done)
+      .on('finish', function() {
+        assert.equal(rowsEmitted, 100);
+        done();
+      });
+  });
+
+  it('should allow querying in series', function(done) {
+    bigquery.query({
+      query: query,
+      maxResults: 10
+    }, function(err, rows, nextQuery) {
+      assert.ifError(err);
+      assert.equal(rows.length, 10);
+      assert.equal(typeof nextQuery.pageToken, 'string');
+      done();
+    });
+  });
+
+  it('should get a list of jobs', function(done) {
+    bigquery.getJobs(function(err, jobs) {
+      assert.ifError(err);
+      assert(jobs[0] instanceof Job);
+      done();
+    });
+  });
+
+  describe('BigQuery/Dataset', function() {
+    it('should set & get metadata', function(done) {
+      dataset.setMetadata({
+        description: 'yay description'
+      }, function(err) {
+        assert.ifError(err);
+
+        dataset.getMetadata(function(err, metadata) {
+          assert.ifError(err);
+          assert.equal(metadata.description, 'yay description');
+          done();
+        });
+      });
+    });
+  });
+
+  describe('BigQuery/Table', function() {
+    var TEST_DATA_JSON_PATH = require.resolve('./data/kitten-test-data.json');
+
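+    // The before hook created this table from the schema string
+    // 'id:integer,breed,name,dob:timestamp'; fields without a type (breed,
+    // name) default to STRING, as the assertion below expects.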
+    it('should have created the correct schema', function() {
+      assert.deepEqual(table.metadata.schema, {
+        fields: [
+          { name: 'id', type: 'INTEGER' },
+          { name: 'breed', type: 'STRING' },
+          { name: 'name', type: 'STRING' },
+          { name: 'dob', type: 'TIMESTAMP' }
+        ]
+      });
+    });
+
+    it('should insert rows', function(done) {
+      table.insert([
+        { name: 'silvano', breed: 'the cat kind', id: 1, dob: Date.now() },
+        { name: 'ryan', breed: 'golden retriever?', id: 2, dob: Date.now() },
+        { name: 'stephen', breed: 'idkanycatbreeds', id: 3, dob: Date.now() }
+      ], done);
+    });
+
+    it('should insert rows via stream', function(done) {
+      fs.createReadStream(TEST_DATA_JSON_PATH)
+        .pipe(table.createWriteStream('json'))
+        .on('error', done)
+        .on('complete', function() {
+          done();
+        });
+    });
+
+    it('should set & get metadata', function(done) {
+      table.setMetadata({
+        description: 'catsandstuff'
+      }, function(err) {
+        assert.ifError(err);
+
+        table.getMetadata(function(err, metadata) {
+          assert.ifError(err);
+          assert.equal(metadata.description, 'catsandstuff');
+          done();
+        });
+      });
+    });
+
+    it('should import data from a file in your bucket', function(done) {
+      bucket.upload(TEST_DATA_JSON_PATH, function(err, file) {
+        assert.ifError(err);
+
+        table.import(file, function(err, job) {
+          assert.ifError(err);
+          assert(job instanceof Job);
+          done();
+        });
+      });
+    });
+
+    it('should export data to a file in your bucket', function(done) {
+      table.export(bucket.file('kitten-test-data-backup.json'), done);
+    });
+  });
+});
diff --git a/regression/data/kitten-test-data.json b/regression/data/kitten-test-data.json
new file mode 100644
index 00000000000..7c1896af2f4
--- /dev/null
+++ b/regression/data/kitten-test-data.json
@@ -0,0 +1,3 @@
+{ "name": "silvano", "breed": "the cat kind", "id": 1, "dob": 1414634759011 }
+{ "name": "ryan", "breed": "golden retriever?", "id": 2, "dob": 1414634759012 }
+{ "name": "stephen", "breed": "idkanycatbreeds", "id": 3, "dob": 1414634759013 }
diff --git a/regression/env.js b/regression/env.js
index f6b75f5ace9..af2321e2f6c 100644
--- a/regression/env.js
+++ b/regression/env.js
@@ -29,5 +29,5 @@ if (!process.env.GCLOUD_TESTS_PROJECT_ID &&
 module.exports = {
   projectId: process.env.GCLOUD_TESTS_PROJECT_ID,
   bucketName: process.env.GCLOUD_TESTS_BUCKET_NAME,
-  keyFilename: process.env.GCLOUD_TESTS_KEY,
+  keyFilename: process.env.GCLOUD_TESTS_KEY
 };
diff --git a/scripts/docs.sh b/scripts/docs.sh
index a9aa1ca0173..7892971d3f2 100755
--- a/scripts/docs.sh
+++ b/scripts/docs.sh
@@ -16,6 +16,11 @@
 
 ./node_modules/.bin/dox < lib/index.js > docs/json/master/index.json &
 
+./node_modules/.bin/dox < lib/bigquery/dataset.js > docs/json/master/bigquery/dataset.json &
+./node_modules/.bin/dox < lib/bigquery/index.js > docs/json/master/bigquery/index.json &
+./node_modules/.bin/dox < lib/bigquery/job.js > docs/json/master/bigquery/job.json &
+./node_modules/.bin/dox < lib/bigquery/table.js > docs/json/master/bigquery/table.json &
+
 ./node_modules/.bin/dox < lib/datastore/dataset.js > docs/json/master/datastore/dataset.json &
 ./node_modules/.bin/dox < lib/datastore/index.js > docs/json/master/datastore/index.json &
 ./node_modules/.bin/dox < lib/datastore/query.js > docs/json/master/datastore/query.json &
diff --git a/test/bigquery/dataset.js b/test/bigquery/dataset.js
new file mode 100644
index 00000000000..ad90613f3f5
--- /dev/null
+++ b/test/bigquery/dataset.js
@@ -0,0 +1,431 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, beforeEach */
+
+'use strict';
+
+var assert = require('assert');
+var Dataset = require('../../lib/bigquery/dataset');
+var Table = require('../../lib/bigquery/table');
+var util = require('../../lib/common/util');
+
+describe('BigQuery/Dataset', function() {
+  var BIGQUERY = { projectId: 'my-project' };
+  var DATASET_ID = 'kittens';
+  var ds;
+
+  beforeEach(function() {
+    ds = new Dataset(BIGQUERY, DATASET_ID);
+  });
+
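+  // Usage shape exercised below (a sketch; schema may also be an object):
+  //
+  //   ds.createTable({ id: 'kittens', schema: 'id:integer,breed' }, callback);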
+  describe('createTable', function() {
+    var SCHEMA_OBJECT = {
+      fields: [
+        { name: 'id', type: 'integer' },
+        { name: 'breed', type: 'string' },
+        { name: 'name', type: 'string' },
+        { name: 'dob', type: 'timestamp' }
+      ]
+    };
+    var SCHEMA_STRING = 'id:integer,breed,name,dob:timestamp';
+    var TABLE_ID = 'kittens';
+
+    it('should create a table', function(done) {
+      ds.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'POST');
+        assert.equal(path, '/tables');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body.schema, SCHEMA_OBJECT);
+        assert.equal(body.tableReference.datasetId, DATASET_ID);
+        assert.equal(body.tableReference.projectId, ds.bigQuery.projectId);
+        assert.equal(body.tableReference.tableId, TABLE_ID);
+        done();
+      };
+      ds.createTable({ id: TABLE_ID, schema: SCHEMA_OBJECT }, assert.ifError);
+    });
+
+    it('should create a schema object from a string', function(done) {
+      ds.makeReq_ = function(method, path, query, body) {
+        assert.deepEqual(body.schema, SCHEMA_OBJECT);
+        done();
+      };
+      ds.createTable({ id: TABLE_ID, schema: SCHEMA_STRING }, assert.ifError);
+    });
+
+    it('should return an error to the callback', function(done) {
+      var error = new Error('Error.');
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      ds.createTable({ id: TABLE_ID, schema: SCHEMA_OBJECT }, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should return a Table object', function(done) {
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { tableReference: { tableId: TABLE_ID } });
+      };
+      var options = { id: TABLE_ID, schema: SCHEMA_OBJECT };
+      ds.createTable(options, function(err, table) {
+        assert.ifError(err);
+        assert(table instanceof Table);
+        done();
+      });
+    });
+
+    it('should assign metadata to the Table object', function(done) {
+      var metadata = {
+        a: 'b',
+        c: 'd',
+        tableReference: { tableId: TABLE_ID }
+      };
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, metadata);
+      };
+      var options = { id: TABLE_ID, schema: SCHEMA_OBJECT };
+      ds.createTable(options, function(e, table) {
+        assert.ifError(e);
+        assert.deepEqual(table.metadata, metadata);
+        done();
+      });
+    });
+  });
+
+  describe('delete', function() {
+    it('should delete the dataset via the api', function(done) {
+      ds.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'DELETE');
+        assert.equal(path, '');
+        assert.deepEqual(query, { deleteContents: false });
+        assert.strictEqual(body, null);
+        done();
+      };
+      ds.delete(assert.ifError);
+    });
+
+    it('should allow a force delete', function(done) {
+      ds.makeReq_ = function(method, path, query) {
+        assert.deepEqual(query, { deleteContents: true });
+        done();
+      };
+      ds.delete({ force: true }, assert.ifError);
+    });
+
+    it('should execute callback when done', function(done) {
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback();
+      };
+      ds.delete(done);
+    });
+
+    it('should pass error to callback', function(done) {
+      var error = new Error('Error.');
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      ds.delete(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+  });
+
+  describe('getMetadata', function() {
+    it('should get metadata from api', function(done) {
+      ds.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '');
+        assert.strictEqual(query, null);
+        assert.strictEqual(body, null);
+        done();
+      };
+      ds.getMetadata(assert.ifError);
+    });
+
+    it('should execute callback with error', function(done) {
+      var error = new Error('Error.');
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      ds.getMetadata(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    describe('metadata', function() {
+      var METADATA = { a: 'b', c: 'd' };
+
+      beforeEach(function() {
+        ds.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, METADATA);
+        };
+      });
+
+      it('should update metadata on Dataset object', function(done) {
+        ds.getMetadata(function(err) {
+          assert.ifError(err);
+          assert.deepEqual(ds.metadata, METADATA);
+          done();
+        });
+      });
+
+      it('should execute callback with metadata', function(done) {
+        ds.getMetadata(function(err, metadata) {
+          assert.ifError(err);
+          assert.deepEqual(metadata, METADATA);
+          done();
+        });
+      });
+    });
+  });
+
+  describe('getTables', function() {
+    it('should get tables from the api', function(done) {
+      ds.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '/tables');
+        assert.deepEqual(query, {});
+        assert.strictEqual(body, null);
+        done();
+      };
+      ds.getTables(assert.ifError);
+    });
+
+    it('should accept query', function(done) {
+      var queryObject = { maxResults: 8, pageToken: 'token' };
+      ds.makeReq_ = function(method, path, query) {
+        assert.deepEqual(query, queryObject);
+        done();
+      };
+      ds.getTables(queryObject, assert.ifError);
+    });
+
+    it('should return error to callback', function(done) {
+      var error = new Error('Error.');
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      ds.getTables(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should return Table objects', function(done) {
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { tables: [{ id: 'tableName' }] });
+      };
+      ds.getTables(function(err, tables) {
+        assert.ifError(err);
+        assert(tables[0] instanceof Table);
+        done();
+      });
+    });
+
+    it('should assign metadata to the Table objects', function(done) {
+      var tableObjects = [{ a: 'b', c: 'd', id: 'tableName' }];
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { tables: tableObjects });
+      };
+      ds.getTables(function(err, tables) {
+        assert.ifError(err);
+        assert.deepEqual(tables[0].metadata, tableObjects[0]);
+        done();
+      });
+    });
+
+    it('should return token if more results exist', function(done) {
+      var token = 'token';
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { nextPageToken: token });
+      };
+      ds.getTables(function(err, tables, nextQuery) {
+        assert.deepEqual(nextQuery, {
+          pageToken: token
+        });
+        done();
+      });
+    });
+  });
+
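+  // ds.query() delegates to bigQuery.query(), adding a
+  // `defaultDataset: { datasetId: ds.id }` option so unqualified table names
+  // resolve against this dataset (see the tests below).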
+  describe('query', function() {
+    var options = {
+      a: 'b',
+      c: 'd'
+    };
+
+    it('should call through to bigQuery', function(done) {
+      ds.bigQuery.query = function() {
+        done();
+      };
+
+      ds.query();
+    });
+
+    it('should return the result of the call to bq.query', function(done) {
+      ds.bigQuery.query = function() {
+        return {
+          done: done
+        };
+      };
+
+      ds.query().done();
+    });
+
+    it('should accept a string', function(done) {
+      var query = 'SELECT * FROM allthedata';
+
+      ds.bigQuery.query = function(opts) {
+        assert.equal(opts.query, query);
+        done();
+      };
+
+      ds.query(query);
+    });
+
+    it('should pass along options', function(done) {
+      ds.bigQuery.query = function(opts) {
+        assert.equal(opts.a, options.a);
+        assert.equal(opts.c, options.c);
+        done();
+      };
+
+      ds.query(options);
+    });
+
+    it('should extend options with defaultDataset', function(done) {
+      ds.bigQuery.query = function(opts) {
+        assert.deepEqual(opts.defaultDataset, { datasetId: ds.id });
+        done();
+      };
+
+      ds.query(options);
+    });
+
+    it('should not modify original options object', function(done) {
+      ds.bigQuery.query = function() {
+        assert.deepEqual(options, { a: 'b', c: 'd' });
+        done();
+      };
+
+      ds.query();
+    });
+
+    it('should pass callback', function(done) {
+      var callback = util.noop;
+
+      ds.bigQuery.query = function(opts, cb) {
+        assert.equal(cb, callback);
+        done();
+      };
+
+      ds.query(options, callback);
+    });
+  });
+
+  describe('setMetadata', function() {
+    var METADATA = { a: 'b', c: 'd' };
+
+    it('should send request to the api', function(done) {
+      ds.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'PUT');
+        assert.equal(path, '');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body, METADATA);
+        done();
+      };
+      ds.setMetadata(METADATA, assert.ifError);
+    });
+
+    it('should execute callback with error', function(done) {
+      var error = new Error('Error.');
+      ds.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      ds.setMetadata(METADATA, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    describe('metadata', function() {
+      beforeEach(function() {
+        ds.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, METADATA);
+        };
+      });
+
+      it('should update metadata on Dataset object', function(done) {
+        ds.setMetadata(METADATA, function(err) {
+          assert.ifError(err);
+          assert.deepEqual(ds.metadata, METADATA);
+          done();
+        });
+      });
+
+      it('should execute callback with metadata', function(done) {
+        ds.setMetadata(METADATA, function(err, metadata) {
+          assert.ifError(err);
+          assert.deepEqual(metadata, METADATA);
+          done();
+        });
+      });
+    });
+  });
+
+  describe('table', function() {
+    it('should return a Table object', function() {
+      var tableId = 'tableId';
+      var table = ds.table(tableId);
+      assert(table instanceof Table);
+      assert.equal(table.id, tableId);
+    });
+  });
+
+  describe('makeReq_', function() {
+    it('should prefix the path', function(done) {
+      var path = '/test-path';
+
+      ds.bigQuery.makeReq_ = function(method, p) {
+        assert.equal(p, '/datasets/' + ds.id + path);
+        done();
+      };
+
+      ds.makeReq_('POST', path);
+    });
+
+    it('should pass through arguments', function(done) {
+      var method = 'POST';
+      var query = { a: 'b', c: 'd', e: { f: 'g' } };
+      var body = { a: 'b', c: 'd', e: { f: 'g' } };
+      var callback = util.noop;
+
+      ds.bigQuery.makeReq_ = function(m, p, q, b, c) {
+        assert.equal(m, method);
+        assert.deepEqual(q, query);
+        assert.deepEqual(b, body);
+        assert.equal(c, callback);
+        done();
+      };
+
+      ds.makeReq_(method, '/path', query, body, callback);
+    });
+  });
+});
diff --git a/test/bigquery/index.js b/test/bigquery/index.js
new file mode 100644
index 00000000000..68253740d2f
--- /dev/null
+++ b/test/bigquery/index.js
@@ -0,0 +1,680 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, beforeEach */
+
+'use strict';
+
+var assert = require('assert');
+var Dataset = require('../../lib/bigquery/dataset');
+var Job = require('../../lib/bigquery/job');
+var Stream = require('stream').Stream;
+var Table = require('../../lib/bigquery/table');
+var util = require('../../lib/common/util');
+
+var BigQuery = require('sandboxed-module')
+  .require('../../lib/bigquery', {
+    requires: {
+      './dataset': Dataset,
+      './job': Job,
+      './table': FakeTable
+    }
+  });
+
+var mergeSchemaWithRows_Cached = Table.mergeSchemaWithRows_;
+var mergeSchemaWithRows_Override;
+
+function FakeTable(a, b) {
+  Table.call(this, a, b);
+}
+
+FakeTable.mergeSchemaWithRows_ = function() {
+  var args = [].slice.apply(arguments);
+  return (mergeSchemaWithRows_Override || mergeSchemaWithRows_Cached)
+    .apply(null, args);
+};
+
+describe('BigQuery', function() {
+  var JOB_ID = 'job-id';
+  var PROJECT_ID = 'test-project';
+  var bq;
+
+  beforeEach(function() {
+    bq = new BigQuery({ projectId: PROJECT_ID });
+  });
+
+  describe('createDataset', function() {
+    var DATASET_ID = 'kittens';
+
+    it('should create a dataset', function(done) {
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'POST');
+        assert.equal(path, '/datasets');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body, {
+          datasetReference: {
+            datasetId: DATASET_ID
+          }
+        });
+        done();
+      };
+      bq.createDataset(DATASET_ID, assert.ifError);
+    });
+
+    it('should return an error to the callback', function(done) {
+      var error = new Error('Error.');
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      bq.createDataset(DATASET_ID, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should return a Dataset object', function(done) {
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, {});
+      };
+      bq.createDataset(DATASET_ID, function(err, dataset) {
+        assert.ifError(err);
+        assert(dataset instanceof Dataset);
+        done();
+      });
+    });
+
+    it('should assign metadata to the Dataset object', function(done) {
+      var metadata = { a: 'b', c: 'd' };
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, metadata);
+      };
+      bq.createDataset(DATASET_ID, function(err, dataset) {
+        assert.ifError(err);
+        assert.deepEqual(dataset.metadata, metadata);
+        done();
+      });
+    });
+  });
+
+  describe('dataset', function() {
+    var DATASET_ID = 'dataset-id';
+
+    it('returns a Dataset instance', function() {
+      var ds = bq.dataset(DATASET_ID);
+      assert(ds instanceof Dataset);
+    });
+
+    it('should scope the correct dataset', function() {
+      var ds = bq.dataset(DATASET_ID);
+      assert.equal(ds.id, DATASET_ID);
+      assert.deepEqual(ds.bigQuery, bq);
+    });
+  });
+
+  describe('getDatasets', function() {
+    it('should get datasets from the api', function(done) {
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '/datasets');
+        assert.deepEqual(query, {});
+        assert.strictEqual(body, null);
+        done();
+      };
+      bq.getDatasets(assert.ifError);
+    });
+
+    it('should accept query', function(done) {
+      var queryObject = { all: true, maxResults: 8, pageToken: 'token' };
+      bq.makeReq_ = function(method, path, query) {
+        assert.deepEqual(query, queryObject);
+        done();
+      };
+      bq.getDatasets(queryObject, assert.ifError);
+    });
+
+    it('should return error to callback', function(done) {
+      var error = new Error('Error.');
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      bq.getDatasets(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should return Dataset objects', function(done) {
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, {
+          datasets: [{ datasetReference: { datasetId: 'datasetName' } }]
+        });
+      };
+      bq.getDatasets(function(err, datasets) {
+        assert.ifError(err);
+        assert(datasets[0] instanceof Dataset);
+        done();
+      });
+    });
+
+    it('should assign metadata to the Dataset objects', function(done) {
+      var datasetObjects = [
+        {
+          a: 'b',
+          c: 'd',
+          datasetReference: {
+            datasetId: 'datasetName'
+          }
+        }
+      ];
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { datasets: datasetObjects });
+      };
+      bq.getDatasets(function(err, datasets) {
+        assert.ifError(err);
+        assert.deepEqual(datasets[0].metadata, datasetObjects[0]);
+        done();
+      });
+    });
+
+    it('should return token if more results exist', function(done) {
+      var token = 'token';
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { nextPageToken: token });
+      };
+      bq.getDatasets(function(err, datasets, nextQuery) {
+        assert.deepEqual(nextQuery, {
+          pageToken: token
+        });
+        done();
+      });
+    });
+  });
+
+  describe('getJobs', function() {
+    it('should get jobs from the api', function(done) {
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '/jobs');
+        assert.deepEqual(query, {});
+        assert.strictEqual(body, null);
+        done();
+      };
+      bq.getJobs(assert.ifError);
+    });
+
+    it('should accept query', function(done) {
+      var queryObject = {
+        allUsers: true,
+        maxResults: 8,
+        pageToken: 'token',
+        projection: 'full',
+        stateFilter: 'done'
+      };
+      bq.makeReq_ = function(method, path, query) {
+        assert.deepEqual(query, queryObject);
+        done();
+      };
+      bq.getJobs(queryObject, assert.ifError);
+    });
+
+    it('should return error to callback', function(done) {
+      var error = new Error('Error.');
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      bq.getJobs(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should return Job objects', function(done) {
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { jobs: [{ id: JOB_ID }] });
+      };
+      bq.getJobs(function(err, jobs) {
+        assert.ifError(err);
+        assert(jobs[0] instanceof Job);
+        done();
+      });
+    });
+
+    it('should assign metadata to the Job objects', function(done) {
+      var jobObjects = [{ a: 'b', c: 'd', id: JOB_ID }];
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { jobs: jobObjects });
+      };
+      bq.getJobs(function(err, jobs) {
+        assert.ifError(err);
+        assert.deepEqual(jobs[0].metadata, jobObjects[0]);
+        done();
+      });
+    });
+
+    it('should return token if more results exist', function(done) {
+      var token = 'token';
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { nextPageToken: token });
+      };
+      bq.getJobs(function(err, jobs, nextQuery) {
+        assert.deepEqual(nextQuery, {
+          pageToken: token
+        });
+        done();
+      });
+    });
+  });
+
+  describe('job', function() {
+    it('should return a Job instance', function() {
+      var job = bq.job(JOB_ID);
+      assert(job instanceof Job);
+    });
+
+    it('should scope the correct job', function() {
+      var job = bq.job(JOB_ID);
+      assert.equal(job.id, JOB_ID);
+      assert.deepEqual(job.bigQuery, bq);
+    });
+  });
+
+  describe('query', function() {
+    var QUERY_STRING = 'SELECT * FROM [dataset.table]';
+
+    it('should accept a string for a query', function(done) {
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.equal(body.query, QUERY_STRING);
+        done();
+      };
+
+      bq.query(QUERY_STRING, assert.ifError);
+    });
+
+    it('should pass along query options', function(done) {
+      var options = {
+        query: QUERY_STRING,
+        a: 'b',
+        c: 'd'
+      };
+
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.equal(body.query, QUERY_STRING);
+        assert.equal(body.a, 'b');
+        assert.equal(body.c, 'd');
+        done();
+      };
+
+      bq.query(options, assert.ifError);
+    });
+
+    it('should get the results of a job if one is provided', function(done) {
+      bq.makeReq_ = function(method, path) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '/queries/' + JOB_ID);
+        done();
+      };
+
+      bq.query({ job: bq.job(JOB_ID) }, assert.ifError);
+    });
+
+    it('should be a stream if a callback is omitted', function() {
+      assert(bq.query() instanceof Stream);
+    });
+
+    it('should run the query after being read from', function(done) {
+      bq.makeReq_ = function() {
+        done();
+      };
+
+      var stream = bq.query();
+      stream.emit('reading');
+    });
+
+    describe('job is incomplete', function() {
+      var options = {};
+
+      beforeEach(function() {
+        bq.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, {
+            jobComplete: false,
+            jobReference: { jobId: JOB_ID }
+          });
+        };
+      });
+
+      it('should populate nextQuery when job is incomplete', function(done) {
+        bq.query({}, function(err, rows, nextQuery) {
+          assert.ifError(err);
+          assert(nextQuery.job instanceof Job);
+          assert.equal(nextQuery.job.id, JOB_ID);
+          done();
+        });
+      });
+
+      it('should not modify original options object', function(done) {
+        bq.query(options, function(err) {
+          assert.ifError(err);
+          assert.deepEqual(options, {});
+          done();
+        });
+      });
+    });
+
+    describe('more results exist', function() {
+      var options = {};
+      var pageToken = 'token';
+
+      beforeEach(function() {
+        bq.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, {
+            pageToken: pageToken,
+            jobReference: { jobId: JOB_ID }
+          });
+        };
+      });
+
+      it('should populate nextQuery when more results exist', function(done) {
+        bq.query(options, function(err, rows, nextQuery) {
+          assert.ifError(err);
+          assert(nextQuery.job instanceof Job);
+          assert.equal(nextQuery.job.id, JOB_ID);
+          assert.equal(nextQuery.pageToken, pageToken);
+          done();
+        });
+      });
+
+      it('should not modify original options object', function(done) {
+        bq.query(options, function(err) {
+          assert.ifError(err);
+          assert.deepEqual(options, {});
+          done();
+        });
+      });
+    });
+
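+    // Callers page through results by passing `nextQuery` back into
+    // bq.query(), e.g. (a sketch):
+    //
+    //   bq.query(options, function onPage(err, rows, nextQuery) {
+    //     if (nextQuery) {
+    //       bq.query(nextQuery, onPage);
+    //     }
+    //   });
+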
+    it('should merge the schema with rows', function(done) {
+      var rows = [{ row: 'a' }, { row: 'b' }, { row: 'c' }];
+      var schema = [{ fields: [] }];
+
+      mergeSchemaWithRows_Override = function(s, r) {
+        mergeSchemaWithRows_Override = null;
+        assert.deepEqual(s, schema);
+        assert.deepEqual(r, rows);
+        done();
+      };
+
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, {
+          jobReference: { jobId: JOB_ID },
+          rows: rows,
+          schema: schema
+        });
+      };
+
+      bq.query({}, assert.ifError);
+    });
+
+    describe('errors', function() {
+      var error = new Error('Error.');
+
+      beforeEach(function() {
+        bq.makeReq_ = function(method, path, query, body, callback) {
+          callback(error);
+        };
+      });
+
+      describe('serial', function() {
+        it('should pass errors to the callback', function(done) {
+          bq.query({}, function(err) {
+            assert.equal(err, error);
+            done();
+          });
+        });
+      });
+
+      describe('streams', function() {
+        it('should emit errors', function(done) {
+          bq.query()
+            .once('error', function(err) {
+              assert.equal(err, error);
+              done();
+            })
+            .emit('reading');
+        });
+
+        it('should end the stream', function(done) {
+          bq.query()
+            .once('error', util.noop)
+            .once('finish', done)
+            .emit('reading');
+        });
+      });
+    });
+
+    describe('results', function() {
+      var ROWS = [{ a: 'b' }, { c: 'd' }];
+
+      beforeEach(function() {
+        bq.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, {
+            jobReference: { jobId: JOB_ID },
+            rows: [],
+            schema: {}
+          });
+        };
+
+        mergeSchemaWithRows_Override = function() {
+          mergeSchemaWithRows_Override = null;
+          return ROWS;
+        };
+      });
+
+      describe('serial', function() {
+        it('should return rows to callback', function(done) {
+          bq.query({}, function(err, rows) {
+            assert.deepEqual(rows, ROWS);
+            done();
+          });
+        });
+      });
+
+      describe('streams', function() {
+        it('should emit rows to stream', function(done) {
+          var rowsEmitted = 0;
+          bq.query()
+            .on('data', function(row) {
+              assert.deepEqual(row, ROWS[rowsEmitted]);
+              rowsEmitted++;
+            })
+            .on('end', function() {
+              assert.equal(rowsEmitted, ROWS.length);
+              done();
+            });
+        });
+
+        it('should call .query() with nextQuery automatically', function(done) {
+          var queryCalled = 0;
+          var pageToken = 'token';
+
+          bq.makeReq_ = function(method, path, query, body, callback) {
+            callback(null, {
+              jobReference: { jobId: JOB_ID },
+              pageToken: pageToken
+            });
+          };
+
+          var query = bq.query;
+          bq.query = function(options) {
+            queryCalled++;
+
+            if (queryCalled === 1) {
+              return query.apply(bq, [].slice.call(arguments));
+            } else {
+              assert.deepEqual(options.pageToken, pageToken);
+              done();
+            }
+          };
+
+          bq.query().emit('reading');
+        });
+
+        it('should end the stream if there is no nextQuery', function(done) {
+          bq.query().on('finish', done).emit('reading');
+        });
+      });
+    });
+  });
+
+  describe('startQuery', function() {
+    it('should throw if a query is not provided', function() {
+      assert.throws(function() {
+        bq.startQuery();
+      }, /SQL query string is required/);
+
+      assert.throws(function() {
+        bq.startQuery({ noQuery: 'here' });
+      }, /SQL query string is required/);
+    });
+
+    describe('with destination', function() {
+      var dataset;
+      var TABLE_ID = 'table-id';
+
+      beforeEach(function() {
+        dataset = {
+          bigQuery: bq,
+          id: 'dataset-id'
+        };
+      });
+
+      it('should throw if a destination table is not provided', function() {
+        assert.throws(function() {
+          bq.startQuery({
+            query: 'query',
+            destination: 'not a table'
+          });
+        }, /Destination must be a Table/);
+      });
+
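+      // startQuery({ query: ..., destination: <Table> }) rewrites the Table
+      // into configuration.query.destinationTable and then deletes the
+      // original `destination` property, as the next two tests verify.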
+      it('should assign destination table to request body', function(done) {
+        bq.makeReq_ = function(method, path, query, body) {
+          assert.deepEqual(body.configuration.query.destinationTable, {
+            datasetId: dataset.id,
+            projectId: dataset.bigQuery.projectId,
+            tableId: TABLE_ID
+          });
+
+          done();
+        };
+
+        bq.startQuery({
+          query: 'query',
+          destination: new FakeTable(dataset, TABLE_ID)
+        });
+      });
+
+      it('should delete `destination` prop from request body', function(done) {
+        bq.makeReq_ = function(method, path, query, body) {
+          assert.strictEqual(body.configuration.query.destination, undefined);
+          done();
+        };
+
+        bq.startQuery({
+          query: 'query',
+          destination: new FakeTable(dataset, TABLE_ID)
+        });
+      });
+    });
+
+    it('should pass options to the request body', function(done) {
+      var options = { a: 'b', c: 'd', query: 'query' };
+
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.deepEqual(body.configuration.query, options);
+        done();
+      };
+
+      bq.startQuery(options);
+    });
+
+    it('should make the correct api request', function(done) {
+      bq.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'POST');
+        assert.equal(path, '/jobs');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body.configuration.query, { query: 'query' });
+        done();
+      };
+
+      bq.startQuery('query');
+    });
+
+    it('should execute the callback with error', function(done) {
+      var error = new Error('Error.');
+
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+
+      bq.startQuery('query', function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should execute the callback with Job', function(done) {
+      var jobsResource = { jobReference: { jobId: JOB_ID }, a: 'b', c: 'd' };
+
+      bq.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, jobsResource);
+      };
+
+      bq.startQuery('query', function(err, job) {
+        assert.ifError(err);
+        assert(job instanceof Job);
+        assert.equal(job.id, JOB_ID);
+        assert.deepEqual(job.metadata, jobsResource);
+        done();
+      });
+    });
+  });
+
+  describe('makeReq_', function() {
+    var method = 'POST';
+    var path = '/path';
+    var query = { a: 'b', c: { d: 'e' } };
+    var body = { hi: 'there' };
+
+    it('should make correct request', function(done) {
+      bq.connection_.req = function(request) {
+        var basePath = 'https://www.googleapis.com/bigquery/v2/projects/';
+        assert.equal(request.method, method);
+        assert.equal(request.uri, basePath + bq.projectId + path);
+        assert.deepEqual(request.qs, query);
+        assert.deepEqual(request.json, body);
+        done();
+      };
+      bq.makeReq_(method, path, query, body, assert.ifError);
+    });
+
+    it('should execute callback', function(done) {
+      bq.connection_.req = function(request, callback) {
+        callback();
+      };
+      bq.makeReq_(method, path, query, body, done);
+    });
+  });
+});
diff --git a/test/bigquery/job.js b/test/bigquery/job.js
new file mode 100644
index 00000000000..2c07e957c17
--- /dev/null
+++ b/test/bigquery/job.js
@@ -0,0 +1,157 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, beforeEach */
+
+'use strict';
+
+var assert = require('assert');
+var Job = require('../../lib/bigquery/job');
+var util = require('../../lib/common/util');
+
+describe('BigQuery/Job', function() {
+  var BIGQUERY = {
+    projectId: 'test-project',
+    makeReq_: util.noop
+  };
+  var JOB_ID = 'job_XYrk_3z';
+  var job;
+
+  beforeEach(function() {
+    job = new Job(BIGQUERY, JOB_ID);
+  });
+
+  describe('initialization', function() {
+    it('should assign this.bigQuery', function() {
+      assert.deepEqual(job.bigQuery, BIGQUERY);
+    });
+
+    it('should assign the given id', function() {
+      assert.equal(job.id, JOB_ID);
+    });
+
+    it('should assign empty metadata object', function() {
+      assert.equal(JSON.stringify(job.metadata), '{}');
+    });
+  });
+
+  describe('getMetadata', function() {
+    it('should get metadata from api', function(done) {
+      job.bigQuery.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '/jobs/' + job.id);
+        assert.strictEqual(query, null);
+        assert.strictEqual(body, null);
+        done();
+      };
+      job.getMetadata(assert.ifError);
+    });
+
+    it('should execute callback with error', function(done) {
+      var error = new Error('Error.');
+      job.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      job.getMetadata(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    describe('metadata', function() {
+      var METADATA = { a: 'b', c: 'd' };
+
+      beforeEach(function() {
+        job.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, METADATA);
+        };
+      });
+
+      it('should update metadata on Job object', function(done) {
+        job.getMetadata(function(err) {
+          assert.ifError(err);
+          assert.deepEqual(job.metadata, METADATA);
+          done();
+        });
+      });
+
+      it('should execute callback with metadata', function(done) {
+        job.getMetadata(function(err, metadata) {
+          assert.ifError(err);
+          assert.deepEqual(metadata, METADATA);
+          done();
+        });
+      });
+    });
+  });
+
+  describe('getQueryResults', function() {
+    var options = {
+      a: 'b',
+      c: 'd'
+    };
+    var callback = util.noop;
+
+    it('should accept an options object & callback', function(done) {
+      job.bigQuery.query = function(opts, cb) {
+        assert.deepEqual(opts, options);
+        assert.equal(cb, callback);
+        done();
+      };
+
+      job.getQueryResults(options, callback);
+    });
+
+    it('should accept an options object without a callback', function(done) {
+      job.bigQuery.query = function(opts, cb) {
+        assert.deepEqual(opts, options);
+        assert.equal(cb, undefined);
+        done();
+      };
+
+      job.getQueryResults(options);
+    });
+
+    it('should accept no arguments', function(done) {
+      job.bigQuery.query = function(opts, cb) {
+        assert.deepEqual(opts, { job: job });
+        assert.equal(cb, undefined);
+        done();
+      };
+
+      job.getQueryResults();
+    });
+
+    it('should assign job to the options object', function(done) {
+      job.bigQuery.query = function(opts) {
+        assert.deepEqual(opts.job, job);
+        done();
+      };
+
+      job.getQueryResults();
+    });
+
+    it('should return the result of the call to bq.query', function(done) {
+      job.bigQuery.query = function() {
+        return {
+          done: done
+        };
+      };
+
+      job.getQueryResults().done();
+    });
+  });
+});
diff --git a/test/bigquery/table.js b/test/bigquery/table.js
new file mode 100644
index 00000000000..e8d4d7dc2d8
--- /dev/null
+++ b/test/bigquery/table.js
@@ -0,0 +1,923 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, beforeEach */
+
+'use strict';
+
+var assert = require('assert');
+var extend = require('extend');
+var File = require('../../lib/storage/file');
+var Stream = require('stream');
+var sandbox = require('sandboxed-module');
+var through = require('through2');
+var util = require('../../lib/common/util');
+
+function FakeFile(a, b) {
+  File.call(this, a, b);
+}
+
+var makeWritableStream_Override;
+var fakeUtil = extend(util, {
+  makeWritableStream: function() {
+    var args = [].slice.call(arguments);
+    (makeWritableStream_Override || util.makeWritableStream).apply(null, args);
+    makeWritableStream_Override = null;
+  }
+});
+
+var Table = sandbox.require('../../lib/bigquery/table', {
+  requires: {
+    '../storage/file': FakeFile,
+    '../common/util': fakeUtil
+  }
+});
+
+describe('BigQuery/Table', function() {
+  var DATASET = {
+    id: 'dataset-id',
+    bigQuery: {
+      makeReq_: util.noop,
+      job: function(id) {
+        return { id: id };
+      },
+      projectId: 'project-id'
+    }
+  };
+
+  var SCHEMA_OBJECT = {
+    fields: [
+      { name: 'id', type: 'integer' },
+      { name: 'breed', type: 'string' },
+      { name: 'name', type: 'string' },
+      { name: 'dob', type: 'timestamp' }
+    ]
+  };
+  var SCHEMA_STRING = 'id:integer,breed,name,dob:timestamp';
+
+  var TABLE_ID = 'kittens';
+  var table;
+
+  beforeEach(function() {
+    table = new Table(DATASET, TABLE_ID);
+  });
+
+  describe('createSchemaFromString_', function() {
+    it('should create a schema object from a string', function() {
+      assert.deepEqual(
+        Table.createSchemaFromString_(SCHEMA_STRING), SCHEMA_OBJECT);
+    });
+  });
+
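+  // Raw tabledata responses represent each row as { f: [{ v: <value> }] },
+  // one entry per schema field, in order; mergeSchemaWithRows_ flattens that
+  // into plain row objects keyed by field name, as the next test shows.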
+  describe('mergeSchemaWithRows_', function() {
+    it('should merge the schema and flatten the rows', function() {
+      var rows = [
+        {
+          f: [
+            { v: 3 },
+            { v: 'dogbreed' },
+            { v: 'dogname' },
+            { v: '234234' }
+          ]
+        },
+        {
+          f: [
+            { v: 4 },
+            { v: 'dogbreed2' },
+            { v: 'dogname2' },
+            { v: '2342342' }
+          ]
+        }
+      ];
+
+      assert.deepEqual(Table.mergeSchemaWithRows_(SCHEMA_OBJECT, rows), [
+        { id: 3, breed: 'dogbreed', name: 'dogname', dob: '234234' },
+        { id: 4, breed: 'dogbreed2', name: 'dogname2', dob: '2342342' }
+      ]);
+    });
+  });
+
+  describe('copy', function() {
+    var DEST_TABLE = new Table(DATASET, 'destination-table');
+
+    it('should throw if a destination is not a Table', function() {
+      assert.throws(function() {
+        table.copy();
+      }, /Destination must be a Table/);
+
+      assert.throws(function() {
+        table.copy({});
+      }, /Destination must be a Table/);
+
+      assert.throws(function() {
+        table.copy(function() {});
+      }, /Destination must be a Table/);
+    });
+
+    it('should send correct request to the API', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'POST');
+        assert.equal(path, '/jobs');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body, {
+          configuration: {
+            copy: {
+              a: 'b',
+              c: 'd',
+              destinationTable: {
+                datasetId: DEST_TABLE.dataset.id,
+                projectId: DEST_TABLE.bigQuery.projectId,
+                tableId: DEST_TABLE.id
+              },
+              sourceTable: {
+                datasetId: table.dataset.id,
+                projectId: table.bigQuery.projectId,
+                tableId: table.id
+              }
+            }
+          }
+        });
+        done();
+      };
+
+      table.copy(DEST_TABLE, { a: 'b', c: 'd' }, assert.ifError);
+    });
+
+    it('should create and return a Job', function(done) {
+      var jobId = 'job-id';
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { jobReference: { jobId: jobId } });
+      };
+
+      table.copy(DEST_TABLE, function(err, job) {
+        assert.ifError(err);
+        assert.equal(job.id, jobId);
+        done();
+      });
+    });
+
+    it('should assign metadata on the job', function(done) {
+      var jobMetadata = { jobReference: { jobId: 'job-id' }, a: 'b', c: 'd' };
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, jobMetadata);
+      };
+
+      table.copy(DEST_TABLE, function(err, job) {
+        assert.ifError(err);
+        assert.deepEqual(job.metadata, jobMetadata);
+        done();
+      });
+    });
+
+    it('should accept just a destination and callback', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { jobReference: { jobId: 'job-id' } });
+      };
+
+      table.copy(DEST_TABLE, done);
+    });
+
+    it('should pass an error to the callback', function(done) {
+      var error = new Error('Error.');
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+
+      table.copy(DEST_TABLE, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+  });
+
+  describe('createReadStream', function() {
+    it('should return a stream', function() {
+      assert(table.createReadStream() instanceof Stream);
+    });
+
+    it('should call getRows() when asked for data', function(done) {
+      table.getRows = function() {
+        done();
+      };
+      table.createReadStream().emit('reading');
+    });
+
+    it('should emit rows', function(done) {
+      var rows = [{ a: 'b' }, { c: 'd' }];
+      var rowsEmitted = 0;
+
+      table.getRows = function(handler) {
+        handler(null, rows);
+      };
+
+      table.createReadStream()
+        .on('data', function(row) {
+          assert.deepEqual(row, rows[rowsEmitted]);
+          rowsEmitted++;
+        })
+        .on('end', function() {
+          assert.equal(rowsEmitted, rows.length);
+          done();
+        });
+    });
+
+    it('should call getRows() if nextQuery exists', function(done) {
+      var called = 0;
+
+      var nextQuery = { a: 'b', c: 'd' };
+      var responseHandler;
+
+      table.getRows = function(query, handler) {
+        responseHandler = responseHandler || handler || query;
+
+        called++;
+
+        if (called === 2) {
+          assert.deepEqual(query, nextQuery);
+          assert.equal(responseHandler, handler);
+          done();
+        } else {
+          responseHandler(null, [], nextQuery);
+        }
+      };
+
+      table.createReadStream().emit('reading');
+    });
+
+    it('should end the stream when nextQuery is not present', function(done) {
+      table.getRows = function(handler) {
+        handler(null, []);
+      };
+
+      table.createReadStream().on('finish', done).emit('reading');
+    });
+
+    describe('errors', function() {
+      var error = new Error('Error.');
+
+      beforeEach(function() {
+        table.getRows = function(handler) {
+          handler(error);
+        };
+      });
+
+      it('should emit errors', function(done) {
+        table.createReadStream()
+          .once('error', function(err) {
+            assert.equal(err, error);
+            done();
+          })
+          .emit('reading');
+      });
+
+      it('should end the stream', function(done) {
+        table.createReadStream()
+          .once('error', util.noop)
+          .once('finish', done)
+          .emit('reading');
+      });
+    });
+  });
+
+  describe('createWriteStream', function() {
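+    // Usage sketch for the method under test (local file path is
+    // hypothetical):
+    //
+    //   fs.createReadStream('./kittens.json')
+    //     .pipe(table.createWriteStream('json'))
+    //     .on('complete', function(job) {});
+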
+    it('should use a string as the file type', function(done) {
+      var fileType = 'csv';
+
+      makeWritableStream_Override = function(stream, options) {
+        var load = options.metadata.configuration.load;
+        assert.equal(load.sourceFormat, 'CSV');
+        done();
+      };
+
+      table.createWriteStream(fileType).emit('writing');
+    });
+
+    it('should throw if a given source format is not recognized', function() {
+      assert.throws(function() {
+        table.createWriteStream('zip');
+      }, /Source format not recognized/);
+
+      assert.throws(function() {
+        table.createWriteStream({
+          sourceFormat: 'zip'
+        });
+      }, /Source format not recognized/);
+
+      assert.doesNotThrow(function() {
+        table.createWriteStream();
+        table.createWriteStream({});
+      });
+    });
+
+    it('should return a stream', function() {
+      assert(table.createWriteStream() instanceof Stream);
+    });
+
+    describe('writable stream', function() {
+      it('should make a writable stream when written to', function(done) {
+        var stream;
+
+        makeWritableStream_Override = function(s) {
+          assert.equal(s, stream);
+          done();
+        };
+
+        stream = table.createWriteStream();
+        stream.emit('writing');
+      });
+
+      it('should pass the connection', function(done) {
+        makeWritableStream_Override = function(stream, options) {
+          assert.deepEqual(options.connection, table.connection);
+          done();
+        };
+
+        table.createWriteStream().emit('writing');
+      });
+
+      it('should pass extended metadata', function(done) {
+        makeWritableStream_Override = function(stream, options) {
+          assert.deepEqual(options.metadata, {
+            configuration: {
+              load: {
+                a: 'b',
+                c: 'd',
+                destinationTable: {
+                  projectId: table.bigQuery.projectId,
+                  datasetId: table.dataset.id,
+                  tableId: table.id
+                }
+              }
+            }
+          });
+          done();
+        };
+
+        table.createWriteStream({ a: 'b', c: 'd' }).emit('writing');
+      });
+
+      it('should pass the correct request uri', function(done) {
+        makeWritableStream_Override = function(stream, options) {
+          var uri = 'https://www.googleapis.com/upload/bigquery/v2/projects/' +
+            table.bigQuery.projectId + '/jobs';
+          assert.equal(options.request.uri, uri);
+          done();
+        };
+
+        table.createWriteStream().emit('writing');
+      });
+
+      it('should create a job and emit it with complete', function(done) {
+        var jobId = 'job-id';
+        var metadata = { jobReference: { jobId: jobId }, a: 'b', c: 'd' };
+
+        table.bigQuery.job = function(id) {
+          return { id: id };
+        };
+
+        makeWritableStream_Override = function(stream, options, callback) {
+          callback(metadata);
+        };
+
+        table.createWriteStream()
+          .on('complete', function(job) {
+            assert.equal(job.id, jobId);
+            assert.deepEqual(job.metadata, metadata);
+            done();
+          })
+          .emit('writing');
+      });
+    });
+  });
+
+  describe('delete', function() {
+    it('should send the correct API request to delete', function(done) {
+      table.makeReq_ = function(method, path, query, body, callback) {
+        assert.equal(method, 'DELETE');
+        assert.equal(path, '');
+        assert.strictEqual(query, null);
+        assert.strictEqual(body, null);
+        callback();
+      };
+
+      table.delete(done);
+    });
+  });
+
+  describe('export', function() {
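+    // Usage sketch (destination file name is hypothetical):
+    //
+    //   table.export(bucket.file('backup.json'), { gzip: true }, callback);
+    //
+    // The destination format is inferred from the file extension unless a
+    // `format` option is given, as the tests below verify.
+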
+    var FILE = new FakeFile({
+      name: 'bucket-name',
+      makeReq_: util.noop
+    }, 'file.json');
+
+    beforeEach(function() {
+      table.bigQuery.job = function(id) {
+        return { id: id };
+      };
+    });
+
+    it('should send the correct API request', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'POST');
+        assert.equal(path, '/jobs');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body.configuration.extract.sourceTable, {
+          datasetId: table.dataset.id,
+          projectId: table.bigQuery.projectId,
+          tableId: table.id
+        });
+        done();
+      };
+
+      table.export(FILE, assert.ifError);
+    });
+
+    it('should accept just a destination and a callback', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { jobReference: { jobId: 'job-id' } });
+      };
+
+      table.export(FILE, done);
+    });
+
+    it('should parse out full gs:// urls from files', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        assert.deepEqual(body.configuration.extract.destinationUris, [
+          'gs://' + FILE.bucket.name + '/' + FILE.name
+        ]);
+        done();
+      };
+
+      table.export(FILE, done);
+    });
+
+    it('should throw if a destination is not a File', function() {
+      assert.throws(function() {
+        table.export({}, util.noop);
+      }, /Destination must be a File object/);
+
+      assert.throws(function() {
+        table.export([FILE, {}], util.noop);
+      }, /Destination must be a File object/);
+    });
+
+    it('should detect file format if a format is not provided', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        var destinationFormat = body.configuration.extract.destinationFormat;
+        assert.equal(destinationFormat, 'NEWLINE_DELIMITED_JSON');
+        done();
+      };
+
+      table.export(FILE, assert.ifError);
+    });
+
+    it('should assign the provided format if matched', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        assert.equal(body.configuration.extract.destinationFormat, 'CSV');
+        assert.strictEqual(body.configuration.extract.format, undefined);
+        done();
+      };
+
+      table.export(FILE, { format: 'csv' }, assert.ifError);
+    });
+
+    it('should throw if a provided format is not recognized', function() {
+      assert.throws(function() {
+        table.export(FILE, { format: 'zip' }, util.noop);
+      }, /Destination format not recognized/);
+    });
+
+    it('should assign GZIP compression with gzip: true', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        assert.equal(body.configuration.extract.compression, 'GZIP');
+        assert.strictEqual(body.configuration.extract.gzip, undefined);
+        done();
+      };
+
+      table.export(FILE, { gzip: true }, util.noop);
+    });
+
+    it('should execute the callback with error', function(done) {
+      var error = new Error('Error.');
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+
+      table.export(FILE, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should create a Job and return it to the callback', function(done) {
+      var jobMetadata = { jobReference: { jobId: 'job-id' }, a: 'b', c: 'd' };
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, jobMetadata);
+      };
+
+      table.export(FILE, function(err, job) {
+        assert.ifError(err);
+        assert.deepEqual(job.metadata, jobMetadata);
+        done();
+      });
+    });
+  });
+
+  describe('getMetadata', function() {
+    it('should get metadata from api', function(done) {
+      table.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '');
+        assert.strictEqual(query, null);
+        assert.strictEqual(body, null);
+        done();
+      };
+      table.getMetadata(assert.ifError);
+    });
+
+    it('should execute callback with error', function(done) {
+      var error = new Error('Error.');
+      table.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      table.getMetadata(function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    describe('metadata', function() {
+      var METADATA = { a: 'b', c: 'd' };
+
+      beforeEach(function() {
+        table.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, METADATA);
+        };
+      });
+
+      it('should update metadata on Table object', function(done) {
+        table.getMetadata(function(err) {
+          assert.ifError(err);
+          assert.deepEqual(table.metadata, METADATA);
+          done();
+        });
+      });
+
+      it('should execute callback with metadata', function(done) {
+        table.getMetadata(function(err, metadata) {
+          assert.ifError(err);
+          assert.deepEqual(metadata, METADATA);
+          done();
+        });
+      });
+    });
+  });
+
+  describe('getRows', function() {
+    it('should accept just a callback', function(done) {
+      table.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, {});
+      };
+      table.getRows(done);
+    });
+
+    it('should make correct API request', function(done) {
+      var options = { a: 'b', c: 'd' };
+
+      table.makeReq_ = function(method, path, query, body, callback) {
+        assert.equal(method, 'GET');
+        assert.equal(path, '/data');
+        assert.deepEqual(query, options);
+        assert.strictEqual(body, null);
+        callback(null, {});
+      };
+
+      table.getRows(options, done);
+    });
+
+    it('should refresh metadata if it does not have a schema', function(done) {
+      // Step 0: use "Stephen" so you know who to blame for this test.
+      var rows = [{ f: [{ v: 'stephen' }] }];
+      var schema = { fields: [{ name: 'name', type: 'string' }] };
+
+      // Step 1: make the request.
+      table.makeReq_ = function(method, path, query, body, callback) {
+        // Respond with a row, so it grabs the schema.
+        // Use setImmediate so the getMetadata override below is in place
+        // before the response is processed.
+        setImmediate(callback, null, { rows: rows });
+      };
+      table.getRows(responseHandler);
+
+      // Step 2: refresh the metadata to pull down the schema.
+      table.getMetadata = function(callback) {
+        table.metadata = { schema: schema };
+        callback();
+      };
+
+      // Step 3: execute the original response handler with schema-merged rows.
+      function responseHandler(err, rows) {
+        assert.ifError(err);
+        assert.deepEqual(rows, [{ name: 'stephen' }]);
+        done();
+      }
+    });
+
+    it('should return schema-merged rows', function(done) {
+      var rows = [{ f: [{ v: 'stephen' }] }];
+      var schema = { fields: [{ name: 'name', type: 'string' }] };
+      table.metadata = { schema: schema };
+
+      table.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { rows: rows });
+      };
+
+      table.getRows(function(err, rows) {
+        assert.ifError(err);
+        assert.deepEqual(rows, [{ name: 'stephen' }]);
+        done();
+      });
+    });
+
+    it('should pass nextQuery if pageToken is returned', function(done) {
+      var options = { a: 'b', c: 'd' };
+      var pageToken = 'token';
+
+      // Set a schema so it doesn't try to refresh the metadata.
+      table.metadata = { schema: {} };
+
+      table.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { pageToken: pageToken });
+      };
+
+      table.getRows(options, function(err, rows, nextQuery) {
+        assert.ifError(err);
+        assert.deepEqual(nextQuery, { a: 'b', c: 'd', pageToken: pageToken });
+        // Original object isn't affected.
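+        // (getRows must copy the options rather than attach the pageToken to
+        // the object the caller passed in.)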
+        assert.deepEqual(options, { a: 'b', c: 'd' });
+        done();
+      });
+    });
+  });
+
+  describe('import', function() {
+    var FILEPATH = require.resolve('../testdata/response_get.json');
+    var FILE = new FakeFile({
+      name: 'bucket-name',
+      makeReq_: util.noop
+    }, 'file.json');
+
+    it('should accept just a file path and a callback', function(done) {
+      table.createWriteStream = function() {
+        var stream = through();
+        setImmediate(function() {
+          stream.emit('complete');
+        });
+        return stream;
+      };
+
+      table.import(FILEPATH, done);
+    });
+
+    it('should return a stream when a string is given', function() {
+      table.createWriteStream = through;
+
+      assert(table.import(FILEPATH) instanceof Stream);
+    });
+
+    it('should infer the file format from the given filepath', function(done) {
+      table.createWriteStream = function(metadata) {
+        assert.equal(metadata.sourceFormat, 'NEWLINE_DELIMITED_JSON');
+        var stream = through();
+        setImmediate(function() {
+          stream.emit('complete');
+        });
+        return stream;
+      };
+
+      table.import(FILEPATH, done);
+    });
+
+    it('should not infer the file format if one is given', function(done) {
+      table.createWriteStream = function(metadata) {
+        assert.equal(metadata.sourceFormat, 'CSV');
+        var stream = through();
+        setImmediate(function() {
+          stream.emit('complete');
+        });
+        return stream;
+      };
+
+      table.import(FILEPATH, { sourceFormat: 'CSV' }, done);
+    });
+
+    it('should throw if a File object is not provided', function() {
+      assert.throws(function() {
+        table.import({});
+      }, /Source must be a File object/);
+    });
+
+    it('should convert File objects to gs:// urls', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        var sourceUri = body.configuration.load.sourceUris[0];
+        assert.equal(sourceUri, 'gs://' + FILE.bucket.name + '/' + FILE.name);
+        done();
+      };
+
+      table.import(FILE, assert.ifError);
+    });
+
+    it('should infer the file format from a File object', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        var sourceFormat = body.configuration.load.sourceFormat;
+        assert.equal(sourceFormat, 'NEWLINE_DELIMITED_JSON');
+        done();
+      };
+
+      table.import(FILE, assert.ifError);
+    });
+
+    it('should not override a provided format with a File', function(done) {
+      table.bigQuery.makeReq_ = function(method, path, query, body) {
+        var sourceFormat = body.configuration.load.sourceFormat;
+        assert.equal(sourceFormat, 'CSV');
+        done();
+      };
+
+      table.import(FILE, { sourceFormat: 'CSV' }, assert.ifError);
+    });
+
+    it('should execute the callback with error', function(done) {
+      var error = new Error('Error.');
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+
+      table.import(FILE, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    it('should create a Job and return it to the callback', function(done) {
+      var jobMetadata = { jobReference: { jobId: 'job-id' }, a: 'b', c: 'd' };
+
+      table.bigQuery.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, jobMetadata);
+      };
+
+      table.import(FILE, function(err, job) {
+        assert.ifError(err);
+        assert.deepEqual(job.metadata, jobMetadata);
+        done();
+      });
+    });
+  });
+
+  describe('insert', function() {
+    var data = [
+      { state: 'MI', gender: 'M', year: '2015', name: 'Berkley', count: '0' },
+      { state: 'MI', gender: 'M', year: '2015', name: 'Berkley', count: '0' },
+      { state: 'MI', gender: 'M', year: '2015', name: 'Berkley', count: '0' },
+      { state: 'MI', gender: 'M', year: '2015', name: 'Berkley', count: '0' },
+      { state: 'MI', gender: 'M', year: '2015', name: 'Berkley', count: '0' }
+    ];
+
+    it('should save data', function(done) {
+      table.makeReq_ = function(method, path, query, body, callback) {
+        callback(null, { insertErrors: [] });
+      };
+
+      table.insert(data, done);
+    });
+  });
+
+  describe('query', function() {
+    it('should pass args through to datasetInstance.query()', function(done) {
+      table.dataset.query = function(a, b) {
+        assert.equal(a, 'a');
+        assert.equal(b, 'b');
+        done();
+      };
+
+      table.query('a', 'b');
+    });
+  });
+
+  describe('setMetadata', function() {
+    var METADATA = { a: 'b', c: 'd' };
+
+    it('should send request to the api', function(done) {
+      table.makeReq_ = function(method, path, query, body) {
+        assert.equal(method, 'PUT');
+        assert.equal(path, '');
+        assert.strictEqual(query, null);
+        assert.deepEqual(body, METADATA);
+        done();
+      };
+      table.setMetadata(METADATA, assert.ifError);
+    });
+
+    it('should convert a name to a friendly name', function(done) {
+      var name = 'a new name';
+      table.makeReq_ = function(method, path, query, body) {
+        assert.equal(body.friendlyName, name);
+        done();
+      };
+      table.setMetadata({ name: name }, assert.ifError);
+    });
+
+    it('should accept a schema', function(done) {
+      table.makeReq_ = function(method, path, query, body) {
+        assert.deepEqual(body.schema, {
+          fields: [{ name: 'schema', type: 'string' }]
+        });
+        done();
+      };
+      table.setMetadata({ schema: 'schema' });
+    });
+
+    it('should execute callback with error', function(done) {
+      var error = new Error('Error.');
+      table.makeReq_ = function(method, path, query, body, callback) {
+        callback(error);
+      };
+      table.setMetadata(METADATA, function(err) {
+        assert.equal(err, error);
+        done();
+      });
+    });
+
+    describe('metadata', function() {
+      beforeEach(function() {
+        table.makeReq_ = function(method, path, query, body, callback) {
+          callback(null, METADATA);
+        };
+      });
+
+      it('should update metadata on Table object', function(done) {
+        table.setMetadata(METADATA, function(err) {
+          assert.ifError(err);
+          assert.deepEqual(table.metadata, METADATA);
+          done();
+        });
+      });
+
+      it('should execute callback with metadata', function(done) {
+        table.setMetadata(METADATA, function(err, metadata) {
+          assert.ifError(err);
+          assert.deepEqual(metadata, METADATA);
+          done();
+        });
+      });
+    });
+  });
+
+  describe('makeReq_', function() {
+    it('should prefix the path', function(done) {
+      var path = '/test-path';
+
+      table.dataset.makeReq_ = function(method, p) {
+        assert.equal(p, '/tables/' + table.id + path);
+        done();
+      };
+
+      table.makeReq_('POST', path);
+    });
+
+    it('should pass through arguments', function(done) {
+      var method = 'POST';
+      var query = { a: 'b', c: 'd', e: { f: 'g' } };
+      var body = { a: 'b', c: 'd', e: { f: 'g' } };
+      var callback = util.noop;
+
+      table.dataset.makeReq_ = function(m, p, q, b, c) {
+        assert.equal(m, method);
+        assert.deepEqual(q, query);
+        assert.deepEqual(b, body);
+        assert.equal(c, callback);
+        done();
+      };
+
+      table.makeReq_(method, '/path', query, body, callback);
+    });
+  });
+});