Skip to content

Commit

Permalink
feat: Uses BatchWithFlusher in iavl tree (#807)
Browse files Browse the repository at this point in the history
  • Loading branch information
catShaark authored Aug 11, 2023
1 parent c364b65 commit b42a0bc
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 69 deletions.
24 changes: 11 additions & 13 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,20 @@ import (
// around batch that flushes batch's data to disk
// as soon as the configurable limit is reached.
type BatchWithFlusher struct {
	db    dbm.DB    // used only to create a fresh batch after a flush
	batch dbm.Batch // the current batched writing buffer
}

var _ dbm.Batch = &BatchWithFlusher{}

// flushThreshold is the batch size in bytes at which the wrapped batch is
// flushed to disk. Ethereum has found that a commit of 100KB is optimal,
// ref ethereum/go-ethereum#15115
var flushThreshold = 100000

// NewBatchWithFlusher returns new BatchWithFlusher wrapping the passed in batch
func NewBatchWithFlusher(db dbm.DB, flushThreshold int) *BatchWithFlusher {
func NewBatchWithFlusher(db dbm.DB) *BatchWithFlusher {
return &BatchWithFlusher{
db: db,
batch: db.NewBatchWithSize(flushThreshold),
flushThreshold: flushThreshold,
db: db,
batch: db.NewBatchWithSize(flushThreshold),
}
}

Expand All @@ -44,15 +42,15 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i
}

// Set sets value at the given key to the db.
// If the set causes the underlying batch size to exceed batchSizeFlushThreshold,
// If the set causes the underlying batch size to exceed flushThreshold,
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The addition entry is then added to the batch.
func (b *BatchWithFlusher) Set(key []byte, value []byte) error {
batchSizeAfter, err := b.estimateSizeAfterSetting(key, value)
if err != nil {
return err
}
if batchSizeAfter > b.flushThreshold {
if batchSizeAfter > flushThreshold {
err = b.batch.Write()
if err != nil {
return err
Expand All @@ -61,7 +59,7 @@ func (b *BatchWithFlusher) Set(key []byte, value []byte) error {
if err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.batch = b.db.NewBatchWithSize(flushThreshold)
}
err = b.batch.Set(key, value)
if err != nil {
Expand All @@ -79,7 +77,7 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
if err != nil {
return err
}
if batchSizeAfter > b.flushThreshold {
if batchSizeAfter > flushThreshold {
err = b.batch.Write()
if err != nil {
return err
Expand All @@ -88,7 +86,7 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
if err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.batch = b.db.NewBatchWithSize(flushThreshold)
}
err = b.batch.Delete(key)
if err != nil {
Expand Down
47 changes: 4 additions & 43 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,63 +26,24 @@ func makeKey(n uint16) []byte {
return key
}

// BenchmarkBatchWithFlusher benchmarks writing ~10MB of data through a
// BatchWithFlusher for each combination of flush threshold and backend.
func BenchmarkBatchWithFlusher(b *testing.B) {
	backends := []dbm.BackendType{dbm.GoLevelDBBackend}
	thresholds := []int{100000, 1000000, 10000000}

	for _, threshold := range thresholds {
		for _, backend := range backends {
			name := fmt.Sprintf("threshold=%d/backend=%s", threshold, backend)
			b.Run(name, func(b *testing.B) {
				benchmarkBatchWithFlusher(b, backend, threshold)
			})
		}
	}
}

// benchmarkBatchWithFlusher writes ~10MB (1000 entries of 10KB of zero bytes
// each) through a BatchWithFlusher created with the given backend and flush
// threshold, then flushes the remainder via Write.
// NOTE(review): this helper uses the older NewBatchWithFlusher(db, flushThreshold)
// signature; other hunks in this diff drop the threshold parameter — confirm
// which signature is current before reusing this code.
func benchmarkBatchWithFlusher(b *testing.B, backend dbm.BackendType, flushThreshold int) {
	name := fmt.Sprintf("test_%x", randstr(12))
	dir := b.TempDir()
	db, err := dbm.NewDB(name, backend, dir)
	require.NoError(b, err)
	// remove the on-disk database files once the benchmark finishes
	defer cleanupDBDir(dir, name)

	batchWithFlusher := NewBatchWithFlusher(db, flushThreshold)

	// we'll try to commit 10MBs (1000 * 10KBs each entries) of data into the db
	for keyNonce := uint16(0); keyNonce < 1000; keyNonce++ {
		// each key / value is 10 KBs of zero bytes
		key := makeKey(keyNonce)
		err := batchWithFlusher.Set(key, bytesArrayOfSize10KB[:])
		if err != nil {
			// a Set failure aborts the whole benchmark process
			panic(err)
		}
	}
	require.NoError(b, batchWithFlusher.Write())
}

func TestBatchWithFlusher(t *testing.T) {
testedBackends := []dbm.BackendType{
dbm.GoLevelDBBackend,
}

// we test batch writing data of size 10MBs with different flush threshold
for _, flushThreshold := range []int{100000, 1000000, 10000000} {
for _, backend := range testedBackends {
testBatchWithFlusher(t, backend, flushThreshold)
}
for _, backend := range testedBackends {
testBatchWithFlusher(t, backend)
}
}

func testBatchWithFlusher(t *testing.T, backend dbm.BackendType, flushThreshold int) {
func testBatchWithFlusher(t *testing.T, backend dbm.BackendType) {
name := fmt.Sprintf("test_%x", randstr(12))
dir := t.TempDir()
db, err := dbm.NewDB(name, backend, dir)
require.NoError(t, err)
defer cleanupDBDir(dir, name)

batchWithFlusher := NewBatchWithFlusher(db, flushThreshold)
batchWithFlusher := NewBatchWithFlusher(db)

// we'll try to to commit 10MBs (1000 * 10KBs each entries) of data into the db
for keyNonce := uint16(0); keyNonce < 1000; keyNonce++ {
Expand Down
11 changes: 7 additions & 4 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func TestUpgradeStorageToFast_DbErrorConstructor_Failure(t *testing.T) {
expectedError := errors.New("some db error")

dbMock.EXPECT().Get(gomock.Any()).Return(nil, expectedError).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(nil).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

tree := NewMutableTree(dbMock, 0, false, log.NewNopLogger())
Expand All @@ -822,7 +822,7 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
batchMock := mock.NewMockBatch(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

iterMock := mock.NewMockIterator(ctrl)
Expand All @@ -832,6 +832,7 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
iterMock.EXPECT().Close()

batchMock.EXPECT().Set(gomock.Any(), gomock.Any()).Return(expectedError).Times(1)
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(1)

tree := NewMutableTree(dbMock, 0, false, log.NewNopLogger())
require.NotNil(t, tree)
Expand Down Expand Up @@ -870,7 +871,7 @@ func TestFastStorageReUpgradeProtection_NoForceUpgrade_Success(t *testing.T) {
batchMock := mock.NewMockBatch(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version

tree := NewMutableTree(dbMock, 0, false, log.NewNopLogger())
Expand Down Expand Up @@ -917,7 +918,8 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(3)

dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(3)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
Expand All @@ -936,6 +938,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
updatedExpectedStorageVersion := make([]byte, len(expectedStorageVersion))
copy(updatedExpectedStorageVersion, expectedStorageVersion)
updatedExpectedStorageVersion[len(updatedExpectedStorageVersion)-1]++
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(2)
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(2)
Expand Down
6 changes: 3 additions & 3 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options, lg log.Logger) *nodeDB {
return &nodeDB{
logger: lg,
db: db,
batch: db.NewBatch(),
batch: NewBatchWithFlusher(db),
opts: *opts,
firstVersion: 0,
latestVersion: 0, // initially invalid
Expand Down Expand Up @@ -382,7 +382,7 @@ func (ndb *nodeDB) writeBatch() error {
return err
}

ndb.batch = ndb.db.NewBatch()
ndb.batch = NewBatchWithFlusher(ndb.db)

return nil
}
Expand Down Expand Up @@ -877,7 +877,7 @@ func (ndb *nodeDB) Commit() error {
}

ndb.batch.Close()
ndb.batch = ndb.db.NewBatch()
ndb.batch = NewBatchWithFlusher(ndb.db)

return nil
}
Expand Down
12 changes: 6 additions & 6 deletions nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestNewNoDbStorage_StorageVersionInDb_Success(t *testing.T) {
dbMock := mock.NewMockDB(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return([]byte(expectedVersion), nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(nil).Times(1)

ndb := newNodeDB(dbMock, 0, nil, log.NewNopLogger())
require.Equal(t, expectedVersion, ndb.storageVersion)
Expand All @@ -61,8 +61,7 @@ func TestNewNoDbStorage_ErrorInConstructor_DefaultSet(t *testing.T) {
dbMock := mock.NewMockDB(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, errors.New("some db error")).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)

dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(nil).Times(1)
ndb := newNodeDB(dbMock, 0, nil, log.NewNopLogger())
require.Equal(t, expectedVersion, ndb.getStorageVersion())
}
Expand All @@ -74,7 +73,7 @@ func TestNewNoDbStorage_DoesNotExist_DefaultSet(t *testing.T) {
dbMock := mock.NewMockDB(ctrl)

dbMock.EXPECT().Get(gomock.Any()).Return(nil, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(nil).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(nil).Times(1)

ndb := newNodeDB(dbMock, 0, nil, log.NewNopLogger())
require.Equal(t, expectedVersion, ndb.getStorageVersion())
Expand Down Expand Up @@ -108,14 +107,15 @@ func TestSetStorageVersion_DBFailure_OldKept(t *testing.T) {
expectedFastCacheVersion := 2

dbMock.EXPECT().Get(gomock.Any()).Return([]byte(defaultStorageVersionValue), nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)

// rIterMock is used to get the latest version from disk. We are mocking that rIterMock returns latestTreeVersion from disk
rIterMock.EXPECT().Valid().Return(true).Times(1)
rIterMock.EXPECT().Key().Return(nodeKeyFormat.Key(GetRootKey(int64(expectedFastCacheVersion)))).Times(1)
rIterMock.EXPECT().Close().Return(nil).Times(1)

dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), []byte(fastStorageVersionValue+fastStorageVersionDelimiter+strconv.Itoa(expectedFastCacheVersion))).Return(errors.New(expectedErrorMsg)).Times(1)

ndb := newNodeDB(dbMock, 0, nil, log.NewNopLogger())
Expand All @@ -137,7 +137,7 @@ func TestSetStorageVersion_InvalidVersionFailure_OldKept(t *testing.T) {
invalidStorageVersion := fastStorageVersionValue + fastStorageVersionDelimiter + "1" + fastStorageVersionDelimiter + "2"

dbMock.EXPECT().Get(gomock.Any()).Return([]byte(invalidStorageVersion), nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)

ndb := newNodeDB(dbMock, 0, nil, log.NewNopLogger())
require.Equal(t, invalidStorageVersion, ndb.getStorageVersion())
Expand Down

0 comments on commit b42a0bc

Please sign in to comment.