Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate event logs for unsupported chains #1921

Merged
merged 16 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/datasources/cache/cache.router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class CacheRouter {
private static readonly TOKENS_KEY = 'tokens';
private static readonly TRANSFER_KEY = 'transfer';
private static readonly TRANSFERS_KEY = 'transfers';
private static readonly UNSUPPORTED_CHAIN_EVENT = 'unsupported_chain_event';
private static readonly ZERION_BALANCES_KEY = 'zerion_balances';
private static readonly ZERION_COLLECTIBLES_KEY = 'zerion_collectibles';

Expand Down Expand Up @@ -632,6 +633,10 @@ export class CacheRouter {
);
}

static getUnsupportedChainEventCacheKey(chainId: string): string {
return `${chainId}_${this.UNSUPPORTED_CHAIN_EVENT}`;
}

static getStakingTransactionStatusCacheDir(args: {
chainId: string;
txHash: `0x${string}`;
Expand Down
64 changes: 62 additions & 2 deletions src/domain/hooks/helpers/event-cache.helper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { Inject, Injectable } from '@nestjs/common';
import {
Inject,
Injectable,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { IBalancesRepository } from '@/domain/balances/balances.repository.interface';
import { IChainsRepository } from '@/domain/chains/chains.repository.interface';
import { ICollectiblesRepository } from '@/domain/collectibles/collectibles.repository.interface';
Expand All @@ -15,12 +20,21 @@ import { Event } from '@/routes/hooks/entities/event.entity';
import { IBlockchainRepository } from '@/domain/blockchain/blockchain.repository.interface';
import { IStakingRepository } from '@/domain/staking/staking.repository.interface';
import { memoize, MemoizedFunction } from 'lodash';
import { CacheRouter } from '@/datasources/cache/cache.router';
import {
CacheService,
ICacheService,
} from '@/datasources/cache/cache.service.interface';
import { MAX_TTL } from '@/datasources/cache/constants';

@Injectable()
export class EventCacheHelper {
export class EventCacheHelper implements OnModuleInit, OnModuleDestroy {
private static readonly HOOK_TYPE = 'hook';
private static readonly UNSUPPORTED_CHAIN_EVENT = 'unsupported_chain_event';
public static readonly UNSUPPORTED_EVENTS_LOG_INTERVAL = 60 * 1000; // 1 minute
public isSupportedChainMemo: ((chainId: string) => Promise<boolean>) &
MemoizedFunction;
private unsupportedEventsLogTimer: NodeJS.Timeout | undefined;

constructor(
@Inject(IBalancesRepository)
Expand All @@ -43,12 +57,25 @@ export class EventCacheHelper {
private readonly transactionsRepository: ITransactionsRepository,
@Inject(LoggingService)
private readonly loggingService: ILoggingService,
@Inject(CacheService)
private readonly cacheService: ICacheService,
) {
this.isSupportedChainMemo = memoize(
this.chainsRepository.isSupportedChain.bind(this.chainsRepository),
);
}

onModuleInit(): void {
this.unsupportedEventsLogTimer = setInterval(
() => void this._logUnsupportedEvents(),
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL,
);
}

onModuleDestroy(): void {
clearInterval(this.unsupportedEventsLogTimer);
}

// TODO: Split service into multiple classes, each handling Config/Transactions events
private readonly EventTypeHandler: {
[Type in Event['type']]: (
Expand Down Expand Up @@ -121,6 +148,17 @@ export class EventCacheHelper {
}
}

/**
* Increases the counter of unsupported chain events for the given chain.
* @param event {@link Event} object
*/
public async onUnsupportedChainEvent(event: Event): Promise<void> {
const cacheKey = CacheRouter.getUnsupportedChainEventCacheKey(
event.chainId,
);
await this.cacheService.increment(cacheKey, MAX_TTL);
}

// Transaction Service events

private onTransactionEventPendingMultisigTransaction(
Expand Down Expand Up @@ -451,6 +489,28 @@ export class EventCacheHelper {
): Array<Promise<void>> {
return [this.safeRepository.clearIsSafe(event)];
}
/**
* Logs the number of unsupported chain events for each chain and clears the store.
*/
private async _logUnsupportedEvents(): Promise<void> {
const chains = await this.chainsRepository.getChains();
await Promise.all(
chains.results.map(async (chain) => {
const cacheKey = CacheRouter.getUnsupportedChainEventCacheKey(
chain.chainId,
);
const count = await this.cacheService.getCounter(cacheKey);
if (count) {
iamacook marked this conversation as resolved.
Show resolved Hide resolved
this.loggingService.warn({
type: EventCacheHelper.UNSUPPORTED_CHAIN_EVENT,
chainId: chain.chainId,
count,
});
await this.cacheService.deleteByKey(cacheKey);
}
}),
);
}

private _logSafeTxEvent(
event: Event & { address: string; safeTxHash: string },
Expand Down
156 changes: 152 additions & 4 deletions src/domain/hooks/hooks.repository.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { IConfigurationService } from '@/config/configuration.service.interface';
import { FakeCacheService } from '@/datasources/cache/__tests__/fake.cache.service';
import { BalancesRepository } from '@/domain/balances/balances.repository';
import { BlockchainRepository } from '@/domain/blockchain/blockchain.repository';
import { ChainsRepository } from '@/domain/chains/chains.repository';
import { chainBuilder } from '@/domain/chains/entities/__tests__/chain.builder';
import { CollectiblesRepository } from '@/domain/collectibles/collectibles.repository';
import { pageBuilder } from '@/domain/entities/__tests__/page.builder';
import { EventCacheHelper } from '@/domain/hooks/helpers/event-cache.helper';
import { EventNotificationsHelper } from '@/domain/hooks/helpers/event-notifications.helper';
import {
Expand Down Expand Up @@ -31,6 +33,7 @@ const mockBlockchainRepository = jest.mocked({

const mockChainsRepository = jest.mocked({
getChain: jest.fn(),
getChains: jest.fn(),
clearChain: jest.fn(),
isSupportedChain: jest.fn(),
} as jest.MockedObjectDeep<ChainsRepository>);
Expand Down Expand Up @@ -85,9 +88,18 @@ const mockEventNotificationsHelper = jest.mocked({

describe('HooksRepository (Unit)', () => {
let hooksRepository: HooksRepository;
let fakeCacheService: FakeCacheService;
let eventCacheHelper: EventCacheHelper;

beforeAll(() => {
jest.useFakeTimers();
PooyaRaki marked this conversation as resolved.
Show resolved Hide resolved
});

beforeEach(() => {
const eventCacheHelper = new EventCacheHelper(
jest.clearAllMocks();

fakeCacheService = new FakeCacheService();
eventCacheHelper = new EventCacheHelper(
mockBalancesRepository,
mockBlockchainRepository,
mockChainsRepository,
Expand All @@ -98,14 +110,18 @@ describe('HooksRepository (Unit)', () => {
mockStakingRepository,
mockTransactionsRepository,
mockLoggingService,
fakeCacheService,
);
hooksRepository = new HooksRepository(
mockLoggingService,
mockQueuesRepository,
mockConfigurationService,
eventCacheHelper,
);
jest.clearAllMocks();
});

afterAll(() => {
jest.useRealTimers();
});

it('should process events for known chains and memoize the chain lookup', async () => {
Expand Down Expand Up @@ -200,13 +216,55 @@ describe('HooksRepository (Unit)', () => {
expect(mockSafeRepository.clearTransfers).not.toHaveBeenCalled();
expect(mockSafeRepository.clearIncomingTransfers).not.toHaveBeenCalled();
});

it('should store the unsupported chain events and log them after UNSUPPORTED_EVENTS_LOG_INTERVAL', async () => {
eventCacheHelper.onModuleInit();
const chain = chainBuilder().build();
const chains = pageBuilder<typeof chain>().with('results', [chain]).build();
const event = incomingTokenEventBuilder()
.with('chainId', chain.chainId)
.build();
const cacheKey = `${chain.chainId}_unsupported_chain_event`;
mockChainsRepository.isSupportedChain.mockResolvedValue(false);
mockChainsRepository.getChains.mockResolvedValue(chains);

await hooksRepository.onEvent(event);
await hooksRepository.onEvent(event);
await hooksRepository.onEvent(event);
await hooksRepository.onEvent(event);

await jest.advanceTimersByTimeAsync(
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL - 1,
);
expect(mockLoggingService.warn).not.toHaveBeenCalled();
await jest.advanceTimersByTimeAsync(
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL + 1,
);
expect(mockLoggingService.warn).toHaveBeenCalledTimes(1);
expect(mockLoggingService.warn).toHaveBeenCalledWith({
type: 'unsupported_chain_event',
chainId: chain.chainId,
count: 4,
});
// cache should be cleared after logging
await expect(fakeCacheService.getCounter(cacheKey)).resolves.toBeNull();
});
});

describe('HooksRepositoryWithNotifications (Unit)', () => {
let hooksRepositoryWithNotifications: HooksRepositoryWithNotifications;
let fakeCacheService: FakeCacheService;
let eventCacheHelper: EventCacheHelper;

beforeAll(() => {
jest.useFakeTimers();
PooyaRaki marked this conversation as resolved.
Show resolved Hide resolved
});

beforeEach(() => {
const eventCacheHelper = new EventCacheHelper(
jest.clearAllMocks();

fakeCacheService = new FakeCacheService();
eventCacheHelper = new EventCacheHelper(
mockBalancesRepository,
mockBlockchainRepository,
mockChainsRepository,
Expand All @@ -217,6 +275,7 @@ describe('HooksRepositoryWithNotifications (Unit)', () => {
mockStakingRepository,
mockTransactionsRepository,
mockLoggingService,
fakeCacheService,
);
hooksRepositoryWithNotifications = new HooksRepositoryWithNotifications(
mockLoggingService,
Expand All @@ -225,7 +284,10 @@ describe('HooksRepositoryWithNotifications (Unit)', () => {
mockEventNotificationsHelper,
eventCacheHelper,
);
jest.clearAllMocks();
});

afterAll(() => {
jest.useRealTimers();
});

it('should process events for known chains and memoize the chain lookup', async () => {
Expand Down Expand Up @@ -320,4 +382,90 @@ describe('HooksRepositoryWithNotifications (Unit)', () => {
expect(mockSafeRepository.clearTransfers).not.toHaveBeenCalled();
expect(mockSafeRepository.clearIncomingTransfers).not.toHaveBeenCalled();
});

it('should store the unsupported chain events and log them after UNSUPPORTED_EVENTS_LOG_INTERVAL', async () => {
eventCacheHelper.onModuleInit();
const chain = chainBuilder().build();
const chains = pageBuilder<typeof chain>().with('results', [chain]).build();
const event = incomingTokenEventBuilder()
.with('chainId', chain.chainId)
.build();
const cacheKey = `${chain.chainId}_unsupported_chain_event`;
mockChainsRepository.isSupportedChain.mockResolvedValue(false);
mockChainsRepository.getChains.mockResolvedValue(chains);

await hooksRepositoryWithNotifications.onEvent(event);
await hooksRepositoryWithNotifications.onEvent(event);
await hooksRepositoryWithNotifications.onEvent(event);

await expect(fakeCacheService.getCounter(cacheKey)).resolves.toEqual(3);
await jest.advanceTimersByTimeAsync(
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL - 1,
);
expect(mockLoggingService.warn).not.toHaveBeenCalled();
await jest.advanceTimersByTimeAsync(
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL + 1,
);
expect(mockLoggingService.warn).toHaveBeenCalledTimes(1);
expect(mockLoggingService.warn).toHaveBeenCalledWith({
type: 'unsupported_chain_event',
chainId: chain.chainId,
count: 3,
});
// cache should be cleared after logging
await expect(fakeCacheService.getCounter(cacheKey)).resolves.toBeNull();
});

it('should store the unsupported chain events for several chains and log them after UNSUPPORTED_EVENTS_LOG_INTERVAL', async () => {
eventCacheHelper.onModuleInit();
const chains = [
chainBuilder().with('chainId', '1').build(),
chainBuilder().with('chainId', '2').build(),
];
const chainsPage = pageBuilder<(typeof chains)[0]>()
.with('results', chains)
.build();
const events = [
incomingTokenEventBuilder().with('chainId', chains[0].chainId).build(),
incomingTokenEventBuilder().with('chainId', chains[1].chainId).build(),
];
const cacheKeys = [
`${chains[0].chainId}_unsupported_chain_event`,
`${chains[1].chainId}_unsupported_chain_event`,
];
mockChainsRepository.isSupportedChain.mockResolvedValue(false);
mockChainsRepository.getChains.mockResolvedValue(chainsPage);

await hooksRepositoryWithNotifications.onEvent(events[0]);
await hooksRepositoryWithNotifications.onEvent(events[0]);
await hooksRepositoryWithNotifications.onEvent(events[1]);
await hooksRepositoryWithNotifications.onEvent(events[1]);
await hooksRepositoryWithNotifications.onEvent(events[0]);
await hooksRepositoryWithNotifications.onEvent(events[0]);
await hooksRepositoryWithNotifications.onEvent(events[1]);

await expect(fakeCacheService.getCounter(cacheKeys[0])).resolves.toEqual(4);
await expect(fakeCacheService.getCounter(cacheKeys[1])).resolves.toEqual(3);
await jest.advanceTimersByTimeAsync(
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL - 1,
);
expect(mockLoggingService.warn).not.toHaveBeenCalled();
await jest.advanceTimersByTimeAsync(
EventCacheHelper.UNSUPPORTED_EVENTS_LOG_INTERVAL + 1,
);
expect(mockLoggingService.warn).toHaveBeenCalledTimes(2);
expect(mockLoggingService.warn).toHaveBeenCalledWith({
type: 'unsupported_chain_event',
chainId: chains[0].chainId,
count: 4,
});
expect(mockLoggingService.warn).toHaveBeenCalledWith({
type: 'unsupported_chain_event',
chainId: chains[1].chainId,
count: 3,
});
// cache should be cleared after logging
await expect(fakeCacheService.getCounter(cacheKeys[0])).resolves.toBeNull();
await expect(fakeCacheService.getCounter(cacheKeys[1])).resolves.toBeNull();
});
});
Loading