Skip to content

Commit

Permalink
Merge pull request #3255 from ipfs/kevina/getlinks
Browse files Browse the repository at this point in the history
Add DAGService.GetLinks() method and use it in the GC and elsewhere.
  • Loading branch information
whyrusleeping authored Oct 8, 2016
2 parents 67a1b3e + 772164c commit 2fd045f
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 72 deletions.
6 changes: 1 addition & 5 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,8 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
node, err := dserv.Get(ctx, c)
if err != nil {
return err
}

err = dag.EnumerateChildrenAsync(ctx, dserv, node, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dserv, c, kset.Visit)
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(ctx, k)
if err != nil {
return nil, err
}

err = dag.EnumerateChildren(n.Context(), n.DAG, nd, set.Visit, false)
err := dag.EnumerateChildren(n.Context(), n.DAG, k, set.Visit, false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return err
}
Expand All @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
if err != nil {
return nil, err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return nil, err
}
Expand Down
8 changes: 2 additions & 6 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) {
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
Expand Down Expand Up @@ -156,13 +156,9 @@ func TestAddGCLive(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
root, err := node.DAG.Get(ctx, last)
if err != nil {
t.Fatal(err)
}

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG, root, set.Visit, false)
err = dag.EnumerateChildren(ctx, node.DAG, last, set.Visit, false)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,7 @@ func (bs *Bitswap) GetWantlist() []key.Key {
}
return out
}

func (bs *Bitswap) IsOnline() bool {
return true
}
2 changes: 2 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ type Interface interface { // type Exchanger interface
// available on the network?
HasBlock(blocks.Block) error

IsOnline() bool

io.Closer
}
4 changes: 4 additions & 0 deletions exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan b
}()
return out, nil
}

func (e *offlineExchange) IsOnline() bool {
return false
}
63 changes: 47 additions & 16 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"

"context"
Expand All @@ -28,10 +29,20 @@ type DAGService interface {
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption

Batch() *Batch

LinkService
}

type LinkService interface {
// Return all links for a node, may be more effect than
// calling Get in DAGService
GetLinks(context.Context, *cid.Cid) ([]*Link, error)

GetOfflineLinkService() LinkService
}

func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
func NewDAGService(bs *bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}

// dagService is an IPFS Merkle DAG service.
Expand Down Expand Up @@ -93,13 +104,30 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
return res, nil
}

func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
node, err := n.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links, nil
}

func (n *dagService) GetOfflineLinkService() LinkService {
if n.Blocks.Exchange.IsOnline() {
bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore))
return NewDAGService(bsrv)
} else {
return n
}
}

