From a65e56ceead548723a591ca0e51246645dc8b7dd Mon Sep 17 00:00:00 2001 From: onlyjackfrost Date: Thu, 1 Jun 2023 11:13:40 +0800 Subject: [PATCH 1/3] feat(extension-driver-canner): add canner enterprise connector - can access data sources from canner through this extension - support cache feature in data source "canner enterprise" --- .../extension-driver-canner/.eslintrc.json | 18 + packages/extension-driver-canner/README.md | 56 ++++ .../extension-driver-canner/jest.config.ts | 15 + packages/extension-driver-canner/package.json | 29 ++ packages/extension-driver-canner/project.json | 64 ++++ packages/extension-driver-canner/src/index.ts | 3 + .../src/lib/cannerAdapter.ts | 135 ++++++++ .../src/lib/cannerDataSource.ts | 210 ++++++++++++ .../extension-driver-canner/src/lib/config.ts | 13 + .../src/lib/sqlBuilder.ts | 40 +++ .../src/lib/typeMapper.ts | 25 ++ .../test/cannerAdapter.spec.ts | 21 ++ .../test/cannerDataSource.spec.ts | 316 ++++++++++++++++++ .../test/cannerServer.ts | 25 ++ .../test/sqlBuilder.spec.ts | 73 ++++ .../extension-driver-canner/tsconfig.json | 22 ++ .../extension-driver-canner/tsconfig.lib.json | 10 + .../tsconfig.spec.json | 15 + tsconfig.base.json | 3 + workspace.json | 1 + 20 files changed, 1094 insertions(+) create mode 100644 packages/extension-driver-canner/.eslintrc.json create mode 100644 packages/extension-driver-canner/README.md create mode 100644 packages/extension-driver-canner/jest.config.ts create mode 100644 packages/extension-driver-canner/package.json create mode 100644 packages/extension-driver-canner/project.json create mode 100644 packages/extension-driver-canner/src/index.ts create mode 100644 packages/extension-driver-canner/src/lib/cannerAdapter.ts create mode 100644 packages/extension-driver-canner/src/lib/cannerDataSource.ts create mode 100644 packages/extension-driver-canner/src/lib/config.ts create mode 100644 packages/extension-driver-canner/src/lib/sqlBuilder.ts create mode 100644 
packages/extension-driver-canner/src/lib/typeMapper.ts create mode 100644 packages/extension-driver-canner/test/cannerAdapter.spec.ts create mode 100644 packages/extension-driver-canner/test/cannerDataSource.spec.ts create mode 100644 packages/extension-driver-canner/test/cannerServer.ts create mode 100644 packages/extension-driver-canner/test/sqlBuilder.spec.ts create mode 100644 packages/extension-driver-canner/tsconfig.json create mode 100644 packages/extension-driver-canner/tsconfig.lib.json create mode 100644 packages/extension-driver-canner/tsconfig.spec.json diff --git a/packages/extension-driver-canner/.eslintrc.json b/packages/extension-driver-canner/.eslintrc.json new file mode 100644 index 00000000..9d9c0db5 --- /dev/null +++ b/packages/extension-driver-canner/.eslintrc.json @@ -0,0 +1,18 @@ +{ + "extends": ["../../.eslintrc.json"], + "ignorePatterns": ["!**/*"], + "overrides": [ + { + "files": ["*.ts", "*.tsx", "*.js", "*.jsx"], + "rules": {} + }, + { + "files": ["*.ts", "*.tsx"], + "rules": {} + }, + { + "files": ["*.js", "*.jsx"], + "rules": {} + } + ] +} diff --git a/packages/extension-driver-canner/README.md b/packages/extension-driver-canner/README.md new file mode 100644 index 00000000..c5bdd4dc --- /dev/null +++ b/packages/extension-driver-canner/README.md @@ -0,0 +1,56 @@ +# extension-driver-canner + +Connect to [canner enterprise](https://docs.cannerdata.com/product/api_sdk/pg/pg_overview) through PostgreSQL Wire Protocol + +## Install + +1. Install package + + ```sql + npm i @vulcan-sql/extension-driver-canner + ``` + +2. Update `vulcan.yaml`, enable the extension. + + ```yaml + extensions: + canner: '@vulcan-sql/extension-driver-canner' + ``` + +3. Create a new profile in `profiles.yaml` or in your profiles' paths. + + ```yaml + - name: canner # profile name + type: canner + connection: + + + # Optional: Server host. + host: string + # Optional: The user to connect to canner enterprise. 
Default canner + user: string + # Optional: Password to connect to canner enterprise. should be the user PAT in canner enterprise + password: string + # Optional: sql name of the workspace. + database: string + # Optional: canner enterprise PostgreSQL wire protocol port + port: 7432 + # Optional: The max rows we should fetch once. + chunkSize: 100 + # Optional: Maximum number of clients the pool should contain. + max: 10 + # Optional: Number of milliseconds before a statement in query will time out, default is no timeout + statement_timeout: 0 + # Optional: Passed directly to node.TLSSocket, supports all tls.connect options + ssl: false + # Optional: Number of milliseconds before a query call will timeout, default is no timeout + query_timeout: 0 + # Optional: The name of the application that created this Client instance + application_name: string + # Optional: Number of milliseconds to wait for connection, default is no timeout + connectionTimeoutMillis: 0 + # Optional: Number of milliseconds before terminating any session with an open idle transaction, default is no timeout + idle_in_transaction_session_timeout: 0 + # Optional: Number of milliseconds a client must sit idle in the pool and not be checked out before it is disconnected from the backend and discarded. 
+ idleTimeoutMillis: 10000 + ``` diff --git a/packages/extension-driver-canner/jest.config.ts b/packages/extension-driver-canner/jest.config.ts new file mode 100644 index 00000000..8ef5ca4f --- /dev/null +++ b/packages/extension-driver-canner/jest.config.ts @@ -0,0 +1,15 @@ +module.exports = { + displayName: 'extension-driver-canner', + preset: '../../jest.preset.ts', + globals: { + 'ts-jest': { + tsconfig: '/tsconfig.spec.json', + }, + }, + transform: { + '^.+\\.[tj]s$': 'ts-jest', + }, + moduleFileExtensions: ['ts', 'js', 'html', 'node'], + coverageDirectory: '../../coverage/packages/extension-driver-canner', + testEnvironment: 'node', +}; diff --git a/packages/extension-driver-canner/package.json b/packages/extension-driver-canner/package.json new file mode 100644 index 00000000..a30c34f5 --- /dev/null +++ b/packages/extension-driver-canner/package.json @@ -0,0 +1,29 @@ +{ + "name": "@vulcan-sql/extension-driver-canner", + "description": "Canner Enterprise driver for Vulcan SQL", + "version": "0.4.0", + "type": "commonjs", + "publishConfig": { + "access": "public" + }, + "keywords": [ + "vulcan", + "vulcan-sql", + "data", + "sql", + "database", + "data-warehouse", + "data-lake", + "api-builder", + "postgres", + "pg" + ], + "repository": { + "type": "git", + "url": "https://github.com/Canner/vulcan.git" + }, + "license": "MIT", + "peerDependencies": { + "@vulcan-sql/core": "~0.4.0-0" + } +} diff --git a/packages/extension-driver-canner/project.json b/packages/extension-driver-canner/project.json new file mode 100644 index 00000000..13cd2b23 --- /dev/null +++ b/packages/extension-driver-canner/project.json @@ -0,0 +1,64 @@ +{ + "root": "packages/extension-driver-canner", + "sourceRoot": "packages/extension-driver-canner/src", + "targets": { + "build": { + "executor": "@nrwl/workspace:run-commands", + "options": { + "command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-canner" + }, + "dependsOn": [ + { + "projects": "self", + "target": "tsc" + } 
+ ] + }, + "tsc": { + "executor": "@nrwl/js:tsc", + "outputs": ["{options.outputPath}"], + "options": { + "outputPath": "dist/packages/extension-driver-canner", + "main": "packages/extension-driver-canner/src/index.ts", + "tsConfig": "packages/extension-driver-canner/tsconfig.lib.json", + "assets": ["packages/extension-driver-canner/*.md"], + "buildableProjectDepsInPackageJsonType": "dependencies" + }, + "dependsOn": [ + { + "projects": "dependencies", + "target": "build" + } + ] + }, + "lint": { + "executor": "@nrwl/linter:eslint", + "outputs": ["{options.outputFile}"], + "options": { + "lintFilePatterns": ["packages/extension-driver-canner/**/*.ts"] + } + }, + "test": { + "executor": "@nrwl/jest:jest", + "outputs": ["coverage/packages/extension-driver-canner"], + "options": { + "jestConfig": "packages/extension-driver-canner/jest.config.ts", + "passWithNoTests": true + } + }, + "publish": { + "executor": "@nrwl/workspace:run-commands", + "options": { + "command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}", + "cwd": "dist/packages/extension-driver-canner" + }, + "dependsOn": [ + { + "projects": "self", + "target": "build" + } + ] + } + }, + "tags": [] +} diff --git a/packages/extension-driver-canner/src/index.ts b/packages/extension-driver-canner/src/index.ts new file mode 100644 index 00000000..649cead2 --- /dev/null +++ b/packages/extension-driver-canner/src/index.ts @@ -0,0 +1,3 @@ +export * from './lib/cannerDataSource'; +import { CannerDataSource } from './lib/cannerDataSource'; +export default [CannerDataSource]; diff --git a/packages/extension-driver-canner/src/lib/cannerAdapter.ts b/packages/extension-driver-canner/src/lib/cannerAdapter.ts new file mode 100644 index 00000000..5ddfcdb9 --- /dev/null +++ b/packages/extension-driver-canner/src/lib/cannerAdapter.ts @@ -0,0 +1,135 @@ +import axios from 'axios'; +import { PGOptions } from './cannerDataSource'; +import { ConnectionOptions } from 'tls'; +import { createEnvConfig } from 
const envConfig = createEnvConfig();

/**
 * Client for the Canner Enterprise RESTful API, built from the same connection
 * options that are used for the PostgreSQL wire protocol connection:
 *
 * - `host` is the Canner web host (ignored when running on Kubernetes),
 * - `database` is sent as the `workspaceSqlName` query parameter,
 * - `password` is sent as the personal access token (PAT).
 *
 * For more Canner API ref: https://docs.cannerdata.com/reference/restful
 */
export class CannerAdapter {
  public readonly host: string;
  public readonly workspaceSqlName: string;
  public readonly PAT: string | (() => string | Promise<string>);
  public readonly ssl: boolean | ConnectionOptions;
  // Resolved lazily from "/cluster-info" on the first request, then reused.
  private baseUrl: string | undefined;

  /**
   * @param options connection options of the profile.
   * @throws Error when options are missing, or when host, database or
   *   password is not provided.
   */
  constructor(options?: PGOptions) {
    if (!options) {
      throw new Error(`connection options is required`);
    }
    const { host, database, password, ssl = false } = options;
    if (!host || !database || !password) {
      throw new Error(`host, database and password are required`);
    }
    this.host = host;
    this.workspaceSqlName = database;
    this.PAT = password;
    this.ssl = ssl;
  }

  /**
   * When querying Canner enterprise, the Canner enterprise will save the query
   * result as parquet files, and store them in S3. This method submits the
   * query, waits until it reaches an end state and returns the S3 urls of the
   * query result.
   *
   * @param sql the statement to run as an async query.
   * @returns the result file urls (empty when the API reports none).
   * @throws Error when the async query ends with an error message or in the
   *   FAILED state, or when any API request fails.
   */
  public async createAsyncQueryResultUrls(sql: string): Promise<string[]> {
    const created = await this.getWorkspaceRequestData(
      'post',
      '/v2/async-queries',
      {
        data: {
          sql,
          timeout: 600,
          noLimit: true,
        },
      }
    );
    const { id: requestId } = created;

    // The last polled payload already is the end state of the request.
    const finalInfo = await this.waitAsyncQueryToFinish(requestId);
    if (finalInfo.error?.message) {
      throw new Error(finalInfo.error.message);
    }
    // Result urls are only generated for finished requests, so a failure
    // without an error message must not fall through to the urls endpoint.
    if (finalInfo.status === 'FAILED') {
      throw new Error(`Async query "${requestId}" failed`);
    }
    return this.getAsyncQueryResultUrls(requestId);
  }

  /**
   * Sends a request to the workspace-scoped RESTful API and returns the
   * response body.
   *
   * @param method HTTP method.
   * @param urlPath path appended to the resolved API base endpoint.
   * @param options extra axios request options; they override the defaults.
   * @throws Error when the response status is not 200.
   */
  private async getWorkspaceRequestData(
    method: string,
    urlPath: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    options?: Record<string, any>
  ) {
    await this.prepare();
    const token = await this.resolveToken();
    const response = await axios({
      headers: {
        Authorization: `Token ${token}`,
      },
      params: {
        workspaceSqlName: this.workspaceSqlName,
      },
      url: `${this.baseUrl}${urlPath}`,
      method,
      ...options,
    });
    if (response.status !== 200) {
      throw new Error(
        `Failed to get workspace request "${urlPath}" data, status: ${
          response.status
        }, data: ${JSON.stringify(response.data)}`
      );
    }
    return response.data;
  }

  /**
   * The password option may be a plain string or a (possibly async) provider
   * function; interpolating the function itself would put its source text in
   * the Authorization header, so it is invoked here.
   */
  private async resolveToken(): Promise<string> {
    return typeof this.PAT === 'function' ? await this.PAT() : this.PAT;
  }

  /**
   * Resolves the RESTful API base endpoint from "/cluster-info" once and
   * caches it in `baseUrl`.
   */
  private async prepare() {
    if (this.baseUrl) {
      return;
    }
    const response = await axios({
      method: 'get',
      maxBodyLength: Infinity,
      url: `${this.getCannerUrl()}/cluster-info`,
      headers: {},
    });
    const { restfulApiBaseEndpoint } = response.data;
    if (!restfulApiBaseEndpoint) {
      throw new Error(
        `The restful API base endpoint is not found, please check "restfulApiBaseEndpoint" field from "/cluster-info" endpoint of Canner Enterprise`
      );
    }

    this.baseUrl = restfulApiBaseEndpoint;
  }

  /**
   * Returns the base url of the Canner web service.
   *
   * @throws Error when running on Kubernetes without a web service host.
   */
  private getCannerUrl(): string {
    if (envConfig.isOnKubernetes) {
      // use env to get the endpoint in k8s
      if (!envConfig.webServiceHost) {
        throw new Error(
          `WEB_SERVICE_HOST is required when IS_ON_KUBERNETES is set`
        );
      }
      return `http://${envConfig.webServiceHost}`;
    }
    // otherwise use the host user provided
    const protocol = this.ssl ? 'https' : 'http';
    return `${protocol}://${this.host}`;
  }

  /**
   * Polls the async request once per second until it reaches an end state.
   *
   * @returns the request info payload of the end state.
   */
  private async waitAsyncQueryToFinish(requestId: string) {
    const endStates = ['FINISHED', 'FAILED'];
    let info = await this.getRequestInfo(requestId);
    // FINISHED & FAILED are the end state of a async request, and the result
    // urls will be generated only after the request is finished.
    while (!endStates.includes(info.status)) {
      await new Promise((resolve) => setTimeout(resolve, 1000));
      info = await this.getRequestInfo(requestId);
    }
    return info;
  }

  /** Fetches the current info (status, error, ...) of an async request. */
  private async getRequestInfo(requestId: string) {
    return this.getWorkspaceRequestData(
      'get',
      `/v2/async-queries/${requestId}`
    );
  }

  /** Fetches the result file urls of an async request. */
  private async getAsyncQueryResultUrls(requestId: string): Promise<string[]> {
    const data = await this.getWorkspaceRequestData(
      'get',
      `/v2/async-queries/${requestId}/result/urls`
    );
    return data.urls || [];
  }
}
protocol + const pool = new Pool(profile.connection); + // https://node-postgres.com/api/pool#poolconnect + // When a client is sitting idly in the pool it can still emit errors because it is connected to a live backend. + // If the backend goes down or a network partition is encountered all the idle, connected clients in your application will emit an error through the pool's error event emitter. + pool.on('error', (err) => { + this.logger.warn( + `Pool client of profile instance ${profile.name} connecting failed, detail error, ${err}` + ); + }); + await pool.query('select 1'); + this.poolMapping.set(profile.name, { + pool, + options: profile.connection, + }); + this.logger.debug(`Profile ${profile.name} initialized`); + } + } + + public override async export({ + sql, + directory, + profileName, + }: ExportOptions): Promise { + if (!this.poolMapping.has(profileName)) { + throw new InternalError(`Profile instance ${profileName} not found`); + } + // throw if dir is not exist + if (!fs.existsSync(directory)) { + throw new InternalError(`Directory ${directory} not found`); + } + const { options: connection } = this.poolMapping.get(profileName)!; + + const cannerAdapter = new CannerAdapter(connection); + try { + this.logger.debug('Send the async query to the Canner Enterprise'); + const presignedUrls = await cannerAdapter.createAsyncQueryResultUrls(sql); + this.logger.debug( + 'Start fetching the query result parquet files from URLs' + ); + await this.downloadFiles(presignedUrls, directory); + this.logger.debug('Parquet files downloaded successfully'); + } catch (error: any) { + this.logger.debug('Failed to export data from canner', error); + throw error; + } + } + + private async downloadFiles(urls: string[], directory: string) { + await Promise.all( + urls.map(async (url: string, index: number) => { + const response = await axios.get(url, { + responseType: 'stream', + }); + // The file name will be a substring that is after the last "/" and followed by the "?" 
and the query string + // ex: https://cannerHost/data/canner/somePath/file-name?X-Amz-Algorithm=AWS4-HMAC-SHA256 + const fileName = url.split('/').pop()?.split('?')[0] || `part${index}`; + const writeStream = fs.createWriteStream( + path.join(directory, fileName) + ); + response.data.pipe(writeStream); + return new Promise((resolve, reject) => { + writeStream.on('finish', resolve); + writeStream.on('error', reject); + }); + }) + ); + } + public async execute({ + statement: sql, + bindParams, + profileName, + operations, + }: ExecuteOptions): Promise { + if (!this.poolMapping.has(profileName)) { + throw new InternalError(`Profile instance ${profileName} not found`); + } + const { pool, options } = this.poolMapping.get(profileName)!; + this.logger.debug(`Acquiring connection from ${profileName}`); + const client = await pool.connect(); + this.logger.debug(`Acquired connection from ${profileName}`); + try { + const builtSQL = buildSQL(sql, operations); + const cursor = client.query( + new Cursor(builtSQL, Array.from(bindParams.values())) + ); + cursor.once('done', async () => { + this.logger.debug( + `Data fetched, release connection from ${profileName}` + ); + // It is important to close the cursor before releasing connection, or the connection might not able to handle next request. 
+ await cursor.close(); + client.release(); + }); + // All promises MUST fulfilled in this function or we are not able to release the connection when error occurred + return await this.getResultFromCursor(cursor, options); + } catch (e: any) { + this.logger.debug( + `Errors occurred, release connection from ${profileName}` + ); + client.release(); + throw e; + } + } + + public async prepare({ parameterIndex }: RequestParameter) { + return `$${parameterIndex}`; + } + + public async destroy() { + for (const { pool } of this.poolMapping.values()) { + await pool.end(); + } + } + + private async getResultFromCursor( + cursor: Cursor, + options: PGOptions = {} + ): Promise { + const { chunkSize = 100 } = options; + const cursorRead = this.cursorRead.bind(this); + const firstChunk = await cursorRead(cursor, chunkSize); + // save first chunk in buffer for incoming requests + let bufferedRows = [...firstChunk.rows]; + let bufferReadIndex = 0; + const fetchNext = async () => { + if (bufferReadIndex >= bufferedRows.length) { + bufferedRows = (await cursorRead(cursor, chunkSize)).rows; + bufferReadIndex = 0; + } + return bufferedRows[bufferReadIndex++] || null; + }; + const stream = new Readable({ + objectMode: true, + read() { + fetchNext() + .then((row) => { + this.push(row); + }) + .catch((error) => { + this.destroy(error); + }); + }, + destroy(error: Error | null, cb: (error: Error | null) => void) { + // Send done event to notify upstream to release the connection. + cursor.emit('done'); + cb(error); + }, + // automatically destroy() the stream when it emits 'finish' or errors. 
export interface IEnvConfig {
  // indicates whether the extension is running in k8s
  isOnKubernetes?: boolean;
  // the host of the web service
  webServiceHost?: string;
}

