Skip to content

Commit

Permalink
Merge pull request #172 from Canner/feature/canner-enterprise-connector
Browse files Browse the repository at this point in the history
Feature: Canner enterprise connector
  • Loading branch information
kokokuo committed Jun 6, 2023
2 parents 2b5e282 + a646f5c commit a0711f0
Show file tree
Hide file tree
Showing 20 changed files with 1,108 additions and 0 deletions.
18 changes: 18 additions & 0 deletions packages/extension-driver-canner/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}
56 changes: 56 additions & 0 deletions packages/extension-driver-canner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# extension-driver-canner

Connect to [Canner Enterprise](https://docs.cannerdata.com/product/api_sdk/pg/pg_overview) through the PostgreSQL wire protocol.

## Install

1. Install package

```bash
npm i @vulcan-sql/extension-driver-canner
```

2. Update `vulcan.yaml`, enable the extension.

```yaml
extensions:
  canner: '@vulcan-sql/extension-driver-canner'
```

3. Create a new profile in `profiles.yaml` or in your profiles' paths.
```yaml
- name: canner # profile name
  type: canner
  connection:
    # Optional: Server host.
    host: string
    # Optional: The user to connect to Canner Enterprise. Default: canner
    user: string
    # Optional: Password to connect to Canner Enterprise. Should be the user's PAT in Canner Enterprise.
    password: string
    # Optional: SQL name of the workspace.
    database: string
    # Optional: Canner Enterprise PostgreSQL wire protocol port.
    port: 7432
    # Optional: The max rows we should fetch once.
    chunkSize: 100
    # Optional: Maximum number of clients the pool should contain.
    max: 10
    # Optional: Number of milliseconds before a statement in query will time out, default is no timeout
    statement_timeout: 0
    # Optional: Passed directly to node.TLSSocket, supports all tls.connect options
    ssl: false
    # Optional: Number of milliseconds before a query call will time out, default is no timeout
    query_timeout: 0
    # Optional: The name of the application that created this Client instance
    application_name: string
    # Optional: Number of milliseconds to wait for connection, default is no timeout
    connectionTimeoutMillis: 0
    # Optional: Number of milliseconds before terminating any session with an open idle transaction, default is no timeout
    idle_in_transaction_session_timeout: 0
    # Optional: Number of milliseconds a client must sit idle in the pool and not be checked out before it is disconnected from the backend and discarded.
    idleTimeoutMillis: 10000
```
15 changes: 15 additions & 0 deletions packages/extension-driver-canner/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Jest configuration for the extension-driver-canner package.
module.exports = {
// Label for this project's tests.
displayName: 'extension-driver-canner',
// Shared preset located two directories above this package.
preset: '../../jest.preset.ts',
globals: {
'ts-jest': {
// ts-jest compiles tests with the spec-specific tsconfig of this package.
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
transform: {
// Run every .ts and .js file through ts-jest.
'^.+\\.[tj]s$': 'ts-jest',
},
moduleFileExtensions: ['ts', 'js', 'html', 'node'],
// Coverage output path, relative to this package directory.
coverageDirectory: '../../coverage/packages/extension-driver-canner',
testEnvironment: 'node',
};
29 changes: 29 additions & 0 deletions packages/extension-driver-canner/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"name": "@vulcan-sql/extension-driver-canner",
"description": "Canner Enterprise driver for Vulcan SQL",
"version": "0.4.0",
"type": "commonjs",
"publishConfig": {
"access": "public"
},
"keywords": [
"vulcan",
"vulcan-sql",
"data",
"sql",
"database",
"data-warehouse",
"data-lake",
"api-builder",
"postgres",
"pg"
],
"repository": {
"type": "git",
"url": "https://github.com/Canner/vulcan.git"
},
"license": "MIT",
"peerDependencies": {
"@vulcan-sql/core": "~0.4.0-0"
}
}
64 changes: 64 additions & 0 deletions packages/extension-driver-canner/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"root": "packages/extension-driver-canner",
"sourceRoot": "packages/extension-driver-canner/src",
"targets": {
"build": {
"executor": "@nrwl/workspace:run-commands",
"options": {
"command": "yarn ts-node ./tools/scripts/replaceAlias.ts extension-driver-canner"
},
"dependsOn": [
{
"projects": "self",
"target": "tsc"
}
]
},
"tsc": {
"executor": "@nrwl/js:tsc",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/packages/extension-driver-canner",
"main": "packages/extension-driver-canner/src/index.ts",
"tsConfig": "packages/extension-driver-canner/tsconfig.lib.json",
"assets": ["packages/extension-driver-canner/*.md"],
"buildableProjectDepsInPackageJsonType": "dependencies"
},
"dependsOn": [
{
"projects": "dependencies",
"target": "build"
}
]
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["packages/extension-driver-canner/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/packages/extension-driver-canner"],
"options": {
"jestConfig": "packages/extension-driver-canner/jest.config.ts",
"passWithNoTests": true
}
},
"publish": {
"executor": "@nrwl/workspace:run-commands",
"options": {
"command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}",
"cwd": "dist/packages/extension-driver-canner"
},
"dependsOn": [
{
"projects": "self",
"target": "build"
}
]
}
},
"tags": []
}
3 changes: 3 additions & 0 deletions packages/extension-driver-canner/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package entry point: re-export everything the data source module exposes.
export * from './lib/cannerDataSource';
import { CannerDataSource } from './lib/cannerDataSource';
// Default export is an array holding the CannerDataSource class.
// NOTE(review): presumably this is what VulcanSQL loads when the package is
// listed under `extensions` in vulcan.yaml — confirm against the core loader.
export default [CannerDataSource];
143 changes: 143 additions & 0 deletions packages/extension-driver-canner/src/lib/cannerAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import axios from 'axios';
import { PGOptions } from './cannerDataSource';
import { ConnectionOptions } from 'tls';
import { createEnvConfig } from './config';
import { InternalError, getLogger } from '@vulcan-sql/core';

