Skip to content

Commit

Permalink
Merge pull request #132 from ckb-cell/feat/utxo-and-cell-cache
Browse files Browse the repository at this point in the history
feat: add data cache for UTXO and RGB++ asset cells
  • Loading branch information
ahonn authored May 17, 2024
2 parents 2cf34c5 + 9dd1b7c commit ecae7c6
Show file tree
Hide file tree
Showing 22 changed files with 1,152 additions and 151 deletions.
4 changes: 4 additions & 0 deletions src/@types/fastify/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import Paymaster from '../../services/paymaster';
import SPVClient from '../../services/spv';
import CKBClient from '../../services/ckb';
import BitcoinClient from '../../services/bitcoin';
import RgbppCollector from '../../services/rgbpp';
import UTXOSyncer from '../../services/utxo';

declare module 'fastify' {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -15,5 +17,7 @@ declare module 'fastify' {
spv: SPVClient;
paymaster: Paymaster;
transactionProcessor: TransactionProcessor;
rgbppCollector: RgbppCollector;
utxoSyncer: UTXOSyncer;
}
}
6 changes: 6 additions & 0 deletions src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Unlocker from './services/unlocker';
import SPVClient from './services/spv';
import CKBClient from './services/ckb';
import BitcoinClient from './services/bitcoin';
import RgbppCollector from './services/rgbpp';
import UTXOSyncer from './services/utxo';

