Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BufferedDAG wrapping Batch as a DAGService. #48

Merged
merged 2 commits into from
Oct 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.0: QmdVNBLt7RMYnZwqBQJeexmSbTEDzERjBQUfs5McuPfEtB
0.7.1: QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o
93 changes: 85 additions & 8 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"runtime"

cid "github.com/ipfs/go-cid"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
Expand All @@ -23,14 +25,15 @@ var ErrClosed = errors.New("error: batch closed")
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch {
//
func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch {
ctx, cancel := context.WithCancel(ctx)
bopts := defaultBatchOptions
for _, o := range opts {
o(&bopts)
}
return &Batch{
ds: ds,
na: na,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
Expand All @@ -40,7 +43,7 @@ func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch {

// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds DAGService
na NodeAdder

ctx context.Context
cancel func()
Expand Down Expand Up @@ -89,12 +92,12 @@ func (t *Batch) asyncCommit() {
return
}
}
go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
go func(ctx context.Context, b []Node, result chan error, na NodeAdder) {
select {
case result <- ds.AddMany(ctx, b):
case result <- na.AddMany(ctx, b):
case <-ctx.Done():
}
}(t.ctx, t.nodes, t.commitResults, t.ds)
}(t.ctx, t.nodes, t.commitResults, t.na)

t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
Expand All @@ -108,7 +111,7 @@ func (t *Batch) Add(ctx context.Context, nd Node) error {
return t.AddMany(ctx, []Node{nd})
}

// Add many calls Add for every given Node, thus batching and
// AddMany many calls Add for every given Node, thus batching and
// commiting them as needed.
func (t *Batch) AddMany(ctx context.Context, nodes []Node) error {
if t.err != nil {
Expand Down Expand Up @@ -175,7 +178,7 @@ loop:

// Be nice and cleanup. These can take a *lot* of memory.
t.commitResults = nil
t.ds = nil
t.na = nil
t.ctx = nil
t.nodes = nil
t.size = 0
Expand Down Expand Up @@ -216,3 +219,77 @@ func MaxNodesBatchOption(num int) BatchOption {
o.maxNodes = num
}
}

// BufferedDAG implements DAGService using a Batch NodeAdder to wrap add
// operations in the given DAGService. It will trigger Commit() before any
// non-Add operations, but otherwise calling Commit() is left to the user.
type BufferedDAG struct {
ds DAGService
b *Batch
}

// NewBufferedDAG creates a BufferedDAG using the given DAGService and the
// given options for the Batch NodeAdder.
func NewBufferedDAG(ctx context.Context, ds DAGService, opts ...BatchOption) *BufferedDAG {
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
return &BufferedDAG{
ds: ds,
b: NewBatch(ctx, ds, opts...),
}
}

// Commit calls commit on the Batch.
func (bd *BufferedDAG) Commit() error {
return bd.b.Commit()
}

// Add adds a new node using Batch.
func (bd *BufferedDAG) Add(ctx context.Context, n Node) error {
return bd.b.Add(ctx, n)
}

// AddMany adds many nodes using Batch.
func (bd *BufferedDAG) AddMany(ctx context.Context, nds []Node) error {
return bd.b.AddMany(ctx, nds)
}

// Get commits and gets a node from the DAGService.
func (bd *BufferedDAG) Get(ctx context.Context, c cid.Cid) (Node, error) {
err := bd.b.Commit()
if err != nil {
return nil, err
}
return bd.ds.Get(ctx, c)
}

// GetMany commits and gets nodes from the DAGService.
func (bd *BufferedDAG) GetMany(ctx context.Context, cs []cid.Cid) <-chan *NodeOption {
err := bd.b.Commit()
if err != nil {
ch := make(chan *NodeOption, 1)
defer close(ch)
ch <- &NodeOption{
Node: nil,
Err: err,
}
return ch
}
return bd.ds.GetMany(ctx, cs)
}

// Remove commits and removes a node from the DAGService.
func (bd *BufferedDAG) Remove(ctx context.Context, c cid.Cid) error {
err := bd.b.Commit()
if err != nil {
return err
}
return bd.ds.Remove(ctx, c)
}

// RemoveMany commits and removes nodes from the DAGService.
func (bd *BufferedDAG) RemoveMany(ctx context.Context, cs []cid.Cid) error {
err := bd.b.Commit()
if err != nil {
return err
}
return bd.ds.RemoveMany(ctx, cs)
}
20 changes: 20 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ func TestBatch(t *testing.T) {
}
}

func TestBufferedDAG(t *testing.T) {
ds := newTestDag()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var bdag DAGService = NewBufferedDAG(ctx, ds)

for i := 0; i < 1000; i++ {
n := new(EmptyNode)
if err := bdag.Add(ctx, n); err != nil {
t.Fatal(err)
}
if _, err := bdag.Get(ctx, n.Cid()); err != nil {
t.Fatal(err)
}
if err := bdag.Remove(ctx, n.Cid()); err != nil {
t.Fatal(err)
}
}
}

func TestBatchOptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@
"license": "",
"name": "go-ipld-format",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.7.0"
"version": "0.7.1"
}