-
Notifications
You must be signed in to change notification settings - Fork 4
/
eventMonitor.js
executable file
·92 lines (87 loc) · 4.12 KB
/
eventMonitor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
const config = require('./config');
const Web3 = require('web3');
const redis = require("redis");
const getRedisFunctions = require("./functions/createRedis");
const {initMQ} = require("./functions/initMQ");
const {processBlockForEvent} = require("./functions/processBlockForEvent")
const eventNames = ["DepositEvent", "ExitStartedEvent", "DepositWithdrawStartedEvent"];
const fastEventNames = ["ExitStartedEvent", "DepositWithdrawStartedEvent"];
const slowEventNames = ["DepositEvent"];
async function startBlockProcessing() {
// init MQ and start the loop
console.log("Creating redis")
console.log(JSON.stringify(config.redis));
const redisClient = redis.createClient(config.redis);
console.log("Initializing message queue")
const mq = await initMQ(config.redis, eventNames)
console.log("Getting the block number to start with")
const redisFunctions = getRedisFunctions(redisClient);
const {redisGet, redisSet, redisExists} = redisFunctions;
let exists = await redisExists("fromBlock");
if (!exists) {
console.log("Writing default starting block " + config.fromBlock)
await redisSet("fromBlock", config.fromBlock)
}
let fromBlock = await redisGet("fromBlock");
console.log("Last processed block from persistance is " + fromBlock)
fromBlock = Number.parseInt(fromBlock, 10);
if (fromBlock === undefined || isNaN(fromBlock)) {
console.log("Fallback, starting from block 1")
fromBlock = Number.parseInt(config.fromBlock, 10);
// fromBlock = 1
}
console.log("Getting contract details")
const contractDetails = await config.contractDetails();
const web3 = new Web3(config.ethNodeAddress);
const PlasmaContract = new web3.eth.Contract(contractDetails.abi, contractDetails.address);
console.log("Starting from block " + fromBlock)
processBlockForEvents(fromBlock)().then((_dispose) => {
console.log("Started block processing loop");
});
function processBlockForEvents(previousBlockNumber) {
return async function() {
try{
let lastblock = await web3.eth.getBlockNumber();
// console.log("Last Ethereum block " + lastblock)
if (lastblock > previousBlockNumber) {
lastblock = previousBlockNumber + 1;
let blockToProcess = lastblock;
if (blockToProcess <= 1) {
blockToProcess = 1
}
console.log("Processing block " + blockToProcess);
await processBlock(blockToProcess)();
await redisSet("fromBlock", lastblock);
setTimeout(processBlockForEvents(lastblock), 1000);
return;
} else {
setTimeout(processBlockForEvents(lastblock), 10000);
return;
}
}
catch(error) {
console.log("Error processing block for events : " + error);
if (error.name == "Submitting too far in future") {
setImmediate(processBlock(error.message));
return;
}
setImmediate(processBlockForEvents(previousBlockNumber));
}
}
}
function processBlock(blockNumber) {
return async function() {
for (const eventName of fastEventNames) {
const toProcess = blockNumber - config.blocks_shift;
const numProcessed = await processBlockForEvent(toProcess, eventName, PlasmaContract, mq)
console.log("Processed " + numProcessed + " of events " + eventName + " in block " + toProcess)
}
for (const eventName of slowEventNames) {
const toProcess = blockNumber - config.deposit_blocks_shift;
const numProcessed = await processBlockForEvent(toProcess, eventName, PlasmaContract, mq)
console.log("Processed " + numProcessed + " of events " + eventName + " in block " + toProcess)
}
}
}
}
startBlockProcessing().catch(err => { console.log(err); process.exit(1); });