Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LevelDB with 2 databases #126

Merged
merged 30 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e5cb86a
Storage V2 implementation. LevelDB with 2 databases.
oliverbundalo Feb 19, 2024
16c782b
Refactoring before Mdbx database import
oliverbundalo Feb 20, 2024
6e792e1
LevelDB 2 databases final
oliverbundalo Feb 20, 2024
50a8850
Merge branch 'develop' into feat/state-db-refactoring
oliverbundalo Feb 20, 2024
3f5b546
go.mod tidy
oliverbundalo Feb 20, 2024
45320fb
Build fix after merge
oliverbundalo Feb 20, 2024
9db3b89
lint fixes
oliverbundalo Feb 20, 2024
0530769
SkipNow for storage perf tests
oliverbundalo Feb 21, 2024
7ba3467
mdbx added into storage V2
oliverbundalo Feb 22, 2024
f359cb1
Merge branch 'develop' into feat/state-db-refactoring
oliverbundalo Feb 22, 2024
a81b56e
Return bool for some storage Read methods
oliverbundalo Feb 22, 2024
215b089
e2e storage test
oliverbundalo Feb 23, 2024
febecbb
storage replaced with storagev2
oliverbundalo Feb 23, 2024
5b62172
Merge branch 'develop' into feat/state-db-refactoring
oliverbundalo Feb 26, 2024
f2668fd
Merge fix
oliverbundalo Feb 26, 2024
fff2ad6
Reuse SendTxn from the cluster
Stefan-Ethernal Feb 26, 2024
428f425
Revert "Reuse SendTxn from the cluster"
oliverbundalo Feb 26, 2024
8609468
Remove stopwatch dependency
oliverbundalo Feb 26, 2024
1a09afd
Remove assert for Legacy tx type from storage e2e test
oliverbundalo Feb 26, 2024
89e648d
TestE2E_Storage -> use TestCluster.SendTxn
oliverbundalo Feb 26, 2024
63f046b
Added block hash into block key
oliverbundalo Feb 26, 2024
18221b5
Fixed comments for tableMapper
oliverbundalo Mar 4, 2024
5f8aa57
Minor fix (check the appropriate error reference)
Stefan-Ethernal Mar 7, 2024
40765a7
Update e2e-polybft/e2e/storage_test.go
oliverbundalo Mar 22, 2024
3fd1f94
Changes from PR, 1st part
oliverbundalo Mar 22, 2024
1c9a2d4
PR comments - completed
oliverbundalo Mar 22, 2024
416a2b0
Merge branch 'develop' into feat/state-db-refactoring
oliverbundalo Mar 22, 2024
4e6cd79
Merge
oliverbundalo Mar 22, 2024
c2dd121
gidlid cleanup & lint fixes
oliverbundalo Mar 22, 2024
096a568
Merge branch 'develop' into feat/state-db-refactoring
oliverbundalo Mar 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"sync/atomic"