const envConfig = createEnvConfig();

/**
 * Client for the Canner Enterprise RESTful API, used to run a SQL statement
 * as an "async query" and obtain the URLs of its stored result files.
 *
 * The adapter lazily discovers the RESTful base endpoint from the
 * `/cluster-info` endpoint on first use and caches it for later requests.
 */
export class CannerAdapter {
  public readonly host: string;
  public readonly workspaceSqlName: string;
  public readonly PAT: string | (() => string | Promise<string>);
  public readonly ssl: boolean | ConnectionOptions;
  // Cached RESTful API base endpoint; filled in by prepare().
  private baseUrl: string | undefined;
  private logger = getLogger({ scopeName: 'CORE' });

  /**
   * @param options Connection options; `host`, `database` (the workspace SQL
   *   name) and `password` (the PAT) are mandatory, `ssl` defaults to `false`.
   * @throws Error when options are missing or a mandatory field is empty.
   */
  constructor(options?: PGOptions) {
    if (!options) {
      throw new Error(`connection options is required`);
    }
    const { host, database, password, ssl = false } = options;
    if (!host || !database || !password) {
      throw new Error(`host, database and password are required`);
    }
    this.host = host;
    this.workspaceSqlName = database;
    this.PAT = password;
    this.ssl = ssl;
  }

  /**
   * Runs `sql` as an async query and returns the URLs of its result files.
   *
   * When querying Canner enterprise, the Canner enterprise will save the query
   * result as parquet files, and store them in S3. This method will return the
   * S3 urls of the query result.
   * For more Canner API ref: https://docs.cannerdata.com/reference/restful
   *
   * @param sql SQL statement to execute.
   * @returns The result file URLs (empty when the API reports none).
   * @throws Error when the async query ends with an error or in FAILED state.
   * @throws InternalError when an HTTP request to the workspace API fails.
   */
  public async createAsyncQueryResultUrls(sql: string): Promise<string[]> {
    this.logger.debug(`Create async request to Canner.`);
    const created = await this.getWorkspaceRequestData(
      'post',
      '/v2/async-queries',
      {
        data: {
          sql,
          timeout: 600,
          noLimit: true,
        },
      }
    );

    const { id: requestId } = created;
    this.logger.debug(`Wait Async request to finished.`);
    // The poll loop already holds the final request info, so reuse it instead
    // of issuing one more identical GET.
    const finished = await this.waitAsyncQueryToFinish(requestId);
    if (finished?.error?.message) {
      throw new Error(finished.error.message);
    }
    // Result urls are generated only for finished requests; do not go on to
    // fetch them for a failed request that carries no error message.
    if (finished?.status === 'FAILED') {
      throw new Error(
        `Async query "${requestId}" failed without an error message`
      );
    }

    this.logger.debug(`Get Query result urls.`);
    const urls = await this.getAsyncQueryResultUrls(requestId);
    this.logger.debug(`Query result urls: \n${urls.join('\n')}`);
    return urls;
  }

