-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: Canner enterprise connector #172
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"extends": ["../../.eslintrc.json"], | ||
"ignorePatterns": ["!**/*"], | ||
"overrides": [ | ||
{ | ||
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"], | ||
"rules": {} | ||
}, | ||
{ | ||
"files": ["*.ts", "*.tsx"], | ||
"rules": {} | ||
}, | ||
{ | ||
"files": ["*.js", "*.jsx"], | ||
"rules": {} | ||
} | ||
] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
# extension-driver-canner | ||
|
||
Connect to [canner enterprise](https://docs.cannerdata.com/product/api_sdk/pg/pg_overview) through PostgreSQL Wire Protocol | ||
|
||
## Install | ||
|
||
1. Install package | ||
|
||
```bash | ||
npm i @vulcan-sql/extension-driver-canner | ||
``` | ||
|
||
2. Update `vulcan.yaml`, enable the extension. | ||
|
||
```yaml | ||
extensions: | ||
canner: '@vulcan-sql/extension-driver-canner' | ||
``` | ||
|
||
3. Create a new profile in `profiles.yaml` or in your profiles' paths. | ||
```yaml | ||
- name: canner # profile name | ||
type: canner | ||
connection: | ||
# Optional: Server host. | ||
host: string | ||
# Optional: The user to connect to Canner Enterprise. Default: canner | ||
user: string | ||
# Optional: Password to connect to Canner Enterprise. This should be the user's PAT in Canner Enterprise. | ||
password: string | ||
# Optional: sql name of the workspace. | ||
database: string | ||
# Optional: canner enterprise PostgreSQL wire protocol port | ||
port: 7432 | ||
# Optional: The max rows we should fetch once. | ||
chunkSize: 100 | ||
# Optional: Maximum number of clients the pool should contain. | ||
max: 10 | ||
# Optional: Number of milliseconds before a statement in query will time out, default is no timeout | ||
statement_timeout: 0 | ||
# Optional: Passed directly to node.TLSSocket, supports all tls.connect options | ||
ssl: false | ||
# Optional: Number of milliseconds before a query call will timeout, default is no timeout | ||
query_timeout: 0 | ||
# Optional: The name of the application that created this Client instance | ||
application_name: string | ||
# Optional: Number of milliseconds to wait for connection, default is no timeout | ||
connectionTimeoutMillis: 0 | ||
# Optional: Number of milliseconds before terminating any session with an open idle transaction, default is no timeout | ||
idle_in_transaction_session_timeout: 0 | ||
# Optional: Number of milliseconds a client must sit idle in the pool and not be checked out before it is disconnected from the backend and discarded. | ||
idleTimeoutMillis: 10000 | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
module.exports = { | ||
displayName: 'extension-driver-canner', | ||
preset: '../../jest.preset.ts', | ||
globals: { | ||
'ts-jest': { | ||
tsconfig: '<rootDir>/tsconfig.spec.json', | ||
}, | ||
}, | ||
transform: { | ||
'^.+\\.[tj]s$': 'ts-jest', | ||
}, | ||
moduleFileExtensions: ['ts', 'js', 'html', 'node'], | ||
coverageDirectory: '../../coverage/packages/extension-driver-canner', | ||
testEnvironment: 'node', | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
{ | ||
"name": "@vulcan-sql/extension-driver-canner", | ||
"description": "Canner Enterprise driver for Vulcan SQL", | ||
"version": "0.4.0", | ||
"type": "commonjs", | ||
"publishConfig": { | ||
"access": "public" | ||
}, | ||
"keywords": [ | ||
"vulcan", | ||
"vulcan-sql", | ||
"data", | ||
"sql", | ||
"database", | ||
"data-warehouse", | ||
"data-lake", | ||
"api-builder", | ||
"postgres", | ||
"pg" | ||
], | ||
"repository": { | ||
"type": "git", | ||
"url": "https://github.com/Canner/vulcan.git" | ||
}, | ||
"license": "MIT", | ||
"peerDependencies": { | ||
"@vulcan-sql/core": "~0.4.0-0" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
{ | ||
"root": "packages/extension-driver-canner", | ||
"sourceRoot": "packages/extension-driver-canner/src", | ||
"targets": { | ||
"build": { | ||
"executor": "@nrwl/workspace:run-commands", | ||
"options": { | ||
"command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-canner" | ||
}, | ||
"dependsOn": [ | ||
{ | ||
"projects": "self", | ||
"target": "tsc" | ||
} | ||
] | ||
}, | ||
"tsc": { | ||
"executor": "@nrwl/js:tsc", | ||
"outputs": ["{options.outputPath}"], | ||
"options": { | ||
"outputPath": "dist/packages/extension-driver-canner", | ||
"main": "packages/extension-driver-canner/src/index.ts", | ||
"tsConfig": "packages/extension-driver-canner/tsconfig.lib.json", | ||
"assets": ["packages/extension-driver-canner/*.md"], | ||
"buildableProjectDepsInPackageJsonType": "dependencies" | ||
}, | ||
"dependsOn": [ | ||
{ | ||
"projects": "dependencies", | ||
"target": "build" | ||
} | ||
] | ||
}, | ||
"lint": { | ||
"executor": "@nrwl/linter:eslint", | ||
"outputs": ["{options.outputFile}"], | ||
"options": { | ||
"lintFilePatterns": ["packages/extension-driver-canner/**/*.ts"] | ||
} | ||
}, | ||
"test": { | ||
"executor": "@nrwl/jest:jest", | ||
"outputs": ["coverage/packages/extension-driver-canner"], | ||
"options": { | ||
"jestConfig": "packages/extension-driver-canner/jest.config.ts", | ||
"passWithNoTests": true | ||
} | ||
}, | ||
"publish": { | ||
"executor": "@nrwl/workspace:run-commands", | ||
"options": { | ||
"command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}", | ||
"cwd": "dist/packages/extension-driver-canner" | ||
}, | ||
"dependsOn": [ | ||
{ | ||
"projects": "self", | ||
"target": "build" | ||
} | ||
] | ||
} | ||
}, | ||
"tags": [] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export * from './lib/cannerDataSource'; | ||
import { CannerDataSource } from './lib/cannerDataSource'; | ||
export default [CannerDataSource]; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
import axios from 'axios'; | ||
import { PGOptions } from './cannerDataSource'; | ||
import { ConnectionOptions } from 'tls'; | ||
import { createEnvConfig } from './config'; | ||
import { InternalError, getLogger } from '@vulcan-sql/core'; | ||
|
||
const envConfig = createEnvConfig(); | ||
|
||
export class CannerAdapter { | ||
public readonly host: string; | ||
public readonly workspaceSqlName: string; | ||
public readonly PAT: string | (() => string | Promise<string>); | ||
public readonly ssl: boolean | ConnectionOptions; | ||
private baseUrl: string | undefined; | ||
private logger = getLogger({ scopeName: 'CORE' }); | ||
|
||
/**
 * Validates the connection options and copies the values the adapter needs.
 *
 * @param options - Connection options; `host`, `database` and `password` must be truthy.
 *   `ssl` falls back to `false` when it is undefined.
 * @throws Error when `options` is missing or any required field is falsy.
 */
constructor(options?: PGOptions) {
  if (!options) {
    throw new Error(`connection options is required`);
  }
  const { host, database: workspaceSqlName, password: pat } = options;
  if (!(host && workspaceSqlName && pat)) {
    throw new Error(`host, database and password are required`);
  }
  this.host = host;
  // The database name in the connection options is stored as the workspace SQL name.
  this.workspaceSqlName = workspaceSqlName;
  // The password in the connection options is stored as the PAT sent in the Authorization header.
  this.PAT = pat;
  this.ssl = options.ssl === undefined ? false : options.ssl;
}
|
||
// When querying Canner enterprise, the Canner enterprise will save the query result as parquet files, | ||
// and store them in S3. This method will return the S3 urls of the query result. | ||
// For more Canner API ref: https://docs.cannerdata.com/reference/restful | ||
public async createAsyncQueryResultUrls(sql: string): Promise<string[]> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest using the
It could help the developer and QA to know the current status, working phase, and approximate location on the K8S when an error happened and the error is not caused by what we defined especially. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you miss the suggestion or ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delivered in 8ed2dfd |
||
this.logger.debug(`Create async request to Canner.`); | ||
let data = await this.getWorkspaceRequestData('post', '/v2/async-queries', { | ||
data: { | ||
sql, | ||
timeout: 600, | ||
noLimit: true, | ||
}, | ||
}); | ||
|
||
const { id: requestId } = data; | ||
this.logger.debug(`Wait Async request to finished.`); | ||
await this.waitAsyncQueryToFinish(requestId); | ||
|
||
// get the query result after the query finished | ||
data = await this.getRequestInfo(requestId); | ||
if (data.error?.message) { | ||
throw new Error(data.error.message); | ||
} | ||
|
||
this.logger.debug(`Get Query result urls.`); | ||
const urls = await this.getAsyncQueryResultUrls(requestId); | ||
this.logger.debug(`Query result urls: \n${urls.join('\n')}`); | ||
return urls; | ||
} | ||
|
||
/**
 * Sends a token-authenticated request to the REST API for the configured
 * workspace and returns the response body.
 *
 * @param method - HTTP method handed to axios.
 * @param urlPath - Path appended to the base URL resolved by `prepare()`.
 * @param options - Extra axios request options. They are spread last, so they
 *   override the defaults built here (headers, params, url, method).
 * @returns The `data` field of the axios response.
 * @throws InternalError when the request fails, with the response status and
 *   body when a response was received, or the underlying error message otherwise.
 */
private async getWorkspaceRequestData(
  method: string,
  urlPath: string,
  options?: Record<string, any>
) {
  await this.prepare();
  // `PAT` is declared as either a string or a (possibly async) provider function.
  // Interpolating a function directly would send its source text as the token.
  const token = typeof this.PAT === 'function' ? await this.PAT() : this.PAT;
  try {
    const response = await axios({
      headers: {
        Authorization: `Token ${token}`,
      },
      params: {
        workspaceSqlName: this.workspaceSqlName,
      },
      url: `${this.baseUrl}${urlPath}`,
      method,
      ...options,
    });
    return response.data;
  } catch (error: any) {
    // Objects interpolated into a template string render as "[object Object]",
    // so serialize the response body explicitly.
    const stringify = (value: unknown): string => {
      if (typeof value === 'string') {
        return value;
      }
      try {
        return JSON.stringify(value);
      } catch {
        return String(value);
      }
    };
    // Do not call `error.toJSON()`: it is missing on non-axios errors, and for
    // axios errors it serializes the request config, which contains the
    // Authorization header.
    const message = error?.response
      ? `response status: ${
          error.response.status
        }, response data: ${stringify(error.response.data)}`
      : `remote server did not respond: ${error?.message ?? String(error)}`;
    throw new InternalError(
      `Failed to get workspace request "${urlPath}" data, ${message}`
    );
  }
}
|
||
/**
 * Resolves and caches the REST API base URL on first use.
 *
 * Reads `restfulApiBaseEndpoint` from the `/cluster-info` endpoint and stores
 * it in `baseUrl`. Does nothing when `baseUrl` is already set.
 *
 * @throws Error when the response has no `restfulApiBaseEndpoint`.
 */
private async prepare() {
  if (!this.baseUrl) {
    const clusterInfoUrl = `${this.getCannerUrl()}/cluster-info`;
    const clusterInfoResponse = await axios({
      method: 'get',
      maxBodyLength: Infinity,
      url: clusterInfoUrl,
      headers: {},
    });
    const endpoint = clusterInfoResponse.data.restfulApiBaseEndpoint;
    if (!endpoint) {
      throw new Error(
        `The restful API base endpoint is not found, please check "restfulApiBaseEndpoint" field from "/cluster-info" endpoint of Canner Enterprise`
      );
    }
    this.baseUrl = endpoint;
  }
}
|
||
/**
 * Builds the root URL used to reach the web service.
 *
 * @returns `http://<webServiceHost>` from the environment config when
 *   `envConfig.isOnKubernetes` is set; otherwise the user-provided host,
 *   with `https` when `ssl` is truthy and `http` when it is not.
 */
private getCannerUrl() {
  // On Kubernetes the endpoint comes from the environment, not from the profile.
  if (envConfig.isOnKubernetes) {
    return `http://${envConfig.webServiceHost}`;
  }
  const scheme = this.ssl ? 'https' : 'http';
  return `${scheme}://${this.host}`;
}
|
||
/**
 * Polls the async query until it reaches an end state.
 *
 * FINISHED and FAILED are the end states of an async request; result URLs are
 * only generated after the request has finished. The status is re-read once
 * per second, and there is no client-side limit on how long this waits.
 *
 * @param requestId - Id of the async query to poll.
 */
private async waitAsyncQueryToFinish(requestId: string) {
  const endStates = ['FINISHED', 'FAILED'];
  for (;;) {
    const info = await this.getRequestInfo(requestId);
    if (endStates.includes(info.status)) {
      return;
    }
    // Pause for one second before asking again.
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}
|
||
/**
 * Fetches the current info record of an async query.
 *
 * @param requestId - Id of the async query.
 * @returns The response body of `GET /v2/async-queries/<requestId>`.
 */
private async getRequestInfo(requestId: string) {
  const infoPath = `/v2/async-queries/${requestId}`;
  return await this.getWorkspaceRequestData('get', infoPath);
}
|
||
private async getAsyncQueryResultUrls(requestId: string): Promise<string[]> { | ||
const data = await this.getWorkspaceRequestData( | ||
'get', | ||
`/v2/async-queries/${requestId}/result/urls` | ||
); | ||
return data.urls || []; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems the Tips: we could handle the error in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll use status to check and handle the error message from the canner server |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding the comment on the
createAsyncQueryResultUrls
method to describe what the method does? Our partners or new members may not have the context to know why we need to get the result URLs, so I suggest describing what the result URLs are used for :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add some comment to explain it, thanks.