Skip to content

Commit

Permalink
neutrino: Added sideload functionality to blockmanager
Browse files Browse the repository at this point in the history
Signed-off-by: Maureen Ononiwu <amaka013@gmail.com>
  • Loading branch information
Chinwendu20 committed Aug 24, 2023
1 parent 4f7b382 commit 7159f20
Showing 1 changed file with 237 additions and 10 deletions.
247 changes: 237 additions & 10 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino/banman"
"github.com/lightninglabs/neutrino/blockntfns"
"github.com/lightninglabs/neutrino/chainDataLoader"
"github.com/lightninglabs/neutrino/chainsync"
"github.com/lightninglabs/neutrino/chanutils"
"github.com/lightninglabs/neutrino/headerfs"
"github.com/lightninglabs/neutrino/headerlist"
"github.com/lightninglabs/neutrino/query"
Expand All @@ -43,6 +45,14 @@ const (
// maxCFCheckptsPerQuery is the maximum number of filter header
// checkpoints we can query for within a single message over the wire.
maxCFCheckptsPerQuery = wire.MaxCFHeadersPerMsg / wire.CFCheckptInterval

//BlkHeaderWriteBatch is the batch size to be written into the DB while sideloading, less could
//be written if the ticker DBTickerBlkHeaderWriteDuration expires.
BlkHeaderWriteBatch = 2000

//DBTickerBlkHeaderWriteDuration is the maximum amount of time required to be expended before
//the next batch of block headers is written into the DB during side loading.
DBTickerBlkHeaderWriteDuration = time.Millisecond * 500
)

// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
Expand Down Expand Up @@ -109,6 +119,9 @@ type blockManagerCfg struct {
checkResponse func(sp *ServerPeer, resp wire.Message,
quit chan<- struct{}, peerQuit chan<- struct{}),
options ...QueryOption)

// sideLoad is the config used to enable a non p2p fetching of headers to improve sync speed.
sideLoad SideLoadOpt
}

// blockManager provides a concurrency safe block manager for handling all
Expand Down Expand Up @@ -206,6 +219,12 @@ type blockManager struct { // nolint:maligned
minRetargetTimespan int64 // target timespan / adjustment factor
maxRetargetTimespan int64 // target timespan * adjustment factor
blocksPerRetarget int32 // target timespan / target time per block

//sideLoadReader is the reader used for a non p2p fetching of headers.
sideLoadReader chainDataLoader.Reader

//blockHeaderBatchWriter writes sideloaded block headers in batches into the blockheaders store.
blockHeaderBatchWriter *chanutils.BatchWriter[headerfs.BlockHeader]
}

// newBlockManager returns a new bitcoin block manager. Use Start to begin
Expand All @@ -214,7 +233,6 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
targetTimespan := int64(cfg.ChainParams.TargetTimespan / time.Second)
targetTimePerBlock := int64(cfg.ChainParams.TargetTimePerBlock / time.Second)
adjustmentFactor := cfg.ChainParams.RetargetAdjustmentFactor

bm := blockManager{
cfg: cfg,
peerChan: make(chan interface{}, MaxPeers*3),
Expand Down Expand Up @@ -279,6 +297,36 @@ func newBlockManager(cfg *blockManagerCfg) (*blockManager, error) {
}
bm.filterHeaderTipHash = fh.BlockHash()

// If sideload is enabled initialize reader and assign to blockmanager.
if bm.cfg.sideLoad.Enabled {

reader, err := chainDataLoader.NewReader(&chainDataLoader.ReaderConfig{

SourceType: bm.cfg.sideLoad.SourceType,
Path: bm.cfg.sideLoad.Path,
})

log.Infof("Side loading enabled")

if err == nil {
bm.sideLoadReader = reader
cfg := &chanutils.BatchWriterConfig[headerfs.BlockHeader]{
QueueBufferSize: chanutils.DefaultQueueSize,
MaxBatch: BlkHeaderWriteBatch,
DBWritesTickerDuration: DBTickerBlkHeaderWriteDuration,
PutItems: bm.cfg.BlockHeaders.WriteHeaders,
}

bm.blockHeaderBatchWriter = chanutils.NewBatchWriter[headerfs.BlockHeader](
cfg,
)
} else {
bm.cfg.sideLoad.Enabled = false
log.Warnf("Side loading disabled: %v", err)
}

}

return &bm, nil
}