// Environment variables are always strings, so "false" or "0" are truthy for
// Boolean(); these spellings are treated as an explicit "off" instead.
const DISABLED_FLAG_VALUES = new Set(['', '0', 'false', 'no', 'off']);

/**
 * Builds the environment based configuration of the extension.
 *
 * - `IS_ON_KUBERNETES`: enabled by any value except an empty string, "0",
 *   "false", "no" or "off" (case-insensitive, surrounding spaces ignored).
 * - `WEB_SERVICE_HOST`: passed through unchanged.
 *
 * @param env variables to read from, defaults to `process.env`.
 * @returns the parsed configuration.
 */
export const createEnvConfig = (
  env: Record<string, string | undefined> = process.env
): IEnvConfig => {
  const rawFlag = env['IS_ON_KUBERNETES'];
  const isOnKubernetes =
    rawFlag !== undefined &&
    !DISABLED_FLAG_VALUES.has(rawFlag.trim().toLowerCase());
  return {
    isOnKubernetes,
    webServiceHost: env['WEB_SERVICE_HOST'],
  };
};
/**
 * Check if there is no operations: true when neither `limit` nor `offset`
 * is provided (both null or undefined).
 */
export const isNoOP = (
  operations: Partial<Parameterized<SQLClauseOperation>>
): boolean => operations.limit == null && operations.offset == null;

