diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go new file mode 100644 index 00000000000..527abf040d3 --- /dev/null +++ b/core/coreapi/coreapi.go @@ -0,0 +1,26 @@ +package coreapi + +import ( + "context" + + core "github.com/ipfs/go-ipfs/core" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + path "github.com/ipfs/go-ipfs/path" + + ipld "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node" +) + +func resolve(ctx context.Context, n *core.IpfsNode, p string) (ipld.Node, error) { + pp, err := path.ParsePath(p) + if err != nil { + return nil, err + } + + dagnode, err := core.Resolve(ctx, n.Namesys, n.Resolver, pp) + if err == core.ErrNoNamesys { + return nil, coreiface.ErrOffline + } else if err != nil { + return nil, err + } + return dagnode, nil +} diff --git a/core/coreapi/interface/interface.go b/core/coreapi/interface/interface.go new file mode 100644 index 00000000000..18328a2a091 --- /dev/null +++ b/core/coreapi/interface/interface.go @@ -0,0 +1,54 @@ +package iface + +import ( + "context" + "errors" + "io" + + ipld "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node" + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" +) + +// type CoreAPI interface { +// ID() CoreID +// Version() CoreVersion +// } + +type Link ipld.Link + +type Reader interface { + io.ReadSeeker + io.Closer +} + +type UnixfsAPI interface { + Add(context.Context, io.Reader) (*cid.Cid, error) + Cat(context.Context, string) (Reader, error) + Ls(context.Context, string) ([]*Link, error) +} + +// type ObjectAPI interface { +// New() (cid.Cid, Object) +// Get(string) (Object, error) +// Links(string) ([]*Link, error) +// Data(string) (Reader, error) +// Stat(string) (ObjectStat, error) +// Put(Object) (cid.Cid, error) +// SetData(string, Reader) (cid.Cid, error) +// AppendData(string, Data) (cid.Cid, error) +// AddLink(string, string, string) (cid.Cid, error) +// RmLink(string, string) (cid.Cid, error) +// } + +// type ObjectStat struct { +// Cid cid.Cid +// NumLinks int +// BlockSize int +// LinksSize int +// DataSize int +// CumulativeSize int +// } + +var ErrIsDir = errors.New("object is a directory") +var ErrIsNonDag = errors.New("not a merkledag object") +var ErrOffline = errors.New("can't resolve, ipfs node is offline") diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go new file mode 100644 index 00000000000..5018e858daa --- /dev/null +++ b/core/coreapi/unixfs.go @@ -0,0 +1,59 @@ +package coreapi + +import ( + "context" + "io" + + core "github.com/ipfs/go-ipfs/core" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + coreunix "github.com/ipfs/go-ipfs/core/coreunix" + uio "github.com/ipfs/go-ipfs/unixfs/io" + + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" +) + +type UnixfsAPI struct { + node *core.IpfsNode +} + +func NewUnixfsAPI(n *core.IpfsNode) coreiface.UnixfsAPI { + api := &UnixfsAPI{n} + return api +} + +func (api *UnixfsAPI) Add(ctx context.Context, r io.Reader) (*cid.Cid, error) { + k, err := coreunix.AddWithContext(ctx, api.node, r) + if err != nil { + return nil, err + } + return cid.Decode(k) +} + +func (api *UnixfsAPI) Cat(ctx context.Context, p string) (coreiface.Reader, error) { + dagnode, err := resolve(ctx, api.node, p) + if err != nil { + return nil, err + } + + r, err := uio.NewDagReader(ctx, dagnode, api.node.DAG) + if err == uio.ErrIsDir { + return nil, coreiface.ErrIsDir + } else if err != nil { + return nil, err + } + return r, nil +} + +func (api *UnixfsAPI) Ls(ctx context.Context, p string) ([]*coreiface.Link, error) { + dagnode, err := resolve(ctx, api.node, p) + if err != nil { + return nil, err + } + + l := dagnode.Links() + links := make([]*coreiface.Link, len(l)) + for i, l := range l { + links[i] = &coreiface.Link{l.Name, l.Size, l.Cid} + } + return links, nil +} diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go new file mode 100644 index 00000000000..61270551045 --- /dev/null +++ b/core/coreapi/unixfs_test.go @@ -0,0 +1,286 @@ +package coreapi_test + +import ( + "bytes" + "context" + "io" + "strings" + "testing" + + core "github.com/ipfs/go-ipfs/core" + coreapi "github.com/ipfs/go-ipfs/core/coreapi" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + coreunix "github.com/ipfs/go-ipfs/core/coreunix" + mdag "github.com/ipfs/go-ipfs/merkledag" + repo "github.com/ipfs/go-ipfs/repo" + config "github.com/ipfs/go-ipfs/repo/config" + testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" + unixfs "github.com/ipfs/go-ipfs/unixfs" +) + +// `echo -n 'hello, world!' | ipfs add` +var hello = "QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk" +var helloStr = "hello, world!" + +// `ipfs object new unixfs-dir` +var emptyUnixfsDir = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" + +// `echo -n | ipfs add` +var emptyUnixfsFile = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH" + +func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.UnixfsAPI, error) { + r := &repo.Mock{ + C: config.Config{ + Identity: config.Identity{ + PeerID: "Qmfoo", // required by offline node + }, + }, + D: testutil.ThreadSafeCloserMapDatastore(), + } + node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r}) + if err != nil { + return nil, nil, err + } + api := coreapi.NewUnixfsAPI(node) + return node, api, nil +} + +func TestAdd(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + str := strings.NewReader(helloStr) + c, err := api.Add(ctx, str) + if err != nil { + t.Error(err) + } + + if c.String() != hello { + t.Fatalf("expected CID %s, got: %s", hello, c) + } + + r, err := api.Cat(ctx, hello) + if err != nil { + t.Fatal(err) + } + buf := make([]byte, len(helloStr)) + _, err = io.ReadFull(r, buf) + if err != nil { + t.Error(err) + } + + if string(buf) != helloStr { + t.Fatalf("expected [%s], got [%s] [err=%s]", helloStr, string(buf), err) + } +} + +func TestAddEmptyFile(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + str := strings.NewReader("") + c, err := api.Add(ctx, str) + if err != nil { + t.Error(err) + } + + if c.String() != emptyUnixfsFile { + t.Fatalf("expected CID %s, got: %s", hello, c) + } +} + +func TestCatBasic(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Fatal(err) + } + + hr := strings.NewReader(helloStr) + k, err := coreunix.Add(node, hr) + if err != nil { + t.Fatal(err) + } + + if k != hello { + t.Fatalf("expected CID %s, got: %s", hello, k) + } + + r, err := api.Cat(ctx, k) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, len(helloStr)) + _, err = io.ReadFull(r, buf) + if err != nil { + t.Error(err) + } + if string(buf) != helloStr { + t.Fatalf("expected [%s], got [%s] [err=%s]", helloStr, string(buf), err) + } +} + +func TestCatEmptyFile(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Fatal(err) + } + + _, err = coreunix.Add(node, strings.NewReader("")) + if err != nil { + t.Fatal(err) + } + + r, err := api.Cat(ctx, emptyUnixfsFile) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, 1) // non-zero so that Read() actually tries to read + n, err := io.ReadFull(r, buf) + if err != nil && err != io.EOF { + t.Error(err) + } + if !bytes.HasPrefix(buf, []byte{0x00}) { + t.Fatalf("expected empty data, got [%s] [read=%d]", buf, n) + } +} + +func TestCatDir(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + c, err := node.DAG.Add(unixfs.EmptyDirNode()) + if err != nil { + t.Error(err) + } + + _, err = api.Cat(ctx, c.String()) + if err != coreiface.ErrIsDir { + t.Fatalf("expected ErrIsDir, got: %s", err) + } +} + +func TestCatNonUnixfs(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + c, err := node.DAG.Add(new(mdag.ProtoNode)) + if err != nil { + t.Error(err) + } + + _, err = api.Cat(ctx, c.String()) + if !strings.Contains(err.Error(), "proto: required field") { + t.Fatalf("expected protobuf error, got: %s", err) + } +} + +func TestCatOffline(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + _, err = api.Cat(ctx, "/ipns/Qmfoobar") + if err != coreiface.ErrOffline { + t.Fatalf("expected ErrOffline, got: %", err) + } +} + +func TestLs(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + r := strings.NewReader("content-of-file") + p, _, err := coreunix.AddWrapped(node, r, "name-of-file") + if err != nil { + t.Error(err) + } + parts := strings.Split(p, "/") + if len(parts) != 2 { + t.Errorf("unexpected path:", p) + } + k := parts[0] + + links, err := api.Ls(ctx, k) + if err != nil { + t.Error(err) + } + + if len(links) != 1 { + t.Fatalf("expected 1 link, got %d", len(links)) + } + if links[0].Size != 23 { + t.Fatalf("expected size = 23, got %d", links[0].Size) + } + if links[0].Name != "name-of-file" { + t.Fatalf("expected name = name-of-file, got %s", links[0].Name) + } + if links[0].Cid.String() != "QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr" { + t.Fatalf("expected cid = QmX3qQVKxDGz3URVC3861Z3CKtQKGBn6ffXRBBWGMFz9Lr, got %s", links[0].Cid.String()) + } +} + +func TestLsEmptyDir(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + c, err := node.DAG.Add(unixfs.EmptyDirNode()) + if err != nil { + t.Error(err) + } + + links, err := api.Ls(ctx, c.String()) + if err != nil { + t.Error(err) + } + + if len(links) != 0 { + t.Fatalf("expected 0 links, got %d", len(links)) + } +} + +// TODO(lgierth) this should test properly, with len(links) > 0 +func TestLsNonUnixfs(t *testing.T) { + ctx := context.Background() + node, api, err := makeAPI(ctx) + if err != nil { + t.Error(err) + } + + c, err := node.DAG.Add(new(mdag.ProtoNode)) + if err != nil { + t.Error(err) + } + + links, err := api.Ls(ctx, c.String()) + if err != nil { + t.Error(err) + } + + if len(links) != 0 { + t.Fatalf("expected 0 links, got %d", len(links)) + } +} diff --git a/core/corehttp/gateway.go b/core/corehttp/gateway.go index 85b56e723fe..6663e8b7359 100644 --- a/core/corehttp/gateway.go +++ b/core/corehttp/gateway.go @@ -6,6 +6,7 @@ import ( "net/http" core "github.com/ipfs/go-ipfs/core" + coreapi "github.com/ipfs/go-ipfs/core/coreapi" config "github.com/ipfs/go-ipfs/repo/config" id "gx/ipfs/QmQfvKShQ2v7nkfCE4ygisxpcSBFvBYaorQ54SibY6PGXV/go-libp2p/p2p/protocol/identify" ) @@ -27,7 +28,7 @@ func GatewayOption(writable bool, paths ...string) ServeOption { Headers: cfg.Gateway.HTTPHeaders, Writable: writable, PathPrefixes: cfg.Gateway.PathPrefixes, - }) + }, coreapi.NewUnixfsAPI(n)) for _, p := range paths { mux.Handle(p+"/", gateway) @@ -37,7 +38,7 @@ func GatewayOption(writable bool, paths ...string) ServeOption { } func VersionOption() ServeOption { - return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) { + return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) { mux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Commit: %s\n", config.CurrentCommit) fmt.Fprintf(w, "Client Version: %s\n", id.ClientVersion) diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index ea0ef5c5de4..8bf781ef723 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -16,9 +16,10 @@ import ( chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" dagutils "github.com/ipfs/go-ipfs/merkledag/utils" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" path "github.com/ipfs/go-ipfs/path" ft "github.com/ipfs/go-ipfs/unixfs" - uio "github.com/ipfs/go-ipfs/unixfs/io" humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" routing "gx/ipfs/QmQKEgGgYCDyk8VNY6A65FpuE4YwbspvjXHco1rdb75PVc/go-libp2p-routing" @@ -36,12 +37,14 @@ const ( type gatewayHandler struct { node *core.IpfsNode config GatewayConfig + api coreiface.UnixfsAPI } -func newGatewayHandler(node *core.IpfsNode, conf GatewayConfig) *gatewayHandler { +func newGatewayHandler(n *core.IpfsNode, c GatewayConfig, api coreiface.UnixfsAPI) *gatewayHandler { i := &gatewayHandler{ - node: node, - config: conf, + node: n, + config: c, + api: api, } return i } @@ -57,6 +60,21 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (node.Node, error) { // TODO(btc): break this apart into separate handlers using a more expressive muxer func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(i.node.Context(), time.Hour) + // the hour is a hard fallback, we don't expect it to happen, but just in case + defer cancel() + + if cn, ok := w.(http.CloseNotifier); ok { + clientGone := cn.CloseNotify() + go func() { + select { + case <-clientGone: + case <-ctx.Done(): + } + cancel() + }() + } + defer func() { if r := recover(); r != nil { log.Error("A panic occurred in the gateway handler!") @@ -68,7 +86,7 @@ func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if i.config.Writable { switch r.Method { case "POST": - i.postHandler(w, r) + i.postHandler(ctx, w, r) return case "PUT": i.putHandler(w, r) @@ -80,7 +98,7 @@ func (i *gatewayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if r.Method == "GET" || r.Method == "HEAD" { - i.getOrHeadHandler(w, r) + i.getOrHeadHandler(ctx, w, r) return } @@ -110,21 +128,7 @@ func (i *gatewayHandler) optionsHandler(w http.ResponseWriter, r *http.Request) i.addUserHeaders(w) // return all custom headers (including CORS ones, if set) } -func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(i.node.Context(), time.Hour) - // the hour is a hard fallback, we don't expect it to happen, but just in case - defer cancel() - - if cn, ok := w.(http.CloseNotifier); ok { - clientGone := cn.CloseNotify() - go func() { - select { - case <-clientGone: - case <-ctx.Done(): - } - cancel() - }() - } +func (i *gatewayHandler) getOrHeadHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { urlPath := r.URL.Path @@ -154,27 +158,19 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request ipnsHostname = true } - p, err := path.ParsePath(urlPath) - if err != nil { - webError(w, "Invalid Path Error", err, http.StatusBadRequest) - return - } - - nd, err := core.Resolve(ctx, i.node.Namesys, i.node.Resolver, p) - // If node is in offline mode the error code and message should be different - if err == core.ErrNoNamesys && !i.node.OnlineMode() { + dr, err := i.api.Cat(ctx, urlPath) + dir := false + if err == coreiface.ErrIsDir { + dir = true + } else if err == coreiface.ErrOffline { w.WriteHeader(http.StatusServiceUnavailable) fmt.Fprint(w, "Could not resolve path. Node is in offline mode.") return } else if err != nil { webError(w, "Path Resolve error", err, http.StatusBadRequest) return - } - - pbnd, ok := nd.(*dag.ProtoNode) - if !ok { - webError(w, "Cannot read non protobuf nodes through gateway", dag.ErrNotProtobuf, http.StatusBadRequest) - return + } else { + defer dr.Close() } etag := gopath.Base(urlPath) @@ -204,13 +200,6 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request w.Header().Set("Suborigin", pathRoot) } - dr, err := uio.NewDagReader(ctx, pbnd, i.node.DAG) - if err != nil && err != uio.ErrIsDir { - // not a directory and still an error - internalWebError(w, err) - return - } - // set these headers _after_ the error, for we may just not have it // and dont want the client to cache a 500 response... // and only if it's /ipfs! @@ -224,18 +213,23 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request modtime = time.Unix(1, 0) } - if err == nil { - defer dr.Close() + if !dir { name := gopath.Base(urlPath) http.ServeContent(w, r, name, modtime, dr) return } + links, err := i.api.Ls(ctx, urlPath) + if err != nil { + internalWebError(w, err) + return + } + // storage for directory listing var dirListing []directoryItem // loop through files foundIndex := false - for _, link := range nd.Links() { + for _, link := range links { if link.Name == "index.html" { log.Debugf("found index.html link for %s", urlPath) foundIndex = true @@ -254,19 +248,7 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } // return index page instead. - nd, err := core.Resolve(ctx, i.node.Namesys, i.node.Resolver, p) - if err != nil { - internalWebError(w, err) - return - } - - pbnd, ok := nd.(*dag.ProtoNode) - if !ok { - internalWebError(w, dag.ErrNotProtobuf) - return - } - - dr, err := uio.NewDagReader(ctx, pbnd, i.node.DAG) + dr, err := i.api.Cat(ctx, p.String()) if err != nil { internalWebError(w, err) return @@ -332,14 +314,8 @@ func (i *gatewayHandler) getOrHeadHandler(w http.ResponseWriter, r *http.Request } } -func (i *gatewayHandler) postHandler(w http.ResponseWriter, r *http.Request) { - nd, err := i.newDagFromReader(r.Body) - if err != nil { - internalWebError(w, err) - return - } - - k, err := i.node.DAG.Add(nd) +func (i *gatewayHandler) postHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + k, err := i.api.Add(ctx, r.Body) if err != nil { internalWebError(w, err) return diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 5e224808db6..5221b3472db 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -254,6 +254,10 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error { // Add builds a merkledag from the a reader, pinning all objects to the local // datastore. Returns a key representing the root node. func Add(n *core.IpfsNode, r io.Reader) (string, error) { + return AddWithContext(n.Context(), n, r) +} + +func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) { defer n.Blockstore.PinLock().Unlock() fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)