Skip to content

Commit

Permalink
no websocket need
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkhala committed Mar 27, 2024
1 parent 60c6d88 commit 10a1f6f
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 155 deletions.
58 changes: 33 additions & 25 deletions activemq/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {Client} from '@stomp/stompjs';
import DB from '@davidkhala/db/index.js';
import assert from 'assert';
import {TCPWrapper} from '@stomp/tcp-wrapper';

export const container = {
config: '/opt/activemq/conf',
Expand All @@ -11,46 +13,38 @@ export const ActivationState = {
INACTIVE: 2,
};

export class BaseClient {
constructor(host, {username, password} = {}, logger = console) {
assert.ok(typeof host === 'string', 'typeof host !== \'string\'');
export class STOMP extends DB {
constructor({domain, username, password}, logger = console) {
super({domain, username, password}, undefined, logger);

this.client = new Client({
this.connection = new Client({
debug: logger.debug,
reconnectDelay: 5000,
heartbeatIncoming: 4000,
heartbeatOutgoing: 4000,
connectHeaders: {},
connectHeaders: {
login: username,
passcode: password
},
});
if (username) {
this.client.connectHeaders.login = username;
}
if (password) {
this.client.connectHeaders.passcode = password;
}
this.host = host;

this.connection.webSocketFactory = () => new TCPWrapper(domain, 61613);
}

async connect() {

if (this.client.state === ActivationState.DEACTIVATING) {
if (this.connection.state === ActivationState.DEACTIVATING) {
throw new Error('Still DEACTIVATING, can not activate now');
}

if (this.client.active) {
this.client.debug('Already ACTIVE, ignoring request to activate');
if (this.connection.active) {
this.connection.debug('Already ACTIVE, ignoring request to activate');
return;
}

this.client.onWebSocketClose = (err, ...others) => {
console.error('onWebSocketClose', err, ...others);
};


this.client._changeState(ActivationState.ACTIVE);
this.connection._changeState(ActivationState.ACTIVE);

await this.client._connect();
await this.connection._connect();
return new Promise((resolve, reject) => {
this.onConnect = (frame) => {
const {command} = frame;
Expand All @@ -61,20 +55,34 @@ export class BaseClient {
}

async disconnect() {
await this.client.deactivate();
await this.connection.deactivate();
}

set onConnect(listener) {
this.client.onConnect = listener;
this.connection.onConnect = listener;
}

send(topic, message) {
assert.ok(typeof topic === 'string', `Invalid destination type:${typeof topic}`);
assert.ok(typeof message === 'string', `Invalid body type:${typeof message}`);

this.client.publish({
this.connection.publish({
destination: topic,
body: message,
});
}

/**
*
* @param topic
* @param {function(message:IMessage)} listener
* @return {StompSubscription}
*/
subscribe(topic, listener) {
return this.connection.subscribe(topic, listener);
}

unsubscribe(subscriptionID) {
this.connection.unsubscribe(subscriptionID);
}
}
2 changes: 1 addition & 1 deletion activemq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"dependencies": {
"@stomp/stompjs": "beta",
"@stomp/tcp-wrapper": "latest",
"ws": "latest"
"@davidkhala/db": "latest"
},
"devDependencies": {
"@davidkhala/light": "latest",
Expand Down
9 changes: 0 additions & 9 deletions activemq/stomp.js

This file was deleted.

23 changes: 0 additions & 23 deletions activemq/test-oneoff/localhost.js

This file was deleted.

36 changes: 19 additions & 17 deletions activemq/test/localhost.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import {WebsocketClient} from '../websocketClient.js';
import {STOMPClient} from '../stomp.js';
import {STOMP} from '../index.js';
import assert from 'assert';

const username = 'artemis';
const password = 'artemis';
const domain = 'localhost';
describe('STOMP', function () {
this.timeout(0);
const client = new STOMP({domain, username, password});
it('connect', async () => {
const host = 'localhost';
const client = new STOMPClient(host);
await client.connect();

client.send('a', 'b');
await client.disconnect();
});

});

describe('websocket', function () {
this.timeout(0);
const host = 'localhost';
const conn = new WebsocketClient(host);

it('connect', async () => {
await conn.connect();

conn.send('a', 'b');
await conn.disconnect();
it('listen', async () => {
const topic = 'a';
await client.connect();
const message = 'b';
const received = await new Promise(resolve => {
const listener = client.subscribe(topic, (frame) => {
listener.unsubscribe();
resolve(frame.body);
});
client.send(topic, message);
});
assert.equal(received, message);
await client.disconnect();

});
});
Expand Down
80 changes: 0 additions & 80 deletions activemq/websocketClient.js

This file was deleted.

0 comments on commit 10a1f6f

Please sign in to comment.