Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feat: adding ack to block produced websocket 🔌" #852

Merged
merged 5 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cli/lib/nf3.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ class Nf3 {
};
connection.onmessage = async message => {
const msg = JSON.parse(message.data);
const { id, type, txDataToSign, block, transactions } = msg;
const { type, txDataToSign, block, transactions } = msg;
logger.debug(`Proposer received websocket message of type ${type}`);
if (type === 'block') {
proposerQueue.push(async () => {
Expand All @@ -913,8 +913,6 @@ class Nf3 {
blockProposeEmitter.emit('error', err, block, transactions);
}
});

connection.send(id);
}
return null;
};
Expand Down
102 changes: 0 additions & 102 deletions common-files/utils/websocket.mjs

This file was deleted.

2 changes: 1 addition & 1 deletion nightfall-optimist/src/event-handlers/chain-reorg.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import {
deleteTreeByBlockNumberL2,
deleteNullifiersForBlock,
} from '../services/database.mjs';
import { waitForContract } from '../utils/index.mjs';
import { waitForContract } from './subscribe.mjs';

const { STATE_CONTRACT_NAME } = constants;

Expand Down
17 changes: 15 additions & 2 deletions nightfall-optimist/src/event-handlers/index.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { startEventQueue } from '../utils/index.mjs';
import {
startEventQueue,
subscribeToBlockAssembledWebSocketConnection,
subscribeToChallengeWebSocketConnection,
subscribeToInstantWithDrawalWebSocketConnection,
subscribeToProposedBlockWebSocketConnection,
} from './subscribe.mjs';
import blockProposedEventHandler from './block-proposed.mjs';
import newCurrentProposerEventHandler from './new-current-proposer.mjs';
import transactionSubmittedEventHandler from './transaction-submitted.mjs';
Expand Down Expand Up @@ -37,4 +43,11 @@ const eventHandlers = {
},
};

export { startEventQueue, eventHandlers };
export {
startEventQueue,
subscribeToBlockAssembledWebSocketConnection,
subscribeToChallengeWebSocketConnection,
subscribeToInstantWithDrawalWebSocketConnection,
subscribeToProposedBlockWebSocketConnection,
eventHandlers,
};
158 changes: 158 additions & 0 deletions nightfall-optimist/src/event-handlers/subscribe.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/* eslint-disable no-await-in-loop */

/**
* Module to subscribe to blockchain events
*/
import WebSocket from 'ws';
import config from 'config';
import logger from 'common-files/utils/logger.mjs';
import { getContractInstance, getContractAddress } from 'common-files/utils/contract.mjs';
import constants from 'common-files/constants/index.mjs';

const {
PROPOSERS_CONTRACT_NAME,
SHIELD_CONTRACT_NAME,
CHALLENGES_CONTRACT_NAME,
STATE_CONTRACT_NAME,
} = constants;
const { RETRIES, WEBSOCKET_PORT, WEBSOCKET_PING_TIME } = config;
const wss = new WebSocket.Server({ port: WEBSOCKET_PORT });

/**
Function that does some standardised setting up of a websocket's events.
It logs open, close and error events, sets up a ping and logs the pong. It will
close the socket on pong failure. The user is expected to handle the reconnect.
It does not set up the onmessage event because this tends to be case-specific.
*/
function setupWebsocketEvents(ws, socketName) {
let timeoutID;
// setup a pinger to ping the websocket correspondent
const intervalID = setInterval(() => {
ws.ping();
// set up a timeout - will close the websocket, which will trigger a reconnect
timeoutID = setTimeout(() => {
logger.warn(`Timed out waiting for ping response from ${socketName}`);
ws.terminate();
}, 2 * WEBSOCKET_PING_TIME);
}, WEBSOCKET_PING_TIME);
// check we received a pong in time (clears the timer set by the pinger)
ws.on('pong', () => {
// logger.debug(`Got pong from ${socketName} websocket`);
clearTimeout(timeoutID);
});
ws.on('error', () => {
logger.debug(`ERROR ${socketName}`);
});
ws.on('open', () => {
logger.debug(`OPEN ${socketName}`);
});
ws.on('close', err => {
logger.debug(`CLOSE ${socketName} ${err}`);
clearInterval(intervalID);
});
}

/**
* Function that tries to get a (named) contract instance and, if it fails, will
* retry after 3 seconds. After RETRIES attempts, it will give up and throw.
* This is useful in case nightfall-optimist comes up before the contract
* is fully deployed.
*/
export async function waitForContract(contractName) {
let errorCount = 0;
let error;
let instance;
while (errorCount < RETRIES) {
try {
error = undefined;
const address = await getContractAddress(contractName);
if (address === undefined) throw new Error(`${contractName} contract address was undefined`);
instance = getContractInstance(contractName, address);
return instance;
} catch (err) {
error = err;
errorCount++;
logger.warn(`Unable to get a ${contractName} contract instance will try again in 3 seconds`);
await new Promise(resolve => setTimeout(() => resolve(), 3000));
}
}
if (error) throw error;
return instance;
}

