Skip to content

Commit

Permalink
feat: WSC for web-socket client
Browse files Browse the repository at this point in the history
  • Loading branch information
steve-lemon committed Aug 14, 2019
1 parent 5435b9b commit 3fd3a15
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 117 deletions.
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
"lemon-hello-api": "^1.3.1",
"morgan": "^1.9.1",
"multer": "^1.4.1",
"nanoid": "^2.0.3",
"request": "^2.88.0",
"source-map-support": "^0.5.12"
"source-map-support": "^0.5.12",
"ws": "^7.1.2"
},
"devDependencies": {
"@types/aws-serverless-express": "^3.3.1",
Expand All @@ -66,9 +68,11 @@
"@types/lodash": "^4.14.130",
"@types/morgan": "^1.7.35",
"@types/multer": "^1.3.8",
"@types/nanoid": "^2.0.0",
"@types/request": "^2.48.1",
"@types/supertest": "^2.0.7",
"@types/winston": "^2.4.4",
"@types/ws": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^1.7.0",
"@typescript-eslint/parser": "^1.7.0",
"aws-sdk": "^2.363.0",
Expand Down
194 changes: 84 additions & 110 deletions src/builder/WSC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,131 +12,93 @@
* ```
*
*
* @author Steve Jung <steve@lemoncloud.io>
* @origin See `lemon-clusters-api/WSC.js`
* @author Steve Jung <steve@lemoncloud.io>
* @date 2019-08-14 refactoring version via latest `WSC.js`
*
* Copyright (C) 2019 LemonCloud Co Ltd. - All Rights Reserved.
* @copyright (C) 2019 LemonCloud Co Ltd. - All Rights Reserved.
*/
import { $U, _log, _inf, _err } from '../core/engine';
import { $engine } from '../core/engine';
import { WebResult, BrokerBuilder, CoreHandler } from '../common/types';
import { executeServiceApi } from '../core/engine';

//! custom definitions.
// const WebSocket = require('ws');
import * as WebSocket from 'ws';

//! main exports
module.exports = ($engine, params) => {
'use strict';
if (!$engine) throw new Error('$engine(lemon-engine) is required!');

//! load core services (_$ defined in global)
const $U = $engine.U; // re-use global instance (utils).
if (!$U) throw new Error('$U(utilities) is required!');

//! load common functions
const _log = $engine.log;
const _inf = $engine.inf;
const _err = $engine.err;

//! local constant
const NS = $U.NS('WSC', 'yellow'); // NAMESPACE TO BE PRINTED.
const DEFAULT_TYPE = $engine.environ('DEFAULT_TYPE', 'clusters');
const WSS_ENDPOINT = $engine.environ('WSS_ENDPOINT', params.url || '');
if (!WSS_ENDPOINT) throw new Error('env:WSS_ENDPOINT is required!');
_inf(NS, `! WSS_ENDPOINT[${DEFAULT_TYPE}] =`, WSS_ENDPOINT);

function success(body) {
return buildResponse(200, body);
}

function notfound(body) {
return buildResponse(404, body);
}
import WebSocket from 'ws';
import generator from 'nanoid/generate';

function failure(body) {
return buildResponse(503, body);
}
export const success = (body: any) => {
return buildResponse(200, body);
};

function buildResponse(statusCode, body) {
return {
statusCode: statusCode,
body: body === undefined ? undefined : typeof body == 'string' ? body : JSON.stringify(body),
};
}
export const notfound = (body: any) => {
return buildResponse(404, body);
};

//! chain for HTTP type.
const executeServiceApi = (
method,
type = DEFAULT_TYPE,
id = '',
cmd = '',
param = null,
body = null,
context = null,
) =>
new Promise((resolve, reject) => {
if (!method) throw new Error('method is required!');
if (method && typeof method === 'object') {
const data = method;
type = '' + (type || DEFAULT_TYPE); //MUST BE STRING!
method = '' + (data.method || 'get'); //MUST BE STRING!
id = '' + (data.id || id); //MUST BE STRING!
cmd = '' + (data.cmd || cmd); //MUST BE STRING!
param = data.param;
body = data.body;
context = data.context;
}
method = `${method}`.toUpperCase();
_log(NS, `executeServiceApi(${method}, ${type}, ${id}, ${cmd})...`);
// _log(NS, `> ${method} ${type}/${id}/${cmd} param=`, param);
export const failure = (body: any) => {
return buildResponse(503, body);
};

//! lookup target-api by name.
const API = $engine(type);
if (!API) new Error('404 NOT FOUND - API.type:' + type);
export const buildResponse = (statusCode: number, body: any): WebResult => {
return {
statusCode: statusCode,
body: body === undefined ? undefined : typeof body == 'string' ? body : JSON.stringify(body),
};
};

//! transform to APIGatewayEvent;
const event = {
httpMethod: method,
path: cmd ? `/${id}/${cmd}` : id !== undefined ? `/${id}` : `/`,
headers: {},
pathParameters: {},
queryStringParameters: {},
body: '',
isBase64Encoded: false,
stageVariables: null,
requestContext: context || {},
resource: '',
};
if (id !== undefined) event.pathParameters.id = id;
if (cmd !== undefined) event.pathParameters.cmd = cmd;
if (param) event.queryStringParameters = param;
if (body) event.body = body;
export interface WebClientHandler extends CoreHandler<any> {
client: any;
}

//! basic handler type. (see bootload.main)
API(event, {}, (err, res) => {
err && reject(err);
!err && resolve(res);
});
});
/**
* build WSC() handler.
*
* @param {*} defType default type of $api()
* @param {*} NS namespace to print
* @param {*} params namespace to print
*/
const builder: BrokerBuilder<any> = (defType, NS, params) => {
defType = defType || 'hello';
NS = NS || $U.NS(`WSC`, 'yellow'); // NAMESPACE TO BE PRINTED.
params = params || {};

//! load default-handler type.
const DEFAULT_TYPE = $engine.environ('DEFAULT_TYPE', defType) as string;
const WSS_ENDPOINT = $engine.environ('WSS_ENDPOINT', params.url || '') as string;
if (!WSS_ENDPOINT) throw new Error('env:WSS_ENDPOINT is required!');
_inf(NS, `! WSS_ENDPOINT[${DEFAULT_TYPE}] =`, WSS_ENDPOINT);
const headers = params.headers || {};
const isStart = $U.N(params.start, params.start === '' ? 1 : 0);

/**
* create websocket client.
*/
const client = (url => {
const client = ((url: string) => {
_log(NS, `client(${url})...`);
const AUTO_RECONNECT_INTERVAL = 2.5 * 1000; // Reconnect Retry (ms)
const MAX_TIMEOUT = 30 * 1000; // Max Wait Timeout (ms)
const WSC_REQID_KEY = '$wsc-request-id'; // Client Request ID
const WSS_REQID_KEY = '$wss-request-id'; // Server Request ID

//! prepare thiz internal.
const op = () => {};
const thiz = {
_instance: null,
_waits: {},
_instance: null as any,
_waits: {} as any,
start: op,
stop: op,
post: op,
send: op,
reconnect: op,
next_id: op,
};

//! open socket with server.
const open = () => {
_log(NS, '> open()...');
if (thiz._instance) throw new Error('already connected!');
const instance = new WebSocket(url, {
headers: params.headers || {},
headers,
// agent: params.agent||`wsc/${DEFAULT_TYPE}`,
perMessageDeflate: false, // see: https://github.com/websockets/ws#websocket-compression
});
Expand All @@ -159,14 +121,13 @@ module.exports = ($engine, params) => {

//! generate next-id.
const next_id = () => {
const generator = require('nanoid/generate');
const ID_CHARS = '1234567890abcdefghjkmnpqrstuvwxyz';
const id = generator(ID_CHARS, 12);
return `WSC${id}`;
};

//! post message without wait
const post = (data, options) => {
const post = (data: any, options?: any) => {
_log(NS, '! post()...');
if (!thiz._instance) throw new Error('404 NOT CONNECTED');
const payload = data && typeof data == 'object' ? JSON.stringify(data) : `${data}`;
Expand All @@ -175,7 +136,7 @@ module.exports = ($engine, params) => {
};

//! send message and get response
const send = (data, options) =>
const send = (data: any, options?: any) =>
new Promise((resolve, reject) => {
_log(NS, '! send()...');
if (!thiz._instance) throw new Error('404 NOT CONNECTED');
Expand All @@ -197,11 +158,18 @@ module.exports = ($engine, params) => {
};
_log(NS, '> sent =', data);
//! send with data as text.
thiz._instance.send(data && typeof data == 'object' ? JSON.stringify(data) : `${data}`, options);
thiz._instance.send(
data && typeof data == 'object' ? JSON.stringify(data) : `${data}`,
options,
(err: Error) => {
err && _err(NS, '! err.send =', err);
err && reject(err);
},
);
});

//! handle message via server.
const on_message = async (body, flags) => {
const on_message = async (body: any, flags: any) => {
_log(NS, '! on.message()...');
_log(NS, '> body =', typeof body, body);
_log(NS, '> flags =', typeof flags, flags);
Expand All @@ -216,8 +184,9 @@ module.exports = ($engine, params) => {
clientReqId && _inf(NS, `> data[${WSC_REQID_KEY}]=`, clientReqId);

//NOTE - in connected state, send result via web-socket with success
const message = await (async () => {
const message: any = await (async () => {
if (data && typeof data === 'object') {
data.type = data.type || DEFAULT_TYPE;
data.context = null; //TODO - Set proper context.
if (clientReqId) {
const waits = thiz._waits[clientReqId];
Expand Down Expand Up @@ -252,7 +221,7 @@ module.exports = ($engine, params) => {
};

//! reconnect in some interval.
const reconnect = e => {
const reconnect = (e: any) => {
_log(NS, '> reconnect()... e=', e);
close();
setTimeout(() => {
Expand All @@ -262,12 +231,12 @@ module.exports = ($engine, params) => {

const on_open = () => {
_log(NS, '! on.open()...');
executeServiceApi({ method: 'CONNECT', context: null, headers: params.headers });
executeServiceApi({ method: 'CONNECT', type: DEFAULT_TYPE, context: null, headers: params.headers });
};

const on_close = e => {
const on_close = (e: any) => {
_log(NS, '! on.close()... e=', e);
executeServiceApi({ method: 'DISCONNECT', context: null });
executeServiceApi({ method: 'DISCONNECT', type: DEFAULT_TYPE, context: null });
switch (e) {
case 1000: // CLOSE_NORMAL
_log(NS, '! closed');
Expand All @@ -279,7 +248,7 @@ module.exports = ($engine, params) => {
}
};

const on_error = e => {
const on_error = (e: any) => {
_err(NS, '! on.error =', e);
switch (e.code) {
case 'ECONNREFUSED':
Expand All @@ -291,7 +260,7 @@ module.exports = ($engine, params) => {
};

//! attach
Object.assign(thiz, { start: open, stop: close, post, send, reconnect });
Object.assign(thiz, { start: open, stop: close, post, send, reconnect, next_id });

//! retruns instance.
return thiz;
Expand All @@ -311,7 +280,8 @@ module.exports = ($engine, params) => {
* @param {*} event
* @param {*} context
*/
const WSC = async (event, context) => {
//! Common SNS Handler for lemon-protocol integration.
const WSC: WebClientHandler = (event, context, callback) => {
// context.callbackWaitsForEmptyEventLoop = false;
// _log(NS, '! event =', event);
// _log(NS, '! context=', context);
Expand All @@ -320,7 +290,11 @@ module.exports = ($engine, params) => {

//! send message.
WSC.client = client;
if (isStart) client.start();

//! returns main SNS handler.
return WSC;
};

//! export default.
export default builder;
Loading

0 comments on commit 3fd3a15

Please sign in to comment.