  /**
   * Sends an authenticated request to the workspace RESTful API and returns
   * the response body.
   *
   * @param method HTTP method.
   * @param urlPath Path appended to the discovered base endpoint.
   * @param options Extra axios request options; they are spread last and can
   *   therefore override the defaults set here.
   * @throws InternalError when the request fails or receives no response.
   */
  private async getWorkspaceRequestData(
    method: string,
    urlPath: string,
    options?: Record<string, any>
  ) {
    await this.prepare();
    // The password option may be a (possibly async) provider function; resolve
    // it so the header carries the token rather than the function source.
    const token = await this.resolvePAT();
    try {
      const response = await axios({
        headers: {
          Authorization: `Token ${token}`,
        },
        params: {
          workspaceSqlName: this.workspaceSqlName,
        },
        url: `${this.baseUrl}${urlPath}`,
        method,
        ...options,
      });
      return response.data;
    } catch (error: unknown) {
      throw new InternalError(
        `Failed to get workspace request "${urlPath}" data, ${this.describeRequestError(
          error
        )}`
      );
    }
  }

  /** Returns the PAT as a string, invoking it first when it is a provider function. */
  private async resolvePAT(): Promise<string> {
    return typeof this.PAT === 'function' ? await this.PAT() : this.PAT;
  }

  /**
   * Builds a readable description of a failed request.
   *
   * The serialized request config is deliberately left out because it contains
   * the Authorization header.
   */
  private describeRequestError(error: unknown): string {
    const response = (
      error as
        | { response?: { status?: number; data?: unknown } }
        | null
        | undefined
    )?.response;
    if (response) {
      return `response status: ${
        response.status
      }, response data: ${this.stringifyForMessage(response.data)}`;
    }
    const detail = error instanceof Error ? error.message : String(error);
    return `remote server does not response. request error: ${detail}`;
  }

  /** Renders a value for an error message, as JSON where possible. */
  private stringifyForMessage(value: unknown): string {
    if (typeof value === 'string') {
      return value;
    }
    try {
      // JSON.stringify returns undefined for e.g. `undefined` input.
      return JSON.stringify(value) ?? String(value);
    } catch {
      // Circular structures or other non-serializable values.
      return String(value);
    }
  }

  /**
   * Discovers the RESTful API base endpoint from `/cluster-info` and caches
   * it; later calls return immediately.
   *
   * @throws Error when the endpoint is missing from the response.
   */
  private async prepare() {
    if (this.baseUrl) {
      return;
    }
    const response = await axios({
      method: 'get',
      maxBodyLength: Infinity,
      url: `${this.getCannerUrl()}/cluster-info`,
      headers: {},
    });
    // Guard against an empty body so the descriptive error below is raised
    // instead of a destructuring TypeError.
    const { restfulApiBaseEndpoint } = response.data ?? {};
    if (!restfulApiBaseEndpoint) {
      throw new Error(
        `The restful API base endpoint is not found, please check "restfulApiBaseEndpoint" field from "/cluster-info" endpoint of Canner Enterprise`
      );
    }

    this.baseUrl = restfulApiBaseEndpoint;
  }

  /** Resolves the Canner web URL used to reach `/cluster-info`. */
  private getCannerUrl() {
    if (envConfig.isOnKubernetes) {
      // use env to get the endpoint in k8s
      return `http://${envConfig.webServiceHost}`;
    }
    // otherwise use the host user provided; any truthy ssl setting selects https
    const protocol = this.ssl ? 'https' : 'http';
    return `${protocol}://${this.host}`;
  }

  /**
   * Polls the async request once per second until it reaches an end state.
   *
   * @returns The request info observed when the end state was reached.
   */
  private async waitAsyncQueryToFinish(requestId: string) {
    let data = await this.getRequestInfo(requestId);
    // FINISHED & FAILED are the end state of a async request, and the result urls will be generated only after the request is finished.
    while (!['FINISHED', 'FAILED'].includes(data.status)) {
      await new Promise((resolve) => setTimeout(resolve, 1000));
      data = await this.getRequestInfo(requestId);
    }
    return data;
  }

  /** Fetches the current info of an async request. */
  private async getRequestInfo(requestId: string) {
    return await this.getWorkspaceRequestData(
      'get',
      `/v2/async-queries/${requestId}`
    );
  }

  /** Fetches the result file URLs of an async request (empty array when absent). */
  private async getAsyncQueryResultUrls(requestId: string): Promise<string[]> {
    const data = await this.getWorkspaceRequestData(
      'get',
      `/v2/async-queries/${requestId}/result/urls`
    );
    return data?.urls ?? [];
  }
}
Loading

0 comments on commit a0711f0

Please sign in to comment.