diff --git a/coreiface/pin.go b/coreiface/pin.go index ba5df5354..6b97c6ca5 100644 --- a/coreiface/pin.go +++ b/coreiface/pin.go @@ -27,6 +27,9 @@ type PinStatus interface { // BadNodes returns any bad (usually missing) nodes from the pin BadNodes() []BadPinNode + + // Err returns a non-nil error if one happened, in which case everything else should be ignored. + Err() error } // BadPinNode is a node that has been marked as bad by Pin.Verify diff --git a/coreiface/tests/pin.go b/coreiface/tests/pin.go index bbf602994..4b0fea01d 100644 --- a/coreiface/tests/pin.go +++ b/coreiface/tests/pin.go @@ -198,6 +198,9 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { } n := 0 for r := range res { + if err := r.Err(); err != nil { + t.Error(err) + } if !r.Ok() { t.Error("expected pin to be ok") } @@ -208,7 +211,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { t.Errorf("unexpected verify result count: %d", n) } - //TODO: figure out a way to test verify without touching IpfsNode + // TODO: figure out a way to test verify without touching IpfsNode /* err = api.Block().Rm(ctx, p0, opt.Block.Force(true)) if err != nil { diff --git a/ipld/merkledag/test/dag_generator.go b/ipld/merkledag/test/dag_generator.go new file mode 100644 index 000000000..ec6fba091 --- /dev/null +++ b/ipld/merkledag/test/dag_generator.go @@ -0,0 +1,86 @@ +package mdutils + +import ( + "context" + "fmt" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + + "github.com/ipfs/boxo/ipld/merkledag" +) + +// NewDAGGenerator returns an object capable of +// producing IPLD DAGs. +func NewDAGGenerator() *DAGGenerator { + return &DAGGenerator{} +} + +// DAGGenerator generates BasicBlocks on demand. +// For each instance of DAGGenerator, each new DAG is different from the +// previous, although two different instances will produce the same DAGs, given the +// same parameters.
+type DAGGenerator struct { + seq int +} + +// MakeDagBlock generates a balanced DAG with the given fanout and depth, and adds the blocks to the adder. It returns the root CID and the CIDs of all generated blocks. +// This adder can be for example a blockstore.Put or a blockservice.AddBlock. It panics if depth is 0. +func (dg *DAGGenerator) MakeDagBlock(adder func(ctx context.Context, block blocks.Block) error, fanout uint, depth uint) (c cid.Cid, allCids []cid.Cid, err error) { + return dg.MakeDagNode(func(ctx context.Context, node format.Node) error { + return adder(ctx, node.(blocks.Block)) + }, fanout, depth) +} + +// MakeDagNode generates a balanced DAG with the given fanout and depth, and adds the blocks to the adder. It returns the root CID and the CIDs of all generated blocks. +// This adder can be for example a DAGService.Add. It panics if depth is 0. +func (dg *DAGGenerator) MakeDagNode(adder func(ctx context.Context, node format.Node) error, fanout uint, depth uint) (c cid.Cid, allCids []cid.Cid, err error) { + c, _, allCids, err = dg.generate(adder, fanout, depth) + return c, allCids, err +} + +func (dg *DAGGenerator) generate(adder func(ctx context.Context, node format.Node) error, fanout uint, depth uint) (c cid.Cid, size uint64, allCids []cid.Cid, err error) { + if depth == 0 { + panic("depth should be at least 1") + } + if depth == 1 { + c, size, err = dg.encodeBlock(adder) + if err != nil { + return cid.Undef, 0, nil, err + } + return c, size, []cid.Cid{c}, nil + } + links := make([]*format.Link, fanout) + for i := uint(0); i < fanout; i++ { + root, size, children, err := dg.generate(adder, fanout, depth-1) + if err != nil { + return cid.Undef, 0, nil, err + } + links[i] = &format.Link{Cid: root, Size: size} + allCids = append(allCids, children...) + } + c, size, err = dg.encodeBlock(adder, links...)
+ if err != nil { + return cid.Undef, 0, nil, err + } + return c, size, append([]cid.Cid{c}, allCids...), nil +} + +func (dg *DAGGenerator) encodeBlock(adder func(ctx context.Context, node format.Node) error, links ...*format.Link) (cid.Cid, uint64, error) { + dg.seq++ + nd := &merkledag.ProtoNode{} + nd.SetData([]byte(fmt.Sprint(dg.seq))) + for i, link := range links { + err := nd.AddRawLink(fmt.Sprintf("link-%d", i), link) + if err != nil { + return cid.Undef, 0, err + } + } + err := adder(context.Background(), nd) + if err != nil { + return cid.Undef, 0, err + } + size, err := nd.Size() + return nd.Cid(), size, err +} diff --git a/ipld/merkledag/test/dag_generator_test.go b/ipld/merkledag/test/dag_generator_test.go new file mode 100644 index 000000000..0b900f8b1 --- /dev/null +++ b/ipld/merkledag/test/dag_generator_test.go @@ -0,0 +1,92 @@ +package mdutils + +import ( + "context" + "sync" + "testing" + + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" +) + +type testDagServ struct { + mu sync.Mutex + nodes map[string]format.Node +} + +func newTestDagServ() *testDagServ { + return &testDagServ{nodes: make(map[string]format.Node)} +} + +func (d *testDagServ) Get(_ context.Context, cid cid.Cid) (format.Node, error) { + d.mu.Lock() + defer d.mu.Unlock() + if n, ok := d.nodes[cid.KeyString()]; ok { + return n, nil + } + return nil, format.ErrNotFound{Cid: cid} +} + +func (d *testDagServ) Add(_ context.Context, node format.Node) error { + d.mu.Lock() + defer d.mu.Unlock() + d.nodes[node.Cid().KeyString()] = node + return nil +} + +func TestNodesAreDifferent(t *testing.T) { + dserv := newTestDagServ() + gen := NewDAGGenerator() + + var allCids []cid.Cid + var allNodes []format.Node + + const nbDag = 5 + + for i := 0; i < nbDag; i++ { + c, cids, err := gen.MakeDagNode(dserv.Add, 5, 3) + if err != nil { + t.Fatal(err) + } + + allCids = append(allCids, cids...) 
+ + // collect all nodes + var getChildren func(n format.Node) + getChildren = func(n format.Node) { + for _, link := range n.Links() { + n, err = dserv.Get(context.Background(), link.Cid) + if err != nil { + t.Fatal(err) + } + allNodes = append(allNodes, n) + getChildren(n) + } + } + n, err := dserv.Get(context.Background(), c) + if err != nil { + t.Fatal(err) + } + allNodes = append(allNodes, n) + getChildren(n) + + // make sure they are all different + for i, node1 := range allNodes { + for j, node2 := range allNodes { + if i != j { + if node1.Cid().String() == node2.Cid().String() { + t.Error("Found duplicate node") + } + } + } + } + } + + // expected count + if len(allNodes) != nbDag*31 { + t.Error("expected nbDag*31 nodes (1+5+5*5)") + } + if len(allCids) != nbDag*31 { + t.Error("expected nbDag*31 cids (1+5+5*5)") + } +} diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 7441ca65b..bad23c693 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -10,8 +10,6 @@ import ( "path" "sync" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/merkledag/dagutils" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -20,6 +18,8 @@ import ( "github.com/polydawn/refmt/cbor" "github.com/polydawn/refmt/obj/atlas" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/merkledag/dagutils" ipfspinner "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/boxo/pinning/pinner/dsindex" ) @@ -665,61 +665,56 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { } // DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - cidSet := cid.NewSet() - var e error - err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool { - var c cid.Cid - c, e = cid.Cast([]byte(key)) - if e != nil { - return false 
- } - cidSet.Add(c) - return true - }) - if err != nil { - return nil, err - } - if e != nil { - return nil, e - } - - return cidSet.Keys(), nil +func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { + return p.streamIndex(ctx, p.cidDIndex) } // RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() +func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { + return p.streamIndex(ctx, p.cidRIndex) +} - cidSet := cid.NewSet() - var e error - err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool { - var c cid.Cid - c, e = cid.Cast([]byte(key)) - if e != nil { - return false +func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid { // streamIndex sends every distinct CID of index on the returned channel, which is closed when the goroutine returns + out := make(chan ipfspinner.StreamedCid) + + go func() { + defer close(out) + + p.lock.RLock() + defer p.lock.RUnlock() + + cidSet := cid.NewSet() + + err := index.ForEach(ctx, "", func(key, value string) bool { + c, err := cid.Cast([]byte(key)) + if err != nil { + out <- ipfspinner.StreamedCid{Err: err} // NOTE(review): this send is not guarded by ctx.Done(), unlike the CID send below + return false + } + if !cidSet.Has(c) { + select { + case <-ctx.Done(): + return false + case out <- ipfspinner.StreamedCid{C: c}: + } + cidSet.Add(c) // remember c so that a repeated key is streamed only once + } + return true + }) + if err != nil { + out <- ipfspinner.StreamedCid{Err: err} // NOTE(review): blocks with p.lock read-held if the consumer has stopped receiving; consider selecting on ctx.Done() here too } - cidSet.Add(c) - return true - }) - if err != nil { - return nil, err - } - if e != nil { - return nil, e - } + }() - return cidSet.Keys(), nil + return out } // InternalPins returns all cids kept pinned for the internal state of the // pinner -func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) { - return nil, nil +func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid { + c := make(chan ipfspinner.StreamedCid) + close(c) // nothing is sent: callers get an already-closed channel + return c } // Update updates a recursive pin from one cid to another.
This is equivalent diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index c6164651a..bc9588395 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -12,9 +12,6 @@ import ( bs "github.com/ipfs/boxo/blockservice" mdag "github.com/ipfs/boxo/ipld/merkledag" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - util "github.com/ipfs/boxo/util" cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -22,6 +19,10 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" + blockstore "github.com/ipfs/boxo/blockstore" + offline "github.com/ipfs/boxo/exchange/offline" + util "github.com/ipfs/boxo/util" + ipfspin "github.com/ipfs/boxo/pinning/pinner" ) @@ -198,10 +199,17 @@ func TestPinnerBasic(t *testing.T) { dk := d.Cid() assertPinned(t, p, dk, "pinned node not found.") - cids, err := p.RecursiveKeys(ctx) - if err != nil { - t.Fatal(err) + allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) { + for val := range ch { + if val.Err != nil { + t.Fatal(val.Err) + } + cids = append(cids, val.C) + } + return cids } + + cids := allCids(p.RecursiveKeys(ctx)) if len(cids) != 2 { t.Error("expected 2 recursive pins") } @@ -243,10 +251,7 @@ func TestPinnerBasic(t *testing.T) { } } - cids, err = p.DirectKeys(ctx) - if err != nil { - t.Fatal(err) - } + cids = allCids(p.DirectKeys(ctx)) if len(cids) != 1 { t.Error("expected 1 direct pin") } @@ -254,9 +259,9 @@ func TestPinnerBasic(t *testing.T) { t.Error("wrong direct pin") } - cids, _ = p.InternalPins(ctx) + cids = allCids(p.InternalPins(ctx)) if len(cids) != 0 { - t.Error("shound not have internal keys") + t.Error("should not have internal keys") } err = p.Unpin(ctx, dk, false) diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index fcf7d764a..5151b7e64 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -38,7 
+38,7 @@ const ( // Internal pins are cids used to keep the internal state of the pinner. Internal - // NotPinned + // NotPinned is a value to indicate that a cid is not pinned. NotPinned // Any refers to any pinned cid @@ -80,7 +80,7 @@ var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly") // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. In practice, a Pinner is in -// in charge of keeping the list of items from the local storage that should +// charge of keeping the list of items from the local storage that should // not be garbage-collected. type Pinner interface { // IsPinned returns whether or not the given cid is pinned @@ -119,14 +119,14 @@ type Pinner interface { Flush(ctx context.Context) error // DirectKeys returns all directly pinned cids - DirectKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) <-chan StreamedCid // RecursiveKeys returns all recursively pinned cids - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + RecursiveKeys(ctx context.Context) <-chan StreamedCid // InternalPins returns all cids kept pinned for the internal state of the // pinner - InternalPins(ctx context.Context) ([]cid.Cid, error) + InternalPins(ctx context.Context) <-chan StreamedCid } // Pinned represents CID which has been pinned with a pinning strategy. @@ -156,3 +156,9 @@ func (p Pinned) String() string { return fmt.Sprintf("pinned: %s", modeStr) } } + +// StreamedCid encapsulates a Cid and an error so that a function can return a channel of Cids.
+type StreamedCid struct { + C cid.Cid + Err error +} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go index a29b484fc..51056ad6a 100644 --- a/provider/simple/reprovide.go +++ b/provider/simple/reprovide.go @@ -7,15 +7,17 @@ import ( "time" "github.com/cenkalti/backoff" - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" logging "github.com/ipfs/go-log" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p/core/routing" + + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/verifcid" ) var logR = logging.Logger("reprovider.simple") @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { // Pinner interface defines how the simple.Reprovider wants to interact // with a Pinning service type Pinner interface { - DirectKeys(ctx context.Context) ([]cid.Cid, error) - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) <-chan pin.StreamedCid + RecursiveKeys(ctx context.Context) <-chan pin.StreamedCid } // NewPinnedProvider returns provider supplying pinned keys @@ -217,26 +219,25 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on defer cancel() defer close(set.New) - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { - set.Visitor(ctx)(key) - } - - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return + dkeys := pinning.DirectKeys(ctx) + for wrapper := range dkeys { + if wrapper.Err != nil { + logR.Errorf("reprovide direct pins: %s", wrapper.Err) + return + } 
+ set.Visitor(ctx)(wrapper.C) } + rkeys := pinning.RecursiveKeys(ctx) session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) + for wrapper := range rkeys { + if wrapper.Err != nil { + logR.Errorf("reprovide indirect pins: %s", wrapper.Err) + return + } + set.Visitor(ctx)(wrapper.C) if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: wrapper.C}, func(res fetcher.FetchResult) error { clink, ok := res.LastBlockLink.(cidlink.Link) if ok { set.Visitor(ctx)(clink.Cid) diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go index 8b521ae56..6641a3315 100644 --- a/provider/simple/reprovide_test.go +++ b/provider/simple/reprovide_test.go @@ -6,12 +6,6 @@ import ( "testing" "time" - bsrv "github.com/ipfs/boxo/blockservice" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" - "github.com/ipfs/boxo/internal/test" - mock "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" @@ -25,6 +19,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" mh "github.com/multiformats/go-multihash" + bsrv "github.com/ipfs/boxo/blockservice" + blockstore "github.com/ipfs/boxo/blockstore" + offline "github.com/ipfs/boxo/exchange/offline" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" + "github.com/ipfs/boxo/internal/test" + pin "github.com/ipfs/boxo/pinning/pinner" + mock "github.com/ipfs/boxo/routing/mock" + . 
"github.com/ipfs/boxo/provider/simple" ) @@ -224,12 +226,34 @@ type mockPinner struct { direct []cid.Cid } -func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.direct, nil +func (mp *mockPinner) DirectKeys(ctx context.Context) <-chan pin.StreamedCid { + out := make(chan pin.StreamedCid) + go func() { + defer close(out) + for _, c := range mp.direct { + select { + case <-ctx.Done(): + return + case out <- pin.StreamedCid{C: c}: + } + } + }() + return out } -func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.recursive, nil +func (mp *mockPinner) RecursiveKeys(ctx context.Context) <-chan pin.StreamedCid { + out := make(chan pin.StreamedCid) + go func() { + defer close(out) + for _, c := range mp.recursive { + select { + case <-ctx.Done(): + return + case out <- pin.StreamedCid{C: c}: + } + } + }() + return out } func TestReprovidePinned(t *testing.T) {