Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add data cache for UTXO and RGB++ asset cells #132

Merged
merged 22 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0d0d90a
feat(rgbpp): add RgbppCollector for collect rgbpp assets cells from utxo
ahonn May 9, 2024
3dc15a2
feat(rgbpp): add rgbpp assets cells data cache, response cache data a…
ahonn May 9, 2024
ddae59f
refactor: add BaseQueueWorker class
ahonn May 9, 2024
245c532
feat: add UTXOSyncer for cache and update btc address's utxo
ahonn May 9, 2024
912315e
feat: add exponential backoff repeat strategy for sync utxo job queue
ahonn May 13, 2024
b305cf2
feat: add some random delay to avoid all jobs being processed at the …
ahonn May 13, 2024
6c9fe4b
fix: remove useless repeat job startDate option
ahonn May 13, 2024
ba33715
feat: add DataCache class to manage cache data
ahonn May 13, 2024
6a47cc9
feat: add repeatable job expire time
ahonn May 13, 2024
1470e22
feat: add data cache expire time and update repeat job end date
ahonn May 14, 2024
80a21f2
feat: use CKB RPC batch request to get rgbpp cells by utxos
ahonn May 14, 2024
536bb57
refactor: remove performance time log
ahonn May 14, 2024
08513f4
refactor: split utxos into buckets
ahonn May 14, 2024
df3e3cd
feat: add new env vars for enable/disable utxo/rgbpp data cache
ahonn May 14, 2024
af95bca
feat: add sentry custom Instrumentation
ahonn May 15, 2024
4321124
test: add RgbppCollector & UTXOSyncer unit tests
ahonn May 15, 2024
c3d6360
feat: add new cron task routes for serverless deployment
ahonn May 15, 2024
9b7756d
feat: register syncUTXOCronRoute and collectRgbppCellsCronRoute
ahonn May 15, 2024
3570fac
feat: optimize rgbpp assets routes
ahonn May 15, 2024
036643a
fix: fix typo
ahonn May 16, 2024
3fe50e6
test: update tests
ahonn May 16, 2024
9dd1b7c
chore: add todo comments
ahonn May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/@types/fastify/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import Paymaster from '../../services/paymaster';
import SPVClient from '../../services/spv';
import CKBClient from '../../services/ckb';
import BitcoinClient from '../../services/bitcoin';
import RgbppCollector from '../../services/rgbpp';
import UTXOSyncer from '../../services/utxo';

declare module 'fastify' {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -15,5 +17,7 @@ declare module 'fastify' {
spv: SPVClient;
paymaster: Paymaster;
transactionProcessor: TransactionProcessor;
rgbppCollector: RgbppCollector;
utxoSyncer: UTXOSyncer;
}
}
6 changes: 6 additions & 0 deletions src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Unlocker from './services/unlocker';
import SPVClient from './services/spv';
import CKBClient from './services/ckb';
import BitcoinClient from './services/bitcoin';
import RgbppCollector from './services/rgbpp';
import UTXOSyncer from './services/utxo';

