Skip to content

Commit

Permalink
fix bloom index start height and error recover (#1419)
Browse files Browse the repository at this point in the history
  • Loading branch information
laizy authored Dec 14, 2022
1 parent ce0b99d commit ac6044f
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 139 deletions.
100 changes: 54 additions & 46 deletions core/store/ledgerstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BlockStore struct {
enableCache bool //Is enable lru cache
dbDir string //The path of store file
cache *BlockCache //The cache of block, if have.
indexer bloomIndexer // Background processor generating the index data content
bloomCache map[uint32]*types2.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index
filterStart uint32 // Start block that filter supported
store *leveldbstore.LevelDBStore //block store handler
}
Expand All @@ -64,56 +64,17 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) {
enableCache: enableCache,
store: store,
cache: cache,
bloomCache: make(map[uint32]*types2.Bloom, 4096*2),
}

_, curBlockHeight, err := blockStore.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get current block: %s", err.Error())
}
curBlockHeight = 0
}

indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks)

start, err := indexer.GetFilterStart()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get filter start: %s", err.Error())
}

var tmp uint32
if curBlockHeight < config.GetAddDecimalsHeight() {
tmp = config.GetAddDecimalsHeight()
} else {
tmp = curBlockHeight
}
err = indexer.PutFilterStart(tmp)
if err != nil {
return nil, fmt.Errorf("put filter start: %s", err.Error())
}
start = tmp
}

blockStore.indexer = indexer
blockStore.filterStart = start

return blockStore, nil
}

func (this *BlockStore) PutFilterStart(height uint32) error {
return this.indexer.PutFilterStart(height)
}

//NewBatch start a commit batch
func (this *BlockStore) NewBatch() {
this.store.NewBatch()
}

func (this *BlockStore) GetIndexer() bloomIndexer {
return this.indexer
}

//SaveBlock persist block to store
func (this *BlockStore) SaveBlock(block *types.Block) error {
if this.enableCache {
Expand Down Expand Up @@ -400,16 +361,28 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) {

//SaveBloomData persist block bloom data to store
func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) {
this.Process(height, bloom)
if height != 0 && height%BloomBitsBlocks == 0 {
this.indexer.BatchPut()
if height < this.filterStart {
return
}
key := this.genBloomKey(height)
this.store.BatchPut(key, bloom.Bytes())
this.bloomCache[height] = &bloom
this.cleanStaleBloomData(height)

if (height+1)%BloomBitsBlocks == 0 {
var blooms []types2.Bloom
for i := 0; i < BloomBitsBlocks; i++ {
blooms = append(blooms, *this.bloomCache[height+uint32(i)+1-BloomBitsBlocks])
}
section := height / BloomBitsBlocks
PutBloomIndex(this.store, blooms, section)
}
}

func (this *BlockStore) Process(height uint32, bloom types2.Bloom) {
this.indexer.Process(height, bloom)
func (this *BlockStore) cleanStaleBloomData(curHeight uint32) {
if curHeight > BloomBitsBlocks*2 {
delete(this.bloomCache, curHeight-BloomBitsBlocks*2)
}
}

//GetBloomData return bloom data by block height
Expand Down Expand Up @@ -638,3 +611,38 @@ func (this *BlockStore) PruneBlock(hash common.Uint256) []common.Uint256 {
this.store.BatchDelete(key)
return txHashes
}

func (this *BlockStore) LoadBloomBits() error {
_, curBlockHeight, err := this.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return fmt.Errorf("get current block: %s", err.Error())
}
curBlockHeight = 0
}

initStart := (curBlockHeight + 4095) / 4096
if curBlockHeight < config.GetAddDecimalsHeight() {
initStart = config.GetAddDecimalsHeight() / 4096 * 4096
}

start, err := GetOrSetFilterStart(this.store, initStart)
if err != nil {
return err
}
this.filterStart = start

if curBlockHeight < this.filterStart {
return nil
}

loadStart := curBlockHeight - curBlockHeight%BloomBitsBlocks
for i := loadStart; i <= curBlockHeight; i++ {
bloom, err := this.GetBloomData(i)
if err != nil {
return fmt.Errorf("LoadBloom error %s", err)
}
this.bloomCache[i] = &bloom
}
return nil
}
85 changes: 33 additions & 52 deletions core/store/ledgerstore/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package ledgerstore

import (
"encoding/binary"
"fmt"
"io"
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
Expand All @@ -30,82 +30,63 @@ import (
"github.com/ontio/ontology/core/store/leveldbstore"
)

const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
BloomServiceThreads = 16

// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
BloomFilterThreads = 3

// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
BloomRetrievalBatch = 16

// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
BloomRetrievalWait = time.Duration(0)

// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
BloomBitsBlocks uint32 = 4096
)
// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
const BloomBitsBlocks = 4096

var (
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) + hash -> bloom bits
)

// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type bloomIndexer struct {
store *leveldbstore.LevelDBStore // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
section uint32 // Section is the section number being processed currently
}

func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer {
func PutBloomIndex(store *leveldbstore.LevelDBStore, blooms []types.Bloom, section uint32) {
gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks))
if err != nil {
panic(err) // never fired since BloomBitsBlocks is multiple of 8
}
b := bloomIndexer{
store: store,
for i, b := range blooms {
err := gen.AddBloom(uint(i), b)
if err != nil {
panic(err) // never fired
}
}
b.gen, b.section = gen, section
return b
}

// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) {
b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom)
}

// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *bloomIndexer) BatchPut() {
for i := 0; i < types.BloomBitLength; i++ {
bits, err := b.gen.Bitset(uint(i))
bits, err := gen.Bitset(uint(i))
if err != nil {
panic(err) // never fired since idx is always less than 8 and section should be right
}
value := bitutil.CompressBytes(bits)
b.store.BatchPut(bloomBitsKey(uint(i), b.section), value)
store.BatchPut(bloomBitsKey(uint(i), section), value)
}
b.section++
}

func (this *bloomIndexer) PutFilterStart(height uint32) error {
func PutFilterStart(db *leveldbstore.LevelDBStore, height uint32) error {
key := genFilterStartKey()
sink := common2.NewZeroCopySink(nil)
sink.WriteUint32(height)
return this.store.Put(key, sink.Bytes())
return db.Put(key, sink.Bytes())
}

func GetOrSetFilterStart(db *leveldbstore.LevelDBStore, def uint32) (uint32, error) {
start, err := GetFilterStart(db)
if err != nil {
if err != scom.ErrNotFound {
return 0, fmt.Errorf("get filter start: %s", err.Error())
}

err = PutFilterStart(db, def)
if err != nil {
return 0, fmt.Errorf("put filter start: %s", err.Error())
}
start = def
}

return start, nil
}

func (this *bloomIndexer) GetFilterStart() (uint32, error) {
func GetFilterStart(db *leveldbstore.LevelDBStore) (uint32, error) {
key := genFilterStartKey()
data, err := this.store.Get(key)
data, err := db.Get(key)
if err != nil {
return 0, err
}
Expand Down
40 changes: 2 additions & 38 deletions core/store/ledgerstore/ledger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ type LedgerStoreImp struct {
currBlockHeight uint32 //Current block height
currBlockHash common.Uint256 //Current block hash
headerCache map[common.Uint256]*types.Header //BlockHash => Header
bloomCache map[uint32]*types3.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index
headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash
vbftPeerInfoMap map[uint32]map[string]uint32 //key:block height,value:peerInfo
lock sync.RWMutex
Expand All @@ -112,7 +111,6 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er
ledgerStore := &LedgerStoreImp{
headerIndex: make(map[uint32]common.Uint256),
headerCache: make(map[common.Uint256]*types.Header, 0),
bloomCache: make(map[uint32]*types3.Bloom, 4096),
vbftPeerInfoMap: make(map[uint32]map[string]uint32),
savingBlockSemaphore: make(chan bool, 1),
stateHashCheckHeight: stateHashHeight,
Expand Down Expand Up @@ -158,10 +156,6 @@ func (this *LedgerStoreImp) InitLedgerStoreWithGenesisBlock(genesisBlock *types.
if err != nil {
return fmt.Errorf("blockStore.ClearAll error %s", err)
}
err = this.blockStore.PutFilterStart(0)
if err != nil {
return fmt.Errorf("blockStore.PutFilterStart error %s", err)
}
err = this.stateStore.ClearAll()
if err != nil {
return fmt.Errorf("stateStore.ClearAll error %s", err)
Expand Down Expand Up @@ -280,41 +274,13 @@ func (this *LedgerStoreImp) init() error {
if err != nil {
return fmt.Errorf("recoverStore error %s", err)
}
err = this.loadBloomBits()
err = this.blockStore.LoadBloomBits()
if err != nil {
return fmt.Errorf("loadBloomBits error %s", err)
}
return nil
}

func (this *LedgerStoreImp) loadBloomBits() error {
_, currentBlockHeight, err := this.blockStore.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return fmt.Errorf("LoadCurrentBlock error %s", err)
}
return nil
}

if currentBlockHeight < this.blockStore.filterStart {
return nil
}

start := currentBlockHeight - currentBlockHeight%BloomBitsBlocks
if start < this.blockStore.filterStart {
start = this.blockStore.filterStart
}

for i := start; i <= currentBlockHeight; i++ {
bloom, err := this.blockStore.GetBloomData(i)
if err != nil {
return fmt.Errorf("LoadBloom error %s", err)
}
this.blockStore.Process(i, bloom)
}
return nil
}

func (this *LedgerStoreImp) loadCurrentBlock() error {
currentBlockHash, currentBlockHeight, err := this.blockStore.GetCurrentBlock()
if err != nil {
Expand Down Expand Up @@ -786,9 +752,7 @@ func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block, bloom type
if err != nil {
return fmt.Errorf("SaveBlock height %d hash %s error %s", blockHeight, blockHash.ToHexString(), err)
}
if blockHeight >= config.GetAddDecimalsHeight() {
this.blockStore.SaveBloomData(blockHeight, bloom)
}
this.blockStore.SaveBloomData(blockHeight, bloom)
return nil
}

Expand Down
25 changes: 22 additions & 3 deletions http/ethrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package backend

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
Expand All @@ -28,6 +29,24 @@ import (
"github.com/ontio/ontology/http/base/actor"
)

const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
BloomServiceThreads = 16

// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
BloomFilterThreads = 3

// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
BloomRetrievalBatch = 16

// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
BloomRetrievalWait = time.Duration(0)
)

type BloomBackend struct {
bloomRequests chan chan *bloombits.Retrieval
closeBloomHandler chan struct{}
Expand All @@ -46,8 +65,8 @@ func (b *BloomBackend) Close() {
}

func (b *BloomBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
for i := 0; i < ledgerstore.BloomFilterThreads; i++ {
go session.Multiplex(ledgerstore.BloomRetrievalBatch, ledgerstore.BloomRetrievalWait, b.bloomRequests)
for i := 0; i < BloomFilterThreads; i++ {
go session.Multiplex(BloomRetrievalBatch, BloomRetrievalWait, b.bloomRequests)
}
}

Expand All @@ -58,7 +77,7 @@ func (b *BloomBackend) BloomStatus() (uint32, uint32) {
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serving the data to satisfy.
func (b *BloomBackend) StartBloomHandlers(sectionSize uint32, db *leveldbstore.LevelDBStore) error {
for i := 0; i < ledgerstore.BloomServiceThreads; i++ {
for i := 0; i < BloomServiceThreads; i++ {
go func() {
for {
select {
Expand Down

0 comments on commit ac6044f

Please sign in to comment.