Skip to content

Commit

Permalink
support bloom filter (#1407)
Browse files Browse the repository at this point in the history
* clear code

fix bug

fix publish bug

add unit test

fix

fix

fix

fix bloom

add chain event

fix bug & add forking settings

impl uninstallFilter

fix bug

fix bug

add timeoutLoop

fix

fix

fix

add eth event

clear code

clear code

update

add subscribe

move save block bloom

move save block bloom

fmt code

get block logs

fmt code

init index store

fmt code

impl get filters

fix annotation

fix

add api interface to impl

get bloom status

fix close database

add bloombits store

* fmt code

* add license

* add license

* add license

* fmt code

* fmt code

* fmt code

* fix bug

* fmt code

* fmt code

* clean up

* Update http/ethrpc/filters/filter_system.go

* Update http/ethrpc/filters/filter_system.go

* tmp commit

* fix bug

* fix

* fix

* fix

* break import cycle

* fix

* add max search span

* rm filter height

* fix

* fix

* delete unused channel

* add bloom cache

* fix

* add annotation

* add lock

* fix

* fix

* fix

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix bug

* fix

* fix

* fix bug

* delete unused code

* fix

* use block db to store bloom index

* refactor code

* remove filter start field

* fix

* fix

* fix

* fix

* add annotation

Co-authored-by: laizy <aochyi@126.com>
  • Loading branch information
hero5512 and laizy authored Nov 2, 2022
1 parent 10750e9 commit 91b7841
Show file tree
Hide file tree
Showing 24 changed files with 2,001 additions and 47 deletions.
2 changes: 1 addition & 1 deletion common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const (
DEFAULT_WASM_GAS_FACTOR = uint64(10)
DEFAULT_WASM_MAX_STEPCOUNT = uint64(8000000)

DEFAULT_DATA_DIR = "./Chain/"
DEFAULT_DATA_DIR = "./Chain"
DEFAULT_RESERVED_FILE = "./peers.rsv"

//DEFAULT_ETH_BLOCK_GAS_LIMIT = 800000000
Expand Down
6 changes: 4 additions & 2 deletions core/store/common/data_entry_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
DATA_HEADER = 0x01 //Block hash => block header+txhashes key prefix
DATA_TRANSACTION = 0x02 //Transction hash => transaction key prefix
DATA_STATE_MERKLE_ROOT = 0x21 // block height => write set hash + state merkle root
DATA_BLOOM = 0x23 // block height => block bloom data

// Transaction
ST_BOOKKEEPER DataEntryPrefix = 0x03 //BookKeeper state key prefix
Expand All @@ -35,8 +36,9 @@ const (
ST_DESTROYED DataEntryPrefix = 0x06 // record destroyed smart contract: prefix+address -> height

// eth state
ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes
ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash]
ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes
ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash]
ST_ETH_FILTER_START DataEntryPrefix = 0x32 // support eth filter height

IX_HEADER_HASH_LIST DataEntryPrefix = 0x09 //Block height => block hash key prefix

Expand Down
84 changes: 84 additions & 0 deletions core/store/ledgerstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"fmt"
"io"

types2 "github.com/ethereum/go-ethereum/core/types"
"github.com/ontio/ontology/common"
"github.com/ontio/ontology/common/config"
"github.com/ontio/ontology/common/serialization"
scom "github.com/ontio/ontology/core/store/common"
"github.com/ontio/ontology/core/store/leveldbstore"
Expand All @@ -36,6 +38,8 @@ type BlockStore struct {
enableCache bool //Is enable lru cache
dbDir string //The path of store file
cache *BlockCache //The cache of block, if have.
indexer bloomIndexer // Background processor generating the index data content
filterStart uint32 // Start block that filter supported
store *leveldbstore.LevelDBStore //block store handler
}

Expand All @@ -54,20 +58,62 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) {
if err != nil {
return nil, err
}

blockStore := &BlockStore{
dbDir: dbDir,
enableCache: enableCache,
store: store,
cache: cache,
}

_, curBlockHeight, err := blockStore.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get current block: %s", err.Error())
}
curBlockHeight = 0
}

indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks)

start, err := indexer.GetFilterStart()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get filter start: %s", err.Error())
}

var tmp uint32
if curBlockHeight < config.GetAddDecimalsHeight() {
tmp = config.GetAddDecimalsHeight()
} else {
tmp = curBlockHeight
}
err = indexer.PutFilterStart(tmp)
if err != nil {
return nil, fmt.Errorf("put filter start: %s", err.Error())
}
start = tmp
}

