Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Athena/pr 352 debugging #1

Merged
merged 3 commits into from
Apr 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ export const SAMPLE_DBS = {
[DIALECTS.ATHENA]: {
s3Outputlocation: 'plotly-s3-connector-test',
accessKey: 'AKIAIMHMSHTGARJYSKMQ',
secretKey: 'Urvus4R7MnJOAqT4U3eovlCBimQ4Zg2Y9sV5LWow',
secretAccessKey: 'Urvus4R7MnJOAqT4U3eovlCBimQ4Zg2Y9sV5LWow',
region: 'us-east-1',
database: 'falcon',
queryTimeout: 5000
Expand Down
138 changes: 23 additions & 115 deletions backend/persistent/datastores/athena.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,10 @@
import {executeQuery} from './drivers/athena';
import {createAthenaClient, executeQuery} from './drivers/athena';

import Logger from '../../logger';
const SHOW_TABLES_QUERY = 'SHOW TABLES';
const SHOW_SCHEMA_QUERY = 'SELECT table_name, column_name, data_type FROM '
+ 'information_schema.columns WHERE table_schema ';
const SHOW_SCHEMA_QUERY =
'SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema';
const DEFAULT_QUERY_INTERVAL = 2000;
const NUMBER_OF_RETRIES = 50;
const AWS = require('aws-sdk');

/**
* The following function will create an AWS Athena Client
* @param {object} connection - AWS Athena Connection Parameters
* @param {string} connection.accessKey - AWS Access Key
* @param {string} connection.secretAccessKey - AWS Secret Key
* @param {string} connection.region - AWS Region
* @returns {object} AWS Athena Client
*/
function createAthenaClient(connection) {
const connectionParams = {
apiVersion: '2017-05-18',
accessKeyId: connection.accessKey,
secretAccessKey: connection.secretAccessKey,
region: connection.region,
maxRetries: NUMBER_OF_RETRIES
};

if (connection.sslEnabled) {
connectionParams.sslEnabled = connection.sslEnabled;
}
const athenaClient = new AWS.Athena(connectionParams);

return athenaClient;
}
/*
* The connection function will validate the parameters and return the connection
* parameters
Expand All @@ -52,14 +25,8 @@ export function connect(connection) {

connection.athenaClient = createAthenaClient(connection);

return new Promise(function(resolve, reject) {
return schemas(connection).then(() => {
resolve(connection);
}).catch(err => {
Logger.log(err);
reject(err);
});
});
return query('SELECT table_name FROM information_schema.columns LIMIT 1', connection)
.then(() => connection);
}

/**
Expand All @@ -69,41 +36,21 @@ export function connect(connection) {
* @returns {Promise} that resolves to { columnnames, rows }
*/
export function query(queryObject, connection) {
let columnnames = [];
const rows = [];
connection.sqlStatement = queryObject;

return new Promise(function(resolve, reject) {
return executeQuery(connection).then(dataSet => {
let columnnames = [];
let rows = [];

if (!queryObject) {
return reject(new Error('The SQL Statement was not defined'));
if (dataSet && dataSet.length > 0) {
// First row contains the column names
columnnames = dataSet[0].Data.map(columnname => columnname.VarCharValue);

// Loop through the remaining rows to extract data
rows = dataSet.slice(1).map(row => row.Data.map(element => element.VarCharValue));
}
connection.sqlStatement = queryObject;
executeQuery(connection).then(dataSet => {
if (dataSet && dataSet.length > 0) {
// First column contains the column names
const cols = dataSet[0].Data;
columnnames = cols.map(col => col.VarCharValue);

// Loop through the remaining rows to extract data
for (let i = 1; i < dataSet.length; i++) {
const row = dataSet[i];
// Ensure Row is defined and has expected number of columns
if (row && row.Data && row.Data.length === columnnames.length) {
const r = row.Data.map(element => {
if (element && element.VarCharValue) {
return element.VarCharValue;
}
return '';
});
rows.push(r);
}
}
}
resolve({columnnames, rows});
}).catch(err => {
Logger.log(err);
reject(err);
});
return {columnnames, rows};
});
}

Expand All @@ -118,32 +65,9 @@ export function query(queryObject, connection) {
* @returns {Promise} that resolves to { columnnames, rows }
*/
export function schemas(connection) {
const columnnames = ['table_name', 'column_name', 'data_type'];
const rows = [];
connection.sqlStatement = `${SHOW_SCHEMA_QUERY} = '${connection.database}'` ;
connection.queryInterval = DEFAULT_QUERY_INTERVAL;
return new Promise(function(resolve, reject) {
executeQuery(connection).then(dataSet => {
if (dataSet && dataSet.length > 0) {
for (let i = 0; i < dataSet.length; i++) {
const data = dataSet[i];
if (data && data.Data && data.Data.length > 0) {
if (i !== 0) {
const row = [];
row.push(data.Data[0].VarCharValue); // Table Name
row.push(data.Data[1].VarCharValue); // Column Name
row.push(data.Data[2].VarCharValue); // DataType
rows.push(row);
}
}
}
}
return resolve({columnnames, rows});
}).catch(err => {
Logger.log(err);
reject(err);
});
});
const sqlStatement = `${SHOW_SCHEMA_QUERY} = '${connection.database}'` ;

return query(sqlStatement, connection);
}


Expand All @@ -158,26 +82,10 @@ export function schemas(connection) {
* @returns {Promise} that resolves to { columnnames, rows }
*/
export function tables(connection) {

connection.sqlStatement = SHOW_TABLES_QUERY;
connection.queryInterval = DEFAULT_QUERY_INTERVAL;
return new Promise(function(resolve, reject) {
executeQuery(connection).then(dataSet => {

let rst = [];
if (dataSet && dataSet.length > 0) {
rst = dataSet.map(data => {
if (data && data.Data && data.Data.length > 0) {
return data.Data[0].VarCharValue;
}
return '';

});
}
return resolve(rst);
}).catch(err => {
Logger.log(err);
return reject(err);
});
return executeQuery(connection).then(dataSet => {
const tableNames = dataSet.slice(1).map(row => row.Data[0].VarCharValue);
return tableNames;
});
}
}
29 changes: 28 additions & 1 deletion backend/persistent/datastores/drivers/athena.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
'use strict';

const AWS = require('aws-sdk');
import Logger from '../../../logger';
const NUMBER_OF_RETRIES = 50;

/**
* The following function will create an AWS Athena Client
* @param {object} connection - AWS Athena Connection Parameters
* @param {string} connection.accessKey - AWS Access Key
* @param {string} connection.secretAccessKey - AWS Secret Key
* @param {string} connection.region - AWS Region
* @returns {object} AWS Athena Client
*/
export function createAthenaClient(connection) {
const connectionParams = {
apiVersion: '2017-05-18',
accessKeyId: connection.accessKey,
secretAccessKey: connection.secretAccessKey,
region: connection.region,
maxRetries: NUMBER_OF_RETRIES
};

if (connection.sslEnabled) {
connectionParams.sslEnabled = connection.sslEnabled;
}
const athenaClient = new AWS.Athena(connectionParams);

return athenaClient;
}

/**
* The following method will execute the sql statement to query the
Expand Down Expand Up @@ -203,4 +230,4 @@ export function executeQuery(queryParams) {
return reject(err);
});
});
}
}
Loading