From 9474a4b56fd37cc80a2ff46e2aceb02b5d34d6c2 Mon Sep 17 00:00:00 2001
From: Emery Hemingway
Date: Wed, 15 Oct 2014 19:31:27 -0400
Subject: [PATCH 1/2] convert DAGService to an interface

---
 core/core.go             |  4 ++--
 core/mock.go             |  2 +-
 merkledag/merkledag.go   | 27 +++++++++++++++++++--------
 path/path.go             |  2 +-
 unixfs/io/dagmodifier.go |  4 ++--
 unixfs/io/dagreader.go   |  4 ++--
 unixfs/io/dagwriter.go   |  4 ++--
 7 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/core/core.go b/core/core.go
index d22390d9296..bb518649dee 100644
--- a/core/core.go
+++ b/core/core.go
@@ -58,7 +58,7 @@ type IpfsNode struct {
 	Blocks *bserv.BlockService
 
 	// the merkle dag service, get/add objects.
-	DAG *merkledag.DAGService
+	DAG merkledag.DAGService
 
 	// the path resolution system
 	Resolver *path.Resolver
@@ -157,7 +157,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
 		return nil, err
 	}
 
-	dag := &merkledag.DAGService{Blocks: bs}
+	dag := merkledag.NewDAGService(bs)
 	ns := namesys.NewNameSystem(route)
 
 	success = true
diff --git a/core/mock.go b/core/mock.go
index 9eececa3dac..3c2930ba413 100644
--- a/core/mock.go
+++ b/core/mock.go
@@ -40,7 +40,7 @@ func NewMockNode() (*IpfsNode, error) {
 		return nil, err
 	}
 
-	nd.DAG = &mdag.DAGService{bserv}
+	nd.DAG = mdag.NewDAGService(bserv)
 
 	// Namespace resolver
 	nd.Namesys = nsys.NewNameSystem(dht)
diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go
index 46b0c408911..ab82a678385 100644
--- a/merkledag/merkledag.go
+++ b/merkledag/merkledag.go
@@ -140,20 +140,31 @@ func (n *Node) Key() (u.Key, error) {
 }
 
 // DAGService is an IPFS Merkle DAG service.
+type DAGService interface {
+	Add(*Node) (u.Key, error)
+	AddRecursive(*Node) error
+	Get(u.Key) (*Node, error)
+}
+
+func NewDAGService(bs *bserv.BlockService) DAGService {
+	return &dagService{bs}
+}
+
+// dagService is an IPFS Merkle DAG service.
 // - the root is virtual (like a forest)
 // - stores nodes' data in a BlockService
 // TODO: should cache Nodes that are in memory, and be
 //       able to free some of them when vm pressure is high
-type DAGService struct {
+type dagService struct {
 	Blocks *bserv.BlockService
 }
 
-// Add adds a node to the DAGService, storing the block in the BlockService
-func (n *DAGService) Add(nd *Node) (u.Key, error) {
+// Add adds a node to the dagService, storing the block in the BlockService
+func (n *dagService) Add(nd *Node) (u.Key, error) {
 	k, _ := nd.Key()
 	log.Debug("DagService Add [%s]", k)
 	if n == nil {
-		return "", fmt.Errorf("DAGService is nil")
+		return "", fmt.Errorf("dagService is nil")
 	}
 
 	d, err := nd.Encoded(false)
@@ -171,7 +182,7 @@ func (n *DAGService) Add(nd *Node) (u.Key, error) {
 	return n.Blocks.AddBlock(b)
 }
 
-func (n *DAGService) AddRecursive(nd *Node) error {
+func (n *dagService) AddRecursive(nd *Node) error {
 	_, err := n.Add(nd)
 	if err != nil {
 		log.Info("AddRecursive Error: %s\n", err)
@@ -190,10 +201,10 @@ func (n *DAGService) AddRecursive(nd *Node) error {
 	return nil
 }
 
-// Get retrieves a node from the DAGService, fetching the block in the BlockService
-func (n *DAGService) Get(k u.Key) (*Node, error) {
+// Get retrieves a node from the dagService, fetching the block in the BlockService
+func (n *dagService) Get(k u.Key) (*Node, error) {
 	if n == nil {
-		return nil, fmt.Errorf("DAGService is nil")
+		return nil, fmt.Errorf("dagService is nil")
 	}
 
 	b, err := n.Blocks.GetBlock(k)
diff --git a/path/path.go b/path/path.go
index 03c1a481e5e..cb1061d1185 100644
--- a/path/path.go
+++ b/path/path.go
@@ -15,7 +15,7 @@ var log = u.Logger("path")
 // Resolver provides path resolution to IPFS
 // It has a pointer to a DAGService, which it uses to resolve nodes.
 type Resolver struct {
-	DAG *merkledag.DAGService
+	DAG merkledag.DAGService
 }
 
 // ResolvePath fetches the node for given path. It uses the first
diff --git a/unixfs/io/dagmodifier.go b/unixfs/io/dagmodifier.go
index 8680da46a27..64d12074010 100644
--- a/unixfs/io/dagmodifier.go
+++ b/unixfs/io/dagmodifier.go
@@ -16,14 +16,14 @@ import (
 // perform surgery on a DAG 'file'
 // Dear god, please rename this to something more pleasant
 type DagModifier struct {
-	dagserv *mdag.DAGService
+	dagserv mdag.DAGService
 	curNode *mdag.Node
 
 	pbdata   *ft.PBData
 	splitter chunk.BlockSplitter
 }
 
-func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
+func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
 	pbd, err := ft.FromBytes(from.Data)
 	if err != nil {
 		return nil, err
diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go
index 29196a1e3f1..f88ccca259b 100644
--- a/unixfs/io/dagreader.go
+++ b/unixfs/io/dagreader.go
@@ -15,7 +15,7 @@ var ErrIsDir = errors.New("this dag node is a directory")
 // DagReader provides a way to easily read the data contained in a dag.
 type DagReader struct {
-	serv     *mdag.DAGService
+	serv     mdag.DAGService
 	node     *mdag.Node
 	position int
 	buf      *bytes.Buffer
@@ -23,7 +23,7 @@ type DagReader struct {
 // NewDagReader creates a new reader object that reads the data represented by the given
 // node, using the passed in DAGService for data retrieval
-func NewDagReader(n *mdag.Node, serv *mdag.DAGService) (io.Reader, error) {
+func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
 	pb := new(ft.PBData)
 	err := proto.Unmarshal(n.Data, pb)
 	if err != nil {
diff --git a/unixfs/io/dagwriter.go b/unixfs/io/dagwriter.go
index 4abb1b36c76..c9b91cc589a 100644
--- a/unixfs/io/dagwriter.go
+++ b/unixfs/io/dagwriter.go
@@ -10,7 +10,7 @@ import (
 var log = util.Logger("dagwriter")
 
 type DagWriter struct {
-	dagserv   *dag.DAGService
+	dagserv   dag.DAGService
 	node      *dag.Node
 	totalSize int64
 	splChan   chan []byte
@@ -19,7 +19,7 @@ type DagWriter struct {
 	seterr error
 }
 
-func NewDagWriter(ds *dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
+func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
 	dw := new(DagWriter)
 	dw.dagserv = ds
 	dw.splChan = make(chan []byte, 8)

From caf5b3270a441ede0b6d53ffb0bdaffad283cc60 Mon Sep 17 00:00:00 2001
From: Emery Hemingway
Date: Fri, 17 Oct 2014 17:27:39 -0400
Subject: [PATCH 2/2] Splitter interface without channels

---
 importer/chunk/rabin.go       | 138 +++++++++++++----------
 importer/chunk/splitting.go   |  73 ++++++++-----
 importer/importer.go          |  22 +++-
 importer/importer_test.go     |  32 ++++--
 unixfs/io/dagmodifier.go      |  15 ++-
 unixfs/io/dagmodifier_test.go |  18 +--
 unixfs/io/dagwriter.go        | 200 +++++++++++++++++++++++-----------
 unixfs/io/dagwriter_test.go   |  55 +++++++++-
 8 files changed, 374 insertions(+), 179 deletions(-)

diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go
index fbfb4cec41e..78db7d97bc8 100644
--- a/importer/chunk/rabin.go
+++ b/importer/chunk/rabin.go
@@ -3,7 +3,6 @@ package chunk
 import (
 	"bufio"
 	"bytes"
-	"fmt"
 	"io"
 	"math"
 )
@@ -13,6 +12,13 @@ type MaybeRabin struct {
 	windowSize   int
 	MinBlockSize int
 	MaxBlockSize int
+	inbuf        *bufio.Reader
+	buf          bytes.Buffer
+	window       []byte // Window is a circular buffer
+	wi           int    // window index
+	rollingHash  int
+	an           int
+	readers      []io.Reader
 }
 
 func NewMaybeRabin(avgBlkSize int) *MaybeRabin {
@@ -22,73 +28,87 @@ func NewMaybeRabin(avgBlkSize int) *MaybeRabin {
 	rb.windowSize = 16 // probably a good number...
 	rb.MinBlockSize = avgBlkSize / 2
 	rb.MaxBlockSize = (avgBlkSize / 2) * 3
+	rb.window = make([]byte, rb.windowSize)
+	rb.an = 1
 	return rb
 }
 
-func (mr *MaybeRabin) Split(r io.Reader) chan []byte {
-	out := make(chan []byte, 16)
-	go func() {
-		inbuf := bufio.NewReader(r)
-		blkbuf := new(bytes.Buffer)
+func (mr *MaybeRabin) push(val byte) (outval int) {
+	outval = int(mr.window[mr.wi%len(mr.window)])
+	mr.window[mr.wi%len(mr.window)] = val
+	return
+}
 
-		// some bullshit numbers i made up
-		a := 10         // honestly, no idea what this is
-		MOD := 33554383 // randomly chosen (seriously)
-		an := 1
-		rollingHash := 0
+// Duplicate byte slice
+func dup(b []byte) []byte {
+	d := make([]byte, len(b))
+	copy(d, b)
+	return d
+}
 
-		// Window is a circular buffer
-		window := make([]byte, mr.windowSize)
-		push := func(i int, val byte) (outval int) {
-			outval = int(window[i%len(window)])
-			window[i%len(window)] = val
-			return
-		}
+func (mr *MaybeRabin) nextReader() ([]byte, error) {
+	if len(mr.readers) == 0 {
+		mr.inbuf = nil
+		return mr.buf.Bytes(), nil
+	}
+	ri := len(mr.readers) - 1
+	mr.inbuf = bufio.NewReader(mr.readers[ri])
+	mr.readers = mr.readers[:ri]
+	return mr.Next()
+}
 
-		// Duplicate byte slice
-		dup := func(b []byte) []byte {
-			d := make([]byte, len(b))
-			copy(d, b)
-			return d
-		}
+func (mr *MaybeRabin) Next() ([]byte, error) {
+	if mr.inbuf == nil {
+		return nil, io.EOF
+	}
 
-		// Fill up the window
-		i := 0
-		for ; i < mr.windowSize; i++ {
-			b, err := inbuf.ReadByte()
-			if err != nil {
-				fmt.Println(err)
-				return
-			}
-			blkbuf.WriteByte(b)
-			push(i, b)
-			rollingHash = (rollingHash*a + int(b)) % MOD
-			an = (an * a) % MOD
-		}
+	// some bullshit numbers i made up
+	a := 10         // honestly, no idea what this is
+	MOD := 33554383 // randomly chosen (seriously)
 
+	var b byte
+	var err error
+	// Fill up the window
+	for ; mr.wi < mr.windowSize; mr.wi++ {
+		b, err = mr.inbuf.ReadByte()
+		if err != nil {
+			if err == io.EOF {
+				return mr.nextReader()
+			}
+			return nil, err
+		}
+		mr.buf.WriteByte(b)
+		mr.push(b)
+		mr.rollingHash = (mr.rollingHash*a + int(b)) % MOD
+		mr.an = (mr.an * a) % MOD
+	}
 
-		for ; true; i++ {
-			b, err := inbuf.ReadByte()
-			if err != nil {
-				break
-			}
-			outval := push(i, b)
-			blkbuf.WriteByte(b)
-			rollingHash = (rollingHash*a + int(b) - an*outval) % MOD
-			if (rollingHash&mr.mask == mr.mask && blkbuf.Len() > mr.MinBlockSize) ||
-				blkbuf.Len() >= mr.MaxBlockSize {
-				out <- dup(blkbuf.Bytes())
-				blkbuf.Reset()
+	for ; true; mr.wi++ {
+		b, err = mr.inbuf.ReadByte()
+		if err != nil {
+			break
+		}
+		outval := mr.push(b)
+		mr.buf.WriteByte(b)
+		mr.rollingHash = (mr.rollingHash*a + int(b) - mr.an*outval) % MOD
+		if (mr.rollingHash&mr.mask == mr.mask && mr.buf.Len() > mr.MinBlockSize) || mr.buf.Len() >= mr.MaxBlockSize {
+			block := dup(mr.buf.Bytes())
+			mr.buf.Reset()
+			return block, nil
 			}
-
-			// Check if there are enough remaining
-			peek, err := inbuf.Peek(mr.windowSize)
-			if err != nil || len(peek) != mr.windowSize {
-				break
-			}
+	}
-		}
-		io.Copy(blkbuf, inbuf)
-		out <- blkbuf.Bytes()
-		close(out)
-	}()
-	return out
+	if err == io.EOF {
+		return mr.nextReader()
+	}
+	return nil, err
+}
+
+func (mr *MaybeRabin) Size() int { return mr.MaxBlockSize }
+
+func (mr *MaybeRabin) Push(r io.Reader) {
+	if mr.inbuf == nil {
+		mr.inbuf = bufio.NewReader(r)
+	} else {
+		mr.readers = append(mr.readers, r)
+	}
 }
diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go
index 0b5717eaf6e..a43d94c2e85 100644
--- a/importer/chunk/splitting.go
+++ b/importer/chunk/splitting.go
@@ -8,38 +8,59 @@ import (
 
 var log = util.Logger("chunk")
 
-var DefaultSplitter = &SizeSplitter{1024 * 512}
+var DefaultSplitter = &SizeSplitter{size: 1024 * 512}
 
+// BlockSplitter is the interface to a block splitting algorithm.
 type BlockSplitter interface {
-	Split(r io.Reader) chan []byte
+
+	// Size returns the maximum block size that this BlockSplitter may produce,
+	// or the maximum amount of data the BlockSplitter may buffer,
+	// whichever is larger.
+	Size() int
+
+	// Next returns a block split from the underlying reader.
+	// io.EOF is returned when both the last Reader and any splitting buffers
+	// are exhausted.
+	Next() ([]byte, error)
+
+	// Push causes the BlockSplitter to start reading from a new io.Reader.
+	// When an EOF error is seen from the new io.Reader, it is popped
+	// and reading continues from the next most recent io.Reader.
+	Push(io.Reader)
 }
 
 type SizeSplitter struct {
-	Size int
+	size    int
+	readers []io.Reader
 }
 
-func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
-	out := make(chan []byte)
-	go func() {
-		defer close(out)
-		for {
-			chunk := make([]byte, ss.Size)
-			nread, err := r.Read(chunk)
-			if err != nil {
-				if err == io.EOF {
-					if nread > 0 {
-						out <- chunk[:nread]
-					}
-					return
-				}
-				log.Error("Block split error: %s", err)
-				return
-			}
-			if nread < ss.Size {
-				chunk = chunk[:nread]
-			}
-			out <- chunk
+func (ss *SizeSplitter) Size() int { return ss.size }
+
+func (ss *SizeSplitter) Next() (b []byte, err error) {
+	b = make([]byte, ss.size)
+
+	var n, N, ri int
+	for len(ss.readers) > 0 {
+		ri = len(ss.readers) - 1
+		N, err = ss.readers[ri].Read(b[n:])
+		n += N
+		if err == io.EOF {
+			ss.readers = ss.readers[:ri]
+			err = nil
 		}
-	}()
-	return out
+		if n == ss.size {
+			return
+		}
+	}
+	if n == 0 {
+		return nil, io.EOF
+	}
+	b = b[:n]
+	return
 }
+
+func (ss *SizeSplitter) Push(r io.Reader) {
+	ss.readers = append(ss.readers, r)
+}
+
+func NewSizeSplitter(size int) *SizeSplitter { return &SizeSplitter{size: size} }
diff --git a/importer/importer.go b/importer/importer.go
index 0a4d9848e4e..d140f6cbd97 100644
--- a/importer/importer.go
+++ b/importer/importer.go
@@ -29,15 +29,29 @@ func NewDagFromReader(r io.Reader) (*dag.Node, error) {
 }
 
 func NewDagFromReaderWithSplitter(r io.Reader, spl chunk.BlockSplitter) (*dag.Node, error) {
-	blkChan := spl.Split(r)
-	first := <-blkChan
+	// TODO: get rid of the r argument and push r into spl up in NewDagFromReader
+	spl.Push(r)
+
+	first, err := spl.Next()
+	if err != nil {
+		return nil, err
+	}
+
 	root := &dag.Node{}
 	mbf := new(ft.MultiBlock)
-	for blk := range blkChan {
+	for {
+		blk, err := spl.Next()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, err
+		}
+
 		mbf.AddBlockSize(uint64(len(blk)))
 		child := &dag.Node{Data: ft.WrapData(blk)}
-		err := root.AddNodeLink("", child)
+		err = root.AddNodeLink("", child)
 		if err != nil {
 			return nil, err
 		}
diff --git a/importer/importer_test.go b/importer/importer_test.go
index dd52f9e1fc3..24752a52710 100644
--- a/importer/importer_test.go
+++ b/importer/importer_test.go
@@ -38,9 +38,9 @@ func TestBuildDag(t *testing.T) {
 
 //Test where calls to read are smaller than the chunk size
 func TestSizeBasedSplit(t *testing.T) {
-	bs := &chunk.SizeSplitter{512}
+	bs := chunk.NewSizeSplitter(512)
 	testFileConsistency(t, bs, 32*512)
-	bs = &chunk.SizeSplitter{4096}
+	bs = chunk.NewSizeSplitter(4096)
 	testFileConsistency(t, bs, 32*4096)
 
 	// Uneven offset
@@ -94,17 +94,25 @@ func TestMaybeRabinConsistency(t *testing.T) {
 }
 
 func TestRabinBlockSize(t *testing.T) {
-	buf := new(bytes.Buffer)
 	nbytes := 1024 * 1024
-	io.CopyN(buf, rand.Reader, int64(nbytes))
 	rab := chunk.NewMaybeRabin(4096)
-	blkch := rab.Split(buf)
-
-	var blocks [][]byte
-	for b := range blkch {
-		blocks = append(blocks, b)
+	rab.Push(
+		&io.LimitedReader{
+			R: rand.Reader,
+			N: int64(nbytes),
+		},
+	)
+
+	var nblocks int
+	for {
+		_, err := rab.Next()
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			t.Fatal(err.Error())
+		}
+		nblocks = nblocks + 1
 	}
-
-	fmt.Printf("Avg block size: %d\n", nbytes/len(blocks))
-
+	fmt.Printf("Avg block size: %d\n", nbytes/nblocks)
 }
diff --git a/unixfs/io/dagmodifier.go b/unixfs/io/dagmodifier.go
index 64d12074010..d75c956c74b 100644
--- a/unixfs/io/dagmodifier.go
+++ b/unixfs/io/dagmodifier.go
@@ -3,6 +3,7 @@ package io
 import (
 	"bytes"
 	"errors"
+	"io"
 
 	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
 
@@ -173,10 +174,20 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
 // splitBytes uses a splitterFunc to turn a large array of bytes
 // into many smaller arrays of bytes
 func splitBytes(b []byte, spl chunk.BlockSplitter) [][]byte {
-	out := spl.Split(bytes.NewReader(b))
+	spl.Push(bytes.NewReader(b))
+
+	var blk []byte
 	var arr [][]byte
-	for blk := range out {
-		arr = append(arr, blk)
+	var err error
+	for {
+		blk, err = spl.Next()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			panic(err.Error())
+		}
+		arr = append(arr, blk)
 	}
 	return arr
 }
diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go
index 32d9a84b5da..bca0d71ca68 100644
--- a/unixfs/io/dagmodifier_test.go
+++ b/unixfs/io/dagmodifier_test.go
@@ -16,17 +16,17 @@ import (
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
 )
 
-func getMockDagServ(t *testing.T) *mdag.DAGService {
+func getMockDagServ(t *testing.T) mdag.DAGService {
 	dstore := ds.NewMapDatastore()
 	bserv, err := bs.NewBlockService(dstore, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	return &mdag.DAGService{bserv}
+	return mdag.NewDAGService(bserv)
 }
 
-func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) {
-	dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})
+func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
+	dw := NewDagWriter(dserv, chunk.NewSizeSplitter(500))
 
 	n, err := io.CopyN(dw, u.NewFastRand(), size)
 	if err != nil {
@@ -36,7 +36,11 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No
 		t.Fatal("Incorrect copy amount!")
 	}
 
-	dw.Close()
+	err = dw.Close()
+	if err != nil {
+		t.Fatal("DagWriter failed to close,", err)
+	}
+
 	node := dw.GetNode()
 
 	dr, err := NewDagReader(node, dserv)
@@ -99,7 +103,7 @@ func TestDagModifierBasic(t *testing.T) {
 	dserv := getMockDagServ(t)
 	b, n := getNode(t, dserv, 50000)
 
-	dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{512})
+	dagmod, err := NewDagModifier(n, dserv, chunk.NewSizeSplitter(512))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -150,7 +154,7 @@ func TestMultiWrite(t *testing.T) {
 	dserv := getMockDagServ(t)
 	_, n := getNode(t, dserv, 0)
 
-	dagmod, err := NewDagModifier(n, dserv, &chunk.SizeSplitter{512})
+	dagmod, err := NewDagModifier(n, dserv, chunk.NewSizeSplitter(512))
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/unixfs/io/dagwriter.go b/unixfs/io/dagwriter.go
index c9b91cc589a..480c76c850c 100644
--- a/unixfs/io/dagwriter.go
+++ b/unixfs/io/dagwriter.go
@@ -1,6 +1,9 @@
 package io
 
 import (
+	"bytes"
+	"io"
+
 	"github.com/jbenet/go-ipfs/importer/chunk"
 	dag "github.com/jbenet/go-ipfs/merkledag"
 	ft "github.com/jbenet/go-ipfs/unixfs"
@@ -10,95 +13,166 @@ import (
 var log = util.Logger("dagwriter")
 
 type DagWriter struct {
-	dagserv   dag.DAGService
-	node      *dag.Node
-	totalSize int64
-	splChan   chan []byte
-	done      chan struct{}
-	splitter  chunk.BlockSplitter
-	seterr    error
+	buf   bytes.Buffer
+	first []byte
+	mbf   ft.MultiBlock
+	root  dag.Node
+
+	dagserv  dag.DAGService
+	splitter chunk.BlockSplitter
+
+	node   *dag.Node
+	seterr error
 }
 
 func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
-	dw := new(DagWriter)
-	dw.dagserv = ds
-	dw.splChan = make(chan []byte, 8)
-	dw.splitter = splitter
-	dw.done = make(chan struct{})
-	go dw.startSplitter()
+	dw := &DagWriter{
+		dagserv:  ds,
+		splitter: splitter,
+	}
+	dw.splitter.Push(&dw.buf)
 	return dw
 }
 
-// startSplitter manages splitting incoming bytes and
-// creating dag nodes from them. Created nodes are stored
-// in the DAGService and then released to the GC.
-func (dw *DagWriter) startSplitter() {
-
-	// Since the splitter functions take a reader (and should!)
-	// we wrap our byte chan input in a reader
-	r := util.NewByteChanReader(dw.splChan)
-	blkchan := dw.splitter.Split(r)
-
-	// First data block is reserved for storage in the root node
-	first := <-blkchan
-	mbf := new(ft.MultiBlock)
-	root := new(dag.Node)
-
-	for blkData := range blkchan {
-		// Store the block size in the root node
-		mbf.AddBlockSize(uint64(len(blkData)))
-		node := &dag.Node{Data: ft.WrapData(blkData)}
-		_, err := dw.dagserv.Add(node)
+func (dw *DagWriter) processBlock(blk []byte) (err error) {
+	// Store the block size in the root node
+	dw.mbf.AddBlockSize(uint64(len(blk)))
+	node := &dag.Node{Data: ft.WrapData(blk)}
+	_, err = dw.dagserv.Add(node)
+	if err != nil {
+		dw.seterr = err
+		log.Critical("Got error adding created node to dagservice: %s", err)
+		return
+	}
+
+	// Add a link to this node without storing a reference to the memory
+	err = dw.root.AddNodeLinkClean("", node)
+	if err != nil {
+		dw.seterr = err
+		log.Critical("Got error adding created node to root node: %s", err)
+	}
+	return
+}
+
+func (dw *DagWriter) next() error {
+	var err error
+	if dw.first == nil {
+		dw.first, err = dw.splitter.Next()
+		return err
+	}
+
+	// splitter should not return an error when using dw.buf
+	blk, _ := dw.splitter.Next()
+	return dw.processBlock(blk)
+}
+
+func (dw *DagWriter) Write(b []byte) (n int, err error) {
+	if dw.seterr != nil {
+		return 0, dw.seterr
+	}
+
+	var N, max int
+	for len(b) != 0 {
+		max = dw.splitter.Size()
+		if len(b) > max {
+			N, err = dw.buf.Write(b[:max])
+		} else {
+			N, err = dw.buf.Write(b)
+		}
+		b = b[N:]
+		n += N
+
+		if dw.buf.Len() >= max {
+			err = dw.next()
+			if err != nil {
+				return
+			}
+		}
+	}
+	return
+}
+
+// ReadFrom reads data from r until EOF or error.
+// The return value n is the number of bytes read.
+// Any error except io.EOF encountered during the
+// read is also returned.
+//
+// The io.Copy function uses ReaderFrom if available.
+func (dw *DagWriter) ReadFrom(r io.Reader) (n int64, err error) {
+	// flush out buffer
+	for dw.buf.Len() != 0 {
+		err = dw.next()
+		if err != nil {
+			return
+		}
+	}
+
+	dw.splitter.Push(r)
+
+	if dw.first == nil {
+		dw.first, err = dw.splitter.Next()
+		n += int64(len(dw.first))
 		if err != nil {
-			dw.seterr = err
-			log.Critical("Got error adding created node to dagservice: %s", err)
+			if err == io.EOF {
+				return n, nil
+			}
 			return
 		}
+	}
 
+	var blk []byte
+	for {
+		blk, err = dw.splitter.Next()
+		n += int64(len(blk))
+		if err == nil {
+			err = dw.processBlock(blk)
+		}
 
-		// Add a link to this node without storing a reference to the memory
-		err = root.AddNodeLinkClean("", node)
 		if err != nil {
-			dw.seterr = err
-			log.Critical("Got error adding created node to root node: %s", err)
+			if err == io.EOF {
+				return n, nil
+			}
 			return
 		}
 	}
+}
+
+// Flush the splitter and generate a dag.Node.
+func (dw *DagWriter) Close() (err error) {
+	var blk []byte
+	for {
+		blk, err = dw.splitter.Next()
+		if err != nil {
+			if err == io.EOF {
+				err = nil
+				break
+			}
+			return err
+		}
+
+		err = dw.processBlock(blk)
+		if err != nil {
+			return err
+		}
+	}
 
 	// Generate the root node data
-	mbf.Data = first
-	data, err := mbf.GetBytes()
+	dw.mbf.Data = dw.first
+	data, err := dw.mbf.GetBytes()
 	if err != nil {
 		dw.seterr = err
 		log.Critical("Failed generating bytes for multiblock file: %s", err)
-		return
+		return err
 	}
-	root.Data = data
+	dw.root.Data = data
 
 	// Add root node to the dagservice
-	_, err = dw.dagserv.Add(root)
+	_, err = dw.dagserv.Add(&dw.root)
 	if err != nil {
 		dw.seterr = err
 		log.Critical("Got error adding created node to dagservice: %s", err)
-		return
+		return err
 	}
-	dw.node = root
-	dw.done <- struct{}{}
-}
-
-func (dw *DagWriter) Write(b []byte) (int, error) {
-	if dw.seterr != nil {
-		return 0, dw.seterr
-	}
-	dw.splChan <- b
-	return len(b), nil
-}
-
-// Close the splitters input channel and wait for it to finish
-// Must be called to finish up splitting, otherwise split method
-// will never halt
-func (dw *DagWriter) Close() error {
-	close(dw.splChan)
-	<-dw.done
+	dw.node = &dw.root
 	return nil
 }
diff --git a/unixfs/io/dagwriter_test.go b/unixfs/io/dagwriter_test.go
index 73ba5c4e910..67c089931fe 100644
--- a/unixfs/io/dagwriter_test.go
+++ b/unixfs/io/dagwriter_test.go
@@ -7,6 +7,7 @@ import (
 
 	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
 	bs "github.com/jbenet/go-ipfs/blockservice"
+	"github.com/jbenet/go-ipfs/importer"
 	chunk "github.com/jbenet/go-ipfs/importer/chunk"
 	mdag "github.com/jbenet/go-ipfs/merkledag"
 )
@@ -53,8 +54,8 @@ func TestDagWriter(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	dag := &mdag.DAGService{bserv}
-	dw := NewDagWriter(dag, &chunk.SizeSplitter{4096})
+	dag := mdag.NewDAGService(bserv)
+	dw := NewDagWriter(dag, chunk.NewSizeSplitter(4096))
 
 	nbytes := int64(1024 * 1024 * 2)
 	n, err := io.CopyN(dw, &datasource{}, nbytes)
@@ -87,8 +88,8 @@ func TestMassiveWrite(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	dag := &mdag.DAGService{bserv}
-	dw := NewDagWriter(dag, &chunk.SizeSplitter{4096})
+	dag := mdag.NewDAGService(bserv)
+	dw := NewDagWriter(dag, chunk.NewSizeSplitter(4096))
 
 	nbytes := int64(1024 * 1024 * 1024 * 16)
 	n, err := io.CopyN(dw, &datasource{}, nbytes)
@@ -107,13 +108,13 @@ func BenchmarkDagWriter(b *testing.B) {
 	if err != nil {
 		b.Fatal(err)
 	}
-	dag := &mdag.DAGService{bserv}
+	dag := mdag.NewDAGService(bserv)
 
 	b.ResetTimer()
 	nbytes := int64(100000)
 	for i := 0; i < b.N; i++ {
 		b.SetBytes(nbytes)
-		dw := NewDagWriter(dag, &chunk.SizeSplitter{4096})
+		dw := NewDagWriter(dag, chunk.NewSizeSplitter(4096))
 		n, err := io.CopyN(dw, &datasource{}, nbytes)
 		if err != nil {
 			b.Fatal(err)
@@ -125,3 +126,45 @@ func BenchmarkDagWriter(b *testing.B) {
 	}
 }
+
+func TestAgainstImporter(t *testing.T) {
+	dstore := ds.NewMapDatastore()
+	bserv, err := bs.NewBlockService(dstore, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	dag := mdag.NewDAGService(bserv)
+
+	nbytes := int64(1024 * 1024 * 2)
+
+	// DagWriter
+	dw := NewDagWriter(dag, chunk.NewSizeSplitter(4096))
+	n, err := io.CopyN(dw, &datasource{}, nbytes)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != nbytes {
+		t.Fatal("Copied incorrect amount of bytes!")
+	}
+
+	dw.Close()
+	dwNode := dw.GetNode()
+	dwKey, err := dwNode.Key()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// DagFromFile
+	rl := &io.LimitedReader{R: &datasource{}, N: nbytes}
+
+	dffNode, err := importer.NewDagFromReaderWithSplitter(rl, chunk.NewSizeSplitter(4096))
+	if err != nil {
+		t.Fatal(err)
+	}
+	dffKey, err := dffNode.Key()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if dwKey.String() != dffKey.String() {
+		t.Errorf("\nDagWriter produced %s\n"+
+			"DagFromReader produced %s",
+			dwKey, dffKey)
+	}
+}
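
Reviewer note: the second patch replaces the channel-returning Split(io.Reader) with a pull-style Push/Next/Size interface. The following is a minimal sketch of how a consumer drives that interface, not part of the series itself; it assumes the import path used in the diffs above, and the sample text and block size are arbitrary.

package main

import (
	"fmt"
	"io"
	"strings"

	chunk "github.com/jbenet/go-ipfs/importer/chunk"
)

func main() {
	// Push feeds a reader onto the splitter's reader stack; Next pulls
	// one block at a time; io.EOF signals that every pushed reader and
	// any internal buffer has been drained.
	spl := chunk.NewSizeSplitter(8)
	spl.Push(strings.NewReader("the quick brown fox jumps over the lazy dog"))

	for {
		blk, err := spl.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("block (%d bytes): %q\n", len(blk), blk)
	}
}

Note that, per the interface comment, the final partial block arrives with a nil error and io.EOF only comes on the following call, which is why the loop checks io.EOF before using blk.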
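
The MaybeRabin rework keeps the same rolling hash as the old Split loop: when the window slides one byte, the hash is updated in O(1) as h' = (h*a + in - an*out) mod M, where an = a^windowSize mod M. The sketch below verifies that identity against a hash recomputed from scratch. The constants a and M are the ones appearing in rabin.go; the "+M" normalization is an addition here, since Go's % operator can yield negative values (the patch itself leaves the hash unnormalized).

package main

import "fmt"

func main() {
	const (
		a          = 10       // base, as in rabin.go
		M          = 33554383 // modulus, as in rabin.go
		windowSize = 16       // as set by NewMaybeRabin
	)

	data := []byte("abcdefghijklmnopqrstuvwxyz0123456789")

	// fresh recomputes the hash of the window starting at index i.
	fresh := func(i int) int {
		h := 0
		for _, b := range data[i : i+windowSize] {
			h = (h*a + int(b)) % M
		}
		return h
	}

	// Fill the first window, maintaining an = a^windowSize mod M
	// incrementally, just as mr.an is maintained in Next.
	an, h := 1, 0
	for _, b := range data[:windowSize] {
		h = (h*a + int(b)) % M
		an = (an * a) % M
	}

	// Slide the window one byte at a time and compare the O(1) update
	// against the from-scratch hash.
	for i := windowSize; i < len(data); i++ {
		in, out := int(data[i]), int(data[i-windowSize])
		h = ((h*a+in-an*out)%M + M) % M // +M guards against negative mod
		fmt.Println(h == fresh(i - windowSize + 1))
	}
}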
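
Finally, an end-to-end sketch of the two patches working together: an in-memory DAGService wired up the same way the tests do, fed through the reworked DagWriter. This is illustrative only; the uio import alias is an assumption (the unixfs/io package is declared "package io", so it must be renamed), while NewDAGService, NewSizeSplitter, the error-returning Close, GetNode, and u.NewFastRand are all taken from the diffs and test files above.

package main

import (
	"fmt"
	"io"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
	bs "github.com/jbenet/go-ipfs/blockservice"
	chunk "github.com/jbenet/go-ipfs/importer/chunk"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	uio "github.com/jbenet/go-ipfs/unixfs/io"
	u "github.com/jbenet/go-ipfs/util"
)

func main() {
	// Build a DAGService backed by an in-memory datastore,
	// as getMockDagServ does in dagmodifier_test.go.
	dstore := ds.NewMapDatastore()
	bserv, err := bs.NewBlockService(dstore, nil)
	if err != nil {
		panic(err)
	}
	dag := mdag.NewDAGService(bserv)

	// Stream 64 KiB of fast random data through the writer; io.CopyN
	// will use the new ReadFrom, bypassing the intermediate buffer.
	dw := uio.NewDagWriter(dag, chunk.NewSizeSplitter(4096))
	if _, err := io.CopyN(dw, u.NewFastRand(), 64*1024); err != nil {
		panic(err)
	}

	// Close now flushes the splitter and builds the root node,
	// reporting any error instead of signalling a goroutine.
	if err := dw.Close(); err != nil {
		panic(err)
	}

	key, err := dw.GetNode().Key()
	if err != nil {
		panic(err)
	}
	fmt.Println("root key:", key)
}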