diff --git a/shared/commp.go b/shared/commp.go new file mode 100644 index 00000000..4815c446 --- /dev/null +++ b/shared/commp.go @@ -0,0 +1,45 @@ +package shared + +import ( + "io" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-commp-utils/writer" + commcid "github.com/filecoin-project/go-fil-commcid" + commp "github.com/filecoin-project/go-fil-commp-hashhash" +) + +func GenerateCommp(reader io.Reader, payloadSize uint64, targetSize uint64) (cid.Cid, error) { + // dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP. + w := &writer.Writer{} + written, err := io.Copy(w, reader) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to write to CommP writer: %w", err) + } + if written != int64(payloadSize) { + return cid.Undef, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, payloadSize) + } + + cidAndSize, err := w.Sum() + if err != nil { + return cid.Undef, xerrors.Errorf("failed to get CommP: %w", err) + } + + if uint64(cidAndSize.PieceSize) < targetSize { + // need to pad up! + rawPaddedCommp, err := commp.PadCommP( + // we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes + cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:], + uint64(cidAndSize.PieceSize), + uint64(targetSize), + ) + if err != nil { + return cid.Undef, err + } + cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp) + } + + return cidAndSize.PieceCID, err +} diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 248afc0a..699234bd 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -614,13 +614,6 @@ func (p *Provider) runMigrations(ctx context.Context) ([]storagemarket.MinerDeal return nil, xerrors.Errorf("failed to fetch deals during startup: %w", err) } - // re-track all deals for whom we still have a local blockstore. - for _, d := range deals { - if _, err := os.Stat(d.InboundCAR); err == nil && d.Ref != nil { - _, _ = p.stores.GetOrOpen(d.ProposalCid.String(), d.InboundCAR, d.Ref.Root) - } - } - // migrate deals to the dagstore if still not migrated. if ok, err := p.dagStore.MigrateDeals(ctx, deals); err != nil { return nil, fmt.Errorf("failed to migrate deals to DAG store: %w", err) diff --git a/storagemarket/impl/provider_environments.go b/storagemarket/impl/provider_environments.go index 4044e1f8..f4a043b9 100644 --- a/storagemarket/impl/provider_environments.go +++ b/storagemarket/impl/provider_environments.go @@ -2,7 +2,6 @@ package storageimpl import ( "context" - "io" "os" "github.com/ipfs/go-cid" @@ -12,13 +11,11 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-commp-utils/writer" - commcid "github.com/filecoin-project/go-fil-commcid" - commp "github.com/filecoin-project/go-fil-commp-hashhash" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" @@ -44,6 +41,10 @@ func (p *providerDealEnvironment) ReadCAR(path string) (*carv2.Reader, error) { func (p *providerDealEnvironment) FinalizeBlockstore(proposalCid cid.Cid) error { bs, err := p.p.stores.Get(proposalCid.String()) if err != nil { + if xerrors.Is(err, stores.ErrNotFound) { + // The blockstore has already been cleaned up + return nil + } return xerrors.Errorf("failed to get read/write blockstore: %w", err) } @@ -107,36 +108,8 @@ func (p *providerDealEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, c } }() - // dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP. - w := &writer.Writer{} - written, err := io.Copy(w, rd.DataReader()) - if err != nil { - return cid.Undef, "", xerrors.Errorf("failed to write to CommP writer: %w", err) - } - if written != int64(rd.Header.DataSize) { - return cid.Undef, "", xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, rd.Header.DataSize) - } - - cidAndSize, err := w.Sum() - if err != nil { - return cid.Undef, "", xerrors.Errorf("failed to get CommP: %w", err) - } - - if cidAndSize.PieceSize < dealSize { - // need to pad up! - rawPaddedCommp, err := commp.PadCommP( - // we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes - cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:], - uint64(cidAndSize.PieceSize), - uint64(dealSize), - ) - if err != nil { - return cid.Undef, "", err - } - cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp) - } - - return cidAndSize.PieceCID, filestore.Path(""), err + pieceCID, err := shared.GenerateCommp(rd.DataReader(), rd.Header.DataSize, uint64(dealSize)) + return pieceCID, "", err } func (p *providerDealEnvironment) FileStore() filestore.FileStore { diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index a87a39e4..6ec40eaf 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -1340,6 +1340,7 @@ func makeExecutor(ctx context.Context, PublishDealID: nodeParams.PublishDealID, WaitForPublishDealsError: nodeParams.WaitForPublishDealsError, OnDealCompleteError: nodeParams.OnDealCompleteError, + OnDealCompleteSkipCommP: true, DataCap: nodeParams.DataCap, GetDataCapErr: nodeParams.GetDataCapError, } diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 17472811..6ee223b0 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "path/filepath" "sync" "testing" @@ -482,6 +483,98 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) { shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State) } +// TestRestartProviderAtPublishStage tests that if the provider is restarted +// when it's in the publish state, it will successfully complete the deal +func TestRestartProviderAtPublishStage(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + td := shared_testutil.NewLibp2pTestData(ctx, t) + smState := testnodes.NewStorageMarketState() + depGen := dependencies.NewDepGenerator() + deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay) + h := testharness.NewHarnessWithTestData(t, td, deps, true, false) + + // start client and provider + shared_testutil.StartAndWaitForReady(ctx, t, h.Provider) + shared_testutil.StartAndWaitForReady(ctx, t, h.Client) + + // set ask price where we'll accept any price + err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000) + require.NoError(t, err) + + // Listen for when the provider reaches the Publish state, and shut it down + wgProviderPublish := sync.WaitGroup{} + wgProviderPublish.Add(1) + var providerState []storagemarket.MinerDeal + h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + t.Logf("Provider %s: %s\n", storagemarket.ProviderEvents[event], storagemarket.DealStates[deal.State]) + if deal.State == storagemarket.StorageDealPublish { + require.NoError(t, h.Provider.Stop()) + + time.Sleep(time.Second) + + providerState, err = h.Provider.ListLocalDeals() + assert.NoError(t, err) + wgProviderPublish.Done() + } + }) + + // Propose a storage deal + result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false) + proposalCid := result.ProposalCid + t.Log("storage deal proposed") + + // Wait till the deal reaches the Publish state + waitGroupWait(ctx, &wgProviderPublish) + t.Log("provider has been shutdown") + + // Create new provider (but don't restart yet) + newProvider := h.CreateNewProvider(t, ctx, h.TestData) + + t.Logf("provider state after stopping is %s", storagemarket.DealStates[providerState[0].State]) + + // This wait group will complete after the deal has completed on both the + // client and provider + expireWg := sync.WaitGroup{} + expireWg.Add(1) + _ = newProvider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + t.Logf("New Provider %s: %s\n", storagemarket.ProviderEvents[event], storagemarket.DealStates[deal.State]) + if event == storagemarket.ProviderEventDealExpired { + expireWg.Done() + } + }) + + expireWg.Add(1) + _ = h.Client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + if event == storagemarket.ClientEventDealExpired { + expireWg.Done() + } + }) + + // sleep for a moment + time.Sleep(1 * time.Second) + t.Log("finished sleeping") + + // Restart the provider + err = newProvider.Start(ctx) + require.NoError(t, err) + t.Log("------- provider has been restarted---------") + + // Wait till both client and provider have completed the deal + waitGroupWait(ctx, &expireWg) + t.Log("---------- finished waiting for expected events-------") + + // Ensure the provider reached the final state + providerDeals, err := newProvider.ListLocalDeals() + require.NoError(t, err) + + pd := providerDeals[0] + require.Equal(t, pd.ProposalCid, proposalCid) + shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State) +} + // FIXME Gets hung sometimes func TestRestartClient(t *testing.T) { testCases := map[string]struct { @@ -572,9 +665,9 @@ func TestRestartClient(t *testing.T) { // Stop the client and provider at some point during deal negotiation ev := storagemarket.ClientEvents[event] t.Logf("event %s has happened on client, shutting down client and provider", ev) + require.NoError(t, h.Client.Stop()) require.NoError(t, h.TestData.MockNet.UnlinkPeers(host1.ID(), host2.ID())) require.NoError(t, h.TestData.MockNet.DisconnectPeers(host1.ID(), host2.ID())) - require.NoError(t, h.Client.Stop()) // if a provider stop event isn't specified, just stop the provider here if tc.stopAtProviderEvent == 0 { @@ -611,7 +704,11 @@ func TestRestartClient(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) require.NoError(t, err) t.Logf("client state after stopping is %s", storagemarket.DealStates[cd.State]) - require.Equal(t, tc.expectedClientState, cd.State) + if tc.expectedClientState != cd.State { + t.Logf("client state message: %s", cd.Message) + require.Fail(t, fmt.Sprintf("client deal state mismatch:\nexpected: %s\nactual: %s", + storagemarket.DealStates[tc.expectedClientState], storagemarket.DealStates[cd.State])) + } deps := dependencies.NewDependenciesWithTestData(t, ctx, h.TestData, h.SMState, "", noOpDelay, noOpDelay) h = testharness.NewHarnessWithTestData(t, h.TestData, deps, true, false) diff --git a/storagemarket/testnodes/testnodes.go b/storagemarket/testnodes/testnodes.go index e0b8c4d7..814eb241 100644 --- a/storagemarket/testnodes/testnodes.go +++ b/storagemarket/testnodes/testnodes.go @@ -5,6 +5,7 @@ package testnodes import ( "context" "errors" + "fmt" "io/ioutil" "sync" "testing" @@ -340,6 +341,7 @@ type FakeProviderNode struct { PublishDealsError error WaitForPublishDealsError error OnDealCompleteError error + OnDealCompleteSkipCommP bool LastOnDealCompleteBytes []byte OnDealCompleteCalls []storagemarket.MinerDeal LocatePieceForDealWithinSectorError error @@ -380,7 +382,28 @@ func (n *FakeProviderNode) WaitForPublishDeals(ctx context.Context, mcid cid.Cid func (n *FakeProviderNode) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceReader shared.ReadSeekStarter) (*storagemarket.PackingResult, error) { n.OnDealCompleteCalls = append(n.OnDealCompleteCalls, deal) n.LastOnDealCompleteBytes, _ = ioutil.ReadAll(pieceReader) - // TODO: probably need to return some mock value here + + if n.OnDealCompleteError != nil || n.OnDealCompleteSkipCommP { + return &storagemarket.PackingResult{}, n.OnDealCompleteError + } + + // We read in all the bytes from the reader above, so seek back to the start + err := pieceReader.SeekStart() + if err != nil { + return nil, fmt.Errorf("on deal complete: seeking to start of piece data: %w", err) + } + + // Generate commP + pieceCID, err := shared.GenerateCommp(pieceReader, uint64(pieceSize), uint64(pieceSize)) + if err != nil { + return nil, fmt.Errorf("on deal complete: generating commp: %w", err) + } + + // Check that commP of the data matches the proposal piece CID + if pieceCID != deal.Proposal.PieceCID { + return nil, fmt.Errorf("on deal complete: proposal piece CID %s does not match calculated commP %s", deal.Proposal.PieceCID, pieceCID) + } + return &storagemarket.PackingResult{}, n.OnDealCompleteError }