Skip to content

Commit

Permalink
net: get CFHeaders and Filters
Browse files Browse the repository at this point in the history
  • Loading branch information
pinheadmz committed Aug 6, 2023
1 parent d1d8e80 commit 5c002d4
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 84 deletions.
25 changes: 15 additions & 10 deletions lib/blockchain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -1500,9 +1500,6 @@ class Chain extends AsyncEmitter {
*/

isSlow() {
if (this.options.spv)
return false;

if (this.synced)
return true;

Expand Down Expand Up @@ -1533,13 +1530,21 @@ class Chain extends AsyncEmitter {

const elapsed = util.bench(start);

this.logger.info(
'Block %h (%d) added to chain (size=%d txs=%d time=%d).',
entry.hash,
entry.height,
block.getSize(),
block.txs.length,
elapsed);
if (this.options.spv) {
this.logger.info(
'Block header %h (%d) added to chain (time=%d).',
entry.hash,
entry.height,
elapsed);
} else {
this.logger.info(
'Block %h (%d) added to chain (size=%d txs=%d time=%d).',
entry.hash,
entry.height,
block.getSize(),
block.txs.length,
elapsed);
}
}

/**
Expand Down
111 changes: 57 additions & 54 deletions lib/net/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ class Pool extends EventEmitter {
this.connected = false;
this.disconnecting = false;
this.syncing = false;
this.filterSyncing = false;
this.discovering = false;
this.spvFilter = null;
this.txFilter = null;
Expand Down Expand Up @@ -733,23 +732,25 @@ class Pool extends EventEmitter {
 * Start the filter headers sync.
 */

async startFilterHeadersSync() {
this.filterSyncing = true;
this.logger.info('Starting filter headers sync (%s).',
this.chain.options.network);
if (!this.opened || !this.connected)
startFilterHeadersSync() {
for (let peer = this.peers.head(); peer; peer = peer.next)
this._startFilterHeadersSync(peer);
}

async _startFilterHeadersSync(peer) {
if (!this.opened || !this.connected || !this.chain.synced)
return;

const cFHeaderHeight = this.cfHeaderChain.tail.height;
const startHeight = cFHeaderHeight
? cFHeaderHeight + 1 : 1;
const chainHeight = await this.chain.height;
? cFHeaderHeight + 1 : 1;
const chainHeight = this.chain.height;
const stopHeight = chainHeight - startHeight + 1 > 2000
? startHeight + 1999 : chainHeight;
const stopHash = await this.chain.getHash(stopHeight);
this.requestedFilterType = common.FILTERS.BASIC;
this.requestedStopHash = stopHash;
await this.peers.load.sendGetCFHeaders(
peer.sendGetCFHeaders(
common.FILTERS.BASIC,
startHeight,
stopHash);
Expand All @@ -759,10 +760,13 @@ class Pool extends EventEmitter {
* Start the filters sync.
*/

async startFilterSync() {
this.logger.info('Starting filter sync (%s).',
this.chain.options.network);
if (!this.opened || !this.connected)
startFilterSync() {
for (let peer = this.peers.head(); peer; peer = peer.next)
this._startFilterSync(peer);
}

async _startFilterSync(peer) {
if (!this.opened || !this.connected || !this.chain.synced)
return;

const indexer = this.getFilterIndexer(filtersByVal[common.FILTERS.BASIC]);
Expand All @@ -777,7 +781,7 @@ class Pool extends EventEmitter {
this.requestedFilterType = common.FILTERS.BASIC;
this.getcfiltersStartHeight = startHeight;
this.requestedStopHash = stopHash;
await this.peers.load.sendGetCFilters(
peer.sendGetCFilters(
common.FILTERS.BASIC,
startHeight,
stopHash);
Expand Down Expand Up @@ -2130,11 +2134,11 @@ class Pool extends EventEmitter {
}

const stopHash = packet.stopHash;
if (!stopHash.equals(this.requestedStopHash)) {
this.logger.warning('Received CFHeaders packet with wrong stopHash');
peer.increaseBan(10);
return;
}
// if (!stopHash.equals(this.requestedStopHash)) {
// this.logger.warning('Received CFHeaders packet with wrong stopHash');
// peer.increaseBan(10);
// return;
// }
let previousFilterHeader = packet.previousFilterHeader;
const filterHashes = packet.filterHashes;
let blockHeight = await this.chain.getHeight(stopHash)
Expand All @@ -2148,18 +2152,18 @@ class Pool extends EventEmitter {
const basicFilter = new BasicFilter();
basicFilter._hash = filterHash;
const filterHeader = basicFilter.header(previousFilterHeader);
const lastFilterHeader = this.cfHeaderChain.tail;

const blockHash = await this.chain.getHash(blockHeight);
const cfHeaderEntry = new CFHeaderEntry(blockHash,
filterHeader, lastFilterHeader.height + 1);
filterHeader, blockHeight);
this.cfHeaderChain.push(cfHeaderEntry);
previousFilterHeader = filterHeader;
blockHeight++;
}
this.logger.info('CFHeader height: %d', this.cfHeaderChain.tail.height);
if (this.headerChain.tail.height <= stopHeight)
this.emit('cfheaders');
else {
if (this.cfHeaderChain.tail.height >= this.chain.height) {
this.startFilterSync();
} else {
const nextStopHeight = stopHeight + 2000 < this.chain.height
? stopHeight + 2000 : this.chain.height;
const nextStopHash = await this.chain.getHash(nextStopHeight);
Expand All @@ -2171,10 +2175,10 @@ class Pool extends EventEmitter {
async handleCFilters(peer, packet) {
const filterType = packet.filterType;
const indexer = this.getFilterIndexer(filtersByVal[filterType]);
if (indexer.height % 100 === 0)
this.logger.debug(
'Received CFilter 100 packets from %s', peer.hostname()
);
// if (indexer.height % 100 === 0)
// this.logger.debug(
// 'Received CFilter 100 packets from %s', peer.hostname()
// );
if (!this.options.neutrino) {
peer.ban();
peer.destroy();
Expand All @@ -2195,28 +2199,26 @@ class Pool extends EventEmitter {
const blockHeight = await this.chain.getHeight(blockHash);
const stopHeight = await this.chain.getHeight(this.requestedStopHash);

if (!(blockHeight >= this.getcfiltersStartHeight
&& blockHeight <= stopHeight)) {
this.logger.warning('Received CFilter packet with wrong blockHeight');
peer.increaseBan(10);
return;
}
// if (!(blockHeight >= this.getcfiltersStartHeight
// && blockHeight <= stopHeight)) {
// this.logger.warning('Received CFilter packet with wrong blockHeight');
// peer.increaseBan(10);
// return;
// }

const basicFilter = new BasicFilter();
const gcsFilter = basicFilter.fromNBytes(filter);

const filterHeader = this.cfHeaderChain.head.header;
await indexer.saveFilter(blockHash, blockHeight, gcsFilter, filterHeader);
const cFilterHeight = await indexer.height;
if (cFilterHeight % 100 === 0)
this.logger.info('CFilter height: %d', cFilterHeight);
this.logger.info('CFilter height: %d', cFilterHeight);
this.emit('cfilter', blockHash, gcsFilter);
const startHeight = stopHeight + 1;
let nextStopHeight;
if (cFilterHeight === stopHeight
&& stopHeight < this.chain.height) {
if (startHeight + 1000 < this.chain.height) {
nextStopHeight = stopHeight + 1000;
if (cFilterHeight === stopHeight) {
if (stopHeight < this.cfHeaderChain.tail.height) {
nextStopHeight = this.cfHeaderChain.tail.height;
const stopHash = await this.chain.getHash(nextStopHeight);
this.getcfiltersStartHeight = startHeight;
this.requestedStopHash = stopHash;
Expand All @@ -2226,20 +2228,8 @@ class Pool extends EventEmitter {
stopHash
);
} else {
nextStopHeight = this.chain.height;
const stopHash = await this.chain.getHash(nextStopHeight);
this.getcfiltersStartHeight = startHeight;
this.requestedStopHash = stopHash;
this.peers.load.sendGetCFilters(
common.FILTERS.BASIC,
startHeight,
stopHash
);
return;
this.emit('cfilters');
}
} else if (cFilterHeight === this.chain.height) {
this.filterSyncing = false;
this.emit('cfilters');
}
}

Expand Down Expand Up @@ -2376,7 +2366,7 @@ class Pool extends EventEmitter {
if (!this.checkpoints && !this.options.neutrino)
return;

if (!this.syncing || this.filterSyncing)
if (!this.syncing)
return;

if (!peer.loader)
Expand Down Expand Up @@ -2461,9 +2451,22 @@ class Pool extends EventEmitter {
return;
}

// If we are out of IBD, neutrino needs the filters for these headers
this._startFilterHeadersSync(peer);

// Request more headers.
if (this.checkpoints)
peer.sendGetHeaders([hash], this.headerTip.hash);

let stopHash = this.chain.tip.hash;
try {
stopHash = this.getNextTip(this.chain.height).hash;
} catch (e) {
this.logger.spam('Could not get next checkpoint');
stopHash = null;
}
if (this.options.neutrino)
peer.sendGetHeaders([hash], stopHash);
}

/**
Expand Down
19 changes: 0 additions & 19 deletions lib/node/neutrino.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,25 +162,6 @@ class Neutrino extends Node {
this.emit('reset', tip);
});

this.pool.on('headers', async () => {
if (this.chain.height === 0)
return;
this.logger.info('Block Headers are fully synced');
await this.pool.startFilterHeadersSync();
});

this.pool.on('cfheaders', async () => {
if (this.chain.height === 0)
return;
this.logger.info('Filter Headers are fully synced');
await this.pool.startFilterSync();
});

this.pool.on('cfilters', async () => {
this.logger.info('Compact Filters are fully synced');
this.pool.forceSync();
});

this.loadPlugins();
}

Expand Down
20 changes: 19 additions & 1 deletion test/neutrino-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('Neutrino', function () {
port: 10000,
httpPort: 20000,
logConsole: true,
logLevel: 'debug',
logLevel: 'spam',
neutrino: true,
only: '127.0.0.1'
});
Expand All @@ -65,6 +65,15 @@ describe('Neutrino', function () {
await forValue(neutrinoNode.chain, 'height', fullNode.chain.height);
});

it('should cfheaders and getcfilters', async () => {
const filterIndexer = neutrinoNode.filterIndexers.get('BASIC');
await forValue(filterIndexer, 'height', neutrinoNode.chain.height);
const filterHeight = filterIndexer.height;
assert.equal(filterHeight, neutrinoNode.chain.height);
const headerHeight = await neutrinoNode.pool.cfHeaderChain.tail.height;
assert.equal(headerHeight, neutrinoNode.chain.height);
});

it('should get new blocks headers-only', async () => {
await mineBlocks(10);
await forValue(neutrinoNode.chain, 'height', fullNode.chain.height);
Expand Down Expand Up @@ -135,6 +144,15 @@ describe('Neutrino', function () {
assert(neutrinoNode.chain.synced);
});

it('should getcfheaders and getcfilters', async () => {
const filterIndexer = neutrinoNode.filterIndexers.get('BASIC');
await forValue(filterIndexer, 'height', neutrinoNode.chain.height);
const filterHeight = filterIndexer.height;
assert.equal(filterHeight, neutrinoNode.chain.height);
const headerHeight = await neutrinoNode.pool.cfHeaderChain.tail.height;
assert.equal(headerHeight, neutrinoNode.chain.height);
});

it('should get new blocks headers-only', async () => {
await mineBlocks(10);
await forValue(neutrinoNode.chain, 'height', fullNode.chain.height);
Expand Down

1 comment on commit 5c002d4

@masterchief164
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried syncing the entire blockchain with this version, and I firmly believe we need to start persisting the filter headers. The current code uses a loader peer to fetch the filter headers, and if that peer disconnects, we have to wait for a new block to be mined before the filter sync process can resume. Moreover, if the user kills the process while the filter headers are syncing, they will have to be downloaded again from scratch.

Please sign in to comment.