Skip to content

Commit

Permalink
clear out memory after reads from the dagreader
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed Dec 28, 2017
1 parent 5ad9f4d commit b8b54a1
Showing 1 changed file with 56 additions and 4 deletions.
60 changes: 56 additions & 4 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
ft "github.com/ipfs/go-ipfs/unixfs"
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
)

Expand All @@ -30,6 +32,9 @@ type pbDagReader struct {
// NodeGetters for each of 'nodes' child links
promises []mdag.NodeGetter

// the cid of each child of the current node
links []*cid.Cid

// the index of the child link currently being read from
linkPosition int

Expand All @@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)

func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
promises := mdag.GetDAG(fctx, serv, n)
curLinks := getLinkCids(n)
return &pbDagReader{
node: n,
serv: serv,
buf: NewBufDagReader(pb.GetData()),
promises: promises,
promises: make([]mdag.NodeGetter, len(curLinks)),
links: curLinks,
ctx: fctx,
cancel: cancel,
pbdata: pb,
}
}

const preloadSize = 10

func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
beg := dr.linkPosition
end := beg + preloadSize
if end >= len(dr.links) {
end = len(dr.links)
}

for i, p := range mdag.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
dr.promises[beg+i] = p
}
}

// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.buf != nil {
dr.buf.Close() // Just to make sure
dr.buf = nil
}

if dr.linkPosition >= len(dr.promises) {
return io.EOF
}

if dr.promises[dr.linkPosition] == nil {
dr.preloadNextNodes(ctx)
}

nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
dr.promises[dr.linkPosition] = nil
dr.linkPosition++

switch nxt := nxt.(type) {
Expand Down Expand Up @@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
}
}

func getLinkCids(n node.Node) []*cid.Cid {
links := n.Links()
out := make([]*cid.Cid, 0, len(links))
for _, l := range links {
out = append(out, l.Cid)
}
return out
}

// Size return the total length of the data from the DAG structured file.
func (dr *pbDagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
Expand All @@ -117,6 +155,12 @@ func (dr *pbDagReader) Read(b []byte) (int, error) {

// CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(ctx); err != nil {
return 0, err
}
}

// If no cached buffer, load one
total := 0
for {
Expand Down Expand Up @@ -145,6 +189,12 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
}

func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(dr.ctx); err != nil {
return 0, err
}
}

// If no cached buffer, load one
total := int64(0)
for {
Expand Down Expand Up @@ -199,7 +249,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
left := offset
if int64(len(pb.Data)) >= offset {
// Close current buf to close potential child dagreader
dr.buf.Close()
if dr.buf != nil {
dr.buf.Close()
}
dr.buf = NewBufDagReader(pb.GetData()[offset:])

// start reading links from the beginning
Expand Down

0 comments on commit b8b54a1

Please sign in to comment.