Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a bitswap session for 'Cat' #4641

Merged
merged 2 commits into from
Feb 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (

core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
namesys "github.com/ipfs/go-ipfs/namesys"
ipfspath "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

type CoreAPI struct {
Expand Down Expand Up @@ -49,12 +51,16 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
// ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreiface.Node, error) {
p, err := api.ResolvePath(ctx, p)
return resolveNode(ctx, api.node.DAG, api.node.Namesys, p)
}

func resolveNode(ctx context.Context, ng ipld.NodeGetter, nsys namesys.NameSystem, p coreiface.Path) (coreiface.Node, error) {
p, err := resolvePath(ctx, ng, nsys, p)
if err != nil {
return nil, err
}

node, err := api.node.DAG.Get(ctx, p.Cid())
node, err := ng.Get(ctx, p.Cid())
if err != nil {
return nil, err
}
Expand All @@ -65,17 +71,21 @@ func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (coreifac
// resolved path.
// TODO: store all of ipfspath.Resolver.ResolvePathComponents() in Path
func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreiface.Path, error) {
return resolvePath(ctx, api.node.DAG, api.node.Namesys, p)
}

func resolvePath(ctx context.Context, ng ipld.NodeGetter, nsys namesys.NameSystem, p coreiface.Path) (coreiface.Path, error) {
if p.Resolved() {
return p, nil
}

r := &ipfspath.Resolver{
DAG: api.node.DAG,
DAG: ng,
ResolveOnce: uio.ResolveUnixfsOnce,
}

p2 := ipfspath.FromString(p.String())
node, err := core.Resolve(ctx, api.node.Namesys, r, p2)
node, err := core.Resolve(ctx, nsys, r, p2)
if err == core.ErrNoNamesys {
return nil, coreiface.ErrOffline
} else if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
dag "github.com/ipfs/go-ipfs/merkledag"
uio "github.com/ipfs/go-ipfs/unixfs/io"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
Expand All @@ -30,12 +31,14 @@ func (api *UnixfsAPI) Add(ctx context.Context, r io.Reader) (coreiface.Path, err

// Cat returns the data contained by an IPFS or IPNS object(s) at path `p`.
func (api *UnixfsAPI) Cat(ctx context.Context, p coreiface.Path) (coreiface.Reader, error) {
dagnode, err := api.core().ResolveNode(ctx, p)
ses := dag.NewSession(ctx, api.node.DAG)

dagnode, err := resolveNode(ctx, ses, api.node.Namesys, p)
if err != nil {
return nil, err
}

r, err := uio.NewDagReader(ctx, dagnode, api.node.DAG)
r, err := uio.NewDagReader(ctx, dagnode, ses)
if err == uio.ErrIsDir {
return nil, coreiface.ErrIsDir
} else if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions merkledag/errservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package merkledag

import (
"context"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// ErrorService implements ipld.DAGService, returning 'Err' for every call.
type ErrorService struct {
Err error
}

var _ ipld.DAGService = (*ErrorService)(nil)

func (cs *ErrorService) Add(ctx context.Context, nd ipld.Node) error {
return cs.Err
}

func (cs *ErrorService) AddMany(ctx context.Context, nds []ipld.Node) error {
return cs.Err
}

func (cs *ErrorService) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
return nil, cs.Err
}

func (cs *ErrorService) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *ipld.NodeOption {
ch := make(chan *ipld.NodeOption)
close(ch)
return ch
}

func (cs *ErrorService) Remove(ctx context.Context, c *cid.Cid) error {
return cs.Err
}

func (cs *ErrorService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
return cs.Err
}
5 changes: 5 additions & 0 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func (sg *sesGetter) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.
return getNodesFromBG(ctx, sg.bs, keys)
}

// Session returns a NodeGetter using a new session for block fetches.
func (ds *dagService) Session(ctx context.Context) ipld.NodeGetter {
return &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error {
var ng ipld.NodeGetter = serv
Expand Down
20 changes: 20 additions & 0 deletions merkledag/readonly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package merkledag

import (
"fmt"

ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// ErrReadOnly is used when a read-only datastructure is written to.
var ErrReadOnly = fmt.Errorf("cannot write to readonly DAGService")

// NewReadOnlyDagService takes a NodeGetter, and returns a full DAGService
// implementation that returns ErrReadOnly when its 'write' methods are
// invoked.
func NewReadOnlyDagService(ng ipld.NodeGetter) ipld.DAGService {
return &ComboService{
Read: ng,
Write: &ErrorService{ErrReadOnly},
}
}
64 changes: 64 additions & 0 deletions merkledag/readonly_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package merkledag_test

import (
"context"
"testing"

. "github.com/ipfs/go-ipfs/merkledag"
dstest "github.com/ipfs/go-ipfs/merkledag/test"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

func TestReadonlyProperties(t *testing.T) {
ds := dstest.Mock()
ro := NewReadOnlyDagService(ds)

ctx := context.Background()
nds := []ipld.Node{
NewRawNode([]byte("foo1")),
NewRawNode([]byte("foo2")),
NewRawNode([]byte("foo3")),
NewRawNode([]byte("foo4")),
}
cids := []*cid.Cid{
nds[0].Cid(),
nds[1].Cid(),
nds[2].Cid(),
nds[3].Cid(),
}

// add to the actual underlying datastore
if err := ds.Add(ctx, nds[2]); err != nil {
t.Fatal(err)
}
if err := ds.Add(ctx, nds[3]); err != nil {
t.Fatal(err)
}

if err := ro.Add(ctx, nds[0]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}
if err := ro.Add(ctx, nds[2]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}

if err := ro.AddMany(ctx, nds[0:1]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}

if err := ro.Remove(ctx, cids[3]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}
if err := ro.RemoveMany(ctx, cids[1:2]); err != ErrReadOnly {
t.Fatal("expected ErrReadOnly")
}

if _, err := ro.Get(ctx, cids[0]); err != ipld.ErrNotFound {
t.Fatal("expected ErrNotFound")
}
if _, err := ro.Get(ctx, cids[3]); err != nil {
t.Fatal(err)
}
}
41 changes: 41 additions & 0 deletions merkledag/rwservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package merkledag

import (
"context"

cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// ComboService implements ipld.DAGService, using 'Read' for all fetch methods,
// and 'Write' for all methods that add new objects.
type ComboService struct {
Read ipld.NodeGetter
Write ipld.DAGService
}

var _ ipld.DAGService = (*ComboService)(nil)

func (cs *ComboService) Add(ctx context.Context, nd ipld.Node) error {
return cs.Write.Add(ctx, nd)
}

func (cs *ComboService) AddMany(ctx context.Context, nds []ipld.Node) error {
return cs.Write.AddMany(ctx, nds)
}

func (cs *ComboService) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
return cs.Read.Get(ctx, c)
}

func (cs *ComboService) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *ipld.NodeOption {
return cs.Read.GetMany(ctx, cids)
}

func (cs *ComboService) Remove(ctx context.Context, c *cid.Cid) error {
return cs.Write.Remove(ctx, c)
}

func (cs *ComboService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
return cs.Write.RemoveMany(ctx, cids)
}
21 changes: 21 additions & 0 deletions merkledag/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package merkledag

import (
"context"

ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

// SessionMaker is an object that can generate a new fetching session.
type SessionMaker interface {
Session(context.Context) ipld.NodeGetter
}

// NewSession returns a session backed NodeGetter if the given NodeGetter
// implements SessionMaker.
func NewSession(ctx context.Context, g ipld.NodeGetter) ipld.NodeGetter {
if sm, ok := g.(SessionMaker); ok {
return sm.Session(ctx)
}
return g
}
6 changes: 3 additions & 3 deletions path/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (e ErrNoLink) Error() string {
// TODO: now that this is more modular, try to unify this code with the
// the resolvers in namesys
type Resolver struct {
DAG ipld.DAGService
DAG ipld.NodeGetter

ResolveOnce func(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error)
ResolveOnce func(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error)
}

// NewBasicResolver constructs a new basic resolver.
Expand Down Expand Up @@ -124,7 +124,7 @@ func (s *Resolver) ResolvePath(ctx context.Context, fpath Path) (ipld.Node, erro

// ResolveSingle simply resolves one hop of a path through a graph with no
// extra context (does not opaquely resolve through sharded nodes)
func ResolveSingle(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
func ResolveSingle(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
return nd.ResolveLink(names)
}

Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.DAGService) (DagReader, error) {
func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
serv ipld.DAGService
serv ipld.NodeGetter

// the node being read
node *mdag.ProtoNode
Expand Down Expand Up @@ -51,7 +51,7 @@ type pbDagReader struct {
var _ DagReader = (*pbDagReader)(nil)

// NewPBFileReader constructs a new PBFileReader.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.DAGService) *pbDagReader {
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv ipld.NodeGetter) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
curLinks := getLinkCids(n)
return &pbDagReader{
Expand Down
5 changes: 3 additions & 2 deletions unixfs/io/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// ResolveUnixfsOnce resolves a single hop of a path through a graph in a
// unixfs context. This includes handling traversing sharded directories.
func ResolveUnixfsOnce(ctx context.Context, ds ipld.DAGService, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
func ResolveUnixfsOnce(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
upb, err := ft.FromBytes(nd.Data())
Expand All @@ -28,7 +28,8 @@ func ResolveUnixfsOnce(ctx context.Context, ds ipld.DAGService, nd ipld.Node, na

switch upb.GetType() {
case ft.THAMTShard:
s, err := hamt.NewHamtFromDag(ds, nd)
rods := dag.NewReadOnlyDagService(ds)
s, err := hamt.NewHamtFromDag(rods, nd)
if err != nil {
return nil, nil, err
}
Expand Down