From 0597a04924c8e456ea47d7236b8296ed00c4de45 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 30 Apr 2017 13:37:37 -0700 Subject: [PATCH 1/2] Fix sharding memory growth, and fix resolver for unixfs paths License: MIT Signed-off-by: Jeromy --- core/coreunix/cat.go | 7 +++- path/resolver.go | 4 +- test/sharness/t0260-sharding-flag.sh | 5 +++ unixfs/hamt/hamt.go | 63 ++++++++++++++++++++-------- unixfs/io/dirbuilder.go | 6 ++- 5 files changed, 62 insertions(+), 23 deletions(-) diff --git a/core/coreunix/cat.go b/core/coreunix/cat.go index 240307d2c2c..d58f190a316 100644 --- a/core/coreunix/cat.go +++ b/core/coreunix/cat.go @@ -9,7 +9,12 @@ import ( ) func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, error) { - dagNode, err := core.Resolve(ctx, n.Namesys, n.Resolver, path.Path(pstr)) + r := &path.Resolver{ + DAG: n.DAG, + ResolveOnce: uio.ResolveUnixfsOnce, + } + + dagNode, err := core.Resolve(ctx, n.Namesys, r, path.Path(pstr)) if err != nil { return nil, err } diff --git a/path/resolver.go b/path/resolver.go index 4ebde479ff7..84a6fe66c3a 100644 --- a/path/resolver.go +++ b/path/resolver.go @@ -163,7 +163,7 @@ func (s *Resolver) ResolveLinks(ctx context.Context, ndd node.Node, names []stri ctx, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() - lnk, rest, err := nd.ResolveLink(names) + lnk, err := s.ResolveOnce(ctx, s.DAG, nd, names[0]) if err == dag.ErrLinkNotFound { return result, ErrNoLink{Name: names[0], Node: nd.Cid()} } else if err != nil { @@ -177,7 +177,7 @@ func (s *Resolver) ResolveLinks(ctx context.Context, ndd node.Node, names []stri nd = nextnode result = append(result, nextnode) - names = rest + names = names[1:] } return result, nil } diff --git a/test/sharness/t0260-sharding-flag.sh b/test/sharness/t0260-sharding-flag.sh index 39d96712bc4..7bb75ab6690 100755 --- a/test/sharness/t0260-sharding-flag.sh +++ b/test/sharness/t0260-sharding-flag.sh @@ -68,6 +68,11 @@ test_add_large_dir_v1() { echo "$exphash" > sharddir_exp && test_cmp sharddir_exp sharddir_out ' + + test_expect_success "can access a path under the dir" ' + ipfs cat "$exphash/file20" > file20_out && + test_cmp testdata/file20 file20_out + ' } # this hash implies both the directory and the leaf entries are CIDv1 diff --git a/unixfs/hamt/hamt.go b/unixfs/hamt/hamt.go index ccdffe7e442..ceda529c9a4 100644 --- a/unixfs/hamt/hamt.go +++ b/unixfs/hamt/hamt.go @@ -62,7 +62,7 @@ type HamtShard struct { // child can either be another shard, or a leaf node value type child interface { - Node() (node.Node, error) + Link() (*node.Link, error) Label() string } @@ -144,12 +144,12 @@ func (ds *HamtShard) Node() (node.Node, error) { cindex := ds.indexForBitPos(i) ch := ds.children[cindex] if ch != nil { - cnd, err := ch.Node() + clnk, err := ch.Link() if err != nil { return nil, err } - err = out.AddNodeLinkClean(ds.linkNamePrefix(i)+ch.Label(), cnd) + err = out.AddRawLink(ds.linkNamePrefix(i)+ch.Label(), clnk) if err != nil { return nil, err } @@ -188,10 +188,10 @@ func (ds *HamtShard) Node() (node.Node, error) { type shardValue struct { key string - val node.Node + val *node.Link } -func (sv *shardValue) Node() (node.Node, error) { +func (sv *shardValue) Link() (*node.Link, error) { return sv.val, nil } @@ -214,7 +214,18 @@ func (ds *HamtShard) Label() string { // Set sets 'name' = nd in the HAMT func (ds *HamtShard) Set(ctx context.Context, name string, nd node.Node) error { hv := &hashBits{b: hash([]byte(name))} - return ds.modifyValue(ctx, hv, name, nd) + _, err := ds.dserv.Add(nd) + if err != nil { + return err + } + + lnk, err := node.MakeLink(nd) + if err != nil { + return err + } + lnk.Name = ds.linkNamePrefix(0) + name + + return ds.modifyValue(ctx, hv, name, lnk) } // Remove deletes the named entry if it exists, this operation is idempotent. @@ -226,13 +237,16 @@ func (ds *HamtShard) Remove(ctx context.Context, name string) error { func (ds *HamtShard) Find(ctx context.Context, name string) (node.Node, error) { hv := &hashBits{b: hash([]byte(name))} - var out node.Node + var out *node.Link err := ds.getValue(ctx, hv, name, func(sv *shardValue) error { out = sv.val return nil }) + if err != nil { + return nil, err + } - return out, err + return ds.dserv.Get(ctx, out.Cid) } // getChild returns the i'th child of this shard. If it is cached in the @@ -291,9 +305,10 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) { c = cds } else { + lnk2 := *lnk c = &shardValue{ key: lnk.Name[ds.maxpadlen:], - val: nd, + val: &lnk2, } } @@ -305,16 +320,32 @@ func (ds *HamtShard) setChild(i int, c child) { ds.children[i] = c } -func (ds *HamtShard) insertChild(idx int, key string, val node.Node) error { - if val == nil { +func (ds *HamtShard) Link() (*node.Link, error) { + nd, err := ds.Node() + if err != nil { + return nil, err + } + + _, err = ds.dserv.Add(nd) + if err != nil { + return nil, err + } + + return node.MakeLink(nd) +} + +func (ds *HamtShard) insertChild(idx int, key string, lnk *node.Link) error { + if lnk == nil { return os.ErrNotExist } i := ds.indexForBitPos(idx) ds.bitfield.SetBit(ds.bitfield, idx, 1) + + lnk.Name = ds.linkNamePrefix(idx) + key sv := &shardValue{ key: key, - val: val, + val: lnk, } ds.children = append(ds.children[:i], append([]child{sv}, ds.children[i:]...)...) @@ -370,11 +401,7 @@ func (ds *HamtShard) EnumLinks(ctx context.Context) ([]*node.Link, error) { func (ds *HamtShard) ForEachLink(ctx context.Context, f func(*node.Link) error) error { return ds.walkTrie(ctx, func(sv *shardValue) error { - lnk, err := node.MakeLink(sv.val) - if err != nil { - return err - } - + lnk := sv.val lnk.Name = sv.key return f(lnk) @@ -414,7 +441,7 @@ func (ds *HamtShard) walkTrie(ctx context.Context, cb func(*shardValue) error) e return nil } -func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val node.Node) error { +func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val *node.Link) error { idx := hv.Next(ds.tableSizeLg2) if ds.bitfield.Bit(idx) != 1 { diff --git a/unixfs/io/dirbuilder.go b/unixfs/io/dirbuilder.go index bcf9770f43c..285992081c4 100644 --- a/unixfs/io/dirbuilder.go +++ b/unixfs/io/dirbuilder.go @@ -48,10 +48,12 @@ func NewDirectory(dserv mdag.DAGService) *Directory { return db } +var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard") + func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, error) { pbnd, ok := nd.(*mdag.ProtoNode) if !ok { - return nil, mdag.ErrNotProtobuf + return nil, ErrNotADir } pbd, err := format.FromBytes(pbnd.Data()) @@ -76,7 +78,7 @@ func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, erro shard: shard, }, nil default: - return nil, fmt.Errorf("merkledag node was not a directory or shard") + return nil, ErrNotADir } } From a22cae1bce7dfa9b67ed5c969c469e259eac0fd9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 30 Apr 2017 14:01:48 -0700 Subject: [PATCH 2/2] fix coreapi unixfs resolving License: MIT Signed-off-by: Jeromy --- core/coreapi/coreapi.go | 8 +++++++- core/coreapi/unixfs.go | 21 ++++++++++++++++++--- core/coreapi/unixfs_test.go | 8 +++++++- path/resolver.go | 13 +++++++------ unixfs/hamt/hamt.go | 7 +++++-- unixfs/io/dirbuilder.go | 8 +++++++- unixfs/io/resolve.go | 33 ++++++++++++++++++++++----------- 7 files changed, 73 insertions(+), 25 deletions(-) diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 29f2a6854c4..e0c3d15ccc4 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -6,6 +6,7 @@ import ( core "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" ipfspath "github.com/ipfs/go-ipfs/path" + uio "github.com/ipfs/go-ipfs/unixfs/io" cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid" ) @@ -42,8 +43,13 @@ func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreifac return p, nil } + r := &ipfspath.Resolver{ + DAG: api.node.DAG, + ResolveOnce: uio.ResolveUnixfsOnce, + } + p2 := ipfspath.FromString(p.String()) - node, err := core.Resolve(ctx, api.node.Namesys, api.node.Resolver, p2) + node, err := core.Resolve(ctx, api.node.Namesys, r, p2) if err == core.ErrNoNamesys { return nil, coreiface.ErrOffline } else if err != nil { diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index 7b39a84406c..d0a1c7bc8e9 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -9,6 +9,7 @@ import ( uio "github.com/ipfs/go-ipfs/unixfs/io" cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid" + node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format" ) type UnixfsAPI CoreAPI @@ -46,9 +47,23 @@ func (api *UnixfsAPI) Ls(ctx context.Context, p coreiface.Path) ([]*coreiface.Li return nil, err } - l := dagnode.Links() - links := make([]*coreiface.Link, len(l)) - for i, l := range l { + var ndlinks []*node.Link + dir, err := uio.NewDirectoryFromNode(api.node.DAG, dagnode) + switch err { + case nil: + l, err := dir.Links(ctx) + if err != nil { + return nil, err + } + ndlinks = l + case uio.ErrNotADir: + ndlinks = dagnode.Links() + default: + return nil, err + } + + links := make([]*coreiface.Link, len(ndlinks)) + for i, l := range ndlinks { links[i] = &coreiface.Link{l.Name, l.Size, l.Cid} } return links, nil diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index fc98d1c8804..c0cd3d85653 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -16,6 +16,7 @@ import ( config "github.com/ipfs/go-ipfs/repo/config" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" unixfs "github.com/ipfs/go-ipfs/unixfs" + cbor "gx/ipfs/QmNrbCt8j9DT5W9Pmjy2SdudT9k8GpaDr4sRuFix3BXhgR/go-ipld-cbor" ) // `echo -n 'hello, world!' | ipfs add` @@ -276,7 +277,12 @@ func TestLsNonUnixfs(t *testing.T) { t.Error(err) } - c, err := node.DAG.Add(new(mdag.ProtoNode)) + nd, err := cbor.WrapObject(map[string]interface{}{"foo": "bar"}) + if err != nil { + t.Fatal(err) + } + + c, err := node.DAG.Add(nd) if err != nil { t.Error(err) } diff --git a/path/resolver.go b/path/resolver.go index 84a6fe66c3a..22bde65ee16 100644 --- a/path/resolver.go +++ b/path/resolver.go @@ -37,7 +37,7 @@ func (e ErrNoLink) Error() string { type Resolver struct { DAG dag.DAGService - ResolveOnce func(ctx context.Context, ds dag.DAGService, nd node.Node, name string) (*node.Link, error) + ResolveOnce func(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) } func NewBasicResolver(ds dag.DAGService) *Resolver { @@ -121,9 +121,10 @@ func (s *Resolver) ResolvePath(ctx context.Context, fpath Path) (node.Node, erro return nodes[len(nodes)-1], err } -func ResolveSingle(ctx context.Context, ds dag.DAGService, nd node.Node, name string) (*node.Link, error) { - lnk, _, err := nd.ResolveLink([]string{name}) - return lnk, err +// ResolveSingle simply resolves one hop of a path through a graph with no +// extra context (does not opaquely resolve through sharded nodes) +func ResolveSingle(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) { + return nd.ResolveLink(names) } // ResolvePathComponents fetches the nodes for each segment of the given path. @@ -163,7 +164,7 @@ func (s *Resolver) ResolveLinks(ctx context.Context, ndd node.Node, names []stri ctx, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() - lnk, err := s.ResolveOnce(ctx, s.DAG, nd, names[0]) + lnk, rest, err := s.ResolveOnce(ctx, s.DAG, nd, names) if err == dag.ErrLinkNotFound { return result, ErrNoLink{Name: names[0], Node: nd.Cid()} } else if err != nil { @@ -177,7 +178,7 @@ func (s *Resolver) ResolveLinks(ctx context.Context, ndd node.Node, names []stri nd = nextnode result = append(result, nextnode) - names = names[1:] + names = rest } return result, nil } diff --git a/unixfs/hamt/hamt.go b/unixfs/hamt/hamt.go index ceda529c9a4..d0b60a9c6f6 100644 --- a/unixfs/hamt/hamt.go +++ b/unixfs/hamt/hamt.go @@ -191,6 +191,7 @@ type shardValue struct { val *node.Link } +// Link returns a link to this node func (sv *shardValue) Link() (*node.Link, error) { return sv.val, nil } @@ -234,7 +235,8 @@ func (ds *HamtShard) Remove(ctx context.Context, name string) error { return ds.modifyValue(ctx, hv, name, nil) } -func (ds *HamtShard) Find(ctx context.Context, name string) (node.Node, error) { +// Find searches for a child node by 'name' within this hamt +func (ds *HamtShard) Find(ctx context.Context, name string) (*node.Link, error) { hv := &hashBits{b: hash([]byte(name))} var out *node.Link @@ -246,7 +248,7 @@ func (ds *HamtShard) Find(ctx context.Context, name string) (node.Node, error) { return nil, err } - return ds.dserv.Get(ctx, out.Cid) + return out, nil } // getChild returns the i'th child of this shard. If it is cached in the @@ -320,6 +322,7 @@ func (ds *HamtShard) setChild(i int, c child) { ds.children[i] = c } +// Link returns a merklelink to this shard node func (ds *HamtShard) Link() (*node.Link, error) { nd, err := ds.Node() if err != nil { diff --git a/unixfs/io/dirbuilder.go b/unixfs/io/dirbuilder.go index 285992081c4..8d850976308 100644 --- a/unixfs/io/dirbuilder.go +++ b/unixfs/io/dirbuilder.go @@ -48,6 +48,7 @@ func NewDirectory(dserv mdag.DAGService) *Directory { return db } +// ErrNotADir implies that the given node was not a unixfs directory var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard") func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, error) { @@ -167,7 +168,12 @@ func (d *Directory) Find(ctx context.Context, name string) (node.Node, error) { return d.dserv.Get(ctx, lnk.Cid) } - return d.shard.Find(ctx, name) + lnk, err := d.shard.Find(ctx, name) + if err != nil { + return nil, err + } + + return lnk.GetNode(ctx, d.dserv) } func (d *Directory) RemoveChild(ctx context.Context, name string) error { diff --git a/unixfs/io/resolve.go b/unixfs/io/resolve.go index 16f360b4acb..f9213b55c38 100644 --- a/unixfs/io/resolve.go +++ b/unixfs/io/resolve.go @@ -10,37 +10,48 @@ import ( node "gx/ipfs/Qmb3Hm9QDFmfYuET4pu7Kyg8JV78jFa1nvZx5vnCZsK4ck/go-ipld-format" ) -func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, name string) (*node.Link, error) { +// ResolveUnixfsOnce resolves a single hop of a path through a graph in a +// unixfs context. This includes handling traversing sharded directories. +func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) { switch nd := nd.(type) { case *dag.ProtoNode: upb, err := ft.FromBytes(nd.Data()) if err != nil { // Not a unixfs node, use standard object traversal code - return nd.GetNodeLink(name) + lnk, err := nd.GetNodeLink(names[0]) + if err != nil { + return nil, nil, err + } + + return lnk, names[1:], nil } switch upb.GetType() { case ft.THAMTShard: s, err := hamt.NewHamtFromDag(ds, nd) if err != nil { - return nil, err + return nil, nil, err } - // TODO: optimized routine on HAMT for returning a dag.Link to avoid extra disk hits - out, err := s.Find(ctx, name) + out, err := s.Find(ctx, names[0]) if err != nil { - return nil, err + return nil, nil, err } - return node.MakeLink(out) + return out, names[1:], nil default: - return nd.GetNodeLink(name) + lnk, err := nd.GetNodeLink(names[0]) + if err != nil { + return nil, nil, err + } + + return lnk, names[1:], nil } default: - lnk, _, err := nd.ResolveLink([]string{name}) + lnk, rest, err := nd.ResolveLink(names) if err != nil { - return nil, err + return nil, nil, err } - return lnk, nil + return lnk, rest, nil } }