Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize header download across peers to fetch headers within a chain's checkpointed region. #282

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
47 changes: 16 additions & 31 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,43 +850,37 @@ func (c *checkpointedCFHeadersQuery) requests() []*query.Request {

// handleResponse is the internal response handler used for requests for this
// CFHeaders query.
func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
peerAddr string) query.Progress {
func (c *checkpointedCFHeadersQuery) handleResponse(request query.ReqMessage, resp wire.Message,
peer query.Peer) query.Progress {

peerAddr := ""
if peer != nil {
peerAddr = peer.Addr()
}
Comment on lines +921 to +923

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ever sensible for the peer to be nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah for the teastcase. This was how it was before:

progress := requests[index].HandleResp(
msgs[index], &resp, "",
)
.

This PR also changes the handleResp function signature from taking in peerAddress to the actual peer as you can see from the function above. So to prevent a nil pointer dereference error, that check was put in place.

There are also various instances of handleResp written in the manner as the code that I highlighted above in the test file.

req := request.Message()

r, ok := resp.(*wire.MsgCFHeaders)
if !ok {
// We are only looking for cfheaders messages.
return query.Progress{
Finished: false,
Progressed: false,
}
return query.NoResponse
}

q, ok := req.(*wire.MsgGetCFHeaders)
if !ok {
// We sent a getcfheaders message, so that's what we should be
// comparing against.
return query.Progress{
Finished: false,
Progressed: false,
}
return query.NoResponse
}

// The response doesn't match the query.
if q.FilterType != r.FilterType || q.StopHash != r.StopHash {
return query.Progress{
Finished: false,
Progressed: false,
}
return query.NoResponse
}

checkPointIndex, ok := c.stopHashes[r.StopHash]
if !ok {
// We never requested a matching stop hash.
return query.Progress{
Finished: false,
Progressed: false,
}
return query.NoResponse
}

// Use either the genesis header or the previous checkpoint index as
Expand Down Expand Up @@ -920,10 +914,7 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
log.Errorf("Unable to ban peer %v: %v", peerAddr, err)
}

return query.Progress{
Finished: false,
Progressed: false,
}
return query.NoResponse
}

// At this point, the response matches the query, and the relevant
Expand All @@ -934,16 +925,10 @@ func (c *checkpointedCFHeadersQuery) handleResponse(req, resp wire.Message,
select {
case c.headerChan <- r:
case <-c.blockMgr.quit:
return query.Progress{
Finished: false,
Progressed: false,
}
return query.NoResponse
}

return query.Progress{
Finished: true,
Progressed: true,
}
return query.Finished
}

// sendQueryMessageWithEncoding sends a message to the peer with encoding.
Expand Down Expand Up @@ -1106,7 +1091,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
// Hand the queries to the work manager, and consume the verified
// responses as they come back.
errChan := b.cfg.QueryDispatcher.Query(
q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
q.requests(), query.Cancel(b.quit), query.NoRetryMax(), query.ErrChan(make(chan error, 1)),
)

