Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: health probe #96

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 139 additions & 50 deletions src/domain/chainContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@ import {
Multicall3,
ComposableCoW,
Multicall3__factory,
RegistryBlock,
blockToRegistryBlock,
} from "../types";
import { SupportedChainId, OrderBookApi } from "@cowprotocol/cow-sdk";
import { addContract } from "./addContract";
import { checkForAndPlaceOrder } from "./checkForAndPlaceOrder";
import { ethers } from "ethers";
import { composableCowContract, DBService, getLogger } from "../utils";
import {
composableCowContract,
DBService,
getLogger,
isRunningInKubernetesPod,
} from "../utils";
import {
blockHeight,
blockProducingRate,
Expand All @@ -25,6 +32,34 @@ const WATCHDOG_FREQUENCY = 5 * 1000; // 5 seconds

const MULTICALL3 = "0xcA11bde05977b3631167028862bE2a173976CA11";

/**
 * Synchronisation state of a single chain watcher.
 *
 * A `ChainContext` starts out as `SYNCING`, is switched to `IN_SYNC` once the
 * latest block number equals the registry's last processed block, and is set
 * to `UNKNOWN` by the watchdog when the timeout elapses while running in a
 * Kubernetes pod. Only `IN_SYNC` is reported as healthy.
 */
enum ChainSync {
/** The chain is currently in the warm-up phase, synchronising from contract genesis or lastBlockProcessed */
SYNCING = "SYNCING",
mfw78 marked this conversation as resolved.
Show resolved Hide resolved
/** The chain is in sync with the latest block */
IN_SYNC = "IN_SYNC",
/** The chain is in an unknown state based on duration of time since a block was processed */
UNKNOWN = "UNKNOWN",
mfw78 marked this conversation as resolved.
Show resolved Hide resolved
}
mfw78 marked this conversation as resolved.
Show resolved Hide resolved

/** Lookup of chain contexts, keyed by numeric chain id. */
type Chains = Record<number, ChainContext>;

/**
 * Status of one watched chain, as exposed through the health report.
 */
export interface ChainStatus {
  /** Identifier of the chain this status describes. */
  chainId: SupportedChainId;
  /** Current synchronisation state of the chain watcher. */
  sync: ChainSync;
  /** Last block recorded as processed in the registry, or `null` if there is none. */
  lastProcessedBlock: RegistryBlock | null;
}

/**
 * A chain's status together with a single healthy / unhealthy verdict.
 */
export interface ChainHealth extends ChainStatus {
// `true` only while the chain's sync state is `IN_SYNC`.
isHealthy: boolean;
}

/**
 * Aggregated health report covering every registered chain.
 */
export interface ChainWatcherHealth {
  /** `true` only when every registered chain is healthy. */
  overallHealth: boolean;
  /** Per-chain health, keyed by numeric chain id. */
  chains: Record<number, ChainHealth>;
}

/**
* The chain context handles watching a single chain for new conditional orders
* and executing them.
Expand All @@ -33,7 +68,8 @@ export class ChainContext {
readonly deploymentBlock: number;
readonly pageSize: number;
readonly dryRun: boolean;
private inSync = false;
private sync: ChainSync = ChainSync.SYNCING;
static chains: Chains = {};

provider: ethers.providers.Provider;
chainId: SupportedChainId;
Expand Down Expand Up @@ -83,7 +119,11 @@ export class ChainContext {
deploymentBlock
);

return new ChainContext(options, provider, chainId, registry);
// Save the context to the static map to be used by the API
const context = new ChainContext(options, provider, chainId, registry);
ChainContext.chains[chainId] = context;

return context;
}

/**
Expand All @@ -100,27 +140,28 @@ export class ChainContext {
const { pageSize } = this;

// Set the block height metric
blockHeight.labels(chainId.toString()).set(lastProcessedBlock ?? 0);
blockHeight.labels(chainId.toString()).set(lastProcessedBlock?.number ?? 0);

// Start watching from (not including) the last processed block (if any)
let fromBlock = lastProcessedBlock
? lastProcessedBlock + 1
? lastProcessedBlock.number + 1
: this.deploymentBlock;
let currentBlockNumber = await provider.getBlockNumber();
let currentBlock = await provider.getBlock("latest");

let printSyncInfo = true; // Print sync info only once
let plan: ReplayPlan = {};
let toBlock: "latest" | number = 0;
do {
do {
toBlock = !pageSize ? "latest" : fromBlock + (pageSize - 1);
if (typeof toBlock === "number" && toBlock > currentBlockNumber) {
// refresh the current block number
currentBlockNumber = await provider.getBlockNumber();
toBlock = toBlock > currentBlockNumber ? currentBlockNumber : toBlock;
if (typeof toBlock === "number" && toBlock > currentBlock.number) {
// refresh the current block
currentBlock = await provider.getBlock("latest");
toBlock =
toBlock > currentBlock.number ? currentBlock.number : toBlock;

log.debug(
`Reaching tip of chain, current block number: ${currentBlockNumber}`
`Reaching tip of chain, current block number: ${currentBlock.number}`
);
}

Expand Down Expand Up @@ -156,24 +197,24 @@ export class ChainContext {
if (typeof toBlock === "number") {
fromBlock = toBlock + 1;
}
} while (toBlock !== "latest" && toBlock !== currentBlockNumber);

const block = await provider.getBlock(currentBlockNumber);
} while (toBlock !== "latest" && toBlock !== currentBlock.number);

// Replay only the blocks that had some events.
for (const [blockNumber, events] of Object.entries(plan)) {
log.debug(`Processing block ${blockNumber}`);
const historicalBlock = await provider.getBlock(Number(blockNumber));
try {
await processBlock(
this,
Number(blockNumber),
historicalBlock,
events,
block.number,
block.timestamp
currentBlock.number,
currentBlock.timestamp
);

// Set the last processed block to this iteration's block number
this.registry.lastProcessedBlock = Number(blockNumber);
this.registry.lastProcessedBlock =
blockToRegistryBlock(historicalBlock);
await this.registry.write();

// Set the block height metric
Expand All @@ -186,25 +227,24 @@ export class ChainContext {
}

// Set the last processed block to the current block number
this.registry.lastProcessedBlock = currentBlockNumber;
this.registry.lastProcessedBlock = blockToRegistryBlock(currentBlock);

// Save the registry
await this.registry.write();

// It may have taken some time to process the blocks, so refresh the current block number
// and check if we are in sync
currentBlockNumber = await provider.getBlockNumber();
currentBlock = await provider.getBlock("latest");

// If we are in sync, let it be known
if (currentBlockNumber === this.registry.lastProcessedBlock) {
this.inSync = true;
if (currentBlock.number === this.registry.lastProcessedBlock.number) {
this.sync = ChainSync.IN_SYNC;
} else {
// Otherwise, we need to keep processing blocks
this.inSync = false;
fromBlock = this.registry.lastProcessedBlock + 1;
fromBlock = this.registry.lastProcessedBlock.number + 1;
plan = {};
}
} while (!this.inSync);
} while (this.sync === ChainSync.SYNCING);

log.info(
`💚 ${
Expand All @@ -219,42 +259,45 @@ export class ChainContext {
}

// Otherwise, run the block watcher
return await this.runBlockWatcher(watchdogTimeout);
return await this.runBlockWatcher(watchdogTimeout, currentBlock);
}

/**
* Run the block watcher for the chain. As new blocks come in:
* 1. Check if there are any `ConditionalOrderCreated` events, and index these.
* 2. Check if any orders want to create discrete orders.
*/
private async runBlockWatcher(watchdogTimeout: number) {
private async runBlockWatcher(
watchdogTimeout: number,
lastProcessedBlock: ethers.providers.Block
) {
const { provider, registry, chainId } = this;
const log = getLogger(`chainContext:runBlockWatcher:${chainId}`);
// Watch for new blocks
log.info(`👀 Start block watcher`);
log.debug(`Watchdog timeout: ${watchdogTimeout} seconds`);
let lastBlockReceived = 0;
let timeLastBlockProcessed = new Date().getTime();
let lastBlockReceived = lastProcessedBlock;
provider.on("block", async (blockNumber: number) => {
try {
const block = await provider.getBlock(blockNumber);
log.debug(`New block ${blockNumber}`);
// Set the block time metric
const now = new Date().getTime();
const _blockTime = (now - timeLastBlockProcessed) / 1000;
timeLastBlockProcessed = now;

// Set the block time metric
const _blockTime = block.timestamp - lastBlockReceived.timestamp;
blockProducingRate.labels(chainId.toString()).set(_blockTime);

if (blockNumber <= lastBlockReceived) {
// This may be a re-org, so process the block again
if (
blockNumber <= lastBlockReceived.number &&
block.hash !== lastBlockReceived.hash
) {
// This is a re-org, so process the block again
reorgsTotal.labels(chainId.toString()).inc();
log.info(`Re-org detected, re-processing block ${blockNumber}`);
reorgDepth
.labels(chainId.toString())
.set(lastBlockReceived - blockNumber + 1);
.set(lastBlockReceived.number - blockNumber + 1);
}
lastBlockReceived = blockNumber;
lastBlockReceived = block;

const events = await pollContractForEvents(
blockNumber,
Expand All @@ -263,9 +306,10 @@ export class ChainContext {
);

try {
await processBlock(this, Number(blockNumber), events);
await processBlock(this, block, events);

// Block height metric
this.registry.lastProcessedBlock = blockToRegistryBlock(block);
blockHeight.labels(chainId.toString()).set(Number(blockNumber));
} catch {
log.error(`Error processing block ${blockNumber}`);
Expand All @@ -280,38 +324,84 @@ export class ChainContext {
}
});

// We run a watchdog to check if we are receiving blocks.
// We run a watchdog to check if we are receiving blocks. This determines if
// the chain is stuck or not issuing blocks. If running within a kubernetes
// pod, we don't exit, but we do log an error and set the sync status to unknown.
while (true) {
// sleep for 5 seconds
await asyncSleep(WATCHDOG_FREQUENCY);
const currentTime = new Date().getTime();
const timeElapsed = currentTime - timeLastBlockProcessed;
const now = Math.floor(new Date().getTime() / 1000);
const timeElapsed = now - lastBlockReceived.timestamp;

log.debug(`Time since last block processed: ${timeElapsed}ms`);

// If we haven't received a block in 30 seconds, exit so that the process manager can restart us
// If we haven't received a block within `watchdogTimeout` seconds, either signal
// an error or exit if not running in a kubernetes pod
if (timeElapsed >= watchdogTimeout * 1000) {
const formattedElapsedTime = Math.floor(timeElapsed / 1000);
log.error(
`Watchdog timeout (RPC failed, or chain is stuck / not issuing blocks)`
`Chain watcher last processed a block ${formattedElapsedTime}s ago (${watchdogTimeout}s timeout configured). Check the RPC.`
);
if (isRunningInKubernetesPod()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why mention Kubernetes here?
I think this could just be a setting: exit on stale chain state.

this.sync = ChainSync.STALE;

if (exitOnStaleChainState) {
  await registry.storage.close();
  exit(1)
}

this.sync = ChainSync.UNKNOWN;
continue;
}

// We need to handle our own exit here as the process is not running in a kubernetes pod
await registry.storage.close();
process.exit(1);
}
}
}

/**
 * Health snapshot for this chain: its sync state, its chain id, the
 * registry's last processed block, and whether it currently counts as healthy.
 */
get health(): ChainHealth {
  const lastProcessedBlock = this.registry.lastProcessedBlock;
  const isHealthy = this.isHealthy();

  // Key order is kept as sync, chainId, lastProcessedBlock, isHealthy.
  return {
    sync: this.sync,
    chainId: this.chainId,
    lastProcessedBlock,
    isHealthy,
  };
}

/** A chain counts as healthy only while its sync state is `IN_SYNC`. */
private isHealthy(): boolean {
  const { sync } = this;
  return sync === ChainSync.IN_SYNC;
}

/**
 * Build the health report for every registered chain.
 *
 * `overallHealth` starts as `true` and becomes `false` as soon as any chain
 * reports itself unhealthy; `chains` holds each chain's own health keyed by
 * its numeric chain id.
 */
static get health(): ChainWatcherHealth {
  const report: ChainWatcherHealth = { chains: {}, overallHealth: true };

  for (const context of Object.values(ChainContext.chains)) {
    const key = Number(context.chainId.toString());
    report.chains[key] = context.health;
    report.overallHealth = report.overallHealth && context.isHealthy();
  }

  return report;
}

/** `true` when the aggregated report marks every registered chain as healthy. */
static isHealthy(): boolean {
  const { overallHealth } = ChainContext.health;
  return overallHealth;
}
}

/**
* Process events in a block.
* @param context of the chain whose block is being processed
* @param blockNumber from which the events were emitted
* @param block from which the events were emitted
* @param events an array of conditional order created events
* @param blockNumberOverride to override the block number when polling the SDK
* @param blockTimestampOverride to override the block timestamp when polling the SDK
*/
async function processBlock(
context: ChainContext,
blockNumber: number,
block: ethers.providers.Block,
events: ConditionalOrderCreatedEvent[],
blockNumberOverride?: number,
blockTimestampOverride?: number
Expand All @@ -320,8 +410,7 @@ async function processBlock(
const timer = processBlockDurationSeconds
.labels(context.chainId.toString())
.startTimer();
const log = getLogger(`chainContext:processBlock:${chainId}:${blockNumber}`);
const block = await provider.getBlock(blockNumber);
const log = getLogger(`chainContext:processBlock:${chainId}:${block.number}`);

// Transaction watcher for adding new contracts
let hasErrors = false;
Expand Down Expand Up @@ -356,9 +445,9 @@ async function processBlock(
return false;
});
log.debug(
`Result of "checkForAndPlaceOrder" action for block ${blockNumber}: ${_formatResult(
result
)}`
`Result of "checkForAndPlaceOrder" action for block ${
block.number
}: ${_formatResult(result)}`
);

timer();
Expand Down
Loading
Loading