Skip to content

Commit

Permalink
[gear-idea] Fix sending messages from meta-storage to indexer (#1356)
Browse files Browse the repository at this point in the history
  • Loading branch information
osipov-mit authored Aug 24, 2023
1 parent 10fa6c5 commit 85efb59
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 97 deletions.
36 changes: 18 additions & 18 deletions idea/api-gateway/src/rmq.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CronJob } from 'cron';
import { connect, Connection, Channel } from 'amqplib';
import { logger, RMQExchanges, RMQMessage, RMQQueues, RMQReply, RMQServiceActions, RMQServices } from '@gear-js/common';
import { logger, RMQExchange, RMQMessage, RMQQueue, RMQReply, RMQServiceAction, RMQServices } from '@gear-js/common';

import config from './config';

Expand All @@ -27,36 +27,36 @@ export class RMQService {
}

this.mainChannel = await this.connection.createChannel();
await this.mainChannel.assertExchange(RMQExchanges.TOPIC_EX, 'topic', { durable: true });
await this.mainChannel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true });
await this.mainChannel.assertQueue(RMQQueues.REPLIES, {
await this.mainChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic', { durable: true });
await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true });
await this.mainChannel.assertQueue(RMQQueue.REPLIES, {
durable: true,
exclusive: false,
autoDelete: false,
messageTtl: 30_000,
});

await this.mainChannel.bindQueue(RMQQueues.REPLIES, RMQExchanges.DIRECT_EX, RMQQueues.REPLIES);
await this.mainChannel.bindQueue(RMQQueue.REPLIES, RMQExchange.DIRECT_EX, RMQQueue.REPLIES);

await this.mainChannel.assertQueue(RMQQueues.GENESISES, {
await this.mainChannel.assertQueue(RMQQueue.GENESISES, {
durable: true,
exclusive: false,
autoDelete: false,
messageTtl: 30_000,
});

await this.mainChannel.bindQueue(RMQQueues.GENESISES, RMQExchanges.DIRECT_EX, RMQQueues.GENESISES);
await this.mainChannel.bindQueue(RMQQueue.GENESISES, RMQExchange.DIRECT_EX, RMQQueue.GENESISES);

this.metaChannel = await this.connection.createChannel();
this.metaChannel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true });
this.metaChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true });

await this.subscribeToGenesises();
await this.subscribeToReplies();
}

