Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkhala committed Mar 25, 2024
1 parent 50c3568 commit 6ee8bd7
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 123 deletions.
146 changes: 73 additions & 73 deletions activemq/index.js
Original file line number Diff line number Diff line change
@@ -1,80 +1,80 @@
import {Client} from '@stomp/stompjs';
import assert from 'assert'
import assert from 'assert';

export const container = {
config: '/opt/activemq/conf',
data: '/opt/activemq/data'
}
config: '/opt/activemq/conf',
data: '/opt/activemq/data'
};
export const ActivationState = {
ACTIVE: 0,
DEACTIVATING: 1,
INACTIVE: 2,
}
ACTIVE: 0,
DEACTIVATING: 1,
INACTIVE: 2,
};

export class BaseClient {
constructor(host, {username, password} = {}, logger = console) {
assert.ok(typeof host === 'string', `typeof host !== 'string'`)

this.client = new Client({
debug: logger.debug,
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
connectHeaders: {},
});
if (username) {
this.client.connectHeaders.login = username
}
if (password) {
this.client.connectHeaders.passcode = password
}
this.host = host

}

async connect() {

if (this.client.state === ActivationState.DEACTIVATING) {
throw new Error('Still DEACTIVATING, can not activate now');
}

if (this.client.active) {
this.client.debug('Already ACTIVE, ignoring request to activate');
return;
}

this.client.onWebSocketClose = (err, ...others) => {
console.error('onWebSocketClose', err, ...others)
};


this.client._changeState(ActivationState.ACTIVE);

await this.client._connect();
return new Promise((resolve, reject) => {
this.onConnect = (frame) => {
const {command} = frame
command === 'CONNECTED' ? resolve() : reject(frame)
}
})

}

async disconnect() {
await this.client.deactivate();
}

set onConnect(listener) {
this.client.onConnect = listener
}

send(topic, message) {
assert.ok(typeof topic === 'string', `Invalid destination type:${typeof topic}`)
assert.ok(typeof message === 'string', `Invalid body type:${typeof message}`)

this.client.publish({
destination: topic,
body: message,
});
}
constructor(host, {username, password} = {}, logger = console) {
assert.ok(typeof host === 'string', 'typeof host !== \'string\'');

this.client = new Client({
debug: logger.debug,
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
connectHeaders: {},
});
if (username) {
this.client.connectHeaders.login = username;
}
if (password) {
this.client.connectHeaders.passcode = password;
}
this.host = host;

}

async connect() {

if (this.client.state === ActivationState.DEACTIVATING) {
throw new Error('Still DEACTIVATING, can not activate now');
}

if (this.client.active) {
this.client.debug('Already ACTIVE, ignoring request to activate');
return;
}

this.client.onWebSocketClose = (err, ...others) => {
console.error('onWebSocketClose', err, ...others);
};


this.client._changeState(ActivationState.ACTIVE);

await this.client._connect();
return new Promise((resolve, reject) => {
this.onConnect = (frame) => {
const {command} = frame;
command === 'CONNECTED' ? resolve() : reject(frame);
};
});

}

async disconnect() {
await this.client.deactivate();
}

set onConnect(listener) {
this.client.onConnect = listener;
}

send(topic, message) {
assert.ok(typeof topic === 'string', `Invalid destination type:${typeof topic}`);
assert.ok(typeof message === 'string', `Invalid body type:${typeof message}`);

this.client.publish({
destination: topic,
body: message,
});
}
}
10 changes: 5 additions & 5 deletions activemq/stomp.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {TCPWrapper} from '@stomp/tcp-wrapper';
import {BaseClient} from './index.js'
import {BaseClient} from './index.js';

export class STOMPClient extends BaseClient {
constructor(host, username, password, logger) {
super(host,{username, password}, logger);
this.client.webSocketFactory = () => new TCPWrapper(host, 61613);
}
constructor(host, username, password, logger) {
super(host, {username, password}, logger);
this.client.webSocketFactory = () => new TCPWrapper(host, 61613);
}
}
89 changes: 44 additions & 45 deletions activemq/websocketClient.js
Original file line number Diff line number Diff line change
@@ -1,81 +1,80 @@
import {BaseClient} from "./index.js";
import WebSocket from 'ws'
import assert from 'assert'
import {BaseClient} from './index.js';
import WebSocket from 'ws';

export class WebsocketClient extends BaseClient {

constructor(host, username, password, logger) {
super(host, {username, password}, logger)
constructor(host, username, password, logger) {
super(host, {username, password}, logger);

const brokerURL = `ws://${host}:61614`
this.client.webSocketFactory = () => {
return new WebSocket(brokerURL, this.client.stompVersions.protocolVersions())
}
}
const brokerURL = `ws://${host}:61614`;
this.client.webSocketFactory = () => {
return new WebSocket(brokerURL, this.client.stompVersions.protocolVersions());
};
}

/**
/**
*
* @param {closeEventCallbackType} listener
*/
set onClose(listener) {
this.client.onWebSocketClose = listener;
}
set onClose(listener) {
this.client.onWebSocketClose = listener;
}

/**
/**
*
* @param {frameCallbackType} listener
*/
set onError(listener) {
this.client.onStompError = listener;
}
set onError(listener) {
this.client.onStompError = listener;
}


/**
/**
*
* @return {ITransaction}
*/
createTransaction() {
return this.client.begin();
}
createTransaction() {
return this.client.begin();
}

/**
/**
*
* @param {ITransaction} tx
*/
commit(tx) {
tx.commit();
}
commit(tx) {
tx.commit();
}

/**
/**
*
* @param {ITransaction} tx
*/
cancel(tx) {
tx.abort();
}
cancel(tx) {
tx.abort();
}

/**
/**
*
* @param topic
* @param {function(message:IMessage)} listener
* @return {StompSubscription}
*/
subscribe(topic, listener) {
return this.client.subscribe(topic, listener);
}
subscribe(topic, listener) {
return this.client.subscribe(topic, listener);
}

unsubscribe(subscriptionID) {
this.client.unsubscribe(subscriptionID)
}
unsubscribe(subscriptionID) {
this.client.unsubscribe(subscriptionID);
}

async disconnect() {
await super.disconnect()
this.client.onWebSocketClose = () => {
delete this.client.onConnect;
delete this.client.onStompError;
delete this.client.onWebSocketClose;
};
}
async disconnect() {
await super.disconnect();
this.client.onWebSocketClose = () => {
delete this.client.onConnect;
delete this.client.onStompError;
delete this.client.onWebSocketClose;
};
}
}


0 comments on commit 6ee8bd7

Please sign in to comment.