diff --git a/archive/tar/writer.go b/archive/tar/writer.go index bc1253d3d..0e91ba9b0 100644 --- a/archive/tar/writer.go +++ b/archive/tar/writer.go @@ -60,7 +60,11 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, fsNode *ft.FSNode, fpath string) return err } - dagr := uio.NewPBFileReader(w.ctx, nd, fsNode, w.Dag) + dagr, err := uio.NewDagReader(w.ctx, nd, w.Dag) + if err != nil { + return err + } + if _, err := dagr.WriteTo(w.TarW); err != nil { return err } diff --git a/io/bufdagreader.go b/io/bufdagreader.go deleted file mode 100644 index 48efe98ad..000000000 --- a/io/bufdagreader.go +++ /dev/null @@ -1,39 +0,0 @@ -package io - -import ( - "bytes" - "context" -) - -// BufDagReader implements a DagReader that reads from a byte slice -// using a bytes.Reader. It is used for RawNodes. -type BufDagReader struct { - *bytes.Reader -} - -// NewBufDagReader returns a DAG reader for the given byte slice. -// BufDagReader is used to read RawNodes. -func NewBufDagReader(b []byte) *BufDagReader { - return &BufDagReader{bytes.NewReader(b)} -} - -var _ DagReader = (*BufDagReader)(nil) - -// Close is a nop. -func (*BufDagReader) Close() error { - return nil -} - -// CtxReadFull reads the slice onto b. -func (rd *BufDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { - return rd.Read(b) -} - -// Size returns the size of the buffer. -func (rd *BufDagReader) Size() uint64 { - s := rd.Reader.Size() - if s < 0 { - panic("size smaller than 0 (impossible!!)") - } - return uint64(s) -} diff --git a/io/dagreader.go b/io/dagreader.go index cf980d2be..d5de73661 100644 --- a/io/dagreader.go +++ b/io/dagreader.go @@ -1,13 +1,14 @@ package io import ( + "bytes" "context" "errors" "io" ipld "github.com/ipfs/go-ipld-format" mdag "github.com/ipfs/go-merkledag" - ft "github.com/ipfs/go-unixfs" + unixfs "github.com/ipfs/go-unixfs" ) // Common errors @@ -17,6 +18,10 @@ var ( ErrUnkownNodeType = errors.New("unknown node type") ) +// TODO: Rename the `DagReader` interface, this doesn't read *any* DAG, just +// DAGs with UnixFS node (and it *belongs* to the `unixfs` package). Some +// alternatives: `FileReader`, `UnixFSFileReader`, `UnixFSReader`. + // A DagReader provides read-only read and seek acess to a unixfs file. // Different implementations of readers are used for the different // types of unixfs/protobuf-encoded nodes. @@ -35,24 +40,29 @@ type ReadSeekCloser interface { } // NewDagReader creates a new reader object that reads the data represented by -// the given node, using the passed in DAGService for data retrieval +// the given node, using the passed in DAGService for data retrieval. func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagReader, error) { + var size uint64 + switch n := n.(type) { case *mdag.RawNode: - return NewBufDagReader(n.RawData()), nil + size = uint64(len(n.RawData())) + case *mdag.ProtoNode: - fsNode, err := ft.FSNodeFromBytes(n.Data()) + fsNode, err := unixfs.FSNodeFromBytes(n.Data()) if err != nil { return nil, err } switch fsNode.Type() { - case ft.TDirectory, ft.THAMTShard: + case unixfs.TFile, unixfs.TRaw: + size = fsNode.FileSize() + + case unixfs.TDirectory, unixfs.THAMTShard: // Dont allow reading directories return nil, ErrIsDir - case ft.TFile, ft.TRaw: - return NewPBFileReader(ctx, n, fsNode, serv), nil - case ft.TMetadata: + + case unixfs.TMetadata: if len(n.Links()) == 0 { return nil, errors.New("incorrectly formatted metadata object") } @@ -66,12 +76,421 @@ func NewDagReader(ctx context.Context, n ipld.Node, serv ipld.NodeGetter) (DagRe return nil, mdag.ErrNotProtobuf } return NewDagReader(ctx, childpb, serv) - case ft.TSymlink: + case unixfs.TSymlink: return nil, ErrCantReadSymlinks default: - return nil, ft.ErrUnrecognizedType + return nil, unixfs.ErrUnrecognizedType } default: return nil, ErrUnkownNodeType } + + ctxWithCancel, cancel := context.WithCancel(ctx) + + return &dagReader{ + ctx: ctxWithCancel, + cancel: cancel, + serv: serv, + size: size, + rootNode: n, + dagWalker: ipld.NewWalker(ctxWithCancel, ipld.NewNavigableIPLDNode(n, serv)), + }, nil +} + +// dagReader provides a way to easily read the data contained in a dag. +type dagReader struct { + + // Structure to perform the DAG iteration and search, the reader + // just needs to add logic to the `Visitor` callback passed to + // `Iterate` and `Seek`. + dagWalker *ipld.Walker + + // Buffer with the data extracted from the current node being visited. + // To avoid revisiting a node to complete a (potential) partial read + // (or read after seek) the node's data is fully extracted in a single + // `readNodeDataBuffer` operation. + currentNodeData *bytes.Reader + + // Implements the `Size()` API. + size uint64 + + // Current offset for the read head within the DAG file. + offset int64 + + // Root node of the DAG, stored to re-create the `dagWalker` (effectively + // re-setting the position of the reader, used during `Seek`). + rootNode ipld.Node + + // Context passed to the `dagWalker`, the `cancel` function is used to + // cancel read operations (cancelling requested child node promises, + // see `ipld.NavigableIPLDNode.FetchChild` for details). + ctx context.Context + cancel func() + + // Passed to the `dagWalker` that will use it to request nodes. + // TODO: Revisit name. + serv ipld.NodeGetter +} + +// Size returns the total size of the data from the DAG structured file. +func (dr *dagReader) Size() uint64 { + return dr.size +} + +// Read implements the `io.Reader` interface through the `CtxReadFull` +// method using the DAG reader's internal context. +func (dr *dagReader) Read(b []byte) (int, error) { + return dr.CtxReadFull(dr.ctx, b) +} + +// CtxReadFull reads data from the DAG structured file. It always +// attempts a full read of the DAG until the `out` buffer is full. +// It uses the `Walker` structure to iterate the file DAG and read +// every node's data into the `out` buffer. +func (dr *dagReader) CtxReadFull(ctx context.Context, out []byte) (n int, err error) { + // Set the `dagWalker`'s context to the `ctx` argument, it will be used + // to fetch the child node promises (see + // `ipld.NavigableIPLDNode.FetchChild` for details). + dr.dagWalker.SetContext(ctx) + + // If there was a partially read buffer from the last visited + // node read it before visiting a new one. + if dr.currentNodeData != nil { + // TODO: Move this check inside `readNodeDataBuffer`? + n = dr.readNodeDataBuffer(out) + + if n == len(out) { + return n, nil + // Output buffer full, no need to traverse the DAG. + } + } + + // Iterate the DAG calling the passed `Visitor` function on every node + // to read its data into the `out` buffer, stop if there is an error or + // if the entire DAG is traversed (`EndOfDag`). + err = dr.dagWalker.Iterate(func(visitedNode ipld.NavigableNode) error { + node := ipld.ExtractIPLDNode(visitedNode) + + // Skip internal nodes, they shouldn't have any file data + // (see the `balanced` package for more details). + if len(node.Links()) > 0 { + return nil + } + + err = dr.saveNodeData(node) + if err != nil { + return err + } + // Save the leaf node file data in a buffer in case it is only + // partially read now and future `CtxReadFull` calls reclaim the + // rest (as each node is visited only once during `Iterate`). + // + // TODO: We could check if the entire node's data can fit in the + // remaining `out` buffer free space to skip this intermediary step. + + n += dr.readNodeDataBuffer(out[n:]) + + if n == len(out) { + // Output buffer full, no need to keep traversing the DAG, + // signal the `Walker` to pause the iteration. + dr.dagWalker.Pause() + } + + return nil + }) + + if err == ipld.EndOfDag { + return n, io.EOF + // Reached the end of the (DAG) file, no more data to read. + } else if err != nil { + return n, err + // Pass along any other errors from the `Visitor`. + } + + return n, nil +} + +// Save the UnixFS `node`'s data into the internal `currentNodeData` buffer to +// later move it to the output buffer (`Read`) or seek into it (`Seek`). +func (dr *dagReader) saveNodeData(node ipld.Node) error { + extractedNodeData, err := unixfs.ReadUnixFSNodeData(node) + if err != nil { + return err + } + + dr.currentNodeData = bytes.NewReader(extractedNodeData) + return nil +} + +// Read the `currentNodeData` buffer into `out`. This function can't have +// any errors as it's always reading from a `bytes.Reader` and asking only +// the available data in it. +func (dr *dagReader) readNodeDataBuffer(out []byte) int { + + n, _ := dr.currentNodeData.Read(out) + // Ignore the error as the EOF may not be returned in the first + // `Read` call, explicitly ask for an empty buffer below to check + // if we've reached the end. + + if dr.currentNodeData.Len() == 0 { + dr.currentNodeData = nil + // Signal that the buffer was consumed (for later `Read` calls). + // This shouldn't return an EOF error as it's just the end of a + // single node's data, not the entire DAG. + } + + dr.offset += int64(n) + // TODO: Should `offset` be incremented here or in the calling function? + // (Doing it here saves LoC but may be confusing as it's more hidden). + + return n +} + +// Similar to `readNodeDataBuffer` but it writes the contents to +// an `io.Writer` argument. +// +// TODO: Check what part of the logic between the two functions +// can be extracted away. +func (dr *dagReader) writeNodeDataBuffer(w io.Writer) (int64, error) { + + n, err := dr.currentNodeData.WriteTo(w) + if err != nil { + return n, err + } + + if dr.currentNodeData.Len() == 0 { + dr.currentNodeData = nil + // Signal that the buffer was consumed (for later `Read` calls). + // This shouldn't return an EOF error as it's just the end of a + // single node's data, not the entire DAG. + } + + dr.offset += int64(n) + return n, nil +} + +// WriteTo writes to the given writer. +// This follows the `bytes.Reader.WriteTo` implementation +// where it starts from the internal index that may have +// been modified by other `Read` calls. +// +// TODO: This implementation is very similar to `CtxReadFull`, +// the common parts should be abstracted away. +func (dr *dagReader) WriteTo(w io.Writer) (n int64, err error) { + // Use the internal reader's context to fetch the child node promises + // (see `ipld.NavigableIPLDNode.FetchChild` for details). + dr.dagWalker.SetContext(dr.ctx) + + // If there was a partially read buffer from the last visited + // node read it before visiting a new one. + if dr.currentNodeData != nil { + n, err = dr.writeNodeDataBuffer(w) + if err != nil { + return n, err + } + } + + // Iterate the DAG calling the passed `Visitor` function on every node + // to read its data into the `out` buffer, stop if there is an error or + // if the entire DAG is traversed (`EndOfDag`). + err = dr.dagWalker.Iterate(func(visitedNode ipld.NavigableNode) error { + node := ipld.ExtractIPLDNode(visitedNode) + + // Skip internal nodes, they shouldn't have any file data + // (see the `balanced` package for more details). + if len(node.Links()) > 0 { + return nil + } + + err = dr.saveNodeData(node) + if err != nil { + return err + } + // Save the leaf node file data in a buffer in case it is only + // partially read now and future `CtxReadFull` calls reclaim the + // rest (as each node is visited only once during `Iterate`). + + written, err := dr.writeNodeDataBuffer(w) + n += written + if err != nil { + return err + } + + return nil + }) + + if err == ipld.EndOfDag { + return n, io.EOF + // Reached the end of the (DAG) file, no more data to read. + // TODO: Is this a correct return error for `WriteTo`? + } else if err != nil { + return n, err + // Pass along any other errors from the `Visitor`. + } + + return n, nil +} + +// Close the reader (cancelling fetch node operations requested with +// the internal context, that is, `Read` calls but not `CtxReadFull` +// with user-supplied contexts). +func (dr *dagReader) Close() error { + dr.cancel() + return nil +} + +// Seek implements `io.Seeker` seeking to a given offset in the DAG file, +// it matches the standard unix `seek`. It moves the position of the internal +// `dagWalker` and may also leave a `currentNodeData` buffer loaded in case +// the seek is performed to the middle of the data in a node. +// +// TODO: Support seeking from the current position (relative seek) +// through the `dagWalker` in `io.SeekCurrent`. +func (dr *dagReader) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + if offset < 0 { + return -1, errors.New("invalid offset") + } + + if offset == dr.offset { + return offset, nil + // Already at the requested `offset`, nothing to do. + } + + left := offset + // Amount left to seek. + + // Seek from the beginning of the DAG. + dr.resetPosition() + + // Use the internal reader's context to fetch the child node promises + // (see `ipld.NavigableIPLDNode.FetchChild` for details). + dr.dagWalker.SetContext(dr.ctx) + // TODO: Performance: we could adjust here `preloadSize` of + // `ipld.NavigableIPLDNode` also, when seeking we only want + // to fetch one child at a time. + + // Seek the DAG by calling the provided `Visitor` function on every + // node the `dagWalker` descends to while searching which can be + // either an internal or leaf node. In the internal node case, check + // the child node sizes and set the corresponding child index to go + // down to next. In the leaf case (last visit of the search), if there + // is still an amount `left` to seek do it inside the node's data + // saved in the `currentNodeData` buffer, leaving it ready for a `Read` + // call. + err := dr.dagWalker.Seek(func(visitedNode ipld.NavigableNode) error { + node := ipld.ExtractIPLDNode(visitedNode) + + if len(node.Links()) > 0 { + // Internal node, should be a `mdag.ProtoNode` containing a + // `unixfs.FSNode` (see the `balanced` package for more details). + fsNode, err := unixfs.ExtractFSNode(node) + if err != nil { + return err + } + + // If there aren't enough size hints don't seek + // (see the `io.EOF` handling error comment below). + if fsNode.NumChildren() != len(node.Links()) { + return io.EOF + } + + // Internal nodes have no data, so just iterate through the + // sizes of its children (advancing the child index of the + // `dagWalker`) to find where we need to go down to next in + // the search. + for { + childSize := fsNode.BlockSize(int(dr.dagWalker.ActiveChildIndex())) + + if childSize > uint64(left) { + // This child's data contains the position requested + // in `offset`, go down this child. + return nil + } + + // Else, skip this child. + left -= int64(childSize) + err := dr.dagWalker.NextChild() + if err == ipld.ErrNextNoChild { + // No more child nodes available, nothing to do, + // the `Seek` will stop on its own. + return nil + } else if err != nil { + return err + // Pass along any other errors (that may in future + // implementations be returned by `Next`) to stop + // the search. + } + } + + } else { + // Leaf node, seek inside its data. + err := dr.saveNodeData(node) + if err != nil { + return err + } + + _, err = dr.currentNodeData.Seek(left, io.SeekStart) + if err != nil { + return err + } + // The corner case of a DAG consisting only of a single (leaf) + // node should make no difference here. In that case, where the + // node doesn't have a parent UnixFS node with size hints, this + // implementation would allow this `Seek` to be called with an + // argument larger than the buffer size which normally wouldn't + // happen (because we would skip the node based on the size + // hint) but that would just mean that a future `CtxReadFull` + // call would read no data from the `currentNodeData` buffer. + // TODO: Re-check this reasoning. + + return nil + // In the leaf node case the search will stop here. + } + }) + + if err == io.EOF { + // TODO: Taken from https://github.com/ipfs/go-ipfs/pull/4320, + // check if still valid. + // Return negative number if we can't figure out the file size. Using io.EOF + // for this seems to be good(-enough) solution as it's only returned by + // precalcNextBuf when we step out of file range. + // This is needed for gateway to function properly + return -1, nil + } + + if err != nil { + return 0, err + } + + dr.offset = offset + return dr.offset, nil + + case io.SeekCurrent: + if offset == 0 { + return dr.offset, nil + } + + return dr.Seek(dr.offset+offset, io.SeekStart) + // TODO: Performance. This can be improved supporting relative + // searches in the `Walker` (see `Walker.Seek`). + + case io.SeekEnd: + return dr.Seek(int64(dr.Size())-offset, io.SeekStart) + + default: + return 0, errors.New("invalid whence") + } +} + +// Reset the reader position by resetting the `dagWalker` and discarding +// any partially used node's data in the `currentNodeData` buffer, used +// in the `SeekStart` case. +func (dr *dagReader) resetPosition() { + dr.currentNodeData = nil + + dr.dagWalker = ipld.NewWalker(dr.ctx, ipld.NewNavigableIPLDNode(dr.rootNode, dr.serv)) + // TODO: This could be avoided (along with storing the `dr.rootNode` and + // `dr.serv` just for this call) if `Reset` is supported in the `Walker`. } diff --git a/io/dagreader_test.go b/io/dagreader_test.go index 6e1fef8d0..d79d2d1f5 100644 --- a/io/dagreader_test.go +++ b/io/dagreader_test.go @@ -4,7 +4,6 @@ import ( "bytes" "io" "io/ioutil" - "math/rand" "strings" "testing" @@ -73,90 +72,6 @@ func TestSeekAndRead(t *testing.T) { } } -func TestSeekAndReadLarge(t *testing.T) { - dserv := testu.GetDAGServ() - inbuf := make([]byte, 20000) - rand.Read(inbuf) - - node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves) - ctx, closer := context.WithCancel(context.Background()) - defer closer() - - reader, err := NewDagReader(ctx, node, dserv) - if err != nil { - t.Fatal(err) - } - - _, err = reader.Seek(10000, io.SeekStart) - if err != nil { - t.Fatal(err) - } - - buf := make([]byte, 100) - _, err = io.ReadFull(reader, buf) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(buf, inbuf[10000:10100]) { - t.Fatal("seeked read failed") - } - - pbdr := reader.(*PBDagReader) - var count int - for i, p := range pbdr.promises { - if i > 20 && i < 30 { - if p == nil { - t.Fatal("expected index to be not nil: ", i) - } - count++ - } else { - if p != nil { - t.Fatal("expected index to be nil: ", i) - } - } - } - // -1 because we read some and it cleared one - if count != preloadSize-1 { - t.Fatalf("expected %d preloaded promises, got %d", preloadSize-1, count) - } -} - -func TestReadAndCancel(t *testing.T) { - dserv := testu.GetDAGServ() - inbuf := make([]byte, 20000) - rand.Read(inbuf) - - node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves) - ctx, closer := context.WithCancel(context.Background()) - defer closer() - - reader, err := NewDagReader(ctx, node, dserv) - if err != nil { - t.Fatal(err) - } - - ctx, cancel := context.WithCancel(context.Background()) - buf := make([]byte, 100) - _, err = reader.CtxReadFull(ctx, buf) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(buf, inbuf[0:100]) { - t.Fatal("read failed") - } - cancel() - - b, err := ioutil.ReadAll(reader) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(inbuf[100:], b) { - t.Fatal("buffers not equal") - } -} - func TestRelativeSeek(t *testing.T) { dserv := testu.GetDAGServ() ctx, closer := context.WithCancel(context.Background()) diff --git a/io/pbdagreader.go b/io/pbdagreader.go deleted file mode 100644 index bea5f496e..000000000 --- a/io/pbdagreader.go +++ /dev/null @@ -1,326 +0,0 @@ -package io - -import ( - "context" - "errors" - "fmt" - "io" - - cid "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" - mdag "github.com/ipfs/go-merkledag" - ft "github.com/ipfs/go-unixfs" -) - -// PBDagReader provides a way to easily read the data contained in a dag. -type PBDagReader struct { - serv ipld.NodeGetter - - // UnixFS file (it should be of type `Data_File` or `Data_Raw` only). - file *ft.FSNode - - // the current data buffer to be read from - // will either be a bytes.Reader or a child DagReader - buf ReadSeekCloser - - // NodePromises for each of 'nodes' child links - promises []*ipld.NodePromise - - // the cid of each child of the current node - links []cid.Cid - - // the index of the child link currently being read from - linkPosition int - - // current offset for the read head within the 'file' - offset int64 - - // Our context - ctx context.Context - - // context cancel for children - cancel func() -} - -var _ DagReader = (*PBDagReader)(nil) - -// NewPBFileReader constructs a new PBFileReader. -func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, file *ft.FSNode, serv ipld.NodeGetter) *PBDagReader { - fctx, cancel := context.WithCancel(ctx) - curLinks := getLinkCids(n) - return &PBDagReader{ - serv: serv, - buf: NewBufDagReader(file.Data()), - promises: make([]*ipld.NodePromise, len(curLinks)), - links: curLinks, - ctx: fctx, - cancel: cancel, - file: file, - } -} - -const preloadSize = 10 - -func (dr *PBDagReader) preload(ctx context.Context, beg int) { - end := beg + preloadSize - if end >= len(dr.links) { - end = len(dr.links) - } - - copy(dr.promises[beg:], ipld.GetNodes(ctx, dr.serv, dr.links[beg:end])) -} - -// precalcNextBuf follows the next link in line and loads it from the -// DAGService, setting the next buffer to read from -func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error { - if dr.buf != nil { - dr.buf.Close() // Just to make sure - dr.buf = nil - } - - if dr.linkPosition >= len(dr.promises) { - return io.EOF - } - - // If we drop to <= preloadSize/2 preloading nodes, preload the next 10. - for i := dr.linkPosition; i < dr.linkPosition+preloadSize/2 && i < len(dr.promises); i++ { - // TODO: check if canceled. - if dr.promises[i] == nil { - dr.preload(ctx, i) - break - } - } - - nxt, err := dr.promises[dr.linkPosition].Get(ctx) - dr.promises[dr.linkPosition] = nil - switch err { - case nil: - case context.DeadlineExceeded, context.Canceled: - err = ctx.Err() - if err != nil { - return ctx.Err() - } - // In this case, the context used to *preload* the node has been canceled. - // We need to retry the load with our context and we might as - // well preload some extra nodes while we're at it. - // - // Note: When using `Read`, this code will never execute as - // `Read` will use the global context. It only runs if the user - // explicitly reads with a custom context (e.g., by calling - // `CtxReadFull`). - dr.preload(ctx, dr.linkPosition) - nxt, err = dr.promises[dr.linkPosition].Get(ctx) - dr.promises[dr.linkPosition] = nil - if err != nil { - return err - } - default: - return err - } - - dr.linkPosition++ - - return dr.loadBufNode(nxt) -} - -func (dr *PBDagReader) loadBufNode(node ipld.Node) error { - switch node := node.(type) { - case *mdag.ProtoNode: - fsNode, err := ft.FSNodeFromBytes(node.Data()) - if err != nil { - return fmt.Errorf("incorrectly formatted protobuf: %s", err) - } - - switch fsNode.Type() { - case ft.TFile: - dr.buf = NewPBFileReader(dr.ctx, node, fsNode, dr.serv) - return nil - case ft.TRaw: - dr.buf = NewBufDagReader(fsNode.Data()) - return nil - default: - return fmt.Errorf("found %s node in unexpected place", fsNode.Type().String()) - } - case *mdag.RawNode: - dr.buf = NewBufDagReader(node.RawData()) - return nil - default: - return ErrUnkownNodeType - } -} - -func getLinkCids(n ipld.Node) []cid.Cid { - links := n.Links() - out := make([]cid.Cid, 0, len(links)) - for _, l := range links { - out = append(out, l.Cid) - } - return out -} - -// Size return the total length of the data from the DAG structured file. -func (dr *PBDagReader) Size() uint64 { - return dr.file.FileSize() -} - -// Read reads data from the DAG structured file -func (dr *PBDagReader) Read(b []byte) (int, error) { - return dr.CtxReadFull(dr.ctx, b) -} - -// CtxReadFull reads data from the DAG structured file -func (dr *PBDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { - if dr.buf == nil { - if err := dr.precalcNextBuf(ctx); err != nil { - return 0, err - } - } - - // If no cached buffer, load one - total := 0 - for { - // Attempt to fill bytes from cached buffer - n, err := io.ReadFull(dr.buf, b[total:]) - total += n - dr.offset += int64(n) - switch err { - // io.EOF will happen is dr.buf had noting more to read (n == 0) - case io.EOF, io.ErrUnexpectedEOF: - // do nothing - case nil: - return total, nil - default: - return total, err - } - - // if we are not done with the output buffer load next block - err = dr.precalcNextBuf(ctx) - if err != nil { - return total, err - } - } -} - -// WriteTo writes to the given writer. -func (dr *PBDagReader) WriteTo(w io.Writer) (int64, error) { - if dr.buf == nil { - if err := dr.precalcNextBuf(dr.ctx); err != nil { - return 0, err - } - } - - // If no cached buffer, load one - total := int64(0) - for { - // Attempt to write bytes from cached buffer - n, err := dr.buf.WriteTo(w) - total += n - dr.offset += n - if err != nil { - if err != io.EOF { - return total, err - } - } - - // Otherwise, load up the next block - err = dr.precalcNextBuf(dr.ctx) - if err != nil { - if err == io.EOF { - return total, nil - } - return total, err - } - } -} - -// Close closes the reader. -func (dr *PBDagReader) Close() error { - dr.cancel() - return nil -} - -// Seek implements io.Seeker, and will seek to a given offset in the file -// interface matches standard unix seek -// TODO: check if we can do relative seeks, to reduce the amount of dagreader -// recreations that need to happen. -func (dr *PBDagReader) Seek(offset int64, whence int) (int64, error) { - switch whence { - case io.SeekStart: - if offset < 0 { - return -1, errors.New("invalid offset") - } - if offset == dr.offset { - return offset, nil - } - - // left represents the number of bytes remaining to seek to (from beginning) - left := offset - if int64(len(dr.file.Data())) >= offset { - // Close current buf to close potential child dagreader - if dr.buf != nil { - dr.buf.Close() - } - dr.buf = NewBufDagReader(dr.file.Data()[offset:]) - - // start reading links from the beginning - dr.linkPosition = 0 - dr.offset = offset - return offset, nil - } - - // skip past root block data - left -= int64(len(dr.file.Data())) - - // iterate through links and find where we need to be - for i := 0; i < dr.file.NumChildren(); i++ { - if dr.file.BlockSize(i) > uint64(left) { - dr.linkPosition = i - break - } else { - left -= int64(dr.file.BlockSize(i)) - } - } - - // start sub-block request - err := dr.precalcNextBuf(dr.ctx) - if err != nil { - return 0, err - } - - // set proper offset within child readseeker - n, err := dr.buf.Seek(left, io.SeekStart) - if err != nil { - return -1, err - } - - // sanity - left -= n - if left != 0 { - return -1, errors.New("failed to seek properly") - } - dr.offset = offset - return offset, nil - case io.SeekCurrent: - // TODO: be smarter here - if offset == 0 { - return dr.offset, nil - } - - noffset := dr.offset + offset - return dr.Seek(noffset, io.SeekStart) - case io.SeekEnd: - noffset := int64(dr.file.FileSize()) - offset - n, err := dr.Seek(noffset, io.SeekStart) - - // Return negative number if we can't figure out the file size. Using io.EOF - // for this seems to be good(-enough) solution as it's only returned by - // precalcNextBuf when we step out of file range. - // This is needed for gateway to function properly - if err == io.EOF && dr.file.Type() == ft.TFile { - return -1, nil - } - return n, err - default: - return 0, errors.New("invalid whence") - } -} diff --git a/mod/dagmodifier_test.go b/mod/dagmodifier_test.go index b61369362..9870b2022 100644 --- a/mod/dagmodifier_test.go +++ b/mod/dagmodifier_test.go @@ -453,6 +453,11 @@ func TestDagSync(t *testing.T) { t.Fatal(err) } + _, err = dagmod.Seek(0, io.SeekStart) + if err != nil { + t.Fatal(err) + } + out, err := ioutil.ReadAll(dagmod) if err != nil { t.Fatal(err) diff --git a/unixfs.go b/unixfs.go index 4ee755186..84caf6f44 100644 --- a/unixfs.go +++ b/unixfs.go @@ -5,9 +5,9 @@ package unixfs import ( "errors" + "fmt" proto "github.com/gogo/protobuf/proto" - dag "github.com/ipfs/go-merkledag" ipld "github.com/ipfs/go-ipld-format" @@ -355,3 +355,54 @@ func BytesForMetadata(m *Metadata) ([]byte, error) { func EmptyDirNode() *dag.ProtoNode { return dag.NodeWithData(FolderPBData()) } + +// ReadUnixFSNodeData extracts the UnixFS data from an IPLD node. +// Raw nodes are (also) processed because they are used as leaf +// nodes containing (only) UnixFS data. +func ReadUnixFSNodeData(node ipld.Node) (data []byte, err error) { + switch node := node.(type) { + + case *dag.ProtoNode: + fsNode, err := FSNodeFromBytes(node.Data()) + if err != nil { + return nil, fmt.Errorf("incorrectly formatted protobuf: %s", err) + } + + switch fsNode.Type() { + case pb.Data_File, pb.Data_Raw: + return fsNode.Data(), nil + // Only leaf nodes (of type `Data_Raw`) contain data but due to a + // bug the `Data_File` type (normally used for internal nodes) is + // also used for leaf nodes, so both types are accepted here + // (see the `balanced` package for more details). + default: + return nil, fmt.Errorf("found %s node in unexpected place", + fsNode.Type().String()) + } + + case *dag.RawNode: + return node.RawData(), nil + + default: + return nil, ErrUnrecognizedType + // TODO: To avoid rewriting the error message, but a different error from + // `unixfs.ErrUnrecognizedType` should be used (defining it in the + // `merkledag` or `go-ipld-format` packages). + } +} + +// Extract the `unixfs.FSNode` from the `ipld.Node` (assuming this +// was implemented by a `mdag.ProtoNode`). +func ExtractFSNode(node ipld.Node) (*FSNode, error) { + protoNode, ok := node.(*dag.ProtoNode) + if !ok { + return nil, errors.New("expected a ProtoNode as internal node") + } + + fsNode, err := FSNodeFromBytes(protoNode.Data()) + if err != nil { + return nil, err + } + + return fsNode, nil +}