Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor merkledag fetching methods #2384

Merged
merged 5 commits into from
Feb 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type PinOutput struct {

var addPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Pins objects to local storage.",
Tagline: "Pins objects to local storage.",
ShortDescription: "Stores an IPFS object(s) from a given path locally to disk.",
},

Expand Down
2 changes: 1 addition & 1 deletion core/commands/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
}

var count int
for i, ng := range rw.DAG.GetDAG(rw.Ctx, n) {
for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
lk := key.Key(n.Links[i].Hash)
if rw.skip(lk) {
continue
Expand Down
122 changes: 86 additions & 36 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package merkledag

import (
"fmt"
"sync"

blocks "github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
Expand All @@ -24,8 +25,7 @@ type DAGService interface {

// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG(context.Context, *Node) []NodeGetter
GetNodes(context.Context, []key.Key) []NodeGetter
GetMany(context.Context, []key.Key) <-chan *NodeOption

Batch() *Batch
}
Expand Down Expand Up @@ -146,21 +146,61 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
return out
}

type NodeOption struct {
Node *Node
Err error
}

func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
out := make(chan *NodeOption, len(keys))
blocks := ds.Blocks.GetBlocks(ctx, keys)
var count int

go func() {
defer close(out)
for {
select {
case b, ok := <-blocks:
if !ok {
if count != len(keys) {
out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
}
return
}
nd, err := Decoded(b.Data)
if err != nil {
out <- &NodeOption{Err: err}
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to send ctx.Err() along the error channel here? I guess not since the deferred close() will wipe it out, but doesn't this mean the caller has no way of knowing that it ended due to a context cancellation?

}

// buffered, no need to select
out <- &NodeOption{Node: nd}
count++

case <-ctx.Done():
out <- &NodeOption{Err: ctx.Err()}
return
}
}
}()
return out
}

// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this no longer a method on DAGService?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working towards shrinking our primary interfaces, theres no reason this needs to be a method, it doesnt require access to any of the internal, and it just adds complexity for any other implementation of a DAGService

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ha, I grok this now: minimize the interface surface area where ever possible. That makes excellent sense -- I didn't realize how I was coming at this from a much more Java classes perspective vs implicit go interfaces. +:100:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

come to the duck typing side, we have cookies 😄

var keys []key.Key
for _, lnk := range root.Links {
keys = append(keys, key.Key(lnk.Hash))
}

return ds.GetNodes(ctx, keys)
return GetNodes(ctx, ds, keys)
}

// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about this function returning error as well? I'm concerned about the way it eats errors from the DAGService right now: it logs them, but that's not really a consumable format for the caller to react to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the error eating issue is addressed in another PR i have open (that i need to go pay attention to).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome -- thanks!


// Early out if no work to do
if len(keys) == 0 {
Expand All @@ -178,22 +218,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter
ctx, cancel := context.WithCancel(ctx)
defer cancel()

blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
nodechan := ds.GetMany(ctx, dedupedKeys)

for count := 0; count < len(keys); {
select {
case blk, ok := <-blkchan:
case opt, ok := <-nodechan:
if !ok {
return
}

nd, err := Decoded(blk.Data)
if err != nil {
// NB: can happen with improperly formatted input data
log.Debug("Got back bad block!")
if opt.Err != nil {
log.Error("error fetching: ", opt.Err)
return
}
is := FindLinks(keys, blk.Key(), 0)

nd := opt.Node

k, err := nd.Key()
if err != nil {
log.Error("Failed to get node key: ", err)
continue
}

is := FindLinks(keys, k, 0)
for _, i := range is {
count++
sendChans[i] <- nd
Expand Down Expand Up @@ -318,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
toprocess := make(chan []key.Key, 8)
nodes := make(chan *Node, 8)
errs := make(chan error, 1)
nodes := make(chan *NodeOption, 8)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(toprocess)

go fetchNodes(ctx, ds, toprocess, nodes, errs)
go fetchNodes(ctx, ds, toprocess, nodes)

nodes <- root
nodes <- &NodeOption{Node: root}
live := 1

for {
select {
case nd, ok := <-nodes:
case opt, ok := <-nodes:
if !ok {
return nil
}

if opt.Err != nil {
return opt.Err
}

nd := opt.Node

// a node has been fetched
live--

Expand All @@ -360,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
return ctx.Err()
}
}
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}

func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
var wg sync.WaitGroup
defer func() {
// wait for all 'get' calls to complete so we don't accidentally send
// on a closed channel
wg.Wait()
close(out)
}()

get := func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
get := func(ks []key.Key) {
defer wg.Done()
nodes := ds.GetMany(ctx, ks)
for opt := range nodes {
select {
case errs <- err:
case out <- opt:
case <-ctx.Done():
return
}
return
}

select {
case out <- nd:
case <-ctx.Done():
return
}
}

for ks := range in {
ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go get(g)
}
wg.Add(1)
go get(ks)
}
}
12 changes: 6 additions & 6 deletions test/sharness/t0081-repo-pinning.sh
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ test_expect_success "some are no longer there" '
'

test_expect_success "recursive pin fails without objects" '
ipfs pin rm "$HASH_DIR1" &&
test_must_fail ipfs pin add -r "$HASH_DIR1" --timeout=500ms 2>err_expected8 &&
grep "context deadline exceeded" err_expected8 ||
ipfs pin rm -r=false "$HASH_DIR1" &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this no longer recursive with -r=false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the test implies it tests recursive pinning.

Maybe more specifically: why has this test been changed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it hasnt, its been made more correct. direct pinning used to be the default, so this test removed the direct pin, then added a recursive pin. When the change to make recursive default happened, this bit wasnt fixed. It didnt cause an error because removing a recursive pin will also remove a direct pin if one exists (rm -r works on a single file too). So all I did here was make it more explicit that we were removing the direct pin, and not removing a recursive one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. Thanks!

test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 &&
grep "pin: failed to fetch all nodes" err_expected8 ||
test_fsh cat err_expected8
'

Expand Down Expand Up @@ -275,9 +275,9 @@ test_expect_success "test add nopin dir" '
FICTIONAL_HASH="QmXV4f9v8a56MxWKBhP3ETsz4EaafudU1cKfPaaJnenc48"
test_launch_ipfs_daemon
test_expect_success "test unpinning a hash that's not pinned" "
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=5s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=5s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=5s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=2s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=2s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=2s
"
test_kill_ipfs_daemon

Expand Down
2 changes: 1 addition & 1 deletion unixfs/archive/tar/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
return err
}

for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
for i, ng := range mdag.GetDAG(w.ctx, w.Dag, nd) {
child, err := ng.Get(w.ctx)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag

func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
fctx, cancel := context.WithCancel(ctx)
promises := serv.GetDAG(fctx, n)
promises := mdag.GetDAG(fctx, serv, n)
return &DagReader{
node: n,
serv: serv,
Expand Down