
Parallelize header download across peers to fetch headers within a chain's checkpointed region. #282

Open
wants to merge 7 commits into base: master
52 changes: 47 additions & 5 deletions blockmanager.go
@@ -5,6 +5,7 @@ package neutrino
import (
"bytes"
"container/list"
"errors"
"fmt"
"math"
"math/big"
@@ -808,19 +809,39 @@ func (b *blockManager) getUncheckpointedCFHeaders(
// handle a query for checkpointed filter headers.
type checkpointedCFHeadersQuery struct {
blockMgr *blockManager
msgs []wire.Message
msgs []*encodedQuery
checkpoints []*chainhash.Hash
stopHashes map[chainhash.Hash]uint32
headerChan chan *wire.MsgCFHeaders
}

// encodedQuery holds all the information needed for a message request that is
// pushed to a peer using the QueueMessageWithEncoding method.
type encodedQuery struct {
message wire.Message
encoding wire.MessageEncoding
priorityIndex uint64
}

// Message returns the wire.Message carried by the encodedQuery.
func (e *encodedQuery) Message() wire.Message {
return e.message
}

// PriorityIndex returns the priority the caller wants the request to take.
func (e *encodedQuery) PriorityIndex() uint64 {

Reviewer:
What's the rationale for the priority index? It's adding quite a bit of complexity to the code. Would the caller asking for block headers within a moving window suffice instead of having a priority index?

Contributor Author:
The work manager assigns priority to jobs on a first-come, first-served basis. There are cases where we might need to bump the priority of a job sent at a later time, and that is where the priority index comes in. Example use case:
https://github.com/lightninglabs/neutrino/pull/282/files#diff-2345cea8b12f07d1f60d42b7e5563720e69e1544c641261cf147668c4599320bR2227
(I would be changing the priority to a non-zero value in my next push, though.)
There we might need to re-request headers after validation while fetching headers within the checkpointed region. Without a priority index, that particular job would be handled last, after all other jobs. We do not want that, because we cannot progress with block header verification and writing into the database until we have the headers we re-requested at that height (validation is done in order); that range is at the chain tip and therefore next in line to be processed.

Could you please explain more about what you mean by "Would the caller asking for block headers within a moving window suffice instead of having a priority index?"

return e.priorityIndex
}
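
To make the discussion above concrete, a re-request of a header interval might be built as in the sketch below. This is illustrative only: encodedQuery, its fields, and wire.NewMsgGetCFHeaders are from this diff, the surrounding variables (fType, startHeightRange, stopHash) mirror the ones used in getCheckpointedCFHeaders, and the priority value itself is an assumed placeholder.

// Sketch: re-request a checkpointed interval with a bumped priority.
// A non-zero priorityIndex is assumed to ask the work manager to schedule
// this job ahead of jobs queued earlier, instead of the default
// first-come, first-served ordering (see the author's comment above).
reQuery := &encodedQuery{
	message: wire.NewMsgGetCFHeaders(
		fType, startHeightRange, &stopHash,
	),
	encoding:      wire.WitnessEncoding,
	priorityIndex: 1, // placeholder value, not from this diff
}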

// requests creates the query.Requests for this CF headers query.
func (c *checkpointedCFHeadersQuery) requests() []*query.Request {
reqs := make([]*query.Request, len(c.msgs))
for idx, m := range c.msgs {
reqs[idx] = &query.Request{
Req: m,
HandleResp: c.handleResponse,
SendQuery: sendQueryMessageWithEncoding,
}
}
return reqs
@@ -924,6 +945,24 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
}
}

// sendQueryMessageWithEncoding sends a request to the peer using the message
// encoding carried in the request.
func sendQueryMessageWithEncoding(peer query.Peer, req query.ReqMessage) error {

Reviewer:
Why not just require that this is a ServerPeer in the function signature?

Contributor Author:
Because it follows this function signature:
https://github.com/Chinwendu20/neutrino/blob/64b278771ff75da0b30136af7d4ab0ace06d897e/query/interface.go#L169-L171
And since this is meant to be called in the query package, requiring a ServerPeer would cause a cyclic dependency.

Reviewer:
It looks like QueueMessageWithEncoding is also part of the Peer interface, so why do we need this ServerPeer downcast?

Contributor Author:
That was removed in this commit: 9ab1d8b

sp, ok := peer.(*ServerPeer)
if !ok {
err := "peer is not of type ServerPeer"
log.Errorf(err)
return errors.New(err)
}
request, ok := req.(*encodedQuery)
if !ok {
return errors.New("invalid request type")
}

sp.QueueMessageWithEncoding(request.message, nil, request.encoding)

return nil
}

// getCheckpointedCFHeaders catches a filter header store up with the
// checkpoints we got from the network. It assumes that the filter header store
// matches the checkpoints up to the tip of the store.
@@ -959,7 +998,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
// the remaining checkpoint intervals.
numCheckpts := uint32(len(checkpoints)) - startingInterval
numQueries := (numCheckpts + maxCFCheckptsPerQuery - 1) / maxCFCheckptsPerQuery
queryMsgs := make([]wire.Message, 0, numQueries)
queryMsgs := make([]*encodedQuery, 0, numQueries)

// We'll also create an additional set of maps that we'll use to
// re-order the responses as we get them in.
@@ -1004,9 +1043,12 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,

// Once we have the stop hash, we can construct the query
// message itself.
queryMsg := wire.NewMsgGetCFHeaders(
fType, startHeightRange, &stopHash,
)
queryMsg := &encodedQuery{
message: wire.NewMsgGetCFHeaders(
fType, startHeightRange, &stopHash,
),
encoding: wire.WitnessEncoding,
}

// We'll mark that the ith interval is queried by this message,
// and also map the stop hash back to the index of this message.
4 changes: 2 additions & 2 deletions blockmanager_test.go
@@ -352,7 +352,7 @@ func TestBlockManagerInitialInterval(t *testing.T) {

var msgs []wire.Message
for _, q := range requests {
msgs = append(msgs, q.Req)
msgs = append(msgs, q.Req.Message())
}

responses, err := generateResponses(msgs, headers)
@@ -582,7 +582,7 @@ func TestBlockManagerInvalidInterval(t *testing.T) {

var msgs []wire.Message
for _, q := range requests {
msgs = append(msgs, q.Req)
msgs = append(msgs, q.Req.Message())
}
responses, err := generateResponses(msgs, headers)
require.NoError(t, err)
17 changes: 13 additions & 4 deletions query.go
@@ -435,13 +435,17 @@ type cfiltersQuery struct {
// request couples a query message with the handler to be used for the response
// in a query.Request struct.
func (q *cfiltersQuery) request() *query.Request {
msg := wire.NewMsgGetCFilters(
q.filterType, uint32(q.startHeight), q.stopHash,
)
msg := &encodedQuery{
message: wire.NewMsgGetCFilters(
q.filterType, uint32(q.startHeight), q.stopHash,
),
encoding: wire.WitnessEncoding,
}

return &query.Request{
Req: msg,
HandleResp: q.handleResponse,
SendQuery: sendQueryMessageWithEncoding,
}
}

@@ -833,6 +837,10 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
// Construct the appropriate getdata message to fetch the target block.
getData := wire.NewMsgGetData()
_ = getData.AddInvVect(inv)
msg := &encodedQuery{
message: getData,
encoding: wire.WitnessEncoding,
}

var foundBlock *btcutil.Block

@@ -912,8 +920,9 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,

// Prepare the query request.
request := &query.Request{
Req: getData,
Req: msg,
HandleResp: handleResp,
SendQuery: sendQueryMessageWithEncoding,
}

// Prepare the query options.
20 changes: 18 additions & 2 deletions query/interface.go
@@ -124,8 +124,8 @@ type Progress struct {
// Request is the main struct that defines a bitcoin network query to be sent to
// connected peers.
type Request struct {
// Req is the message request to send.
Req wire.Message
// Req contains the message request to send.
Req ReqMessage

// HandleResp is a response handler that will be called for every
// message received from the peer that the request was made to. It
@@ -139,6 +139,22 @@ type Request struct {
// The response should be handed off to another goroutine for
// processing.
HandleResp func(req, resp wire.Message, peer string) Progress

// SendQuery handles sending request to the worker's peer. It returns an error,
// if one is encountered while sending the request.
SendQuery func(peer Peer, request ReqMessage) error
}

// ReqMessage is an interface which all structs containing information
// required to process a message request must implement.
type ReqMessage interface {

// Message returns the message request.
Message() wire.Message

// PriorityIndex returns the priority the caller prefers the request
// would take.
PriorityIndex() uint64
}
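
For reference, encodedQuery in blockmanager.go is the ReqMessage implementation this PR actually uses. A minimal stand-in of the kind a test might use could look like the following sketch; the type name is hypothetical and the zero-value priority convention is inferred from the PR discussion, only the interface itself comes from this diff.

// plainReq is a hypothetical minimal ReqMessage implementation.
type plainReq struct {
	msg wire.Message
}

// Message returns the wrapped wire message.
func (p *plainReq) Message() wire.Message { return p.msg }

// PriorityIndex returns zero, i.e. no explicit preference, leaving the
// work manager to its default first-come, first-served ordering.
func (p *plainReq) PriorityIndex() uint64 { return 0 }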

// WorkManager defines an API for a manager that dispatches queries to bitcoin
35 changes: 24 additions & 11 deletions query/worker.go
@@ -3,8 +3,6 @@ package query
import (
"errors"
"time"

"github.com/btcsuite/btcd/wire"
)

var (
@@ -27,7 +25,6 @@ type queryJob struct {
tries uint8
index uint64
timeout time.Duration
encoding wire.MessageEncoding
cancelChan <-chan struct{}
*Request
}
@@ -89,6 +86,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
msgChan, cancel := peer.SubscribeRecvMsg()
defer cancel()

nexJobLoop:
for {
log.Tracef("Worker %v waiting for more work", peer.Addr())

@@ -133,7 +131,22 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
log.Tracef("Worker %v queuing job %T with index %v",
peer.Addr(), job.Req, job.Index())

peer.QueueMessageWithEncoding(job.Req, nil, job.encoding)
err := job.SendQuery(peer, job.Req)

// If any error occurs while sending the query, quickly send the result
// containing the error to the work manager.
if err != nil {
select {
case results <- &jobResult{
job: job,
peer: peer,
err: err,

Reviewer:
Is unfinished intentionally omitted here?

Contributor Author:
Yes, since a bool is already false by default.

Reviewer:
I think including it would make the code more readable.

Contributor Author:
What about redundancy?

}:
case <-quit:
return
}
goto nexJobLoop

Reviewer:
Would a continue suffice here? Is there anything that this goto provides that a continue wouldn't?

Contributor Author:
Thanks, looking at it again I think a continue could work.

}
}
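
Taking the two review threads above together (spelling out unfinished and replacing the goto with a continue), the error path could take roughly the following shape. This is only a sketch of the suggested change, not part of the diff.

err := job.SendQuery(peer, job.Req)

// If any error occurs while sending the query, hand the failed job back
// to the work manager right away and move on to the next job.
if err != nil {
	select {
	case results <- &jobResult{
		job:        job,
		peer:       peer,
		err:        err,
		unfinished: false,
	}:
	case <-quit:
		return
	}

	// A plain continue returns to the top of the enclosing for loop, so
	// the nexJobLoop label and the goto would no longer be needed.
	continue
}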

// Wait for the correct response to be received from the peer,
@@ -143,15 +156,15 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
timeout = time.NewTimer(job.timeout)
)

Loop:
feedbackLoop:
for {
select {
// A message was received from the peer, use the
// response handler to check whether it was answering
// our request.
case resp := <-msgChan:
progress := job.HandleResp(
job.Req, resp, peer.Addr(),
job.Req.Message(), resp, peer.Addr(),
)

log.Tracef("Worker %v handled msg %T while "+
@@ -176,12 +189,12 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
job.timeout,
)
}
continue Loop
continue feedbackLoop
}

// We did get a valid response, and can break
// the loop.
break Loop
break feedbackLoop

// If the timeout is reached before a valid response
// has been received, we exit with an error.
@@ -193,7 +206,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
"with job index %v", peer.Addr(),
job.Req, job.Index())

break Loop
break feedbackLoop

// If the peer disconnects before giving us a valid
// answer, we'll also exit with an error.
@@ -203,7 +216,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
job.Index())

jobErr = ErrPeerDisconnected
break Loop
break feedbackLoop

// If the job was canceled, we report this back to the
// work manager.
@@ -212,7 +225,7 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) {
peer.Addr(), job.Index())

jobErr = ErrJobCanceled
break Loop
break feedbackLoop

case <-quit:
return