// Keep waiting for more headers as long as we haven't received an
Expand Down
36 changes: 18 additions & 18 deletions blockmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ func generateHeaders(genesisBlockHeader *wire.BlockHeader,

// generateResponses generates the MsgCFHeaders messages from the given queries
// and headers.
func generateResponses(msgs []wire.Message,
func generateResponses(msgs []query.ReqMessage,
headers *headers) ([]*wire.MsgCFHeaders, error) {

// Craft a response for each message.
var responses []*wire.MsgCFHeaders
for _, msg := range msgs {
// Only GetCFHeaders expected.
q, ok := msg.(*wire.MsgGetCFHeaders)
q, ok := msg.Message().(*wire.MsgGetCFHeaders)
if !ok {
return nil, fmt.Errorf("got unexpected message %T",
msg)
Expand Down Expand Up @@ -350,9 +350,9 @@ func TestBlockManagerInitialInterval(t *testing.T) {
requests []*query.Request,
options ...query.QueryOption) chan error {

var msgs []wire.Message
var msgs []query.ReqMessage
for _, q := range requests {
msgs = append(msgs, q.Req.Message())
msgs = append(msgs, q.Req)
}

responses, err := generateResponses(msgs, headers)
Expand All @@ -379,13 +379,13 @@ func TestBlockManagerInitialInterval(t *testing.T) {
// Let the blockmanager handle the
// message.
progress := requests[index].HandleResp(
msgs[index], &resp, "",
msgs[index], &resp, nil,
)

if !progress.Finished {
if progress != query.Finished {
errChan <- fmt.Errorf("got "+
"response false on "+
"send of index %d: %v",
" %v on "+
"send of index %d: %v", progress,
index, testDesc)
return
}
Expand All @@ -400,13 +400,13 @@ func TestBlockManagerInitialInterval(t *testing.T) {
// Otherwise resend the response we
// just sent.
progress = requests[index].HandleResp(
msgs[index], &resp2, "",
msgs[index], &resp2, nil,
)
if !progress.Finished {
if progress != query.Finished {
errChan <- fmt.Errorf("got "+
"response false on "+
"resend of index %d: "+
"%v", index, testDesc)
" %v on "+
"send of index %d: %v", progress,
index, testDesc)
return
}
}
Expand Down Expand Up @@ -580,9 +580,9 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
requests []*query.Request,
options ...query.QueryOption) chan error {

var msgs []wire.Message
var msgs []query.ReqMessage
for _, q := range requests {
msgs = append(msgs, q.Req.Message())
msgs = append(msgs, q.Req)
}
responses, err := generateResponses(msgs, headers)
require.NoError(t, err)
Expand Down Expand Up @@ -619,10 +619,10 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
// expect.
for i := range responses {
progress := requests[i].HandleResp(
msgs[i], responses[i], "",
msgs[i], responses[i], nil,
)
if i == test.firstInvalid {
if progress.Finished {
if progress == query.Finished {
t.Errorf("expected interval "+
"%d to be invalid", i)
return
Expand All @@ -631,7 +631,7 @@ func TestBlockManagerInvalidInterval(t *testing.T) {
break
}

if !progress.Finished {
if progress != query.Finished {
t.Errorf("expected interval %d to be "+
"valid", i)
return
Expand Down
34 changes: 15 additions & 19 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ var (

// noProgress will be used to indicate to a query.WorkManager that a
// response makes no progress towards the completion of the query.
noProgress = query.Progress{
Finished: false,
Progressed: false,
}
noProgress = query.NoResponse
)

// queries are a set of options that can be modified per-query, unlike global
Expand Down Expand Up @@ -470,9 +467,10 @@ func (q *cfiltersQuery) request() *query.Request {

// handleResponse validates that the cfilter response we get from a peer is
// sane given the getcfilter query that we made.
func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
_ string) query.Progress {
func (q *cfiltersQuery) handleResponse(r query.ReqMessage, resp wire.Message,
peer query.Peer) query.Progress {

req := r.Message()
// The request must have been a "getcfilters" msg.
request, ok := req.(*wire.MsgGetCFilters)
if !ok {
Expand Down Expand Up @@ -573,17 +571,11 @@ func (q *cfiltersQuery) handleResponse(req, resp wire.Message,
// If there are still entries left in the headerIndex then the query
// has made progress but has not yet completed.
if len(q.headerIndex) != 0 {
return query.Progress{
Finished: false,
Progressed: true,
}
return query.Progressed
}

// The headerIndex is empty and so this query is complete.
return query.Progress{
Finished: true,
Progressed: true,
}
return query.Finished
}

// prepareCFiltersQuery creates a cfiltersQuery that can be used to fetch a
Expand Down Expand Up @@ -784,6 +776,7 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
query.Cancel(s.quit),
query.Encoding(qo.encoding),
query.NumRetries(qo.numRetries),
query.ErrChan(make(chan error, 1)),
}

errChan := s.workManager.Query(
Expand Down Expand Up @@ -868,7 +861,12 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
// handleResp will be called for each message received from a peer. It
// will be used to signal to the work manager whether progress has been
// made or not.
handleResp := func(req, resp wire.Message, peer string) query.Progress {
handleResp := func(request query.ReqMessage, resp wire.Message, sp query.Peer) query.Progress {
req := request.Message()
peer := ""
if sp != nil {
peer = sp.Addr()
}
// The request must have been a "getdata" msg.
_, ok := req.(*wire.MsgGetData)
if !ok {
Expand Down Expand Up @@ -933,10 +931,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
// we declare it sane. We can kill the query and pass the
// response back to the caller.
foundBlock = block
return query.Progress{
Finished: true,
Progressed: true,
}
return query.Finished
}

// Prepare the query request.
Expand Down Expand Up @@ -968,6 +963,7 @@ func (s *ChainService) GetBlock(blockHash chainhash.Hash,
query.Encoding(qo.encoding),
query.NumRetries(qo.numRetries),
query.Cancel(s.quit),
query.ErrChan(make(chan error, 1)),
}

// Send the request to the work manager and await a response.
Expand Down
54 changes: 40 additions & 14 deletions query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type queryOptions struct {
// that a query can be retried. If this is set then numRetries has no
// effect.
noRetryMax bool

// errChan error channel with which the workmananger sends error.
errChan chan error
}

// QueryOption is a functional option argument to any of the network query
Expand All @@ -67,6 +70,14 @@ func (qo *queryOptions) applyQueryOptions(options ...QueryOption) {
}
}

// ErrChan is a query option that specifies the error channel which the workmanager
// sends any error to.
func ErrChan(err chan error) QueryOption {
return func(qo *queryOptions) {
qo.errChan = err
}
}

// NumRetries is a query option that specifies the number of times a query
// should be retried.
func NumRetries(num uint8) QueryOption {
Expand Down Expand Up @@ -107,19 +118,34 @@ func Cancel(cancel chan struct{}) QueryOption {
}
}

// Progress encloses the result of handling a response for a given Request,
// determining whether the response did progress the query.
type Progress struct {
// Finished is true if the query was finished as a result of the
// received response.
Finished bool

// Progressed is true if the query made progress towards fully
// answering the request as a result of the received response. This is
// used for the requests types where more than one response is
// expected.
Progressed bool
}
// Progress encloses the result of handling a response for a given Request.
type Progress string

var (

// Finished indicates we have received the complete, valid response for this request,
// and so we are done with it.
Finished Progress = "Received complete and valid response for request."

// Progressed indicates that we have received a valid response, but we are expecting more.
Progressed Progress = "Received valid response, expecting more response for query."

// UnFinishedRequest indicates that we have received some response, but we need to rescheule the job
// to completely fetch all the response required for this request.
UnFinishedRequest Progress = "Received valid response, reschedule to complete request"

// ResponseErr indicates we obtained a valid response but response fails checks and needs to
// be rescheduled.
ResponseErr Progress = "Received valid response but fails checks "

// IgnoreRequest indicates that we have received a valid response but the workmanager need take
// no action on the result of this job.
IgnoreRequest Progress = "Received response but ignoring"

// NoResponse indicates that we have received an invalid response for this request, and we need
// to wait for a valid one.
NoResponse Progress = "Received invalid response"
)
ProofOfKeags marked this conversation as resolved.
Show resolved Hide resolved

// Request is the main struct that defines a bitcoin network query to be sent to
// connected peers.
Expand All @@ -138,7 +164,7 @@ type Request struct {
// should validate the response and immediately return the progress.
// The response should be handed off to another goroutine for
// processing.
HandleResp func(req, resp wire.Message, peer string) Progress
HandleResp func(req ReqMessage, resp wire.Message, peer Peer) Progress

// SendQuery handles sending request to the worker's peer. It returns an error,
// if one is encountered while sending the request.
Expand Down
Loading