/**
*
* @param callback - The function that distributes events to the event-handler function
* @param arg - List of arguments to be passed to callback, the first element must be the event-handler functions
* @returns = List of emitters from each contract.
*/
export async function startEventQueue(callback, ...arg) {
const contractNames = [
STATE_CONTRACT_NAME,
SHIELD_CONTRACT_NAME,
CHALLENGES_CONTRACT_NAME,
PROPOSERS_CONTRACT_NAME,
];
const contracts = await Promise.all(contractNames.map(c => waitForContract(c)));
const emitters = contracts.map(e => {
const emitterC = e.events.allEvents();
emitterC.on('changed', event => callback(event, arg));
emitterC.on('data', event => callback(event, arg));
return emitterC;
});
logger.debug('Subscribed to layer 2 state events');
return emitters;
}

export async function subscribeToChallengeWebSocketConnection(callback, ...args) {
wss.on('connection', ws => {
ws.on('message', message => {
if (message === 'challenge') {
setupWebsocketEvents(ws, 'challenge');
callback(ws, args);
}
});
});
logger.debug('Subscribed to Challenge WebSocket connection');
}

export async function subscribeToBlockAssembledWebSocketConnection(callback, ...args) {
wss.on('connection', ws => {
ws.on('message', message => {
if (message === 'blocks') {
setupWebsocketEvents(ws, 'proposer');
callback(ws, args);
}
});
});
logger.debug('Subscribed to BlockAssembled WebSocket connection');
}

export async function subscribeToInstantWithDrawalWebSocketConnection(callback, ...args) {
wss.on('connection', ws => {
ws.on('message', message => {
if (message === 'instant') {
setupWebsocketEvents(ws, 'liquidity provider');
callback(ws, args);
}
});
});
logger.debug('Subscribed to InstantWithDrawal WebSocket connection');
}

export async function subscribeToProposedBlockWebSocketConnection(callback, ...args) {
wss.on('connection', ws => {
ws.on('message', message => {
try {
if (JSON.parse(message).type === 'sync') {
logger.info(`SUBSCRIBING TO PROPOSEDBLOCK`);
setupWebsocketEvents(ws, 'publisher');
callback(ws, args);
}
} catch (error) {
logger.debug('Not JSON Message');
}
});
});
logger.debug('Subscribed to ProposedBlock WebSocket connection');
}
31 changes: 12 additions & 19 deletions nightfall-optimist/src/index.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logger from 'common-files/utils/logger.mjs';
import { queueManager, queues, enqueueEvent } from 'common-files/utils/event-queue.mjs';
import { NFWebsocket } from 'common-files/utils/websocket.mjs';
import config from 'config';
import app from './app.mjs';
import { startEventQueue, eventHandlers } from './event-handlers/index.mjs';
import {
startEventQueue,
subscribeToBlockAssembledWebSocketConnection,
subscribeToChallengeWebSocketConnection,
subscribeToInstantWithDrawalWebSocketConnection,
subscribeToProposedBlockWebSocketConnection,
eventHandlers,
} from './event-handlers/index.mjs';
import Proposer from './classes/proposer.mjs';
import {
setBlockAssembledWebSocketConnection,
Expand All @@ -15,27 +20,15 @@ import { setInstantWithdrawalWebSocketConnection } from './services/instant-with
import { setProposer } from './routes/proposer.mjs';
import { setBlockProposedWebSocketConnection } from './event-handlers/block-proposed.mjs';

const { WEBSOCKET_PORT, WEBSOCKET_PING_TIME } = config;

const main = async () => {
try {
const proposer = new Proposer();
setProposer(proposer); // passes the proposer instance int the proposer routes
// subscribe to WebSocket events first

const ws = new NFWebsocket({ port: WEBSOCKET_PORT, pingTime: WEBSOCKET_PING_TIME });

ws.subscribe({ topic: 'challenge', socketName: 'challenge' }, setChallengeWebSocketConnection);
ws.subscribe({ topic: 'blocks', socketName: 'proposer' }, setBlockAssembledWebSocketConnection);
ws.subscribe(
{ topic: 'instant', socketName: 'liquidity provider' },
setInstantWithdrawalWebSocketConnection,
);
ws.subscribe(
{ topic: 'sync', socketName: 'publisher', filter: 'type' },
setBlockProposedWebSocketConnection,
);

await subscribeToBlockAssembledWebSocketConnection(setBlockAssembledWebSocketConnection);
await subscribeToChallengeWebSocketConnection(setChallengeWebSocketConnection);
await subscribeToInstantWithDrawalWebSocketConnection(setInstantWithdrawalWebSocketConnection);
await subscribeToProposedBlockWebSocketConnection(setBlockProposedWebSocketConnection);
// start the event queue
await startEventQueue(queueManager, eventHandlers, proposer);
// enqueue the block-assembler every time the queue becomes empty
Expand Down
2 changes: 1 addition & 1 deletion nightfall-optimist/src/routes/proposer.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
getLatestTree,
getLatestBlockInfo,
} from '../services/database.mjs';
import { waitForContract } from '../utils/index.mjs';
import { waitForContract } from '../event-handlers/subscribe.mjs';
import transactionSubmittedEventHandler from '../event-handlers/transaction-submitted.mjs';
import getProposers from '../services/proposer.mjs';

Expand Down
Loading