Skip to content

Commit

Permalink
pinner: change the interface to have async pin listing
Browse files Browse the repository at this point in the history
The rational is that if the pin list get big, a synchronous call to get the complete list can delay handling unnecessarily. For example, when listing indirect pins, you can start walking the DAGs immediately with the first recursive pin instead of waiting for the full list.

This matters even more on low power device, of if the pin list is stored remotely.

* coreiface: allow to return an error not linked to a specific Cid
* merkledag/test: add a DAG generator

Rationale is that generating a test DAG is quite difficult, and anything that helps writing better tests is helpful.
  • Loading branch information
MichaelMure authored Jun 2, 2023
1 parent 4c5c98b commit e2fc7f2
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 97 deletions.
3 changes: 3 additions & 0 deletions coreiface/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type PinStatus interface {

// BadNodes returns any bad (usually missing) nodes from the pin
BadNodes() []BadPinNode

// if not nil, an error happened. Everything else should be ignored.
Err() error
}

// BadPinNode is a node that has been marked as bad by Pin.Verify
Expand Down
5 changes: 4 additions & 1 deletion coreiface/tests/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
}
n := 0
for r := range res {
if err := r.Err(); err != nil {
t.Error(err)
}
if !r.Ok() {
t.Error("expected pin to be ok")
}
Expand All @@ -208,7 +211,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) {
t.Errorf("unexpected verify result count: %d", n)
}

