fix: storage provider restart in publish stage (#657)

dirkmc authored Dec 21, 2021
1 parent e95a234; commit 00ea88b

Showing 6 changed files with 176 additions and 44 deletions.
45 changes: 45 additions & 0 deletions shared/commp.go
@@ -0,0 +1,45 @@
+package shared
+
+import (
+	"io"
+
+	"github.com/ipfs/go-cid"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/go-commp-utils/writer"
+	commcid "github.com/filecoin-project/go-fil-commcid"
+	commp "github.com/filecoin-project/go-fil-commp-hashhash"
+)
+
+func GenerateCommp(reader io.Reader, payloadSize uint64, targetSize uint64) (cid.Cid, error) {
+	// dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.
+	w := &writer.Writer{}
+	written, err := io.Copy(w, reader)
+	if err != nil {
+		return cid.Undef, xerrors.Errorf("failed to write to CommP writer: %w", err)
+	}
+	if written != int64(payloadSize) {
+		return cid.Undef, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, payloadSize)
+	}
+
+	cidAndSize, err := w.Sum()
+	if err != nil {
+		return cid.Undef, xerrors.Errorf("failed to get CommP: %w", err)
+	}
+
+	if uint64(cidAndSize.PieceSize) < targetSize {
+		// need to pad up!
+		rawPaddedCommp, err := commp.PadCommP(
+			// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
+			cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:],
+			uint64(cidAndSize.PieceSize),
+			uint64(targetSize),
+		)
+		if err != nil {
+			return cid.Undef, err
+		}
+		cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
+	}
+
+	return cidAndSize.PieceCID, err
+}
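For context, a minimal caller sketch of the new shared.GenerateCommp helper (not part of this commit): the payload.car file name and the 2048-byte target piece size are illustrative assumptions.

    package main

    import (
        "fmt"
        "os"

        "github.com/filecoin-project/go-fil-markets/shared"
    )

    func main() {
        // Open the data whose piece commitment we want (hypothetical file name).
        f, err := os.Open("payload.car")
        if err != nil {
            panic(err)
        }
        defer f.Close()

        fi, err := f.Stat()
        if err != nil {
            panic(err)
        }

        // Hash the payload and pad the commitment up to a 2048-byte piece
        // (an assumed deal size; the target must be a power-of-two padded
        // piece size at least as large as the payload's own padded size).
        pieceCid, err := shared.GenerateCommp(f, uint64(fi.Size()), 2048)
        if err != nil {
            panic(err)
        }
        fmt.Println("CommP:", pieceCid)
    }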
7 changes: 0 additions & 7 deletions storagemarket/impl/provider.go
@@ -614,13 +614,6 @@ func (p *Provider) runMigrations(ctx context.Context) ([]storagemarket.MinerDeal
 		return nil, xerrors.Errorf("failed to fetch deals during startup: %w", err)
 	}
 
-	// re-track all deals for whom we still have a local blockstore.
-	for _, d := range deals {
-		if _, err := os.Stat(d.InboundCAR); err == nil && d.Ref != nil {
-			_, _ = p.stores.GetOrOpen(d.ProposalCid.String(), d.InboundCAR, d.Ref.Root)
-		}
-	}
-
 	// migrate deals to the dagstore if still not migrated.
 	if ok, err := p.dagStore.MigrateDeals(ctx, deals); err != nil {
 		return nil, fmt.Errorf("failed to migrate deals to DAG store: %w", err)
41 changes: 7 additions & 34 deletions storagemarket/impl/provider_environments.go
@@ -2,7 +2,6 @@ package storageimpl

 import (
 	"context"
-	"io"
 	"os"
 
 	"github.com/ipfs/go-cid"
@@ -12,13 +11,11 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
@@ -44,6 +41,10 @@ func (p *providerDealEnvironment) ReadCAR(path string) (*carv2.Reader, error) {
 func (p *providerDealEnvironment) FinalizeBlockstore(proposalCid cid.Cid) error {
 	bs, err := p.p.stores.Get(proposalCid.String())
 	if err != nil {
+		if xerrors.Is(err, stores.ErrNotFound) {
+			// The blockstore has already been cleaned up
+			return nil
+		}
 		return xerrors.Errorf("failed to get read/write blockstore: %w", err)
 	}
 
@@ -107,36 +108,8 @@ func (p *providerDealEnvironment) GeneratePieceCommitment(proposalCid cid.Cid, c
 		}
 	}()
 
-	// dump the CARv1 payload of the CARv2 file to the Commp Writer and get back the CommP.
-	w := &writer.Writer{}
-	written, err := io.Copy(w, rd.DataReader())
-	if err != nil {
-		return cid.Undef, "", xerrors.Errorf("failed to write to CommP writer: %w", err)
-	}
-	if written != int64(rd.Header.DataSize) {
-		return cid.Undef, "", xerrors.Errorf("number of bytes written to CommP writer %d not equal to the CARv1 payload size %d", written, rd.Header.DataSize)
-	}
-
-	cidAndSize, err := w.Sum()
-	if err != nil {
-		return cid.Undef, "", xerrors.Errorf("failed to get CommP: %w", err)
-	}
-
-	if cidAndSize.PieceSize < dealSize {
-		// need to pad up!
-		rawPaddedCommp, err := commp.PadCommP(
-			// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
-			cidAndSize.PieceCID.Hash()[len(cidAndSize.PieceCID.Hash())-32:],
-			uint64(cidAndSize.PieceSize),
-			uint64(dealSize),
-		)
-		if err != nil {
-			return cid.Undef, "", err
-		}
-		cidAndSize.PieceCID, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
-	}
-
-	return cidAndSize.PieceCID, filestore.Path(""), err
+	pieceCID, err := shared.GenerateCommp(rd.DataReader(), rd.Header.DataSize, uint64(dealSize))
+	return pieceCID, "", err
 }
 
 func (p *providerDealEnvironment) FileStore() filestore.FileStore {
1 change: 1 addition & 0 deletions storagemarket/impl/providerstates/provider_states_test.go
@@ -1340,6 +1340,7 @@ func makeExecutor(ctx context.Context,
 		PublishDealID:            nodeParams.PublishDealID,
 		WaitForPublishDealsError: nodeParams.WaitForPublishDealsError,
 		OnDealCompleteError:      nodeParams.OnDealCompleteError,
+		OnDealCompleteSkipCommP:  true,
 		DataCap:                  nodeParams.DataCap,
 		GetDataCapErr:            nodeParams.GetDataCapError,
 	}
101 changes: 99 additions & 2 deletions storagemarket/integration_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"path/filepath"
"sync"
"testing"
@@ -482,6 +483,98 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) {
 	shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
 }
 
+// TestRestartProviderAtPublishStage tests that if the provider is restarted
+// when it's in the publish state, it will successfully complete the deal
+func TestRestartProviderAtPublishStage(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	td := shared_testutil.NewLibp2pTestData(ctx, t)
+	smState := testnodes.NewStorageMarketState()
+	depGen := dependencies.NewDepGenerator()
+	deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay)
+	h := testharness.NewHarnessWithTestData(t, td, deps, true, false)
+
+	// start client and provider
+	shared_testutil.StartAndWaitForReady(ctx, t, h.Provider)
+	shared_testutil.StartAndWaitForReady(ctx, t, h.Client)
+
+	// set ask price where we'll accept any price
+	err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000)
+	require.NoError(t, err)
+
+	// Listen for when the provider reaches the Publish state, and shut it down
+	wgProviderPublish := sync.WaitGroup{}
+	wgProviderPublish.Add(1)
+	var providerState []storagemarket.MinerDeal
+	h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
+		t.Logf("Provider %s: %s\n", storagemarket.ProviderEvents[event], storagemarket.DealStates[deal.State])
+		if deal.State == storagemarket.StorageDealPublish {
+			require.NoError(t, h.Provider.Stop())
+
+			time.Sleep(time.Second)
+
+			providerState, err = h.Provider.ListLocalDeals()
+			assert.NoError(t, err)
+			wgProviderPublish.Done()
+		}
+	})
+
+	// Propose a storage deal
+	result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false)
+	proposalCid := result.ProposalCid
+	t.Log("storage deal proposed")
+
+	// Wait till the deal reaches the Publish state
+	waitGroupWait(ctx, &wgProviderPublish)
+	t.Log("provider has been shutdown")
+
+	// Create new provider (but don't restart yet)
+	newProvider := h.CreateNewProvider(t, ctx, h.TestData)
+
+	t.Logf("provider state after stopping is %s", storagemarket.DealStates[providerState[0].State])
+
+	// This wait group will complete after the deal has completed on both the
+	// client and provider
+	expireWg := sync.WaitGroup{}
+	expireWg.Add(1)
+	_ = newProvider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
+		t.Logf("New Provider %s: %s\n", storagemarket.ProviderEvents[event], storagemarket.DealStates[deal.State])
+		if event == storagemarket.ProviderEventDealExpired {
+			expireWg.Done()
+		}
+	})
+
+	expireWg.Add(1)
+	_ = h.Client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
+		if event == storagemarket.ClientEventDealExpired {
+			expireWg.Done()
+		}
+	})
+
+	// sleep for a moment
+	time.Sleep(1 * time.Second)
+	t.Log("finished sleeping")
+
+	// Restart the provider
+	err = newProvider.Start(ctx)
+	require.NoError(t, err)
+	t.Log("------- provider has been restarted---------")
+
+	// Wait till both client and provider have completed the deal
+	waitGroupWait(ctx, &expireWg)
+	t.Log("---------- finished waiting for expected events-------")
+
+	// Ensure the provider reached the final state
+	providerDeals, err := newProvider.ListLocalDeals()
+	require.NoError(t, err)
+
+	pd := providerDeals[0]
+	require.Equal(t, pd.ProposalCid, proposalCid)
+	shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
+}
+
 // FIXME Gets hung sometimes
 func TestRestartClient(t *testing.T) {
 	testCases := map[string]struct {
@@ -572,9 +665,9 @@ func TestRestartClient(t *testing.T) {
 			// Stop the client and provider at some point during deal negotiation
 			ev := storagemarket.ClientEvents[event]
 			t.Logf("event %s has happened on client, shutting down client and provider", ev)
-			require.NoError(t, h.Client.Stop())
 			require.NoError(t, h.TestData.MockNet.UnlinkPeers(host1.ID(), host2.ID()))
 			require.NoError(t, h.TestData.MockNet.DisconnectPeers(host1.ID(), host2.ID()))
+			require.NoError(t, h.Client.Stop())
 
 			// if a provider stop event isn't specified, just stop the provider here
 			if tc.stopAtProviderEvent == 0 {
@@ -611,7 +704,11 @@ func TestRestartClient(t *testing.T) {
 			cd, err := h.Client.GetLocalDeal(ctx, proposalCid)
 			require.NoError(t, err)
 			t.Logf("client state after stopping is %s", storagemarket.DealStates[cd.State])
-			require.Equal(t, tc.expectedClientState, cd.State)
+			if tc.expectedClientState != cd.State {
+				t.Logf("client state message: %s", cd.Message)
+				require.Fail(t, fmt.Sprintf("client deal state mismatch:\nexpected: %s\nactual: %s",
+					storagemarket.DealStates[tc.expectedClientState], storagemarket.DealStates[cd.State]))
+			}
 
 			deps := dependencies.NewDependenciesWithTestData(t, ctx, h.TestData, h.SMState, "", noOpDelay, noOpDelay)
 			h = testharness.NewHarnessWithTestData(t, h.TestData, deps, true, false)
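The new test calls a waitGroupWait helper that already exists elsewhere in integration_test.go and is not shown in this diff. As an assumption about its shape, such a helper typically blocks on the WaitGroup while respecting context cancellation; a sketch:

    package main

    import (
        "context"
        "sync"
        "time"
    )

    // Assumed shape of the existing waitGroupWait helper (not part of this
    // diff): block until the WaitGroup completes, or give up early when the
    // context is cancelled.
    func waitGroupWait(ctx context.Context, wg *sync.WaitGroup) {
        done := make(chan struct{})
        go func() {
            defer close(done)
            wg.Wait()
        }()
        select {
        case <-done:
        case <-ctx.Done():
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        var wg sync.WaitGroup
        wg.Add(1)
        go func() { wg.Done() }()
        waitGroupWait(ctx, &wg) // returns promptly once wg completes
    }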
25 changes: 24 additions & 1 deletion storagemarket/testnodes/testnodes.go
@@ -5,6 +5,7 @@ package testnodes
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"sync"
 	"testing"
@@ -340,6 +341,7 @@ type FakeProviderNode struct {
 	PublishDealsError                   error
 	WaitForPublishDealsError            error
 	OnDealCompleteError                 error
+	OnDealCompleteSkipCommP             bool
 	LastOnDealCompleteBytes             []byte
 	OnDealCompleteCalls                 []storagemarket.MinerDeal
 	LocatePieceForDealWithinSectorError error
@@ -380,7 +382,28 @@ func (n *FakeProviderNode) WaitForPublishDeals(ctx context.Context, mcid cid.Cid
 func (n *FakeProviderNode) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceReader shared.ReadSeekStarter) (*storagemarket.PackingResult, error) {
 	n.OnDealCompleteCalls = append(n.OnDealCompleteCalls, deal)
 	n.LastOnDealCompleteBytes, _ = ioutil.ReadAll(pieceReader)
-	// TODO: probably need to return some mock value here
+
+	if n.OnDealCompleteError != nil || n.OnDealCompleteSkipCommP {
+		return &storagemarket.PackingResult{}, n.OnDealCompleteError
+	}
+
+	// We read in all the bytes from the reader above, so seek back to the start
+	err := pieceReader.SeekStart()
+	if err != nil {
+		return nil, fmt.Errorf("on deal complete: seeking to start of piece data: %w", err)
+	}
+
+	// Generate commP
+	pieceCID, err := shared.GenerateCommp(pieceReader, uint64(pieceSize), uint64(pieceSize))
+	if err != nil {
+		return nil, fmt.Errorf("on deal complete: generating commp: %w", err)
+	}
+
+	// Check that commP of the data matches the proposal piece CID
+	if pieceCID != deal.Proposal.PieceCID {
+		return nil, fmt.Errorf("on deal complete: proposal piece CID %s does not match calculated commP %s", deal.Proposal.PieceCID, pieceCID)
+	}
+
 	return &storagemarket.PackingResult{}, n.OnDealCompleteError
 }

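With the new flag, a test double can opt out of the commP check that OnDealComplete now performs. A minimal sketch (not part of this commit; the function around the literal is hypothetical, while the field name and import path come from the diff above):

    package main

    import (
        "fmt"

        "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes"
    )

    func main() {
        // Configure the fake provider node to skip commP verification in
        // OnDealComplete, as makeExecutor in provider_states_test.go now does.
        // This is useful when a test's piece data is not a real deal payload
        // and so would never match the proposal's PieceCID.
        node := &testnodes.FakeProviderNode{
            OnDealCompleteSkipCommP: true,
        }
        fmt.Printf("skip commP: %v\n", node.OnDealCompleteSkipCommP)
    }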