export interface Cradle {
env: typeof env;
Expand All @@ -19,6 +21,8 @@ export interface Cradle {
paymaster: Paymaster;
unlocker: Unlocker;
transactionProcessor: TransactionProcessor;
rgbppCollector: RgbppCollector;
utxoSyncer: UTXOSyncer;
}

const container = createContainer<Cradle>({
Expand All @@ -40,6 +44,8 @@ container.register({
paymaster: asClass(Paymaster).singleton(),
transactionProcessor: asClass(TransactionProcessor).singleton(),
unlocker: asClass(Unlocker).singleton(),
rgbppCollector: asClass(RgbppCollector).singleton(),
utxoSyncer: asClass(UTXOSyncer).singleton(),
});

export default container;
42 changes: 42 additions & 0 deletions src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,48 @@ const envSchema = z
.enum(['true', 'false'])
.default('false')
.transform((value) => value === 'true'),

/**
* UTXO sync data cache enable flag, used to cache the UTXO sync data
* enable by default
*/
UTXO_SYNC_DATA_CACHE_ENABLE: z
.enum(['true', 'false'])
.default('true')
.transform((value) => value === 'true'),
/**
* UTXO sync repeat base duration, used to set the UTXO sync repeat interval
* repeat job start interval is 10 seconds by default
*/
UTXO_SYNC_REPEAT_BASE_DURATION: z.coerce.number().default(10 * 1000),
/**
* UTXO sync repeat max duration, used to maximum the UTXO sync repeat interval
* 1 hour by default
*/
UTXO_SYNC_REPEAT_MAX_DURATION: z.coerce.number().default(60 * 60 * 1000),
/**
* UTXO sync repeat expired duration, used to remove the expired UTXO sync job
* 336 hours by default
*/
UTXO_SYNC_REPEAT_EXPRIED_DURATION: z.coerce.number().default(336 * 60 * 60 * 1000),
/**
* UTXO sync data cache expire duration, used to cache the UTXO sync data
* 30 minutes by default
*/
UTXO_SYNC_DATA_CACHE_EXPIRE: z.coerce.number().default(30 * 60 * 1000),

/**
* RGB++ collect data cache enable flag, used to cache the RGB++ collect data
* enable by default
*/
RGBPP_COLLECT_DATA_CACHE_ENABLE: z
.enum(['true', 'false'])
.default('true')
.transform((value) => value === 'true'),
/**
* RGB++ collect data cache expire duration, used to cache the RGB++ collect data
*/
RGBPP_COLLECT_DATA_CACHE_EXPIRE: z.coerce.number().default(30 * 60 * 1000),
})
.and(
z.union([
Expand Down
45 changes: 43 additions & 2 deletions src/plugins/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import TransactionProcessor from '../services/transaction';
import cron from 'fastify-cron';
import { Env } from '../env';
import Unlocker from '../services/unlocker';
import RgbppCollector from '../services/rgbpp';
import UTXOSyncer from '../services/utxo';

export default fp(async (fastify) => {
try {
Expand Down Expand Up @@ -48,10 +50,10 @@ export default fp(async (fastify) => {
fastify.addHook('onReady', async () => {
transactionProcessor.startProcess({
onActive: (job) => {
fastify.log.info(`Job active: ${job.id}`);
fastify.log.info(`[TransactionProcessor] job active: ${job.id}`);
},
onCompleted: (job) => {
fastify.log.info(`Job completed: ${job.id}`);
fastify.log.info(`[TransactionProcessor] job completed: ${job.id}`);
},
});
});
Expand All @@ -78,6 +80,45 @@ export default fp(async (fastify) => {
},
};

if (env.UTXO_SYNC_DATA_CACHE_ENABLE) {
const utxoSyncer: UTXOSyncer = fastify.container.resolve('utxoSyncer');
fastify.addHook('onReady', async () => {
utxoSyncer.startProcess({
onActive: (job) => {
fastify.log.info(`[UTXOSyncer] job active: ${job.id}`);
},
onCompleted: async (job) => {
fastify.log.info(`[UTXOSyncer] job completed: ${job.id}`);
if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) {
const { btcAddress, utxos } = job.returnvalue;
const rgbppCollector: RgbppCollector = fastify.container.resolve('rgbppCollector');
await rgbppCollector.enqueueCollectJob(btcAddress, utxos, true);
}
},
});
});
fastify.addHook('onClose', async () => {
utxoSyncer.closeProcess();
});
}

if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) {
const rgbppCollector: RgbppCollector = fastify.container.resolve('rgbppCollector');
fastify.addHook('onReady', async () => {
rgbppCollector.startProcess({
onActive: (job) => {
fastify.log.info(`[RgbppCollector] job active: ${job.id}`);
},
onCompleted: (job) => {
fastify.log.info(`[RgbppCollector] job completed: ${job.id}`);
},
});
});
fastify.addHook('onClose', async () => {
rgbppCollector.closeProcess();
Flouse marked this conversation as resolved.
Show resolved Hide resolved
});
}

// processing unlock BTC_TIME_LOCK cells
const unlocker: Unlocker = fastify.container.resolve('unlocker');
const monitorSlug = env.UNLOCKER_MONITOR_SLUG;
Expand Down
14 changes: 13 additions & 1 deletion src/routes/bitcoin/address.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { Balance, Transaction, UTXO } from './types';
import validateBitcoinAddress from '../../utils/validators';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import z from 'zod';
import { Env } from '../../env';

const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
const env: Env = fastify.container.resolve('env');

fastify.addHook('preHandler', async (request) => {
const { address } = request.params as { address: string };
const valid = validateBitcoinAddress(address);
Expand Down Expand Up @@ -83,7 +86,16 @@ const addressRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodType
async function (request) {
const { address } = request.params;
const { only_confirmed, min_satoshi } = request.query;
let utxos = await fastify.bitcoin.getAddressTxsUtxo({ address });

let utxosCache = null;
if (env.UTXO_SYNC_DATA_CACHE_ENABLE) {
utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(address);
await fastify.utxoSyncer.enqueueSyncJob(address);
}
let utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address });
if (utxosCache) {
fastify.log.debug(`[UTXO] get utxos from cache: ${address}`);
}

// compatible with the case where only_confirmed is undefined
if (only_confirmed === 'true' || only_confirmed === 'undefined') {
Expand Down
2 changes: 2 additions & 0 deletions src/routes/bitcoin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import container from '../../container';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import BitcoinClient from '../../services/bitcoin';
import feesRoutes from './fees';
import UTXOSyncer from '../../services/utxo';

const bitcoinRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
fastify.decorate('bitcoin', container.resolve<BitcoinClient>('bitcoin'));
fastify.decorate('utxoSyncer', container.resolve<UTXOSyncer>('utxoSyncer'));

fastify.register(infoRoute);
fastify.register(blockRoutes, { prefix: '/block' });
Expand Down
48 changes: 48 additions & 0 deletions src/routes/cron/collect-rgbpp-cells.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pino from 'pino';
import { FastifyPluginCallback } from 'fastify';
import { Server } from 'http';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import container from '../../container';
import { VERCEL_MAX_DURATION } from '../../constants';
import RgbppCollector from '../../services/rgbpp';

const collectRgbppCellsCronRoute: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (
fastify,
_,
done,
) => {
fastify.get(
'/collect-rgbpp-cells',
{
schema: {
tags: ['Cron Task'],
description: 'Run UTXO sync cron task to update data cache, used for serverless deployment',
},
},
async () => {
const logger = container.resolve<pino.BaseLogger>('logger');
const rgbppCollector: RgbppCollector = container.resolve('rgbppCollector');
try {
await new Promise((resolve) => {
setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000);
rgbppCollector.startProcess({
onActive: (job) => {
logger.info(`[rgbppCollector] Job active: ${job.id}`);
},
onCompleted: (job) => {
logger.info(`[rgbppCollector] Job completed: ${job.id}`);
},
});
});
await rgbppCollector.pauseProcess();
await rgbppCollector.closeProcess();
} catch (err) {
logger.error(err);
fastify.Sentry.captureException(err);
}
},
);
done();
};

export default collectRgbppCellsCronRoute;
14 changes: 9 additions & 5 deletions src/routes/cron/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { FastifyPluginCallback } from "fastify";
import { ZodTypeProvider } from "fastify-type-provider-zod";
import { Server } from "http";
import processTransactionsCronRoute from "./process-transactions";
import unlockCellsCronRoute from "./unlock-cells";
import { FastifyPluginCallback } from 'fastify';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import { Server } from 'http';
import processTransactionsCronRoute from './process-transactions';
import unlockCellsCronRoute from './unlock-cells';
import syncUTXOCronRoute from './sync-utxo';
import collectRgbppCellsCronRoute from './collect-rgbpp-cells';

const cronRoutes: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
fastify.register(processTransactionsCronRoute);
fastify.register(unlockCellsCronRoute);
fastify.register(syncUTXOCronRoute);
fastify.register(collectRgbppCellsCronRoute);
done();
};

Expand Down
4 changes: 2 additions & 2 deletions src/routes/cron/process-transactions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ const processTransactionsCronRoute: FastifyPluginCallback<Record<never, never>,
setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000);
transactionProcessor.startProcess({
onActive: (job) => {
logger.info(`Job active: ${job.id}`);
logger.info(`[TransactionProcessor] Job active: ${job.id}`);
},
onCompleted: (job) => {
logger.info(`Job completed: ${job.id}`);
logger.info(`[TransactionProcessor] Job completed: ${job.id}`);
},
});
});
Expand Down
44 changes: 44 additions & 0 deletions src/routes/cron/sync-utxo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pino from 'pino';
import { FastifyPluginCallback } from 'fastify';
import { Server } from 'http';
import { ZodTypeProvider } from 'fastify-type-provider-zod';
import container from '../../container';
import { VERCEL_MAX_DURATION } from '../../constants';
import UTXOSyncer from '../../services/utxo';

const syncUTXOCronRoute: FastifyPluginCallback<Record<never, never>, Server, ZodTypeProvider> = (fastify, _, done) => {
fastify.get(
'/sync-utxo',
{
schema: {
tags: ['Cron Task'],
description: 'Run UTXO sync cron task to update data cache, used for serverless deployment',
},
},
async () => {
const logger = container.resolve<pino.BaseLogger>('logger');
const utxoSyncer: UTXOSyncer = container.resolve('utxoSyncer');
try {
await new Promise((resolve) => {
setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000);
utxoSyncer.startProcess({
onActive: (job) => {
logger.info(`[UTXOSyncer] Job active: ${job.id}`);
},
onCompleted: (job) => {
logger.info(`[UTXOSyncer] Job completed: ${job.id}`);
},
});
});
await utxoSyncer.pauseProcess();
await utxoSyncer.closeProcess();
} catch (err) {
logger.error(err);
fastify.Sentry.captureException(err);
}
},
);
done();
};

export default syncUTXOCronRoute;
Loading
Loading