diff --git a/blockchain/storagev2/leveldb/leveldb_perf_test.go b/blockchain/storagev2/leveldb/leveldb_perf_test.go index 337075f0c6..b05fd58be6 100644 --- a/blockchain/storagev2/leveldb/leveldb_perf_test.go +++ b/blockchain/storagev2/leveldb/leveldb_perf_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/syndtr/goleveldb/leveldb/opt" ) func openStorage(t *testing.T, p string) (*storagev2.Storage, func(), string) { @@ -82,7 +83,7 @@ func TestWriteBlockPerf(t *testing.T) { time.Sleep(time.Second) size := dbSize(t, path) - t.Logf("\tdb size %d MB", size/(1024*1024)) + t.Logf("\tdb size %d MB", size/(1*opt.MiB)) t.Logf("\ttotal WriteBatch %d ms", watchTime) } diff --git a/blockchain/storagev2/leveldb/leveldb_test.go b/blockchain/storagev2/leveldb/leveldb_test.go index d3d4dd8c3c..cb180a5212 100644 --- a/blockchain/storagev2/leveldb/leveldb_test.go +++ b/blockchain/storagev2/leveldb/leveldb_test.go @@ -2,9 +2,11 @@ package leveldb import ( "context" + "math/rand" "os" "os/signal" "path/filepath" + "sync" "syscall" "testing" "time" @@ -13,6 +15,7 @@ import ( "github.com/0xPolygon/polygon-edge/types" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" + "github.com/syndtr/goleveldb/leveldb/opt" ) func newStorage(t *testing.T) (*storagev2.Storage, func()) { @@ -99,16 +102,82 @@ func dirSize(t *testing.T, path string) int64 { return size } +func writeBlock(t *testing.T, s *storagev2.Storage, b *types.FullBlock) { + t.Helper() + + batchWriter := s.NewWriter() + + batchWriter.PutBody(b.Block.Number(), b.Block.Hash(), b.Block.Body()) + + for _, tx := range b.Block.Transactions { + batchWriter.PutTxLookup(tx.Hash(), b.Block.Number()) + } + + batchWriter.PutHeadHash(b.Block.Header.Hash) + batchWriter.PutHeadNumber(b.Block.Number()) + batchWriter.PutBlockLookup(b.Block.Hash(), b.Block.Number()) + batchWriter.PutHeader(b.Block.Header) + batchWriter.PutReceipts(b.Block.Number(), b.Block.Hash(), b.Receipts) + batchWriter.PutCanonicalHash(b.Block.Number(), b.Block.Hash()) + require.NoError(t, batchWriter.WriteBatch()) +} + +func readBlock(t *testing.T, s *storagev2.Storage, blockCount int, wg *sync.WaitGroup, ctx context.Context) { + t.Helper() + + defer wg.Done() + + ticker := time.NewTicker(20 * time.Millisecond) + + readCount := 1000 + for i := 1; i <= readCount; i++ { + n := uint64(1 + rand.Intn(blockCount)) + + hn, ok := s.ReadHeadNumber() + if ok && n <= hn { + // If head number is read and chain progresed enough to contain canonical block #n + h, ok := s.ReadCanonicalHash(n) + require.True(t, ok) + + _, err := s.ReadBody(n, h) + require.NoError(t, err) + + _, err = s.ReadHeader(n, h) + require.NoError(t, err) + + _, err = s.ReadReceipts(n, h) + require.NoError(t, err) + + b, err := s.ReadBlockLookup(h) + require.NoError(t, err) + + require.Equal(t, n, b) + } + + select { + case <-ctx.Done(): + ticker.Stop() + + return + case <-ticker.C: + } + } + + t.Logf("\tRead thread finished") +} + func TestStorage(t *testing.T) { storagev2.TestStorage(t, newStorage) } -func TestWriteFullBlock(t *testing.T) { +func TestWriteReadFullBlockInParallel(t *testing.T) { s, _, path := newStorageP(t) defer s.Close() - count := 100 - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*45) + var wg sync.WaitGroup + + blockCount := 100 + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) signchan := make(chan os.Signal, 1) signal.Notify(signchan, syscall.SIGINT) @@ -119,33 +188,28 @@ func TestWriteFullBlock(t *testing.T) { }() blockchain := make(chan *types.FullBlock, 1) - go storagev2.GenerateBlocks(t, count, blockchain, ctx) + go storagev2.GenerateBlocks(t, blockCount, blockchain, ctx) + + readThreads := 3 + for i := 1; i <= readThreads; i++ { + wg.Add(1) + + go readBlock(t, s, blockCount, &wg, ctx) + } insertloop: - for i := 1; i <= count; i++ { + for i := 1; i <= blockCount; i++ { select { case <-ctx.Done(): break insertloop case b := <-blockchain: - batchWriter := s.NewWriter() - - batchWriter.PutBody(b.Block.Number(), b.Block.Hash(), b.Block.Body()) - - for _, tx := range b.Block.Transactions { - batchWriter.PutTxLookup(tx.Hash(), b.Block.Number()) - } - - batchWriter.PutHeader(b.Block.Header) - batchWriter.PutHeadNumber(uint64(i)) - batchWriter.PutHeadHash(b.Block.Header.Hash) - batchWriter.PutReceipts(b.Block.Number(), b.Block.Hash(), b.Receipts) - batchWriter.PutCanonicalHash(uint64(i), b.Block.Hash()) - require.NoError(t, batchWriter.WriteBatch()) - - size := dirSize(t, path) + writeBlock(t, s, b) t.Logf("writing block %d", i) - t.Logf("\tldb file count: %d", countLdbFilesInPath(path)) - t.Logf("\tdir size %d MBs", size/1_000_000) } } + + size := dirSize(t, path) + t.Logf("\tldb file count: %d", countLdbFilesInPath(path)) + t.Logf("\tdir size %d MBs", size/(1*opt.MiB)) + wg.Wait() } diff --git a/blockchain/storagev2/testing.go b/blockchain/storagev2/testing.go index d98dd0454f..2ce4cef19f 100644 --- a/blockchain/storagev2/testing.go +++ b/blockchain/storagev2/testing.go @@ -512,6 +512,7 @@ func generateBlock(t *testing.T, num uint64) *types.FullBlock { b.Receipts[i].LogsBloom = types.CreateBloom(b.Receipts) } + b.Block.Header.ComputeHash() return b } diff --git a/e2e-polybft/e2e/storage_test.go b/e2e-polybft/e2e/storage_test.go index 689af5e7af..a031fd9a75 100644 --- a/e2e-polybft/e2e/storage_test.go +++ b/e2e-polybft/e2e/storage_test.go @@ -50,7 +50,7 @@ func TestE2E_Storage(t *testing.T) { // Send every second transaction as a dynamic fees one var txn *types.Transaction - if i%2 == 0 { // Intentionally disable it since dynamic fee tx not working + if i%2 == 0 { chainID, err := client.ChainID() require.NoError(t, err)