diff --git a/core/core.go b/core/core.go index 14112f95793..30106c79413 100644 --- a/core/core.go +++ b/core/core.go @@ -58,7 +58,7 @@ type IpfsNode struct { Blocks *bserv.BlockService // the merkle dag service, get/add objects. - DAG *merkledag.DAGService + DAG merkledag.DAGService // the path resolution system Resolver *path.Resolver @@ -161,7 +161,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) { return nil, err } - dag := &merkledag.DAGService{Blocks: bs} + dag := merkledag.NewDAGService(bs) ns := namesys.NewNameSystem(route) p, err := pin.LoadPinner(d, dag) if err != nil { diff --git a/core/mock.go b/core/mock.go index 2257e988543..f35e591f23b 100644 --- a/core/mock.go +++ b/core/mock.go @@ -49,7 +49,7 @@ func NewMockNode() (*IpfsNode, error) { return nil, err } - nd.DAG = &mdag.DAGService{Blocks: bserv} + nd.DAG = mdag.NewDAGService(bserv) // Namespace resolver nd.Namesys = nsys.NewNameSystem(dht) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 7834677a8f8..0e595c9d693 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -62,7 +62,7 @@ func MakeLink(n *Node) (*Link, error) { }, nil } -func (l *Link) GetNode(serv *DAGService) (*Node, error) { +func (l *Link) GetNode(serv DAGService) (*Node, error) { if l.Node != nil { return l.Node, nil } @@ -151,20 +151,32 @@ func (n *Node) Key() (u.Key, error) { } // DAGService is an IPFS Merkle DAG service. +type DAGService interface { + Add(*Node) (u.Key, error) + AddRecursive(*Node) error + Get(u.Key) (*Node, error) + Remove(*Node) error +} + +func NewDAGService(bs *bserv.BlockService) DAGService { + return &dagService{bs} +} + +// dagService is an IPFS Merkle DAG service. // - the root is virtual (like a forest) // - stores nodes' data in a BlockService // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high -type DAGService struct { +type dagService struct { Blocks *bserv.BlockService } -// Add adds a node to the DAGService, storing the block in the BlockService -func (n *DAGService) Add(nd *Node) (u.Key, error) { +// Add adds a node to the dagService, storing the block in the BlockService +func (n *dagService) Add(nd *Node) (u.Key, error) { k, _ := nd.Key() log.Debug("DagService Add [%s]", k) if n == nil { - return "", fmt.Errorf("DAGService is nil") + return "", fmt.Errorf("dagService is nil") } d, err := nd.Encoded(false) @@ -182,7 +194,7 @@ func (n *DAGService) Add(nd *Node) (u.Key, error) { return n.Blocks.AddBlock(b) } -func (n *DAGService) AddRecursive(nd *Node) error { +func (n *dagService) AddRecursive(nd *Node) error { _, err := n.Add(nd) if err != nil { log.Info("AddRecursive Error: %s\n", err) @@ -201,10 +213,10 @@ func (n *DAGService) AddRecursive(nd *Node) error { return nil } -// Get retrieves a node from the DAGService, fetching the block in the BlockService -func (n *DAGService) Get(k u.Key) (*Node, error) { +// Get retrieves a node from the dagService, fetching the block in the BlockService +func (n *dagService) Get(k u.Key) (*Node, error) { if n == nil { - return nil, fmt.Errorf("DAGService is nil") + return nil, fmt.Errorf("dagService is nil") } ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) @@ -216,7 +228,7 @@ func (n *DAGService) Get(k u.Key) (*Node, error) { return Decoded(b.Data) } -func (n *DAGService) Remove(nd *Node) error { +func (n *dagService) Remove(nd *Node) error { for _, l := range nd.Links { if l.Node != nil { n.Remove(l.Node) diff --git a/path/path.go b/path/path.go index 03c1a481e5e..cb1061d1185 100644 --- a/path/path.go +++ b/path/path.go @@ -15,7 +15,7 @@ var log = u.Logger("path") // Resolver provides path resolution to IPFS // It has a pointer to a DAGService, which is uses to resolve nodes. type Resolver struct { - DAG *merkledag.DAGService + DAG merkledag.DAGService } // ResolvePath fetches the node for given path. It uses the first diff --git a/pin/pin.go b/pin/pin.go index b9c509a0343..a3f0e260b6e 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -32,11 +32,11 @@ type pinner struct { recursePin set.BlockSet directPin set.BlockSet indirPin *indirectPin - dserv *mdag.DAGService + dserv mdag.DAGService dstore ds.Datastore } -func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner { +func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner { // Load set from given datastore... rcds := nsds.Wrap(dstore, recursePinDatastoreKey) @@ -151,7 +151,7 @@ func (p *pinner) IsPinned(key util.Key) bool { p.indirPin.HasKey(key) } -func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) { +func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) { p := new(pinner) { // load recursive set diff --git a/pin/pin_test.go b/pin/pin_test.go index 8f6f6c343b1..7bf0756df56 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -24,7 +24,7 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - dserv := &mdag.DAGService{Blocks: bserv} + dserv := mdag.NewDAGService(bserv) p := NewPinner(dstore, dserv) diff --git a/unixfs/io/dagmodifier.go b/unixfs/io/dagmodifier.go index 2d5fb77d924..ebec24cfce8 100644 --- a/unixfs/io/dagmodifier.go +++ b/unixfs/io/dagmodifier.go @@ -17,14 +17,14 @@ import ( // perform surgery on a DAG 'file' // Dear god, please rename this to something more pleasant type DagModifier struct { - dagserv *mdag.DAGService + dagserv mdag.DAGService curNode *mdag.Node pbdata *ftpb.Data splitter chunk.BlockSplitter } -func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) { +func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) { pbd, err := ft.FromBytes(from.Data) if err != nil { return nil, err diff --git a/unixfs/io/dagmodifier_test.go b/unixfs/io/dagmodifier_test.go index 3686ff859ae..d45559b3adb 100644 --- a/unixfs/io/dagmodifier_test.go +++ b/unixfs/io/dagmodifier_test.go @@ -16,17 +16,17 @@ import ( logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging" ) -func getMockDagServ(t *testing.T) *mdag.DAGService { +func getMockDagServ(t *testing.T) mdag.DAGService { dstore := ds.NewMapDatastore() bserv, err := bs.NewBlockService(dstore, nil) if err != nil { t.Fatal(err) } - return &mdag.DAGService{Blocks: bserv} + return mdag.NewDAGService(bserv) } -func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) { - dw := NewDagWriter(dserv, &chunk.SizeSplitter{Size: 500}) +func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) { + dw := NewDagWriter(dserv, &chunk.SizeSplitter{500}) n, err := io.CopyN(dw, u.NewFastRand(), size) if err != nil { @@ -36,7 +36,11 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No t.Fatal("Incorrect copy amount!") } - dw.Close() + err = dw.Close() + if err != nil { + t.Fatal("DagWriter failed to close,", err) + } + node := dw.GetNode() dr, err := NewDagReader(node, dserv) diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 84691610377..17ad8737193 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -16,7 +16,7 @@ var ErrIsDir = errors.New("this dag node is a directory") // DagReader provides a way to easily read the data contained in a dag. type DagReader struct { - serv *mdag.DAGService + serv mdag.DAGService node *mdag.Node position int buf *bytes.Buffer @@ -24,7 +24,7 @@ type DagReader struct { // NewDagReader creates a new reader object that reads the data represented by the given // node, using the passed in DAGService for data retreival -func NewDagReader(n *mdag.Node, serv *mdag.DAGService) (io.Reader, error) { +func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) { pb := new(ftpb.Data) err := proto.Unmarshal(n.Data, pb) if err != nil { diff --git a/unixfs/io/dagwriter.go b/unixfs/io/dagwriter.go index 4abb1b36c76..c9b91cc589a 100644 --- a/unixfs/io/dagwriter.go +++ b/unixfs/io/dagwriter.go @@ -10,7 +10,7 @@ import ( var log = util.Logger("dagwriter") type DagWriter struct { - dagserv *dag.DAGService + dagserv dag.DAGService node *dag.Node totalSize int64 splChan chan []byte @@ -19,7 +19,7 @@ type DagWriter struct { seterr error } -func NewDagWriter(ds *dag.DAGService, splitter chunk.BlockSplitter) *DagWriter { +func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter { dw := new(DagWriter) dw.dagserv = ds dw.splChan = make(chan []byte, 8) diff --git a/unixfs/io/dagwriter_test.go b/unixfs/io/dagwriter_test.go index d0b8f45d16f..08779e2c1f2 100644 --- a/unixfs/io/dagwriter_test.go +++ b/unixfs/io/dagwriter_test.go @@ -7,6 +7,7 @@ import ( ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" bs "github.com/jbenet/go-ipfs/blockservice" + "github.com/jbenet/go-ipfs/importer" chunk "github.com/jbenet/go-ipfs/importer/chunk" mdag "github.com/jbenet/go-ipfs/merkledag" ) @@ -53,7 +54,7 @@ func TestDagWriter(t *testing.T) { if err != nil { t.Fatal(err) } - dag := &mdag.DAGService{Blocks: bserv} + dag := mdag.NewDAGService(bserv) dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096}) nbytes := int64(1024 * 1024 * 2) @@ -87,7 +88,7 @@ func TestMassiveWrite(t *testing.T) { if err != nil { t.Fatal(err) } - dag := &mdag.DAGService{Blocks: bserv} + dag := mdag.NewDAGService(bserv) dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096}) nbytes := int64(1024 * 1024 * 1024 * 16) @@ -107,7 +108,7 @@ func BenchmarkDagWriter(b *testing.B) { if err != nil { b.Fatal(err) } - dag := &mdag.DAGService{Blocks: bserv} + dag := mdag.NewDAGService(bserv) b.ResetTimer() nbytes := int64(100000) @@ -125,3 +126,45 @@ func BenchmarkDagWriter(b *testing.B) { } } + +func TestAgainstImporter(t *testing.T) { + dstore := ds.NewMapDatastore() + bserv, err := bs.NewBlockService(dstore, nil) + if err != nil { + t.Fatal(err) + } + dag := mdag.NewDAGService(bserv) + + nbytes := int64(1024 * 1024 * 2) + + // DagWriter + dw := NewDagWriter(dag, &chunk.SizeSplitter{4096}) + n, err := io.CopyN(dw, &datasource{}, nbytes) + if err != nil { + t.Fatal(err) + } + if n != nbytes { + t.Fatal("Copied incorrect amount of bytes!") + } + + dw.Close() + dwNode := dw.GetNode() + dwKey, err := dwNode.Key() + if err != nil { + t.Fatal(err) + } + + // DagFromFile + rl := &io.LimitedReader{&datasource{}, nbytes} + + dffNode, err := importer.NewDagFromReaderWithSplitter(rl, &chunk.SizeSplitter{4096}) + dffKey, err := dffNode.Key() + if err != nil { + t.Fatal(err) + } + if dwKey.String() != dffKey.String() { + t.Errorf("\nDagWriter produced %s\n"+ + "DagFromReader produced %s", + dwKey, dffKey) + } +}