Expand Down Expand Up @@ -1981,6 +2029,15 @@ func checkCFCheckptSanity(cp map[string][]*chainhash.Hash,
func (b *blockManager) blockHandler() {
defer b.wg.Done()

// Attempt to sideLoad headers. If sideLoading is not enabled the function
// returns quickly.
b.sideLoadHeaders()

select {
case <-b.quit:
return
default:
}
candidatePeers := list.New()
out:
for {
Expand Down Expand Up @@ -2410,16 +2467,11 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
node := headerlist.Node{Header: *blockHeader}
prevNode := prevNodeEl
prevHash := prevNode.Header.BlockHash()
if prevHash.IsEqual(&blockHeader.PrevBlock) {
prevNodeHeight := prevNode.Height
prevNodeHeader := prevNode.Header
err := b.checkHeaderSanity(
blockHeader, false, prevNodeHeight,
&prevNodeHeader,
)
valid, err := b.verifyBlockHeader(blockHeader, *prevNode)
if valid {
if err != nil {
log.Warnf("Header doesn't pass sanity check: "+
"%s -- disconnecting peer", err)
log.Warnf("%v: "+
" -- disconnecting peer", err)
hmsg.peer.Disconnect()
return
}
Expand Down Expand Up @@ -2547,6 +2599,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
reorgHeader, true,
int32(prevNodeHeight), prevNodeHeader,
)

if err != nil {
log.Warnf("Header doesn't pass sanity"+
" check: %s -- disconnecting "+
Expand Down Expand Up @@ -2754,6 +2807,7 @@ func areHeadersConnected(headers []*wire.BlockHeader) bool {
}

lastHeader = blockHash

}

return true
Expand Down Expand Up @@ -3043,4 +3097,177 @@ func (l *lightHeaderCtx) RelativeAncestorCtx(
return newLightHeaderCtx(
ancestorHeight, ancestor, l.store, l.headerList,
)

}

func (b *blockManager) sideLoadHeaders() {
// If sideloading is not enabled on this chain
// return quickly.
if !b.cfg.sideLoad.Enabled {
return
}

// If headers contained in the side load source are for a different chain network return
// immediately.
if b.sideLoadReader.HeadersChain() != b.cfg.ChainParams.Net {
log.Error("headers from side load file are of network %v "+
"and so incompatible with neutrino's current bitcoin network "+
"-- skipping side loading", b.sideLoadReader.HeadersChain())

return
}
log.Infof("Side loading headers from %v to %v", b.headerTip, b.sideLoadReader.EndHeight())

// Set headerTip to enable reader supply header, node needs
err := b.sideLoadReader.SetHeight(int64(b.headerTip))
if err != nil {
log.Errorf("error while setting height for sideload--- skipping sideloading: "+
"%v", err)

return
}

// Start batch writer to write headers into DB at specified batch size and ticker duration.
b.blockHeaderBatchWriter.Start()
defer b.blockHeaderBatchWriter.Stop()
for {
select {
case <-b.quit:
return
default:
//Request header
header, headerErr := b.sideLoadReader.NextHeader()
// If any error occurs while fetching headers that does not indicate
// an end of file, return immediately.
if headerErr != nil {

if headerErr == chainDataLoader.ErrEndOfFile {
log.Infof("Successfully completed sideloading")
return
}

log.Errorf("error while fetching headers -- skipping sideloading %v", err)
return
}

var (
node *headerlist.Node
prevNode *headerlist.Node
)
// Ensure there is a previous header to compare against.
prevNodeEl := b.headerList.Back()
if prevNodeEl == nil {
log.Warnf("side load - Header list does not contain a previous" +
"element as expected -- exiting side load")

return
}

node = &headerlist.Node{Header: *header}
prevNode = prevNodeEl
node.Height = prevNode.Height + 1

if b.cfg.sideLoad.Verify {
valid, err := b.verifyBlockHeader(header, *prevNode)
if err != nil || !valid {
log.Debugf("Side Load- Did not pass verification at height %v"+
"-- rolling back to last verified checkpoint and skipping sideload", node.Height)

prevCheckpoint := b.findPreviousHeaderCheckpoint(
node.Height,
)

log.Infof("Rolling back to previous validated "+
"checkpoint at height %d/hash %s",
prevCheckpoint.Height,
prevCheckpoint.Hash)

//It is possible that the number of headers in the batch writer's buffer is less than the
//BlkHeaderWriteBatch and so these headers have not been written into the DB at this time.
//We wait for the DBTickerBlkHeaderWriteDuration to expire and write these headers into the
//DB before we roll back which should take approximately twice the DBTickerBlkHeaderWriteDuration.
//This ensures that these headers are wiped off, during the rollback, in the batch writer's buffer
//and in the DB as well.
time.Sleep(DBTickerBlkHeaderWriteDuration * 2)
err = b.rollBackToHeight(uint32(prevCheckpoint.Height))
if err != nil {
panic(fmt.Sprintf("Rollback failed: %s", err))
// Should we panic here?
}
tipHeader, height, err := b.cfg.BlockHeaders.ChainTip()
if err != nil {
return
}

b.headerList.ResetHeaderState(headerlist.Node{
Height: int32(height),
Header: *tipHeader,
})
return
}

}

// Verify checkpoint only if verification is enabled.
if b.nextCheckpoint != nil && b.cfg.sideLoad.Verify &&
node.Height == b.nextCheckpoint.Height {

nodeHash := node.Header.BlockHash()
if nodeHash.IsEqual(b.nextCheckpoint.Hash) {

log.Infof("Verified downloaded block "+
"header against checkpoint at height "+
"%d/hash %s", node.Height, nodeHash)
} else {
log.Warnf("Error at checkpoint while side loading headers, exiting"+
"%d/hash %s", node.Height, nodeHash)
return
}

}

//Add to batch writer.
b.blockHeaderBatchWriter.AddItem(headerfs.BlockHeader{
BlockHeader: header,
Height: uint32(node.Height),
},
)

//Update headerTip to give more accuarate info about tip of DB.
b.nextCheckpoint = b.findNextHeaderCheckpoint(node.Height)
b.newHeadersMtx.Lock()
b.headerTip = uint32(node.Height)
b.headerTipHash = node.Header.BlockHash()
b.newHeadersMtx.Unlock()
b.newHeadersSignal.Broadcast()

b.blkHeaderProgressLogger.LogBlockHeight(
header.Timestamp, node.Height,
)

b.headerList.PushBack(*node)

}
}

}

// verifyBlockHeader verifies blockheader by checking if it connects to the previous block and its block header sanity.
func (b *blockManager) verifyBlockHeader(blockHeader *wire.BlockHeader, prevNode headerlist.Node) (bool, error) {
prevNodeHeader := prevNode.Header
prevHash := prevNode.Header.BlockHash()
prevNodeHeight := prevNode.Height

if prevHash.IsEqual(&blockHeader.PrevBlock) {
err := b.checkHeaderSanity(blockHeader, false, prevNodeHeight, &prevNodeHeader)

if err != nil {
return true, fmt.Errorf("did not pass sanity check: %w", err)
}
return true, nil

} else {
return false, nil
}

}

0 comments on commit 7159f20

Please sign in to comment.