Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support bloom filter #1407

Merged
merged 52 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
026c5cf
clear code
hero5512 Apr 13, 2022
a7701b1
fmt code
hero5512 May 9, 2022
d97f500
add license
hero5512 May 9, 2022
14189bd
add license
hero5512 May 9, 2022
36baa20
add license
hero5512 May 9, 2022
69b3d30
fmt code
hero5512 May 9, 2022
da2bac3
fmt code
hero5512 May 9, 2022
028719d
fmt code
hero5512 May 9, 2022
e83cc7a
fix bug
hero5512 May 9, 2022
c757a54
fmt code
hero5512 May 9, 2022
faaeb00
fmt code
hero5512 May 9, 2022
dcbbea9
clean up
laizy May 16, 2022
c1bc6c8
Update http/ethrpc/filters/filter_system.go
laizy May 17, 2022
c3cfe70
Update http/ethrpc/filters/filter_system.go
laizy May 17, 2022
efc57c1
tmp commit
hero5512 May 23, 2022
bb2b692
fix bug
hero5512 May 24, 2022
965eaa9
fix
hero5512 May 24, 2022
63ad335
fix
hero5512 May 24, 2022
82a681a
fix
hero5512 May 24, 2022
cc39037
break import cycle
hero5512 May 24, 2022
9c5a7bb
fix
hero5512 May 24, 2022
c80b8c3
add max search span
hero5512 May 24, 2022
a19b669
rm filter height
hero5512 May 24, 2022
fef2881
fix
hero5512 May 24, 2022
20986c6
fix
hero5512 May 24, 2022
fa48174
delete unused channel
hero5512 May 26, 2022
ca238ae
add bloom cache
hero5512 May 26, 2022
16b0624
fix
hero5512 May 26, 2022
3abbf1a
add annotation
hero5512 May 26, 2022
5e910d3
add lock
hero5512 May 26, 2022
75e792a
fix
hero5512 May 27, 2022
fff9155
fix
hero5512 May 27, 2022
773f337
fix
hero5512 May 27, 2022
3b4ee5b
fix bug
hero5512 May 27, 2022
9d522c2
fix bug
hero5512 May 30, 2022
99c5fb3
fix bug
hero5512 May 30, 2022
9b137ee
fix bug
hero5512 May 30, 2022
5d2748d
fix bug
hero5512 May 30, 2022
36c05ac
fix bug
hero5512 May 30, 2022
3a5d324
fix
hero5512 May 31, 2022
31d6ada
fix
hero5512 May 31, 2022
504cb29
fix bug
hero5512 May 31, 2022
878d256
delete unused code
hero5512 May 31, 2022
dfecc90
fix
hero5512 Jun 1, 2022
a555c66
use block db to store bloom index
hero5512 Jun 3, 2022
47ac49f
refactor code
hero5512 Jun 8, 2022
7d4d37c
remove filter start field
hero5512 Jun 8, 2022
c25863a
fix
hero5512 Jun 8, 2022
663a1bb
fix
hero5512 Jun 8, 2022
c1768bd
fix
hero5512 Jun 8, 2022
2395af1
fix
hero5512 Jun 8, 2022
a700f9b
add annotation
hero5512 Jun 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const (
DEFAULT_WASM_GAS_FACTOR = uint64(10)
DEFAULT_WASM_MAX_STEPCOUNT = uint64(8000000)

DEFAULT_DATA_DIR = "./Chain/"
DEFAULT_DATA_DIR = "./Chain"
DEFAULT_RESERVED_FILE = "./peers.rsv"

//DEFAULT_ETH_BLOCK_GAS_LIMIT = 800000000
Expand Down
6 changes: 4 additions & 2 deletions core/store/common/data_entry_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
DATA_HEADER = 0x01 //Block hash => block header+txhashes key prefix
DATA_TRANSACTION = 0x02 //Transction hash => transaction key prefix
DATA_STATE_MERKLE_ROOT = 0x21 // block height => write set hash + state merkle root
DATA_BLOOM = 0x23 // block height => block bloom data

// Transaction
ST_BOOKKEEPER DataEntryPrefix = 0x03 //BookKeeper state key prefix
Expand All @@ -35,8 +36,9 @@ const (
ST_DESTROYED DataEntryPrefix = 0x06 // record destroyed smart contract: prefix+address -> height

// eth state
ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes
ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash]
ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes
ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash]
ST_ETH_FILTER_START DataEntryPrefix = 0x32 // support eth filter height

