diff --git a/cli/lib/nf3.mjs b/cli/lib/nf3.mjs index 5b4035e04..de71ea2ec 100644 --- a/cli/lib/nf3.mjs +++ b/cli/lib/nf3.mjs @@ -898,7 +898,7 @@ class Nf3 { }; connection.onmessage = async message => { const msg = JSON.parse(message.data); - const { id, type, txDataToSign, block, transactions } = msg; + const { type, txDataToSign, block, transactions } = msg; logger.debug(`Proposer received websocket message of type ${type}`); if (type === 'block') { proposerQueue.push(async () => { @@ -913,8 +913,6 @@ class Nf3 { blockProposeEmitter.emit('error', err, block, transactions); } }); - - connection.send(id); } return null; }; diff --git a/common-files/utils/websocket.mjs b/common-files/utils/websocket.mjs deleted file mode 100644 index baa19a867..000000000 --- a/common-files/utils/websocket.mjs +++ /dev/null @@ -1,102 +0,0 @@ -/* ignore unused exports */ -import WebSocket from 'ws'; -import logger from './logger.mjs'; - -export class NFWebsocket { - constructor({ port, pingTime }) { - this.wss = new WebSocket.Server({ port }); - this.pingTime = pingTime; - } - - /** - Function that does some standardised setting up of a websocket's events. - It logs open, close and error events, sets up a ping and logs the pong. It will - close the socket on pong failure. The user is expected to handle the reconnect. - It does not set up the onmessage event because this tends to be case-specific. - */ - setupWebsocketEvents(ws, socketName) { - let timeoutID; - // setup a pinger to ping the websocket correspondent - const intervalID = setInterval(() => { - ws.ping(); - // set up a timeout - will close the websocket, which will trigger a reconnect - timeoutID = setTimeout(() => { - logger.warn(`Timed out waiting for ping response from ${socketName}`); - this.ws.terminate(); - }, 2 * this.pingTime); - }, this.pingTime); - // check we received a pong in time (clears the timer set by the pinger) - ws.on('pong', () => { - // logger.debug(`Got pong from ${socketName} websocket`); - clearTimeout(timeoutID); - }); - ws.on('error', () => { - logger.debug(`ERROR ${socketName}`); - }); - ws.on('open', () => { - logger.debug(`OPEN ${socketName}`); - }); - ws.on('close', err => { - logger.debug(`CLOSE ${socketName} ${err}`); - clearInterval(intervalID); - }); - } - - subscribe({ topic, socketName, filter }, callback) { - this.wss.on('connection', ws => { - console.log('connected', socketName); - ws.on('message', message => { - console.log('msg', message); - try { - if (filter === 'type' && JSON.parse(message).type === topic) { - logger.info(`SUBSCRIBING TO PROPOSEDBLOCK`); - this.setupWebsocketEvents(ws, socketName); - callback(ws); - } else if (message === topic) { - this.setupWebsocketEvents(ws, socketName); - callback(ws); - } - } catch (err) { - logger.debug('Unrecognized websocket message', message); - } - }); - - logger.debug(`Subscribed to ${socketName} WebSocket connection`); - }); - } -} - -export function submitBlockToWS(ws, data, id) { - const message = JSON.stringify({ - id, - ...data, - }); - - return new Promise(function (resolve) { - let ack = false; - ws.once('message', function (msg) { - if (msg === id) ack = true; - }); - - const retry = () => ws.send(message); - const delay = () => - new Promise((_, reject) => { - setTimeout(reject, 10000); - }); - const checkAck = () => { - if (ack) return ack; - throw ack; - }; - - let p = Promise.reject(); - for (let i = 0; i < 2; i++) { - p = p - .catch(retry) - .then(() => { - checkAck(); - resolve(); - }) - .catch(delay); - } - }); -} diff --git a/nightfall-optimist/src/event-handlers/chain-reorg.mjs b/nightfall-optimist/src/event-handlers/chain-reorg.mjs index 678849afc..e6bb90272 100644 --- a/nightfall-optimist/src/event-handlers/chain-reorg.mjs +++ b/nightfall-optimist/src/event-handlers/chain-reorg.mjs @@ -66,7 +66,7 @@ import { deleteTreeByBlockNumberL2, deleteNullifiersForBlock, } from '../services/database.mjs'; -import { waitForContract } from '../utils/index.mjs'; +import { waitForContract } from './subscribe.mjs'; const { STATE_CONTRACT_NAME } = constants; diff --git a/nightfall-optimist/src/event-handlers/index.mjs b/nightfall-optimist/src/event-handlers/index.mjs index 04ae4b7dc..02d31084e 100644 --- a/nightfall-optimist/src/event-handlers/index.mjs +++ b/nightfall-optimist/src/event-handlers/index.mjs @@ -1,4 +1,10 @@ -import { startEventQueue } from '../utils/index.mjs'; +import { + startEventQueue, + subscribeToBlockAssembledWebSocketConnection, + subscribeToChallengeWebSocketConnection, + subscribeToInstantWithDrawalWebSocketConnection, + subscribeToProposedBlockWebSocketConnection, +} from './subscribe.mjs'; import blockProposedEventHandler from './block-proposed.mjs'; import newCurrentProposerEventHandler from './new-current-proposer.mjs'; import transactionSubmittedEventHandler from './transaction-submitted.mjs'; @@ -37,4 +43,11 @@ const eventHandlers = { }, }; -export { startEventQueue, eventHandlers }; +export { + startEventQueue, + subscribeToBlockAssembledWebSocketConnection, + subscribeToChallengeWebSocketConnection, + subscribeToInstantWithDrawalWebSocketConnection, + subscribeToProposedBlockWebSocketConnection, + eventHandlers, +}; diff --git a/nightfall-optimist/src/event-handlers/subscribe.mjs b/nightfall-optimist/src/event-handlers/subscribe.mjs new file mode 100644 index 000000000..b929e0153 --- /dev/null +++ b/nightfall-optimist/src/event-handlers/subscribe.mjs @@ -0,0 +1,158 @@ +/* eslint-disable no-await-in-loop */ + +/** + * Module to subscribe to blockchain events + */ +import WebSocket from 'ws'; +import config from 'config'; +import logger from 'common-files/utils/logger.mjs'; +import { getContractInstance, getContractAddress } from 'common-files/utils/contract.mjs'; +import constants from 'common-files/constants/index.mjs'; + +const { + PROPOSERS_CONTRACT_NAME, + SHIELD_CONTRACT_NAME, + CHALLENGES_CONTRACT_NAME, + STATE_CONTRACT_NAME, +} = constants; +const { RETRIES, WEBSOCKET_PORT, WEBSOCKET_PING_TIME } = config; +const wss = new WebSocket.Server({ port: WEBSOCKET_PORT }); + +/** +Function that does some standardised setting up of a websocket's events. +It logs open, close and error events, sets up a ping and logs the pong. It will +close the socket on pong failure. The user is expected to handle the reconnect. +It does not set up the onmessage event because this tends to be case-specific. +*/ +function setupWebsocketEvents(ws, socketName) { + let timeoutID; + // setup a pinger to ping the websocket correspondent + const intervalID = setInterval(() => { + ws.ping(); + // set up a timeout - will close the websocket, which will trigger a reconnect + timeoutID = setTimeout(() => { + logger.warn(`Timed out waiting for ping response from ${socketName}`); + ws.terminate(); + }, 2 * WEBSOCKET_PING_TIME); + }, WEBSOCKET_PING_TIME); + // check we received a pong in time (clears the timer set by the pinger) + ws.on('pong', () => { + // logger.debug(`Got pong from ${socketName} websocket`); + clearTimeout(timeoutID); + }); + ws.on('error', () => { + logger.debug(`ERROR ${socketName}`); + }); + ws.on('open', () => { + logger.debug(`OPEN ${socketName}`); + }); + ws.on('close', err => { + logger.debug(`CLOSE ${socketName} ${err}`); + clearInterval(intervalID); + }); +} + +/** + * Function that tries to get a (named) contract instance and, if it fails, will + * retry after 3 seconds. After RETRIES attempts, it will give up and throw. + * This is useful in case nightfall-optimist comes up before the contract + * is fully deployed. + */ +export async function waitForContract(contractName) { + let errorCount = 0; + let error; + let instance; + while (errorCount < RETRIES) { + try { + error = undefined; + const address = await getContractAddress(contractName); + if (address === undefined) throw new Error(`${contractName} contract address was undefined`); + instance = getContractInstance(contractName, address); + return instance; + } catch (err) { + error = err; + errorCount++; + logger.warn(`Unable to get a ${contractName} contract instance will try again in 3 seconds`); + await new Promise(resolve => setTimeout(() => resolve(), 3000)); + } + } + if (error) throw error; + return instance; +} + +/** + * + * @param callback - The function that distributes events to the event-handler function + * @param arg - List of arguments to be passed to callback, the first element must be the event-handler functions + * @returns = List of emitters from each contract. + */ +export async function startEventQueue(callback, ...arg) { + const contractNames = [ + STATE_CONTRACT_NAME, + SHIELD_CONTRACT_NAME, + CHALLENGES_CONTRACT_NAME, + PROPOSERS_CONTRACT_NAME, + ]; + const contracts = await Promise.all(contractNames.map(c => waitForContract(c))); + const emitters = contracts.map(e => { + const emitterC = e.events.allEvents(); + emitterC.on('changed', event => callback(event, arg)); + emitterC.on('data', event => callback(event, arg)); + return emitterC; + }); + logger.debug('Subscribed to layer 2 state events'); + return emitters; +} + +export async function subscribeToChallengeWebSocketConnection(callback, ...args) { + wss.on('connection', ws => { + ws.on('message', message => { + if (message === 'challenge') { + setupWebsocketEvents(ws, 'challenge'); + callback(ws, args); + } + }); + }); + logger.debug('Subscribed to Challenge WebSocket connection'); +} + +export async function subscribeToBlockAssembledWebSocketConnection(callback, ...args) { + wss.on('connection', ws => { + ws.on('message', message => { + if (message === 'blocks') { + setupWebsocketEvents(ws, 'proposer'); + callback(ws, args); + } + }); + }); + logger.debug('Subscribed to BlockAssembled WebSocket connection'); +} + +export async function subscribeToInstantWithDrawalWebSocketConnection(callback, ...args) { + wss.on('connection', ws => { + ws.on('message', message => { + if (message === 'instant') { + setupWebsocketEvents(ws, 'liquidity provider'); + callback(ws, args); + } + }); + }); + logger.debug('Subscribed to InstantWithDrawal WebSocket connection'); +} + +export async function subscribeToProposedBlockWebSocketConnection(callback, ...args) { + wss.on('connection', ws => { + ws.on('message', message => { + try { + if (JSON.parse(message).type === 'sync') { + logger.info(`SUBSCRIBING TO PROPOSEDBLOCK`); + setupWebsocketEvents(ws, 'publisher'); + callback(ws, args); + } + } catch (error) { + logger.debug('Not JSON Message'); + } + }); + }); + logger.debug('Subscribed to ProposedBlock WebSocket connection'); +} diff --git a/nightfall-optimist/src/index.mjs b/nightfall-optimist/src/index.mjs index 1f8ed1424..84d9fabfc 100644 --- a/nightfall-optimist/src/index.mjs +++ b/nightfall-optimist/src/index.mjs @@ -1,9 +1,14 @@ import logger from 'common-files/utils/logger.mjs'; import { queueManager, queues, enqueueEvent } from 'common-files/utils/event-queue.mjs'; -import { NFWebsocket } from 'common-files/utils/websocket.mjs'; -import config from 'config'; import app from './app.mjs'; -import { startEventQueue, eventHandlers } from './event-handlers/index.mjs'; +import { + startEventQueue, + subscribeToBlockAssembledWebSocketConnection, + subscribeToChallengeWebSocketConnection, + subscribeToInstantWithDrawalWebSocketConnection, + subscribeToProposedBlockWebSocketConnection, + eventHandlers, +} from './event-handlers/index.mjs'; import Proposer from './classes/proposer.mjs'; import { setBlockAssembledWebSocketConnection, @@ -15,27 +20,15 @@ import { setInstantWithdrawalWebSocketConnection } from './services/instant-with import { setProposer } from './routes/proposer.mjs'; import { setBlockProposedWebSocketConnection } from './event-handlers/block-proposed.mjs'; -const { WEBSOCKET_PORT, WEBSOCKET_PING_TIME } = config; - const main = async () => { try { const proposer = new Proposer(); setProposer(proposer); // passes the proposer instance int the proposer routes // subscribe to WebSocket events first - - const ws = new NFWebsocket({ port: WEBSOCKET_PORT, pingTime: WEBSOCKET_PING_TIME }); - - ws.subscribe({ topic: 'challenge', socketName: 'challenge' }, setChallengeWebSocketConnection); - ws.subscribe({ topic: 'blocks', socketName: 'proposer' }, setBlockAssembledWebSocketConnection); - ws.subscribe( - { topic: 'instant', socketName: 'liquidity provider' }, - setInstantWithdrawalWebSocketConnection, - ); - ws.subscribe( - { topic: 'sync', socketName: 'publisher', filter: 'type' }, - setBlockProposedWebSocketConnection, - ); - + await subscribeToBlockAssembledWebSocketConnection(setBlockAssembledWebSocketConnection); + await subscribeToChallengeWebSocketConnection(setChallengeWebSocketConnection); + await subscribeToInstantWithDrawalWebSocketConnection(setInstantWithdrawalWebSocketConnection); + await subscribeToProposedBlockWebSocketConnection(setBlockProposedWebSocketConnection); // start the event queue await startEventQueue(queueManager, eventHandlers, proposer); // enqueue the block-assembler every time the queue becomes empty diff --git a/nightfall-optimist/src/routes/proposer.mjs b/nightfall-optimist/src/routes/proposer.mjs index d9073dea0..0ced53bc8 100644 --- a/nightfall-optimist/src/routes/proposer.mjs +++ b/nightfall-optimist/src/routes/proposer.mjs @@ -20,7 +20,7 @@ import { getLatestTree, getLatestBlockInfo, } from '../services/database.mjs'; -import { waitForContract } from '../utils/index.mjs'; +import { waitForContract } from '../event-handlers/subscribe.mjs'; import transactionSubmittedEventHandler from '../event-handlers/transaction-submitted.mjs'; import getProposers from '../services/proposer.mjs'; diff --git a/nightfall-optimist/src/services/block-assembler.mjs b/nightfall-optimist/src/services/block-assembler.mjs index ec7eaeeea..b7db4b904 100644 --- a/nightfall-optimist/src/services/block-assembler.mjs +++ b/nightfall-optimist/src/services/block-assembler.mjs @@ -7,7 +7,6 @@ from posted transactions and proposes these blocks. import WebSocket from 'ws'; import config from 'config'; import logger from 'common-files/utils/logger.mjs'; -import { submitBlockToWS } from 'common-files/utils/websocket.mjs'; import constants from 'common-files/constants/index.mjs'; import { removeTransactionsFromMemPool, @@ -16,7 +15,7 @@ import { } from './database.mjs'; import Block from '../classes/block.mjs'; import { Transaction } from '../classes/index.mjs'; -import { waitForContract } from '../utils/index.mjs'; +import { waitForContract } from '../event-handlers/subscribe.mjs'; import { increaseProposerWsFailed, increaseProposerWsClosed, @@ -104,18 +103,15 @@ export async function conditionalMakeBlock(proposer) { } } if (ws && ws.readyState === WebSocket.OPEN) { - logger.debug('Send unsigned block-assembler transactions to ws client'); - - submitBlockToWS( - ws, - { + await ws.send( + JSON.stringify({ type: 'block', txDataToSign: unsignedProposeBlockTransaction, block, transactions, - }, - block.blockHash, + }), ); + logger.debug('Send unsigned block-assembler transactions to ws client'); } else if (ws) { increaseProposerBlockNotSent(); logger.debug('Block not sent. Socket state', ws.readyState); diff --git a/nightfall-optimist/src/services/state-sync.mjs b/nightfall-optimist/src/services/state-sync.mjs index b51740504..36331cb16 100644 --- a/nightfall-optimist/src/services/state-sync.mjs +++ b/nightfall-optimist/src/services/state-sync.mjs @@ -10,7 +10,7 @@ import committedToChallengeEventHandler from '../event-handlers/challenge-commit import rollbackEventHandler from '../event-handlers/rollback.mjs'; import { getBlockByBlockNumberL2, getBlocks, getLatestBlockInfo } from './database.mjs'; import { stopMakingChallenges, startMakingChallenges } from './challenges.mjs'; -import { waitForContract } from '../utils/index.mjs'; +import { waitForContract } from '../event-handlers/subscribe.mjs'; // TODO can we remove these await-in-loops? diff --git a/nightfall-optimist/src/services/transaction-checker.mjs b/nightfall-optimist/src/services/transaction-checker.mjs index d1467f82a..58b14a11f 100644 --- a/nightfall-optimist/src/services/transaction-checker.mjs +++ b/nightfall-optimist/src/services/transaction-checker.mjs @@ -10,7 +10,7 @@ import gen from 'general-number'; import logger from 'common-files/utils/logger.mjs'; import constants from 'common-files/constants/index.mjs'; import { Transaction, VerificationKey, Proof, TransactionError } from '../classes/index.mjs'; -import { waitForContract } from '../utils/index.mjs'; +import { waitForContract } from '../event-handlers/subscribe.mjs'; import { getBlockByBlockNumberL2 } from './database.mjs'; import verify from './verify.mjs'; diff --git a/nightfall-optimist/src/utils/index.mjs b/nightfall-optimist/src/utils/index.mjs deleted file mode 100644 index 05a74614a..000000000 --- a/nightfall-optimist/src/utils/index.mjs +++ /dev/null @@ -1,69 +0,0 @@ -/* eslint-disable no-await-in-loop */ - -/** - * Module to subscribe to blockchain events - */ -import config from 'config'; -import logger from 'common-files/utils/logger.mjs'; -import constants from 'common-files/constants/index.mjs'; -import { getContractInstance, getContractAddress } from 'common-files/utils/contract.mjs'; - -const { RETRIES } = config; -const { - PROPOSERS_CONTRACT_NAME, - SHIELD_CONTRACT_NAME, - CHALLENGES_CONTRACT_NAME, - STATE_CONTRACT_NAME, -} = constants; - -/** - * Function that tries to get a (named) contract instance and, if it fails, will - * retry after 3 seconds. After RETRIES attempts, it will give up and throw. - * This is useful in case nightfall-optimist comes up before the contract - * is fully deployed. - */ -export async function waitForContract(contractName) { - let errorCount = 0; - let error; - let instance; - while (errorCount < RETRIES) { - try { - error = undefined; - const address = await getContractAddress(contractName); - if (address === undefined) throw new Error(`${contractName} contract address was undefined`); - instance = getContractInstance(contractName, address); - return instance; - } catch (err) { - error = err; - errorCount++; - logger.warn(`Unable to get a ${contractName} contract instance will try again in 3 seconds`); - await new Promise(resolve => setTimeout(() => resolve(), 3000)); - } - } - if (error) throw error; - return instance; -} - -/** - * - * @param callback - The function that distributes events to the event-handler function - * @param arg - List of arguments to be passed to callback, the first element must be the event-handler functions - * @returns = List of emitters from each contract. - */ -export async function startEventQueue(callback, ...arg) { - const contractNames = [ - STATE_CONTRACT_NAME, - SHIELD_CONTRACT_NAME, - CHALLENGES_CONTRACT_NAME, - PROPOSERS_CONTRACT_NAME, - ]; - const contracts = await Promise.all(contractNames.map(c => waitForContract(c))); - const emitters = contracts.map(e => { - const emitterC = e.events.allEvents(); - emitterC.on('changed', event => callback(event, arg)); - emitterC.on('data', event => callback(event, arg)); - return emitterC; - }); - logger.debug('Subscribed to layer 2 state events'); - return emitters; -}