Skip to content

Commit

Permalink
[wip] rebase to new @davidkhala/pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkhala committed Jan 20, 2024
1 parent 22017c4 commit 7f94c75
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 79 deletions.
1 change: 1 addition & 0 deletions common/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test
69 changes: 56 additions & 13 deletions common/index.js
Original file line number Diff line number Diff line change
@@ -1,34 +1,77 @@
import DB from '@davidkhala/db';
import DB, {Connectable} from '@davidkhala/db';

export default class MQ extends DB {
constructor({domain, port, name, username, password, dialect, driver}, connectionString, logger) {
super({domain, port, name, username, password, dialect, driver}, connectionString, logger);
}

/**
* @abstract
* @return {Promise<Pub>|Pub}
*/
get producer() {

}

/**
* @abstract
* @return {Promise<Sub>|Sub}
*/
get consumer() {

}

/**
*
* @param {string} topic
* @param {string} message
* @param [options]
* @abstract
*/
* @return {Promise<Admin>|Admin}
*/
get admin() {

}

}

export class Pub extends Connectable {
/**
*
* @abstract
* @param {string} topic
* @param {string} message
* @param [options]
*/
async send(topic, message, ...options) {

}

}

export class Sub extends Connectable {
/**
*
* @param {string} topic
* @param [options]
*
* @abstract
*/
* @param {string} topic
* @param [options]
*/
async subscribe(topic, ...options) {

}

/**
*
* @param {string} message
*
* @abstract
*/
* @param {string} message
*/
async acknowledge(message) {

}

}

export class Admin extends Connectable {
/**
* @abstract
*/
async listTopics() {

}
}
6 changes: 5 additions & 1 deletion common/package.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"name": "@davidkhala/pubsub",
"version": "0.0.1",
"version": "0.0.4",
"description": "common module for message queue, broker, eventhub, pub/sub",
"main": "index.js",
"scripts": {
"test": "mocha test"
},
"repository": {
"type": "git",
Expand All @@ -12,6 +13,9 @@
"publishConfig": {
"access": "public"
},
"devDependencies": {
"mocha": "latest"
},
"dependencies": {
"@davidkhala/db": "latest"
},
Expand Down
4 changes: 4 additions & 0 deletions common/test/syntax.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import '../index.js'
it('OK', function (){

})
124 changes: 72 additions & 52 deletions kafka/index.js
Original file line number Diff line number Diff line change
@@ -1,55 +1,75 @@
import {Kafka} from 'kafkajs'


export default class KafkaManager {

/**
*
* @param {string[]} brokers
* @param {string} [username]
* @param {string} [password]
* @param {string} [clientId] default to "kafkajs"
*/
constructor(brokers, {username, password, clientId} = {}) {
const config = {
clientId,
brokers,
}
if (username) {
Object.assign(config, {
ssl: true,
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username,
password
},
})
}
this.kafkaManager = new Kafka(config)
}

get admin() {
if (!this._admin) {
this._admin = this.kafkaManager.admin()
}
return this._admin
}

async connect() {
await this.admin.connect()
}

async disconnect() {
await this.admin.disconnect()
}

async listTopics() {
return this.admin.listTopics()
}

producer() {
return this.kafkaManager.producer()
}
import kafkajs from 'kafkajs';
import PubSub, {Admin as AbstractAdmin} from '@davidkhala/pubsub';
const {Admin, Kafka} = kafkajs

export default class KafkaManager extends PubSub {

/**
*
* @param {string[]} brokers
* @param {string} [username]
* @param {string} [password]
* @param {string} [clientId] default to "kafkajs"
* @param logger
*/
constructor(brokers, {username, password, clientId} = {}, logger) {
super({name: clientId, username, password}, undefined, logger);
const config = {
clientId,
brokers,
};
if (username) {
Object.assign(config, {
ssl: true,
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username,
password
},
});
}
this.kafkaManager = new Kafka(config);
}

get admin() {
if (!this._admin) {
this._admin = new KafkaAdmin(this.kafkaManager.admin());
}
return this._admin;
}


get producer() {
return this.kafkaManager.producer();
}

get consumer() {
return this.kafkaManager.consumer();
}

}

export class KafkaAdmin extends AbstractAdmin {
/**
*
* @param {Admin} admin
*/
constructor(admin) {
super();
this.admin = admin;
}

async listTopics() {
return this.admin.listTopics();
}

async _connect() {
await this.admin.connect();
return true;
}

async disconnect() {
await this.admin.disconnect();
}

}
3 changes: 2 additions & 1 deletion kafka/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"author": "david Liu <david-khala@hotmail.com>",
"license": "Apache-2.0",
"dependencies": {
"kafkajs": "latest"
"kafkajs": "latest",
"@davidkhala/pubsub": "latest"
},
"scripts": {
"test": "mocha test",
Expand Down
8 changes: 4 additions & 4 deletions kafka/test/IBM.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ describe('IBM Event Stream', function () {

it('connect', async () => {
const client = new KafkaManager(kafka_brokers_sasl, {username, password: api_key});

await client.connect();
const result = await client.listTopics();
const {admin} = client
await admin.connect();
const result = await admin.listTopics();

assert.ok(result.includes(topic));
await client.disconnect();
await admin.disconnect();
});
});
8 changes: 4 additions & 4 deletions kafka/test/localhost.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ describe('docker', function () {
it('connect', async () => {
const kafka_brokers_sasl = ['localhost:9092']// ['kafka1:9092', 'kafka2:9092','kafka3:9092']
const client = new KafkaManager(kafka_brokers_sasl)

await client.connect()
const topics = await client.listTopics()
const {admin} = client
await admin.connect()
const topics = await admin.listTopics()

assert.ok(topics.length ===0)
await client.disconnect()
await admin.disconnect()
})
})
8 changes: 4 additions & 4 deletions kafka/test/oracle.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ describe('Oracle Streaming', function () {

it('connect', async () => {
const client = new KafkaManager(kafka_brokers_sasl, {username, password})

await client.connect()
const topics = await client.listTopics()
const {admin} = client
await admin.connect()
const topics = await admin.listTopics()

assert.ok(topics.includes('streaming'))
await client.disconnect()
await admin.disconnect()
})
})

0 comments on commit 7f94c75

Please sign in to comment.