refactor merkledag fetching methods #2384

Merged · 5 commits · Feb 29, 2016 · Changes from 4 commits
2 changes: 1 addition & 1 deletion core/commands/pin.go
@@ -33,7 +33,7 @@ type PinOutput struct {

var addPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Pins objects to local storage.",
Tagline: "Pins objects to local storage.",
ShortDescription: "Stores an IPFS object(s) from a given path locally to disk.",
},

2 changes: 1 addition & 1 deletion core/commands/refs.go
@@ -233,7 +233,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
}

var count int
for i, ng := range rw.DAG.GetDAG(rw.Ctx, n) {
for i, ng := range dag.GetDAG(rw.Ctx, rw.DAG, n) {
lk := key.Key(n.Links[i].Hash)
if rw.skip(lk) {
continue
94 changes: 66 additions & 28 deletions merkledag/merkledag.go
@@ -24,8 +24,7 @@ type DAGService interface {

// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG(context.Context, *Node) []NodeGetter
GetNodes(context.Context, []key.Key) []NodeGetter
GetMany(context.Context, []key.Key) (<-chan *Node, <-chan error)

Batch() *Batch
}
@@ -146,21 +145,57 @@ func FindLinks(links []key.Key, k key.Key, start int) []int {
return out
}

func (ds *dagService) GetMany(ctx context.Context, keys []key.Key) (<-chan *Node, <-chan error) {
out := make(chan *Node, len(keys))
errs := make(chan error, 1)
blocks := ds.Blocks.GetBlocks(ctx, keys)
var count int

go func() {
defer close(out)
for {
select {
case b, ok := <-blocks:
if !ok {
if count != len(keys) {
errs <- fmt.Errorf("failed to fetch all nodes")
Member: Failed to fetch all nodes.

Member Author: Go's error string convention is lowercase with no punctuation: https://github.com/golang/lint/blob/master/lint.go#L1173
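
For illustration, a minimal, self-contained sketch of the convention golint is enforcing here (the "pin:" prefix is just a hypothetical caller):

```go
package main

import (
	"fmt"
	"log"
)

func main() {
	// Go's convention: error strings are lowercase with no trailing punctuation,
	// because callers usually wrap them into larger messages.
	err := fmt.Errorf("failed to fetch all nodes")

	// Wrapped by a caller, the lowercase form reads as one sentence:
	//   pin: failed to fetch all nodes
	log.Printf("pin: %v", err)
}
```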

}
return
}
nd, err := Decoded(b.Data)
if err != nil {
errs <- err
return
}

// buffered, no need to select
out <- nd
count++

case <-ctx.Done():
errs <- ctx.Err()
return
}
}
}()
return out, errs
}

// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
func GetDAG(ctx context.Context, ds DAGService, root *Node) []NodeGetter {
Contributor: Why is this no longer a method on DAGService?

Member Author: I'm working towards shrinking our primary interfaces. There's no reason this needs to be a method: it doesn't require access to any internal state, and it just adds complexity for any other implementation of a DAGService.

Contributor: Ah ha, I grok this now: minimize the interface surface area wherever possible. That makes excellent sense -- I didn't realize I was coming at this from a much more Java-classes perspective vs. implicit Go interfaces. +:100:

Member Author: Come to the duck typing side, we have cookies 😄
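
A minimal sketch of the design choice being discussed: keep the interface itself small and build helpers as free functions on top of it, so any implementation gets them without extra methods. The types below are simplified stand-ins, not the real merkledag types:

```go
package main

import "fmt"

// Key and Node are simplified stand-ins for the real merkledag types.
type Key string

type Node struct {
	Links []Key
}

// DAGService stays minimal: only what an implementation must actually provide.
type DAGService interface {
	Get(k Key) (*Node, error)
}

// GetChildren is a free function written purely against the interface, so
// every DAGService implementation gets it without re-implementing anything.
func GetChildren(ds DAGService, root *Node) ([]*Node, error) {
	var out []*Node
	for _, k := range root.Links {
		nd, err := ds.Get(k)
		if err != nil {
			return nil, err
		}
		out = append(out, nd)
	}
	return out, nil
}

// mapDAG is a toy in-memory implementation used only to exercise the helper.
type mapDAG map[Key]*Node

func (m mapDAG) Get(k Key) (*Node, error) {
	nd, ok := m[k]
	if !ok {
		return nil, fmt.Errorf("node not found: %s", k)
	}
	return nd, nil
}

func main() {
	ds := mapDAG{"a": {}, "b": {}}
	children, err := GetChildren(ds, &Node{Links: []Key{"a", "b"}})
	fmt.Println(len(children), err) // 2 <nil>
}
```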

var keys []key.Key
for _, lnk := range root.Links {
keys = append(keys, key.Key(lnk.Hash))
}

return ds.GetNodes(ctx, keys)
return GetNodes(ctx, ds, keys)
}

// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter {
func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
Contributor: What do you think about this function returning an error as well? I'm concerned about the way it eats errors from the DAGService right now: it logs them, but that's not really a consumable format for the caller to react to.

Member Author: The error-eating issue is addressed in another PR I have open (that I need to go pay attention to).

Contributor: Awesome -- thanks!
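
For reference, the new GetMany signature in this PR already hands errors back on a channel instead of only logging them. A hedged sketch of how a caller might drain both channels (simplified stand-in types, not the actual go-ipfs API):

```go
package main

import (
	"context"
	"fmt"
)

// Node is a simplified stand-in for the merkledag node type.
type Node struct{ Name string }

// consumeMany drains the node and error channels returned by a GetMany-style
// call, stopping on the first error or when the context is cancelled.
func consumeMany(ctx context.Context, nodes <-chan *Node, errs <-chan error) error {
	for {
		select {
		case nd, ok := <-nodes:
			if !ok {
				return nil // node channel closed: every requested node arrived
			}
			fmt.Println("got node:", nd.Name)
		case err := <-errs:
			return err // the fetch error is now visible to the caller
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	nodes := make(chan *Node, 1)
	errs := make(chan error, 1)
	nodes <- &Node{Name: "child"}
	close(nodes)
	fmt.Println(consumeMany(context.Background(), nodes, errs)) // <nil>
}
```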


// Early out if no work to do
if len(keys) == 0 {
@@ -178,26 +213,29 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []key.Key) []NodeGetter
ctx, cancel := context.WithCancel(ctx)
defer cancel()

blkchan := ds.Blocks.GetBlocks(ctx, dedupedKeys)
nodechan, errchan := ds.GetMany(ctx, dedupedKeys)

for count := 0; count < len(keys); {
select {
case blk, ok := <-blkchan:
case nd, ok := <-nodechan:
if !ok {
return
}

nd, err := Decoded(blk.Data)
k, err := nd.Key()
if err != nil {
// NB: can happen with improperly formatted input data
log.Debug("Got back bad block!")
return
log.Error("Failed to get node key: ", err)
continue
}
is := FindLinks(keys, blk.Key(), 0)

is := FindLinks(keys, k, 0)
for _, i := range is {
count++
sendChans[i] <- nd
}
case err := <-errchan:
log.Error("error fetching: ", err)
Member: Error

return
case <-ctx.Done():
return
}
@@ -371,27 +409,27 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)
Contributor: Should errs be closed too?

Member Author: I normally avoid doing that; it causes odd race conditions, and nobody will ever be waiting solely on that error channel.

The race condition goes something like:

select {
case nd, ok := <-nodes:
// handle the node
case err := <-errs:
// handle the error, generally exit
}

If I send a node, then return and the error channel gets closed, then I might miss out on handling that node, depending on how well the scheduler feels like behaving. If I only close the one channel, we only have one outcome to worry about.

Member Author: But in the case of an error, I'm not sure whether it's imperative that we process every node received, or whether we should error out ASAP.

Contributor: I see -- that makes sense. Is there any possibility of missing a node if you 1. send the node, then 2. return, thus closing the node channel?

Contributor: (Discussed offline; the answer is no: Go guarantees sends and closes happen in order.)
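
A small standalone sketch of the guarantee referenced above: a value sent on a channel before close() is always received before the receiver observes the close, which is why closing only the node channel cannot drop a node that was already sent:

```go
package main

import "fmt"

func main() {
	nodes := make(chan string)

	go func() {
		nodes <- "last node" // this send completes before the close below is observed
		close(nodes)
	}()

	for nd := range nodes { // range receives every sent value, then exits on close
		fmt.Println("received:", nd)
	}
	fmt.Println("channel closed, nothing was lost")
}
```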


get := func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
get := func(ks []key.Key) {
nodes, errch := ds.GetMany(ctx, ks)
for {
select {
case errs <- err:
case <-ctx.Done():
case nd, ok := <-nodes:
if !ok {
return
}
select {
case out <- nd:
case <-ctx.Done():
return
Contributor: Should we write ctx.Err() to the errs channel here?

Member Author: I normally don't, for the same issue I describe above.

}
case err := <-errch:
errs <- err
return
}
return
}

select {
case out <- nd:
case <-ctx.Done():
return
}
}

for ks := range in {
ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go get(g)
}
go get(ks)
}
}
12 changes: 6 additions & 6 deletions test/sharness/t0081-repo-pinning.sh
@@ -238,9 +238,9 @@ test_expect_success "some are no longer there" '
'

test_expect_success "recursive pin fails without objects" '
ipfs pin rm "$HASH_DIR1" &&
test_must_fail ipfs pin add -r "$HASH_DIR1" --timeout=500ms 2>err_expected8 &&
grep "context deadline exceeded" err_expected8 ||
ipfs pin rm -r=false "$HASH_DIR1" &&
Contributor: Isn't this no longer recursive with -r=false?

Member Author: Correct.

Contributor: The name of the test implies it tests recursive pinning.

Maybe more specifically: why has this test been changed?

Member Author: It hasn't; it's been made more correct. Direct pinning used to be the default, so this test removed the direct pin, then added a recursive pin. When the change to make recursive the default happened, this bit wasn't fixed. It didn't cause an error because removing a recursive pin will also remove a direct pin if one exists (rm -r works on a single file too). So all I did here was make it more explicit that we were removing the direct pin, and not removing a recursive one.

Contributor: Understood. Thanks!

test_must_fail ipfs pin add -r "$HASH_DIR1" 2>err_expected8 &&
grep "pin: failed to fetch all nodes" err_expected8 ||
test_fsh cat err_expected8
'

@@ -275,9 +275,9 @@ test_expect_success "test add nopin dir" '
FICTIONAL_HASH="QmXV4f9v8a56MxWKBhP3ETsz4EaafudU1cKfPaaJnenc48"
test_launch_ipfs_daemon
test_expect_success "test unpinning a hash that's not pinned" "
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=5s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=5s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=5s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH --timeout=2s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a --timeout=2s
test_expect_code 1 ipfs pin rm $FICTIONAL_HASH/a/b --timeout=2s
"
test_kill_ipfs_daemon

2 changes: 1 addition & 1 deletion unixfs/archive/tar/writer.go
@@ -39,7 +39,7 @@ func (w *Writer) writeDir(nd *mdag.Node, fpath string) error {
return err
}

for i, ng := range w.Dag.GetDAG(w.ctx, nd) {
for i, ng := range mdag.GetDAG(w.ctx, w.Dag, nd) {
child, err := ng.Get(w.ctx)
if err != nil {
return err
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
@@ -90,7 +90,7 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag

func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
fctx, cancel := context.WithCancel(ctx)
promises := serv.GetDAG(fctx, n)
promises := mdag.GetDAG(fctx, serv, n)
return &DagReader{
node: n,
serv: serv,