fix storage provider restart in publish stage #657

Merged: 1 commit, Dec 21, 2021
45 changes: 45 additions & 0 deletions shared/commp.go
@@ -0,0 +1,45 @@
package shared

import (
	"io"

	"github.com/ipfs/go-cid"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-commp-utils/writer"
	commcid "github.com/filecoin-project/go-fil-commcid"
	commp "github.com/filecoin-project/go-fil-commp-hashhash"
)

func GenerateCommp(reader io.Reader, payloadSize uint64, targetSize uint64) (cid.Cid, error) {
	// dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.
	w := &writer.Writer{}
	written, err := io.Copy(w, reader)
	if err != nil {
		return cid.Undef, xerrors.Errorf("failed to write to CommP writer: %w", err)
	}
	if written != int64(payloadSize) {
		return cid.Undef, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, payloadSize)
	}

	cidAndSize, err := w.Sum()
	if err != nil {
		return cid.Undef, xerrors.Errorf("failed to get CommP: %w", err)
	}

	if uint64(cidAndSize.PieceSize) < targetSize {
		// need to pad up!
		rawPaddedCommp, err := commp.PadCommP(
			// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
			cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:],
			uint64(cidAndSize.PieceSize),
			uint64(targetSize),
		)
		if err != nil {
			return cid.Undef, err
		}
		cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
	}

	return cidAndSize.PieceCID, err
}
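
A usage sketch, not part of this PR: GenerateCommp requires the reader to yield exactly payloadSize bytes, and pads the resulting commP up to targetSize only when the computed piece is smaller. The sample payload and the zero targetSize below are assumptions for illustration.

package main

import (
	"bytes"
	"fmt"

	"github.com/filecoin-project/go-fil-markets/shared"
)

func main() {
	// 4 KiB of arbitrary bytes standing in for a CARv1 payload.
	payload := bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef}, 1024)

	// A targetSize of 0 is never greater than the computed piece size, so the
	// PadCommP branch is skipped and the payload's natural piece CID is returned.
	pieceCID, err := shared.GenerateCommp(bytes.NewReader(payload), uint64(len(payload)), 0)
	if err != nil {
		panic(err)
	}
	fmt.Println("piece CID:", pieceCID)
}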
7 changes: 0 additions & 7 deletions storagemarket/impl/provider.go
@@ -614,13 +614,6 @@ func (p *Provider) runMigrations(ctx context.Context) ([]storagemarket.MinerDeal
		return nil, xerrors.Errorf("failed to fetch deals during startup: %w", err)
	}

	// re-track all deals for whom we still have a local blockstore.
	for _, d := range deals {
		if _, err := os.Stat(d.InboundCAR); err == nil && d.Ref != nil {
			_, _ = p.stores.GetOrOpen(d.ProposalCid.String(), d.InboundCAR, d.Ref.Root)
		}
	}

Comment on lines -617 to -623 (Contributor Author):

This piece of code was causing the problem - we shouldn't need to do this as stores.GetOrOpen() will be called lazily.

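To illustrate the lazy pattern the comment refers to, here is a hypothetical helper, not part of this PR; the stores API and deal fields are taken from the calls visible in this diff, and the helper name is invented for the sketch:

package storageimpl

import (
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-fil-markets/storagemarket"
)

// openDealBlockstore opens (or re-opens) a deal's blockstore at the point of
// use, so nothing needs to be eagerly re-tracked during startup.
func (p *Provider) openDealBlockstore(d storagemarket.MinerDeal) error {
	if d.Ref == nil {
		return xerrors.New("deal has no data ref")
	}
	// GetOrOpen lazily opens the read/write blockstore backed by the deal's
	// inbound CAR file, keyed by the deal's proposal CID.
	bs, err := p.stores.GetOrOpen(d.ProposalCid.String(), d.InboundCAR, d.Ref.Root)
	if err != nil {
		return xerrors.Errorf("failed to open blockstore for deal %s: %w", d.ProposalCid, err)
	}
	_ = bs // ... read and write deal data through bs ...
	return nil
}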
	// migrate deals to the dagstore if still not migrated.
	if ok, err := p.dagStore.MigrateDeals(ctx, deals); err != nil {
		return nil, fmt.Errorf("failed to migrate deals to DAG store: %w", err)
41 changes: 7 additions & 34 deletions storagemarket/impl/provider_environments.go
@@ -2,7 +2,6 @@ package storageimpl

import (
	"context"
	"io"
	"os"

	"github.com/ipfs/go-cid"
@@ -12,13 +11,11 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
@@ -44,6 +41,10 @@ func (p *providerDealEnvironment) ReadCAR(path string) (*carv2.Reader, error) {
func (p *providerDealEnvironment) FinalizeBlockstore(proposalCid cid.Cid) error {
	bs, err := p.p.stores.Get(proposalCid.String())
	if err != nil {
		if xerrors.Is(err, stores.ErrNotFound) {
			// The blockstore has already been cleaned up
			return nil
		}
		return xerrors.Errorf("failed to get read/write blockstore: %w", err)
	}

@@ -107,36 +108,8 @@ func (p *providerDealEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, c
		}
	}()

	// dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.

Comment (Contributor Author):

Moved this code to its own file so it can be used from the test - no logical changes.

	w := &writer.Writer{}
	written, err := io.Copy(w, rd.DataReader())
	if err != nil {
		return cid.Undef, "", xerrors.Errorf("failed to write to CommP writer: %w", err)
	}
	if written != int64(rd.Header.DataSize) {
		return cid.Undef, "", xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, rd.Header.DataSize)
	}

	cidAndSize, err := w.Sum()
	if err != nil {
		return cid.Undef, "", xerrors.Errorf("failed to get CommP: %w", err)
	}

	if cidAndSize.PieceSize < dealSize {
		// need to pad up!
		rawPaddedCommp, err := commp.PadCommP(
			// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
			cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:],
			uint64(cidAndSize.PieceSize),
			uint64(dealSize),
		)
		if err != nil {
			return cid.Undef, "", err
		}
		cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
	}

	return cidAndSize.PieceCID, filestore.Path(""), err
	pieceCID, err := shared.GenerateCommp(rd.DataReader(), rd.Header.DataSize, uint64(dealSize))
	return pieceCID, "", err
}

func (p *providerDealEnvironment) FileStore() filestore.FileStore {
1 change: 1 addition & 0 deletions storagemarket/impl/providerstates/provider_states_test.go
@@ -1340,6 +1340,7 @@ func makeExecutor(ctx context.Context,
		PublishDealID:            nodeParams.PublishDealID,
		WaitForPublishDealsError: nodeParams.WaitForPublishDealsError,
		OnDealCompleteError:      nodeParams.OnDealCompleteError,
		OnDealCompleteSkipCommP:  true,
		DataCap:                  nodeParams.DataCap,
		GetDataCapErr:            nodeParams.GetDataCapError,
	}
101 changes: 99 additions & 2 deletions storagemarket/integration_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"path/filepath"
"sync"
"testing"
Expand Down Expand Up @@ -482,6 +483,98 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) {
	shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
}

// TestRestartProviderAtPublishStage tests that if the provider is restarted
// when it's in the publish state, it will successfully complete the deal
func TestRestartProviderAtPublishStage(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	td := shared_testutil.NewLibp2pTestData(ctx, t)
	smState := testnodes.NewStorageMarketState()
	depGen := dependencies.NewDepGenerator()
	deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay)
	h := testharness.NewHarnessWithTestData(t, td, deps, true, false)

	// start client and provider
	shared_testutil.StartAndWaitForReady(ctx, t, h.Provider)
	shared_testutil.StartAndWaitForReady(ctx, t, h.Client)

	// set a zero ask price so that any proposed price is accepted
	err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000)
	require.NoError(t, err)

	// Listen for when the provider reaches the Publish state, and shut it down
	wgProviderPublish := sync.WaitGroup{}
	wgProviderPublish.Add(1)
	var providerState []storagemarket.MinerDeal
	h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
		t.Logf("Provider %s: %s\n", storagemarket.ProviderEvents[event], storagemarket.DealStates[deal.State])
		if deal.State == storagemarket.StorageDealPublish {
			require.NoError(t, h.Provider.Stop())

			time.Sleep(time.Second)

			providerState, err = h.Provider.ListLocalDeals()
			assert.NoError(t, err)
			wgProviderPublish.Done()
		}
	})

	// Propose a storage deal
	result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false)
	proposalCid := result.ProposalCid
	t.Log("storage deal proposed")

	// Wait till the deal reaches the Publish state
	waitGroupWait(ctx, &wgProviderPublish)
	t.Log("provider has been shut down")

	// Create a new provider (but don't start it yet)
	newProvider := h.CreateNewProvider(t, ctx, h.TestData)

	t.Logf("provider state after stopping is %s", storagemarket.DealStates[providerState[0].State])

	// This wait group will complete after the deal has completed on both the
	// client and provider
	expireWg := sync.WaitGroup{}
	expireWg.Add(1)
	_ = newProvider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
		t.Logf("New Provider %s: %s\n", storagemarket.ProviderEvents[event], storagemarket.DealStates[deal.State])
		if event == storagemarket.ProviderEventDealExpired {
			expireWg.Done()
		}
	})

	expireWg.Add(1)
	_ = h.Client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
		if event == storagemarket.ClientEventDealExpired {
			expireWg.Done()
		}
	})

	// sleep for a moment
	time.Sleep(1 * time.Second)
	t.Log("finished sleeping")

	// Restart the provider
	err = newProvider.Start(ctx)
	require.NoError(t, err)
	t.Log("------- provider has been restarted ---------")

	// Wait till both client and provider have completed the deal
	waitGroupWait(ctx, &expireWg)
	t.Log("---------- finished waiting for expected events -------")

	// Ensure the provider reached the final state
	providerDeals, err := newProvider.ListLocalDeals()
	require.NoError(t, err)

	pd := providerDeals[0]
	require.Equal(t, pd.ProposalCid, proposalCid)
	shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
}

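The test above relies on waitGroupWait, whose body is outside this diff. A plausible implementation, an assumption rather than the PR's actual code, waits on the WaitGroup without blocking past the context deadline (the file already imports context and sync):

// waitGroupWait waits for wg, but gives up when ctx is cancelled so a stuck
// test fails at the context timeout instead of hanging forever.
func waitGroupWait(ctx context.Context, wg *sync.WaitGroup) {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-ctx.Done():
	}
}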
// FIXME: this test sometimes hangs
func TestRestartClient(t *testing.T) {
	testCases := map[string]struct {
Expand Down Expand Up @@ -572,9 +665,9 @@ func TestRestartClient(t *testing.T) {
			// Stop the client and provider at some point during deal negotiation
			ev := storagemarket.ClientEvents[event]
			t.Logf("event %s has happened on client, shutting down client and provider", ev)
			require.NoError(t, h.Client.Stop())
			require.NoError(t, h.TestData.MockNet.UnlinkPeers(host1.ID(), host2.ID()))
			require.NoError(t, h.TestData.MockNet.DisconnectPeers(host1.ID(), host2.ID()))
			require.NoError(t, h.Client.Stop())

			// if a provider stop event isn't specified, just stop the provider here
			if tc.stopAtProviderEvent == 0 {
Expand Down Expand Up @@ -611,7 +704,11 @@ func TestRestartClient(t *testing.T) {
		cd, err := h.Client.GetLocalDeal(ctx, proposalCid)
		require.NoError(t, err)
		t.Logf("client state after stopping is %s", storagemarket.DealStates[cd.State])
		require.Equal(t, tc.expectedClientState, cd.State)
		if tc.expectedClientState != cd.State {
			t.Logf("client state message: %s", cd.Message)
			require.Fail(t, fmt.Sprintf("client deal state mismatch:\nexpected: %s\nactual: %s",
				storagemarket.DealStates[tc.expectedClientState], storagemarket.DealStates[cd.State]))
		}

		deps := dependencies.NewDependenciesWithTestData(t, ctx, h.TestData, h.SMState, "", noOpDelay, noOpDelay)
		h = testharness.NewHarnessWithTestData(t, h.TestData, deps, true, false)
25 changes: 24 additions & 1 deletion storagemarket/testnodes/testnodes.go
@@ -5,6 +5,7 @@ package testnodes
import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"sync"
	"testing"
Expand Down Expand Up @@ -340,6 +341,7 @@ type FakeProviderNode struct {
	PublishDealsError                   error
	WaitForPublishDealsError            error
	OnDealCompleteError                 error
	OnDealCompleteSkipCommP             bool
	LastOnDealCompleteBytes             []byte
	OnDealCompleteCalls                 []storagemarket.MinerDeal
	LocatePieceForDealWithinSectorError error
Expand Down Expand Up @@ -380,7 +382,28 @@ func (n *FakeProviderNode) WaitForPublishDeals(ctx context.Context, mcid cid.Cid
func (n *FakeProviderNode) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceReader shared.ReadSeekStarter) (*storagemarket.PackingResult, error) {
	n.OnDealCompleteCalls = append(n.OnDealCompleteCalls, deal)
	n.LastOnDealCompleteBytes, _ = ioutil.ReadAll(pieceReader)
	// TODO: probably need to return some mock value here

	if n.OnDealCompleteError != nil || n.OnDealCompleteSkipCommP {
		return &storagemarket.PackingResult{}, n.OnDealCompleteError
	}

	// We read in all the bytes from the reader above, so seek back to the start
	err := pieceReader.SeekStart()
	if err != nil {
		return nil, fmt.Errorf("on deal complete: seeking to start of piece data: %w", err)
	}

	// Generate commP
	pieceCID, err := shared.GenerateCommp(pieceReader, uint64(pieceSize), uint64(pieceSize))
Comment (Contributor Author):

I added commp generation to the test so if the data is bad the test will fail.

	if err != nil {
		return nil, fmt.Errorf("on deal complete: generating commp: %w", err)
	}

	// Check that commP of the data matches the proposal piece CID
	if pieceCID != deal.Proposal.PieceCID {
		return nil, fmt.Errorf("on deal complete: proposal piece CID %s does not match calculated commP %s", deal.Proposal.PieceCID, pieceCID)
	}

	return &storagemarket.PackingResult{}, n.OnDealCompleteError
}