blockStore.indexer = indexer
blockStore.filterStart = start

return blockStore, nil
}

// PutFilterStart persists the lowest block height from which eth filters are
// supported, delegating to the bloom indexer's metadata storage.
func (this *BlockStore) PutFilterStart(height uint32) error {
	return this.indexer.PutFilterStart(height)
}

//NewBatch starts a commit batch on the underlying block store; subsequent
//BatchPut calls are buffered until the batch is committed.
func (this *BlockStore) NewBatch() {
	this.store.NewBatch()
}

// GetIndexer returns the background bloom indexer that builds the rotated
// bloom bits index used for eth log filtering.
// NOTE(review): exported method returning the unexported type bloomIndexer —
// callers outside this package cannot name the returned type.
func (this *BlockStore) GetIndexer() bloomIndexer {
	return this.indexer
}

//SaveBlock persist block to store
func (this *BlockStore) SaveBlock(block *types.Block) error {
if this.enableCache {
Expand Down Expand Up @@ -134,6 +180,10 @@ func (this *BlockStore) GetBlock(blockHash common.Uint256) (*types.Block, error)
return block, nil
}

// GetDb exposes the underlying LevelDB store handle.
// NOTE(review): Go naming convention would spell this GetDB (or just DB);
// kept as-is since callers depend on the current name.
func (this *BlockStore) GetDb() *leveldbstore.LevelDBStore {
	return this.store
}

func (this *BlockStore) loadHeaderWithTx(blockHash common.Uint256) (*types.Header, []common.Uint256, error) {
key := genHeaderKey(blockHash)
value, err := this.store.Get(key)
Expand Down Expand Up @@ -348,6 +398,40 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) {
this.store.BatchPut(key, blockHash.ToArray())
}

//SaveBloomData persist block bloom data to store.
//The bloom is fed into the in-memory section indexer, the completed section's
//rotated bit vectors are flushed into the current batch at section boundaries,
//and the raw bloom is additionally stored keyed by height.
func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) {
	this.Process(height, bloom)
	// Flush a finished section once height reaches a non-zero multiple of
	// BloomBitsBlocks.
	// NOTE(review): Process for the boundary height runs before the flush that
	// advances the indexer's section counter — confirm the boundary block's
	// bloom lands in the intended section.
	if height != 0 && height%BloomBitsBlocks == 0 {
		this.indexer.BatchPut()
	}
	key := this.genBloomKey(height)
	this.store.BatchPut(key, bloom.Bytes())
}

// Process forwards a block's bloom to the background indexer, which
// accumulates it into the bloom bits of the section currently being built.
func (this *BlockStore) Process(height uint32, bloom types2.Bloom) {
	this.indexer.Process(height, bloom)
}

//GetBloomData return bloom data by block height.
//A missing entry is not treated as an error: an empty bloom is returned.
func (this *BlockStore) GetBloomData(height uint32) (types2.Bloom, error) {
	value, err := this.store.Get(this.genBloomKey(height))
	switch err {
	case nil:
		return types2.BytesToBloom(value), nil
	case scom.ErrNotFound:
		// No bloom stored for this height: report an empty bloom.
		return types2.Bloom{}, nil
	default:
		return types2.Bloom{}, err
	}
}

// genBloomKey builds the storage key for a block's bloom data:
// the DATA_BLOOM prefix byte followed by the little-endian block height.
func (this *BlockStore) genBloomKey(height uint32) []byte {
	key := make([]byte, 5)
	key[0] = byte(scom.DATA_BLOOM)
	binary.LittleEndian.PutUint32(key[1:], height)
	return key
}

