Skip to content

Commit

Permalink
Merge pull request #191 from ehmry/dagservice-interface
Browse files Browse the repository at this point in the history
convert DAGService to an interface
  • Loading branch information
jbenet committed Oct 26, 2014
2 parents 39316a2 + 056699c commit 08edaf8
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 32 deletions.
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type IpfsNode struct {
Blocks *bserv.BlockService

// the merkle dag service, get/add objects.
DAG *merkledag.DAGService
DAG merkledag.DAGService

// the path resolution system
Resolver *path.Resolver
Expand Down Expand Up @@ -161,7 +161,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
return nil, err
}

dag := &merkledag.DAGService{Blocks: bs}
dag := merkledag.NewDAGService(bs)
ns := namesys.NewNameSystem(route)
p, err := pin.LoadPinner(d, dag)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewMockNode() (*IpfsNode, error) {
return nil, err
}

nd.DAG = &mdag.DAGService{Blocks: bserv}
nd.DAG = mdag.NewDAGService(bserv)

// Namespace resolver
nd.Namesys = nsys.NewNameSystem(dht)
Expand Down
32 changes: 22 additions & 10 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func MakeLink(n *Node) (*Link, error) {
}, nil
}

func (l *Link) GetNode(serv *DAGService) (*Node, error) {
func (l *Link) GetNode(serv DAGService) (*Node, error) {
if l.Node != nil {
return l.Node, nil
}
Expand Down Expand Up @@ -151,20 +151,32 @@ func (n *Node) Key() (u.Key, error) {
}

// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
}

func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}

// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type DAGService struct {
type dagService struct {
Blocks *bserv.BlockService
}