private async subscribeToReplies(): Promise<void> {
await this.mainChannel.consume(
RMQQueues.REPLIES,
RMQQueue.REPLIES,
(message) => {
if (!message) {
return;
Expand All @@ -76,15 +76,15 @@ export class RMQService {

private async subscribeToGenesises() {
await this.mainChannel.consume(
RMQQueues.GENESISES,
RMQQueue.GENESISES,
async (message) => {
if (!message) {
return;
}

const { genesis, service, action } = JSON.parse(message.content.toString());

if (action === RMQServiceActions.ADD) {
if (action === RMQServiceAction.ADD) {
if (service === RMQServices.INDEXER) {
if (this.indexerChannels.has(genesis)) return;

Expand All @@ -108,7 +108,7 @@ export class RMQService {
}
}

if (action === RMQServiceActions.DELETE) {
if (action === RMQServiceAction.DELETE) {
if (service === RMQServices.INDEXER) {
const channel = this.indexerChannels.get(genesis);
if (channel) {
Expand All @@ -133,21 +133,21 @@ export class RMQService {

private async createChannel() {
const channel = await this.connection.createChannel();
channel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true });
channel.assertExchange(RMQExchange.DIRECT_EX, 'direct', { durable: true });
return channel;
}

public sendMsgToIndexer({ genesis, params, correlationId, method }: RMQMessage) {
const channel = this.indexerChannels.get(genesis);

channel.publish(RMQExchanges.DIRECT_EX, `${RMQServices.INDEXER}.${genesis}`, Buffer.from(JSON.stringify(params)), {
channel.publish(RMQExchange.DIRECT_EX, `${RMQServices.INDEXER}.${genesis}`, Buffer.from(JSON.stringify(params)), {
correlationId,
headers: { method },
});
}

public sendMsgToMetaStorage({ params, correlationId, method }: RMQMessage) {
this.metaChannel.publish(RMQExchanges.DIRECT_EX, RMQServices.META_STORAGE, Buffer.from(JSON.stringify(params)), {
this.metaChannel.publish(RMQExchange.DIRECT_EX, RMQServices.META_STORAGE, Buffer.from(JSON.stringify(params)), {
correlationId,
headers: { method },
});
Expand All @@ -157,7 +157,7 @@ export class RMQService {
const channel = this.tbChannels.get(genesis);

channel.publish(
RMQExchanges.DIRECT_EX,
RMQExchange.DIRECT_EX,
`${RMQServices.TEST_BALANCE}.${genesis}`,
Buffer.from(JSON.stringify(params)),
{
Expand All @@ -168,11 +168,11 @@ export class RMQService {
}

public sendMsgIndexerGenesises() {
this.mainChannel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from(''));
this.mainChannel.publish(RMQExchange.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from(''));
}

public sendMsgTBGenesises() {
this.mainChannel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from(''));
this.mainChannel.publish(RMQExchange.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from(''));
}

public isExistTBChannel(genesis: string) {
Expand Down
7 changes: 4 additions & 3 deletions idea/common/src/enums/rmq.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
export enum RMQQueues {
export enum RMQQueue {
GENESISES = 'genesises',
REPLIES = 'replies',
}

export enum RMQExchanges {
export enum RMQExchange {
DIRECT_EX = 'direct.ex',
TOPIC_EX = 'topic.ex',
INDXR_META = 'indxr.meta.ex',
}

export enum RMQServices {
Expand All @@ -14,7 +15,7 @@ export enum RMQServices {
META_STORAGE = 'meta',
}

export enum RMQServiceActions {
export enum RMQServiceAction {
ADD = 'add',
DELETE = 'delete',
}
8 changes: 4 additions & 4 deletions idea/indexer/src/gear/connect.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { GearApi } from '@gear-js/api';
import { RMQServiceActions, logger } from '@gear-js/common';
import { RMQServiceAction, logger } from '@gear-js/common';

import config from '../config';
import { changeStatus } from '../healthcheck.server';
Expand All @@ -11,7 +11,7 @@ const addresses = config.gear.providerAddresses;
const MAX_RECONNECTIONS = 10;
let reconnectionsCounter = 0;

type GenesisCb = (action: RMQServiceActions, genesis: string) => void;
type GenesisCb = (action: RMQServiceAction, genesis: string) => void;
let providerAddress = addresses[0];

export async function connectToNode(indexer: GearIndexer, cb: GenesisCb) {
Expand All @@ -34,13 +34,13 @@ export async function connectToNode(indexer: GearIndexer, cb: GenesisCb) {
api.on('disconnected', () => {
logger.warn('Disconnected from the node.');
indexer.stop();
genesis && cb(RMQServiceActions.DELETE, genesis);
genesis && cb(RMQServiceAction.DELETE, genesis);
reconnect(api, indexer, cb);
});

reconnectionsCounter = 0;
await indexer.run(api);
cb(RMQServiceActions.ADD, genesis);
cb(RMQServiceAction.ADD, genesis);
logger.info(`Connected to ${api.runtimeChain} with genesis ${genesis}`);
changeStatus('gear');
}
Expand Down
4 changes: 2 additions & 2 deletions idea/indexer/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { waitReady } from '@polkadot/wasm-crypto';
import { RMQServiceActions, logger } from '@gear-js/common';
import { RMQServiceAction, logger } from '@gear-js/common';

import { changeStatus, runHealthcheckServer } from './healthcheck.server';
import { AppDataSource } from './database';
Expand Down Expand Up @@ -39,7 +39,7 @@ async function bootstrap() {
const indexer = new GearIndexer(programService, messageService, codeService, blockService, rmq);

await connectToNode(indexer, async (action, genesis) => {
if (action === RMQServiceActions.ADD) {
if (action === RMQServiceAction.ADD) {
await rmq.addGenesisQueue(genesis);
} else {
await rmq.deleteGenesisQueue(genesis);
Expand Down
58 changes: 29 additions & 29 deletions idea/indexer/src/rmq.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Channel, connect, Connection } from 'amqplib';
import {
INDEXER_METHODS,
RMQServiceActions,
RMQServiceAction,
RMQServices,
FormResponse,
META_STORAGE_INTERNAL_METHODS,
RMQExchanges,
RMQQueues,
RMQExchange,
RMQQueue,
INDEXER_INTERNAL_METHODS,
logger,
} from '@gear-js/common';
Expand Down Expand Up @@ -61,33 +61,28 @@ export class RMQService {
this.mainChannel = await this.connection.createChannel();
this.topicChannel = await this.connection.createChannel();

const directExchange = RMQExchanges.DIRECT_EX;
const topicExchange = RMQExchanges.TOPIC_EX;
const directExchangeType = 'direct';
await this.mainChannel.assertExchange(RMQExchange.DIRECT_EX, 'direct');
await this.topicChannel.assertExchange(RMQExchange.TOPIC_EX, 'topic');

await this.mainChannel.assertExchange(directExchange, directExchangeType);
await this.topicChannel.assertExchange(topicExchange, 'topic');
await this.topicChannel.assertQueue(`${RMQServices.INDEXER}.meta`, {
await this.mainChannel.assertExchange('INDXR_META', 'fanout', { autoDelete: true });
await this.mainChannel.assertQueue('', {
durable: true,
exclusive: false,
autoDelete: false,
});
await this.topicChannel.bindQueue(
`${RMQServices.INDEXER}.meta`,
RMQExchanges.TOPIC_EX,
`${RMQServices.INDEXER}.meta`,
);
await this.mainChannel.bindQueue('', 'INDXR_META', '');
this.metaMsgConsumer();
} catch (error) {
logger.error('Unable to setup rabbitmq exchanges', { error });
logger.error('Failed to setup rabbitmq exchanges', { error });
throw error;
}
}

public async deleteGenesisQueue(genesis: string) {
const routingKey = `${RMQServices.INDEXER}.${genesis}`;
const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceActions.DELETE, genesis });
await this.mainChannel.unbindQueue(routingKey, RMQExchanges.DIRECT_EX, routingKey);
this.mainChannel.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(messageBuff));
const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceAction.DELETE, genesis });
await this.mainChannel.unbindQueue(routingKey, RMQExchange.DIRECT_EX, routingKey);
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff));
}

public async addGenesisQueue(genesis: string) {
Expand All @@ -97,22 +92,21 @@ export class RMQService {
exclusive: false,
autoDelete: true,
});
await this.mainChannel.bindQueue(genesisQ, RMQExchange.DIRECT_EX, genesisQ);

const topicQ = `${RMQServices.INDEXER}t.${genesis}`;
await this.topicChannel.assertQueue(topicQ, {
durable: false,
exclusive: false,
autoDelete: true,
});
await this.mainChannel.bindQueue(genesisQ, RMQExchanges.DIRECT_EX, genesisQ);
await this.topicChannel.bindQueue(topicQ, RMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.genesises`);
await this.topicChannel.bindQueue(topicQ, RMQExchange.TOPIC_EX, `${RMQServices.INDEXER}.genesises`);

await this.directMsgConsumer(genesisQ);
await this.metaMsgConsumer();
await this.genesisesMsgConsumer(topicQ, genesis);

const msgBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceActions.ADD, genesis });
this.mainChannel.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(msgBuff));
const msgBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceAction.ADD, genesis });
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(msgBuff));
}

private sendMsg(exchange: string, queue: string, params: any, correlationId?: string, method?: string): void {
Expand All @@ -121,9 +115,15 @@ export class RMQService {
}

private async metaMsgConsumer(): Promise<void> {
const exchange = 'indxr_meta';
const channel = await this.connection.createChannel();
await channel.assertExchange(exchange, 'fanout', { autoDelete: true });
const q = await channel.assertQueue('', { exclusive: false });
await channel.bindQueue(q.queue, exchange, '');

try {
await this.topicChannel.consume(
`${RMQServices.INDEXER}.meta`,
await channel.consume(
q.queue,
async (msg) => {
if (!msg) {
return;
Expand Down Expand Up @@ -155,7 +155,7 @@ export class RMQService {

const result = await this.handleIncomingMsg(method, params);

this.sendMsg(RMQExchanges.DIRECT_EX, RMQQueues.REPLIES, result, correlationId);
this.sendMsg(RMQExchange.DIRECT_EX, RMQQueue.REPLIES, result, correlationId);
},
{ noAck: true },
);
Expand All @@ -173,8 +173,8 @@ export class RMQService {
return;
}

const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceActions.ADD, genesis });
this.mainChannel.publish(RMQExchanges.DIRECT_EX, RMQQueues.GENESISES, Buffer.from(messageBuff));
const messageBuff = JSON.stringify({ service: RMQServices.INDEXER, action: RMQServiceAction.ADD, genesis });
this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff));
},
{ noAck: true },
);
Expand All @@ -191,7 +191,7 @@ export class RMQService {
public async sendMsgToMetaStorage(metahashes: Map<string, Set<string>>) {
const msg = Array.from(metahashes.entries()).map(([key, value]) => [key, Array.from(value.values())]);
return this.sendMsg(
RMQExchanges.DIRECT_EX,
RMQExchange.DIRECT_EX,
RMQServices.META_STORAGE,
msg,
null,
Expand Down
Loading

0 comments on commit 85efb59

Please sign in to comment.