Skip to content

Commit

Permalink
Add Water RabbitConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Jul 16, 2024
1 parent ed1dcf6 commit 435d1fc
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class RabbitConnection(
// https://www.rabbitmq.com/docs/publishers#message-properties
deliveryMode(2) // Persistent
headers(headers.headers) // lol
messageId(headers.messageId)
}.build()
channelData.channel.basicPublish(topic, key, properties, value.toByteArray())
}
Expand Down
1 change: 1 addition & 0 deletions water/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"dist"
],
"dependencies": {
"rabbitmq-client": "^4.6.0",
"valibot": "^0.36.0"
},
"devDependencies": {
Expand Down
3 changes: 2 additions & 1 deletion water/src/broker/BrokerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ export abstract class BrokerConnection {
this.topicListeners.clear();
}


// Internal API
public abstract abstractStart(): Promise<void>;
// Internal API
public abstract abstractSend(topic: string, key: string, value: string, headers: BrokerMessageHeaders): Promise<MessageId>;

public send(topic: string, key: string, value: string, headers: BrokerMessageHeaders): Promise<MessageId> {
Expand Down
144 changes: 144 additions & 0 deletions water/src/broker/rabbitmq/RabbitConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import type { Consumer, Publisher } from "rabbitmq-client";
import { Connection } from "rabbitmq-client";
import type { MessageId } from "../BrokerConnection.js";
import { BrokerConnection } from "../BrokerConnection.js";
import { BrokerMessageHeaders } from "../BrokerMessageHeaders.js";
import { Logger } from "../../logging/Logger.js";

export class RabbitConnection extends BrokerConnection {

private static readonly TAG = "RabbitConnection";

public override supportsTopicHotSwap: boolean = true;
public override deferInitialTopicCreation: boolean = true;

private connection: Connection | null = null;
private readonly consumers: Map<string, Consumer> = new Map();
private readonly publishers: Map<string, Publisher> = new Map();

public constructor(
private readonly rabbitHosts: string[],
public override readonly serviceName: string,
public override readonly instanceId: string,
private readonly useTls: boolean = false,
private readonly username: string = "guest",
private readonly password: string = "guest",
) {
super();
}

public override async abstractStart(): Promise<void> {
this.connection = new Connection({
hosts: this.rabbitHosts,
username: this.username,
password: this.password,
tls: this.useTls,
connectionName: `${this.serviceName}: ${this.instanceId}`,
});
this.connection.on("connection.blocked", reason => {
Logger.warn(RabbitConnection.TAG, `RabbitMQ server blocked connection for reason: ${reason}`);
});
this.connection.on("connection.unblocked", () => {
Logger.info(RabbitConnection.TAG, "RabbitMQ server unblocked connection");
});
await new Promise<void>((resolve, reject) => {
// Use `on` instead of `once` to re-use the same logging code for future connection updates
this.connection!.on("connection", () => {
Logger.info(RabbitConnection.TAG, "RabbitMQ connection (re)established");
resolve();
});
this.connection!.on("error", error => {
Logger.error(RabbitConnection.TAG, "Error in RabbitMQ connection", error);
reject(error);
});
});
}

public override async destroy(): Promise<void> {
await super.destroy();
await this.connection?.close();
}

public override async abstractSend(topic: string, key: string, value: string, headers: BrokerMessageHeaders): Promise<MessageId> {
if (this.shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers)) {
let publisher = this.publishers.get(topic);
if (!publisher) {
publisher = this
.ensureConnection()
.createPublisher(this.commonPublisherConsumerArguments(topic));
this.publishers.set(topic, publisher);
}
await publisher.send({
exchange: topic,
routingKey: key,
headers: headers.headers,
messageId: headers.messageId,
durable: true,
}, value);
}
return headers.messageId;
}

protected override createTopic(topic: string): void {
if (this.consumers.has(topic)) {
return;
}
const queueName = this.createQueueName(topic);
this.consumers.set(topic, this.ensureConnection().createConsumer({
queue: queueName,
...this.commonPublisherConsumerArguments(topic),
}, async (msg, reply) => {
const key = msg.routingKey;
const value = msg.body instanceof Buffer ? msg.body.toString("utf8") : String(msg.body);
const headers = new BrokerMessageHeaders(msg.headers ?? {});
this.dispatchIncomingMessage(topic, key, value, headers);
}));
}

protected override removeTopic(topic: string): void {
const consumer = this.consumers.get(topic);
if (consumer) {
consumer.close().catch(e => {
Logger.error(RabbitConnection.TAG, `Error closing consumer for topic ${topic}`, e);
});
this.consumers.delete(topic);
}
}

private ensureConnection(): Connection {
const connection = this.connection;
if (!connection) {
throw new Error("Connection not open");
}
return connection;
}

private createQueueName(topic: string): string {
return `${this.serviceName}.${this.instanceId}.${topic}`;
}

private commonPublisherConsumerArguments(topic: string) {
const exchangeName = topic;
const queueName = this.createQueueName(topic);
const routingKey = "#";
return {
queues: [{
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
}],
exchanges: [{
exchange: exchangeName,
type: "topic",
durable: true,
}],
queueBindings: [{
queue: queueName,
exchange: exchangeName,
routingKey,
}],
};
}

}
1 change: 1 addition & 0 deletions water/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from "./broker/rpc/RpcClient.js";
export * from "./broker/rpc/RpcMessage.js";
export * from "./broker/rpc/RpcMessageHeaders.js";
export * from "./broker/rpc/RpcStatus.js";
export * from "./broker/rabbitmq/RabbitConnection.js";

export * from "./logging/Logger.js";

Expand Down
5 changes: 5 additions & 0 deletions water/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,11 @@ queue-microtask@^1.2.2:
resolved "https://registry.yarnpkg.com/queue-microtask/-/queue-microtask-1.2.3.tgz#4929228bbc724dfac43e0efb058caf7b6cfb6243"
integrity sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A==

rabbitmq-client@^4.6.0:
version "4.6.0"
resolved "https://registry.yarnpkg.com/rabbitmq-client/-/rabbitmq-client-4.6.0.tgz#7f8d777ade6079e894476c98c388a7560c692c64"
integrity sha512-RHt6Vz+nCzsMlSJ5HbMgxR1SMej1LtRVYCmbIWXfUhgeej3HshByQdVOirF195V3RVCRI3UHEec9PzgDQQtS8Q==

regexp.prototype.flags@^1.5.2:
version "1.5.2"
resolved "https://registry.yarnpkg.com/regexp.prototype.flags/-/regexp.prototype.flags-1.5.2.tgz#138f644a3350f981a858c44f6bb1a61ff59be334"
Expand Down

0 comments on commit 435d1fc

Please sign in to comment.