/**
 * Applies the given operations to a statement.
 *
 * Without operations the statement is returned untouched. Otherwise the
 * statement (minus its ending semicolon) is wrapped in a sub-query, followed
 * by the LIMIT and OFFSET clauses that are provided and a closing semicolon.
 *
 * @param sql the original statement.
 * @param operations the parameterized limit / offset values.
 * @returns the statement to execute.
 */
export const buildSQL = (
  sql: string,
  operations: Partial<Parameterized<SQLClauseOperation>>
): string => {
  if (isNoOP(operations)) return sql;
  const wrapped = `SELECT * FROM (${removeEndingSemiColon(sql)})`;
  const limited = addLimit(wrapped, operations.limit);
  const paged = addOffset(limited, operations.offset);
  return `${paged};`;
};
CannerServer } from './cannerServer'; +import { CannerAdapter } from '../src/lib/cannerAdapter'; + +const pg = new CannerServer(); + +it('CannerAdapter should get urls without throw any error when connection and sql are valid', async () => { + // Arrange + const { connection } = pg.getProfile('profile1'); + const adapter = new CannerAdapter(connection); + // Act, Assert + await expect( + adapter.createAsyncQueryResultUrls('select 1') + ).resolves.not.toThrow(); +}, 50000); +it('CannerAdapter should throw when connection or sql are invalid', async () => { + // Arrange + const { connection } = pg.getProfile('profile1'); + const adapter = new CannerAdapter(connection); + // Act, Assert + await expect(adapter.createAsyncQueryResultUrls('wrong')).rejects.toThrow(); // +}, 50000); diff --git a/packages/extension-driver-canner/test/cannerDataSource.spec.ts b/packages/extension-driver-canner/test/cannerDataSource.spec.ts new file mode 100644 index 00000000..12bdf76e --- /dev/null +++ b/packages/extension-driver-canner/test/cannerDataSource.spec.ts @@ -0,0 +1,316 @@ +import { CannerServer } from './cannerServer'; +import { CannerDataSource, PGOptions } from '../src'; +import { ExportOptions, streamToArray } from '@vulcan-sql/core'; +import { Writable } from 'stream'; +import * as sinon from 'ts-sinon'; +import * as fs from 'fs'; +import { CannerAdapter } from '../src/lib/cannerAdapter'; + +const pg = new CannerServer(); +let dataSource: CannerDataSource; + +// restore all sinon mock/stub before each test +beforeEach(() => { + sinon.default.restore(); +}); + +it('Data source should be activate without any error when all profiles are valid', async () => { + // Arrange + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + // Act, Assert + await expect(dataSource.activate()).resolves.not.toThrow(); +}); + +it('Data source should throw error when activating if any profile is invalid', async () => { + // Arrange + const profile1 = pg.getProfile('profile1'); 
+ dataSource = new CannerDataSource({}, '', [ + profile1, + { + name: 'wrong-password', + type: 'canner', + connection: { + ...profile1.connection, + password: 'wrong-password', + } as PGOptions, + allow: '*', + }, + ]); + // Act, Assert + await expect(dataSource.activate()).rejects.toThrow(); +}); + +// export method should be executed successfully +it('Data source should export successfully', async () => { + fs.mkdirSync('tmp', { recursive: true }); + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + await dataSource.activate(); + + // Act, Assert + await expect( + dataSource.export({ + sql: 'select 1', + directory: 'tmp', + profileName: 'profile1', + } as ExportOptions) + ).resolves.not.toThrow(); + expect(fs.readdirSync('tmp').length).toBe(1); + + // clean up + fs.rmSync('tmp', { recursive: true, force: true }); +}, 100000); + +it('Data source should throw when fail to export data', async () => { + // Arrange + sinon.default + .stub(CannerAdapter.prototype, 'createAsyncQueryResultUrls') + // eslint-disable-next-line @typescript-eslint/no-unused-vars + .callsFake(async (sql) => { + throw new Error('mock error'); + }); + + fs.mkdirSync('tmp', { recursive: true }); + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + await dataSource.activate(); + + // Act, Assert + await expect( + dataSource.export({ + sql: 'select 1', + directory: 'tmp', + profileName: 'profile1', + } as ExportOptions) + ).rejects.toThrow(); + expect(fs.readdirSync('tmp').length).toBe(0); + + // clean up + fs.rmSync('tmp', { recursive: true, force: true }); +}, 100000); + +it('Data source should throw when given directory is not exist', async () => { + // Arrange + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + await dataSource.activate(); + + // Act, Assert + await expect( + dataSource.export({ + sql: 'select 1', + directory: 'tmp', + profileName: 'profile1', + } as ExportOptions) + ).rejects.toThrow(); +}, 100000); + 
it('Data source should throw when given profile name is not exist', async () => {
  // Arrange
  dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]);
  await dataSource.activate();
  fs.mkdirSync('tmp', { recursive: true });

  try {
    // Act, Assert
    await expect(
      dataSource.export({
        sql: 'select 1',
        directory: 'tmp',
        profileName: 'profile not exist',
      } as ExportOptions)
    ).rejects.toThrow();
  } finally {
    // clean up: a leftover "tmp" directory would make the spec that expects
    // the directory to be missing fail on the next run.
    fs.rmSync('tmp', { recursive: true, force: true });
  }
}, 100000);
profile1 = pg.getProfile('profile1'); + dataSource = new CannerDataSource({}, '', [ + { + name: 'profile1', + type: 'canner', + connection: { + ...profile1.connection, + max: 1, // Limit the pool size to 1, we'll get blocked with any leak. + min: 1, + } as PGOptions, + allow: '*', + }, + ]); + await dataSource.activate(); + + // Act + // send parallel queries to test pool leak + const result = await Promise.all( + [ + async () => { + const { getData } = await dataSource.execute({ + statement: 'select 1', + bindParams: new Map(), + profileName: 'profile1', + operations: {} as any, + }); + return await streamToArray(getData()); + }, + async () => { + try { + const { getData } = await dataSource.execute({ + statement: 'select 1', + bindParams: new Map(), + profileName: 'profile1', + operations: {} as any, + }); + await streamToArray(getData()); + return [{}]; // fake data + } catch (error) { + // ignore error + return []; + } + }, + async () => { + const { getData } = await dataSource.execute({ + statement: 'select 1', + bindParams: new Map(), + profileName: 'profile1', + operations: {} as any, + }); + return await streamToArray(getData()); + }, + ].map((task) => task()) + ); + + // Assert + expect(result[0].length).toBe(1); + expect(result[1].length).toBe(1); + expect(result[2].length).toBe(1); +}, 60000); + +it('Data source should work with prepare statements', async () => { + // Arrange + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + await dataSource.activate(); + // Act + const bindParams = new Map(); + const var1Name = await dataSource.prepare({ + parameterIndex: 1, + value: '123', + profileName: 'profile1', + }); + bindParams.set(var1Name, '123'); + const var2Name = await dataSource.prepare({ + parameterIndex: 2, + value: '456', + profileName: 'profile1', + }); + bindParams.set(var2Name, '456'); + + const { getData } = await dataSource.execute({ + statement: `select ${var1Name} as v1, ${var2Name} as v2;`, + bindParams, + profileName: 
'profile1', + operations: {} as any, + }); + const rows = await streamToArray(getData()); + // Assert + expect(rows[0].v1).toBe('123'); + expect(rows[0].v2).toBe('456'); +}, 30000); + +it('Data source should return correct column types', async () => { + // Arrange + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + await dataSource.activate(); + // Act + const { getColumns, getData } = await dataSource.execute({ + statement: "select 1 as id, 'name' as name, true as enabled limit 0", + bindParams: new Map(), + profileName: 'profile1', + operations: {} as any, + }); + const column = getColumns(); + // We need to destroy the data stream or the driver waits for us + const data = getData(); + data.destroy(); + + // Assert + expect(column[0]).toEqual({ name: 'id', type: 'number' }); + expect(column[1]).toEqual({ name: 'name', type: 'string' }); + expect(column[2]).toEqual({ name: 'enabled', type: 'boolean' }); +}, 30000); + +it('Data source should release connection when readable stream is destroyed', async () => { + // Arrange + dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]); + await dataSource.activate(); + // Act + const { getData } = await dataSource.execute({ + statement: 'select 1', + bindParams: new Map(), + profileName: 'profile1', + operations: {} as any, + }); + const readStream = getData(); + const rows: any[] = []; + let resolve: any; + const waitForStream = () => new Promise((res) => (resolve = res)); + const writeStream = new Writable({ + write(chunk, _, cb) { + rows.push(chunk); + // After read 1 records, destroy the upstream + if (rows.length === 1) { + readStream.destroy(); + resolve(); + } else cb(); + }, + objectMode: true, + }); + readStream.pipe(writeStream); + await waitForStream(); + // Assert + expect(rows.length).toBe(1); + // afterEach hook will timeout if any leak occurred. 
+}, 300000);
diff --git a/packages/extension-driver-canner/test/cannerServer.ts b/packages/extension-driver-canner/test/cannerServer.ts
new file mode 100644
index 00000000..a24c2c8e
--- /dev/null
+++ b/packages/extension-driver-canner/test/cannerServer.ts
@@ -0,0 +1,25 @@
+/* istanbul ignore file */
+import { PGOptions } from '../src/lib/cannerDataSource';
+
+['CANNER_HOST', 'CANNER_PAT', 'CANNER_WORKSPACE_SQL_NAME'].forEach(
+  (envName) => {
+    /* istanbul ignore next */
+    if (!process.env[envName]) throw new Error(`${envName} not defined`);
+  }
+);
+export class CannerServer {
+  public getProfile(name: string) {
+    return {
+      name,
+      type: 'canner',
+      connection: {
+        host: process.env['CANNER_HOST'],
+        port: process.env['CANNER_PORT'] || 7432, // NOTE(review): a string when CANNER_PORT is set, the number 7432 otherwise
+        user: process.env['CANNER_USER'] || 'canner',
+        password: process.env['CANNER_PAT'],
+        database: process.env['CANNER_WORKSPACE_SQL_NAME'],
+      } as PGOptions,
+      allow: '*',
+    };
+  }
+}
diff --git a/packages/extension-driver-canner/test/sqlBuilder.spec.ts b/packages/extension-driver-canner/test/sqlBuilder.spec.ts
new file mode 100644
index 00000000..5d0424d0
--- /dev/null
+++ b/packages/extension-driver-canner/test/sqlBuilder.spec.ts
@@ -0,0 +1,73 @@
+import * as builder from '../src/lib/sqlBuilder';
+
+describe('SQL builders components test', () => {
+  it('removeEndingSemiColon', async () => {
+    // Arrange
+    const statement = `SELECT * FROM users; \n `;
+    // Act
+    const result = builder.removeEndingSemiColon(statement);
+    // Assert
+    expect(result).toBe('SELECT * FROM users');
+  });
+
+  it('addLimit - string value', async () => {
+    // Arrange
+    const statement = `SELECT * FROM users`;
+    // Act
+    const result = builder.addLimit(statement, '$1');
+    // Assert
+    expect(result).toBe('SELECT * FROM users LIMIT $1');
+  });
+
+  it('addLimit - null value', async () => {
+    // Arrange
+    const statement = `SELECT * FROM users`;
+    // Act
+    const result = builder.addLimit(statement, null);
+    // Assert
+    expect(result).toBe('SELECT * FROM users');
+  });
+
+  it('addOffset - string value', async () => {
+    // Arrange
+    const statement = `SELECT * FROM users`;
+    // Act
+    const result = builder.addOffset(statement, '$1');
+    // Assert
+    expect(result).toBe('SELECT * FROM users OFFSET $1');
+  });
+
+  it('addOffset - null value', async () => {
+    // Arrange
+    const statement = `SELECT * FROM users`;
+    // Act
+    const result = builder.addOffset(statement, null);
+    // Assert
+    expect(result).toBe('SELECT * FROM users');
+  });
+
+  it('isNoOP - empty operation', async () => {
+    // Act
+    const result = builder.isNoOP({});
+    // Assert
+    expect(result).toBe(true);
+  });
+
+  it('isNoOP - some operations', async () => {
+    // Act
+    const results = [{ limit: '$1' }, { offset: '$1' }].map(builder.isNoOP);
+    // Assert
+    expect(results.every((result) => result === false)).toBeTruthy();
+  });
+});
+
+it('BuildSQL function should build sql with operations', async () => {
+  // Arrange
+  const statement = `SELECT * FROM users;`;
+  // Act
+  const result = builder.buildSQL(statement, { limit: '$1', offset: '$2' });
+  // Assert
+  expect(result).toBe(
+    'SELECT * FROM (SELECT * FROM users) LIMIT $1 OFFSET $2;'
+  );
+});
diff --git a/packages/extension-driver-canner/tsconfig.json b/packages/extension-driver-canner/tsconfig.json
new file mode 100644
index 00000000..f5b85657
--- /dev/null
+++ b/packages/extension-driver-canner/tsconfig.json
@@ -0,0 +1,22 @@
+{
+  "extends": "../../tsconfig.base.json",
+  "compilerOptions": {
+    "module": "commonjs",
+    "forceConsistentCasingInFileNames": true,
+    "strict": true,
+    "noImplicitOverride": true,
+    "noPropertyAccessFromIndexSignature": true,
+    "noImplicitReturns": true,
+    "noFallthroughCasesInSwitch": true
+  },
+  "files": [],
+  "include": [],
+  "references": [
+    {
+      "path": "./tsconfig.lib.json"
+    },
+    {
+      "path": "./tsconfig.spec.json"
+    }
+  ]
+}
diff --git a/packages/extension-driver-canner/tsconfig.lib.json b/packages/extension-driver-canner/tsconfig.lib.json
new file mode 100644
index 00000000..1925baa1 --- /dev/null +++ b/packages/extension-driver-canner/tsconfig.lib.json @@ -0,0 +1,10 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "../../dist/out-tsc", + "declaration": true, + "types": [] + }, + "include": ["**/*.ts", "../../types/*.d.ts"], + "exclude": ["jest.config.ts", "**/*.spec.ts", "**/*.test.ts"] +} diff --git a/packages/extension-driver-canner/tsconfig.spec.json b/packages/extension-driver-canner/tsconfig.spec.json new file mode 100644 index 00000000..eb72f635 --- /dev/null +++ b/packages/extension-driver-canner/tsconfig.spec.json @@ -0,0 +1,15 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "outDir": "../../dist/out-tsc", + "module": "commonjs", + "types": ["jest", "node"] + }, + "include": [ + "jest.config.ts", + "**/*.test.ts", + "**/*.spec.ts", + "**/*.d.ts", + "../../types/*.d.ts" + ] +} diff --git a/tsconfig.base.json b/tsconfig.base.json index 36b7ee2c..44d94159 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -83,6 +83,9 @@ "@vulcan-sql/extension-driver-pg": [ "packages/extension-driver-pg/src/index.ts" ], + "@vulcan-sql/extension-driver-canner": [ + "packages/extension-driver-canner/src/index.ts" + ], "@vulcan-sql/extension-driver-snowflake": [ "packages/extension-driver-snowflake/src/index.ts" ], diff --git a/workspace.json b/workspace.json index 852e18d8..e37e0eb0 100644 --- a/workspace.json +++ b/workspace.json @@ -12,6 +12,7 @@ "extension-driver-duckdb": "packages/extension-driver-duckdb", "extension-driver-pg": "packages/extension-driver-pg", "extension-driver-snowflake": "packages/extension-driver-snowflake", + "extension-driver-canner": "packages/extension-driver-canner", "integration-testing": "packages/integration-testing", "serve": "packages/serve", "test-utility": "packages/test-utility" From 8ed2dfdd0fb890a67ad04c16e25bc87c1144eec6 Mon Sep 17 00:00:00 2001 From: onlyjackfrost Date: Mon, 5 Jun 2023 16:21:01 +0800 Subject: [PATCH 2/3] chore: fix axios error 
handling, add comments, add logger for debugging --- .../src/lib/cannerAdapter.ts | 47 +++++++++++-------- .../extension-driver-canner/src/lib/config.ts | 3 ++ .../test/cannerDataSource.spec.ts | 7 ++- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/packages/extension-driver-canner/src/lib/cannerAdapter.ts b/packages/extension-driver-canner/src/lib/cannerAdapter.ts index 5ddfcdb9..9c3fd476 100644 --- a/packages/extension-driver-canner/src/lib/cannerAdapter.ts +++ b/packages/extension-driver-canner/src/lib/cannerAdapter.ts @@ -2,6 +2,7 @@ import axios from 'axios'; import { PGOptions } from './cannerDataSource'; import { ConnectionOptions } from 'tls'; import { createEnvConfig } from './config'; +import { InternalError, getLogger } from '@vulcan-sql/core'; const envConfig = createEnvConfig(); @@ -11,6 +12,7 @@ export class CannerAdapter { public readonly PAT: string | (() => string | Promise); public readonly ssl: boolean | ConnectionOptions; private baseUrl: string | undefined; + private logger = getLogger({ scopeName: 'CORE' }); constructor(options?: PGOptions) { if (!options) { @@ -30,6 +32,7 @@ export class CannerAdapter { // and store them in S3. This method will return the S3 urls of the query result. 
   // For more Canner API ref: https://docs.cannerdata.com/reference/restful
   public async createAsyncQueryResultUrls(sql: string): Promise {
+    this.logger.debug(`Create async request to Canner.`);
     let data = await this.getWorkspaceRequestData('post', '/v2/async-queries', {
       data: {
         sql,
@@ -39,6 +42,7 @@ export class CannerAdapter {
     });
 
     const { id: requestId } = data;
+    this.logger.debug(`Wait Async request to finished.`);
     await this.waitAsyncQueryToFinish(requestId);
 
     // get the query result after the query finished
@@ -46,7 +50,10 @@ export class CannerAdapter {
     if (data.error?.message) {
       throw new Error(data.error.message);
     }
+
+    this.logger.debug(`Get Query result urls.`);
     const urls = await this.getAsyncQueryResultUrls(requestId);
+    this.logger.debug(`Query result urls: \n${urls.join('\n')}`);
     return urls;
   }
 
@@ -56,25 +63,28 @@ export class CannerAdapter {
     options?: Record
   ) {
     await this.prepare();
-    const response = await axios({
-      headers: {
-        Authorization: `Token ${this.PAT}`,
-      },
-      params: {
-        workspaceSqlName: this.workspaceSqlName,
-      },
-      url: `${this.baseUrl}${urlPath}`,
-      method,
-      ...options,
-    });
-    if (response.status !== 200) {
-      throw new Error(
-        `Failed to get workspace request "${urlPath}" data, status: ${
-          response.status
-        }, data: ${JSON.stringify(response.data)}`
+    try {
+      const response = await axios({
+        headers: {
+          Authorization: `Token ${this.PAT}`,
+        },
+        params: {
+          workspaceSqlName: this.workspaceSqlName,
+        },
+        url: `${this.baseUrl}${urlPath}`,
+        method,
+        ...options,
+      });
+      return response.data;
+    } catch (error: any) {
+      const message = error.response
+        ? `response: ${JSON.stringify(error.response)}`
+        : `remote server does not response. request ${error.request}`;
+
+      throw new InternalError(
+        `Failed to get workspace request "${urlPath}" data, ${message}`
       );
     }
-    return response.data;
   }
 
   private async prepare() {
@@ -118,11 +128,10 @@ export class CannerAdapter {
   }
 
   private async getRequestInfo(requestId: string) {
-    const data = await this.getWorkspaceRequestData(
+    return await this.getWorkspaceRequestData(
       'get',
       `/v2/async-queries/${requestId}`
     );
-    return data;
   }
 
   private async getAsyncQueryResultUrls(requestId: string): Promise {
diff --git a/packages/extension-driver-canner/src/lib/config.ts b/packages/extension-driver-canner/src/lib/config.ts
index e7525f90..6e563077 100644
--- a/packages/extension-driver-canner/src/lib/config.ts
+++ b/packages/extension-driver-canner/src/lib/config.ts
@@ -7,6 +7,9 @@ export interface IEnvConfig {
 
 export const createEnvConfig = (): IEnvConfig => {
   return {
+    // When integrated with Canner Enterprise, the vulcan server and the canner server are deployed in the same k8s cluster,
+    // so the protocol and host may differ from the ones the user provided.
+    // e.g. the user-provided host is "my-canner.web.com" with "https", but the actual host is "vulcan-server:3000" with protocol "http".
     isOnKubernetes: Boolean(process.env['IS_ON_KUBERNETES']) || false,
     webServiceHost: process.env['WEB_SERVICE_HOST'],
   } as IEnvConfig;
diff --git a/packages/extension-driver-canner/test/cannerDataSource.spec.ts b/packages/extension-driver-canner/test/cannerDataSource.spec.ts
index 12bdf76e..ac9a7ebb 100644
--- a/packages/extension-driver-canner/test/cannerDataSource.spec.ts
+++ b/packages/extension-driver-canner/test/cannerDataSource.spec.ts
@@ -1,6 +1,6 @@
 import { CannerServer } from './cannerServer';
 import { CannerDataSource, PGOptions } from '../src';
-import { ExportOptions, streamToArray } from '@vulcan-sql/core';
+import { ExportOptions, InternalError, streamToArray } from '@vulcan-sql/core';
 import { Writable } from 'stream';
 import * as sinon from 'ts-sinon';
 import * as fs from 'fs';
@@ -62,11 +62,14 @@ it('Data source should export successfully', async () => {
 
 it('Data source should throw when fail to export data', async () => {
   // Arrange
+  // stub createAsyncQueryResultUrls to simulate an error coming back from the remote server
   sinon.default
     .stub(CannerAdapter.prototype, 'createAsyncQueryResultUrls')
     // eslint-disable-next-line @typescript-eslint/no-unused-vars
     .callsFake(async (sql) => {
-      throw new Error('mock error');
+      throw new InternalError(
+        'Failed to get workspace request "mock/url" data'
+      );
     });
 
   fs.mkdirSync('tmp', { recursive: true });
From a646f5c16c39e8c30e220a5fc357fcdd5e7108b9 Mon Sep 17 00:00:00 2001
From: onlyjackfrost
Date: Mon, 5 Jun 2023 18:29:09 +0800
Subject: [PATCH 3/3] chore: simplify error message

---
 packages/extension-driver-canner/src/lib/cannerAdapter.ts | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/packages/extension-driver-canner/src/lib/cannerAdapter.ts b/packages/extension-driver-canner/src/lib/cannerAdapter.ts
index 9c3fd476..52278e58 100644
---
a/packages/extension-driver-canner/src/lib/cannerAdapter.ts
+++ b/packages/extension-driver-canner/src/lib/cannerAdapter.ts
@@ -78,9 +78,8 @@ export class CannerAdapter {
       return response.data;
     } catch (error: any) {
       const message = error.response
-        ? `response: ${JSON.stringify(error.response)}`
-        : `remote server does not response. request ${error.request}`;
-
+        ? `response status: ${error.response.status}, response data: ${JSON.stringify(error.response.data)}`
+        : `remote server does not response. request error: ${error.message}`;
       throw new InternalError(
         `Failed to get workspace request "${urlPath}" data, ${message}`
       );