export interface Cradle {
env: typeof env;
Expand All @@ -19,6 +21,8 @@ export interface Cradle {
paymaster: Paymaster;
unlocker: Unlocker;
transactionProcessor: TransactionProcessor;
rgbppCollector: RgbppCollector;
utxoSyncer: UTXOSyncer;
}

const container = createContainer<Cradle>({
Expand All @@ -40,6 +44,8 @@ container.register({
paymaster: asClass(Paymaster).singleton(),
transactionProcessor: asClass(TransactionProcessor).singleton(),
unlocker: asClass(Unlocker).singleton(),
rgbppCollector: asClass(RgbppCollector).singleton(),
utxoSyncer: asClass(UTXOSyncer).singleton(),
});

export default container;
42 changes: 42 additions & 0 deletions src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,48 @@ const envSchema = z
.enum(['true', 'false'])
.default('false')
.transform((value) => value === 'true'),

/**
* UTXO sync data cache enable flag, used to cache the UTXO sync data
* enable by default
*/
UTXO_SYNC_DATA_CACHE_ENABLE: z
.enum(['true', 'false'])
.default('true')
.transform((value) => value === 'true'),
/**
* UTXO sync repeat base duration, used to set the UTXO sync repeat interval
* repeat job start interval is 10 seconds by default
*/
UTXO_SYNC_REPEAT_BASE_DURATION: z.coerce.number().default(10 * 1000),
/**
* UTXO sync repeat max duration, used to maximum the UTXO sync repeat interval
* 1 hour by default
*/
UTXO_SYNC_REPEAT_MAX_DURATION: z.coerce.number().default(60 * 60 * 1000),
/**
* UTXO sync repeat expired duration, used to remove the expired UTXO sync job
* 336 hours by default
*/
UTXO_SYNC_REPEAT_EXPRIED_DURATION: z.coerce.number().default(336 * 60 * 60 * 1000),
/**
* UTXO sync data cache expire duration, used to cache the UTXO sync data
* 30 minutes by default
*/
UTXO_SYNC_DATA_CACHE_EXPIRE: z.coerce.number().default(30 * 60 * 1000),

/**
* RGB++ collect data cache enable flag, used to cache the RGB++ collect data
* enable by default
*/
RGBPP_COLLECT_DATA_CACHE_ENABLE: z
.enum(['true', 'false'])
.default('true')
.transform((value) => value === 'true'),
/**
* RGB++ collect data cache expire duration, used to cache the RGB++ collect data
*/
RGBPP_COLLECT_DATA_CACHE_EXPIRE: z.coerce.number().default(30 * 60 * 1000),
})
.and(
z.union([
Expand Down
45 changes: 43 additions & 2 deletions src/plugins/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import TransactionProcessor from '../services/transaction';
import cron from 'fastify-cron';
import { Env } from '../env';
import Unlocker from '../services/unlocker';
import RgbppCollector from '../services/rgbpp';
import UTXOSyncer from '../services/utxo';

export default fp(async (fastify) => {
try {
Expand Down Expand Up @@ -48,10 +50,10 @@ export default fp(async (fastify) => {
fastify.addHook('onReady', async () => {
transactionProcessor.startProcess({
onActive: (job) => {
fastify.log.info(`Job active: ${job.id}`);
fastify.log.info(`[TransactionProcessor] job active: ${job.id}`);
},
onCompleted: (job) => {
fastify.log.info(`Job completed: ${job.id}`);
fastify.log.info(`[TransactionProcessor] job completed: ${job.id}`);
},
});
});
Expand All @@ -78,6 +80,45 @@ export default fp(async (fastify) => {
},
};

if (env.UTXO_SYNC_DATA_CACHE_ENABLE) {
const utxoSyncer: UTXOSyncer = fastify.container.resolve('utxoSyncer');
fastify.addHook('onReady', async () => {
utxoSyncer.startProcess({
onActive: (job) => {
fastify.log.info(`[UTXOSyncer] job active: ${job.id}`);
},
onCompleted: async (job) => {
fastify.log.info(`[UTXOSyncer] job completed: ${job.id}`);
if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) {
const { btcAddress, utxos } = job.returnvalue;
const rgbppCollector: RgbppCollector = fastify.container.resolve('rgbppCollector');
await rgbppCollector.enqueueCollectJob(btcAddress, utxos, true);
}
},
});
});
fastify.addHook('onClose', async () => {
utxoSyncer.closeProcess();
});
}

if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) {
const rgbppCollector: RgbppCollector = fastify.container.resolve('rgbppCollector');
fastify.addHook('onReady', async () => {
rgbppCollector.startProcess({
onActive: (job) => {
fastify.log.info(`[RgbppCollector] job active: ${job.id}`);
},
onCompleted: (job) => {
fastify.log.info(`[RgbppCollector] job completed: ${job.id}`);
},
});
});
fastify.addHook('onClose', async () => {
rgbppCollector.closeProcess();
});
}

// processing unlock BTC_TIME_LOCK cells
const unlocker: Unlocker = fastify.container.resolve('unlocker');
const monitorSlug = env.UNLOCKER_MONITOR_SLUG;
Expand Down
14 changes: 13 additions & 1 deletion src/routes/bitcoin/address.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { Balance, Transaction, UTXO } from './types';
import validateBitcoinAddress from '../../utils/validators';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import z from 'zod';
import { Env } from '../../env';

const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
const env: Env = fastify.container.resolve('env');

fastify.addHook('preHandler', async (request) => {
const { address } = request.params as { address: string };
const valid = validateBitcoinAddress(address);
Expand Down Expand Up @@ -83,7 +86,16 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
async function (request) {
const { address } = request.params;
const { only_confirmed, min_satoshi } = request.query;
let utxos = await fastify.bitcoin.getAddressTxsUtxo({ address });

let utxosCache = null;
if (env.UTXO_SYNC_DATA_CACHE_ENABLE) {
utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(address);
await fastify.utxoSyncer.enqueueSyncJob(address);
}
let utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address });
if (utxosCache) {
fastify.log.debug(`[UTXO] get utxos from cache: ${address}`);
}

// compatible with the case where only_confirmed is undefined
if (only_confirmed === 'true' || only_confirmed === 'undefined') {
Expand Down
2 changes: 2 additions & 0 deletions src/routes/bitcoin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import container from '../../container';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import BitcoinClient from '../../services/bitcoin';
import feesRoutes from './fees';
import UTXOSyncer from '../../services/utxo';

const bitcoinRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
fastify.decorate('bitcoin', container.resolve<BitcoinClient>('bitcoin'));
fastify.decorate('utxoSyncer', container.resolve<UTXOSyncer>('utxoSyncer'));

fastify.register(infoRoute);
fastify.register(blockRoutes, { prefix: '/block' });
Expand Down
48 changes: 48 additions & 0 deletions src/routes/cron/collect-rgbpp-cells.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pino from 'pino';
import { FastifyPluginCallback } from 'fastify';
import { Server } from 'http';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import container from '../../container';
import { VERCEL_MAX_DURATION } from '../../constants';
import RgbppCollector from '../../services/rgbpp';

const collectRgbppCellsCronRoute: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (
fastify,
_,
done,
) => {
fastify.get(
'/collect-rgbpp-cells',
{
schema: {
tags: ['Cron Task'],
description: 'Run UTXO sync cron task to update data cache, used for serverless deployment',
},
},
async () => {
const logger = container.resolve<pino.BaseLogger>('logger');
const rgbppCollector: RgbppCollector = container.resolve('rgbppCollector');
try {
await new Promise((resolve) => {
setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000);
rgbppCollector.startProcess({
onActive: (job) => {
logger.info(`[rgbppCollector] Job active: ${job.id}`);
},
onCompleted: (job) => {
logger.info(`[rgbppCollector] Job completed: ${job.id}`);
},
});
});
await rgbppCollector.pauseProcess();
await rgbppCollector.closeProcess();
} catch (err) {
logger.error(err);
fastify.Sentry.captureException(err);
}
},
);
done();
};

export default collectRgbppCellsCronRoute;
14 changes: 9 additions & 5 deletions src/routes/cron/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { FastifyPluginCallback } from "fastify";
import { ZodTypeProvider } from "fastify-type-provider-zod";
import { Server } from "http";
import processTransactionsCronRoute from "./process-transactions";
import unlockCellsCronRoute from "./unlock-cells";
import { FastifyPluginCallback } from 'fastify';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import { Server } from 'http';
import processTransactionsCronRoute from './process-transactions';
import unlockCellsCronRoute from './unlock-cells';
import syncUTXOCronRoute from './sync-utxo';
import collectRgbppCellsCronRoute from './collect-rgbpp-cells';

const cronRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
fastify.register(processTransactionsCronRoute);
fastify.register(unlockCellsCronRoute);
fastify.register(syncUTXOCronRoute);
fastify.register(collectRgbppCellsCronRoute);
done();
};

Expand Down
4 changes: 2 additions & 2 deletions src/routes/cron/process-transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ const processTransactionsCronRoute: FastifyPluginCallback<Record<never, never>,
setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000);
transactionProcessor.startProcess({
onActive: (job) => {
logger.info(`Job active: ${job.id}`);
logger.info(`[TransactionProcessor] Job active: ${job.id}`);
},
onCompleted: (job) => {
logger.info(`Job completed: ${job.id}`);
logger.info(`[TransactionProcessor] Job completed: ${job.id}`);
},
});
});
Expand Down
44 changes: 44 additions & 0 deletions src/routes/cron/sync-utxo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pino from 'pino';
import { FastifyPluginCallback } from 'fastify';
import { Server } from 'http';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import container from '../../container';
import { VERCEL_MAX_DURATION } from '../../constants';
import UTXOSyncer from '../../services/utxo';

const syncUTXOCronRoute: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
fastify.get(
'/sync-utxo',
{
schema: {
tags: ['Cron Task'],
description: 'Run UTXO sync cron task to update data cache, used for serverless deployment',
},
},
async () => {
const logger = container.resolve<pino.BaseLogger>('logger');
const utxoSyncer: UTXOSyncer = container.resolve('utxoSyncer');
try {
await new Promise((resolve) => {
setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000);
utxoSyncer.startProcess({
onActive: (job) => {
logger.info(`[UTXOSyncer] Job active: ${job.id}`);
},
onCompleted: (job) => {
logger.info(`[UTXOSyncer] Job completed: ${job.id}`);
},
});
});
await utxoSyncer.pauseProcess();
await utxoSyncer.closeProcess();
} catch (err) {
logger.error(err);
fastify.Sentry.captureException(err);
}
},
);
done();
};

export default syncUTXOCronRoute;
Loading

0 comments on commit ecae7c6

Please sign in to comment.