From 09136e931b1748a5c67190af4a2c3f9d162ec25e Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 10 Oct 2017 19:21:17 -0700 Subject: [PATCH 1/3] parallelize batch flushing 1. Modern storage devices (i.e., SSDs) tend to be highly parallel. 2. Allows us to read and write at the same time (avoids pausing while flushing). fixes https://github.com/ipfs/go-ipfs/issues/898#issuecomment-331849064 License: MIT Signed-off-by: Steven Allen --- merkledag/batch.go | 98 ++++++++++++++++++++++++++++++++++++++++++ merkledag/merkledag.go | 31 ++----------- 2 files changed, 101 insertions(+), 28 deletions(-) create mode 100644 merkledag/batch.go diff --git a/merkledag/batch.go b/merkledag/batch.go new file mode 100644 index 00000000000..5879ad99b2d --- /dev/null +++ b/merkledag/batch.go @@ -0,0 +1,98 @@ +package merkledag + +import ( + "runtime" + + cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" + node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format" + blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format" +) + +// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. +// TODO: Experiment with multiple datastores, storage devices, and CPUs to find +// the right value/formula. +var ParallelBatchCommits = runtime.NumCPU() * 2 + +// Batch is a buffer for batching adds to a dag. 
+type Batch struct { + ds *dagService + + activeCommits int + commitError error + commitResults chan error + + blocks []blocks.Block + size int + + MaxSize int + MaxBlocks int +} + +func (t *Batch) processResults() { + for t.activeCommits > 0 && t.commitError == nil { + select { + case err := <-t.commitResults: + t.activeCommits-- + if err != nil { + t.commitError = err + } + default: + return + } + } +} + +func (t *Batch) asyncCommit() { + if len(t.blocks) == 0 || t.commitError != nil { + return + } + if t.activeCommits >= ParallelBatchCommits { + err := <-t.commitResults + t.activeCommits-- + + if err != nil { + t.commitError = err + return + } + } + go func(b []blocks.Block) { + _, err := t.ds.Blocks.AddBlocks(b) + t.commitResults <- err + }(t.blocks) + + t.activeCommits++ + t.blocks = nil + t.size = 0 + + return +} + +// Add adds a node to the batch and commits the batch if necessary. +func (t *Batch) Add(nd node.Node) (*cid.Cid, error) { + // Not strictly necessary but allows us to catch errors early. + t.processResults() + if t.commitError != nil { + return nil, t.commitError + } + + t.blocks = append(t.blocks, nd) + t.size += len(nd.RawData()) + if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks { + t.asyncCommit() + } + return nd.Cid(), t.commitError +} + +// Commit commits batched nodes. 
+func (t *Batch) Commit() error { + t.asyncCommit() + for t.activeCommits > 0 && t.commitError == nil { + err := <-t.commitResults + t.activeCommits-- + if err != nil { + t.commitError = err + } + } + + return t.commitError +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 214fbbd8407..92cb5fa866f 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -11,7 +11,6 @@ import ( cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format" - blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format" ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor" ) @@ -75,8 +74,9 @@ func (n *dagService) Add(nd node.Node) (*cid.Cid, error) { func (n *dagService) Batch() *Batch { return &Batch{ - ds: n, - MaxSize: 8 << 20, + ds: n, + commitResults: make(chan error, ParallelBatchCommits), + MaxSize: 8 << 20, // By default, only batch up to 128 nodes at a time. 
+ // The current implementation of flatfs opens this many file @@ -389,31 +389,6 @@ func (np *nodePromise) Get(ctx context.Context) (node.Node, error) { } } -type Batch struct { - ds *dagService - - blocks []blocks.Block - size int - MaxSize int - MaxBlocks int -} - -func (t *Batch) Add(nd node.Node) (*cid.Cid, error) { - t.blocks = append(t.blocks, nd) - t.size += len(nd.RawData()) - if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks { - return nd.Cid(), t.Commit() - } - return nd.Cid(), nil -} - -func (t *Batch) Commit() error { - _, err := t.ds.Blocks.AddBlocks(t.blocks) - t.blocks = nil - t.size = 0 - return err -} - type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error) // EnumerateChildren will walk the dag below the given root node and add all From e41848c537ed55bd03731214e8fef48889ca60ef Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 11 Oct 2017 11:40:54 -0700 Subject: [PATCH 2/3] create an issue for tuning the parallelism constant License: MIT Signed-off-by: Steven Allen --- merkledag/batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merkledag/batch.go b/merkledag/batch.go index 5879ad99b2d..bf3e7404879 100644 --- a/merkledag/batch.go +++ b/merkledag/batch.go @@ -9,7 +9,7 @@ import ( ) // ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. -// TODO: Experiment with multiple datastores, storage devices, and CPUs to find +// TODO(#4299): Experiment with multiple datastores, storage devices, and CPUs to find // the right value/formula. var ParallelBatchCommits = runtime.NumCPU() * 2 From 9de031b432003cd2a086677f19f240744a30a236 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 11 Oct 2017 11:43:30 -0700 Subject: [PATCH 3/3] preallocate Batch's blocks buffer on commit. It's probably safe to assume that this buffer will be about the same size each flush. This could cause 1 extra allocation (if this is the last commit) but that's unlikely to be an issue. 
License: MIT Signed-off-by: Steven Allen --- merkledag/batch.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/merkledag/batch.go b/merkledag/batch.go index bf3e7404879..b6dd286b9e1 100644 --- a/merkledag/batch.go +++ b/merkledag/batch.go @@ -43,7 +43,8 @@ func (t *Batch) processResults() { } func (t *Batch) asyncCommit() { - if len(t.blocks) == 0 || t.commitError != nil { + numBlocks := len(t.blocks) + if numBlocks == 0 || t.commitError != nil { return } if t.activeCommits >= ParallelBatchCommits { @@ -61,7 +62,7 @@ func (t *Batch) asyncCommit() { }(t.blocks) t.activeCommits++ - t.blocks = nil + t.blocks = make([]blocks.Block, 0, numBlocks) t.size = 0 return