Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bloom index start height and error recover #1419

Merged
merged 2 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 54 additions & 46 deletions core/store/ledgerstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type BlockStore struct {
enableCache bool //Is enable lru cache
dbDir string //The path of store file
cache *BlockCache //The cache of block, if have.
indexer bloomIndexer // Background processor generating the index data content
bloomCache map[uint32]*types2.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index
filterStart uint32 // Start block that filter supported
store *leveldbstore.LevelDBStore //block store handler
}
Expand All @@ -64,56 +64,17 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) {
enableCache: enableCache,
store: store,
cache: cache,
bloomCache: make(map[uint32]*types2.Bloom, 4096*2),
}

_, curBlockHeight, err := blockStore.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get current block: %s", err.Error())
}
curBlockHeight = 0
}

indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks)

start, err := indexer.GetFilterStart()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get filter start: %s", err.Error())
}

var tmp uint32
if curBlockHeight < config.GetAddDecimalsHeight() {
tmp = config.GetAddDecimalsHeight()
} else {
tmp = curBlockHeight
}
err = indexer.PutFilterStart(tmp)
if err != nil {
return nil, fmt.Errorf("put filter start: %s", err.Error())
}
start = tmp
}

blockStore.indexer = indexer
blockStore.filterStart = start

return blockStore, nil
}

func (this *BlockStore) PutFilterStart(height uint32) error {
return this.indexer.PutFilterStart(height)
}

//NewBatch start a commit batch
func (this *BlockStore) NewBatch() {
this.store.NewBatch()
}

func (this *BlockStore) GetIndexer() bloomIndexer {
return this.indexer
}

//SaveBlock persist block to store
func (this *BlockStore) SaveBlock(block *types.Block) error {
if this.enableCache {
Expand Down Expand Up @@ -400,16 +361,28 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) {

//SaveBloomData persist block bloom data to store
func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) {
this.Process(height, bloom)
if height != 0 && height%BloomBitsBlocks == 0 {
this.indexer.BatchPut()
if height < this.filterStart {
return
}
key := this.genBloomKey(height)
this.store.BatchPut(key, bloom.Bytes())
this.bloomCache[height] = &bloom
this.cleanStaleBloomData(height)

if (height+1)%BloomBitsBlocks == 0 {
var blooms []types2.Bloom
for i := 0; i < BloomBitsBlocks; i++ {
blooms = append(blooms, *this.bloomCache[height+uint32(i)+1-BloomBitsBlocks])
}
section := height / BloomBitsBlocks
PutBloomIndex(this.store, blooms, section)
}
}

func (this *BlockStore) Process(height uint32, bloom types2.Bloom) {
this.indexer.Process(height, bloom)
func (this *BlockStore) cleanStaleBloomData(curHeight uint32) {
if curHeight > BloomBitsBlocks*2 {
delete(this.bloomCache, curHeight-BloomBitsBlocks*2)
}
}

//GetBloomData return bloom data by block height
Expand Down Expand Up @@ -638,3 +611,38 @@ func (this *BlockStore) PruneBlock(hash common.Uint256) []common.Uint256 {
this.store.BatchDelete(key)
return txHashes
}

func (this *BlockStore) LoadBloomBits() error {
_, curBlockHeight, err := this.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return fmt.Errorf("get current block: %s", err.Error())
}
curBlockHeight = 0
}

initStart := (curBlockHeight + 4095) / 4096
if curBlockHeight < config.GetAddDecimalsHeight() {
initStart = config.GetAddDecimalsHeight() / 4096 * 4096
}

start, err := GetOrSetFilterStart(this.store, initStart)
if err != nil {
return err
}
this.filterStart = start

if curBlockHeight < this.filterStart {
return nil
}

loadStart := curBlockHeight - curBlockHeight%BloomBitsBlocks
for i := loadStart; i <= curBlockHeight; i++ {
bloom, err := this.GetBloomData(i)
if err != nil {
return fmt.Errorf("LoadBloom error %s", err)
}
this.bloomCache[i] = &bloom
}
return nil
}
85 changes: 33 additions & 52 deletions core/store/ledgerstore/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package ledgerstore

import (
"encoding/binary"
"fmt"
"io"
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
Expand All @@ -30,82 +30,63 @@ import (
"github.com/ontio/ontology/core/store/leveldbstore"
)

const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
BloomServiceThreads = 16

// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
BloomFilterThreads = 3

// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
BloomRetrievalBatch = 16

// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
BloomRetrievalWait = time.Duration(0)

// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
BloomBitsBlocks uint32 = 4096
)
// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
const BloomBitsBlocks = 4096

var (
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) + hash -> bloom bits
)

// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type bloomIndexer struct {
store *leveldbstore.LevelDBStore // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
section uint32 // Section is the section number being processed currently
}

func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer {
func PutBloomIndex(store *leveldbstore.LevelDBStore, blooms []types.Bloom, section uint32) {
gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks))
if err != nil {
panic(err) // never fired since BloomBitsBlocks is multiple of 8
}
b := bloomIndexer{
store: store,
for i, b := range blooms {
err := gen.AddBloom(uint(i), b)
if err != nil {
panic(err) // never fired
}
}
b.gen, b.section = gen, section
return b
}

// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) {
b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom)
}

// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *bloomIndexer) BatchPut() {
for i := 0; i < types.BloomBitLength; i++ {
bits, err := b.gen.Bitset(uint(i))
bits, err := gen.Bitset(uint(i))
if err != nil {
panic(err) // never fired since idx is always less than 8 and section should be right
}
value := bitutil.CompressBytes(bits)
b.store.BatchPut(bloomBitsKey(uint(i), b.section), value)
store.BatchPut(bloomBitsKey(uint(i), section), value)
}
b.section++
}