IX_HEADER_HASH_LIST DataEntryPrefix = 0x09 //Block height => block hash key prefix

Expand Down
84 changes: 84 additions & 0 deletions core/store/ledgerstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"fmt"
"io"

types2 "github.com/ethereum/go-ethereum/core/types"
"github.com/ontio/ontology/common"
"github.com/ontio/ontology/common/config"
"github.com/ontio/ontology/common/serialization"
scom "github.com/ontio/ontology/core/store/common"
"github.com/ontio/ontology/core/store/leveldbstore"
Expand All @@ -36,6 +38,8 @@ type BlockStore struct {
enableCache bool //Is enable lru cache
dbDir string //The path of store file
cache *BlockCache //The cache of block, if have.
indexer bloomIndexer // Background processor generating the index data content
filterStart uint32 // Start block that filter supported
store *leveldbstore.LevelDBStore //block store handler
}

Expand All @@ -54,20 +58,62 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) {
if err != nil {
return nil, err
}

blockStore := &BlockStore{
dbDir: dbDir,
enableCache: enableCache,
store: store,
cache: cache,
}

_, curBlockHeight, err := blockStore.GetCurrentBlock()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get current block: %s", err.Error())
}
curBlockHeight = 0
}

indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks)

start, err := indexer.GetFilterStart()
if err != nil {
if err != scom.ErrNotFound {
return nil, fmt.Errorf("get filter start: %s", err.Error())
}

var tmp uint32
if curBlockHeight < config.GetAddDecimalsHeight() {
tmp = config.GetAddDecimalsHeight()
} else {
tmp = curBlockHeight
}
err = indexer.PutFilterStart(tmp)
if err != nil {
return nil, fmt.Errorf("put filter start: %s", err.Error())
}
start = tmp
}

blockStore.indexer = indexer
blockStore.filterStart = start

return blockStore, nil
}

func (this *BlockStore) PutFilterStart(height uint32) error {
return this.indexer.PutFilterStart(height)
}

//NewBatch start a commit batch
func (this *BlockStore) NewBatch() {
this.store.NewBatch()
}

func (this *BlockStore) GetIndexer() bloomIndexer {
return this.indexer
}

//SaveBlock persist block to store
func (this *BlockStore) SaveBlock(block *types.Block) error {
if this.enableCache {
Expand Down Expand Up @@ -134,6 +180,10 @@ func (this *BlockStore) GetBlock(blockHash common.Uint256) (*types.Block, error)
return block, nil
}

func (this *BlockStore) GetDb() *leveldbstore.LevelDBStore {
return this.store
}

func (this *BlockStore) loadHeaderWithTx(blockHash common.Uint256) (*types.Header, []common.Uint256, error) {
key := genHeaderKey(blockHash)
value, err := this.store.Get(key)
Expand Down Expand Up @@ -348,6 +398,40 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) {
this.store.BatchPut(key, blockHash.ToArray())
}

//SaveBloomData persist block bloom data to store
func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) {
this.Process(height, bloom)
if height != 0 && height%BloomBitsBlocks == 0 {
this.indexer.BatchPut()
}
key := this.genBloomKey(height)
this.store.BatchPut(key, bloom.Bytes())
}

func (this *BlockStore) Process(height uint32, bloom types2.Bloom) {
this.indexer.Process(height, bloom)
}

