Skip to content

Commit

Permalink
pulsar handicap
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkhala committed Jan 19, 2024
1 parent bcd006f commit f58f8fa
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 79 deletions.
91 changes: 54 additions & 37 deletions pulsar/index.js
Original file line number Diff line number Diff line change
@@ -1,50 +1,67 @@
import {Client} from 'pulsar-client';
import DB from '@davidkhala/db'
import PulsarClient from 'pulsar-client';
import PubSub from '@davidkhala/pubsub';
import {hostname} from '@davidkhala/light/devOps.js';

export default class Pulsar extends DB {
constructor({domain, port = 6650, name, username, password}, connectionString, logger) {
const {Client} = PulsarClient;

super({domain, port, name, username, password, dialect: 'pulsar'}, connectionString, logger);
this.connection = new Client({
serviceUrl: this.connectionString
})
export default class Pulsar extends PubSub {
constructor({domain, port = 6650}, connectionString, logger) {

}
super({domain, port, dialect: 'pulsar'}, connectionString, logger);
this.connection = new Client({
serviceUrl: this.connectionString
});

set mTLS(bool) {
if (bool) {
this.driver = 'ssl'
} else {
delete this.driver
}
}
}

async _connect() {
return Promise.resolve(undefined);
}
set mTLS(bool) {
if (bool) {
this.driver = 'ssl';
} else {
delete this.driver;
}
}

_throwConnectError(e) {
return true;
}

async disconnect() {

async disconnect() {
return this.connection.close();
}
this.producer && await this.producer.close();
this.consumer && await this.consumer.close();
await this.connection.close();
}

async query(template, values, requestOptions) {
// https://pulsar.apache.org/docs/next/sql-overview/
return Promise.resolve(undefined);
}

async producer(topic) {
return await this.connection.createProducer({
topic,
})
}
async consumer(subscription, topic ){
return await this.connection.subscribe({topic, subscription})
}
async acknowledge(message) {
// TODO cannot build Message from string
return this.consumer.acknowledge(message);
}

async preSend(topic) {
this.producer = await this.connection.createProducer({
topic,
});
return this.producer;
}

async send(topic, message) {
if (!this.producer) {
await this.preSend(topic);
}

const msgId = await this.producer.send({data: Buffer.from(message)});
return msgId.toString();
}

async subscribe(topic, subscription) {
if (!subscription) {
subscription = `${hostname}:${process.pid}`;
}
const consumer = await this.connection.subscribe({
topic,
subscription
});
this.consumer = consumer;
return await consumer.receive();

}
}
3 changes: 2 additions & 1 deletion pulsar/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"dependencies": {
"pulsar-client": "latest",
"@davidkhala/db": "latest"
"@davidkhala/pubsub": "latest",
"@davidkhala/light": "latest"
},
"devDependencies": {
"mocha": "latest"
Expand Down
33 changes: 0 additions & 33 deletions pulsar/test-oneoff/e2e.js

This file was deleted.

20 changes: 20 additions & 0 deletions pulsar/test/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import Pulsar from '../index.js';

describe('pulsar', function () {
this.timeout(0);
const pulsar = new Pulsar({domain: 'localhost'});
const topic = 'topic';
const message = 'message';
it('pub', async () => {
const msgId = await pulsar.send(topic, message);
console.info({msgId});
});
if (!process.env.CI) {
it('sub', async () => {
const msg = await pulsar.subscribe(topic);
await pulsar.acknowledge(msg);
});
}


});
8 changes: 0 additions & 8 deletions solace/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ export class SolaceConnect extends DB {
}


get dba() {
return undefined;
}

async disconnect() {
if (this.connection) {
this.connection.disconnect();
Expand All @@ -81,10 +77,6 @@ export class SolaceConnect extends DB {

}

async query(template, values, requestOptions) {
return Promise.resolve(undefined);
}

_throwConnectError(e) {
return undefined;
}
Expand Down

0 comments on commit f58f8fa

Please sign in to comment.