From 479761ec6a35dd42a23eca6abe3784c2787c41b5 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Sat, 20 Feb 2016 10:27:07 -0800
Subject: [PATCH 1/5] change batch fetching methods of dagserv

License: MIT
Signed-off-by: Jeromy
---
 core/commands/refs.go        |  2 +-
 merkledag/merkledag.go       | 60 ++++++++++++++++++++++++++++--------
 unixfs/archive/tar/writer.go |  2 +-
 unixfs/io/dagreader.go       |  2 +-
 4 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/core/commands/refs.go b/core/commands/refs.go
index c910cf48510..9a7715f14ac 100644
--- a/core/commands/refs.go
+++ b/core/commands/refs.go
@@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
 	}
 
 	var count int
-	for i, ng := range rw.DAG.GetDAG(rw.Ctx, n) {
+	for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
 		lk := key.Key(n.Links[i].Hash)
 		if rw.skip(lk) {
 			continue
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index e324ceb883a..a311b396cae 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -24,8 +24,7 @@ type DAGService interface {
 
 	// GetDAG returns, in order, all the single leve child
 	// nodes of the passed in node.
-	GetDAG(context.Context, *Node) []NodeGetter
-	GetNodes(context.Context, []key.Key) []NodeGetter
+	GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error)
 
 	Batch() *Batch
 }
@@ -146,21 +145,52 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
 	return out
 }
 
+func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
+	out := make(chan *Node)
+	errs := make(chan error, 1)
+	blocks := ds.Blocks.GetBlocks(ctx, keys)
+	go func() {
+		defer close(out)
+		defer close(errs)
+		for {
+			select {
+			case b, ok := <-blocks:
+				if !ok {
+					return
+				}
+				nd, err := Decoded(b.Data)
+				if err != nil {
+					errs <- err
+					return
+				}
+				select {
+				case out <- nd:
+				case <-ctx.Done():
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return out, errs
+}
+
 // GetDAG will fill out all of the links of the given Node.
 // It returns a channel of nodes, which the caller can receive
 // all the child nodes of 'root' on, in proper order.
-func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
+func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
 	var keys []key.Key
 	for _, lnk := range root.Links {
 		keys = append(keys, key.Key(lnk.Hash))
 	}
 
-	return ds.GetNodes(ctx, keys)
+	return GetNodes(ctx, ds, keys)
 }
 
 // GetNodes returns an array of 'NodeGetter' promises, with each corresponding
 // to the key with the same index as the passed in keys
-func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
+func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
 
 	// Early out if no work to do
 	if len(keys) == 0 {
@@ -178,26 +208,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 
-		blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
+		nodechan, errchan := ds.GetMany(ctx, dedupedKeys)
 
 		for count := 0; count < len(keys); {
 			select {
-			case blk, ok := <-blkchan:
+			case nd, ok := <-nodechan:
 				if !ok {
 					return
 				}
 
-				nd, err := Decoded(blk.Data)
+				k, err := nd.Key()
 				if err != nil {
-					// NB: can happen with improperly formatted input data
-					log.Debug("Got back bad block!")
-					return
+					log.Error("Failed to get node key: ", err)
+					continue
 				}
-				is := FindLinks(keys, blk.Key(), 0)
+
+				is := FindLinks(keys, k, 0)
 				for _, i := range is {
 					count++
 					sendChans[i] <- nd
 				}
+			case err := <-errchan:
+				log.Error("error fetching: ", err)
+				return
 			case <-ctx.Done():
 				return
 			}
@@ -389,9 +422,10 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out cha
 	}
 
 	for ks := range in {
-		ng := ds.GetNodes(ctx, ks)
+		ng := GetNodes(ctx, ds, ks)
 		for _, g := range ng {
 			go get(g)
 		}
 	}
+
 }
diff --git a/unixfs/archive/tar/writer.go b/unixfs/archive/tar/writer.go
index 32596618a23..ad151016ad0 100644
--- a/unixfs/archive/tar/writer.go
+++ b/unixfs/archive/tar/writer.go
@@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
 		return err
 	}
 
-	for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
+	for i, ng := range mdag.GetDAG(w.ctx, w.Dag, nd) {
 		child, err := ng.Get(w.ctx)
 		if err != nil {
 			return err
diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go
index 5b2a8b112ea..3c68ad896ba 100644
--- a/unixfs/io/dagreader.go
+++ b/unixfs/io/dagreader.go
@@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
 
 func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
 	fctx, cancel := context.WithCancel(ctx)
-	promises := serv.GetDAG(fctx, n)
+	promises := mdag.GetDAG(fctx, serv, n)
 	return &DagReader{
 		node: n,
 		serv: serv,

From 5806ac00c84bcb3301826aaad1f2dacafa54abf6 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Sat, 20 Feb 2016 11:04:21 -0800
Subject: [PATCH 2/5] rework FetchGraph to be less of a memory hog

License: MIT
Signed-off-by: Jeromy
---
 merkledag/merkledag.go | 39 ++++++++++++++++++++++-----------------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index a311b396cae..dc02b92f802 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -149,6 +149,8 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
 	out := make(chan *Node)
 	errs := make(chan error, 1)
 	blocks := ds.Blocks.GetBlocks(ctx, keys)
+	var count int
+
 	go func() {
 		defer close(out)
 		defer close(errs)
@@ -155,7 +157,10 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
 		for {
 			select {
 			case b, ok := <-blocks:
 				if !ok {
+					if count != len(keys) {
+						errs <- fmt.Errorf("failed to fetch all nodes")
+					}
 					return
 				}
 				nd, err := Decoded(b.Data)
@@ -165,6 +170,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
 				}
 				select {
 				case out <- nd:
+					count++
 				case <-ctx.Done():
 					return
 				}
@@ -404,28 +410,27 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
 func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
 	defer close(out)
 
-	get := func(g NodeGetter) {
-		nd, err := g.Get(ctx)
-		if err != nil {
+	get := func(ks []key.Key) {
+		nodes, errch := ds.GetMany(ctx, ks)
+		for {
 			select {
-			case errs <- err:
-			case <-ctx.Done():
+			case nd, ok := <-nodes:
+				if !ok {
+					return
+				}
+				select {
+				case out <- nd:
+				case <-ctx.Done():
+					return
+				}
+			case err := <-errch:
+				errs <- err
+				return
 			}
-			return
-		}
-
-		select {
-		case out <- nd:
-		case <-ctx.Done():
-			return
-		}
 		}
 	}
 
 	for ks := range in {
-		ng := GetNodes(ctx, ds, ks)
-		for _, g := range ng {
-			go get(g)
-		}
+		go get(ks)
 	}
-
 }

From d2931d70a6060cf8c77d9b82a283e9288d68feb5 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Sat, 20 Feb 2016 16:21:04 -0800
Subject: [PATCH 3/5] fix test now that dag batches can more properly fail

License: MIT
Signed-off-by: Jeromy
---
 test/sharness/t0081-repo-pinning.sh | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/test/sharness/t0081-repo-pinning.sh b/test/sharness/t0081-repo-pinning.sh
index 1d408598606..319cc39fddf 100755
--- a/test/sharness/t0081-repo-pinning.sh
+++ b/test/sharness/t0081-repo-pinning.sh
@@ -238,9 +238,9 @@ test_expect_success "some are no longer there" '
 '
 
 test_expect_success "recursive pin fails without objects" '
-	ipfs pin rm "$HASH_DIR1" &&
-	test_must_fail ipfs pin add -r "$HASH_DIR1" --timeout=500ms 2>err_expected8 &&
-	grep "context deadline exceeded" err_expected8 ||
+	ipfs pin rm -r=false "$HASH_DIR1" &&
+	test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 &&
+	grep "pin: failed to fetch all nodes" err_expected8 ||
 	test_fsh cat err_expected8
 '
 
@@ -275,9 +275,9 @@ test_expect_success "test add nopin dir" '
 FICTIONAL_HASH="QmXV4f9v8a56MxWKBhP3ETsz4EaafudU1cKfPaaJnenc48"
 test_launch_ipfs_daemon
 test_expect_success "test unpinning a hash that's not pinned" "
-	test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=5s
-	test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=5s
-	test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=5s
+	test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=2s
+	test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=2s
+	test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=2s
 "
 
 test_kill_ipfs_daemon

From cba00b900b32367a13f19199104a00df4abefd94 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Wed, 24 Feb 2016 10:06:07 -0800
Subject: [PATCH 4/5] fixes from review

License: MIT
Signed-off-by: Jeromy
---
 core/commands/pin.go   |  2 +-
 merkledag/merkledag.go | 15 +++++++--------
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/core/commands/pin.go b/core/commands/pin.go
index d64356d0dc8..e754c0a6237 100644
--- a/core/commands/pin.go
+++ b/core/commands/pin.go
@@ -33,7 +33,7 @@ type PinOutput struct {
 
 var addPinCmd = &cmds.Command{
 	Helptext: cmds.HelpText{
-		Tagline: "Pins objects to local storage.",
+		Tagline: "Pins objects to local storage.",
 		ShortDescription: "Stores an IPFS object(s) from a given path locally to disk.",
 	},
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index dc02b92f802..552fc068d75 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -146,14 +146,13 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
 }
 
 func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
-	out := make(chan *Node)
+	out := make(chan *Node, len(keys))
 	errs := make(chan error, 1)
 	blocks := ds.Blocks.GetBlocks(ctx, keys)
 	var count int
 
 	go func() {
 		defer close(out)
-		defer close(errs)
 		for {
 			select {
 			case b, ok := <-blocks:
@@ -168,13 +167,13 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
 					errs <- err
 					return
 				}
-				select {
-				case out <- nd:
-					count++
-				case <-ctx.Done():
-					return
-				}
+
+				// buffered, no need to select
+				out <- nd
+				count++
+
 			case <-ctx.Done():
+				errs <- ctx.Err()
 				return
 			}
 		}

From ec3959a6afab5a12d2c471ba6d9e35ed51c9ce07 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Wed, 24 Feb 2016 11:38:44 -0800
Subject: [PATCH 5/5] use an option type to simplify concurrency

License: MIT
Signed-off-by: Jeromy
---
 merkledag/merkledag.go | 84 ++++++++++++++++++++++++------------------
 1 file changed, 48 insertions(+), 36 deletions(-)

diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index 552fc068d75..aebc370ad1d 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -3,6 +3,7 @@ package merkledag
 
 import (
 	"fmt"
+	"sync"
 
 	blocks "github.com/ipfs/go-ipfs/blocks"
 	key "github.com/ipfs/go-ipfs/blocks/key"
@@ -24,7 +25,7 @@ type DAGService interface {
 
 	// GetDAG returns, in order, all the single leve child
 	// nodes of the passed in node.
-	GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error)
+	GetMany(context.Context, []key.Key) <-chan *NodeOption
 
 	Batch() *Batch
 }
@@ -145,9 +146,13 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
 	return out
 }
 
-func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
-	out := make(chan *Node, len(keys))
-	errs := make(chan error, 1)
+type NodeOption struct {
+	Node *Node
+	Err  error
+}
+
+func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) <-chan *NodeOption {
+	out := make(chan *NodeOption, len(keys))
 	blocks := ds.Blocks.GetBlocks(ctx, keys)
 	var count int
 
@@ -158,27 +163,27 @@ func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node
 			case b, ok := <-blocks:
 				if !ok {
 					if count != len(keys) {
-						errs <- fmt.Errorf("failed to fetch all nodes")
+						out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
 					}
 					return
 				}
 				nd, err := Decoded(b.Data)
 				if err != nil {
-					errs <- err
+					out <- &NodeOption{Err: err}
 					return
 				}
 
 				// buffered, no need to select
-				out <- nd
+				out <- &NodeOption{Node: nd}
 				count++
 
 			case <-ctx.Done():
-				errs <- ctx.Err()
+				out <- &NodeOption{Err: ctx.Err()}
 				return
 			}
 		}
 	}()
-	return out, errs
+	return out
 }
 
 // GetDAG will fill out all of the links of the given Node.
@@ -213,15 +218,22 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 
-		nodechan, errchan := ds.GetMany(ctx, dedupedKeys)
+		nodechan := ds.GetMany(ctx, dedupedKeys)
 
 		for count := 0; count < len(keys); {
 			select {
-			case nd, ok := <-nodechan:
+			case opt, ok := <-nodechan:
 				if !ok {
 					return
 				}
 
+				if opt.Err != nil {
+					log.Error("error fetching: ", opt.Err)
+					return
+				}
+
+				nd := opt.Node
+
 				k, err := nd.Key()
 				if err != nil {
 					log.Error("Failed to get node key: ", err)
@@ -233,9 +245,6 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
 					count++
 					sendChans[i] <- nd
 				}
-			case err := <-errchan:
-				log.Error("error fetching: ", err)
-				return
 			case <-ctx.Done():
 				return
 			}
@@ -356,24 +365,30 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
 
 func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
 	toprocess := make(chan []key.Key, 8)
-	nodes := make(chan *Node, 8)
-	errs := make(chan error, 1)
+	nodes := make(chan *NodeOption, 8)
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	defer close(toprocess)
 
-	go fetchNodes(ctx, ds, toprocess, nodes, errs)
+	go fetchNodes(ctx, ds, toprocess, nodes)
 
-	nodes <- root
+	nodes <- &NodeOption{Node: root}
 	live := 1
 	for {
 		select {
-		case nd, ok := <-nodes:
+		case opt, ok := <-nodes:
 			if !ok {
 				return nil
 			}
+
+			if opt.Err != nil {
+				return opt.Err
+			}
+
+			nd := opt.Node
+
 			// a node has been fetched
 			live--
 
@@ -398,38 +413,35 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
 					return ctx.Err()
 				}
 			}
-		case err := <-errs:
-			return err
 		case <-ctx.Done():
 			return ctx.Err()
 		}
 	}
 }
 
-func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
-	defer close(out)
+func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *NodeOption) {
+	var wg sync.WaitGroup
+	defer func() {
+		// wait for all 'get' calls to complete so we don't accidentally send
+		// on a closed channel
+		wg.Wait()
+		close(out)
+	}()
 
 	get := func(ks []key.Key) {
-		nodes, errch := ds.GetMany(ctx, ks)
-		for {
+		defer wg.Done()
+		nodes := ds.GetMany(ctx, ks)
+		for opt := range nodes {
 			select {
-			case nd, ok := <-nodes:
-				if !ok {
-					return
-				}
-				select {
-				case out <- nd:
-				case <-ctx.Done():
-					return
-				}
-			case err := <-errch:
-				errs <- err
+			case out <- opt:
+			case <-ctx.Done():
 				return
 			}
 		}
 	}
 
 	for ks := range in {
+		wg.Add(1)
 		go get(ks)
 	}
 }
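
Usage note (not part of the patch series): after PATCH 5, DAGService.GetMany returns a single <-chan *NodeOption, where each value carries either a decoded node or an error, and the sender closes the channel once it has handled the request (or hit an error). The sketch below shows one way a caller might drain that channel; the collectNodes helper is hypothetical, written as if it lived in package merkledag, and only assumes the GetMany signature, the NodeOption fields, and the close-on-completion behaviour shown in the patches above.

// collectNodes is a hypothetical helper (not part of these patches) showing
// how the post-PATCH-5 GetMany API can be consumed. It assumes package
// merkledag scope, so Node, NodeOption, DAGService and key.Key are in scope.
func collectNodes(ctx context.Context, ds DAGService, keys []key.Key) ([]*Node, error) {
	var nodes []*Node
	// GetMany sends one *NodeOption per fetched block and closes the channel
	// when it is done, so a plain range loop is enough to drain it.
	for opt := range ds.GetMany(ctx, keys) {
		if opt.Err != nil {
			// Errors (including ctx.Err() on cancellation) arrive in-band.
			return nil, opt.Err
		}
		nodes = append(nodes, opt.Node)
	}
	return nodes, nil
}

This mirrors how EnumerateChildrenAsync and fetchNodes consume the channel in PATCH 5, but collapses the error handling into a single return; note that results arrive in block-fetch order, not in the order of the requested keys.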