func (this *bloomIndexer) PutFilterStart(height uint32) error {
func PutFilterStart(db *leveldbstore.LevelDBStore, height uint32) error {
key := genFilterStartKey()
sink := common2.NewZeroCopySink(nil)
sink.WriteUint32(height)
return this.store.Put(key, sink.Bytes())
return db.Put(key, sink.Bytes())
}

func GetOrSetFilterStart(db *leveldbstore.LevelDBStore, def uint32) (uint32, error) {
start, err := GetFilterStart(db)
if err != nil {
if err != scom.ErrNotFound {
return 0, fmt.Errorf("get filter start: %s", err.Error())
}

err = PutFilterStart(db, def)
if err != nil {
return 0, fmt.Errorf("put filter start: %s", err.Error())
}
start = def
}

return start, nil
}

func (this *bloomIndexer) GetFilterStart() (uint32, error) {
func GetFilterStart(db *leveldbstore.LevelDBStore) (uint32, error) {
key := genFilterStartKey()
data, err := this.store.Get(key)
data, err := db.Get(key)
if err != nil {
return 0, err
}
Expand Down
40 changes: 2 additions & 38 deletions core/store/ledgerstore/ledger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ type LedgerStoreImp struct {
currBlockHeight uint32 //Current block height
currBlockHash common.Uint256 //Current block hash
headerCache map[common.Uint256]*types.Header //BlockHash => Header
bloomCache map[uint32]*types3.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index
headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash
vbftPeerInfoMap map[uint32]map[string]uint32 //key:block height,value:peerInfo
lock sync.RWMutex
Expand All @@ -112,7 +111,6 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er
ledgerStore := &LedgerStoreImp{
headerIndex: make(map[uint32]common.Uint256),
headerCache: make(map[common.Uint256]*types.Header, 0),
bloomCache: make(map[uint32]*types3.Bloom, 4096),
vbftPeerInfoMap: make(map[uint32]map[string]uint32),
savingBlockSemaphore: make(chan bool, 1),
stateHashCheckHeight: stateHashHeight,
Expand Down Expand Up @@ -158,10 +156,6 @@ func (this *LedgerStoreImp) InitLedgerStoreWithGenesisBlock(genesisBlock *types.
if err != nil {
return fmt.Errorf("blockStore.ClearAll error %s", err)
}
err = this.blockStore.PutFilterStart(0)
if err != nil {
return fmt.Errorf("blockStore.PutFilterStart error %s", err)
}
err = this.stateStore.ClearAll()
if err != nil {
return fmt.Errorf("stateStore.ClearAll error %s", err)
Expand Down Expand Up @@ -280,41 +274,13 @@ func (this *LedgerStoreImp) init() error {
if err != nil {
return fmt.Errorf("recoverStore error %s", err)
}
err = this.loadBloomBits()
err = this.blockStore.LoadBloomBits()
if err != nil {
return fmt.Errorf("loadBloomBits error %s", err)
}
return nil
}

func (this *LedgerStoreImp) loadBloomBits() error {
_, currentBlockHeight, err := this.blockStore.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return fmt.Errorf("LoadCurrentBlock error %s", err)
}
return nil
}

if currentBlockHeight < this.blockStore.filterStart {
return nil
}

start := currentBlockHeight - currentBlockHeight%BloomBitsBlocks
if start < this.blockStore.filterStart {
start = this.blockStore.filterStart
}

for i := start; i <= currentBlockHeight; i++ {
bloom, err := this.blockStore.GetBloomData(i)
if err != nil {
return fmt.Errorf("LoadBloom error %s", err)
}
this.blockStore.Process(i, bloom)
}
return nil
}

func (this *LedgerStoreImp) loadCurrentBlock() error {
currentBlockHash, currentBlockHeight, err := this.blockStore.GetCurrentBlock()
if err != nil {
Expand Down Expand Up @@ -786,9 +752,7 @@ func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block, bloom type
if err != nil {
return fmt.Errorf("SaveBlock height %d hash %s error %s", blockHeight, blockHash.ToHexString(), err)
}
if blockHeight >= config.GetAddDecimalsHeight() {
this.blockStore.SaveBloomData(blockHeight, bloom)
}
this.blockStore.SaveBloomData(blockHeight, bloom)
return nil
}

Expand Down
25 changes: 22 additions & 3 deletions http/ethrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package backend

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
Expand All @@ -28,6 +29,24 @@ import (
"github.com/ontio/ontology/http/base/actor"
)

const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
BloomServiceThreads = 16

// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
BloomFilterThreads = 3

// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
BloomRetrievalBatch = 16

// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
BloomRetrievalWait = time.Duration(0)
)

type BloomBackend struct {
bloomRequests chan chan *bloombits.Retrieval
closeBloomHandler chan struct{}
Expand All @@ -46,8 +65,8 @@ func (b *BloomBackend) Close() {
}

func (b *BloomBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
for i := 0; i < ledgerstore.BloomFilterThreads; i++ {
go session.Multiplex(ledgerstore.BloomRetrievalBatch, ledgerstore.BloomRetrievalWait, b.bloomRequests)
for i := 0; i < BloomFilterThreads; i++ {
go session.Multiplex(BloomRetrievalBatch, BloomRetrievalWait, b.bloomRequests)
}
}

Expand All @@ -58,7 +77,7 @@ func (b *BloomBackend) BloomStatus() (uint32, uint32) {
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serving the data to satisfy.
func (b *BloomBackend) StartBloomHandlers(sectionSize uint32, db *leveldbstore.LevelDBStore) error {
for i := 0; i < ledgerstore.BloomServiceThreads; i++ {
for i := 0; i < BloomServiceThreads; i++ {
go func() {
for {
select {
Expand Down