diff --git a/unixfs/io/pbdagreader.go b/unixfs/io/pbdagreader.go
index dcd383460d0..56a502e444c 100644
--- a/unixfs/io/pbdagreader.go
+++ b/unixfs/io/pbdagreader.go
@@ -10,6 +10,8 @@ import (
 	ft "github.com/ipfs/go-ipfs/unixfs"
 	ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
 
+	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
+	node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
 	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
 )
 
@@ -30,6 +32,9 @@ type pbDagReader struct {
 	// NodeGetters for each of 'nodes' child links
 	promises []mdag.NodeGetter
 
+	// the cid of each child of the current node
+	links []*cid.Cid
+
 	// the index of the child link currently being read from
 	linkPosition int
 
@@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)
 
 func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
 	fctx, cancel := context.WithCancel(ctx)
-	promises := mdag.GetDAG(fctx, serv, n)
+	curLinks := getLinkCids(n)
 	return &pbDagReader{
 		node:     n,
 		serv:     serv,
 		buf:      NewBufDagReader(pb.GetData()),
-		promises: promises,
+		promises: make([]mdag.NodeGetter, len(curLinks)),
+		links:    curLinks,
 		ctx:      fctx,
 		cancel:   cancel,
 		pbdata:   pb,
 	}
 }
 
+const preloadSize = 10
+
+func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
+	beg := dr.linkPosition
+	end := beg + preloadSize
+	if end >= len(dr.links) {
+		end = len(dr.links)
+	}
+
+	for i, p := range mdag.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
+		dr.promises[beg+i] = p
+	}
+}
+
 // precalcNextBuf follows the next link in line and loads it from the
 // DAGService, setting the next buffer to read from
 func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
-	dr.buf.Close() // Just to make sure
+	if dr.buf != nil {
+		dr.buf.Close() // Just to make sure
+		dr.buf = nil
+	}
+
 	if dr.linkPosition >= len(dr.promises) {
 		return io.EOF
 	}
 
+	if dr.promises[dr.linkPosition] == nil {
+		dr.preloadNextNodes(ctx)
+	}
+
 	nxt, err := dr.promises[dr.linkPosition].Get(ctx)
 	if err != nil {
 		return err
 	}
+	dr.promises[dr.linkPosition] = nil
 	dr.linkPosition++
 
 	switch nxt := nxt.(type) {
@@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
 	}
 }
 
+func getLinkCids(n node.Node) []*cid.Cid {
+	links := n.Links()
+	out := make([]*cid.Cid, 0, len(links))
+	for _, l := range links {
+		out = append(out, l.Cid)
+	}
+	return out
+}
+
 // Size return the total length of the data from the DAG structured file.
 func (dr *pbDagReader) Size() uint64 {
 	return dr.pbdata.GetFilesize()
@@ -117,6 +155,12 @@ func (dr *pbDagReader) Read(b []byte) (int, error) {
 
 // CtxReadFull reads data from the DAG structured file
 func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
+	if dr.buf == nil {
+		if err := dr.precalcNextBuf(ctx); err != nil {
+			return 0, err
+		}
+	}
+
 	// If no cached buffer, load one
 	total := 0
 	for {
@@ -145,6 +189,12 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
 }
 
 func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
+	if dr.buf == nil {
+		if err := dr.precalcNextBuf(dr.ctx); err != nil {
+			return 0, err
+		}
+	}
+
 	// If no cached buffer, load one
 	total := int64(0)
 	for {
@@ -199,7 +249,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
 		left := offset
 		if int64(len(pb.Data)) >= offset {
 			// Close current buf to close potential child dagreader
-			dr.buf.Close()
+			if dr.buf != nil {
+				dr.buf.Close()
+			}
 			dr.buf = NewBufDagReader(pb.GetData()[offset:])
 
 			// start reading links from the beginning