func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteObject(nd)
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -366,19 +394,17 @@ func legacyCidFromLink(lnk *Link) *cid.Cid {
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range root.Links {
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
links, err := ds.GetLinks(ctx, root)
if bestEffort && err == ErrNotFound {
return nil
} else if err != nil {
return err
}
for _, lnk := range links {
c := legacyCidFromLink(lnk)
if visit(c) {
child, err := ds.Get(ctx, c)
if err != nil {
if bestEffort && err == ErrNotFound {
continue
} else {
return err
}
}
err = EnumerateChildren(ctx, ds, child, visit, bestEffort)
err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
if err != nil {
return err
}
Expand All @@ -387,7 +413,7 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit fun
return nil
}

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool) error {
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
toprocess := make(chan []*cid.Cid, 8)
nodes := make(chan *NodeOption, 8)

Expand All @@ -397,6 +423,11 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visi

go fetchNodes(ctx, ds, toprocess, nodes)

root, err := ds.Get(ctx, c)
if err != nil {
return err
}

nodes <- &NodeOption{Node: root}
live := 1

Expand Down
8 changes: 4 additions & 4 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestFetchGraph(t *testing.T) {
t.Fatal(err)
}

err = FetchGraph(context.TODO(), root, dservs[1])
err = FetchGraph(context.TODO(), root.Cid(), dservs[1])
if err != nil {
t.Fatal(err)
}
Expand All @@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) {

offline_ds := NewDAGService(bs)

err = EnumerateChildren(context.Background(), offline_ds, root, func(_ *cid.Cid) bool { return true }, false)
err = EnumerateChildren(context.Background(), offline_ds, root.Cid(), func(_ *cid.Cid) bool { return true }, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) {
}

set := cid.NewSet()
err = EnumerateChildren(context.Background(), ds, root, set.Visit, false)
err = EnumerateChildren(context.Background(), ds, root.Cid(), set.Visit, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -269,7 +269,7 @@ func TestEnumerateChildren(t *testing.T) {
for _, lnk := range n.Links {
c := cid.NewCidV0(lnk.Hash)
if !set.Has(c) {
t.Fatal("missing key in set!")
t.Fatal("missing key in set! ", lnk.Hash.B58String())
}
child, err := ds.Get(context.Background(), c)
if err != nil {
Expand Down
25 changes: 9 additions & 16 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package gc

import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
Expand All @@ -24,13 +22,12 @@ var log = logging.Logger("gc")
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
unlocker := bs.GCLock()

bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
ls = ls.GetOfflineLinkService()

gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots)
gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -71,16 +68,12 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo
return output, nil
}

func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
for _, c := range roots {
set.Add(key.Key(c.Hash()))
nd, err := ds.Get(ctx, c)
if err != nil {
return err
}

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool {
err := dag.EnumerateChildren(ctx, ls, c, func(c *cid.Cid) bool {
k := key.Key(c.Hash())
seen := set.Has(k)
if seen {
Expand All @@ -97,16 +90,16 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [
return nil
}

func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (key.KeySet, error) {
func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (key.KeySet, error) {
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := key.NewKeySet()
err := Descendants(ctx, ds, gcs, pn.RecursiveKeys(), false)
err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false)
if err != nil {
return nil, err
}

err = Descendants(ctx, ds, gcs, bestEffortRoots, true)
err = Descendants(ctx, ls, gcs, bestEffortRoots, true)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +108,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor
gcs.Add(key.Key(k.Hash()))
}

err = Descendants(ctx, ds, gcs, pn.InternalPins(), false)
err = Descendants(ctx, ls, gcs, pn.InternalPins(), false)
if err != nil {
return nil, err
}
Expand Down
28 changes: 11 additions & 17 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
}

// fetch entire graph
err := mdag.FetchGraph(ctx, node, p.dserv)
err := mdag.FetchGraph(ctx, c, p.dserv)
if err != nil {
return err
}
Expand Down Expand Up @@ -279,12 +279,7 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error

// Default is Indirect
for _, rc := range p.recursePin.Keys() {
rnd, err := p.dserv.Get(context.Background(), rc)
if err != nil {
return "", false, err
}

has, err := hasChild(p.dserv, rnd, k)
has, err := hasChild(p.dserv, rc, k)
if err != nil {
return "", false, err
}
Expand Down Expand Up @@ -317,11 +312,11 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) {
// Now walk all recursive pins to check for indirect pins
var checkChildren func(*cid.Cid, *cid.Cid) error
checkChildren = func(rk, parentKey *cid.Cid) error {
parent, err := p.dserv.Get(context.Background(), parentKey)
links, err := p.dserv.GetLinks(context.Background(), parentKey)
if err != nil {
return err
}
for _, lnk := range parent.Links {
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)

if toCheck.Has(c) {
Expand Down Expand Up @@ -521,19 +516,18 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) {
}
}

func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) {
for _, lnk := range root.Links {
func hasChild(ds mdag.LinkService, root *cid.Cid, child key.Key) (bool, error) {
links, err := ds.GetLinks(context.Background(), root)
if err != nil {
return false, err
}
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)
if key.Key(c.Hash()) == child {
return true, nil
}

nd, err := ds.Get(context.Background(), c)
if err != nil {
return false, err
}

has, err := hasChild(ds, nd, child)
has, err := hasChild(ds, c, child)
if err != nil {
return false, err
}
Expand Down
5 changes: 5 additions & 0 deletions pin/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func TestPinRecursiveFail(t *testing.T) {
t.Fatal(err)
}

_, err = dserv.Add(a)
if err != nil {
t.Fatal(err)
}

// this one is time based... but shouldnt cause any issues
mctx, _ = context.WithTimeout(ctx, time.Second)
err = p.Pin(mctx, a, true)
Expand Down

0 comments on commit 2fd045f

Please sign in to comment.