// Add adds a node to the DAGService, storing the block in the BlockService
func (n *DAGService) Add(nd *Node) (u.Key, error) {
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd *Node) (u.Key, error) {
k, _ := nd.Key()
log.Debug("DagService Add [%s]", k)
if n == nil {
return "", fmt.Errorf("DAGService is nil")
return "", fmt.Errorf("dagService is nil")
}

d, err := nd.Encoded(false)
Expand All @@ -182,7 +194,7 @@ func (n *DAGService) Add(nd *Node) (u.Key, error) {
return n.Blocks.AddBlock(b)
}

func (n *DAGService) AddRecursive(nd *Node) error {
func (n *dagService) AddRecursive(nd *Node) error {
_, err := n.Add(nd)
if err != nil {
log.Info("AddRecursive Error: %s\n", err)
Expand All @@ -201,10 +213,10 @@ func (n *DAGService) AddRecursive(nd *Node) error {
return nil
}

// Get retrieves a node from the DAGService, fetching the block in the BlockService
func (n *DAGService) Get(k u.Key) (*Node, error) {
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(k u.Key) (*Node, error) {
if n == nil {
return nil, fmt.Errorf("DAGService is nil")
return nil, fmt.Errorf("dagService is nil")
}

ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
Expand All @@ -216,7 +228,7 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
return Decoded(b.Data)
}

func (n *DAGService) Remove(nd *Node) error {
func (n *dagService) Remove(nd *Node) error {
for _, l := range nd.Links {
if l.Node != nil {
n.Remove(l.Node)
Expand Down
2 changes: 1 addition & 1 deletion path/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var log = u.Logger("path")
// Resolver provides path resolution to IPFS
// It has a pointer to a DAGService, which is uses to resolve nodes.
type Resolver struct {
DAG *merkledag.DAGService
DAG merkledag.DAGService
}

// ResolvePath fetches the node for given path. It uses the first
Expand Down
6 changes: 3 additions & 3 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type pinner struct {
recursePin set.BlockSet
directPin set.BlockSet
indirPin *indirectPin
dserv *mdag.DAGService
dserv mdag.DAGService
dstore ds.Datastore
}

func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner {
func NewPinner(dstore ds.Datastore, serv mdag.DAGService) Pinner {

// Load set from given datastore...
rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (p *pinner) IsPinned(key util.Key) bool {
p.indirPin.HasKey(key)
}

func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) {
func LoadPinner(d ds.Datastore, dserv mdag.DAGService) (Pinner, error) {
p := new(pinner)

{ // load recursive set
Expand Down
2 changes: 1 addition & 1 deletion pin/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}

dserv := &mdag.DAGService{Blocks: bserv}
dserv := mdag.NewDAGService(bserv)

p := NewPinner(dstore, dserv)

Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/dagmodifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
// perform surgery on a DAG 'file'
// Dear god, please rename this to something more pleasant
type DagModifier struct {
dagserv *mdag.DAGService
dagserv mdag.DAGService
curNode *mdag.Node

pbdata *ftpb.Data
splitter chunk.BlockSplitter
}

func NewDagModifier(from *mdag.Node, serv *mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
func NewDagModifier(from *mdag.Node, serv mdag.DAGService, spl chunk.BlockSplitter) (*DagModifier, error) {
pbd, err := ft.FromBytes(from.Data)
if err != nil {
return nil, err
Expand Down
14 changes: 9 additions & 5 deletions unixfs/io/dagmodifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ import (
logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
)

func getMockDagServ(t *testing.T) *mdag.DAGService {
func getMockDagServ(t *testing.T) mdag.DAGService {
dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
if err != nil {
t.Fatal(err)
}
return &mdag.DAGService{Blocks: bserv}
return mdag.NewDAGService(bserv)
}

func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.Node) {
dw := NewDagWriter(dserv, &chunk.SizeSplitter{Size: 500})
func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Node) {
dw := NewDagWriter(dserv, &chunk.SizeSplitter{500})

n, err := io.CopyN(dw, u.NewFastRand(), size)
if err != nil {
Expand All @@ -36,7 +36,11 @@ func getNode(t *testing.T, dserv *mdag.DAGService, size int64) ([]byte, *mdag.No
t.Fatal("Incorrect copy amount!")
}

dw.Close()
err = dw.Close()
if err != nil {
t.Fatal("DagWriter failed to close,", err)
}

node := dw.GetNode()

dr, err := NewDagReader(node, dserv)
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
serv *mdag.DAGService
serv mdag.DAGService
node *mdag.Node
position int
buf *bytes.Buffer
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
func NewDagReader(n *mdag.Node, serv *mdag.DAGService) (io.Reader, error) {
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
pb := new(ftpb.Data)
err := proto.Unmarshal(n.Data, pb)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/dagwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
var log = util.Logger("dagwriter")

type DagWriter struct {
dagserv *dag.DAGService
dagserv dag.DAGService
node *dag.Node
totalSize int64
splChan chan []byte
Expand All @@ -19,7 +19,7 @@ type DagWriter struct {
seterr error
}

func NewDagWriter(ds *dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
dw := new(DagWriter)
dw.dagserv = ds
dw.splChan = make(chan []byte, 8)
Expand Down
49 changes: 46 additions & 3 deletions unixfs/io/dagwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
bs "github.com/jbenet/go-ipfs/blockservice"
"github.com/jbenet/go-ipfs/importer"
chunk "github.com/jbenet/go-ipfs/importer/chunk"
mdag "github.com/jbenet/go-ipfs/merkledag"
)
Expand Down Expand Up @@ -53,7 +54,7 @@ func TestDagWriter(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dag := &mdag.DAGService{Blocks: bserv}
dag := mdag.NewDAGService(bserv)
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})

nbytes := int64(1024 * 1024 * 2)
Expand Down Expand Up @@ -87,7 +88,7 @@ func TestMassiveWrite(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dag := &mdag.DAGService{Blocks: bserv}
dag := mdag.NewDAGService(bserv)
dw := NewDagWriter(dag, &chunk.SizeSplitter{Size: 4096})

nbytes := int64(1024 * 1024 * 1024 * 16)
Expand All @@ -107,7 +108,7 @@ func BenchmarkDagWriter(b *testing.B) {
if err != nil {
b.Fatal(err)
}
dag := &mdag.DAGService{Blocks: bserv}
dag := mdag.NewDAGService(bserv)

b.ResetTimer()
nbytes := int64(100000)
Expand All @@ -125,3 +126,45 @@ func BenchmarkDagWriter(b *testing.B) {
}

}

func TestAgainstImporter(t *testing.T) {
dstore := ds.NewMapDatastore()
bserv, err := bs.NewBlockService(dstore, nil)
if err != nil {
t.Fatal(err)
}
dag := mdag.NewDAGService(bserv)

nbytes := int64(1024 * 1024 * 2)

// DagWriter
dw := NewDagWriter(dag, &chunk.SizeSplitter{4096})
n, err := io.CopyN(dw, &datasource{}, nbytes)
if err != nil {
t.Fatal(err)
}
if n != nbytes {
t.Fatal("Copied incorrect amount of bytes!")
}

dw.Close()
dwNode := dw.GetNode()
dwKey, err := dwNode.Key()
if err != nil {
t.Fatal(err)
}

// DagFromFile
rl := &io.LimitedReader{&datasource{}, nbytes}

dffNode, err := importer.NewDagFromReaderWithSplitter(rl, &chunk.SizeSplitter{4096})
dffKey, err := dffNode.Key()
if err != nil {
t.Fatal(err)
}
if dwKey.String() != dffKey.String() {
t.Errorf("\nDagWriter produced %s\n"+
"DagFromReader produced %s",
dwKey, dffKey)
}
}

0 comments on commit 08edaf8

Please sign in to comment.