diff --git a/.github/workflows/npmPublish.yml b/.github/workflows/npmPublish.yml
index 0997069..66a1267 100644
--- a/.github/workflows/npmPublish.yml
+++ b/.github/workflows/npmPublish.yml
@@ -17,7 +17,6 @@ jobs:
           node-version: '20.x'
           registry-url: 'https://registry.npmjs.org'
           scope: '@sweet-security'
-      - run: echo ${{secrets.SWEET_SECURITY_NPM_TOKEN}}
       - run: npm ci
       - run: npm publish --provenance --access public
         env:
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..503a114
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,25 @@
+MIT License
+
+Includes code from @s3pweb/simple-kafka-promise
+Copyright for portions of project Foo are held by [S3pweb, 2020] as part of project @s3pweb/simple-kafka-promise.
+All other copyright for project @sweet-security/kafkas are held by [Sweet Security, 2024].
+
+Copyright (c) 2024 sweet security
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/examples/consumer.js b/examples/consumer.js
index ad80457..79bfe6a 100644
--- a/examples/consumer.js
+++ b/examples/consumer.js
@@ -1,6 +1,6 @@
 "use strict";
 
-const KafkaConsumer = require("../dist/index").KafkaConsumer;
+const KafkaConsumer = require("../dist/index").Consumer;
 
 function later(delay) {
   return new Promise(function (resolve) {
diff --git a/examples/producer.js b/examples/producer.js
index 61b1e44..bec5e98 100644
--- a/examples/producer.js
+++ b/examples/producer.js
@@ -1,6 +1,6 @@
 "use strict";
 
-const KafkaProducer = require("../dist/index").KafkaProducer;
+const KafkaProducer = require("../dist/index").Producer;
 
 function later(delay) {
   return new Promise(function (resolve) {
diff --git a/package.json b/package.json
index 6d7cc0e..4bf2abf 100644
--- a/package.json
+++ b/package.json
@@ -16,6 +16,7 @@
     "type": "git",
     "url": "git+https://github.com/sweet-security/kafkas.git"
   },
+  "license": "MIT",
   "dependencies": {
     "node-rdkafka": "3.1.0"
   },
diff --git a/src/index.ts b/src/index.ts
index 7966215..cea94cf 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,7 +1,7 @@
-export { KafkaProducer } from "./lib/kafkaHighLevelProducer";
-export { KafkaConsumer } from "./lib/kafkaConsumer";
+export { Producer } from "./lib/producer";
+export { Consumer } from "./lib/consumer";
 
-export { KafkaNProducer } from "./lib/kafkaNProducer";
+export { NProducer } from "./lib/nProducer";
 
-export { KafkaConsumerMock } from "./mock/kafkaConsumer.mock";
-export { KafkaProducerMock } from "./mock/kafkaProducer.mock";
+export { ConsumerMock } from "./mock/consumer.mock";
+export { ProducerMock } from "./mock/producer.mock";
diff --git a/src/lib/kafkaConsumer.ts b/src/lib/consumer.ts
similarity index 94%
rename from src/lib/kafkaConsumer.ts
rename to src/lib/consumer.ts
index 926d124..65842b0 100644
--- a/src/lib/kafkaConsumer.ts
+++ b/src/lib/consumer.ts
@@ -1,17 +1,17 @@
 import {
   ClientMetrics,
-  KafkaConsumer as Consumer,
+  KafkaConsumer,
   Message,
   Metadata,
   SubscribeTopicList,
   TopicPartitionOffset,
   WatermarkOffsets,
 } from "node-rdkafka";
-import { KafkaConsumerInterface } from "./kafkaConsumerInterface";
+import { IConsumer } from "./iConsumer";
 import { ConsumerRunConfig } from "../types";
 
-export class KafkaConsumer implements KafkaConsumerInterface {
-  private readonly consumer: Consumer;
+export class Consumer implements IConsumer {
+  private readonly consumer: KafkaConsumer;
   private readonly consumeTimeout: number;
 
   /**
@@ -27,7 +27,7 @@ export class KafkaConsumer implements KafkaConsumerInterface {
       ...config,
     };
 
-    this.consumer = new Consumer(consumerConfig, {
+    this.consumer = new KafkaConsumer(consumerConfig, {
       "auto.offset.reset": "earliest",
     });
   }
@@ -137,7 +137,7 @@ export class KafkaConsumer implements KafkaConsumerInterface {
     });
   }
 
-  getConsumer(): Consumer {
+  getConsumer(): KafkaConsumer | null {
     return this.consumer;
   }
 
diff --git a/src/lib/kafkaConsumerInterface.ts b/src/lib/iConsumer.ts
similarity index 95%
rename from src/lib/kafkaConsumerInterface.ts
rename to src/lib/iConsumer.ts
index bfbfda7..a71b79b 100644
--- a/src/lib/kafkaConsumerInterface.ts
+++ b/src/lib/iConsumer.ts
@@ -1,12 +1,12 @@
 import { ClientMetrics, KafkaConsumer as Consumer, Message, Metadata, TopicPartitionOffset, WatermarkOffsets } from "node-rdkafka";
 import { ConsumerRunConfig } from "../types";
 
-export interface KafkaConsumerInterface {
+export interface IConsumer {
   /**
    * Connect consumer to kafka and subscribe to given topics
    * @param topics Array of topics
    */
-  connect(topics): Promise;
+  connect(topics: string[]): Promise;
 
   /**
    * Disconnect the consumer from Kafka.
diff --git a/src/lib/kafkaProducerInterface.ts b/src/lib/kafkaProducerInterface.ts
index 8187be5..d2398d9 100644
--- a/src/lib/kafkaProducerInterface.ts
+++ b/src/lib/kafkaProducerInterface.ts
@@ -1,6 +1,7 @@
 import { ClientMetrics, Metadata } from "node-rdkafka";
+import { ProducerRecord } from "../types";
 
-export interface KafkaProducerInterface {
+export interface IProducer {
   /**
    * Connect the producer to kafka, will return broker's metadata or nothing if already connected.
    *
@@ -18,14 +19,15 @@
   /**
    * Send a message to Kafka and await ack.
    *
-   * @param topic Topic to send message to.
-   * If `kafka.producer.topicsPrefix` exist in config, the full topic will be `kafka.producer.topicsPrefix + topic`
-   * @param message Message to be sent (will be parsed with `JSON.stringify(...)` before).
-   * @param partition Topic partition.
-   * @param key Kafka key to be sent along the message.
+   * @param record -
+   * - topic: Topic to send message to.
+   * If `kafka.producer.topicsPrefix` exist in config, the full topic will be `kafka.producer.topicsPrefix + topic`
+   * - message: Message to be sent (will be parsed with `JSON.stringify(...)` before).
+   * - partition: Topic partition.
+   * - key: Kafka key to be sent along the message.
    * @return Message's offset
    */
-  sendMessage(topic: string, message: any, partition: number, key: any): Promise;
+  send(record: ProducerRecord): Promise;
 
   /**
    * Send a buffer message to Kafka and await ack.
diff --git a/src/lib/kafkaNProducer.ts b/src/lib/nProducer.ts
similarity index 99%
rename from src/lib/kafkaNProducer.ts
rename to src/lib/nProducer.ts
index 75accc9..6746d41 100644
--- a/src/lib/kafkaNProducer.ts
+++ b/src/lib/nProducer.ts
@@ -1,6 +1,6 @@
 import { ClientMetrics, Metadata, Producer } from "node-rdkafka";
 
-export class KafkaNProducer {
+export class NProducer {
   private connected: boolean;
   private indexMessage: number;
   private readonly prefix: string;
diff --git a/src/lib/kafkaHighLevelProducer.ts b/src/lib/producer.ts
similarity index 86%
rename from src/lib/kafkaHighLevelProducer.ts
rename to src/lib/producer.ts
index 33da29d..ee6427c 100644
--- a/src/lib/kafkaHighLevelProducer.ts
+++ b/src/lib/producer.ts
@@ -1,7 +1,8 @@
 import { ClientMetrics, HighLevelProducer, Metadata } from "node-rdkafka";
-import { KafkaProducerInterface } from "./kafkaProducerInterface";
+import { IProducer } from "./kafkaProducerInterface";
+import { ProducerRecord } from "../types";
 
-export class KafkaProducer implements KafkaProducerInterface {
+export class Producer implements IProducer {
   private connected: boolean;
   private readonly prefix: string;
   private readonly producer: HighLevelProducer;
@@ -28,7 +29,6 @@ export class KafkaProducer implements KafkaProducerInterface {
         // Do nothing if we are already connected
         resolve(null);
       } else {
-        // Fix for https://github.com/Blizzard/node-rdkafka/issues/600
         this.producer.setValueSerializer((v) => v);
         this.producer.setKeySerializer((v) => v);
 
@@ -59,9 +59,9 @@
     });
   }
 
-  sendMessage(topic: string, message: any, partition: number, key: any): Promise {
-    message = Buffer.from(JSON.stringify(message));
-    return this.sendBufferMessage(topic, message, partition, key);
+  send(record: ProducerRecord): Promise {
+    const message = Buffer.from(JSON.stringify(record.message));
+    return this.sendBufferMessage(record.topic, message, record.partition, record.key);
   }
 
   sendBufferMessage(topic: string, message: any, partition: number, key: any): Promise {
diff --git a/src/mock/kafkaConsumer.mock.ts b/src/mock/consumer.mock.ts
similarity index 79%
rename from src/mock/kafkaConsumer.mock.ts
rename to src/mock/consumer.mock.ts
index ca2a3e1..5022f65 100644
--- a/src/mock/kafkaConsumer.mock.ts
+++ b/src/mock/consumer.mock.ts
@@ -1,9 +1,9 @@
 /* eslint-disable @typescript-eslint/no-unused-vars */
-import { ClientMetrics, KafkaConsumer as Consumer, Message, Metadata, TopicPartitionOffset, WatermarkOffsets } from "node-rdkafka";
-import { KafkaConsumerInterface } from "../lib/kafkaConsumerInterface";
+import { ClientMetrics, KafkaConsumer, Message, Metadata, TopicPartitionOffset, WatermarkOffsets } from "node-rdkafka";
+import { IConsumer } from "../lib/iConsumer";
 import { ConsumerRunConfig } from "../types";
 
-export class KafkaConsumerMock implements KafkaConsumerInterface {
+export class ConsumerMock implements IConsumer {
   constructor(config: any, timeoutMs?: number) {
     return;
   }
@@ -40,7 +40,7 @@ export class KafkaConsumerMock implements KafkaConsumerInterface {
     return Promise.resolve({ highOffset: 100, lowOffset: 0 });
   }
 
-  getConsumer(): Consumer {
+  getConsumer(): KafkaConsumer {
     return null;
   }
 
diff --git a/src/mock/kafkaProducer.mock.ts b/src/mock/producer.mock.ts
similarity index 74%
rename from src/mock/kafkaProducer.mock.ts
rename to src/mock/producer.mock.ts
index 11a4ab3..03f789b 100644
--- a/src/mock/kafkaProducer.mock.ts
+++ b/src/mock/producer.mock.ts
@@ -1,8 +1,9 @@
 /* eslint-disable @typescript-eslint/no-unused-vars */
-import { KafkaProducerInterface } from "../lib/kafkaProducerInterface";
+import { IProducer } from "../lib/kafkaProducerInterface";
 import { ClientMetrics, Metadata } from "node-rdkafka";
+import { ProducerRecord } from "../types";
 
-export class KafkaProducerMock implements KafkaProducerInterface {
+export class ProducerMock implements IProducer {
   constructor(config: any, topicPrefix?: string) {
     return;
   }
@@ -15,7 +16,7 @@
     return Promise.resolve(null);
   }
 
-  sendMessage(topic: string, message: any, partition: number, key: any): Promise {
+  send(record: ProducerRecord): Promise {
     return Promise.resolve(0);
   }
 
diff --git a/src/types.ts b/src/types.ts
index f4db0d5..32f6790 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -12,3 +12,23 @@
   eachBatch?: EachBatchHandler;
   eachMessage?: EachMessageHandler;
 };
+
+export interface ProducerRecord {
+  topic: string;
+  message: Message;
+  ack?: number;
+  timeout?: number;
+  compression?: CompressionTypes;
+  partition?: number;
+  key?: string;
+}
+
+export enum CompressionTypes {
+  None = 0,
+  GZIP = 1,
+  Snappy = 2,
+  LZ4 = 3,
+  ZSTD = 4,
+}
+
+export { Message } from "node-rdkafka";
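
Migration note (not part of the diff): the TypeScript sketch below illustrates how calling code might move from the old sendMessage(topic, message, partition, key) method to the ProducerRecord-based send() and the renamed Producer/Consumer exports introduced above. The broker address, topic, group id, and the argument-less producer.connect() call are placeholder assumptions; constructor and connect signatures follow the mock classes in this diff and should be checked against the library before use.

// migration-sketch.ts -- minimal, hedged example; names marked as placeholders are assumptions.
import { Producer, Consumer } from "@sweet-security/kafkas";

async function main(): Promise<void> {
  // Config is forwarded to node-rdkafka's HighLevelProducer; broker address is a placeholder.
  const producer = new Producer({ "metadata.broker.list": "localhost:9092" });
  await producer.connect(); // assumption: no arguments needed here

  // Old: producer.sendMessage("my-topic", { hello: "world" }, 0, "my-key")
  // New: one ProducerRecord argument; send() JSON.stringifies record.message into a Buffer
  // and forwards topic/partition/key to sendBufferMessage().
  const offset = await producer.send({
    topic: "my-topic", // placeholder topic
    message: { hello: "world" } as any, // ProducerRecord.message is typed as node-rdkafka's Message in types.ts
    partition: 0, // optional
    key: "my-key", // optional
  });
  console.log("message acked at offset", offset);

  // Consumer constructor takes (config, consumeTimeoutMs) per the mock signature;
  // connect() now requires a string[] of topics per IConsumer.
  const consumer = new Consumer(
    { "metadata.broker.list": "localhost:9092", "group.id": "example-group" }, // placeholders
    1000,
  );
  await consumer.connect(["my-topic"]);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});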