//SaveTransaction persist transaction to store
func (this *BlockStore) SaveTransaction(tx *types.Transaction, height uint32) {
if this.enableCache {
Expand Down
137 changes: 137 additions & 0 deletions core/store/ledgerstore/bloombits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (C) 2018 The ontology Authors
* This file is part of The ontology library.
*
* The ontology is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The ontology is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with The ontology. If not, see <http://www.gnu.org/licenses/>.
*/
package ledgerstore

import (
"encoding/binary"
"io"
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
common2 "github.com/ontio/ontology/common"
scom "github.com/ontio/ontology/core/store/common"
"github.com/ontio/ontology/core/store/leveldbstore"
)

const (
	// BloomServiceThreads is the number of goroutines used globally by an Ethereum
	// instance to service bloombits lookups for all running filters.
	BloomServiceThreads = 16

	// BloomFilterThreads is the number of goroutines used locally per filter to
	// multiplex requests onto the global servicing goroutines.
	BloomFilterThreads = 3

	// BloomRetrievalBatch is the maximum number of bloom bit retrievals to service
	// in a single batch.
	BloomRetrievalBatch = 16

	// BloomRetrievalWait is the maximum time to wait for enough bloom bit requests
	// to accumulate before servicing an entire batch (avoiding hysteresis).
	BloomRetrievalWait = time.Duration(0)

	// BloomBitsBlocks is the number of blocks a single bloom bit section vector
	// contains on the server side.
	BloomBitsBlocks uint32 = 4096
)

var (
	// bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) -> compressed bloom bits
	bloomBitsPrefix = []byte("B")
)

// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type bloomIndexer struct {
	store   *leveldbstore.LevelDBStore // database instance to write index data and metadata into
	gen     *bloombits.Generator       // generator to rotate the bloom bits creating the bloom index
	section uint32                     // section number currently being processed
}

// NewBloomIndexer creates a bloom indexer that writes its index data into the
// given store, resuming at the supplied section number.
func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer {
	gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks))
	if err != nil {
		// Cannot happen: BloomBitsBlocks is a multiple of 8.
		panic(err)
	}
	return bloomIndexer{
		store:   store,
		gen:     gen,
		section: section,
	}
}

// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index at in-section position height - section*BloomBitsBlocks.
// NOTE(review): bloombits.Generator.AddBloom returns an error (e.g. when the
// computed index falls outside the current section) which is silently dropped
// here — confirm callers always keep height within the active section.
func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) {
	b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom)
}

// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section:
// every bit vector of the section is compressed and queued into the store's
// batch, then the section counter advances.
func (b *bloomIndexer) BatchPut() {
	for bit := 0; bit < types.BloomBitLength; bit++ {
		bits, err := b.gen.Bitset(uint(bit))
		if err != nil {
			// Cannot happen: bit < BloomBitLength and the section is complete.
			panic(err)
		}
		b.store.BatchPut(bloomBitsKey(uint(bit), b.section), bitutil.CompressBytes(bits))
	}
	b.section++
}

// PutFilterStart persists the lowest block height for which bloom filters are
// maintained, so a restart resumes filtering from the same point.
// Receiver renamed from "this" to "b" for consistency with the other
// bloomIndexer methods (Process, BatchPut).
func (b *bloomIndexer) PutFilterStart(height uint32) error {
	key := genFilterStartKey()
	sink := common2.NewZeroCopySink(nil)
	sink.WriteUint32(height)
	return b.store.Put(key, sink.Bytes())
}

// GetFilterStart loads the persisted filter start height. It returns the
// store's not-found error when no height was ever stored, and
// io.ErrUnexpectedEOF when the stored value is too short to decode.
// Receiver renamed from "this" to "b" for consistency with the other
// bloomIndexer methods (Process, BatchPut).
func (b *bloomIndexer) GetFilterStart() (uint32, error) {
	key := genFilterStartKey()
	data, err := b.store.Get(key)
	if err != nil {
		return 0, err
	}
	height, eof := common2.NewZeroCopySource(data).NextUint32()
	if eof {
		return 0, io.ErrUnexpectedEOF
	}
	return height, nil
}

// genFilterStartKey returns the fixed single-byte key under which the filter
// start height is stored.
func genFilterStartKey() []byte {
	return []byte{byte(scom.ST_ETH_FILTER_START)}
}

// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian)
func bloomBitsKey(bit uint, section uint32) []byte {
	key := make([]byte, 7)
	copy(key, bloomBitsPrefix)
	binary.BigEndian.PutUint16(key[1:3], uint16(bit))
	binary.BigEndian.PutUint32(key[3:7], section)
	return key
}

// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
// section and bit index from the database.
func ReadBloomBits(db *leveldbstore.LevelDBStore, bit uint, section uint32) ([]byte, error) {
	return db.Get(bloomBitsKey(bit, section))
}
5 changes: 5 additions & 0 deletions core/store/ledgerstore/cross_chain_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (this *CrossChainStore) GetCrossChainMsg(height uint32) (*types.CrossChainM
return msg, nil
}

//Close CrossChainStore store, releasing the underlying store handle.
func (this *CrossChainStore) Close() error {
	return this.store.Close()
}

func (this *CrossChainStore) genCrossChainMsgKey(height uint32) []byte {
temp := make([]byte, 5)
temp[0] = byte(scom.SYS_CROSS_CHAIN_MSG)
Expand Down
Loading

0 comments on commit 91b7841

Please sign in to comment.