First draft of streaming insert sample (#196)
* First draft of streaming insert sample

* Fix flaky test + address comments

* Fix comments

* Fix flaky test
Ace Nassri authored Aug 31, 2016
1 parent 241d28f commit 2f9b34f
Showing 3 changed files with 219 additions and 10 deletions.
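
At its core, the new sample wraps the Node.js client's streaming-insert call, table.insert(), which insertRowsAsStream in the diff below delegates to. A minimal standalone sketch of that call, assuming the same @google-cloud/bigquery client and GCLOUD_PROJECT environment variable the samples use (dataset and table names are illustrative):

var bigquery = require('@google-cloud/bigquery')({
  projectId: process.env.GCLOUD_PROJECT
});

// Stream two rows into an existing table. The callback reports a
// request-level error and any per-row insert errors separately.
bigquery.dataset('my_dataset').table('my_table').insert([
  { Name: 'foo', Age: 27 },
  { Name: 'bar', Age: 13 }
], function (err, insertErrors) {
  if (err) { return console.error('API error:', err); }
  if (insertErrors.length) { return console.error('Row errors:', insertErrors); }
  console.log('Streamed 2 rows.');
});
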
40 changes: 33 additions & 7 deletions bigquery/system-test/tables.test.js
@@ -22,19 +22,21 @@ var path = require('path');
function generateUuid () {
return 'nodejs_docs_samples_' + uuid.v4().replace(/-/gi, '_');
}

var rows = [
{ Name: 'foo', Age: 27, Weight: 80.3, IsMagic: true },
{ Name: 'bar', Age: 13, Weight: 54.6, IsMagic: false }
];
var options = {
projectId: process.env.GCLOUD_PROJECT,
localFilePath: path.join(__dirname, '../resources/data.csv'),
bucket: generateUuid(),
file: 'data.json',
dataset: generateUuid(),
table: generateUuid(),
-schema: 'Name:string, Age:integer, Weigth:float, IsMagic:boolean'
+schema: 'Name:string, Age:integer, Weight:float, IsMagic:boolean',
+rows: rows
};

var file = storage.bucket(options.bucket).file(options.file);

describe('bigquery:tables', function () {
before(function (done) {
// Create bucket
@@ -62,7 +64,9 @@ describe('bigquery:tables', function () {
return done(err);
}
// Delete bucket
-storage.bucket(options.bucket).delete(done);
+setTimeout(function () {
+  storage.bucket(options.bucket).delete(done);
+}, 2000);
});
});
});
@@ -122,7 +126,7 @@ describe('bigquery:tables', function () {
assert(metadata.status, 'job metadata has status');
assert.equal(metadata.status.state, 'DONE', 'job was finished');

-file.exists(function (err, exists) {
+storage.bucket(options.bucket).file(options.file).exists(function (err, exists) {
assert.ifError(err, 'file existence check succeeded');
assert(exists, 'export destination exists');
done();
@@ -131,8 +135,30 @@
});
});

describe('insertRowsAsStream', function () {
it('should insert rows into a table', function (done) {
var table = bigquery.dataset(options.dataset).table(options.table);
table.getRows({}, function (err, startRows) {
assert.equal(err, null);

program.insertRowsAsStream(options, function (err, insertErrors) {
assert.equal(err, null);
assert.deepEqual(insertErrors, [], 'no per-row insert errors occurred');

setTimeout(function () {
table.getRows({}, function (err, endRows) {
assert.equal(err, null);
assert.equal(startRows.length + 2, endRows.length, 'insertRows() added 2 rows');
done();
});
}, 2000);
});
});
});
});

describe('deleteTable', function () {
-it('should list tables', function (done) {
+it('should delete table', function (done) {
program.deleteTable(options, function (err) {
assert.ifError(err);
assert(console.log.calledWith('Deleted table: %s', options.table));
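
The 2-second setTimeout calls added in this file are the "flaky test" fix mentioned in the commit message: rows inserted via the streaming API are not always immediately visible to getRows(), and the bucket delete can race with the earlier object cleanup. A hypothetical polling helper that could replace the fixed sleeps (not part of this PR; the name and timings are illustrative):

function waitForRowCount (table, expected, timeoutMs, callback) {
  var deadline = Date.now() + timeoutMs;
  (function poll () {
    table.getRows({}, function (err, rows) {
      if (err) { return callback(err); }
      if (rows.length >= expected) { return callback(null, rows); }
      if (Date.now() >= deadline) { return callback(new Error('Timed out waiting for rows')); }
      // Streamed rows can take a moment to become readable; retry shortly.
      setTimeout(poll, 500);
    });
  })();
}
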
58 changes: 56 additions & 2 deletions bigquery/tables.js
@@ -184,18 +184,41 @@ function exportTableToGCS (options, callback) {
});
}
// [END export_table_to_gcs]

// [START insert_rows_as_stream]
/**
* Insert rows (as a stream) into a BigQuery table.
* @param {object} options Configuration options.
* @param {array} options.rows An array of rows to insert into a BigQuery table.
* @param {string} options.dataset The ID of the dataset containing the target table.
* @param {string} options.table The ID of the table to insert rows into.
* @param {function} callback Callback function to receive any per-row insert errors.
*/
function insertRowsAsStream (options, callback) {
var table = bigquery.dataset(options.dataset).table(options.table);
table.insert(options.rows, function (err, insertErrors) {
if (err) {
return callback(err);
}
console.log('Inserted %d rows!', options.rows.length);
return callback(null, insertErrors);
});
}
// [END insert_rows_as_stream]
// [END all]

// The command-line program
var cli = require('yargs');
var utils = require('../utils');
var fs = require('fs');

var program = module.exports = {
createTable: createTable,
listTables: listTables,
deleteTable: deleteTable,
importFile: importFile,
exportTableToGCS: exportTableToGCS,
insertRowsAsStream: insertRowsAsStream,
main: function (args) {
// Run the command-line program
cli.help().strict().parse(args).argv;
@@ -243,6 +266,29 @@ cli
}, function (options) {
program.exportTableToGCS(utils.pick(options, ['dataset', 'table', 'bucket', 'file', 'format', 'gzip']), utils.makeHandler());
})
.command('insert <dataset> <table> <json_or_file>',
'Insert a JSON array (provided as a string or read from a file) into a BigQuery table.', {},
function (options) {
var content;
try {
content = fs.readFileSync(options.json_or_file);
} catch (err) {
content = options.json_or_file;
}

var rows = null;
try {
rows = JSON.parse(content);
} catch (err) {}

if (!Array.isArray(rows)) {
throw new Error('"json_or_file" (or the file it points to) is not a valid JSON array.');
}

options.rows = rows;
program.insertRowsAsStream(utils.pick(options, ['rows', 'dataset', 'table']), utils.makeHandler());
}
)
.example(
'node $0 create my_dataset my_table',
'Create table "my_table" in "my_dataset".'
@@ -265,11 +311,19 @@
)
.example(
'node $0 export my_dataset my_table my-bucket my-file',
-'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV'
+'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV.'
)
.example(
'node $0 export my_dataset my_table my-bucket my-file -f JSON --gzip',
-'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON'
+'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON.'
)
.example(
'node $0 insert my_dataset my_table json_string',
'Insert the JSON array represented by json_string into my_dataset:my_table.'
)
.example(
'node $0 insert my_dataset my_table json_file',
'Insert the JSON array contained in json_file into my_dataset:my_table.'
)
.wrap(100)
.recommendCommands()
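
For reference, the unit tests below drive the new subcommand by calling the module's exported main() directly. A hypothetical equivalent outside the test suite (the require path and the names are illustrative):

var program = require('./tables');

// Same as running: node tables.js insert my_dataset my_table '[{"Name":"foo","Age":27}]'
program.main([
  'insert',
  'my_dataset',
  'my_table',
  JSON.stringify([{ Name: 'foo', Age: 27 }])
]);
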
131 changes: 130 additions & 1 deletion bigquery/test/tables.test.js
@@ -21,6 +21,15 @@ var dataset = 'dataset';
var table = 'table';
var format = 'JSON';
var schema = 'schema';
var jsonArray = [
{ name: 'foo', age: 27 },
{ name: 'bar', age: 13 }
];
var validJsonFile = 'validJsonFile';
var invalidJsonFile = 'invalidJsonFile';
var validJsonString = JSON.stringify(jsonArray);
var invalidJsonString = 'INVALID';
var errorList = ['error 1', 'error 2'];

function getSample () {
var tableMocks = [
@@ -44,7 +53,8 @@ function getSample () {
var tableMock = {
export: sinon.stub().yields(null, jobMock),
delete: sinon.stub().yields(null),
-import: sinon.stub().yields(null, jobMock)
+import: sinon.stub().yields(null, jobMock),
+insert: sinon.stub().yields(null, errorList)
};
var datasetMock = {
table: sinon.stub().returns(tableMock),
@@ -57,11 +67,17 @@
};
var BigQueryMock = sinon.stub().returns(bigqueryMock);
var StorageMock = sinon.stub().returns(storageMock);
var fsMock = {
readFileSync: sinon.stub().throws(new Error('Invalid file.'))
};
fsMock.readFileSync.withArgs(validJsonFile).returns(validJsonString);
fsMock.readFileSync.withArgs(invalidJsonFile).returns(invalidJsonString);

return {
program: proxyquire('../tables', {
'@google-cloud/bigquery': BigQueryMock,
'@google-cloud/storage': StorageMock,
'fs': fsMock,
yargs: proxyquire('yargs', {})
}),
mocks: {
@@ -74,6 +90,7 @@
table: tableMock,
bucket: bucketMock,
dataset: datasetMock,
fs: fsMock,
tables: tableMocks
}
};
@@ -290,6 +307,45 @@ describe('bigquery:tables', function () {
});
});

describe('insertRowsAsStream', function () {
var options = {
file: file,
dataset: dataset,
table: table,
rows: jsonArray
};

it('should stream-insert rows into a table', function () {
var program = getSample().program;
var callback = sinon.stub();

program.insertRowsAsStream(options, callback);
assert.equal(callback.calledOnce, true);
assert.deepEqual(callback.firstCall.args, [null, errorList]);
});

it('should handle API errors', function () {
var example = getSample();
var callback = sinon.stub();
var error = new Error('error');
example.mocks.table.insert = sinon.stub().yields(error);

example.program.insertRowsAsStream(options, callback);
assert.equal(callback.calledOnce, true);
assert.deepEqual(callback.firstCall.args, [error]);
});

it('should handle (per-row) insert errors', function () {
var example = getSample();
var callback = sinon.stub();
example.mocks.table.insert = sinon.stub().yields(null, errorList);

example.program.insertRowsAsStream(options, callback);
assert.equal(callback.calledOnce, true);
assert.deepEqual(callback.firstCall.args, [null, errorList]);
});
});

describe('main', function () {
it('should call createTable', function () {
var program = getSample().program;
@@ -349,6 +405,19 @@ describe('bigquery:tables', function () {
}]);
});

it('should call insertRowsAsStream', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', dataset, table, validJsonFile]);

assert.equal(program.insertRowsAsStream.calledOnce, true);
assert.deepEqual(
program.insertRowsAsStream.firstCall.args.slice(0, -1),
[{ rows: jsonArray, dataset: dataset, table: table }]
);
});

it('should recognize --gzip flag', function () {
var program = getSample().program;
program.exportTableToGCS = sinon.stub();
@@ -380,5 +449,65 @@
gzip: false
}]);
});

describe('insert', function () {
var options = {
dataset: dataset,
table: table,
rows: jsonArray
};

it('should accept valid JSON files', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', dataset, table, validJsonFile]);

assert.equal(program.insertRowsAsStream.calledOnce, true);
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
});

it('should reject files with invalid JSON', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

assert.throws(
function () { program.main(['insert', dataset, table, invalidJsonFile]); },
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
);
assert.equal(program.insertRowsAsStream.called, false);
});

it('should reject invalid file names', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

assert.throws(
function () { program.main(['insert', dataset, table, '']); },
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
);
assert.equal(program.insertRowsAsStream.called, false);
});

it('should accept valid JSON strings', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', dataset, table, validJsonString]);
assert.equal(program.insertRowsAsStream.calledOnce, true);
assert.deepEqual(program.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
});

it('should reject invalid JSON strings', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

assert.throws(
function () { program.main(['insert', dataset, table, invalidJsonString]); },
/"json_or_file" \(or the file it points to\) is not a valid JSON array\./
);
assert.equal(program.insertRowsAsStream.called, false);
});
});
});
});
