diff --git a/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt index a882965..9098aca 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rabbitmq/RabbitConnection.kt @@ -66,6 +66,7 @@ class RabbitConnection( // https://www.rabbitmq.com/docs/publishers#message-properties deliveryMode(2) // Persistent headers(headers.headers) // lol + messageId(headers.messageId) }.build() channelData.channel.basicPublish(topic, key, properties, value.toByteArray()) } diff --git a/water/package.json b/water/package.json index fac73f9..68a9134 100644 --- a/water/package.json +++ b/water/package.json @@ -15,6 +15,7 @@ "dist" ], "dependencies": { + "rabbitmq-client": "^4.6.0", "valibot": "^0.36.0" }, "devDependencies": { diff --git a/water/src/broker/BrokerConnection.ts b/water/src/broker/BrokerConnection.ts index 35ed543..0d97e21 100644 --- a/water/src/broker/BrokerConnection.ts +++ b/water/src/broker/BrokerConnection.ts @@ -33,8 +33,9 @@ export abstract class BrokerConnection { this.topicListeners.clear(); } - + // Internal API public abstract abstractStart(): Promise; + // Internal API public abstract abstractSend(topic: string, key: string, value: string, headers: BrokerMessageHeaders): Promise; public send(topic: string, key: string, value: string, headers: BrokerMessageHeaders): Promise { diff --git a/water/src/broker/rabbitmq/RabbitConnection.ts b/water/src/broker/rabbitmq/RabbitConnection.ts new file mode 100644 index 0000000..3fb44b9 --- /dev/null +++ b/water/src/broker/rabbitmq/RabbitConnection.ts @@ -0,0 +1,144 @@ +import type { Consumer, Publisher } from "rabbitmq-client"; +import { Connection } from "rabbitmq-client"; +import type { MessageId } from "../BrokerConnection.js"; +import { BrokerConnection } from "../BrokerConnection.js"; +import { BrokerMessageHeaders } from "../BrokerMessageHeaders.js"; +import { Logger } from "../../logging/Logger.js"; + +export class RabbitConnection extends BrokerConnection { + + private static readonly TAG = "RabbitConnection"; + + public override supportsTopicHotSwap: boolean = true; + public override deferInitialTopicCreation: boolean = true; + + private connection: Connection | null = null; + private readonly consumers: Map = new Map(); + private readonly publishers: Map = new Map(); + + public constructor( + private readonly rabbitHosts: string[], + public override readonly serviceName: string, + public override readonly instanceId: string, + private readonly useTls: boolean = false, + private readonly username: string = "guest", + private readonly password: string = "guest", + ) { + super(); + } + + public override async abstractStart(): Promise { + this.connection = new Connection({ + hosts: this.rabbitHosts, + username: this.username, + password: this.password, + tls: this.useTls, + connectionName: `${this.serviceName}: ${this.instanceId}`, + }); + this.connection.on("connection.blocked", reason => { + Logger.warn(RabbitConnection.TAG, `RabbitMQ server blocked connection for reason: ${reason}`); + }); + this.connection.on("connection.unblocked", () => { + Logger.info(RabbitConnection.TAG, "RabbitMQ server unblocked connection"); + }); + await new Promise((resolve, reject) => { + // Use `on` instead of `once` to re-use the same logging code for future connection updates + this.connection!.on("connection", () => { + Logger.info(RabbitConnection.TAG, "RabbitMQ connection (re)established"); + resolve(); + }); + this.connection!.on("error", error => { + Logger.error(RabbitConnection.TAG, "Error in RabbitMQ connection", error); + reject(error); + }); + }); + } + + public override async destroy(): Promise { + await super.destroy(); + await this.connection?.close(); + } + + public override async abstractSend(topic: string, key: string, value: string, headers: BrokerMessageHeaders): Promise { + if (this.shouldDispatchExternallyAfterShortCircuit(topic, key, value, headers)) { + let publisher = this.publishers.get(topic); + if (!publisher) { + publisher = this + .ensureConnection() + .createPublisher(this.commonPublisherConsumerArguments(topic)); + this.publishers.set(topic, publisher); + } + await publisher.send({ + exchange: topic, + routingKey: key, + headers: headers.headers, + messageId: headers.messageId, + durable: true, + }, value); + } + return headers.messageId; + } + + protected override createTopic(topic: string): void { + if (this.consumers.has(topic)) { + return; + } + const queueName = this.createQueueName(topic); + this.consumers.set(topic, this.ensureConnection().createConsumer({ + queue: queueName, + ...this.commonPublisherConsumerArguments(topic), + }, async (msg, reply) => { + const key = msg.routingKey; + const value = msg.body instanceof Buffer ? msg.body.toString("utf8") : String(msg.body); + const headers = new BrokerMessageHeaders(msg.headers ?? {}); + this.dispatchIncomingMessage(topic, key, value, headers); + })); + } + + protected override removeTopic(topic: string): void { + const consumer = this.consumers.get(topic); + if (consumer) { + consumer.close().catch(e => { + Logger.error(RabbitConnection.TAG, `Error closing consumer for topic ${topic}`, e); + }); + this.consumers.delete(topic); + } + } + + private ensureConnection(): Connection { + const connection = this.connection; + if (!connection) { + throw new Error("Connection not open"); + } + return connection; + } + + private createQueueName(topic: string): string { + return `${this.serviceName}.${this.instanceId}.${topic}`; + } + + private commonPublisherConsumerArguments(topic: string) { + const exchangeName = topic; + const queueName = this.createQueueName(topic); + const routingKey = "#"; + return { + queues: [{ + queue: queueName, + durable: true, + exclusive: false, + autoDelete: false, + }], + exchanges: [{ + exchange: exchangeName, + type: "topic", + durable: true, + }], + queueBindings: [{ + queue: queueName, + exchange: exchangeName, + routingKey, + }], + }; + } + +} diff --git a/water/src/index.ts b/water/src/index.ts index 71977fb..f2b410f 100644 --- a/water/src/index.ts +++ b/water/src/index.ts @@ -11,6 +11,7 @@ export * from "./broker/rpc/RpcClient.js"; export * from "./broker/rpc/RpcMessage.js"; export * from "./broker/rpc/RpcMessageHeaders.js"; export * from "./broker/rpc/RpcStatus.js"; +export * from "./broker/rabbitmq/RabbitConnection.js"; export * from "./logging/Logger.js"; diff --git a/water/yarn.lock b/water/yarn.lock index 907d849..7c23e06 100644 --- a/water/yarn.lock +++ b/water/yarn.lock @@ -1299,6 +1299,11 @@ queue-microtask@^1.2.2: resolved "https://registry.yarnpkg.com/queue-microtask/-/queue-microtask-1.2.3.tgz#4929228bbc724dfac43e0efb058caf7b6cfb6243" integrity sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A== +rabbitmq-client@^4.6.0: + version "4.6.0" + resolved "https://registry.yarnpkg.com/rabbitmq-client/-/rabbitmq-client-4.6.0.tgz#7f8d777ade6079e894476c98c388a7560c692c64" + integrity sha512-RHt6Vz+nCzsMlSJ5HbMgxR1SMej1LtRVYCmbIWXfUhgeej3HshByQdVOirF195V3RVCRI3UHEec9PzgDQQtS8Q== + regexp.prototype.flags@^1.5.2: version "1.5.2" resolved "https://registry.yarnpkg.com/regexp.prototype.flags/-/regexp.prototype.flags-1.5.2.tgz#138f644a3350f981a858c44f6bb1a61ff59be334"