From 21cd7bd1f01d2614cf80c4b7c9bac2e5da1f6e14 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 12 Jan 2022 16:03:44 -0800 Subject: [PATCH 1/2] fix(executor): don't run hooks on blocks we didn't have --- requestmanager/executor/executor.go | 6 +- requestmanager/executor/executor_test.go | 72 ++++++++++++++---------- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index a505ffe1..54790345 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -163,6 +163,7 @@ func (e *Executor) traverse(rt RequestTask) error { if err != nil { return err } + } } @@ -201,7 +202,10 @@ func (e *Executor) advanceTraversal(rt RequestTask, result types.AsyncLoadResult } func (e *Executor) processResult(rt RequestTask, link ipld.Link, result types.AsyncLoadResult) error { - err := e.onNewBlock(rt, &blockData{link, result.Local, uint64(len(result.Data)), int64(rt.Traverser.NBlocksTraversed())}) + var err error + if result.Err == nil { + err = e.onNewBlock(rt, &blockData{link, result.Local, uint64(len(result.Data)), int64(rt.Traverser.NBlocksTraversed())}) + } select { case <-rt.PauseMessages: if err == nil { diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 8b737e3e..cb130157 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -6,7 +6,6 @@ import ( "math/rand" "sync/atomic" "testing" - "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" @@ -27,11 +26,8 @@ import ( "github.com/ipfs/go-graphsync/testutil" ) -type configureLoaderFn func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, fal *testloader.FakeAsyncLoader, startStop [2]int) - func TestRequestExecutionBlockChain(t *testing.T) { testCases := map[string]struct { - configureLoader configureLoaderFn configureRequestExecution func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, ree *requestExecutionEnv) verifyResults func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) }{ @@ -44,6 +40,26 @@ func TestRequestExecutionBlockChain(t *testing.T) { require.NoError(t, ree.terminalError) }, }, + "missing block case": { + configureRequestExecution: func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, ree *requestExecutionEnv) { + ree.customRemoteBehavior = func() { + // pretend the remote sent five blocks before encountering a missing block + ree.fal.SuccessResponseOn(p, requestID, tbc.Blocks(0, 5)) + missingCid := cidlink.Link{Cid: tbc.Blocks(5, 6)[0].Cid()} + ree.fal.ResponseOn(p, requestID, missingCid, types.AsyncLoadResult{Err: graphsync.RemoteMissingBlockErr{Link: missingCid}}) + } + }, + verifyResults: func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) { + tbc.VerifyResponseRangeSync(responses, 0, 5) + require.Len(t, receivedErrors, 1) + require.Equal(t, receivedErrors[0], graphsync.RemoteMissingBlockErr{Link: cidlink.Link{Cid: tbc.Blocks(5, 6)[0].Cid()}}) + require.Equal(t, []requestSent{{ree.p, ree.request}}, ree.requestsSent) + // we should only call block hooks for blocks we actually received + require.Len(t, ree.blookHooksCalled, 5) + require.NoError(t, ree.terminalError) + }, + }, + "error at block hook": { configureRequestExecution: func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, ree *requestExecutionEnv) { ree.blockHookResults[blockHookKey{p, requestID, tbc.LinkTipIndex(5)}] = hooks.UpdateResult{Err: errors.New("something went wrong")} @@ -174,19 +190,13 @@ func TestRequestExecutionBlockChain(t *testing.T) { for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() + //ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + //defer cancel() persistence := testutil.NewTestStore(make(map[ipld.Link][]byte)) tbc := testutil.SetupBlockChain(ctx, t, persistence, 100, 10) fal := testloader.NewFakeAsyncLoader() requestID := graphsync.RequestID(rand.Int31()) p := testutil.GeneratePeers(1)[0] - configureLoader := data.configureLoader - if configureLoader == nil { - configureLoader = func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, fal *testloader.FakeAsyncLoader, startStop [2]int) { - fal.SuccessResponseOn(p, requestID, tbc.Blocks(startStop[0], startStop[1])) - } - } requestCtx, requestCancel := context.WithCancel(ctx) defer requestCancel() var responsesReceived []graphsync.ResponseProgress @@ -199,7 +209,6 @@ func TestRequestExecutionBlockChain(t *testing.T) { request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())), fal: fal, tbc: tbc, - configureLoader: configureLoader, initialRequest: true, inProgressErr: make(chan error, 1), traverser: ipldutil.TraversalBuilder{ @@ -219,7 +228,7 @@ func TestRequestExecutionBlockChain(t *testing.T) { if data.configureRequestExecution != nil { data.configureRequestExecution(p, requestID, tbc, ree) } - ree.configureLoader(p, requestID, tbc, fal, [2]int{0, ree.loadLocallyUntil}) + ree.fal.SuccessResponseOn(p, requestID, tbc.Blocks(0, ree.loadLocallyUntil)) var errorsReceived []error errCollectionErr := make(chan error, 1) go func() { @@ -262,27 +271,26 @@ type pauseKey struct { type requestExecutionEnv struct { // params - ctx context.Context - request gsmsg.GraphSyncRequest - p peer.ID - blockHookResults map[blockHookKey]hooks.UpdateResult - doNotSendCids *cid.Set - pauseMessages chan struct{} - externalPause pauseKey - loadLocallyUntil int - traverser ipldutil.Traverser - inProgressErr chan error - initialRequest bool - + ctx context.Context + request gsmsg.GraphSyncRequest + p peer.ID + blockHookResults map[blockHookKey]hooks.UpdateResult + doNotSendCids *cid.Set + pauseMessages chan struct{} + externalPause pauseKey + loadLocallyUntil int + traverser ipldutil.Traverser + inProgressErr chan error + initialRequest bool + customRemoteBehavior func() // results requestsSent []requestSent blookHooksCalled []blockHookKey terminalError error // deps - configureLoader configureLoaderFn - tbc *testutil.TestBlockChain - fal *testloader.FakeAsyncLoader + tbc *testutil.TestBlockChain + fal *testloader.FakeAsyncLoader } func (ree *requestExecutionEnv) ReleaseRequestTask(_ peer.ID, _ *peertask.Task, err error) { @@ -317,7 +325,11 @@ func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requ func (ree *requestExecutionEnv) SendRequest(p peer.ID, request gsmsg.GraphSyncRequest) { ree.requestsSent = append(ree.requestsSent, requestSent{p, request}) if !request.IsCancel() && !request.IsUpdate() { - ree.configureLoader(ree.p, ree.request.ID(), ree.tbc, ree.fal, [2]int{ree.loadLocallyUntil, len(ree.tbc.AllBlocks())}) + if ree.customRemoteBehavior == nil { + ree.fal.SuccessResponseOn(p, request.ID(), ree.tbc.Blocks(ree.loadLocallyUntil, len(ree.tbc.AllBlocks()))) + } else { + ree.customRemoteBehavior() + } } } From 5727192d71312ac1f4686c432d5b31f5af57ff77 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 12 Jan 2022 18:26:09 -0800 Subject: [PATCH 2/2] fix(executor): fix test --- requestmanager/executor/executor_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index cb130157..2119d421 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "sync/atomic" "testing" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" @@ -190,8 +191,8 @@ func TestRequestExecutionBlockChain(t *testing.T) { for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { ctx := context.Background() - //ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - //defer cancel() + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() persistence := testutil.NewTestStore(make(map[ipld.Link][]byte)) tbc := testutil.SetupBlockChain(ctx, t, persistence, 100, 10) fal := testloader.NewFakeAsyncLoader()