Skip to content

Commit

Permalink
Merge pull request #11 from sweet-security/npmrc
Browse files Browse the repository at this point in the history
Npmrc
  • Loading branch information
TsurEdoe authored Sep 8, 2024
2 parents 09a349a + 10e1ce8 commit 944bf5a
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 37 deletions.
1 change: 0 additions & 1 deletion .github/workflows/npmPublish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ jobs:
node-version: '20.x'
registry-url: 'https://registry.npmjs.org'
scope: '@sweet-security'
- run: echo ${{secrets.SWEET_SECURITY_NPM_TOKEN}}
- run: npm ci
- run: npm publish --provenance --access public
env:
Expand Down
25 changes: 25 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
MIT License

Includes code from @s3pweb/simple-kafka-promise
Copyright for portions of project Foo are held by [S3pweb, 2020] as part of project @s3pweb/simple-kafka-promise.
All other copyright for project @sweet-security/kafkas are held by [Sweet Security, 2024].

Copyright (c) 2024 sweet security

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
2 changes: 1 addition & 1 deletion examples/consumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";

const KafkaConsumer = require("../dist/index").KafkaConsumer;
const KafkaConsumer = require("../dist/index").Consumer;

function later(delay) {
return new Promise(function (resolve) {
Expand Down
2 changes: 1 addition & 1 deletion examples/producer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";

const KafkaProducer = require("../dist/index").KafkaProducer;
const KafkaProducer = require("../dist/index").Producer;

function later(delay) {
return new Promise(function (resolve) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"type": "git",
"url": "git+https://github.com/sweet-security/kafkas.git"
},
"license": "MIT",
"dependencies": {
"node-rdkafka": "3.1.0"
},
Expand Down
10 changes: 5 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export { KafkaProducer } from "./lib/kafkaHighLevelProducer";
export { KafkaConsumer } from "./lib/kafkaConsumer";
export { Producer } from "./lib/producer";
export { Consumer } from "./lib/consumer";

export { KafkaNProducer } from "./lib/kafkaNProducer";
export { NProducer } from "./lib/nProducer";

export { KafkaConsumerMock } from "./mock/kafkaConsumer.mock";
export { KafkaProducerMock } from "./mock/kafkaProducer.mock";
export { ConsumerMock } from "./mock/consumer.mock";
export { ProducerMock } from "./mock/producer.mock";
12 changes: 6 additions & 6 deletions src/lib/kafkaConsumer.ts → src/lib/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {
ClientMetrics,
KafkaConsumer as Consumer,
KafkaConsumer,
Message,
Metadata,
SubscribeTopicList,
TopicPartitionOffset,
WatermarkOffsets,
} from "node-rdkafka";
import { KafkaConsumerInterface } from "./kafkaConsumerInterface";
import { IConsumer } from "./iConsumer";
import { ConsumerRunConfig } from "../types";

export class KafkaConsumer implements KafkaConsumerInterface {
private readonly consumer: Consumer;
export class Consumer implements IConsumer {
private readonly consumer: KafkaConsumer;
private readonly consumeTimeout: number;

/**
Expand All @@ -27,7 +27,7 @@ export class KafkaConsumer implements KafkaConsumerInterface {
...config,
};

this.consumer = new Consumer(consumerConfig, {
this.consumer = new KafkaConsumer(consumerConfig, {
"auto.offset.reset": "earliest",
});
}
Expand Down Expand Up @@ -137,7 +137,7 @@ export class KafkaConsumer implements KafkaConsumerInterface {
});
}

getConsumer(): Consumer {
getConsumer(): KafkaConsumer | null {
return this.consumer;
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib/kafkaConsumerInterface.ts → src/lib/iConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { ClientMetrics, KafkaConsumer as Consumer, Message, Metadata, TopicPartitionOffset, WatermarkOffsets } from "node-rdkafka";
import { ConsumerRunConfig } from "../types";

export interface KafkaConsumerInterface {
export interface IConsumer {
/**
* Connect consumer to kafka and subscribe to given topics
* @param topics Array of topics
*/
connect(topics): Promise<Metadata>;
connect(topics: string[]): Promise<Metadata>;

/**
* Disconnect the consumer from Kafka.
Expand Down
16 changes: 9 additions & 7 deletions src/lib/kafkaProducerInterface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ClientMetrics, Metadata } from "node-rdkafka";
import { ProducerRecord } from "../types";

export interface KafkaProducerInterface {
export interface IProducer {
/**
* Connect the producer to kafka, will return broker's metadata or nothing if already connected.
*
Expand All @@ -18,14 +19,15 @@ export interface KafkaProducerInterface {
/**
* Send a message to Kafka and await ack.
*
* @param topic Topic to send message to.
* If `kafka.producer.topicsPrefix` exist in config, the full topic will be `kafka.producer.topicsPrefix + topic`
* @param message Message to be sent (will be parsed with `JSON.stringify(...)` before).
* @param partition Topic partition.
* @param key Kafka key to be sent along the message.
* @param record -
* - topic: Topic to send message to.
* If `kafka.producer.topicsPrefix` exist in config, the full topic will be `kafka.producer.topicsPrefix + topic`
* - message: Message to be sent (will be parsed with `JSON.stringify(...)` before).
* - partition: Topic partition.
* - key: Kafka key to be sent along the message.
* @return Message's offset
*/
sendMessage(topic: string, message: any, partition: number, key: any): Promise<number>;
send(record: ProducerRecord): Promise<number>;

/**
* Send a buffer message to Kafka and await ack.
Expand Down
2 changes: 1 addition & 1 deletion src/lib/kafkaNProducer.ts → src/lib/nProducer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ClientMetrics, Metadata, Producer } from "node-rdkafka";

export class KafkaNProducer {
export class NProducer {
private connected: boolean;
private indexMessage: number;
private readonly prefix: string;
Expand Down
12 changes: 6 additions & 6 deletions src/lib/kafkaHighLevelProducer.ts → src/lib/producer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ClientMetrics, HighLevelProducer, Metadata } from "node-rdkafka";
import { KafkaProducerInterface } from "./kafkaProducerInterface";
import { IProducer } from "./kafkaProducerInterface";
import { ProducerRecord } from "../types";

export class KafkaProducer implements KafkaProducerInterface {
export class Producer implements IProducer {
private connected: boolean;
private readonly prefix: string;
private readonly producer: HighLevelProducer;
Expand All @@ -28,7 +29,6 @@ export class KafkaProducer implements KafkaProducerInterface {
// Do nothing if we are already connected
resolve(null);
} else {
// Fix for https://github.com/Blizzard/node-rdkafka/issues/600
this.producer.setValueSerializer((v) => v);
this.producer.setKeySerializer((v) => v);

Expand Down Expand Up @@ -59,9 +59,9 @@ export class KafkaProducer implements KafkaProducerInterface {
});
}

sendMessage(topic: string, message: any, partition: number, key: any): Promise<number> {
message = Buffer.from(JSON.stringify(message));
return this.sendBufferMessage(topic, message, partition, key);
send(record: ProducerRecord): Promise<number> {
const message = Buffer.from(JSON.stringify(record.message));
return this.sendBufferMessage(record.topic, message, record.partition, record.key);
}

sendBufferMessage(topic: string, message: any, partition: number, key: any): Promise<number> {
Expand Down
8 changes: 4 additions & 4 deletions src/mock/kafkaConsumer.mock.ts → src/mock/consumer.mock.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { ClientMetrics, KafkaConsumer as Consumer, Message, Metadata, TopicPartitionOffset, WatermarkOffsets } from "node-rdkafka";
import { KafkaConsumerInterface } from "../lib/kafkaConsumerInterface";
import { ClientMetrics, KafkaConsumer, Message, Metadata, TopicPartitionOffset, WatermarkOffsets } from "node-rdkafka";
import { IConsumer } from "../lib/iConsumer";
import { ConsumerRunConfig } from "../types";

export class KafkaConsumerMock implements KafkaConsumerInterface {
export class ConsumerMock implements IConsumer {
constructor(config: any, timeoutMs?: number) {
return;
}
Expand Down Expand Up @@ -40,7 +40,7 @@ export class KafkaConsumerMock implements KafkaConsumerInterface {
return Promise.resolve({ highOffset: 100, lowOffset: 0 });
}

getConsumer(): Consumer {
getConsumer(): KafkaConsumer {
return null;
}

Expand Down
7 changes: 4 additions & 3 deletions src/mock/kafkaProducer.mock.ts → src/mock/producer.mock.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { KafkaProducerInterface } from "../lib/kafkaProducerInterface";
import { IProducer } from "../lib/kafkaProducerInterface";
import { ClientMetrics, Metadata } from "node-rdkafka";
import { ProducerRecord } from "../types";

export class KafkaProducerMock implements KafkaProducerInterface {
export class ProducerMock implements IProducer {
constructor(config: any, topicPrefix?: string) {
return;
}
Expand All @@ -15,7 +16,7 @@ export class KafkaProducerMock implements KafkaProducerInterface {
return Promise.resolve(null);
}

sendMessage(topic: string, message: any, partition: number, key: any): Promise<number> {
send(record: ProducerRecord): Promise<number> {
return Promise.resolve(0);
}

Expand Down
20 changes: 20 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,23 @@ export type ConsumerRunConfig = {
eachBatch?: EachBatchHandler;
eachMessage?: EachMessageHandler;
};

export interface ProducerRecord {
topic: string;
message: Message;
ack?: number;
timeout?: number;
compression?: CompressionTypes;
partition?: number;
key?: string;
}

export enum CompressionTypes {
None = 0,
GZIP = 1,
Snappy = 2,
LZ4 = 3,
ZSTD = 4,
}

export { Message } from "node-rdkafka";

0 comments on commit 944bf5a

Please sign in to comment.