Skip to content

Commit

Permalink
Add hasState field to program entity (#1342)
Browse files Browse the repository at this point in the history
  • Loading branch information
osipov-mit authored Jul 31, 2023
1 parent fa0bce1 commit 9930458
Show file tree
Hide file tree
Showing 26 changed files with 234 additions and 121 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/CI-CD-gear-js-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
test:
if: github.event_name == 'pull_request'

runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
env:
RUSTUP_HOME: /tmp/rustup_home
steps:
Expand Down Expand Up @@ -93,7 +93,7 @@ jobs:
publish-to-npm:
if: github.event_name == 'push'

runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3

Expand Down
61 changes: 22 additions & 39 deletions idea/api-gateway/src/rabbitmq/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import { CronJob } from 'cron';
import { connect, Connection, Channel } from 'amqplib';
import {
RMQMessage,
RMQReply,
RMQServiceActions,
RMQServices,
RabbitMQExchanges,
RabbitMQueues,
} from '@gear-js/common';
import { RMQExchanges, RMQMessage, RMQQueues, RMQReply, RMQServiceActions, RMQServices } from '@gear-js/common';

import config from '../config';
import { logger } from '../common/logger';
Expand Down Expand Up @@ -36,36 +29,36 @@ export class RMQService {
}

this.mainChannel = await this.connection.createChannel();
await this.mainChannel.assertExchange(RabbitMQExchanges.TOPIC_EX, 'topic', { durable: true });
await this.mainChannel.assertExchange(RabbitMQExchanges.DIRECT_EX, 'direct', { durable: true });
await this.mainChannel.assertQueue(RabbitMQueues.REPLIES, {
await this.mainChannel.assertExchange(RMQExchanges.TOPIC_EX, 'topic', { durable: true });
await this.mainChannel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true });
await this.mainChannel.assertQueue(RMQQueues.REPLIES, {
durable: true,
exclusive: false,
autoDelete: false,
messageTtl: 30_000,
});

await this.mainChannel.bindQueue(RabbitMQueues.REPLIES, RabbitMQExchanges.DIRECT_EX, RabbitMQueues.REPLIES);
await this.mainChannel.bindQueue(RMQQueues.REPLIES, RMQExchanges.DIRECT_EX, RMQQueues.REPLIES);

await this.mainChannel.assertQueue(RabbitMQueues.GENESISES, {
await this.mainChannel.assertQueue(RMQQueues.GENESISES, {
durable: true,
exclusive: false,
autoDelete: false,
messageTtl: 30_000,
});

await this.mainChannel.bindQueue(RabbitMQueues.GENESISES, RabbitMQExchanges.DIRECT_EX, RabbitMQueues.GENESISES);
await this.mainChannel.bindQueue(RMQQueues.GENESISES, RMQExchanges.DIRECT_EX, RMQQueues.GENESISES);

this.metaChannel = await this.connection.createChannel();
this.metaChannel.assertExchange(RabbitMQExchanges.DIRECT_EX, 'direct', { durable: true });
this.metaChannel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true });

await this.subscribeToGenesises();
await this.subscribeToReplies();
}