"github.com/0xPolygon/polygon-edge/blockchain/storage"
"github.com/0xPolygon/polygon-edge/blockchain/storagev2"
"github.com/0xPolygon/polygon-edge/chain"
"github.com/0xPolygon/polygon-edge/helper/common"
"github.com/0xPolygon/polygon-edge/state"
Expand Down Expand Up @@ -44,7 +44,7 @@ var (
type Blockchain struct {
logger hclog.Logger // The logger object

db storage.Storage // The Storage object (database)
db *storagev2.Storage // The Storage object (database)
consensus Verifier
executor Executor
txSigner TxSigner
Expand Down Expand Up @@ -188,7 +188,7 @@ func (b *Blockchain) GetAvgGasPrice() *big.Int {
// NewBlockchain creates a new blockchain object
func NewBlockchain(
logger hclog.Logger,
db storage.Storage,
db *storagev2.Storage,
genesisConfig *chain.Chain,
consensus Verifier,
executor Executor,
Expand Down Expand Up @@ -398,7 +398,7 @@ func (b *Blockchain) writeGenesis(genesis *chain.Genesis) error {

// writeGenesisImpl writes the genesis file to the DB + blockchain reference
func (b *Blockchain) writeGenesisImpl(header *types.Header) error {
batchWriter := storage.NewBatchWriter(b.db)
batchWriter := b.db.NewWriter()

newTD := new(big.Int).SetUint64(header.Difficulty)

Expand Down Expand Up @@ -438,9 +438,14 @@ func (b *Blockchain) GetTD(hash types.Hash) (*big.Int, bool) {
return b.readTotalDifficulty(hash)
}

// GetReceiptsByHash returns the receipts by their hash
// GetReceiptsByHash returns the receipts by block hash
func (b *Blockchain) GetReceiptsByHash(hash types.Hash) ([]*types.Receipt, error) {
return b.db.ReadReceipts(hash)
n, err := b.db.ReadBlockLookup(hash)
if err != nil {
return nil, err
}

return b.db.ReadReceipts(n, hash)
}

// GetBodyByHash returns the body by their hash
Expand Down Expand Up @@ -468,7 +473,12 @@ func (b *Blockchain) readHeader(hash types.Hash) (*types.Header, bool) {
}

// Cache miss, load it from the DB
hh, err := b.db.ReadHeader(hash)
n, err := b.db.ReadBlockLookup(hash)
if err != nil {
return nil, false
}

hh, err := b.db.ReadHeader(n, hash)
if err != nil {
return nil, false
}
Expand All @@ -482,7 +492,12 @@ func (b *Blockchain) readHeader(hash types.Hash) (*types.Header, bool) {

// readBody reads the block's body, using the block hash
func (b *Blockchain) readBody(hash types.Hash) (*types.Body, bool) {
bb, err := b.db.ReadBody(hash)
n, err := b.db.ReadBlockLookup(hash)
if err != nil {
return nil, false
}

bb, err := b.db.ReadBody(n, hash)
if err != nil {
b.logger.Error("failed to read body", "err", err)

Expand All @@ -491,9 +506,9 @@ func (b *Blockchain) readBody(hash types.Hash) (*types.Body, bool) {

// To return from field in the transactions of the past blocks
if updated := b.recoverFromFieldsInTransactions(bb.Transactions); updated {
batchWriter := storage.NewBatchWriter(b.db)
batchWriter := b.db.NewWriter()

batchWriter.PutBody(hash, bb)
batchWriter.PutBody(n, hash, bb)

if err := batchWriter.WriteBatch(); err != nil {
b.logger.Warn("failed to write body into storage", "hash", hash, "err", err)
Expand All @@ -518,7 +533,12 @@ func (b *Blockchain) readTotalDifficulty(headerHash types.Hash) (*big.Int, bool)
}

// Miss, read the difficulty from the DB
dbDifficulty, ok := b.db.ReadTotalDifficulty(headerHash)
n, err := b.db.ReadBlockLookup(headerHash)
if err != nil {
return nil, false
}

dbDifficulty, ok := b.db.ReadTotalDifficulty(n, headerHash)
if !ok {
return nil, false
}
Expand Down Expand Up @@ -573,7 +593,7 @@ func (b *Blockchain) WriteHeadersWithBodies(headers []*types.Header) error {
for _, header := range headers {
event := &Event{}

batchWriter := storage.NewBatchWriter(b.db)
batchWriter := b.db.NewWriter()

isCanonical, newTD, err := b.writeHeaderImpl(batchWriter, event, header)
if err != nil {
Expand Down Expand Up @@ -811,7 +831,7 @@ func (b *Blockchain) WriteFullBlock(fblock *types.FullBlock, source string) erro

header := block.Header

batchWriter := storage.NewBatchWriter(b.db)
batchWriter := b.db.NewWriter()

if err := b.writeBody(batchWriter, block); err != nil {
return err
Expand All @@ -828,7 +848,7 @@ func (b *Blockchain) WriteFullBlock(fblock *types.FullBlock, source string) erro
// write the receipts, do it only after the header has been written.
// Otherwise, a client might ask for a header once the receipt is valid,
// but before it is written into the storage
batchWriter.PutReceipts(block.Hash(), fblock.Receipts)
batchWriter.PutReceipts(block.Number(), block.Hash(), fblock.Receipts)

// update snapshot
if err := b.consensus.ProcessHeaders([]*types.Header{header}); err != nil {
Expand Down Expand Up @@ -876,7 +896,7 @@ func (b *Blockchain) WriteBlock(block *types.Block, source string) error {

header := block.Header

batchWriter := storage.NewBatchWriter(b.db)
batchWriter := b.db.NewWriter()

if err := b.writeBody(batchWriter, block); err != nil {
return err
Expand All @@ -899,7 +919,7 @@ func (b *Blockchain) WriteBlock(block *types.Block, source string) error {
// write the receipts, do it only after the header has been written.
// Otherwise, a client might ask for a header once the receipt is valid,
// but before it is written into the storage
batchWriter.PutReceipts(block.Hash(), blockReceipts)
batchWriter.PutReceipts(block.Number(), block.Hash(), blockReceipts)

// update snapshot
if err := b.consensus.ProcessHeaders([]*types.Header{header}); err != nil {
Expand Down Expand Up @@ -990,7 +1010,7 @@ func (b *Blockchain) updateGasPriceAvgWithBlock(block *types.Block) {

// writeBody writes the block body to the DB.
// Additionally, it also updates the txn lookup, for txnHash -> block lookups
func (b *Blockchain) writeBody(batchWriter *storage.BatchWriter, block *types.Block) error {
func (b *Blockchain) writeBody(batchWriter *storagev2.Writer, block *types.Block) error {
// Recover 'from' field in tx before saving
// Because the block passed from the consensus layer doesn't have from field in tx,
// due to missing encoding in RLP
Expand All @@ -999,21 +1019,24 @@ func (b *Blockchain) writeBody(batchWriter *storage.BatchWriter, block *types.Bl
}

// Write the full body (txns + receipts)
batchWriter.PutBody(block.Header.Hash, block.Body())
batchWriter.PutBody(block.Number(), block.Hash(), block.Body())

// Write txn lookups (txHash -> block)
// Write txn lookups (txHash -> block number)
for _, txn := range block.Transactions {
batchWriter.PutTxLookup(txn.Hash(), block.Hash())
batchWriter.PutTxLookup(txn.Hash(), block.Number())
}

return nil
}

// ReadTxLookup returns the block hash using the transaction hash
func (b *Blockchain) ReadTxLookup(hash types.Hash) (types.Hash, bool) {
v, ok := b.db.ReadTxLookup(hash)
func (b *Blockchain) ReadTxLookup(hash types.Hash) (uint64, bool) {
v, err := b.db.ReadTxLookup(hash)
if err != nil {
return 0, false
}

return v, ok
return v, true
}

// recoverFromFieldsInBlock recovers 'from' fields in the transactions of the given block
Expand Down Expand Up @@ -1140,7 +1163,7 @@ func (b *Blockchain) dispatchEvent(evnt *Event) {
// writeHeaderImpl writes a block and the data, assumes the genesis is already set
// Returning parameters (is canonical header, new total difficulty, error)
func (b *Blockchain) writeHeaderImpl(
batchWriter *storage.BatchWriter, evnt *Event, header *types.Header) (bool, *big.Int, error) {
batchWriter *storagev2.Writer, evnt *Event, header *types.Header) (bool, *big.Int, error) {
// parent total difficulty of incoming header
parentTD, ok := b.readTotalDifficulty(header.ParentHash)
if !ok {
Expand Down Expand Up @@ -1188,8 +1211,9 @@ func (b *Blockchain) writeHeaderImpl(
}

batchWriter.PutHeader(header)
batchWriter.PutTotalDifficulty(header.Hash, incomingTD)
batchWriter.PutTotalDifficulty(header.Number, header.Hash, incomingTD)
batchWriter.PutForks(forks)
batchWriter.PutBlockLookup(header.Hash, header.Number)

// new block has lower difficulty, create a new fork
evnt.AddOldHeader(header)
Expand All @@ -1202,7 +1226,7 @@ func (b *Blockchain) writeHeaderImpl(
func (b *Blockchain) getForksToWrite(header *types.Header) ([]types.Hash, error) {
forks, err := b.db.ReadForks()
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
if errors.Is(err, storagev2.ErrNotFound) {
forks = []types.Hash{}
} else {
return nil, err
Expand All @@ -1222,7 +1246,7 @@ func (b *Blockchain) getForksToWrite(header *types.Header) ([]types.Hash, error)

// handleReorg handles a reorganization event
func (b *Blockchain) handleReorg(
batchWriter *storage.BatchWriter,
batchWriter *storagev2.Writer,
evnt *Event,
oldHeader *types.Header,
newHeader *types.Header,
Expand Down Expand Up @@ -1414,7 +1438,7 @@ func (b *Blockchain) calcBaseFeeDelta(gasUsedDelta, parentGasTarget, baseFee uin
}

func (b *Blockchain) writeBatchAndUpdate(
batchWriter *storage.BatchWriter,
batchWriter *storagev2.Writer,
header *types.Header,
newTD *big.Int,
isCanonnical bool) error {
Expand Down
Loading
Loading