diff --git a/common/config/config.go b/common/config/config.go index 0dacdad4a2..9c35536902 100644 --- a/common/config/config.go +++ b/common/config/config.go @@ -79,7 +79,7 @@ const ( DEFAULT_WASM_GAS_FACTOR = uint64(10) DEFAULT_WASM_MAX_STEPCOUNT = uint64(8000000) - DEFAULT_DATA_DIR = "./Chain/" + DEFAULT_DATA_DIR = "./Chain" DEFAULT_RESERVED_FILE = "./peers.rsv" //DEFAULT_ETH_BLOCK_GAS_LIMIT = 800000000 diff --git a/core/store/common/data_entry_prefix.go b/core/store/common/data_entry_prefix.go index bebbd6023c..d7edcaf173 100644 --- a/core/store/common/data_entry_prefix.go +++ b/core/store/common/data_entry_prefix.go @@ -27,6 +27,7 @@ const ( DATA_HEADER = 0x01 //Block hash => block header+txhashes key prefix DATA_TRANSACTION = 0x02 //Transction hash => transaction key prefix DATA_STATE_MERKLE_ROOT = 0x21 // block height => write set hash + state merkle root + DATA_BLOOM = 0x23 // block height => block bloom data // Transaction ST_BOOKKEEPER DataEntryPrefix = 0x03 //BookKeeper state key prefix @@ -35,8 +36,9 @@ const ( ST_DESTROYED DataEntryPrefix = 0x06 // record destroyed smart contract: prefix+address -> height // eth state - ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes - ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash] + ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes + ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash] + ST_ETH_FILTER_START DataEntryPrefix = 0x32 // support eth filter height IX_HEADER_HASH_LIST DataEntryPrefix = 0x09 //Block height => block hash key prefix diff --git a/core/store/ledgerstore/block_store.go b/core/store/ledgerstore/block_store.go index cd022439de..c468a824b4 100644 --- a/core/store/ledgerstore/block_store.go +++ b/core/store/ledgerstore/block_store.go @@ -24,7 +24,9 @@ import ( "fmt" "io" + types2 "github.com/ethereum/go-ethereum/core/types" "github.com/ontio/ontology/common" + "github.com/ontio/ontology/common/config" "github.com/ontio/ontology/common/serialization" scom "github.com/ontio/ontology/core/store/common" "github.com/ontio/ontology/core/store/leveldbstore" @@ -36,6 +38,8 @@ type BlockStore struct { enableCache bool //Is enable lru cache dbDir string //The path of store file cache *BlockCache //The cache of block, if have. + indexer bloomIndexer // Background processor generating the index data content + filterStart uint32 // Start block that filter supported store *leveldbstore.LevelDBStore //block store handler } @@ -54,20 +58,62 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) { if err != nil { return nil, err } + blockStore := &BlockStore{ dbDir: dbDir, enableCache: enableCache, store: store, cache: cache, } + + _, curBlockHeight, err := blockStore.GetCurrentBlock() + if err != nil { + if err != scom.ErrNotFound { + return nil, fmt.Errorf("get current block: %s", err.Error()) + } + curBlockHeight = 0 + } + + indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks) + + start, err := indexer.GetFilterStart() + if err != nil { + if err != scom.ErrNotFound { + return nil, fmt.Errorf("get filter start: %s", err.Error()) + } + + var tmp uint32 + if curBlockHeight < config.GetAddDecimalsHeight() { + tmp = config.GetAddDecimalsHeight() + } else { + tmp = curBlockHeight + } + err = indexer.PutFilterStart(tmp) + if err != nil { + return nil, fmt.Errorf("put filter start: %s", err.Error()) + } + start = tmp + } + + blockStore.indexer = indexer + blockStore.filterStart = start + return blockStore, nil } +func (this *BlockStore) PutFilterStart(height uint32) error { + return this.indexer.PutFilterStart(height) +} + //NewBatch start a commit batch func (this *BlockStore) NewBatch() { this.store.NewBatch() } +func (this *BlockStore) GetIndexer() bloomIndexer { + return this.indexer +} + //SaveBlock persist block to store func (this *BlockStore) SaveBlock(block *types.Block) error { if this.enableCache { @@ -134,6 +180,10 @@ func (this *BlockStore) GetBlock(blockHash common.Uint256) (*types.Block, error) return block, nil } +func (this *BlockStore) GetDb() *leveldbstore.LevelDBStore { + return this.store +} + func (this *BlockStore) loadHeaderWithTx(blockHash common.Uint256) (*types.Header, []common.Uint256, error) { key := genHeaderKey(blockHash) value, err := this.store.Get(key) @@ -348,6 +398,40 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) { this.store.BatchPut(key, blockHash.ToArray()) } +//SaveBloomData persist block bloom data to store +func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) { + this.Process(height, bloom) + if height != 0 && height%BloomBitsBlocks == 0 { + this.indexer.BatchPut() + } + key := this.genBloomKey(height) + this.store.BatchPut(key, bloom.Bytes()) +} + +func (this *BlockStore) Process(height uint32, bloom types2.Bloom) { + this.indexer.Process(height, bloom) +} + +//GetBloomData return bloom data by block height +func (this *BlockStore) GetBloomData(height uint32) (types2.Bloom, error) { + key := this.genBloomKey(height) + value, err := this.store.Get(key) + if err != nil && err != scom.ErrNotFound { + return types2.Bloom{}, err + } + if err == scom.ErrNotFound { + return types2.Bloom{}, nil + } + return types2.BytesToBloom(value), nil +} + +func (this *BlockStore) genBloomKey(height uint32) []byte { + temp := make([]byte, 5) + temp[0] = byte(scom.DATA_BLOOM) + binary.LittleEndian.PutUint32(temp[1:], height) + return temp +} + //SaveTransaction persist transaction to store func (this *BlockStore) SaveTransaction(tx *types.Transaction, height uint32) { if this.enableCache { diff --git a/core/store/ledgerstore/bloombits.go b/core/store/ledgerstore/bloombits.go new file mode 100644 index 0000000000..5c9478ebfe --- /dev/null +++ b/core/store/ledgerstore/bloombits.go @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ +package ledgerstore + +import ( + "encoding/binary" + "io" + "time" + + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/types" + common2 "github.com/ontio/ontology/common" + scom "github.com/ontio/ontology/core/store/common" + "github.com/ontio/ontology/core/store/leveldbstore" +) + +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + BloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + BloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + BloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + BloomRetrievalWait = time.Duration(0) + + // BloomBitsBlocks is the number of blocks a single bloom bit section vector + // contains on the server side. + BloomBitsBlocks uint32 = 4096 +) + +var ( + bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) + hash -> bloom bits +) + +// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index +// for the Ethereum header bloom filters, permitting blazing fast filtering. +type bloomIndexer struct { + store *leveldbstore.LevelDBStore // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + section uint32 // Section is the section number being processed currently +} + +func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer { + gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks)) + if err != nil { + panic(err) // never fired since BloomBitsBlocks is multiple of 8 + } + b := bloomIndexer{ + store: store, + } + b.gen, b.section = gen, section + return b +} + +// Process implements core.ChainIndexerBackend, adding a new header's bloom into +// the index. +func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) { + b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom) +} + +// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section and +// writing it out into the database. +func (b *bloomIndexer) BatchPut() { + for i := 0; i < types.BloomBitLength; i++ { + bits, err := b.gen.Bitset(uint(i)) + if err != nil { + panic(err) // never fired since idx is always less than 8 and section should be right + } + value := bitutil.CompressBytes(bits) + b.store.BatchPut(bloomBitsKey(uint(i), b.section), value) + } + b.section++ +} + +func (this *bloomIndexer) PutFilterStart(height uint32) error { + key := genFilterStartKey() + sink := common2.NewZeroCopySink(nil) + sink.WriteUint32(height) + return this.store.Put(key, sink.Bytes()) +} + +func (this *bloomIndexer) GetFilterStart() (uint32, error) { + key := genFilterStartKey() + data, err := this.store.Get(key) + if err != nil { + return 0, err + } + height, eof := common2.NewZeroCopySource(data).NextUint32() + if eof { + return 0, io.ErrUnexpectedEOF + } + return height, nil +} + +func genFilterStartKey() []byte { + return []byte{byte(scom.ST_ETH_FILTER_START)} +} + +// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) +func bloomBitsKey(bit uint, section uint32) []byte { + key := append(bloomBitsPrefix, make([]byte, 6)...) + + binary.BigEndian.PutUint16(key[1:], uint16(bit)) + binary.BigEndian.PutUint32(key[3:], section) + + return key +} + +// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given +// section and bit index from the. +func ReadBloomBits(db *leveldbstore.LevelDBStore, bit uint, section uint32) ([]byte, error) { + return db.Get(bloomBitsKey(bit, section)) +} diff --git a/core/store/ledgerstore/cross_chain_store.go b/core/store/ledgerstore/cross_chain_store.go index e170fe3ed0..e0a5706345 100644 --- a/core/store/ledgerstore/cross_chain_store.go +++ b/core/store/ledgerstore/cross_chain_store.go @@ -78,6 +78,11 @@ func (this *CrossChainStore) GetCrossChainMsg(height uint32) (*types.CrossChainM return msg, nil } +//Close CrossChainStore store +func (this *CrossChainStore) Close() error { + return this.store.Close() +} + func (this *CrossChainStore) genCrossChainMsgKey(height uint32) []byte { temp := make([]byte, 5) temp[0] = byte(scom.SYS_CROSS_CHAIN_MSG) diff --git a/core/store/ledgerstore/ledger_store.go b/core/store/ledgerstore/ledger_store.go index cbfc1c4466..ca694e6c1c 100644 --- a/core/store/ledgerstore/ledger_store.go +++ b/core/store/ledgerstore/ledger_store.go @@ -44,6 +44,7 @@ import ( "github.com/ontio/ontology/core/states" "github.com/ontio/ontology/core/store" scom "github.com/ontio/ontology/core/store/common" + "github.com/ontio/ontology/core/store/leveldbstore" "github.com/ontio/ontology/core/store/overlaydb" "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/errors" @@ -53,7 +54,7 @@ import ( "github.com/ontio/ontology/smartcontract" "github.com/ontio/ontology/smartcontract/event" "github.com/ontio/ontology/smartcontract/service/evm" - types4 "github.com/ontio/ontology/smartcontract/service/evm/types" + types5 "github.com/ontio/ontology/smartcontract/service/evm/types" "github.com/ontio/ontology/smartcontract/service/evm/witness" "github.com/ontio/ontology/smartcontract/service/native/ong" "github.com/ontio/ontology/smartcontract/service/native/utils" @@ -95,6 +96,7 @@ type LedgerStoreImp struct { currBlockHeight uint32 //Current block height currBlockHash common.Uint256 //Current block hash headerCache map[common.Uint256]*types.Header //BlockHash => Header + bloomCache map[uint32]*types3.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash vbftPeerInfoMap map[uint32]map[string]uint32 //key:block height,value:peerInfo lock sync.RWMutex @@ -110,6 +112,7 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er ledgerStore := &LedgerStoreImp{ headerIndex: make(map[uint32]common.Uint256), headerCache: make(map[common.Uint256]*types.Header, 0), + bloomCache: make(map[uint32]*types3.Bloom, 4096), vbftPeerInfoMap: make(map[uint32]map[string]uint32), savingBlockSemaphore: make(chan bool, 1), stateHashCheckHeight: stateHashHeight, @@ -123,7 +126,7 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er crossChainStore, err := NewCrossChainStore(dataDir) if err != nil { - return nil, fmt.Errorf("NewBlockStore error %s", err) + return nil, fmt.Errorf("NewCrossChainStore error %s", err) } ledgerStore.crossChainStore = crossChainStore @@ -155,6 +158,10 @@ func (this *LedgerStoreImp) InitLedgerStoreWithGenesisBlock(genesisBlock *types. if err != nil { return fmt.Errorf("blockStore.ClearAll error %s", err) } + err = this.blockStore.PutFilterStart(0) + if err != nil { + return fmt.Errorf("blockStore.PutFilterStart error %s", err) + } err = this.stateStore.ClearAll() if err != nil { return fmt.Errorf("stateStore.ClearAll error %s", err) @@ -273,6 +280,38 @@ func (this *LedgerStoreImp) init() error { if err != nil { return fmt.Errorf("recoverStore error %s", err) } + err = this.loadBloomBits() + if err != nil { + return fmt.Errorf("loadBloomBits error %s", err) + } + return nil +} + +func (this *LedgerStoreImp) loadBloomBits() error { + _, currentBlockHeight, err := this.blockStore.GetCurrentBlock() + if err != nil { + if err != scom.ErrNotFound { + return fmt.Errorf("LoadCurrentBlock error %s", err) + } + return nil + } + + if currentBlockHeight < this.blockStore.filterStart { + return nil + } + + start := currentBlockHeight - currentBlockHeight%BloomBitsBlocks + if start < this.blockStore.filterStart { + start = this.blockStore.filterStart + } + + for i := start; i <= currentBlockHeight; i++ { + bloom, err := this.blockStore.GetBloomData(i) + if err != nil { + return fmt.Errorf("LoadBloom error %s", err) + } + this.blockStore.Process(i, bloom) + } return nil } @@ -411,6 +450,14 @@ func (this *LedgerStoreImp) GetCurrentBlockHash() common.Uint256 { return this.currBlockHash } +func (this *LedgerStoreImp) GetFilterStart() uint32 { + return this.blockStore.filterStart +} + +func (this *LedgerStoreImp) GetIndexStore() *leveldbstore.LevelDBStore { + return this.blockStore.GetDb() +} + //GetCurrentBlockHeight return the current block height func (this *LedgerStoreImp) GetCurrentBlockHeight() uint32 { this.lock.RLock() @@ -721,7 +768,7 @@ func (this *LedgerStoreImp) AddBlock(block *types.Block, ccMsg *types.CrossChain return nil } -func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block) error { +func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block, bloom types3.Bloom) error { blockHash := block.Hash() blockHeight := block.Header.Height @@ -739,6 +786,9 @@ func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block) error { if err != nil { return fmt.Errorf("SaveBlock height %d hash %s error %s", blockHeight, blockHash.ToHexString(), err) } + if blockHeight >= config.GetAddDecimalsHeight() { + this.blockStore.SaveBloomData(blockHeight, bloom) + } return nil } @@ -767,9 +817,10 @@ func (this *LedgerStoreImp) executeBlock(block *types.Block) (result store.Execu return true }) cache := storage.NewCacheDB(overlay) + var allLogs []*types.StorageLog for i, tx := range block.Transactions { cache.Reset() - notify, crossStateHashes, e := this.handleTransaction(overlay, cache, gasTable, block, tx, uint32(i), evmWitness) + notify, crossStateHashes, receipt, e := this.handleTransaction(overlay, cache, gasTable, block, tx, uint32(i), evmWitness) if e != nil { err = e return @@ -780,7 +831,13 @@ func (this *LedgerStoreImp) executeBlock(block *types.Block) (result store.Execu notify.TxIndex = uint32(i) result.Notify = append(result.Notify, notify) result.CrossStates = append(result.CrossStates, crossStateHashes...) + if receipt != nil { + allLogs = append(allLogs, receipt.Logs...) + } } + bloomBytes := types3.LogsBloom(parseOntLogsToEth(allLogs)) + result.Bloom = types3.BytesToBloom(bloomBytes) + result.Hash = overlay.ChangeHash() result.WriteSet = overlay.GetWriteSet() if len(result.CrossStates) != 0 { @@ -807,6 +864,19 @@ func (this *LedgerStoreImp) executeBlock(block *types.Block) (result store.Execu return } +func parseOntLogsToEth(logs []*types.StorageLog) []*types3.Log { + var parseLogs []*types3.Log + for _, log := range logs { + parse := &types3.Log{ + Address: log.Address, + Topics: log.Topics, + Data: log.Data, + } + parseLogs = append(parseLogs, parse) + } + return parseLogs +} + func calculateTotalStateHash(overlay *overlaydb.OverlayDB) (result common.Uint256, err error) { stateDiff := sha256.New() @@ -841,7 +911,7 @@ func (this *LedgerStoreImp) saveBlockToStateStore(block *types.Block, result sto blockHeight := block.Header.Height for _, notify := range result.Notify { - if err := SaveNotify(this.eventStore, notify.TxHash, notify); err != nil { + if err := SaveNotify(this.eventStore, notify.TxHash, notify, block); err != nil { return err } } @@ -943,6 +1013,10 @@ func (this *LedgerStoreImp) tryPruneBlock(header *types.Header) bool { return true } +func (this *LedgerStoreImp) BloomStatus() (uint32, uint32) { + return BloomBitsBlocks, this.currBlockHeight / BloomBitsBlocks +} + //saveBlock do the job of execution samrt contract and commit block to store. func (this *LedgerStoreImp) submitBlock(block *types.Block, crossChainMsg *types.CrossChainMsg, result store.ExecuteResult) error { blockHash := block.Hash() @@ -956,7 +1030,8 @@ func (this *LedgerStoreImp) submitBlock(block *types.Block, crossChainMsg *types this.blockStore.NewBatch() this.stateStore.NewBatch() this.eventStore.NewBatch() - err := this.saveBlockToBlockStore(block) + + err := this.saveBlockToBlockStore(block, result.Bloom) if err != nil { return fmt.Errorf("save to block store height:%d error:%s", blockHeight, err) } @@ -991,6 +1066,7 @@ func (this *LedgerStoreImp) submitBlock(block *types.Block, crossChainMsg *types &message.SaveBlockCompleteMsg{ Block: block, }) + event.PushChainEvent(result.Notify, block, result.Bloom) } return nil } @@ -1026,16 +1102,17 @@ func (this *LedgerStoreImp) saveBlock(block *types.Block, ccMsg *types.CrossChai } func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cache *storage.CacheDB, gasTable map[string]uint64, - block *types.Block, tx *types.Transaction, txIndex uint32, evmWitness common2.Address) (*event.ExecuteNotify, []common.Uint256, error) { + block *types.Block, tx *types.Transaction, txIndex uint32, evmWitness common2.Address) (*event.ExecuteNotify, []common.Uint256, *types.Receipt, error) { txHash := tx.Hash() notify := &event.ExecuteNotify{TxHash: txHash, State: event.CONTRACT_STATE_FAIL, TxIndex: txIndex} var crossStateHashes []common.Uint256 var err error + var receipt *types.Receipt switch tx.TxType { case types.Deploy: err = this.stateStore.HandleDeployTransaction(this, overlay, gasTable, cache, tx, block, notify) if overlay.Error() != nil { - return nil, nil, fmt.Errorf("HandleDeployTransaction tx %s error %s", txHash.ToHexString(), overlay.Error()) + return nil, nil, nil, fmt.Errorf("HandleDeployTransaction tx %s error %s", txHash.ToHexString(), overlay.Error()) } if err != nil { log.Debugf("HandleDeployTransaction tx %s error %s", txHash.ToHexString(), err) @@ -1043,7 +1120,7 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach case types.InvokeNeo, types.InvokeWasm: crossStateHashes, err = this.stateStore.HandleInvokeTransaction(this, overlay, gasTable, cache, tx, block, notify) if overlay.Error() != nil { - return nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error()) + return nil, nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error()) } if err != nil { log.Debugf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err) @@ -1051,7 +1128,7 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach case types.EIP155: eiptx, err := tx.GetEIP155Tx() if err != nil { - return nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err.Error()) + return nil, nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err.Error()) } ctx := Eip155Context{ @@ -1060,9 +1137,9 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach Height: block.Header.Height, Timestamp: block.Header.Timestamp, } - _, receipt, err := this.stateStore.HandleEIP155Transaction(this, cache, eiptx, ctx, notify, true) + _, receipt, err = this.stateStore.HandleEIP155Transaction(this, cache, eiptx, ctx, notify, true) if overlay.Error() != nil { - return nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error()) + return nil, nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error()) } if err != nil { log.Debugf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err) @@ -1079,7 +1156,7 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach crossStateHashes = append(crossStateHashes, event.Hash) } } - return notify, crossStateHashes, nil + return notify, crossStateHashes, receipt, nil } func (this *LedgerStoreImp) saveHeaderIndexList() error { @@ -1278,7 +1355,7 @@ func (this *LedgerStoreImp) PreExecuteContractBatch(txes []*types.Transaction, a return results, height, nil } -func (this *LedgerStoreImp) PreExecuteEIP155(tx *types3.Transaction, ctx Eip155Context) (*types4.ExecutionResult, *event.ExecuteNotify, error) { +func (this *LedgerStoreImp) PreExecuteEIP155(tx *types3.Transaction, ctx Eip155Context) (*types5.ExecutionResult, *event.ExecuteNotify, error) { overlay := this.stateStore.NewOverlayDB() cache := storage.NewCacheDB(overlay) @@ -1299,6 +1376,10 @@ func (this *LedgerStoreImp) GetEthAccount(address common2.Address) (*storage.Eth return this.stateStore.GetEthAccount(address) } +func (this *LedgerStoreImp) GetBloomData(height uint32) (types3.Bloom, error) { + return this.blockStore.GetBloomData(height) +} + //PreExecuteContract return the result of smart contract execution without commit to store func (this *LedgerStoreImp) PreExecuteContractWithParam(tx *types.Transaction, preParam PrexecuteParam) (*sstate.PreExecResult, error) { height := this.GetCurrentBlockHeight() @@ -1432,7 +1513,7 @@ func (this *LedgerStoreImp) PreExecuteContract(tx *types.Transaction) (*sstate.P return this.PreExecuteContractWithParam(tx, param) } -func (this *LedgerStoreImp) PreExecuteEip155Tx(msg types3.Message) (*types4.ExecutionResult, error) { +func (this *LedgerStoreImp) PreExecuteEip155Tx(msg types3.Message) (*types5.ExecutionResult, error) { height := this.GetCurrentBlockHeight() // use previous block time to make it predictable for easy test blockTime := uint32(time.Now().Unix()) @@ -1472,6 +1553,10 @@ func (this *LedgerStoreImp) Close() error { if err != nil { return fmt.Errorf("eventStore close error %s", err) } + err = this.crossChainStore.Close() + if err != nil { + return fmt.Errorf("crossChainStore close error %s", err) + } err = this.stateStore.Close() if err != nil { return fmt.Errorf("stateStore close error %s", err) diff --git a/core/store/ledgerstore/tx_handler.go b/core/store/ledgerstore/tx_handler.go index c0bfc1a196..e7d9f12f95 100644 --- a/core/store/ledgerstore/tx_handler.go +++ b/core/store/ledgerstore/tx_handler.go @@ -292,7 +292,7 @@ func (self *StateStore) HandleInvokeTransaction(store store.LedgerStore, overlay return sc.CrossHashes, nil } -func SaveNotify(eventStore scommon.EventStore, txHash common.Uint256, notify *event.ExecuteNotify) error { +func SaveNotify(eventStore scommon.EventStore, txHash common.Uint256, notify *event.ExecuteNotify, blk *types.Block) error { if !sysconfig.DefConfig.Common.EnableEventLog { return nil } @@ -300,6 +300,7 @@ func SaveNotify(eventStore scommon.EventStore, txHash common.Uint256, notify *ev return fmt.Errorf("SaveEventNotifyByTx error %s", err) } event.PushSmartCodeEvent(txHash, 0, event.EVENT_NOTIFY, notify) + event.PushEthSmartCodeEvent(notify, blk) return nil } diff --git a/core/store/store.go b/core/store/store.go index 6fb62b4b0a..c04acdec16 100644 --- a/core/store/store.go +++ b/core/store/store.go @@ -25,6 +25,7 @@ import ( "github.com/ontio/ontology/common" "github.com/ontio/ontology/core/payload" "github.com/ontio/ontology/core/states" + "github.com/ontio/ontology/core/store/leveldbstore" "github.com/ontio/ontology/core/store/overlaydb" "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/smartcontract/event" @@ -40,6 +41,7 @@ type ExecuteResult struct { CrossStates []common.Uint256 CrossStatesRoot common.Uint256 Notify []*event.ExecuteNotify + Bloom types2.Bloom } // LedgerStore provides func with store package. @@ -54,6 +56,8 @@ type LedgerStore interface { GetCurrentBlockHash() common.Uint256 GetCurrentBlockHeight() uint32 GetCurrentHeaderHeight() uint32 + GetFilterStart() uint32 + GetIndexStore() *leveldbstore.LevelDBStore GetCurrentHeaderHash() common.Uint256 GetBlockHash(height uint32) common.Uint256 GetHeaderByHash(blockHash common.Uint256) (*types.Header, error) @@ -62,6 +66,8 @@ type LedgerStore interface { GetBlockByHash(blockHash common.Uint256) (*types.Block, error) GetBlockByHeight(height uint32) (*types.Block, error) GetTransaction(txHash common.Uint256) (*types.Transaction, uint32, error) + GetBloomData(height uint32) (types2.Bloom, error) + BloomStatus() (uint32, uint32) IsContainBlock(blockHash common.Uint256) (bool, error) IsContainTransaction(txHash common.Uint256) (bool, error) GetBlockRootWithNewTxRoots(startHeight uint32, txRoots []common.Uint256) common.Uint256 diff --git a/events/message/message.go b/events/message/message.go index dc64eb3ede..3683bba93d 100644 --- a/events/message/message.go +++ b/events/message/message.go @@ -19,12 +19,17 @@ package message import ( + "github.com/ethereum/go-ethereum/core" + types2 "github.com/ethereum/go-ethereum/core/types" "github.com/ontio/ontology/core/types" ) const ( TOPIC_SAVE_BLOCK_COMPLETE = "svblkcmp" TOPIC_SMART_CODE_EVENT = "scevt" + TOPIC_PENDING_TX_EVENT = "pendingtx" + TOPIC_CHAIN_EVENT = "chainevt" + TOPIC_ETH_SC_EVENT = "ethscevt" ) type SaveBlockCompleteMsg struct { @@ -38,3 +43,18 @@ type SmartCodeEventMsg struct { type BlockConsensusComplete struct { Block *types.Block } + +type EthSmartCodeEventMsg struct { + Event EthSmartCodeEvent +} +type PendingTxs []*types2.Transaction + +type PendingTxMsg struct { + Event PendingTxs +} + +type EthSmartCodeEvent []*types2.Log + +type ChainEventMsg struct { + ChainEvent *core.ChainEvent +} diff --git a/http/base/actor/event.go b/http/base/actor/event.go index 22171e72dc..2ffd0e6a42 100644 --- a/http/base/actor/event.go +++ b/http/base/actor/event.go @@ -27,6 +27,9 @@ import ( type EventActor struct { blockPersistCompleted func(v interface{}) smartCodeEvt func(v interface{}) + chainEvt func(v interface{}) + ethSmartCodeEvt func(v interface{}) + pendingTxEvt func(v interface{}) } //receive from subscribed actor @@ -36,6 +39,12 @@ func (t *EventActor) Receive(c actor.Context) { t.blockPersistCompleted(*msg.Block) case *message.SmartCodeEventMsg: t.smartCodeEvt(*msg.Event) + case *message.ChainEventMsg: + t.chainEvt(*msg.ChainEvent) + case *message.PendingTxMsg: + t.pendingTxEvt(msg.Event) + case *message.EthSmartCodeEventMsg: + t.ethSmartCodeEvt(msg.Event) default: } } @@ -47,6 +56,12 @@ func SubscribeEvent(topic string, handler func(v interface{})) { return &EventActor{blockPersistCompleted: handler} } else if topic == message.TOPIC_SMART_CODE_EVENT { return &EventActor{smartCodeEvt: handler} + } else if topic == message.TOPIC_PENDING_TX_EVENT { + return &EventActor{pendingTxEvt: handler} + } else if topic == message.TOPIC_ETH_SC_EVENT { + return &EventActor{ethSmartCodeEvt: handler} + } else if topic == message.TOPIC_CHAIN_EVENT { + return &EventActor{chainEvt: handler} } else { return &EventActor{} } diff --git a/http/base/actor/ledger.go b/http/base/actor/ledger.go index c1ed1c4851..79c7407f25 100644 --- a/http/base/actor/ledger.go +++ b/http/base/actor/ledger.go @@ -24,6 +24,7 @@ import ( "github.com/ontio/ontology/common" "github.com/ontio/ontology/core/ledger" "github.com/ontio/ontology/core/payload" + "github.com/ontio/ontology/core/store/leveldbstore" "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/smartcontract/event" types3 "github.com/ontio/ontology/smartcontract/service/evm/types" @@ -141,3 +142,19 @@ func PreExecuteEip155Tx(msg types2.Message) (*types3.ExecutionResult, error) { res, err := ledger.DefLedger.PreExecuteEip155Tx(msg) return res, err } + +func BloomStatus() (uint32, uint32) { + return ledger.DefLedger.BloomStatus() +} + +func GetBloomData(height uint32) (types2.Bloom, error) { + return ledger.DefLedger.GetBloomData(height) +} + +func GetFilterStart() uint32 { + return ledger.DefLedger.GetFilterStart() +} + +func GetIndexStore() *leveldbstore.LevelDBStore { + return ledger.DefLedger.GetIndexStore() +} diff --git a/http/base/common/common.go b/http/base/common/common.go index ea26434e3f..2d1dbc63da 100644 --- a/http/base/common/common.go +++ b/http/base/common/common.go @@ -27,16 +27,14 @@ import ( "strings" "time" - "github.com/ontio/ontology/core/states" - "github.com/laizy/bigint" - "github.com/ontio/ontology-crypto/keypair" "github.com/ontio/ontology/common" "github.com/ontio/ontology/common/constants" "github.com/ontio/ontology/common/log" "github.com/ontio/ontology/core/ledger" "github.com/ontio/ontology/core/payload" + "github.com/ontio/ontology/core/states" "github.com/ontio/ontology/core/types" cutils "github.com/ontio/ontology/core/utils" ontErrors "github.com/ontio/ontology/errors" diff --git a/http/ethrpc/backend/backend.go b/http/ethrpc/backend/backend.go new file mode 100644 index 0000000000..406385af47 --- /dev/null +++ b/http/ethrpc/backend/backend.go @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package backend + +import ( + "context" + + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ontio/ontology/core/store/ledgerstore" + "github.com/ontio/ontology/core/store/leveldbstore" + "github.com/ontio/ontology/http/base/actor" +) + +type BloomBackend struct { + bloomRequests chan chan *bloombits.Retrieval + closeBloomHandler chan struct{} +} + +func NewBloomBackend() *BloomBackend { + return &BloomBackend{ + bloomRequests: make(chan chan *bloombits.Retrieval), + closeBloomHandler: make(chan struct{}), + } +} + +// Close +func (b *BloomBackend) Close() { + close(b.closeBloomHandler) +} + +func (b *BloomBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { + for i := 0; i < ledgerstore.BloomFilterThreads; i++ { + go session.Multiplex(ledgerstore.BloomRetrievalBatch, ledgerstore.BloomRetrievalWait, b.bloomRequests) + } +} + +func (b *BloomBackend) BloomStatus() (uint32, uint32) { + return actor.BloomStatus() +} + +// startBloomHandlers starts a batch of goroutines to accept bloom bit database +// retrievals from possibly a range of filters and serving the data to satisfy. +func (b *BloomBackend) StartBloomHandlers(sectionSize uint32, db *leveldbstore.LevelDBStore) error { + for i := 0; i < ledgerstore.BloomServiceThreads; i++ { + go func() { + for { + select { + case <-b.closeBloomHandler: + return + + case request := <-b.bloomRequests: + task := <-request + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + if compVector, err := ledgerstore.ReadBloomBits(db, task.Bit, uint32(section)); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { + task.Bitsets[i] = blob + } else { + task.Error = err + } + } else { + task.Error = err + } + } + request <- task + } + } + }() + } + return nil +} diff --git a/http/ethrpc/eth/api.go b/http/ethrpc/eth/api.go index b9564b4f35..dffd1b3485 100644 --- a/http/ethrpc/eth/api.go +++ b/http/ethrpc/eth/api.go @@ -286,7 +286,7 @@ func (api *EthereumAPI) SendRawTransaction(data hexutil.Bytes) (common.Hash, err } func (api *EthereumAPI) Call(args types2.CallArgs, blockNumber types2.BlockNumber, _ *map[common.Address]types2.Account) (hexutil.Bytes, error) { - log.Debugf("eth_call args %v ,block number %v ", args, blockNumber) + log.Debugf("eth_call block number %v ", blockNumber) msg := args.AsMessage(RPCGasCap) res, err := bactor.PreExecuteEip155Tx(msg) if err != nil { @@ -316,7 +316,6 @@ type revertError struct { } func (api *EthereumAPI) EstimateGas(args types2.CallArgs) (hexutil.Uint64, error) { - log.Debugf("eth_estimateGas args %v", args) var ( lo uint64 = params.TxGas hi uint64 diff --git a/http/ethrpc/filters/api.go b/http/ethrpc/filters/api.go new file mode 100644 index 0000000000..5e9a2baa06 --- /dev/null +++ b/http/ethrpc/filters/api.go @@ -0,0 +1,446 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package filters + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" +) + +// consider a filter inactive if it has not been polled for within deadline +var deadline = 5 * time.Minute + +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ filters.Type + deadline *time.Timer // filter is inactive when deadline triggers + hashes []common.Hash + crit filters.FilterCriteria + logs []*ethtypes.Log + s *Subscription // associated subscription in event system +} + +type PublicFilterAPI struct { + backend Backend + filtersMu sync.Mutex + filters map[rpc.ID]*filter + events *EventSystem + timeout time.Duration +} + +func NewPublicFilterAPI(backend Backend) *PublicFilterAPI { + api := &PublicFilterAPI{ + backend: backend, + events: NewEventSystem(backend), + filters: make(map[rpc.ID]*filter), + timeout: deadline, + } + go api.timeoutLoop() + return api +} + +// timeoutLoop runs at the interval set by 'timeout' and deletes filters +// that have not been recently used. It is started when the API is created. +func (api *PublicFilterAPI) timeoutLoop() { + var toUninstall []*Subscription + ticker := time.NewTicker(deadline) + defer ticker.Stop() + for { + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + toUninstall = append(toUninstall) + delete(api.filters, id) + default: + continue + } + } + api.filtersMu.Unlock() + + // Unsubscribes are processed outside the lock to avoid the following scenario: + // event loop attempts broadcasting events to still active filters while + // Unsubscribe is waiting for it to process the uninstall request. + for _, s := range toUninstall { + s.Unsubscribe() + } + toUninstall = nil + } +} + +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state. +// +// It is part of the filter package because this filter can be used through the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newPendingTransactionFilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + var ( + pendingTxs = make(chan []common.Hash) + pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) + ) + + api.filtersMu.Lock() + api.filters[pendingTxSub.ID] = &filter{typ: filters.PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case ph := <-pendingTxs: + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph...) + } + api.filtersMu.Unlock() + case <-pendingTxSub.Err(): + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return pendingTxSub.ID +} + +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + rpcSub := notifier.CreateSubscription() + go func() { + txHashes := make(chan []common.Hash, 128) + pendingTxSub := api.events.SubscribePendingTxs(txHashes) + for { + select { + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe() + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + var ( + headers = make(chan *ethtypes.Header) + headerSub = api.events.SubscribeNewHeads(headers) + ) + + api.filtersMu.Lock() + api.filters[headerSub.ID] = &filter{typ: filters.BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case h := <-headers: + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID]; found { + f.hashes = append(f.hashes, h.Hash()) + } + api.filtersMu.Unlock() + case <-headerSub.Err(): + api.filtersMu.Lock() + delete(api.filters, headerSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return headerSub.ID +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. +func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + headers := make(chan *ethtypes.Header) + headersSub := api.events.SubscribeNewHeads(headers) + + for { + select { + case h := <-headers: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + headersSub.Unsubscribe() + return + case <-notifier.Closed(): + headersSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + var ( + rpcSub = notifier.CreateSubscription() + matchedLogs = make(chan []*ethtypes.Log) + ) + + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) + if err != nil { + return nil, err + } + + go func() { + + for { + select { + case logs := <-matchedLogs: + for _, log := range logs { + notifier.Notify(rpcSub.ID, &log) + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// FilterCriteria represents a request to create a new filter. +// Same as ethereum.FilterQuery but with UnmarshalJSON() method. +type FilterCriteria ethereum.FilterQuery + +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, error) { + logs := make(chan []*ethtypes.Log) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(criteria), logs) + if err != nil { + return "", err + } + + api.filtersMu.Lock() + api.filters[logsSub.ID] = &filter{typ: filters.LogsSubscription, crit: criteria, deadline: time.NewTimer(api.timeout), logs: make([]*ethtypes.Log, 0), s: logsSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case l := <-logs: + api.filtersMu.Lock() + if f, found := api.filters[logsSub.ID]; found { + f.logs = append(f.logs, l...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, logsSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return logsSub.ID, nil +} + +// GetLogs returns logs matching the given argument that are stored within the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getLogs +func (api *PublicFilterAPI) GetLogs(ctx context.Context, criteria filters.FilterCriteria) ([]*ethtypes.Log, error) { + var filter *Filter + + if criteria.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, criteria) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if criteria.FromBlock != nil { + begin = criteria.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if criteria.ToBlock != nil { + end = criteria.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, criteria.Addresses, criteria.Topics) + } + + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return logs, err + } + return returnLogs(logs), nil +} + +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) + } + api.filtersMu.Unlock() + if found { + f.s.Unsubscribe() + } + + return found +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ethtypes.Log, error) { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found { + return returnLogs(nil), fmt.Errorf("filter %s not found", id) + } + + if f.typ != filters.LogsSubscription { + return returnLogs(nil), fmt.Errorf("filter %s doesn't have a LogsSubscription type: got %d", id, f.typ) + } + + var filter *Filter + var err error + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, f.crit) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time it was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + f, found := api.filters[id] + if !found { + return nil, fmt.Errorf("filter %s not found", id) + } + + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case filters.PendingTransactionsSubscription, filters.BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes), nil + case filters.LogsSubscription, filters.MinedAndPendingLogsSubscription: + logs := make([]*ethtypes.Log, len(f.logs)) + copy(logs, f.logs) + f.logs = []*ethtypes.Log{} + return returnLogs(logs), nil + default: + return nil, fmt.Errorf("invalid filter %s type %d", id, f.typ) + } +} diff --git a/http/ethrpc/filters/filter.go b/http/ethrpc/filters/filter.go new file mode 100644 index 0000000000..6a3035f1b1 --- /dev/null +++ b/http/ethrpc/filters/filter.go @@ -0,0 +1,489 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package filters + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/bloombits" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + common2 "github.com/ontio/ontology/common" + common4 "github.com/ontio/ontology/core/store/common" + "github.com/ontio/ontology/core/types" + "github.com/ontio/ontology/http/base/actor" + utils2 "github.com/ontio/ontology/http/ethrpc/utils" + "github.com/ontio/ontology/smartcontract/event" +) + +const MAX_SEARCH_RANGE = 100000 + +type Backend interface { + BloomStatus() (uint32, uint32) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) +} + +// Filter can be used to retrieve and filter logs. +type Filter struct { + backend Backend + criteria filters.FilterCriteria + matcher *bloombits.Matcher +} + +// NewBlockFilter creates a new filter which directly inspects the contents of +// a block to figure out whether it is interesting or not. +func NewBlockFilter(backend Backend, criteria filters.FilterCriteria) *Filter { + // Create a generic filter and convert it into a block filter + return newFilter(backend, criteria, nil) +} + +// NewRangeFilter creates a new filter which uses a bloom filter on blocks to +// figure out whether a particular block is interesting or not. +func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { + // Flatten the address and topic filter clauses into a single bloombits filter + // system. Since the bloombits are not positional, nil topics are permitted, + // which get flattened into a nil byte slice. + var filtersBz [][][]byte // nolint: prealloc + if len(addresses) > 0 { + filter := make([][]byte, len(addresses)) + for i, address := range addresses { + filter[i] = address.Bytes() + } + filtersBz = append(filtersBz, filter) + } + + for _, topicList := range topics { + filter := make([][]byte, len(topicList)) + for i, topic := range topicList { + filter[i] = topic.Bytes() + } + filtersBz = append(filtersBz, filter) + } + + size, _ := actor.BloomStatus() + + // Create a generic filter and convert it into a range filter + criteria := filters.FilterCriteria{ + FromBlock: big.NewInt(begin), + ToBlock: big.NewInt(end), + Addresses: addresses, + Topics: topics, + } + + return newFilter(backend, criteria, bloombits.NewMatcher(uint64(size), filtersBz)) +} + +// newFilter returns a new Filter +func newFilter(backend Backend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter { + return &Filter{ + backend: backend, + criteria: criteria, + matcher: matcher, + } +} + +// Logs searches the blockchain for matching log entries, returning all from the +// first block that contains matches, updating the start of the filter accordingly. +func (f *Filter) Logs(ctx context.Context) ([]*ethtypes.Log, error) { + var logs []*ethtypes.Log + var err error + + // If we're doing singleton block filtering, execute and return + if f.criteria.BlockHash != nil && f.criteria.BlockHash != (&common.Hash{}) { + block, err := actor.GetBlockFromStore(common2.Uint256(*f.criteria.BlockHash)) + if err != nil { + return nil, err + } + if block.Header == nil { + return nil, fmt.Errorf("unknown block header %s", f.criteria.BlockHash.String()) + } + bloom, err := actor.GetBloomData(block.Header.Height) + if err != nil { + return nil, err + } + return f.blockLogs(bloom, common2.Uint256(*f.criteria.BlockHash)) + } + + // Figure out the limits of the filter range + curHeight := actor.GetCurrentBlockHeight() + block, err := actor.GetBlockByHeight(curHeight) + if err != nil { + return nil, err + } + + if block == nil { + return nil, nil + } + + if f.criteria.FromBlock.Int64() == -1 { + f.criteria.FromBlock = big.NewInt(int64(curHeight)) + } + if f.criteria.ToBlock.Int64() == -1 { + f.criteria.ToBlock = big.NewInt(int64(curHeight)) + } + + start := actor.GetFilterStart() + + if f.criteria.FromBlock.Int64() < int64(start) || + f.criteria.ToBlock.Int64() < int64(start) { + return nil, fmt.Errorf("from and to block height must greater than %d", int64(start)) + } + + if f.criteria.ToBlock.Int64()-f.criteria.FromBlock.Int64() > MAX_SEARCH_RANGE { + return nil, fmt.Errorf("the span between fromBlock and toBlock must be less than or equal to %d", MAX_SEARCH_RANGE) + } + + begin := f.criteria.FromBlock.Uint64() + end := f.criteria.ToBlock.Uint64() + size, sections := actor.BloomStatus() + + if indexed := uint64(sections * size); indexed > begin { + if indexed > end { + logs, err = f.indexedLogs(ctx, end) + } else { + logs, err = f.indexedLogs(ctx, indexed-1) + } + if err != nil { + return logs, err + } + } + rest, err := f.unindexedLogs(ctx, end) + logs = append(logs, rest...) + return logs, err +} + +// blockLogs returns the logs matching the filter criteria within a single block. +func (f *Filter) blockLogs(bloom ethtypes.Bloom, hash common2.Uint256) ([]*ethtypes.Log, error) { + if !bloomFilter(bloom, f.criteria.Addresses, f.criteria.Topics) { + return []*ethtypes.Log{}, nil + } + + logsList, err := getLogs(hash) + if err != nil { + return []*ethtypes.Log{}, err + } + + var unfiltered []*ethtypes.Log // nolint: prealloc + for _, logs := range logsList { + unfiltered = append(unfiltered, logs...) + } + logs := FilterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics) + if len(logs) == 0 { + return []*ethtypes.Log{}, nil + } + return logs, nil +} + +func getLogs(hash common2.Uint256) ([][]*ethtypes.Log, error) { + block, err := actor.GetBlockFromStore(hash) + if err != nil { + if err == common4.ErrNotFound { + return nil, err + } + return nil, err + } + + var res [][]*ethtypes.Log + for _, tx := range block.Transactions { + if tx.TxType != types.EIP155 { + continue + } + notify, err := actor.GetEventNotifyByTxHash(tx.Hash()) + if err != nil { + if err == common4.ErrNotFound { + continue + } + return nil, err + } + if notify != nil { + txLogs, err := generateLog(notify) + if err != nil { + return nil, err + } + if txLogs != nil && len(txLogs) != 0 { + res = append(res, txLogs) + } + } + } + return res, nil +} + +func generateLog(rawNotify *event.ExecuteNotify) ([]*ethtypes.Log, error) { + var res []*ethtypes.Log + txHash := rawNotify.TxHash + height, _, err := actor.GetTxnWithHeightByTxHash(txHash) + if err != nil { + return nil, err + } + hash := actor.GetBlockHashFromStore(height) + ethHash := utils2.OntToEthHash(hash) + for idx, n := range rawNotify.Notify { + storageLog, err := event.NotifyEventInfoToEvmLog(n) + if err != nil { + return nil, err + } + res = append(res, + ðtypes.Log{ + Address: storageLog.Address, + Topics: storageLog.Topics, + Data: storageLog.Data, + BlockNumber: uint64(height), + TxHash: utils2.OntToEthHash(txHash), + TxIndex: uint(rawNotify.TxIndex), + BlockHash: ethHash, + Index: uint(idx), + Removed: false, + }) + } + + return res, nil +} + +// checkMatches checks if the receipts belonging to the given header contain any log events that +// match the filter criteria. This function is called when the bloom filter signals a potential match. +func (f *Filter) checkMatches(hash common2.Uint256) (logs []*ethtypes.Log, err error) { + // Get the logs of the block + logsList, err := getLogs(hash) + if err != nil { + return nil, err + } + var unfiltered []*ethtypes.Log + for _, logs := range logsList { + unfiltered = append(unfiltered, logs...) + } + logs = filterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics) + return logs, nil +} + +// indexedLogs returns the logs matching the filter criteria based on the bloom +// bits indexed available locally or via the network. +func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*ethtypes.Log, error) { + // Create a matcher session and request servicing from the backend + matches := make(chan uint64, 64) + + session, err := f.matcher.Start(ctx, f.criteria.FromBlock.Uint64(), end, matches) + if err != nil { + return nil, err + } + defer session.Close() + f.backend.ServiceFilter(ctx, session) + + // Iterate over the matches until exhausted or context closed + var logs []*ethtypes.Log + + bigEnd := big.NewInt(int64(end)) + for { + select { + case number, ok := <-matches: + + // Abort if all matches have been fulfilled + if !ok { + err := session.Error() + if err == nil { + f.criteria.FromBlock = bigEnd.Add(bigEnd, big.NewInt(1)) + } + return logs, err + } + f.criteria.FromBlock = big.NewInt(int64(number)).Add(big.NewInt(int64(number)), big.NewInt(1)) + + // Retrieve the suggested block and pull any truly matching logs + block, err := actor.GetBlockByHeight(uint32(number)) + if err != nil { + return nil, err + } + if block == nil { + return nil, fmt.Errorf("block %v not found", number) + } + found, err := f.checkMatches(block.Hash()) + if err != nil { + return logs, err + } + logs = append(logs, found...) + + case <-ctx.Done(): + return logs, ctx.Err() + } + } +} + +// unindexedLogs returns the logs matching the filter criteria based on raw block +// iteration and bloom matching. +func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*ethtypes.Log, error) { + var logs []*ethtypes.Log + begin := f.criteria.FromBlock.Int64() + beginPtr := &begin + defer f.criteria.FromBlock.SetInt64(*beginPtr) + + for ; begin <= int64(end); begin++ { + block, err := actor.GetBlockByHeight(uint32(begin)) + if err != nil { + return nil, err + } + if block == nil { + return logs, nil + } + if block.Header == nil { + return nil, fmt.Errorf("unknown block header %s", f.criteria.BlockHash.String()) + } + bloom, err := actor.GetBloomData(block.Header.Height) + if err != nil { + return nil, err + } + found, err := f.blockLogs(bloom, block.Hash()) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// filterLogs creates a slice of logs matching the given criteria. +func filterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log { + var ret []*ethtypes.Log +Logs: + for _, log := range logs { + if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { + continue + } + if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { + continue + } + + if len(addresses) > 0 && !includes(addresses, log.Address) { + continue + } + // If the to filtered topics is greater than the amount of topics in logs, skip. + if len(topics) > len(log.Topics) { + continue Logs + } + for i, sub := range topics { + match := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if log.Topics[i] == topic { + match = true + break + } + } + if !match { + continue Logs + } + } + ret = append(ret, log) + } + return ret +} + +// filterLogs creates a slice of logs matching the given criteria. +// [] -> anything +// [A] -> A in first position of log topics, anything after +// [null, B] -> anything in first position, B in second position +// [A, B] -> A in first position and B in second position +// [[A, B], [A, B]] -> A or B in first position, A or B in second position +func FilterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log { + var ret []*ethtypes.Log +Logs: + for _, log := range logs { + if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { + continue + } + if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { + continue + } + if len(addresses) > 0 && !includes(addresses, log.Address) { + continue + } + // If the to filtered topics is greater than the amount of topics in logs, skip. + if len(topics) > len(log.Topics) { + continue + } + for i, sub := range topics { + match := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if log.Topics[i] == topic { + match = true + break + } + } + if !match { + continue Logs + } + } + ret = append(ret, log) + } + return ret +} + +func includes(addresses []common.Address, a common.Address) bool { + for _, addr := range addresses { + if addr == a { + return true + } + } + + return false +} + +func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool { + var included bool = true + if len(addresses) > 0 { + included = false + for _, addr := range addresses { + if ethtypes.BloomLookup(bloom, addr) { + included = true + break + } + } + if !included { + return false + } + } + + for _, sub := range topics { + included = len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if ethtypes.BloomLookup(bloom, topic) { + included = true + break + } + } + } + return included +} + +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes +} + +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []*ethtypes.Log) []*ethtypes.Log { + if logs == nil { + return []*ethtypes.Log{} + } + return logs +} diff --git a/http/ethrpc/filters/filter_system.go b/http/ethrpc/filters/filter_system.go new file mode 100644 index 0000000000..6eff2664be --- /dev/null +++ b/http/ethrpc/filters/filter_system.go @@ -0,0 +1,316 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ +package filters + +import ( + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ontio/ontology/events/message" + bactor "github.com/ontio/ontology/http/base/actor" +) + +// Type determines the kind of filter and is used to put the filter in to +// the correct bucket when added. +type Type byte + +const ( + // UnknownSubscription indicates an unknown subscription type + UnknownSubscription Type = iota + // LogsSubscription queries for new or removed (chain reorg) logs + LogsSubscription + // PendingTransactionsSubscription queries tx hashes for pending + // transactions entering the pending state + PendingTransactionsSubscription + // BlocksSubscription queries hashes for blocks that are imported + BlocksSubscription + // LastSubscription keeps track of the last index + LastIndexSubscription +) + +const ( + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + +type subscription struct { + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled +} + +// EventSystem creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria. +type EventSystem struct { + backend Backend + lastHead *types.Header + + // Channels + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + chainCh chan core.ChainEvent // Channel to receive new chain event +} + +// NewEventSystem creates a new manager that listens for event on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. +func NewEventSystem(backend Backend) *EventSystem { + m := &EventSystem{ + backend: backend, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + } + + // Subscribe events + bactor.SubscribeEvent(message.TOPIC_PENDING_TX_EVENT, m.pushPendingTxEvent) + bactor.SubscribeEvent(message.TOPIC_ETH_SC_EVENT, m.pushSCEvent) + bactor.SubscribeEvent(message.TOPIC_CHAIN_EVENT, m.pushChainEvent) + + go m.eventLoop() + return m +} + +func (es *EventSystem) pushPendingTxEvent(v interface{}) { + rs, ok := v.(message.PendingTxs) + if !ok { + return + } + es.txsCh <- core.NewTxsEvent{ + Txs: rs, + } +} + +func (es *EventSystem) pushSCEvent(v interface{}) { + rs, ok := v.(message.EthSmartCodeEvent) + if !ok { + return + } + es.logsCh <- rs +} + +func (es *EventSystem) pushChainEvent(v interface{}) { + rs, ok := v.(core.ChainEvent) + if !ok { + return + } + es.chainCh <- rs +} + +// Subscription is created when the client registers itself for a particular event. +type Subscription struct { + ID rpc.ID + f *subscription + es *EventSystem + unsubOnce sync.Once +} + +// Err returns a channel that is closed when unsubscribed. +func (sub *Subscription) Err() <-chan error { + return sub.f.err +} + +// Unsubscribe uninstalls the subscription from the event broadcast loop. +func (sub *Subscription) Unsubscribe() { + sub.unsubOnce.Do(func() { + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case sub.es.uninstall <- sub.f: + break uninstallLoop + case <-sub.f.logs: + case <-sub.f.hashes: + case <-sub.f.headers: + } + } + + // wait for filter to be uninstalled in work loop before returning + // this ensures that the manager won't use the event channel which + // will probably be closed by the client asap after this method returns. + <-sub.Err() + }) +} + +// subscribe installs the subscription in the event broadcast loop. +func (es *EventSystem) subscribe(sub *subscription) *Subscription { + es.install <- sub + <-sub.installed + return &Subscription{ID: sub.id, f: sub, es: es} +} + +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. +func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) { + var from, to rpc.BlockNumber + if crit.FromBlock == nil { + from = rpc.LatestBlockNumber + } else { + from = rpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = rpc.LatestBlockNumber + } else { + to = rpc.BlockNumber(crit.ToBlock.Int64()) + } + + // only interested in new mined logs + if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + // only interested in mined logs within a specific block range + if from >= 0 && to >= 0 && to >= from { + return es.subscribeLogs(crit, logs), nil + } + + // interested in logs from a specific block number to new mined blocks + if from >= 0 && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + return nil, fmt.Errorf("invalid from and to block combination: from > to") +} + +// subscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewHeads creates a subscription that writes the header of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + headers: headers, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribePendingTxs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + hashes: hashes, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +type filterIndex map[Type]map[rpc.ID]*subscription + +func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { + return + } + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } +} + +func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { + hashes := make([]common.Hash, 0, len(ev.Txs)) + for _, tx := range ev.Txs { + hashes = append(hashes, tx.Hash()) + } + for _, f := range filters[PendingTransactionsSubscription] { + f.hashes <- hashes + } +} + +func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { + for _, f := range filters[BlocksSubscription] { + f.headers <- ev.Block.Header() + } +} + +// eventLoop (un)installs filters and processes mux events. +func (es *EventSystem) eventLoop() { + // Ensure all subscriptions get cleaned up + + index := make(filterIndex) + for i := UnknownSubscription; i < LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*subscription) + } + + for { + select { + case ev := <-es.txsCh: + es.handleTxsEvent(index, ev) + case ev := <-es.logsCh: + es.handleLogs(index, ev) + case ev := <-es.chainCh: + es.handleChainEvent(index, ev) + + case f := <-es.install: + index[f.typ][f.id] = f + close(f.installed) + + case f := <-es.uninstall: + delete(index[f.typ], f.id) + close(f.err) + } + } +} diff --git a/http/ethrpc/filters/filter_system_test.go b/http/ethrpc/filters/filter_system_test.go new file mode 100644 index 0000000000..7ceb6f1189 --- /dev/null +++ b/http/ethrpc/filters/filter_system_test.go @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package filters + +import ( + "fmt" + "math/big" + "testing" + "time" + + common2 "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ontio/ontology/events" + "github.com/ontio/ontology/events/message" +) + +func TestNewEventSystem(t *testing.T) { + events.Init() + + es := NewEventSystem(nil) + fmt.Println("es:", es) + + header := &types.Header{Number: big.NewInt(1)} + chainEvtMsg := &message.ChainEventMsg{ + ChainEvent: &core.ChainEvent{ + Block: types.NewBlockWithHeader(header), + Hash: common2.Hash{}, + Logs: make([]*types.Log, 0), + }, + } + events.DefActorPublisher.Publish(message.TOPIC_CHAIN_EVENT, chainEvtMsg) + time.Sleep(time.Second) + + scEvt := make(message.EthSmartCodeEvent, 0) + scEvt = append(scEvt, &types.Log{}) + ethLog := &message.EthSmartCodeEventMsg{Event: scEvt} + events.DefActorPublisher.Publish(message.TOPIC_ETH_SC_EVENT, ethLog) + time.Sleep(time.Second) + + pendingTxEvt := &message.PendingTxMsg{Event: []*types.Transaction{&types.Transaction{}}} + events.DefActorPublisher.Publish(message.TOPIC_PENDING_TX_EVENT, pendingTxEvt) + time.Sleep(time.Second) +} diff --git a/http/ethrpc/rpc_server.go b/http/ethrpc/rpc_server.go index ded55c4914..9f64c36e93 100644 --- a/http/ethrpc/rpc_server.go +++ b/http/ethrpc/rpc_server.go @@ -24,7 +24,11 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" cfg "github.com/ontio/ontology/common/config" + "github.com/ontio/ontology/core/store/ledgerstore" + "github.com/ontio/ontology/http/base/actor" + backend2 "github.com/ontio/ontology/http/ethrpc/backend" "github.com/ontio/ontology/http/ethrpc/eth" + filters2 "github.com/ontio/ontology/http/ethrpc/filters" "github.com/ontio/ontology/http/ethrpc/net" "github.com/ontio/ontology/http/ethrpc/utils" "github.com/ontio/ontology/http/ethrpc/web3" @@ -33,20 +37,24 @@ import ( func StartEthServer(txpool *tp.TXPoolServer) error { log.Root().SetHandler(utils.OntLogHandler()) - ethAPI := eth.NewEthereumAPI(txpool) server := rpc.NewServer() - err := server.RegisterName("eth", ethAPI) - if err != nil { + if err := server.RegisterName("eth", eth.NewEthereumAPI(txpool)); err != nil { return err } - netRpcService := net.NewPublicNetAPI() - err = server.RegisterName("net", netRpcService) + + backend := backend2.NewBloomBackend() + err := backend.StartBloomHandlers(ledgerstore.BloomBitsBlocks, actor.GetIndexStore()) if err != nil { return err } - web3API := web3.NewAPI() - err = server.RegisterName("web3", web3API) - if err != nil { + + if err := server.RegisterName("eth", filters2.NewPublicFilterAPI(backend)); err != nil { + return err + } + if err := server.RegisterName("net", net.NewPublicNetAPI()); err != nil { + return err + } + if err := server.RegisterName("web3", web3.NewAPI()); err != nil { return err } err = http.ListenAndServe(":"+strconv.Itoa(int(cfg.DefConfig.Rpc.EthJsonPort)), server) diff --git a/http/ethrpc/utils/utils.go b/http/ethrpc/utils/utils.go index c82bfcf94b..d471fae3e2 100644 --- a/http/ethrpc/utils/utils.go +++ b/http/ethrpc/utils/utils.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" types2 "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/trie" oComm "github.com/ontio/ontology/common" sysconfig "github.com/ontio/ontology/common/config" "github.com/ontio/ontology/core/types" @@ -47,6 +48,33 @@ func EthBlockFromOntology(block *types.Block, fullTx bool) map[string]interface{ return FormatBlock(*block, 0, gasUsed, blockTxs) } +func RawEthBlockFromOntology(block *types.Block, bloom types2.Bloom) *types2.Block { + if block == nil { + return nil + } + hash := block.Hash() + gasUsed, ethTxs := RawEthTransactionsFromOntology(block.Transactions, common.BytesToHash(hash.ToArray()), uint64(block.Header.Height)) + + h := &types2.Header{ + ParentHash: common.Hash(block.Header.PrevBlockHash), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: common.Hash{}, + TxHash: common.Hash(block.Header.TransactionsRoot), + ReceiptHash: common.Hash{}, + Bloom: bloom, + Difficulty: big.NewInt(0), + Number: big.NewInt(int64(block.Header.Height)), + GasLimit: 0, + GasUsed: gasUsed.Uint64(), + Time: uint64(block.Header.Timestamp), + Extra: []byte{}, + MixDigest: common.Hash{}, + Nonce: types2.BlockNonce{}, + } + return types2.NewBlock(h, ethTxs, nil, nil, new(trie.Trie)) +} + func EthTransactionsFromOntology(txs []*types.Transaction, blockHash common.Hash, blockNumber uint64) ([]common.Hash, *big.Int, []*types3.Transaction) { var transactionHashes []common.Hash var transactions []*types3.Transaction @@ -64,6 +92,22 @@ func EthTransactionsFromOntology(txs []*types.Transaction, blockHash common.Hash return transactionHashes, gasUsed, transactions } +func RawEthTransactionsFromOntology(txs []*types.Transaction, blockHash common.Hash, blockNumber uint64) (*big.Int, []*types2.Transaction) { + var transactions []*types2.Transaction + gasUsed := big.NewInt(0) + for _, tx := range txs { + if tx.IsEipTx() { + eipTx, err := tx.GetEIP155Tx() + if err != nil { + continue + } + gasUsed.Add(gasUsed, big.NewInt(int64(eipTx.Gas()))) + transactions = append(transactions, eipTx) + } + } + return gasUsed, transactions +} + func OntTxToEthTx(tx types.Transaction, blockHash common.Hash, blockNumber, index uint64) (*types3.Transaction, error) { eip155Tx, err := tx.GetEIP155Tx() if err != nil { @@ -81,7 +125,7 @@ func FormatBlock(block types.Block, gasLimit uint64, gasUsed *big.Int, transacti "hash": hexutil.Bytes(hash[:]), "parentHash": hexutil.Bytes(header.PrevBlockHash[:]), "nonce": types2.BlockNonce{}, // PoW specific - "sha3Uncles": common.Hash{}, // No uncles in Tendermint + "sha3Uncles": common.Hash{}, "logsBloom": types2.Bloom{}, "transactionsRoot": hexutil.Bytes(header.TransactionsRoot[:]), "stateRoot": hexutil.Bytes{}, diff --git a/smartcontract/event/event.go b/smartcontract/event/event.go index df6ce32a25..9eb9391f05 100644 --- a/smartcontract/event/event.go +++ b/smartcontract/event/event.go @@ -19,10 +19,14 @@ package event import ( + common2 "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + types3 "github.com/ethereum/go-ethereum/core/types" "github.com/ontio/ontology/common" "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/events" "github.com/ontio/ontology/events/message" + utils2 "github.com/ontio/ontology/http/ethrpc/utils" ) const ( @@ -43,3 +47,78 @@ func PushSmartCodeEvent(txHash common.Uint256, errcode int64, action string, res } events.DefActorPublisher.Publish(message.TOPIC_SMART_CODE_EVENT, &message.SmartCodeEventMsg{Event: smartCodeEvt}) } + +// PushSmartCodeEvent push event content to socket.io +func PushEthSmartCodeEvent(rawNotify *ExecuteNotify, blk *types.Block) { + if events.DefActorPublisher == nil { + return + } + msg := extractSingleEthLog(rawNotify, blk) + events.DefActorPublisher.Publish(message.TOPIC_ETH_SC_EVENT, &message.EthSmartCodeEventMsg{Event: msg}) +} + +// PushSmartCodeEvent push event content to socket.io +func PushChainEvent(rawNotify []*ExecuteNotify, blk *types.Block, bloom types3.Bloom) { + if events.DefActorPublisher == nil { + return + } + events.DefActorPublisher.Publish( + message.TOPIC_CHAIN_EVENT, + &message.ChainEventMsg{ + ChainEvent: &core.ChainEvent{ + Block: utils2.RawEthBlockFromOntology(blk, bloom), + Hash: common2.Hash(blk.Hash()), + Logs: extractEthLog(rawNotify, blk), + }, + }) +} + +func extractSingleEthLog(rawNotify *ExecuteNotify, blk *types.Block) []*types3.Log { + var res []*types3.Log + if isEIP155Tx(blk, rawNotify.TxHash) { + res = genEthLog(rawNotify, blk) + } + return res +} + +func extractEthLog(rawNotify []*ExecuteNotify, blk *types.Block) []*types3.Log { + var res []*types3.Log + for _, rn := range rawNotify { + res = append(res, extractSingleEthLog(rn, blk)...) + } + return res +} + +func genEthLog(rawNotify *ExecuteNotify, blk *types.Block) []*types3.Log { + var res []*types3.Log + txHash := rawNotify.TxHash + ethHash := utils2.OntToEthHash(txHash) + for idx, n := range rawNotify.Notify { + storageLog, err := NotifyEventInfoToEvmLog(n) + if err != nil { + return nil + } + res = append(res, + &types3.Log{ + Address: storageLog.Address, + Topics: storageLog.Topics, + Data: storageLog.Data, + BlockNumber: uint64(blk.Header.Height), + TxHash: utils2.OntToEthHash(txHash), + TxIndex: uint(rawNotify.TxIndex), + BlockHash: ethHash, + Index: uint(idx), + Removed: false, + }) + } + return res +} + +func isEIP155Tx(block *types.Block, txHash common.Uint256) bool { + for _, tx := range block.Transactions { + if tx.Hash() == txHash { + return tx.IsEipTx() + } + } + return false +} diff --git a/smartcontract/event/notify_event_args.go b/smartcontract/event/notify_event_args.go index 7582b108a7..0dfd04b75d 100644 --- a/smartcontract/event/notify_event_args.go +++ b/smartcontract/event/notify_event_args.go @@ -19,13 +19,11 @@ package event import ( - "errors" "fmt" - "github.com/ontio/ontology/common/constants" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ontio/ontology/common" + "github.com/ontio/ontology/common/constants" "github.com/ontio/ontology/core/types" ) @@ -80,23 +78,27 @@ func NotifyEventInfoFromEvmLog(log *types.StorageLog) *NotifyEventInfo { } } -func NotifyEventInfoToEvmLog(info *NotifyEventInfo) (*types.StorageLog, error) { - n := info +func NotifyEventInfoToEvmLog(n *NotifyEventInfo) (*types.StorageLog, error) { if !n.IsEvm { return nil, fmt.Errorf("not evm event") } - states, ok := n.States.(string) - if !ok { - return nil, errors.New("event info states is not string") - } - data, err := hexutil.Decode(states) - if err != nil { - return nil, err + + var data []byte + var err error + switch val := n.States.(type) { + case string: + if data, err = hexutil.Decode(val); err != nil { + return nil, err + } + case hexutil.Bytes: + data = val + default: + return nil, fmt.Errorf("not support such states type") } + source := common.NewZeroCopySource(data) var storageLog types.StorageLog - err = storageLog.Deserialization(source) - if err != nil { + if err = storageLog.Deserialization(source); err != nil { return nil, err } diff --git a/smartcontract/event/notify_event_args_test.go b/smartcontract/event/notify_event_args_test.go new file mode 100644 index 0000000000..2306cc059a --- /dev/null +++ b/smartcontract/event/notify_event_args_test.go @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package event + +import ( + "encoding/hex" + "fmt" + "math/big" + "testing" + + "github.com/ontio/ontology/common" + "github.com/ontio/ontology/core/types" + "github.com/stretchr/testify/assert" +) + +func TestDeserialization(t *testing.T) { + data, err := hex.DecodeString("000000000000000000000000000000000000000203000000ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef000000000000000000000000a34886547e00d8f15eaf5a98f99f4f76aaeb3bd500000000000000000000000000000000000000000000000000000000000000072000000000000000000000000000000000000000000000000000ffa4f70a6cd800") + if err != nil { + panic(err) + } + source := common.NewZeroCopySource(data) + sl := &types.StorageLog{} + sl.Deserialization(source) + fmt.Println(sl.Address.String()) + a := big.NewInt(0).SetBytes(sl.Data) + fmt.Println(a.String()) + info := NotifyEventInfoFromEvmLog(sl) + sl2, err := NotifyEventInfoToEvmLog(info) + assert.Nil(t, err) + fmt.Println(sl2) +} diff --git a/txnpool/proc/txnpool_server.go b/txnpool/proc/txnpool_server.go index 0c7a67e41b..d21a898364 100644 --- a/txnpool/proc/txnpool_server.go +++ b/txnpool/proc/txnpool_server.go @@ -33,6 +33,8 @@ import ( "github.com/ontio/ontology/core/ledger" txtypes "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/errors" + "github.com/ontio/ontology/events" + "github.com/ontio/ontology/events/message" msgpack "github.com/ontio/ontology/p2pserver/message/msg_pack" p2p "github.com/ontio/ontology/p2pserver/net/protocol" tc "github.com/ontio/ontology/txnpool/common" @@ -218,6 +220,10 @@ func (s *TXPoolServer) setPendingTx(tx *txtypes.Transaction, sender tc.SenderTyp } s.allPendingTxs[tx.Hash()] = pt + if ethTx, err := tx.GetEIP155Tx(); err == nil && events.DefActorPublisher != nil { + events.DefActorPublisher.Publish(message.TOPIC_PENDING_TX_EVENT, + &message.PendingTxMsg{Event: []*ethtype.Transaction{ethTx}}) + } return pt }