Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallelize batch flushing #4296

Merged
merged 3 commits into from
Oct 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions merkledag/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package merkledag

import (
"runtime"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(#4299): Experiment with multiple datastores, storage devices, and CPUs to find
// the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds *dagService

activeCommits int
commitError error
commitResults chan error

blocks []blocks.Block
size int

MaxSize int
MaxBlocks int
}

func (t *Batch) processResults() {
for t.activeCommits > 0 && t.commitError == nil {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
}
default:
return
}
}
}

func (t *Batch) asyncCommit() {
numBlocks := len(t.blocks)
if numBlocks == 0 || t.commitError != nil {
return
}
if t.activeCommits >= ParallelBatchCommits {
err := <-t.commitResults
t.activeCommits--

if err != nil {
t.commitError = err
return
}
}
go func(b []blocks.Block) {
_, err := t.ds.Blocks.AddBlocks(b)
t.commitResults <- err
}(t.blocks)

t.activeCommits++
t.blocks = make([]blocks.Block, 0, numBlocks)
t.size = 0

return
}

// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
}

t.blocks = append(t.blocks, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
t.asyncCommit()
}
return nd.Cid(), t.commitError
}

// Commit commits batched nodes.
func (t *Batch) Commit() error {
t.asyncCommit()
for t.activeCommits > 0 && t.commitError == nil {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
}
}

return t.commitError
}
31 changes: 3 additions & 28 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
)

Expand Down Expand Up @@ -75,8 +74,9 @@ func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {

func (n *dagService) Batch() *Batch {
return &Batch{
ds: n,
MaxSize: 8 << 20,
ds: n,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,

// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
Expand Down Expand Up @@ -389,31 +389,6 @@ func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
}
}

type Batch struct {
ds *dagService

blocks []blocks.Block
size int
MaxSize int
MaxBlocks int
}

func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
t.blocks = append(t.blocks, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
return nd.Cid(), t.Commit()
}
return nd.Cid(), nil
}

func (t *Batch) Commit() error {
_, err := t.ds.Blocks.AddBlocks(t.blocks)
t.blocks = nil
t.size = 0
return err
}

type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

// EnumerateChildren will walk the dag below the given root node and add all
Expand Down