diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 66c3fc0f..61e1146c 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -56,6 +56,8 @@ import { StateSyncStatus } from './entity/StateSyncStatus'; import { Collect } from './entity/Collect'; import { Flash } from './entity/Flash'; import { TickHourData } from './entity/TickHourData'; +import { FrothyEntity } from './entity/FrothyEntity'; +import { entityToLatestEntityMap } from './custom-indexer'; const log = debug('vulcanize:database'); @@ -95,8 +97,8 @@ export const ENTITY_QUERY_TYPE_MAP = new Map any, ENTITY_QUERY_TYPE>([ [UniswapDayData, ENTITY_QUERY_TYPE.GROUP_BY_WITHOUT_PRUNED] ]); -const ENTITIES = [Bundle, Burn, Collect, Factory, Flash, Mint, Pool, PoolDayData, PoolHourData, Position, PositionSnapshot, - Swap, Tick, TickDayData, TickHourData, Transaction, UniswapDayData]; +export const ENTITIES = new Set([Bundle, Burn, Collect, Factory, Flash, Mint, Pool, PoolDayData, PoolHourData, Position, PositionSnapshot, + Swap, Tick, TickDayData, TickHourData, Transaction, UniswapDayData]); export class Database implements DatabaseInterface { _config: ConnectionOptions @@ -869,37 +871,66 @@ export class Database implements DatabaseInterface { const blockHashes = blocks.map(block => block.blockHash); // Get all entities at the block height - const entitiesAtBlock = await Promise.all( - ENTITIES.map(entity => { - return this.getEntities( - queryRunner, - entity as any, - { - select: ['id'] as any, - where: { blockNumber } - } - ); - }) - ); + const entitiesAtHeight = await this.getEntities(queryRunner, FrothyEntity, { where: { blockNumber } }); // Extract entity ids from result - const entityIds = entitiesAtBlock.map(entities => { - return entities.map((entity: any) => entity.id); - }); + const entityIdsMap: Map = new Map(); + entitiesAtHeight.forEach(entity => + entityIdsMap.set( + entity.name, + [...entityIdsMap.get(entity.name) || [], entity.id] + ) + ); // Update isPruned flag using fetched entity ids and hashes of blocks to be pruned updatePromises.push( - ...ENTITIES.map((entity, index: number) => { + [...ENTITIES].map((entity) => { return this.updateEntity( queryRunner, entity as any, - { id: In(entityIds[index]), blockHash: In(blockHashes) }, + { id: In(entityIdsMap.get(entity.name) || []), blockHash: In(blockHashes) }, { isPruned: true } ); }) as any ); + // Simultaneously update isPruned flag for all entities await Promise.all(updatePromises); + + // Update latest entity tables with canonical entries + await this.updateNonCanonicalLatestEntities(queryRunner, blockNumber, blockHashes); + } + + async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise { + // Update latest entity tables with canonical entries + await Promise.all( + Array.from(entityToLatestEntityMap.entries()).map(async ([entity, latestEntity]) => { + // Get entries for non canonical blocks + const nonCanonicalLatestEntities = await this.getEntities(queryRunner, latestEntity, { where: { blockHash: In(nonCanonicalBlockHashes) } }); + + await Promise.all(nonCanonicalLatestEntities.map(async (nonCanonicalLatestEntity: any) => { + // Get pruned version for the non canonical entity + const prunedVersion = await this.getLatestPrunedEntity(queryRunner, entity, nonCanonicalLatestEntity.id, blockNumber); + + // If found, update the latestEntity entry for the id + // Else, delete the latestEntity entry for the id + if (prunedVersion) { + return this.updateEntity( + queryRunner, + latestEntity, + { id: nonCanonicalLatestEntity.id }, + prunedVersion + ); + } else { + return this.removeEntities( + queryRunner, + latestEntity, + { where: { id: nonCanonicalLatestEntity.id } } + ); + } + })); + }) + ); } async getBlockProgress (blockHash: string): Promise { @@ -929,7 +960,26 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getEntities(queryRunner, entity, findConditions); } - async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { + async getLatestPrunedEntity (queryRunner: QueryRunner, entity: new () => Entity, id: string, canonicalBlockNumber: number): Promise { + // Fetch the latest canonical entity for given id + const repo = queryRunner.manager.getRepository(entity); + const entityInPrunedRegion = await repo.createQueryBuilder('entity') + .where('entity.id = :id', { id }) + .andWhere('entity.is_pruned = false') + .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) + .orderBy('entity.block_number', 'DESC') + .limit(1) + .getOne(); + + return entityInPrunedRegion; + } + + async pruneFrothyEntities (queryRunner: QueryRunner, blockNumber: number): Promise { + // Remove frothy entity entries at the prune block height + return this.removeEntities(queryRunner, FrothyEntity, { where: { blockNumber: LessThanOrEqual(blockNumber) } }); + } + + async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions): Promise { return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); } diff --git a/packages/uni-info-watcher/src/entity/FrothyEntity.ts b/packages/uni-info-watcher/src/entity/FrothyEntity.ts new file mode 100644 index 00000000..87b4df53 --- /dev/null +++ b/packages/uni-info-watcher/src/entity/FrothyEntity.ts @@ -0,0 +1,21 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import { Entity, PrimaryColumn, Column, Index } from 'typeorm'; + +@Entity() +@Index(['blockNumber']) +export class FrothyEntity { + @PrimaryColumn('varchar') + id!: string; + + @PrimaryColumn('varchar') + name!: string; + + @PrimaryColumn('varchar', { length: 66 }) + blockHash!: string; + + @Column('integer') + blockNumber!: number; +} diff --git a/packages/uni-info-watcher/src/entity/Subscriber.ts b/packages/uni-info-watcher/src/entity/Subscriber.ts index 25c42fd6..9f46a3a8 100644 --- a/packages/uni-info-watcher/src/entity/Subscriber.ts +++ b/packages/uni-info-watcher/src/entity/Subscriber.ts @@ -6,6 +6,8 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } import _ from 'lodash'; import { entityToLatestEntityMap } from '../custom-indexer'; +import { ENTITIES } from '../database'; +import { FrothyEntity } from './FrothyEntity'; @EventSubscriber() export class EntitySubscriber implements EntitySubscriberInterface { @@ -19,10 +21,33 @@ export class EntitySubscriber implements EntitySubscriberInterface { } const afterInsertOrUpdate = async (event: InsertEvent | UpdateEvent): Promise => { - // Get latest entity's type const entity = event.entity; - const entityTarget = entityToLatestEntityMap.get(entity.constructor); + // Return if the entity is being pruned + if (entity.isPruned) { + return; + } + + // Insert the entity details in FrothyEntity table + if (ENTITIES.has(entity.constructor)) { + const frothyEntity = event.manager.create( + FrothyEntity, + { + ..._.pick(entity, ['id', 'blockHash', 'blockNumber']), + ...{ name: entity.constructor.name } + } + ); + + await event.manager.createQueryBuilder() + .insert() + .into(FrothyEntity) + .values(frothyEntity) + .orIgnore() + .execute(); + } + + // Get latest entity's type + const entityTarget = entityToLatestEntityMap.get(entity.constructor); if (!entityTarget) { return; } diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 96cea857..43883367 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -519,6 +519,20 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.markBlocksAsPruned(blocks); } + async pruneFrothyEntities (blockNumber: number): Promise { + const dbTx = await this._db.createTransactionRunner(); + try { + await this._db.pruneFrothyEntities(dbTx, blockNumber); + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } @@ -549,8 +563,11 @@ export class Indexer implements IndexerInterface { async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise { const syncStatus = await this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force); + this._db.pruneEntityCacheFrothyBlocks(syncStatus.latestCanonicalBlockHash, syncStatus.latestCanonicalBlockNumber); + await this.pruneFrothyEntities(blockNumber); + return syncStatus; }