From b5c424b6c41a72c8b2804bf61473ef9279c824a5 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 26 Oct 2018 18:55:00 +0200 Subject: [PATCH 1/2] Add BatchDAG wrapping Batch as a DAGService. --- batch.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++----- batch_test.go | 20 +++++++++++ 2 files changed, 105 insertions(+), 8 deletions(-) diff --git a/batch.go b/batch.go index a9c8c17..dc879d5 100644 --- a/batch.go +++ b/batch.go @@ -4,6 +4,8 @@ import ( "context" "errors" "runtime" + + cid "github.com/ipfs/go-cid" ) // ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. @@ -23,14 +25,15 @@ var ErrClosed = errors.New("error: batch closed") // to add or remove a lot of nodes all at once. // // If the passed context is canceled, any in-progress commits are aborted. -func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch { +// +func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch { ctx, cancel := context.WithCancel(ctx) bopts := defaultBatchOptions for _, o := range opts { o(&bopts) } return &Batch{ - ds: ds, + na: na, ctx: ctx, cancel: cancel, commitResults: make(chan error, ParallelBatchCommits), @@ -40,7 +43,7 @@ func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch { // Batch is a buffer for batching adds to a dag. type Batch struct { - ds DAGService + na NodeAdder ctx context.Context cancel func() @@ -89,12 +92,12 @@ func (t *Batch) asyncCommit() { return } } - go func(ctx context.Context, b []Node, result chan error, ds DAGService) { + go func(ctx context.Context, b []Node, result chan error, na NodeAdder) { select { - case result <- ds.AddMany(ctx, b): + case result <- na.AddMany(ctx, b): case <-ctx.Done(): } - }(t.ctx, t.nodes, t.commitResults, t.ds) + }(t.ctx, t.nodes, t.commitResults, t.na) t.activeCommits++ t.nodes = make([]Node, 0, numBlocks) @@ -108,7 +111,7 @@ func (t *Batch) Add(ctx context.Context, nd Node) error { return t.AddMany(ctx, []Node{nd}) } -// Add many calls Add for every given Node, thus batching and +// AddMany many calls Add for every given Node, thus batching and // commiting them as needed. func (t *Batch) AddMany(ctx context.Context, nodes []Node) error { if t.err != nil { @@ -175,7 +178,7 @@ loop: // Be nice and cleanup. These can take a *lot* of memory. t.commitResults = nil - t.ds = nil + t.na = nil t.ctx = nil t.nodes = nil t.size = 0 @@ -216,3 +219,77 @@ func MaxNodesBatchOption(num int) BatchOption { o.maxNodes = num } } + +// BufferedDAG implements DAGService using a Batch NodeAdder to wrap add +// operations in the given DAGService. It will trigger Commit() before any +// non-Add operations, but otherwise calling Commit() is left to the user. +type BufferedDAG struct { + ds DAGService + b *Batch +} + +// NewBufferedDAG creates a BufferedDAG using the given DAGService and the +// given options for the Batch NodeAdder. +func NewBufferedDAG(ctx context.Context, ds DAGService, opts ...BatchOption) *BufferedDAG { + return &BufferedDAG{ + ds: ds, + b: NewBatch(ctx, ds, opts...), + } +} + +// Commit calls commit on the Batch. +func (bd *BufferedDAG) Commit() error { + return bd.b.Commit() +} + +// Add adds a new node using Batch. +func (bd *BufferedDAG) Add(ctx context.Context, n Node) error { + return bd.b.Add(ctx, n) +} + +// AddMany adds many nodes using Batch. +func (bd *BufferedDAG) AddMany(ctx context.Context, nds []Node) error { + return bd.b.AddMany(ctx, nds) +} + +// Get commits and gets a node from the DAGService. +func (bd *BufferedDAG) Get(ctx context.Context, c cid.Cid) (Node, error) { + err := bd.b.Commit() + if err != nil { + return nil, err + } + return bd.ds.Get(ctx, c) +} + +// GetMany commits and gets nodes from the DAGService. +func (bd *BufferedDAG) GetMany(ctx context.Context, cs []cid.Cid) <-chan *NodeOption { + err := bd.b.Commit() + if err != nil { + ch := make(chan *NodeOption, 1) + defer close(ch) + ch <- &NodeOption{ + Node: nil, + Err: err, + } + return ch + } + return bd.ds.GetMany(ctx, cs) +} + +// Remove commits and removes a node from the DAGService. +func (bd *BufferedDAG) Remove(ctx context.Context, c cid.Cid) error { + err := bd.b.Commit() + if err != nil { + return err + } + return bd.ds.Remove(ctx, c) +} + +// RemoveMany commits and removes nodes from the DAGService. +func (bd *BufferedDAG) RemoveMany(ctx context.Context, cs []cid.Cid) error { + err := bd.b.Commit() + if err != nil { + return err + } + return bd.ds.RemoveMany(ctx, cs) +} diff --git a/batch_test.go b/batch_test.go index 23a1d0e..d03a8f5 100644 --- a/batch_test.go +++ b/batch_test.go @@ -109,6 +109,26 @@ func TestBatch(t *testing.T) { } } +func TestBufferedDAG(t *testing.T) { + ds := newTestDag() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var bdag DAGService = NewBufferedDAG(ctx, ds) + + for i := 0; i < 1000; i++ { + n := new(EmptyNode) + if err := bdag.Add(ctx, n); err != nil { + t.Fatal(err) + } + if _, err := bdag.Get(ctx, n.Cid()); err != nil { + t.Fatal(err) + } + if err := bdag.Remove(ctx, n.Cid()); err != nil { + t.Fatal(err) + } + } +} + func TestBatchOptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 879e11f13ee8eddb36a3dea8d385fc64324f0b9f Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Sat, 27 Oct 2018 02:29:33 +0200 Subject: [PATCH 2/2] gx publish 0.7.1 --- .gx/lastpubver | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index df58177..32c9e3c 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -0.7.0: QmdVNBLt7RMYnZwqBQJeexmSbTEDzERjBQUfs5McuPfEtB +0.7.1: QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o diff --git a/package.json b/package.json index e4bd818..f59af36 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,6 @@ "license": "", "name": "go-ipld-format", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "0.7.0" + "version": "0.7.1" }