diff --git a/idea/api-gateway/src/rmq.ts b/idea/api-gateway/src/rmq.ts index 441fb2cb91..e4f49b4998 100644 --- a/idea/api-gateway/src/rmq.ts +++ b/idea/api-gateway/src/rmq.ts @@ -1,6 +1,6 @@ import { CronJob } from 'cron'; import { connect, Connection, Channel } from 'amqplib'; -import { logger, RMQExchanges, RMQMessage, RMQQueues, RMQReply, RMQServiceActions, RMQServices } from '@gear-js/common'; +import { logger, RMQExchange, RMQMessage, RMQQueue, RMQReply, RMQServiceAction, RMQServices } from '@gear-js/common'; import config from './config'; @@ -27,28 +27,28 @@ export class RMQService { } this.mainChannel = await this.connection.createChannel(); - await this.mainChannel.assertExchange(RMQExchanges.TOPIC_EX, 'topic', { durable: true }); - await this.mainChannel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true }); - await this.mainChannel.assertQueue(RMQQueues.REPLIES, { + await this.mainChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic', { durable: true }); + await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true }); + await this.mainChannel.assertQueue(RMQQueue.REPLIES, { durable: true, exclusive: false, autoDelete: false, messageTtl: 30_000, }); - await this.mainChannel.bindQueue(RMQQueues.REPLIES, RMQExchanges.DIRECT_EX, RMQQueues.REPLIES); + await this.mainChannel.bindQueue(RMQQueue.REPLIES, RMQExchange.DIRECT_EX, RMQQueue.REPLIES); - await this.mainChannel.assertQueue(RMQQueues.GENESISES, { + await this.mainChannel.assertQueue(RMQQueue.GENESISES, { durable: true, exclusive: false, autoDelete: false, messageTtl: 30_000, }); - await this.mainChannel.bindQueue(RMQQueues.GENESISES, RMQExchanges.DIRECT_EX, RMQQueues.GENESISES); + await this.mainChannel.bindQueue(RMQQueue.GENESISES, RMQExchange.DIRECT_EX, RMQQueue.GENESISES); this.metaChannel = await this.connection.createChannel(); - this.metaChannel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true }); + this.metaChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true }); await this.subscribeToGenesises(); await this.subscribeToReplies(); @@ -56,7 +56,7 @@ export class RMQService { private async subscribeToReplies(): Promise { await this.mainChannel.consume( - RMQQueues.REPLIES, + RMQQueue.REPLIES, (message) => { if (!message) { return; @@ -76,7 +76,7 @@ export class RMQService { private async subscribeToGenesises() { await this.mainChannel.consume( - RMQQueues.GENESISES, + RMQQueue.GENESISES, async (message) => { if (!message) { return; @@ -84,7 +84,7 @@ export class RMQService { const { genesis, service, action } = JSON.parse(message.content.toString()); - if (action === RMQServiceActions.ADD) { + if (action === RMQServiceAction.ADD) { if (service === RMQServices.INDEXER) { if (this.indexerChannels.has(genesis)) return; @@ -108,7 +108,7 @@ export class RMQService { } } - if (action === RMQServiceActions.DELETE) { + if (action === RMQServiceAction.DELETE) { if (service === RMQServices.INDEXER) { const channel = this.indexerChannels.get(genesis); if (channel) { @@ -133,21 +133,21 @@ export class RMQService { private async createChannel() { const channel = await this.connection.createChannel(); - channel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true }); + channel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true }); return channel; } public sendMsgToIndexer({ genesis, params, correlationId, method }: RMQMessage) { const channel = this.indexerChannels.get(genesis); - channel.publish(RMQExchanges.DIRECT_EX, `${RMQServices.INDEXER}.${genesis}`, Buffer.from(JSON.stringify(params)), { + channel.publish(RMQExchange.DIRECT_EX, `${RMQServices.INDEXER}.${genesis}`, Buffer.from(JSON.stringify(params)), { correlationId, headers: { method }, }); } public sendMsgToMetaStorage({ params, correlationId, method }: RMQMessage) { - this.metaChannel.publish(RMQExchanges.DIRECT_EX, RMQServices.META_STORAGE, Buffer.from(JSON.stringify(params)), { + this.metaChannel.publish(RMQExchange.DIRECT_EX, RMQServices.META_STORAGE, Buffer.from(JSON.stringify(params)), { correlationId, headers: { method }, }); @@ -157,7 +157,7 @@ export class RMQService { const channel = this.tbChannels.get(genesis); channel.publish( - RMQExchanges.DIRECT_EX, + RMQExchange.DIRECT_EX, `${RMQServices.TEST_BALANCE}.${genesis}`, Buffer.from(JSON.stringify(params)), { @@ -168,11 +168,11 @@ export class RMQService { } public sendMsgIndexerGenesises() { - this.mainChannel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from('')); + this.mainChannel.publish(RMQExchange.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from('')); } public sendMsgTBGenesises() { - this.mainChannel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from('')); + this.mainChannel.publish(RMQExchange.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from('')); } public isExistTBChannel(genesis: string) { diff --git a/idea/common/src/enums/rmq.ts b/idea/common/src/enums/rmq.ts index 7804ef5484..79d840c0c5 100644 --- a/idea/common/src/enums/rmq.ts +++ b/idea/common/src/enums/rmq.ts @@ -1,11 +1,12 @@ -export enum RMQQueues { +export enum RMQQueue { GENESISES = 'genesises', REPLIES = 'replies', } -export enum RMQExchanges { +export enum RMQExchange { DIRECT_EX = 'direct.ex', TOPIC_EX = 'topic.ex', + INDXR_META = 'indxr.meta.ex', } export enum RMQServices { @@ -14,7 +15,7 @@ export enum RMQServices { META_STORAGE = 'meta', } -export enum RMQServiceActions { +export enum RMQServiceAction { ADD = 'add', DELETE = 'delete', } diff --git a/idea/indexer/src/gear/connect.ts b/idea/indexer/src/gear/connect.ts index c760a19753..ab4881de37 100644 --- a/idea/indexer/src/gear/connect.ts +++ b/idea/indexer/src/gear/connect.ts @@ -1,5 +1,5 @@ import { GearApi } from '@gear-js/api'; -import { RMQServiceActions, logger } from '@gear-js/common'; +import { RMQServiceAction, logger } from '@gear-js/common'; import config from '../config'; import { changeStatus } from '../healthcheck.server'; @@ -11,7 +11,7 @@ const addresses = config.gear.providerAddresses; const MAX_RECONNECTIONS = 10; let reconnectionsCounter = 0; -type GenesisCb = (action: RMQServiceActions, genesis: string) => void; +type GenesisCb = (action: RMQServiceAction, genesis: string) => void; let providerAddress = addresses[0]; export async function connectToNode(indexer: GearIndexer, cb: GenesisCb) { @@ -34,13 +34,13 @@ export async function connectToNode(indexer: GearIndexer, cb: GenesisCb) { api.on('disconnected', () => { logger.warn('Disconnected from the node.'); indexer.stop(); - genesis && cb(RMQServiceActions.DELETE, genesis); + genesis && cb(RMQServiceAction.DELETE, genesis); reconnect(api, indexer, cb); }); reconnectionsCounter = 0; await indexer.run(api); - cb(RMQServiceActions.ADD, genesis); + cb(RMQServiceAction.ADD, genesis); logger.info(`Connected to ${api.runtimeChain} with genesis ${genesis}`); changeStatus('gear'); } diff --git a/idea/indexer/src/main.ts b/idea/indexer/src/main.ts index bebb10ba8e..5b70646abb 100644 --- a/idea/indexer/src/main.ts +++ b/idea/indexer/src/main.ts @@ -1,5 +1,5 @@ import { waitReady } from '@polkadot/wasm-crypto'; -import { RMQServiceActions, logger } from '@gear-js/common'; +import { RMQServiceAction, logger } from '@gear-js/common'; import { changeStatus, runHealthcheckServer } from './healthcheck.server'; import { AppDataSource } from './database'; @@ -39,7 +39,7 @@ async function bootstrap() { const indexer = new GearIndexer(programService, messageService, codeService, blockService, rmq); await connectToNode(indexer, async (action, genesis) => { - if (action === RMQServiceActions.ADD) { + if (action === RMQServiceAction.ADD) { await rmq.addGenesisQueue(genesis); } else { await rmq.deleteGenesisQueue(genesis); diff --git a/idea/indexer/src/rmq.ts b/idea/indexer/src/rmq.ts index 889e4dd0b3..a1b977d238 100644 --- a/idea/indexer/src/rmq.ts +++ b/idea/indexer/src/rmq.ts @@ -1,12 +1,12 @@ import { Channel, connect, Connection } from 'amqplib'; import { INDEXER_METHODS, - RMQServiceActions, + RMQServiceAction, RMQServices, FormResponse, META_STORAGE_INTERNAL_METHODS, - RMQExchanges, - RMQQueues, + RMQExchange, + RMQQueue, INDEXER_INTERNAL_METHODS, logger, } from '@gear-js/common'; @@ -61,33 +61,28 @@ export class RMQService { this.mainChannel = await this.connection.createChannel(); this.topicChannel = await this.connection.createChannel(); - const directExchange = RMQExchanges.DIRECT_EX; - const topicExchange = RMQExchanges.TOPIC_EX; - const directExchangeType = 'direct'; + await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct'); + await this.topicChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic'); - await this.mainChannel.assertExchange(directExchange, directExchangeType); - await this.topicChannel.assertExchange(topicExchange, 'topic'); - await this.topicChannel.assertQueue(`${RMQServices.INDEXER}.meta`, { + await this.mainChannel.assertExchange('INDXR_META', 'fanout', { autoDelete: true }); + await this.mainChannel.assertQueue('', { durable: true, exclusive: false, autoDelete: false, }); - await this.topicChannel.bindQueue( - `${RMQServices.INDEXER}.meta`, - RMQExchanges.TOPIC_EX, - `${RMQServices.INDEXER}.meta`, - ); + await this.mainChannel.bindQueue('', 'INDXR_META', ''); + this.metaMsgConsumer(); } catch (error) { - logger.error('Unable to setup rabbitmq exchanges', { error }); + logger.error('Failed to setup rabbitmq exchanges', { error }); throw error; } } public async deleteGenesisQueue(genesis: string) { const routingKey = `${RMQServices.INDEXER}.${genesis}`; - const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceActions.DELETE, genesis }); - await this.mainChannel.unbindQueue(routingKey, RMQExchanges.DIRECT_EX, routingKey); - this.mainChannel.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(messageBuff)); + const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceAction.DELETE, genesis }); + await this.mainChannel.unbindQueue(routingKey, RMQExchange.DIRECT_EX, routingKey); + this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); } public async addGenesisQueue(genesis: string) { @@ -97,6 +92,7 @@ export class RMQService { exclusive: false, autoDelete: true, }); + await this.mainChannel.bindQueue(genesisQ, RMQExchange.DIRECT_EX, genesisQ); const topicQ = `${RMQServices.INDEXER}t.${genesis}`; await this.topicChannel.assertQueue(topicQ, { @@ -104,15 +100,13 @@ export class RMQService { exclusive: false, autoDelete: true, }); - await this.mainChannel.bindQueue(genesisQ, RMQExchanges.DIRECT_EX, genesisQ); - await this.topicChannel.bindQueue(topicQ, RMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.genesises`); + await this.topicChannel.bindQueue(topicQ, RMQExchange.TOPIC_EX, `${RMQServices.INDEXER}.genesises`); await this.directMsgConsumer(genesisQ); - await this.metaMsgConsumer(); await this.genesisesMsgConsumer(topicQ, genesis); - const msgBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceActions.ADD, genesis }); - this.mainChannel.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(msgBuff)); + const msgBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceAction.ADD, genesis }); + this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(msgBuff)); } private sendMsg(exchange: string, queue: string, params: any, correlationId?: string, method?: string): void { @@ -121,9 +115,15 @@ export class RMQService { } private async metaMsgConsumer(): Promise { + const exchange = 'indxr_meta'; + const channel = await this.connection.createChannel(); + await channel.assertExchange(exchange, 'fanout', { autoDelete: true }); + const q = await channel.assertQueue('', { exclusive: false }); + await channel.bindQueue(q.queue, exchange, ''); + try { - await this.topicChannel.consume( - `${RMQServices.INDEXER}.meta`, + await channel.consume( + q.queue, async (msg) => { if (!msg) { return; @@ -155,7 +155,7 @@ export class RMQService { const result = await this.handleIncomingMsg(method, params); - this.sendMsg(RMQExchanges.DIRECT_EX, RMQQueues.REPLIES, result, correlationId); + this.sendMsg(RMQExchange.DIRECT_EX, RMQQueue.REPLIES, result, correlationId); }, { noAck: true }, ); @@ -173,8 +173,8 @@ export class RMQService { return; } - const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceActions.ADD, genesis }); - this.mainChannel.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(messageBuff)); + const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceAction.ADD, genesis }); + this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); }, { noAck: true }, ); @@ -191,7 +191,7 @@ export class RMQService { public async sendMsgToMetaStorage(metahashes: Map>) { const msg = Array.from(metahashes.entries()).map(([key, value]) => [key, Array.from(value.values())]); return this.sendMsg( - RMQExchanges.DIRECT_EX, + RMQExchange.DIRECT_EX, RMQServices.META_STORAGE, msg, null, diff --git a/idea/meta-storage/src/rmq.ts b/idea/meta-storage/src/rmq.ts index b40a2070ae..95e0858f12 100644 --- a/idea/meta-storage/src/rmq.ts +++ b/idea/meta-storage/src/rmq.ts @@ -4,10 +4,10 @@ import { RMQServices, META_STORAGE_METHODS, META_STORAGE_INTERNAL_METHODS, - RMQExchanges, - RMQQueues, INDEXER_INTERNAL_METHODS, logger, + RMQExchange, + RMQQueue, } from '@gear-js/common'; import config from './config'; @@ -15,6 +15,7 @@ import { MetaService } from './service'; export class RMQService { private channel: Channel; + private indxrChannel: Channel; private connection: Connection; private methods: Record Promise>; @@ -32,51 +33,45 @@ export class RMQService { try { this.channel = await this.connection.createChannel(); - const directExchange = RMQExchanges.DIRECT_EX; - const directExchangeType = 'direct'; - - await this.channel.assertExchange(directExchange, directExchangeType); - await this.channel.assertQueue(RMQServices.META_STORAGE, { - durable: true, - exclusive: false, - autoDelete: false, - }); - - await this.channel.assertExchange(RMQExchanges.TOPIC_EX, 'topic', { durable: true }); - await this.channel.bindQueue(RMQServices.META_STORAGE, RMQExchanges.DIRECT_EX, RMQServices.META_STORAGE); - - await this.directMsgConsumer(RMQServices.META_STORAGE); + await this.setupMsgConsumer(); + await this.setupIndxrExchange(); this.connection.on('close', (error) => { logger.error('RabbitMQ connection closed', { error }); process.exit(1); }); } catch (error) { - logger.error('Unable to setup rabbitmq exchanges', { error, stack: error.stack }); + logger.error('Failed to setup rabbitmq exchanges', { error, stack: error.stack }); throw error; } } - private sendMsg( - exchange: RMQExchanges, - queue: RMQQueues, - params: any, - correlationId?: string, - method?: string, - ): void { + private async setupIndxrExchange() { + this.indxrChannel = await this.connection.createChannel(); + this.indxrChannel.assertExchange(RMQExchange.INDXR_META, 'fanout', { autoDelete: true }); + } + + private sendMsg(exchange: RMQExchange, queue: RMQQueue, params: any, correlationId?: string, method?: string): void { const messageBuff = JSON.stringify(params); this.channel.publish(exchange, queue, Buffer.from(messageBuff), { correlationId, headers: { method } }); } private sendMsgToIndxrTopic(params: any, method: string) { const msgBuf = Buffer.from(JSON.stringify(params)); - this.channel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.meta`, msgBuf, { headers: { method } }); + this.indxrChannel.publish('indxr_meta', '', msgBuf, { headers: { method } }); } - private async directMsgConsumer(queue: string): Promise { + private async setupMsgConsumer(): Promise { + await this.channel.assertExchange(RMQExchange.DIRECT_EX, 'direct'); + const q = await this.channel.assertQueue(RMQServices.META_STORAGE, { + durable: true, + exclusive: false, + autoDelete: false, + }); + await this.channel.bindQueue(q.queue, RMQExchange.DIRECT_EX, RMQServices.META_STORAGE); try { await this.channel.consume( - queue, + q.queue, async (msg) => { if (!msg) { return; @@ -88,7 +83,7 @@ export class RMQService { const result = await this.handleIncomingMsg(method, params); - this.sendMsg(RMQExchanges.DIRECT_EX, RMQQueues.REPLIES, result, correlationId); + this.sendMsg(RMQExchange.DIRECT_EX, RMQQueue.REPLIES, result, correlationId); }, { noAck: true }, ); diff --git a/idea/test-balance/src/common/transfer-balance-process.ts b/idea/test-balance/src/common/transfer-balance-process.ts index b34285af60..b96e7fd9ab 100644 --- a/idea/test-balance/src/common/transfer-balance-process.ts +++ b/idea/test-balance/src/common/transfer-balance-process.ts @@ -1,4 +1,4 @@ -import { logger, JSONRPC_ERRORS, RMQExchanges, RMQQueues } from '@gear-js/common'; +import { logger, JSONRPC_ERRORS, RMQExchange, RMQQueue } from '@gear-js/common'; import { EventEmitter } from 'node:events'; import { transferService } from '../transfer.service'; @@ -54,6 +54,6 @@ export async function transferProcess(): Promise { logger.error(error.message, { stack: error.stack }); result = { error: JSONRPC_ERRORS.InternalError.name }; } - producer.sendMessage(RMQExchanges.DIRECT_EX, RMQQueues.REPLIES, correlationId, result); + producer.sendMessage(RMQExchange.DIRECT_EX, RMQQueue.REPLIES, correlationId, result); } } diff --git a/idea/test-balance/src/rabbitmq/producer.ts b/idea/test-balance/src/rabbitmq/producer.ts index cb6d78c689..88caccedec 100644 --- a/idea/test-balance/src/rabbitmq/producer.ts +++ b/idea/test-balance/src/rabbitmq/producer.ts @@ -1,18 +1,18 @@ -import { RMQExchanges, RMQQueues, RMQServiceActions, RMQServices } from '@gear-js/common'; +import { RMQExchange, RMQQueue, RMQServiceAction, RMQServices } from '@gear-js/common'; import { mainChannelAMQP } from './rmq'; function sendGenesis(genesis: string): void { - const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceActions.ADD, genesis }); - mainChannelAMQP.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(messageBuff)); + const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis }); + mainChannelAMQP.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); } function sendDeleteGenesis(genesis: string): void { - const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceActions.DELETE, genesis }); - mainChannelAMQP.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(messageBuff)); + const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.DELETE, genesis }); + mainChannelAMQP.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); } -function sendMessage(exchange: RMQExchanges, queue: RMQQueues, correlationId: string, params: any): void { +function sendMessage(exchange: RMQExchange, queue: RMQQueue, correlationId: string, params: any): void { const messageBuff = JSON.stringify(params); mainChannelAMQP.publish(exchange, queue, Buffer.from(messageBuff), { correlationId, diff --git a/idea/test-balance/src/rabbitmq/rmq.ts b/idea/test-balance/src/rabbitmq/rmq.ts index 766544f158..74d47f5dfc 100644 --- a/idea/test-balance/src/rabbitmq/rmq.ts +++ b/idea/test-balance/src/rabbitmq/rmq.ts @@ -1,5 +1,5 @@ import { Channel, connect, Connection } from 'amqplib'; -import { logger, RMQExchanges, RMQQueues, RMQServiceActions, RMQServices } from '@gear-js/common'; +import { logger, RMQExchange, RMQQueue, RMQServiceAction, RMQServices } from '@gear-js/common'; import config from '../config/configuration'; import { gearService } from '../gear'; @@ -18,16 +18,16 @@ export async function initAMQ(): Promise { mainChannelAMQP = await connectionAMQP.createChannel(); topicChannelAMQP = await connectionAMQP.createChannel(); - const directExchange = RMQExchanges.DIRECT_EX; - const topicExchange = RMQExchanges.TOPIC_EX; + const directExchange = RMQExchange.DIRECT_EX; + const topicExchange = RMQExchange.TOPIC_EX; const genesis = gearService.getGenesisHash(); const directExchangeType = 'direct'; const topicExchangeType = 'topic'; const routingKey = `${RMQServices.TEST_BALANCE}.${genesis}`; //send genesis - const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceActions.ADD, genesis }); - mainChannelAMQP.publish(directExchange, RMQQueues.GENESISES, Buffer.from(messageBuff)); + const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis }); + mainChannelAMQP.publish(directExchange, RMQQueue.GENESISES, Buffer.from(messageBuff)); await mainChannelAMQP.assertExchange(directExchange, directExchangeType); await mainChannelAMQP.assertExchange(topicExchange, topicExchangeType);