Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Canner enterprise connector #172

Merged
merged 3 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/extension-driver-canner/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
56 changes: 56 additions & 0 deletions packages/extension-driver-canner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# extension-driver-canner

Connect to [canner enterprise](https://docs.cannerdata.com/product/api_sdk/pg/pg_overview) through PostgreSQL Wire Protocol

## Install

1. Install package

```sql
npm i @vulcan-sql/extension-driver-canner
```

2. Update `vulcan.yaml`, enable the extension.

```yaml
extensions:
canner: '@vulcan-sql/extension-driver-canner'
```

3. Create a new profile in `profiles.yaml` or in your profiles' paths.

```yaml
- name: canner # profile name
type: canner
connection:


# Optional: Server host.
host: string
# Optional: The user to connect to canner enterprise. Default canner
user: string
# Optional: Password to connect to canner enterprise. should be the user PAT in canner enterprise
password: string
# Optional: sql name of the workspace.
database: string
# Optional: canner enterprise PostgreSQL wire protocol port
port: 7432
# Optional: The max rows we should fetch once.
chunkSize: 100
# Optional: Maximum number of clients the pool should contain.
max: 10
# Optional: Number of milliseconds before a statement in query will time out, default is no timeout
statement_timeout: 0
# Optional: Passed directly to node.TLSSocket, supports all tls.connect options
ssl: false
# Optional: Number of milliseconds before a query call will timeout, default is no timeout
query_timeout: 0
# Optional: The name of the application that created this Client instance
application_name: string
# Optional: Number of milliseconds to wait for connection, default is no timeout
connectionTimeoutMillis: 0
# Optional: Number of milliseconds before terminating any session with an open idle transaction, default is no timeout
idle_in_transaction_session_timeout: 0
# Optional: Number of milliseconds a client must sit idle in the pool and not be checked out before it is disconnected from the backend and discarded.
idleTimeoutMillis: 10000
```
15 changes: 15 additions & 0 deletions packages/extension-driver-canner/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module.exports = {
displayName: 'extension-driver-canner',
preset: '../../jest.preset.ts',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
transform: {
'^.+\\.[tj]s$': 'ts-jest',
},
moduleFileExtensions: ['ts', 'js', 'html', 'node'],
coverageDirectory: '../../coverage/packages/extension-driver-canner',
testEnvironment: 'node',
};
29 changes: 29 additions & 0 deletions packages/extension-driver-canner/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"name": "@vulcan-sql/extension-driver-canner",
"description": "Canner Enterprise driver for Vulcan SQL",
"version": "0.4.0",
"type": "commonjs",
"publishConfig": {
"access": "public"
},
"keywords": [
"vulcan",
"vulcan-sql",
"data",
"sql",
"database",
"data-warehouse",
"data-lake",
"api-builder",
"postgres",
"pg"
],
"repository": {
"type": "git",
"url": "https://github.com/Canner/vulcan.git"
},
"license": "MIT",
"peerDependencies": {
"@vulcan-sql/core": "~0.4.0-0"
}
}
64 changes: 64 additions & 0 deletions packages/extension-driver-canner/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"root": "packages/extension-driver-canner",
"sourceRoot": "packages/extension-driver-canner/src",
"targets": {
"build": {
"executor": "@nrwl/workspace:run-commands",
"options": {
"command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-canner"
},
"dependsOn": [
{
"projects": "self",
"target": "tsc"
}
]
},
"tsc": {
"executor": "@nrwl/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/packages/extension-driver-canner",
"main": "packages/extension-driver-canner/src/index.ts",
"tsConfig": "packages/extension-driver-canner/tsconfig.lib.json",
"assets": ["packages/extension-driver-canner/*.md"],
"buildableProjectDepsInPackageJsonType": "dependencies"
},
"dependsOn": [
{
"projects": "dependencies",
"target": "build"
}
]
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["packages/extension-driver-canner/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/packages/extension-driver-canner"],
"options": {
"jestConfig": "packages/extension-driver-canner/jest.config.ts",
"passWithNoTests": true
}
},
"publish": {
"executor": "@nrwl/workspace:run-commands",
"options": {
"command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}",
"cwd": "dist/packages/extension-driver-canner"
},
"dependsOn": [
{
"projects": "self",
"target": "build"
}
]
}
},
"tags": []
}
3 changes: 3 additions & 0 deletions packages/extension-driver-canner/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './lib/cannerDataSource';
import { CannerDataSource } from './lib/cannerDataSource';
export default [CannerDataSource];
135 changes: 135 additions & 0 deletions packages/extension-driver-canner/src/lib/cannerAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import axios from 'axios';
import { PGOptions } from './cannerDataSource';
import { ConnectionOptions } from 'tls';
import { createEnvConfig } from './config';

const envConfig = createEnvConfig();