private async subscribeToReplies(): Promise<void> {
await this.mainChannel.consume(
RabbitMQueues.REPLIES,
RMQQueues.REPLIES,
(message) => {
if (!message) {
return;
Expand All @@ -85,7 +78,7 @@ export class RMQService {

private async subscribeToGenesises() {
await this.mainChannel.consume(
RabbitMQueues.GENESISES,
RMQQueues.GENESISES,
async (message) => {
if (!message) {
return;
Expand Down Expand Up @@ -148,41 +141,31 @@ export class RMQService {

private async createChannel() {
const channel = await this.connection.createChannel();
channel.assertExchange(RabbitMQExchanges.DIRECT_EX, 'direct', { durable: true });
channel.assertExchange(RMQExchanges.DIRECT_EX, 'direct', { durable: true });
return channel;
}

public sendMsgToIndexer({ genesis, params, correlationId, method }: RMQMessage) {
const channel = this.indexerChannels.get(genesis);

channel.publish(
RabbitMQExchanges.DIRECT_EX,
`${RMQServices.INDEXER}.${genesis}`,
Buffer.from(JSON.stringify(params)),
{
correlationId,
headers: { method },
},
);
channel.publish(RMQExchanges.DIRECT_EX, `${RMQServices.INDEXER}.${genesis}`, Buffer.from(JSON.stringify(params)), {
correlationId,
headers: { method },
});
}

public sendMsgToMetaStorage({ params, correlationId, method }: RMQMessage) {
this.metaChannel.publish(
RabbitMQExchanges.DIRECT_EX,
RMQServices.META_STORAGE,
Buffer.from(JSON.stringify(params)),
{
correlationId,
headers: { method },
},
);
this.metaChannel.publish(RMQExchanges.DIRECT_EX, RMQServices.META_STORAGE, Buffer.from(JSON.stringify(params)), {
correlationId,
headers: { method },
});
}

public sendMsgToTestBalance({ genesis, params, correlationId, method }: RMQMessage) {
const channel = this.tbChannels.get(genesis);

channel.publish(
RabbitMQExchanges.DIRECT_EX,
RMQExchanges.DIRECT_EX,
`${RMQServices.TEST_BALANCE}.${genesis}`,
Buffer.from(JSON.stringify(params)),
{
Expand All @@ -193,11 +176,11 @@ export class RMQService {
}

public sendMsgIndexerGenesises() {
this.mainChannel.publish(RabbitMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from(''));
this.mainChannel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.INDEXER}.genesises`, Buffer.from(''));
}

public sendMsgTBGenesises() {
this.mainChannel.publish(RabbitMQExchanges.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from(''));
this.mainChannel.publish(RMQExchanges.TOPIC_EX, `${RMQServices.TEST_BALANCE}.genesises`, Buffer.from(''));
}

public isExistTBChannel(genesis: string) {
Expand Down
4 changes: 4 additions & 0 deletions idea/common/src/enums/api-methods.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export enum INDEXER_METHODS {
STATE_GET = 'state.get',
}

export enum INDEXER_INTERNAL_METHODS {
META_HAS_STATE = 'meta.hasState',
}

export enum META_STORAGE_METHODS {
META_GET = 'meta.get',
META_ADD = 'meta.add',
Expand Down
2 changes: 1 addition & 1 deletion idea/common/src/enums/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ export * from './api-methods.enum';
export * from './message-read-reason.enum';
export * from './code-status.enum';
export * from './program-status.enum';
export * from './rabbit-mqueues.enum';
export * from './rmq';
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
enum RabbitMQueues {
export enum RMQQueues {
GENESISES = 'genesises',
REPLIES = 'replies',
}

enum RabbitMQExchanges {
export enum RMQExchanges {
DIRECT_EX = 'direct.ex',
TOPIC_EX = 'topic.ex',
}

enum RMQServices {
export enum RMQServices {
INDEXER = 'indxr',
TEST_BALANCE = 'tb',
META_STORAGE = 'meta',
}

enum RMQServiceActions {
export enum RMQServiceActions {
ADD = 'add',
DELETE = 'delete',
}

export { RabbitMQueues, RabbitMQExchanges, RMQServiceActions, RMQServices };
File renamed without changes.
3 changes: 3 additions & 0 deletions idea/indexer/src/database/entities/code.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ export class Code extends BaseEntity implements ICode {

@OneToMany(() => StateToCode, (stateToCode) => stateToCode.code)
public stateToCodes!: StateToCode[];

@Column({ default: false })
public hasState: boolean;
}
3 changes: 3 additions & 0 deletions idea/indexer/src/database/entities/program.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ export class Program extends BaseEntity implements IProgram {

@OneToMany(() => Message, (message) => message.program)
public messages: Message[];

@Column({ default: false })
public hasState: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class addHasStateField1690793654952 implements MigrationInterface {
name = 'addHasStateField1690793654952'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
ALTER TABLE "code"
ADD "hasState" boolean NOT NULL DEFAULT false
`);
await queryRunner.query(`
ALTER TABLE "program"
ADD "hasState" boolean NOT NULL DEFAULT false
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
ALTER TABLE "program" DROP COLUMN "hasState"
`);
await queryRunner.query(`
ALTER TABLE "code" DROP COLUMN "hasState"
`);
}

}
2 changes: 1 addition & 1 deletion idea/indexer/src/gear/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { GearApi } from '@gear-js/api';
import { RMQServiceActions } from '@gear-js/common';

import config from '../config';
import { changeStatus } from '../healthcheck';
import { changeStatus } from '../healthcheck.server';
import { logger } from '../common';
import { GearIndexer } from './indexer';

Expand Down
3 changes: 2 additions & 1 deletion idea/indexer/src/gear/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { Block, Code, Message, Program } from '../database/entities';
import { BlockService, CodeService, MessageService, ProgramService, StatusService } from '../services';
import { TempState } from './temp-state';
import config from '../config';
import { RMQService } from '../rabbitmq';
import { RMQService } from '../rmq';

export class GearIndexer {
public api: GearApi;
Expand Down Expand Up @@ -316,6 +316,7 @@ export class GearIndexer {
genesis: this.genesis,
metahash: await this.getMeta(programId, code),
status: ProgramStatus.PROGRAM_SET,
hasState: code.hasState,
}),
);
}
Expand Down
2 changes: 1 addition & 1 deletion idea/indexer/src/gear/temp-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { CodeStatus, MessageReadReason } from '@gear-js/common';
import { MessageStatus, ProgramStatus, generateUUID } from '../common';
import { Block, Code, Message, Program } from '../database';
import { BlockService, CodeService, MessageService, ProgramService } from '../services';
import { RMQService } from '../rabbitmq';
import { RMQService } from '../rmq';

export class TempState {
private programs: Map<string, Program>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import http from 'http';

import config from '../config';
import { logger } from '../common';
import config from './config';
import { logger } from './common';

export const statuses = {
rmq: false,
Expand Down
4 changes: 2 additions & 2 deletions idea/indexer/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { waitReady } from '@polkadot/wasm-crypto';
import { RMQServiceActions } from '@gear-js/common';

import { changeStatus, runHealthcheckServer } from './healthcheck';
import { changeStatus, runHealthcheckServer } from './healthcheck.server';
import { AppDataSource } from './database';
import { RMQService } from './rabbitmq';
import { RMQService } from './rmq';
import { BlockService } from './services';
import { CodeService } from './services';
import { MessageService } from './services';
Expand Down
4 changes: 2 additions & 2 deletions idea/indexer/src/one-time-sync.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { waitReady } from '@polkadot/wasm-crypto';
import { GearApi } from '@gear-js/api';

import { changeStatus, runHealthcheckServer } from './healthcheck';
import { changeStatus, runHealthcheckServer } from './healthcheck.server';
import { AppDataSource } from './database';
import { BlockService, StatusService } from './services';
import { CodeService } from './services';
Expand All @@ -10,7 +10,7 @@ import { ProgramService } from './services';
import { GearIndexer } from './gear';
import config from './config';
import { logger } from './common';
import { RMQService } from './rabbitmq';
import { RMQService } from './rmq';

async function bootstrap() {
runHealthcheckServer();
Expand Down
Loading

0 comments on commit 9930458

Please sign in to comment.