Skip to content

Commit

Permalink
test kafka:0.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkhala committed Jul 6, 2024
1 parent 0efd86a commit f283609
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 23 deletions.
27 changes: 12 additions & 15 deletions kafka/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,24 @@ export default class KafkaManager extends PubSub {
/**
*
* @param {string[]} brokers
* @param {string} [username]
* @param {string} [password]
* @param [ssl] default to true
* @param {string} [clientId] default to "kafkajs"
* @param logger
* @param sasl sasl config
* @param [logger]
*/
constructor(brokers, {username, password, clientId} = {}, logger) {
super({name: clientId, username, password}, undefined, logger);
constructor(brokers, {ssl=true, clientId='kafkajs', sasl} = {}, logger) {
super({
name: clientId,
username:sasl?sasl.username:undefined,
password:sasl?sasl.password:undefined,
}, undefined, logger);
const config = {
clientId,
brokers,
ssl,
sasl
};
if (username) {
Object.assign(config, {
ssl: true,
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username,
password
},
});
}

this.kafkaManager = new Kafka(config);
}

Expand Down
7 changes: 7 additions & 0 deletions kafka/insecure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import KafkaManager from './index.js';

export default class Insecure extends KafkaManager {
constructor(brokers) {
super(brokers, {ssl: false});
}
}
20 changes: 20 additions & 0 deletions kafka/plain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import KafkaManager from './index.js';

export default class PlainSASL extends KafkaManager {
/**
*
* @param brokers
* @param username
* @param password
* @param [logger]
*/
constructor(brokers, {username, password}, logger) {
super(brokers, {
sasl: {
mechanism: 'plain',
username,
password
}
}, logger);
}
}
5 changes: 2 additions & 3 deletions kafka/test/IBM.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import KafkaManager from '../index.js';

import assert from 'assert';
import PlainSASL from '../plain.js';

const kafka_brokers_sasl = [
'broker-0-ln9xw23yp788ngy9.kafka.svc02.us-south.eventstreams.cloud.ibm.com:9093',
Expand All @@ -18,7 +17,7 @@ describe('IBM Event Stream', function () {
assert.ok(api_key, 'Missing api_key');

it('connect', async () => {
const client = new KafkaManager(kafka_brokers_sasl, {username, password: api_key});
const client = new PlainSASL(kafka_brokers_sasl, {username, password: api_key});
const {admin} = client;
await admin.connect();
const result = await admin.listTopics();
Expand Down
4 changes: 2 additions & 2 deletions kafka/test/localhost.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import KafkaManager from '../index.js';
import assert from 'assert';
import Insecure from '../insecure.js';

describe('docker', function () {
this.timeout(0);
it('connect', async () => {
const kafka_brokers_sasl = ['localhost:9092'];// ['kafka1:9092', 'kafka2:9092','kafka3:9092']
const client = new KafkaManager(kafka_brokers_sasl);
const client = new Insecure(kafka_brokers_sasl);
const {admin} = client;
await admin.connect();
const topics = await admin.listTopics();
Expand Down
5 changes: 2 additions & 3 deletions kafka/test/oracle.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import KafkaManager from '../index.js';
import assert from 'assert';
import PlainSASL from '../plain.js';

describe('Oracle Streaming', function () {
this.timeout(0);



const kafka_brokers_sasl = ['cell-1.streaming.ap-singapore-1.oci.oraclecloud.com:9092'];
// Your username must be in the format: tenancyName/domain/username/streamPool-ocid
const username = 'davidkhala2021/OracleIdentityCloudService/davidkhala@gmail.com/ocid1.streampool.oc1.ap-singapore-1.amaaaaaaulaazmqarbifzdl6n6ml2nfjykqbvawf6fnfwnistfwdvz32aqda';
Expand All @@ -18,7 +17,7 @@ describe('Oracle Streaming', function () {
}

it('connect', async () => {
const client = new KafkaManager(kafka_brokers_sasl, {username, password});
const client = new PlainSASL(kafka_brokers_sasl, {username, password});
const {admin} = client;
await admin.connect();
const topics = await admin.listTopics();
Expand Down

0 comments on commit f283609

Please sign in to comment.