diff --git a/core/commands/pin.go b/core/commands/pin.go
index d64356d0dc8..e754c0a6237 100644
--- a/core/commands/pin.go
+++ b/core/commands/pin.go
@@ -33,7 +33,7 @@ type PinOutput struct {
 
 var addPinCmd = &cmds.Command{
 	Helptext: cmds.HelpText{
-		Tagline: "Pins objects to local storage.",
+		Tagline:          "Pins objects to local storage.",
 		ShortDescription: "Stores an IPFS object(s) from a given path locally to disk.",
 	},
 
diff --git a/core/commands/refs.go b/core/commands/refs.go
index c910cf48510..9a7715f14ac 100644
--- a/core/commands/refs.go
+++ b/core/commands/refs.go
@@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
 	}
 
 	var count int
-	for i, ng := range rw.DAG.GetDAG(rw.Ctx, n) {
+	for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
 		lk := key.Key(n.Links[i].Hash)
 		if rw.skip(lk) {
 			continue
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index e324ceb883a..aebc370ad1d 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -3,6 +3,7 @@ package merkledag
 
 import (
 	"fmt"
+	"sync"
 
 	blocks "github.com/ipfs/go-ipfs/blocks"
 	key "github.com/ipfs/go-ipfs/blocks/key"
@@ -24,8 +25,7 @@ type DAGService interface {
 
 	// GetDAG returns, in order, all the single leve child
 	// nodes of the passed in node.
-	GetDAG(context.Context, *Node) []NodeGetter
-	GetNodes(context.Context, []key.Key) []NodeGetter
+	GetMany(context.Context, []key.Key) <-chan *NodeOption
 
 	Batch() *Batch
 }
@@ -146,21 +146,61 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
 	return out
 }
 
+type NodeOption struct {
+	Node *Node
+	Err  error
+}
+
+func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
+	out := make(chan *NodeOption, len(keys))
+	blocks := ds.Blocks.GetBlocks(ctx, keys)
+	var count int
+
+	go func() {
+		defer close(out)
+		for {
+			select {
+			case b, ok := <-blocks:
+				if !ok {
+					if count != len(keys) {
+						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
+					}
+					return
+				}
+				nd, err := Decoded(b.Data)
+				if err != nil {
+					out <- &NodeOption{Err: err}
+					return
+				}
+
+				// buffered, no need to select
+				out <- &NodeOption{Node: nd}
+				count++
+
+			case <-ctx.Done():
+				out <- &NodeOption{Err: ctx.Err()}
+				return
+			}
+		}
+	}()
+	return out
+}
+
 // GetDAG will fill out all of the links of the given Node.
 // It returns a channel of nodes, which the caller can receive
 // all the child nodes of 'root' on, in proper order.
-func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
+func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
 	var keys []key.Key
 	for _, lnk := range root.Links {
 		keys = append(keys, key.Key(lnk.Hash))
 	}
 
-	return ds.GetNodes(ctx, keys)
+	return GetNodes(ctx, ds, keys)
 }
 
 // GetNodes returns an array of 'NodeGetter' promises, with each corresponding
 // to the key with the same index as the passed in keys
-func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
+func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
 
 	// Early out if no work to do
 	if len(keys) == 0 {
@@ -178,22 +218,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 
-		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
+		nodechan := ds.GetMany(ctx, dedupedKeys)
 
 		for count := 0; count < len(keys); {
 			select {
-			case blk, ok := <-blkchan:
+			case opt, ok := <-nodechan:
 				if !ok {
 					return
 				}
 
-				nd, err := Decoded(blk.Data)
-				if err != nil {
-					// NB: can happen with improperly formatted input data
-					log.Debug("Got back bad block!")
+				if opt.Err != nil {
+					log.Error("error fetching: ", opt.Err)
 					return
 				}
-				is := FindLinks(keys, blk.Key(), 0)
+
+				nd := opt.Node
+
+				k, err := nd.Key()
+				if err != nil {
+					log.Error("Failed to get node key: ", err)
+					continue
+				}
+
+				is := FindLinks(keys, k, 0)
 				for _, i := range is {
 					count++
 					sendChans[i] <- nd
@@ -318,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
 
 func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
 	toprocess := make(chan []key.Key, 8)
-	nodes := make(chan *Node, 8)
-	errs := make(chan error, 1)
+	nodes := make(chan *NodeOption, 8)
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	defer close(toprocess)
 
-	go fetchNodes(ctx, ds, toprocess, nodes, errs)
+	go fetchNodes(ctx, ds, toprocess, nodes)
 
-	nodes <- root
+	nodes <- &NodeOption{Node: root}
 
 	live := 1
 	for {
 		select {
-		case nd, ok := <-nodes:
+		case opt, ok := <-nodes:
 			if !ok {
 				return nil
 			}
+
+			if opt.Err != nil {
+				return opt.Err
+			}
+
+			nd := opt.Node
+
 			// a node has been fetched
 			live--
 
@@ -360,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
 					return ctx.Err()
 				}
 			}
-		case err := <-errs:
-			return err
 		case <-ctx.Done():
 			return ctx.Err()
 		}
 	}
 }
 
-func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
-	defer close(out)
+func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
+	var wg sync.WaitGroup
+	defer func() {
+		// wait for all 'get' calls to complete so we don't accidentally send
+		// on a closed channel
+		wg.Wait()
+		close(out)
+	}()
 
-	get := func(g NodeGetter) {
-		nd, err := g.Get(ctx)
-		if err != nil {
+	get := func(ks []key.Key) {
+		defer wg.Done()
+		nodes := ds.GetMany(ctx, ks)
+		for opt := range nodes {
 			select {
-			case errs <- err:
+			case out <- opt:
 			case <-ctx.Done():
+				return
 			}
-			return
-		}
-
-		select {
-		case out <- nd:
-		case <-ctx.Done():
-			return
 		}
 	}
 
 	for ks := range in {
-		ng := ds.GetNodes(ctx, ks)
-		for _, g := range ng {
-			go get(g)
-		}
+		wg.Add(1)
+		go get(ks)
 	}
 }
diff --git a/test/sharness/t0081-repo-pinning.sh b/test/sharness/t0081-repo-pinning.sh
index 1d408598606..319cc39fddf 100755
--- a/test/sharness/t0081-repo-pinning.sh
+++ b/test/sharness/t0081-repo-pinning.sh
@@ -238,9 +238,9 @@ test_expect_success "some are no longer there" '
 '
 
 test_expect_success "recursive pin fails without objects" '
-  ipfs pin rm "$HASH_DIR1" &&
-  test_must_fail ipfs pin add -r "$HASH_DIR1" --timeout=500ms 2>err_expected8 &&
-  grep "context deadline exceeded" err_expected8 ||
+  ipfs pin rm -r=false "$HASH_DIR1" &&
+  test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 &&
+  grep "pin: failed to fetch all nodes" err_expected8 ||
   test_fsh cat err_expected8
 '
 
@@ -275,9 +275,9 @@ test_expect_success "test add nopin dir" '
 FICTIONAL_HASH="QmXV4f9v8a56MxWKBhP3ETsz4EaafudU1cKfPaaJnenc48"
 test_launch_ipfs_daemon
 test_expect_success "test unpinning a hash that's not pinned" "
-  test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=5s
-  test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=5s
-  test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=5s
+  test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=2s
+  test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=2s
+  test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=2s
 "
 test_kill_ipfs_daemon
 
diff --git a/unixfs/archive/tar/writer.go b/unixfs/archive/tar/writer.go
index 32596618a23..ad151016ad0 100644
--- a/unixfs/archive/tar/writer.go
+++ b/unixfs/archive/tar/writer.go
@@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
 		return err
 	}
 
-	for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
+	for i, ng := range mdag.GetDAG(w.ctx, w.Dag, nd) {
 		child, err := ng.Get(w.ctx)
 		if err != nil {
 			return err
diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go
index 5b2a8b112ea..3c68ad896ba 100644
--- a/unixfs/io/dagreader.go
+++ b/unixfs/io/dagreader.go
@@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
 
 func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
 	fctx, cancel := context.WithCancel(ctx)
-	promises := serv.GetDAG(fctx, n)
+	promises := mdag.GetDAG(fctx, serv, n)
 	return &DagReader{
 		node: n,
 		serv: serv,