export class CannerAdapter {
public readonly host: string;
public readonly workspaceSqlName: string;
public readonly PAT: string | (() => string | Promise<string>);
public readonly ssl: boolean | ConnectionOptions;
private baseUrl: string | undefined;

constructor(options?: PGOptions) {
if (!options) {
throw new Error(`connection options is required`);
}
const { host, database, password, ssl = false } = options;
if (!host || !database || !password) {
throw new Error(`host, database and password are required`);
}
this.host = host;
this.workspaceSqlName = database;
this.PAT = password;
this.ssl = ssl;
}

// When querying Canner enterprise, the Canner enterprise will save the query result as parquet files,
// and store them in S3. This method will return the S3 urls of the query result.
// For more Canner API ref: https://docs.cannerdata.com/reference/restful
public async createAsyncQueryResultUrls(sql: string): Promise<string[]> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding the comment on the createAsyncQueryResultUrls method to describe what the methods could do?
Our partner or new member may not have the context to know why we need to get the result URLs. so may suggest you describe what the result urls could do :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some comment to explain it, thanks.

Copy link
Contributor

@kokokuo kokokuo May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest using the logger.debug ( from our getLogger put in the core/utils ) to record at least steps:

  • send the async query to the Canner Enterprise
  • get the query result parquet files from URLs.

It could help the developer and QA to know the current status, working phase, and approximate location on the K8S when an error happened and the error is not caused by what we defined especially.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you miss the suggestion or ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delivered in 8ed2dfd

let data = await this.getWorkspaceRequestData('post', '/v2/async-queries', {
data: {
sql,
timeout: 600,
noLimit: true,
},
});

const { id: requestId } = data;
await this.waitAsyncQueryToFinish(requestId);

// get the query result after the query finished
data = await this.getRequestInfo(requestId);
if (data.error?.message) {
throw new Error(data.error.message);
}
const urls = await this.getAsyncQueryResultUrls(requestId);
return urls;
}

private async getWorkspaceRequestData(
method: string,
urlPath: string,
options?: Record<string, any>
) {
await this.prepare();
const response = await axios({
headers: {
Authorization: `Token ${this.PAT}`,
},
params: {
workspaceSqlName: this.workspaceSqlName,
},
url: `${this.baseUrl}${urlPath}`,
method,
...options,
});
if (response.status !== 200) {
throw new Error(
`Failed to get workspace request "${urlPath}" data, status: ${
response.status
}, data: ${JSON.stringify(response.data)}`
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you survey the error handling of the axios ?

The axios will throw erro when the status is not 200, so according to your logistic, it won' t trigger the if-else, please:
https://www.axios-http.cn/docs/handling_errors
https://stackoverflow.com/questions/49967779/axios-handling-errors

Otherwise, you should also check the Canner Enterprise hot to respond the error and it code, please see how we handle the different errors and wrap to HTTP status code to 500 when happened the errors, the reason you could see the comment https://github.com/Canner/canner-ui/blob/5ea2e698ebad8ed953daabe362e773461701bd09/server/errors/formatUnifiedError.ts#L61.

Btw, we also use the axios in Canner to handle the restful API from SQL Engine, and they also will throw different error codes, so that why we use the try-catch to handle it: https://github.com/Canner/canner-ui/blob/5ea2e698ebad8ed953daabe362e773461701bd09/server/adapters/externalQueryTable.ts#L45

And calling the async-query API, it also may throw the domain error and cause the 500 status code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delivered in 8ed2dfd

return response.data;
}

private async prepare() {
if (this.baseUrl) {
return;
}
const response = await axios({
method: 'get',
maxBodyLength: Infinity,
url: `${this.getCannerUrl()}/cluster-info`,
headers: {},
});
const { restfulApiBaseEndpoint } = response.data;
if (!restfulApiBaseEndpoint) {
throw new Error(
`The restful API base endpoint is not found, please check "restfulApiBaseEndpoint" field from "/cluster-info" endpoint of Canner Enterprise`
);
}

this.baseUrl = restfulApiBaseEndpoint;
}

private getCannerUrl() {
if (envConfig.isOnKubernetes) {
// use env to get the endpoint in k8s
return `http://${envConfig.webServiceHost}`;
} else {
// otherwise use the host user provided
const protocol = this.ssl ? 'https' : 'http';
return `${protocol}://${this.host}`;
}
}

private async waitAsyncQueryToFinish(requestId: string) {
let data = await this.getRequestInfo(requestId);
// FINISHED & FAILED are the end state of a async request, and the result urls will be generated only after the request is finished.
while (!['FINISHED', 'FAILED'].includes(data.status)) {
await new Promise((resolve) => setTimeout(resolve, 1000));
data = await this.getRequestInfo(requestId);
}
}

private async getRequestInfo(requestId: string) {
const data = await this.getWorkspaceRequestData(
'get',
`/v2/async-queries/${requestId}`
);
return data;
}

private async getAsyncQueryResultUrls(requestId: string): Promise<string[]> {
const data = await this.getWorkspaceRequestData(
'get',
`/v2/async-queries/${requestId}/result/urls`
);
return data.urls || [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the /v2/async-queries/<requestId>/result/urls will throw an error from Canner Enterprise and generate an error message, maybe we should check if exist any errors and throw it.

Tips: we could handle the error in the WorkspaceRequest directly, then you don't need to handle it in different places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll use status to check and handle the error message from the canner server

}
}
Loading