//GetBloomData return bloom data by block height
func (this *BlockStore) GetBloomData(height uint32) (types2.Bloom, error) {
key := this.genBloomKey(height)
value, err := this.store.Get(key)
if err != nil && err != scom.ErrNotFound {
return types2.Bloom{}, err
}
if err == scom.ErrNotFound {
return types2.Bloom{}, nil
}
return types2.BytesToBloom(value), nil
}

func (this *BlockStore) genBloomKey(height uint32) []byte {
temp := make([]byte, 5)
temp[0] = byte(scom.DATA_BLOOM)
binary.LittleEndian.PutUint32(temp[1:], height)
return temp
}

//SaveTransaction persist transaction to store
func (this *BlockStore) SaveTransaction(tx *types.Transaction, height uint32) {
if this.enableCache {
Expand Down
137 changes: 137 additions & 0 deletions core/store/ledgerstore/bloombits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright (C) 2018 The ontology Authors
* This file is part of The ontology library.
*
* The ontology is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The ontology is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with The ontology. If not, see <http://www.gnu.org/licenses/>.
*/
package ledgerstore

import (
"encoding/binary"
"io"
"time"

"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
common2 "github.com/ontio/ontology/common"
scom "github.com/ontio/ontology/core/store/common"
"github.com/ontio/ontology/core/store/leveldbstore"
)

const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
BloomServiceThreads = 16

// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
BloomFilterThreads = 3

// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
BloomRetrievalBatch = 16

// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
BloomRetrievalWait = time.Duration(0)

// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
BloomBitsBlocks uint32 = 4096
)

var (
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) + hash -> bloom bits
)

// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type bloomIndexer struct {
store *leveldbstore.LevelDBStore // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
section uint32 // Section is the section number being processed currently
}

func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer {
gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks))
if err != nil {
panic(err) // never fired since BloomBitsBlocks is multiple of 8
}
b := bloomIndexer{
store: store,
}
b.gen, b.section = gen, section
return b
}

// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) {
b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom)
}

// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *bloomIndexer) BatchPut() {
for i := 0; i < types.BloomBitLength; i++ {
bits, err := b.gen.Bitset(uint(i))
if err != nil {
panic(err) // never fired since idx is always less than 8 and section should be right
}
value := bitutil.CompressBytes(bits)
b.store.BatchPut(bloomBitsKey(uint(i), b.section), value)
}
b.section++
}

func (this *bloomIndexer) PutFilterStart(height uint32) error {
key := genFilterStartKey()
sink := common2.NewZeroCopySink(nil)
sink.WriteUint32(height)
return this.store.Put(key, sink.Bytes())
}

func (this *bloomIndexer) GetFilterStart() (uint32, error) {
key := genFilterStartKey()
data, err := this.store.Get(key)
if err != nil {
return 0, err
}
height, eof := common2.NewZeroCopySource(data).NextUint32()
if eof {
return 0, io.ErrUnexpectedEOF
}
return height, nil
}

func genFilterStartKey() []byte {
return []byte{byte(scom.ST_ETH_FILTER_START)}
}

// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian)
func bloomBitsKey(bit uint, section uint32) []byte {
key := append(bloomBitsPrefix, make([]byte, 6)...)

binary.BigEndian.PutUint16(key[1:], uint16(bit))
binary.BigEndian.PutUint32(key[3:], section)

return key
}

// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
// section and bit index from the.
func ReadBloomBits(db *leveldbstore.LevelDBStore, bit uint, section uint32) ([]byte, error) {
return db.Get(bloomBitsKey(bit, section))
}
5 changes: 5 additions & 0 deletions core/store/ledgerstore/cross_chain_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (this *CrossChainStore) GetCrossChainMsg(height uint32) (*types.CrossChainM
return msg, nil
}

//Close CrossChainStore store
func (this *CrossChainStore) Close() error {
return this.store.Close()
}

func (this *CrossChainStore) genCrossChainMsgKey(height uint32) []byte {
temp := make([]byte, 5)
temp[0] = byte(scom.SYS_CROSS_CHAIN_MSG)
Expand Down
Loading