Merge pull request #188 from Ethernal-Tech/feat-extend-db-tests
Add read threads to storagev2 leveldb unit test
oliverbundalo authored Apr 2, 2024
2 parents 51a8788 + 94c8c58 commit 1b47a22
Showing 4 changed files with 91 additions and 25 deletions.
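At its core, the change pairs one block-writing loop with several polling readers, coordinated through a sync.WaitGroup and a cancellable context. A minimal, self-contained sketch of that coordination pattern is below; it uses a mutex-guarded map as a stand-in for the LevelDB-backed store, so none of its identifiers belong to the real storagev2 API.

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// store is a stand-in for the LevelDB-backed storage exercised by the real test.
type store struct {
	mu     sync.RWMutex
	blocks map[uint64]string
	head   uint64
}

func (s *store) write(n uint64, hash string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.blocks[n] = hash
	s.head = n
}

func (s *store) read(n uint64) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	h, ok := s.blocks[n]
	return h, ok
}

func reader(ctx context.Context, s *store, blockCount int, wg *sync.WaitGroup) {
	defer wg.Done()

	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

	for i := 0; i < 1000; i++ {
		n := uint64(1 + rand.Intn(blockCount))
		if _, ok := s.read(n); ok {
			// the real test re-reads the body, header, receipts and lookups here
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func main() {
	s := &store{blocks: map[uint64]string{}}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const blockCount = 100

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three read threads, as in the new test
		wg.Add(1)
		go reader(ctx, s, blockCount, &wg)
	}

	// Single writer: insert blocks while the readers poll concurrently.
	for n := uint64(1); n <= blockCount; n++ {
		s.write(n, fmt.Sprintf("hash-%d", n))
	}

	wg.Wait()
	fmt.Println("head:", s.head)
}
```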
3 changes: 2 additions & 1 deletion blockchain/storagev2/leveldb/leveldb_perf_test.go
@@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb/opt"
)

func openStorage(t *testing.T, p string) (*storagev2.Storage, func(), string) {
@@ -82,7 +83,7 @@ func TestWriteBlockPerf(t *testing.T) {
time.Sleep(time.Second)

size := dbSize(t, path)
t.Logf("\tdb size %d MB", size/(1024*1024))
t.Logf("\tdb size %d MB", size/(1*opt.MiB))
t.Logf("\ttotal WriteBatch %d ms", watchTime)
}

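A note on the size logging above: opt.MiB comes from goleveldb's leveldb/opt package, which defines byte-size constants (KiB, MiB, GiB), so dividing by it replaces the hand-written 1024*1024. A tiny sketch of the conversion, with a made-up size value:

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/opt"
)

func main() {
	// opt.MiB is an untyped constant equal to 1024 * 1024, so dividing
	// a byte count by it yields whole mebibytes.
	size := int64(150 * opt.MiB) // hypothetical database size in bytes
	fmt.Printf("db size %d MB\n", size/(1*opt.MiB)) // prints: db size 150 MB
}
```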
110 changes: 87 additions & 23 deletions blockchain/storagev2/leveldb/leveldb_test.go
@@ -2,9 +2,11 @@ package leveldb

import (
"context"
"math/rand"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"testing"
"time"
@@ -13,6 +15,7 @@ import (
"github.com/0xPolygon/polygon-edge/types"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb/opt"
)

func newStorage(t *testing.T) (*storagev2.Storage, func()) {
@@ -99,16 +102,82 @@ func dirSize(t *testing.T, path string) int64 {
return size
}

func writeBlock(t *testing.T, s *storagev2.Storage, b *types.FullBlock) {
t.Helper()

batchWriter := s.NewWriter()

batchWriter.PutBody(b.Block.Number(), b.Block.Hash(), b.Block.Body())

for _, tx := range b.Block.Transactions {
batchWriter.PutTxLookup(tx.Hash(), b.Block.Number())
}

batchWriter.PutHeadHash(b.Block.Header.Hash)
batchWriter.PutHeadNumber(b.Block.Number())
batchWriter.PutBlockLookup(b.Block.Hash(), b.Block.Number())
batchWriter.PutHeader(b.Block.Header)
batchWriter.PutReceipts(b.Block.Number(), b.Block.Hash(), b.Receipts)
batchWriter.PutCanonicalHash(b.Block.Number(), b.Block.Hash())
require.NoError(t, batchWriter.WriteBatch())
}

func readBlock(t *testing.T, s *storagev2.Storage, blockCount int, wg *sync.WaitGroup, ctx context.Context) {
t.Helper()

defer wg.Done()

ticker := time.NewTicker(20 * time.Millisecond)

readCount := 1000
for i := 1; i <= readCount; i++ {
n := uint64(1 + rand.Intn(blockCount))

hn, ok := s.ReadHeadNumber()
if ok && n <= hn {
// If the head number is available and the chain has progressed enough to contain canonical block #n
h, ok := s.ReadCanonicalHash(n)
require.True(t, ok)

_, err := s.ReadBody(n, h)
require.NoError(t, err)

_, err = s.ReadHeader(n, h)
require.NoError(t, err)

_, err = s.ReadReceipts(n, h)
require.NoError(t, err)

b, err := s.ReadBlockLookup(h)
require.NoError(t, err)

require.Equal(t, n, b)
}

select {
case <-ctx.Done():
ticker.Stop()

return
case <-ticker.C:
}
}

t.Logf("\tRead thread finished")
}

func TestStorage(t *testing.T) {
storagev2.TestStorage(t, newStorage)
}

func TestWriteFullBlock(t *testing.T) {
func TestWriteReadFullBlockInParallel(t *testing.T) {
s, _, path := newStorageP(t)
defer s.Close()

count := 100
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*45)
var wg sync.WaitGroup

blockCount := 100
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)

signchan := make(chan os.Signal, 1)
signal.Notify(signchan, syscall.SIGINT)
@@ -119,33 +188,28 @@ func TestWriteFullBlock(t *testing.T) {
}()

blockchain := make(chan *types.FullBlock, 1)
go storagev2.GenerateBlocks(t, count, blockchain, ctx)
go storagev2.GenerateBlocks(t, blockCount, blockchain, ctx)

readThreads := 3
for i := 1; i <= readThreads; i++ {
wg.Add(1)

go readBlock(t, s, blockCount, &wg, ctx)
}

insertloop:
for i := 1; i <= count; i++ {
for i := 1; i <= blockCount; i++ {
select {
case <-ctx.Done():
break insertloop
case b := <-blockchain:
batchWriter := s.NewWriter()

batchWriter.PutBody(b.Block.Number(), b.Block.Hash(), b.Block.Body())

for _, tx := range b.Block.Transactions {
batchWriter.PutTxLookup(tx.Hash(), b.Block.Number())
}

batchWriter.PutHeader(b.Block.Header)
batchWriter.PutHeadNumber(uint64(i))
batchWriter.PutHeadHash(b.Block.Header.Hash)
batchWriter.PutReceipts(b.Block.Number(), b.Block.Hash(), b.Receipts)
batchWriter.PutCanonicalHash(uint64(i), b.Block.Hash())
require.NoError(t, batchWriter.WriteBatch())

size := dirSize(t, path)
writeBlock(t, s, b)
t.Logf("writing block %d", i)
t.Logf("\tldb file count: %d", countLdbFilesInPath(path))
t.Logf("\tdir size %d MBs", size/1_000_000)
}
}

size := dirSize(t, path)
t.Logf("\tldb file count: %d", countLdbFilesInPath(path))
t.Logf("\tdir size %d MBs", size/(1*opt.MiB))
wg.Wait()
}
1 change: 1 addition & 0 deletions blockchain/storagev2/testing.go
@@ -512,6 +512,7 @@ func generateBlock(t *testing.T, num uint64) *types.FullBlock {
b.Receipts[i].LogsBloom = types.CreateBloom(b.Receipts)
}

b.Block.Header.ComputeHash()
return b
}
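The added ComputeHash() call populates the generated header's cached hash, presumably so that the hash-keyed writes and lookups exercised by the new read threads operate on a real hash rather than the zero value. A minimal illustration of the compute-before-use idea, using hypothetical types rather than the polygon-edge API:

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// Header is a stand-in for a block header with a cached hash field.
type Header struct {
	Number uint64
	Hash   [32]byte // zero until ComputeHash is called
}

// ComputeHash fills the cached hash from the header contents.
func (h *Header) ComputeHash() *Header {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], h.Number)
	h.Hash = sha256.Sum256(buf[:])
	return h
}

func main() {
	h := &Header{Number: 512}
	fmt.Printf("before: %x\n", h.Hash) // all zeros: a poor key for hash-based lookups
	h.ComputeHash()
	fmt.Printf("after:  %x\n", h.Hash)
}
```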

2 changes: 1 addition & 1 deletion e2e-polybft/e2e/storage_test.go
@@ -50,7 +50,7 @@ func TestE2E_Storage(t *testing.T) {
// Send every second transaction as a dynamic fees one
var txn *types.Transaction

if i%2 == 0 { // Intentionally disable it since dynamic fee tx not working
if i%2 == 0 {
chainID, err := client.ChainID()
require.NoError(t, err)

