From a70876e8717e3c160e5fe8af82bc98acc61c910e Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Wed, 1 Jun 2016 16:14:38 -0400 Subject: [PATCH] Store needed parts of IpfsNode in Adder. This will make it easier to set up a specialized data pipeline. License: MIT Signed-off-by: Kevin Atkinson --- core/commands/add.go | 4 +- core/coreunix/add.go | 90 ++++++++++++++++++++------------------- core/coreunix/add_test.go | 3 +- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/core/commands/add.go b/core/commands/add.go index c6fc7146e3ad..48ca23a64557 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -141,11 +141,13 @@ You can now refer to the added file in a gateway, like so: outChan := make(chan interface{}, 8) res.SetOutput((<-chan interface{})(outChan)) - fileAdder, err := coreunix.NewAdder(req.Context(), n, outChan) + fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG) if err != nil { res.SetError(err, cmds.ErrNormal) return } + + fileAdder.Out = outChan fileAdder.Chunker = chunker fileAdder.Progress = progress fileAdder.Hidden = hidden diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 806fe52831b7..b815e42c513e 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -67,42 +67,46 @@ type AddedObject struct { Bytes int64 `json:",omitempty"` } -func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) (*Adder, error) { - mr, err := mfs.NewRoot(ctx, n.DAG, newDirNode(), nil) +func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) { + mr, err := mfs.NewRoot(ctx, ds, newDirNode(), nil) if err != nil { return nil, err } return &Adder{ - mr: mr, - ctx: ctx, - node: n, - out: out, - Progress: false, - Hidden: true, - Pin: true, - Trickle: false, - Wrap: false, - Chunker: "", + mr: mr, + ctx: ctx, + pinning: p, + blockstore: bs, + dagService: ds, + Progress: false, + Hidden: true, + Pin: true, + Trickle: false, + Wrap: false, + Chunker: "", }, nil + } // Internal structure for holding the switches passed to the `add` call type Adder struct { - ctx context.Context - node *core.IpfsNode - out chan interface{} - Progress bool - Hidden bool - Pin bool - Trickle bool - Silent bool - Wrap bool - Chunker string - root *dag.Node - mr *mfs.Root - unlocker bs.Unlocker - tempRoot key.Key + ctx context.Context + pinning pin.Pinner + blockstore bstore.GCBlockstore + dagService dag.DAGService + Out chan interface{} + Progress bool + Hidden bool + Pin bool + Trickle bool + Silent bool + Wrap bool + Chunker string + root *dag.Node + mr *mfs.Root + unlocker bs.Unlocker + tempRoot key.Key } // Perform the actual add & pin locally, outputting results to reader @@ -114,12 +118,12 @@ func (adder Adder) add(reader io.Reader) (*dag.Node, error) { if adder.Trickle { return importer.BuildTrickleDagFromReader( - adder.node.DAG, + adder.dagService, chnk, ) } return importer.BuildDagFromReader( - adder.node.DAG, + adder.dagService, chnk, ) } @@ -137,7 +141,7 @@ func (adder *Adder) RootNode() (*dag.Node, error) { // if not wrapping, AND one root file, use that hash as root. if !adder.Wrap && len(root.Links) == 1 { - root, err = root.Links[0].GetNode(adder.ctx, adder.node.DAG) + root, err = root.Links[0].GetNode(adder.ctx, adder.dagService) if err != nil { return nil, err } @@ -156,21 +160,21 @@ func (adder *Adder) PinRoot() error { return nil } - rnk, err := adder.node.DAG.Add(root) + rnk, err := adder.dagService.Add(root) if err != nil { return err } if adder.tempRoot != "" { - err := adder.node.Pinning.Unpin(adder.ctx, adder.tempRoot, true) + err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true) if err != nil { return err } adder.tempRoot = rnk } - adder.node.Pinning.PinWithMode(rnk, pin.Recursive) - return adder.node.Pinning.Flush() + adder.pinning.PinWithMode(rnk, pin.Recursive) + return adder.pinning.Flush() } func (adder *Adder) Finalize() (*dag.Node, error) { @@ -237,7 +241,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error { } } - return outputDagnode(adder.out, path, nd) + return outputDagnode(adder.Out, path, nd) } // Add builds a merkledag from the a reader, pinning all objects to the local @@ -245,7 +249,7 @@ func (adder *Adder) outputDirs(path string, fs mfs.FSNode) error { func Add(n *core.IpfsNode, r io.Reader) (string, error) { defer n.Blockstore.PinLock().Unlock() - fileAdder, err := NewAdder(n.Context(), n, nil) + fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG) if err != nil { return "", err } @@ -277,7 +281,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { } defer f.Close() - fileAdder, err := NewAdder(n.Context(), n, nil) + fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG) if err != nil { return "", err } @@ -306,7 +310,7 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { // the directory, and and error if any. func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) { file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil) - fileAdder, err := NewAdder(n.Context(), n, nil) + fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG) if err != nil { return "", nil, err } @@ -355,14 +359,14 @@ func (adder *Adder) addNode(node *dag.Node, path string) error { } if !adder.Silent { - return outputDagnode(adder.out, path, node) + return outputDagnode(adder.Out, path, node) } return nil } // Add the given file while respecting the adder. func (adder *Adder) AddFile(file files.File) error { - adder.unlocker = adder.node.Blockstore.PinLock() + adder.unlocker = adder.blockstore.PinLock() defer func() { adder.unlocker.Unlock() }() @@ -388,7 +392,7 @@ func (adder *Adder) addFile(file files.File) error { } dagnode := &dag.Node{Data: sdata} - _, err = adder.node.DAG.Add(dagnode) + _, err = adder.dagService.Add(dagnode) if err != nil { return err } @@ -401,7 +405,7 @@ func (adder *Adder) addFile(file files.File) error { // progress updates to the client (over the output channel) var reader io.Reader = file if adder.Progress { - reader = &progressReader{file: file, out: adder.out} + reader = &progressReader{file: file, out: adder.Out} } dagnode, err := adder.add(reader) @@ -445,14 +449,14 @@ func (adder *Adder) addDir(dir files.File) error { } func (adder *Adder) maybePauseForGC() error { - if adder.node.Blockstore.GCRequested() { + if adder.blockstore.GCRequested() { err := adder.PinRoot() if err != nil { return err } adder.unlocker.Unlock() - adder.unlocker = adder.node.Blockstore.PinLock() + adder.unlocker = adder.blockstore.PinLock() } return nil } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index c773f46216e9..76b3d9ff4397 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -54,7 +54,8 @@ func TestAddGCLive(t *testing.T) { errs := make(chan error) out := make(chan interface{}) - adder, err := NewAdder(context.Background(), node, out) + adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG) + adder.Out = out if err != nil { t.Fatal(err) }