//TODO: figure out a way to test verify without touching IpfsNode
// TODO: figure out a way to test verify without touching IpfsNode
/*
err = api.Block().Rm(ctx, p0, opt.Block.Force(true))
if err != nil {
Expand Down
86 changes: 86 additions & 0 deletions ipld/merkledag/test/dag_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package mdutils

import (
"context"
"fmt"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"

"github.com/ipfs/boxo/ipld/merkledag"
)

// NewDAGGenerator returns an object capable of
// producing IPLD DAGs.
func NewDAGGenerator() *DAGGenerator {
return &DAGGenerator{}
}

// DAGGenerator generates BasicBlocks on demand.
// For each instance of DAGGenerator, each new DAG is different from the
// previous, although two different instances will produce the same, given the
// same parameters.
type DAGGenerator struct {
seq int
}

// MakeDagBlock generate a balanced DAG with the given fanout and depth, and add the blocks to the adder.
// This adder can be for example a blockstore.Put or a blockservice.AddBlock.
func (dg *DAGGenerator) MakeDagBlock(adder func(ctx context.Context, block blocks.Block) error, fanout uint, depth uint) (c cid.Cid, allCids []cid.Cid, err error) {
return dg.MakeDagNode(func(ctx context.Context, node format.Node) error {
return adder(ctx, node.(blocks.Block))
}, fanout, depth)
}

// MakeDagNode generate a balanced DAG with the given fanout and depth, and add the blocks to the adder.
// This adder can be for example a DAGService.Add.
func (dg *DAGGenerator) MakeDagNode(adder func(ctx context.Context, node format.Node) error, fanout uint, depth uint) (c cid.Cid, allCids []cid.Cid, err error) {
c, _, allCids, err = dg.generate(adder, fanout, depth)
return c, allCids, err
}

func (dg *DAGGenerator) generate(adder func(ctx context.Context, node format.Node) error, fanout uint, depth uint) (c cid.Cid, size uint64, allCids []cid.Cid, err error) {
if depth == 0 {
panic("depth should be at least 1")
}
if depth == 1 {
c, size, err = dg.encodeBlock(adder)
if err != nil {
return cid.Undef, 0, nil, err
}
return c, size, []cid.Cid{c}, nil
}
links := make([]*format.Link, fanout)
for i := uint(0); i < fanout; i++ {
root, size, children, err := dg.generate(adder, fanout, depth-1)
if err != nil {
return cid.Undef, 0, nil, err
}
links[i] = &format.Link{Cid: root, Size: size}
allCids = append(allCids, children...)
}
c, size, err = dg.encodeBlock(adder, links...)
if err != nil {
return cid.Undef, 0, nil, err
}
return c, size, append([]cid.Cid{c}, allCids...), nil
}

func (dg *DAGGenerator) encodeBlock(adder func(ctx context.Context, node format.Node) error, links ...*format.Link) (cid.Cid, uint64, error) {
dg.seq++
nd := &merkledag.ProtoNode{}
nd.SetData([]byte(fmt.Sprint(dg.seq)))
for i, link := range links {
err := nd.AddRawLink(fmt.Sprintf("link-%d", i), link)
if err != nil {
return cid.Undef, 0, err
}
}
err := adder(context.Background(), nd)
if err != nil {
return cid.Undef, 0, err
}
size, err := nd.Size()
return nd.Cid(), size, err
}
92 changes: 92 additions & 0 deletions ipld/merkledag/test/dag_generator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package mdutils

import (
"context"
"sync"
"testing"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
)

type testDagServ struct {
mu sync.Mutex
nodes map[string]format.Node
}

func newTestDagServ() *testDagServ {
return &testDagServ{nodes: make(map[string]format.Node)}
}

func (d *testDagServ) Get(_ context.Context, cid cid.Cid) (format.Node, error) {
d.mu.Lock()
defer d.mu.Unlock()
if n, ok := d.nodes[cid.KeyString()]; ok {
return n, nil
}
return nil, format.ErrNotFound{Cid: cid}
}

func (d *testDagServ) Add(_ context.Context, node format.Node) error {
d.mu.Lock()
defer d.mu.Unlock()
d.nodes[node.Cid().KeyString()] = node
return nil
}

func TestNodesAreDifferent(t *testing.T) {
dserv := newTestDagServ()
gen := NewDAGGenerator()

var allCids []cid.Cid
var allNodes []format.Node

const nbDag = 5

for i := 0; i < nbDag; i++ {
c, cids, err := gen.MakeDagNode(dserv.Add, 5, 3)
if err != nil {
t.Fatal(err)
}

allCids = append(allCids, cids...)

// collect all nodes
var getChildren func(n format.Node)
getChildren = func(n format.Node) {
for _, link := range n.Links() {
n, err = dserv.Get(context.Background(), link.Cid)
if err != nil {
t.Fatal(err)
}
allNodes = append(allNodes, n)
getChildren(n)
}
}
n, err := dserv.Get(context.Background(), c)
if err != nil {
t.Fatal(err)
}
allNodes = append(allNodes, n)
getChildren(n)

// make sure they are all different
for i, node1 := range allNodes {
for j, node2 := range allNodes {
if i != j {
if node1.Cid().String() == node2.Cid().String() {
t.Error("Found duplicate node")
}
}
}
}
}

// expected count
if len(allNodes) != nbDag*31 {
t.Error("expected nbDag*31 nodes (1+5+5*5)")
}
if len(allCids) != nbDag*31 {
t.Error("expected nbDag*31 cids (1+5+5*5)")
}
}
89 changes: 42 additions & 47 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"path"
"sync"

"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/dagutils"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
Expand All @@ -20,6 +18,8 @@ import (
"github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"

"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/dagutils"
ipfspinner "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/boxo/pinning/pinner/dsindex"
)
Expand Down Expand Up @@ -665,61 +665,56 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()
var e error
err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}

return cidSet.Keys(), nil
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
return p.streamIndex(ctx, p.cidRIndex)
}

cidSet := cid.NewSet()
var e error
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)

go func() {
defer close(out)

p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()

err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
return false
}
if !cidSet.Has(c) {
select {
case <-ctx.Done():
return false
case out <- ipfspinner.StreamedCid{C: c}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}
}()

return cidSet.Keys(), nil
return out
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
return nil, nil
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid {
c := make(chan ipfspinner.StreamedCid)
close(c)
return c
}

// Update updates a recursive pin from one cid to another. This is equivalent
Expand Down
29 changes: 17 additions & 12 deletions pinning/pinner/dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
bs "github.com/ipfs/boxo/blockservice"
mdag "github.com/ipfs/boxo/ipld/merkledag"

blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
util "github.com/ipfs/boxo/util"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"

blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
util "github.com/ipfs/boxo/util"

ipfspin "github.com/ipfs/boxo/pinning/pinner"
)

Expand Down Expand Up @@ -198,10 +199,17 @@ func TestPinnerBasic(t *testing.T) {
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

cids, err := p.RecursiveKeys(ctx)
if err != nil {
t.Fatal(err)
allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) {
for val := range ch {
if val.Err != nil {
t.Fatal(val.Err)
}
cids = append(cids, val.C)
}
return cids
}

cids := allCids(p.RecursiveKeys(ctx))
if len(cids) != 2 {
t.Error("expected 2 recursive pins")
}
Expand Down Expand Up @@ -243,20 +251,17 @@ func TestPinnerBasic(t *testing.T) {
}
}

cids, err = p.DirectKeys(ctx)
if err != nil {
t.Fatal(err)
}
cids = allCids(p.DirectKeys(ctx))
if len(cids) != 1 {
t.Error("expected 1 direct pin")
}
if cids[0] != ak {
t.Error("wrong direct pin")
}

cids, _ = p.InternalPins(ctx)
cids = allCids(p.InternalPins(ctx))
if len(cids) != 0 {
t.Error("shound not have internal keys")
t.Error("should not have internal keys")
}

err = p.Unpin(ctx, dk, false)
Expand Down
Loading

0 comments on commit e2fc7f2

Please sign in to comment.