diff --git a/app/constants/constants.js b/app/constants/constants.js index f22feba9e..c24d8c3e7 100644 --- a/app/constants/constants.js +++ b/app/constants/constants.js @@ -449,7 +449,7 @@ export const SAMPLE_DBS = { [DIALECTS.ATHENA]: { s3Outputlocation: 'plotly-s3-connector-test', accessKey: 'AKIAIMHMSHTGARJYSKMQ', - secretKey: 'Urvus4R7MnJOAqT4U3eovlCBimQ4Zg2Y9sV5LWow', + secretAccessKey: 'Urvus4R7MnJOAqT4U3eovlCBimQ4Zg2Y9sV5LWow', region: 'us-east-1', database: 'falcon', queryTimeout: 5000 diff --git a/backend/persistent/datastores/athena.js b/backend/persistent/datastores/athena.js index 2c81692b5..5eb6f458f 100644 --- a/backend/persistent/datastores/athena.js +++ b/backend/persistent/datastores/athena.js @@ -1,37 +1,10 @@ -import {executeQuery} from './drivers/athena'; +import {createAthenaClient, executeQuery} from './drivers/athena'; -import Logger from '../../logger'; const SHOW_TABLES_QUERY = 'SHOW TABLES'; -const SHOW_SCHEMA_QUERY = 'SELECT table_name, column_name, data_type FROM ' - + 'information_schema.columns WHERE table_schema '; +const SHOW_SCHEMA_QUERY = + 'SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema'; const DEFAULT_QUERY_INTERVAL = 2000; -const NUMBER_OF_RETRIES = 50; -const AWS = require('aws-sdk'); -/** - * The following function will create an AWS Athena Client - * @param {object} connection - AWS Athena Connection Parameters - * @param {string} connection.accessKey - AWS Access Key - * @param {string} connection.secretAccessKey - AWS Secret Key - * @param {string} connection.region - AWS Region - * @returns {object} AWS Athena Client - */ -function createAthenaClient(connection) { - const connectionParams = { - apiVersion: '2017-05-18', - accessKeyId: connection.accessKey, - secretAccessKey: connection.secretAccessKey, - region: connection.region, - maxRetries: NUMBER_OF_RETRIES - }; - - if (connection.sslEnabled) { - connectionParams.sslEnabled = connection.sslEnabled; - } - const athenaClient = new AWS.Athena(connectionParams); - - return athenaClient; -} /* * The connection function will validate the parameters and return the connection * parameters @@ -52,14 +25,8 @@ export function connect(connection) { connection.athenaClient = createAthenaClient(connection); - return new Promise(function(resolve, reject) { - return schemas(connection).then(() => { - resolve(connection); - }).catch(err => { - Logger.log(err); - reject(err); - }); - }); + return query('SELECT table_name FROM information_schema.columns LIMIT 1', connection) + .then(() => connection); } /** @@ -69,41 +36,21 @@ export function connect(connection) { * @returns {Promise} that resolves to { columnnames, rows } */ export function query(queryObject, connection) { - let columnnames = []; - const rows = []; + connection.sqlStatement = queryObject; - return new Promise(function(resolve, reject) { + return executeQuery(connection).then(dataSet => { + let columnnames = []; + let rows = []; - if (!queryObject) { - return reject(new Error('The SQL Statement was not defined')); + if (dataSet && dataSet.length > 0) { + // First row contains the column names + columnnames = dataSet[0].Data.map(columnname => columnname.VarCharValue); + + // Loop through the remaining rows to extract data + rows = dataSet.slice(1).map(row => row.Data.map(element => element.VarCharValue)); } - connection.sqlStatement = queryObject; - executeQuery(connection).then(dataSet => { - if (dataSet && dataSet.length > 0) { - // First column contains the column names - const cols = dataSet[0].Data; - columnnames = cols.map(col => col.VarCharValue); - // Loop through the remaining rows to extract data - for (let i = 1; i < dataSet.length; i++) { - const row = dataSet[i]; - // Ensure Row is defined and has expected number of columns - if (row && row.Data && row.Data.length === columnnames.length) { - const r = row.Data.map(element => { - if (element && element.VarCharValue) { - return element.VarCharValue; - } - return ''; - }); - rows.push(r); - } - } - } - resolve({columnnames, rows}); - }).catch(err => { - Logger.log(err); - reject(err); - }); + return {columnnames, rows}; }); } @@ -118,32 +65,9 @@ export function query(queryObject, connection) { * @returns {Promise} that resolves to { columnnames, rows } */ export function schemas(connection) { - const columnnames = ['table_name', 'column_name', 'data_type']; - const rows = []; - connection.sqlStatement = `${SHOW_SCHEMA_QUERY} = '${connection.database}'` ; - connection.queryInterval = DEFAULT_QUERY_INTERVAL; - return new Promise(function(resolve, reject) { - executeQuery(connection).then(dataSet => { - if (dataSet && dataSet.length > 0) { - for (let i = 0; i < dataSet.length; i++) { - const data = dataSet[i]; - if (data && data.Data && data.Data.length > 0) { - if (i !== 0) { - const row = []; - row.push(data.Data[0].VarCharValue); // Table Name - row.push(data.Data[1].VarCharValue); // Column Name - row.push(data.Data[2].VarCharValue); // DataType - rows.push(row); - } - } - } - } - return resolve({columnnames, rows}); - }).catch(err => { - Logger.log(err); - reject(err); - }); - }); + const sqlStatement = `${SHOW_SCHEMA_QUERY} = '${connection.database}'` ; + + return query(sqlStatement, connection); } @@ -158,26 +82,10 @@ export function schemas(connection) { * @returns {Promise} that resolves to { columnnames, rows } */ export function tables(connection) { - connection.sqlStatement = SHOW_TABLES_QUERY; - connection.queryInterval = DEFAULT_QUERY_INTERVAL; - return new Promise(function(resolve, reject) { - executeQuery(connection).then(dataSet => { - let rst = []; - if (dataSet && dataSet.length > 0) { - rst = dataSet.map(data => { - if (data && data.Data && data.Data.length > 0) { - return data.Data[0].VarCharValue; - } - return ''; - - }); - } - return resolve(rst); - }).catch(err => { - Logger.log(err); - return reject(err); - }); + return executeQuery(connection).then(dataSet => { + const tableNames = dataSet.slice(1).map(row => row.Data[0].VarCharValue); + return tableNames; }); -} \ No newline at end of file +} diff --git a/backend/persistent/datastores/drivers/athena.js b/backend/persistent/datastores/drivers/athena.js index f274896fe..35f128807 100644 --- a/backend/persistent/datastores/drivers/athena.js +++ b/backend/persistent/datastores/drivers/athena.js @@ -1,6 +1,33 @@ 'use strict'; +const AWS = require('aws-sdk'); import Logger from '../../../logger'; +const NUMBER_OF_RETRIES = 50; + +/** + * The following function will create an AWS Athena Client + * @param {object} connection - AWS Athena Connection Parameters + * @param {string} connection.accessKey - AWS Access Key + * @param {string} connection.secretAccessKey - AWS Secret Key + * @param {string} connection.region - AWS Region + * @returns {object} AWS Athena Client + */ +export function createAthenaClient(connection) { + const connectionParams = { + apiVersion: '2017-05-18', + accessKeyId: connection.accessKey, + secretAccessKey: connection.secretAccessKey, + region: connection.region, + maxRetries: NUMBER_OF_RETRIES + }; + + if (connection.sslEnabled) { + connectionParams.sslEnabled = connection.sslEnabled; + } + const athenaClient = new AWS.Athena(connectionParams); + + return athenaClient; +} /** * The following method will execute the sql statement to query the @@ -203,4 +230,4 @@ export function executeQuery(queryParams) { return reject(err); }); }); -} \ No newline at end of file +} diff --git a/test/backend/datastores.athena.spec.js b/test/backend/datastores.athena.spec.js index 2e64fd8b0..d8d15102f 100644 --- a/test/backend/datastores.athena.spec.js +++ b/test/backend/datastores.athena.spec.js @@ -3,111 +3,52 @@ const nock = require('nock'); import {assert} from 'chai'; import uuid from 'uuid'; -import AWS from 'aws-sdk'; -import {schemas, query, tables} from '../../backend/persistent/datastores/athena'; +import {connect, schemas, query, tables} from '../../backend/persistent/datastores/athena'; describe('Athena:', function () { const URL = 'https://athena.us-east-1.amazonaws.com:443'; const PATH = '/'; + // Connection object shared by all the tests + const conn = { + region: 'us-east-1', + accessKey: 'XXXXXXXX', + secretAccessKey: 'XXXXXAAAA', + database: 'PLOT.LY-TEST', + outputS3Bucket: 's3://aws-athena-query-results-11111111-us-east-1/', + queryInterval: 1000, + maxRetries: 50 + }; + before(function() { // Enable nock if it has been disabled by other specs if (!nock.isActive()) nock.activate(); }); after(function() { + // Disable nock nock.restore(); }); - it('schemas() retrieves schemas of all tables', function() { - const options = { - region: 'us-east-1', - accessKey: 'XXXXXXXX', - secretAccessKey: 'XXXXXAAAA', - database: 'PLOT.LY-TEST', - outputS3Bucket: 's3://aws-athena-query-results-11111111-us-east-1/', - queryInterval: 1000, - maxRetries: 50 - }; - - const athenaClient = new AWS.Athena(options); - const conn = { ...options, athenaClient}; + it('connect() succeeds', function() { + const queryStatement = 'SELECT table_name FROM information_schema.columns LIMIT 1'; + const columnNames = []; + const rows = []; - const {database, outputS3Bucket} = conn.database; - const queryStatement = - `SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '${database}'`; - const queryExecutionId = uuid.v4(); - const submissionDateTime = 1522797420.024; + mockAthenaResponses(queryStatement, columnNames, rows); - // mock connect response - nock(URL).post(PATH).reply(200, { - QueryExecutionId: queryExecutionId - }); - - nock(URL).post(PATH).reply(200, { - 'QueryExecution': { - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Statistics': {}, - 'Status': { - 'State': 'RUNNING', - 'SubmissionDateTime': submissionDateTime - } - }, - 'QueryExecutionDetail': { - 'OutputLocation': outputS3Bucket, - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Stats': {}, - 'Status': { - 'State': 'RUNNING', - 'SubmissionDateTime': submissionDateTime - } - } + return connect(conn).then(function(connection) { + assert.isDefined(connection.athenaClient, 'connection.athenaClient is undefined'); }); + }); - nock(URL).post(PATH).reply(200, { - 'QueryExecution': { - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Statistics': {}, - 'Status': { - 'State': 'SUCCEEDED', - 'SubmissionDateTime': submissionDateTime - } - }, - 'QueryExecutionDetail': { - 'OutputLocation': outputS3Bucket, - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Stats': {}, - 'Status': { - 'State': 'SUCCEEDED', - 'SubmissionDateTime': submissionDateTime - } - } - }); + it('schemas() retrieves schemas of all tables', function() { + const queryStatement = ` + SELECT table_name, column_name, data_type + FROM information_schema.columns + WHERE table_schema = '${conn.database}' + `; const columnNames = [ 'table_name', @@ -135,49 +76,7 @@ describe('Athena:', function () { ['glue_cleaned_logs', 'subtype', 'varchar'] ]; - const headerAndRows = [columnNames].concat(rows); - - const columnInfos = columnNames.map(function(name) { - return { - 'CaseSensitive': true, - 'CatalogName': 'hive', - 'Label': name, - 'Name': name, - 'Nullable': 'UNKNOWN', - 'Precision': 2147483647, - 'Scale': 0, - 'SchemaName': '', - 'TableName': '', - 'Type': 'varchar' - }; - }); - - const resultRows = headerAndRows.map(function(row) { - return { - 'Data': row - }; - }); - - const resultRows2 = headerAndRows.map(function(row) { - return { - 'Data': row.map(function(value) { - return {'VarCharValue': value}; - }) - }; - }); - - nock(URL).post(PATH).reply(200, { - 'ResultSet': { - 'ColumnInfos': columnInfos, - 'ResultRows': resultRows, - 'ResultSetMetadata': { - 'ColumnInfo': columnInfos - }, - 'Rows': resultRows2 - }, - 'UpdateCount': 0, - 'UpdateType': '' - }); + mockAthenaResponses(queryStatement, columnNames, rows); return schemas(conn).then(function(results) { assert.deepEqual(results.columnnames, columnNames, 'Unexpected column names'); @@ -186,93 +85,7 @@ describe('Athena:', function () { }); it('query() executes a query', function() { - const options = { - region: 'us-east-1', - accessKey: 'XXXXXXXX', - secretAccessKey: 'XXXXXAAAA', - database: 'PLOT.LY-TEST', - outputS3Bucket: 's3://aws-athena-query-results-11111111-us-east-1/', - queryInterval: 1000, - maxRetries: 50 - }; - - const athenaClient = new AWS.Athena(options); - const conn = { ...options, athenaClient}; - - const {database, outputS3Bucket} = conn.database; - const queryStatement = - `SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '${database}'`; - const queryExecutionId = uuid.v4(); - const submissionDateTime = 1522797420.024; - - // mock connect response - nock(URL).post(PATH).reply(200, { - QueryExecutionId: queryExecutionId - }); - - nock(URL).post(PATH).reply(200, { - 'QueryExecution': { - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Statistics': {}, - 'Status': { - 'State': 'RUNNING', - 'SubmissionDateTime': submissionDateTime - } - }, - 'QueryExecutionDetail': { - 'OutputLocation': outputS3Bucket, - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Stats': {}, - 'Status': { - 'State': 'RUNNING', - 'SubmissionDateTime': submissionDateTime - } - } - }); - - nock(URL).post(PATH).reply(200, { - 'QueryExecution': { - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Statistics': {}, - 'Status': { - 'State': 'SUCCEEDED', - 'SubmissionDateTime': submissionDateTime - } - }, - 'QueryExecutionDetail': { - 'OutputLocation': outputS3Bucket, - 'Query': queryStatement, - 'QueryExecutionContext': {'Database': database}, - 'QueryExecutionId': queryExecutionId, - 'ResultConfiguration': { - 'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}, - 'OutputLocation': outputS3Bucket - }, - 'Stats': {}, - 'Status': { - 'State': 'SUCCEEDED', - 'SubmissionDateTime': submissionDateTime - } - } - }); + const queryStatement = "select serialnumber from clean_logs where serialnumber != '' limit 10"; const columnNames = [ 'serialnumber' @@ -291,74 +104,38 @@ describe('Athena:', function () { [ '864875034115451' ] ]; - const headerAndRows = [columnNames].concat(rows); + mockAthenaResponses(queryStatement, columnNames, rows); - const columnInfos = columnNames.map(function(name) { - return { - 'CaseSensitive': true, - 'CatalogName': 'hive', - 'Label': name, - 'Name': name, - 'Nullable': 'UNKNOWN', - 'Precision': 2147483647, - 'Scale': 0, - 'SchemaName': '', - 'TableName': '', - 'Type': 'varchar' - }; + return query(queryStatement, conn).then(function(results) { + assert.deepEqual(results.columnnames, columnNames, 'Unexpected column names'); + assert.deepEqual(results.rows, rows, 'Unexpected rows'); }); + }); - const resultRows = headerAndRows.map(function(row) { - return { - 'Data': row - }; - }); + it('tables() executes a query', function() { + const queryStatement = 'SHOW TABLES'; - const resultRows2 = headerAndRows.map(function(row) { - return { - 'Data': row.map(function(value) { - return {'VarCharValue': value}; - }) - }; - }); + const columnNames = [ + 'table_name' + ]; - nock(URL).post(PATH).reply(200, { - 'ResultSet': { - 'ColumnInfos': columnInfos, - 'ResultRows': resultRows, - 'ResultSetMetadata': { - 'ColumnInfo': columnInfos - }, - 'Rows': resultRows2 - }, - 'UpdateCount': 0, - 'UpdateType': '' - }); + const rows = [ + [ 'clean_logs' ], + [ 'clean_logs_json' ], + [ 'glue_cleaned_logs' ], + [ 'test' ] + ]; - const stmt = "select serialnumber from clean_logs where serialnumber != '' limit 10"; - return query(stmt, conn).then(function(results) { - assert.deepEqual(results.columnnames, columnNames, 'Unexpected column names'); - assert.deepEqual(results.rows, rows, 'Unexpected rows'); + mockAthenaResponses(queryStatement, columnNames, rows); + + return tables(conn).then(function(results) { + const expectedRows = rows.map(row => row[0]); + assert.deepEqual(results, expectedRows, 'Unexpected rows'); }); }); - it('tables() executes a query', function() { - const options = { - region: 'us-east-1', - accessKey: 'XXXXXXXX', - secretAccessKey: 'XXXXXAAAA', - database: 'PLOT.LY-TEST', - outputS3Bucket: 's3://aws-athena-query-results-11111111-us-east-1/', - queryInterval: 1000, - maxRetries: 50 - }; - - const athenaClient = new AWS.Athena(options); - const conn = { ...options, athenaClient}; - + function mockAthenaResponses(queryStatement, columnNames, rows) { const {database, outputS3Bucket} = conn.database; - const queryStatement = - 'SHOW TABLES'; const queryExecutionId = uuid.v4(); const submissionDateTime = 1522797420.024; @@ -431,17 +208,6 @@ describe('Athena:', function () { } }); - const columnNames = [ - 'table_name' - ]; - - const rows = [ - [ 'clean_logs' ], - [ 'clean_logs_json' ], - [ 'glue_cleaned_logs' ], - [ 'test' ] - ]; - const headerAndRows = [columnNames].concat(rows); const columnInfos = columnNames.map(function(name) { @@ -485,11 +251,5 @@ describe('Athena:', function () { 'UpdateCount': 0, 'UpdateType': '' }); - - const expectedRows = ['table_name', 'clean_logs', 'clean_logs_json', 'glue_cleaned_logs', 'test']; - return tables(conn).then(function(results) { - // assert.deepEqual(results.columnnames, columnNames, 'Unexpected column names'); - assert.deepEqual(results, expectedRows, 'Unexpected rows'); - }); - }); + } });