diff --git a/common/config/config.go b/common/config/config.go
index 0dacdad4a2..9c35536902 100644
--- a/common/config/config.go
+++ b/common/config/config.go
@@ -79,7 +79,7 @@ const (
DEFAULT_WASM_GAS_FACTOR = uint64(10)
DEFAULT_WASM_MAX_STEPCOUNT = uint64(8000000)
- DEFAULT_DATA_DIR = "./Chain/"
+ DEFAULT_DATA_DIR = "./Chain"
DEFAULT_RESERVED_FILE = "./peers.rsv"
//DEFAULT_ETH_BLOCK_GAS_LIMIT = 800000000
diff --git a/core/store/common/data_entry_prefix.go b/core/store/common/data_entry_prefix.go
index bebbd6023c..d7edcaf173 100644
--- a/core/store/common/data_entry_prefix.go
+++ b/core/store/common/data_entry_prefix.go
@@ -27,6 +27,7 @@ const (
DATA_HEADER = 0x01 //Block hash => block header+txhashes key prefix
DATA_TRANSACTION = 0x02 //Transction hash => transaction key prefix
DATA_STATE_MERKLE_ROOT = 0x21 // block height => write set hash + state merkle root
+ DATA_BLOOM = 0x23 // block height => block bloom data
// Transaction
ST_BOOKKEEPER DataEntryPrefix = 0x03 //BookKeeper state key prefix
@@ -35,8 +36,9 @@ const (
ST_DESTROYED DataEntryPrefix = 0x06 // record destroyed smart contract: prefix+address -> height
// eth state
- ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes
- ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash]
+ ST_ETH_CODE DataEntryPrefix = 0x30 // eth contract code:hash -> bytes
+ ST_ETH_ACCOUNT DataEntryPrefix = 0x31 // eth account: address -> [nonce, codeHash]
+ ST_ETH_FILTER_START DataEntryPrefix = 0x32 // support eth filter height
IX_HEADER_HASH_LIST DataEntryPrefix = 0x09 //Block height => block hash key prefix
diff --git a/core/store/ledgerstore/block_store.go b/core/store/ledgerstore/block_store.go
index cd022439de..c468a824b4 100644
--- a/core/store/ledgerstore/block_store.go
+++ b/core/store/ledgerstore/block_store.go
@@ -24,7 +24,9 @@ import (
"fmt"
"io"
+ types2 "github.com/ethereum/go-ethereum/core/types"
"github.com/ontio/ontology/common"
+ "github.com/ontio/ontology/common/config"
"github.com/ontio/ontology/common/serialization"
scom "github.com/ontio/ontology/core/store/common"
"github.com/ontio/ontology/core/store/leveldbstore"
@@ -36,6 +38,8 @@ type BlockStore struct {
enableCache bool //Is enable lru cache
dbDir string //The path of store file
cache *BlockCache //The cache of block, if have.
+ indexer bloomIndexer // Background processor generating the index data content
+ filterStart uint32 // Start block that filter supported
store *leveldbstore.LevelDBStore //block store handler
}
@@ -54,20 +58,62 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) {
if err != nil {
return nil, err
}
+
blockStore := &BlockStore{
dbDir: dbDir,
enableCache: enableCache,
store: store,
cache: cache,
}
+
+ _, curBlockHeight, err := blockStore.GetCurrentBlock()
+ if err != nil {
+ if err != scom.ErrNotFound {
+ return nil, fmt.Errorf("get current block: %s", err.Error())
+ }
+ curBlockHeight = 0
+ }
+
+ indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks)
+
+ start, err := indexer.GetFilterStart()
+ if err != nil {
+ if err != scom.ErrNotFound {
+ return nil, fmt.Errorf("get filter start: %s", err.Error())
+ }
+
+ var tmp uint32
+ if curBlockHeight < config.GetAddDecimalsHeight() {
+ tmp = config.GetAddDecimalsHeight()
+ } else {
+ tmp = curBlockHeight
+ }
+ err = indexer.PutFilterStart(tmp)
+ if err != nil {
+ return nil, fmt.Errorf("put filter start: %s", err.Error())
+ }
+ start = tmp
+ }
+
+ blockStore.indexer = indexer
+ blockStore.filterStart = start
+
return blockStore, nil
}
+func (this *BlockStore) PutFilterStart(height uint32) error {
+ return this.indexer.PutFilterStart(height)
+}
+
//NewBatch start a commit batch
func (this *BlockStore) NewBatch() {
this.store.NewBatch()
}
+func (this *BlockStore) GetIndexer() bloomIndexer {
+ return this.indexer
+}
+
//SaveBlock persist block to store
func (this *BlockStore) SaveBlock(block *types.Block) error {
if this.enableCache {
@@ -134,6 +180,10 @@ func (this *BlockStore) GetBlock(blockHash common.Uint256) (*types.Block, error)
return block, nil
}
+func (this *BlockStore) GetDb() *leveldbstore.LevelDBStore {
+ return this.store
+}
+
func (this *BlockStore) loadHeaderWithTx(blockHash common.Uint256) (*types.Header, []common.Uint256, error) {
key := genHeaderKey(blockHash)
value, err := this.store.Get(key)
@@ -348,6 +398,40 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) {
this.store.BatchPut(key, blockHash.ToArray())
}
+//SaveBloomData persist block bloom data to store
+func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) {
+ this.Process(height, bloom)
+ if height != 0 && height%BloomBitsBlocks == 0 {
+ this.indexer.BatchPut()
+ }
+ key := this.genBloomKey(height)
+ this.store.BatchPut(key, bloom.Bytes())
+}
+
+func (this *BlockStore) Process(height uint32, bloom types2.Bloom) {
+ this.indexer.Process(height, bloom)
+}
+
+//GetBloomData return bloom data by block height
+func (this *BlockStore) GetBloomData(height uint32) (types2.Bloom, error) {
+ key := this.genBloomKey(height)
+ value, err := this.store.Get(key)
+ if err != nil && err != scom.ErrNotFound {
+ return types2.Bloom{}, err
+ }
+ if err == scom.ErrNotFound {
+ return types2.Bloom{}, nil
+ }
+ return types2.BytesToBloom(value), nil
+}
+
+func (this *BlockStore) genBloomKey(height uint32) []byte {
+ temp := make([]byte, 5)
+ temp[0] = byte(scom.DATA_BLOOM)
+ binary.LittleEndian.PutUint32(temp[1:], height)
+ return temp
+}
+
//SaveTransaction persist transaction to store
func (this *BlockStore) SaveTransaction(tx *types.Transaction, height uint32) {
if this.enableCache {
diff --git a/core/store/ledgerstore/bloombits.go b/core/store/ledgerstore/bloombits.go
new file mode 100644
index 0000000000..5c9478ebfe
--- /dev/null
+++ b/core/store/ledgerstore/bloombits.go
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+package ledgerstore
+
+import (
+ "encoding/binary"
+ "io"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ common2 "github.com/ontio/ontology/common"
+ scom "github.com/ontio/ontology/core/store/common"
+ "github.com/ontio/ontology/core/store/leveldbstore"
+)
+
+const (
+ // bloomServiceThreads is the number of goroutines used globally by an Ethereum
+ // instance to service bloombits lookups for all running filters.
+ BloomServiceThreads = 16
+
+ // bloomFilterThreads is the number of goroutines used locally per filter to
+ // multiplex requests onto the global servicing goroutines.
+ BloomFilterThreads = 3
+
+ // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
+ // in a single batch.
+ BloomRetrievalBatch = 16
+
+ // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
+ // to accumulate request an entire batch (avoiding hysteresis).
+ BloomRetrievalWait = time.Duration(0)
+
+ // BloomBitsBlocks is the number of blocks a single bloom bit section vector
+ // contains on the server side.
+ BloomBitsBlocks uint32 = 4096
+)
+
+var (
+ bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) + hash -> bloom bits
+)
+
+// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
+// for the Ethereum header bloom filters, permitting blazing fast filtering.
+type bloomIndexer struct {
+ store *leveldbstore.LevelDBStore // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
+ section uint32 // Section is the section number being processed currently
+}
+
+func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer {
+ gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks))
+ if err != nil {
+ panic(err) // never fired since BloomBitsBlocks is multiple of 8
+ }
+ b := bloomIndexer{
+ store: store,
+ }
+ b.gen, b.section = gen, section
+ return b
+}
+
+// Process implements core.ChainIndexerBackend, adding a new header's bloom into
+// the index.
+func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) {
+ b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom)
+}
+
+// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section and
+// writing it out into the database.
+func (b *bloomIndexer) BatchPut() {
+ for i := 0; i < types.BloomBitLength; i++ {
+ bits, err := b.gen.Bitset(uint(i))
+ if err != nil {
+ panic(err) // never fired since idx is always less than 8 and section should be right
+ }
+ value := bitutil.CompressBytes(bits)
+ b.store.BatchPut(bloomBitsKey(uint(i), b.section), value)
+ }
+ b.section++
+}
+
+func (this *bloomIndexer) PutFilterStart(height uint32) error {
+ key := genFilterStartKey()
+ sink := common2.NewZeroCopySink(nil)
+ sink.WriteUint32(height)
+ return this.store.Put(key, sink.Bytes())
+}
+
+func (this *bloomIndexer) GetFilterStart() (uint32, error) {
+ key := genFilterStartKey()
+ data, err := this.store.Get(key)
+ if err != nil {
+ return 0, err
+ }
+ height, eof := common2.NewZeroCopySource(data).NextUint32()
+ if eof {
+ return 0, io.ErrUnexpectedEOF
+ }
+ return height, nil
+}
+
+func genFilterStartKey() []byte {
+ return []byte{byte(scom.ST_ETH_FILTER_START)}
+}
+
+// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian)
+func bloomBitsKey(bit uint, section uint32) []byte {
+ key := append(bloomBitsPrefix, make([]byte, 6)...)
+
+ binary.BigEndian.PutUint16(key[1:], uint16(bit))
+ binary.BigEndian.PutUint32(key[3:], section)
+
+ return key
+}
+
+// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
+// section and bit index from the.
+func ReadBloomBits(db *leveldbstore.LevelDBStore, bit uint, section uint32) ([]byte, error) {
+ return db.Get(bloomBitsKey(bit, section))
+}
diff --git a/core/store/ledgerstore/cross_chain_store.go b/core/store/ledgerstore/cross_chain_store.go
index e170fe3ed0..e0a5706345 100644
--- a/core/store/ledgerstore/cross_chain_store.go
+++ b/core/store/ledgerstore/cross_chain_store.go
@@ -78,6 +78,11 @@ func (this *CrossChainStore) GetCrossChainMsg(height uint32) (*types.CrossChainM
return msg, nil
}
+//Close CrossChainStore store
+func (this *CrossChainStore) Close() error {
+ return this.store.Close()
+}
+
func (this *CrossChainStore) genCrossChainMsgKey(height uint32) []byte {
temp := make([]byte, 5)
temp[0] = byte(scom.SYS_CROSS_CHAIN_MSG)
diff --git a/core/store/ledgerstore/ledger_store.go b/core/store/ledgerstore/ledger_store.go
index cbfc1c4466..ca694e6c1c 100644
--- a/core/store/ledgerstore/ledger_store.go
+++ b/core/store/ledgerstore/ledger_store.go
@@ -44,6 +44,7 @@ import (
"github.com/ontio/ontology/core/states"
"github.com/ontio/ontology/core/store"
scom "github.com/ontio/ontology/core/store/common"
+ "github.com/ontio/ontology/core/store/leveldbstore"
"github.com/ontio/ontology/core/store/overlaydb"
"github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/errors"
@@ -53,7 +54,7 @@ import (
"github.com/ontio/ontology/smartcontract"
"github.com/ontio/ontology/smartcontract/event"
"github.com/ontio/ontology/smartcontract/service/evm"
- types4 "github.com/ontio/ontology/smartcontract/service/evm/types"
+ types5 "github.com/ontio/ontology/smartcontract/service/evm/types"
"github.com/ontio/ontology/smartcontract/service/evm/witness"
"github.com/ontio/ontology/smartcontract/service/native/ong"
"github.com/ontio/ontology/smartcontract/service/native/utils"
@@ -95,6 +96,7 @@ type LedgerStoreImp struct {
currBlockHeight uint32 //Current block height
currBlockHash common.Uint256 //Current block hash
headerCache map[common.Uint256]*types.Header //BlockHash => Header
+ bloomCache map[uint32]*types3.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index
headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash
vbftPeerInfoMap map[uint32]map[string]uint32 //key:block height,value:peerInfo
lock sync.RWMutex
@@ -110,6 +112,7 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er
ledgerStore := &LedgerStoreImp{
headerIndex: make(map[uint32]common.Uint256),
headerCache: make(map[common.Uint256]*types.Header, 0),
+ bloomCache: make(map[uint32]*types3.Bloom, 4096),
vbftPeerInfoMap: make(map[uint32]map[string]uint32),
savingBlockSemaphore: make(chan bool, 1),
stateHashCheckHeight: stateHashHeight,
@@ -123,7 +126,7 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er
crossChainStore, err := NewCrossChainStore(dataDir)
if err != nil {
- return nil, fmt.Errorf("NewBlockStore error %s", err)
+ return nil, fmt.Errorf("NewCrossChainStore error %s", err)
}
ledgerStore.crossChainStore = crossChainStore
@@ -155,6 +158,10 @@ func (this *LedgerStoreImp) InitLedgerStoreWithGenesisBlock(genesisBlock *types.
if err != nil {
return fmt.Errorf("blockStore.ClearAll error %s", err)
}
+ err = this.blockStore.PutFilterStart(0)
+ if err != nil {
+ return fmt.Errorf("blockStore.PutFilterStart error %s", err)
+ }
err = this.stateStore.ClearAll()
if err != nil {
return fmt.Errorf("stateStore.ClearAll error %s", err)
@@ -273,6 +280,38 @@ func (this *LedgerStoreImp) init() error {
if err != nil {
return fmt.Errorf("recoverStore error %s", err)
}
+ err = this.loadBloomBits()
+ if err != nil {
+ return fmt.Errorf("loadBloomBits error %s", err)
+ }
+ return nil
+}
+
+func (this *LedgerStoreImp) loadBloomBits() error {
+ _, currentBlockHeight, err := this.blockStore.GetCurrentBlock()
+ if err != nil {
+ if err != scom.ErrNotFound {
+ return fmt.Errorf("LoadCurrentBlock error %s", err)
+ }
+ return nil
+ }
+
+ if currentBlockHeight < this.blockStore.filterStart {
+ return nil
+ }
+
+ start := currentBlockHeight - currentBlockHeight%BloomBitsBlocks
+ if start < this.blockStore.filterStart {
+ start = this.blockStore.filterStart
+ }
+
+ for i := start; i <= currentBlockHeight; i++ {
+ bloom, err := this.blockStore.GetBloomData(i)
+ if err != nil {
+ return fmt.Errorf("LoadBloom error %s", err)
+ }
+ this.blockStore.Process(i, bloom)
+ }
return nil
}
@@ -411,6 +450,14 @@ func (this *LedgerStoreImp) GetCurrentBlockHash() common.Uint256 {
return this.currBlockHash
}
+func (this *LedgerStoreImp) GetFilterStart() uint32 {
+ return this.blockStore.filterStart
+}
+
+func (this *LedgerStoreImp) GetIndexStore() *leveldbstore.LevelDBStore {
+ return this.blockStore.GetDb()
+}
+
//GetCurrentBlockHeight return the current block height
func (this *LedgerStoreImp) GetCurrentBlockHeight() uint32 {
this.lock.RLock()
@@ -721,7 +768,7 @@ func (this *LedgerStoreImp) AddBlock(block *types.Block, ccMsg *types.CrossChain
return nil
}
-func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block) error {
+func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block, bloom types3.Bloom) error {
blockHash := block.Hash()
blockHeight := block.Header.Height
@@ -739,6 +786,9 @@ func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block) error {
if err != nil {
return fmt.Errorf("SaveBlock height %d hash %s error %s", blockHeight, blockHash.ToHexString(), err)
}
+ if blockHeight >= config.GetAddDecimalsHeight() {
+ this.blockStore.SaveBloomData(blockHeight, bloom)
+ }
return nil
}
@@ -767,9 +817,10 @@ func (this *LedgerStoreImp) executeBlock(block *types.Block) (result store.Execu
return true
})
cache := storage.NewCacheDB(overlay)
+ var allLogs []*types.StorageLog
for i, tx := range block.Transactions {
cache.Reset()
- notify, crossStateHashes, e := this.handleTransaction(overlay, cache, gasTable, block, tx, uint32(i), evmWitness)
+ notify, crossStateHashes, receipt, e := this.handleTransaction(overlay, cache, gasTable, block, tx, uint32(i), evmWitness)
if e != nil {
err = e
return
@@ -780,7 +831,13 @@ func (this *LedgerStoreImp) executeBlock(block *types.Block) (result store.Execu
notify.TxIndex = uint32(i)
result.Notify = append(result.Notify, notify)
result.CrossStates = append(result.CrossStates, crossStateHashes...)
+ if receipt != nil {
+ allLogs = append(allLogs, receipt.Logs...)
+ }
}
+ bloomBytes := types3.LogsBloom(parseOntLogsToEth(allLogs))
+ result.Bloom = types3.BytesToBloom(bloomBytes)
+
result.Hash = overlay.ChangeHash()
result.WriteSet = overlay.GetWriteSet()
if len(result.CrossStates) != 0 {
@@ -807,6 +864,19 @@ func (this *LedgerStoreImp) executeBlock(block *types.Block) (result store.Execu
return
}
+func parseOntLogsToEth(logs []*types.StorageLog) []*types3.Log {
+ var parseLogs []*types3.Log
+ for _, log := range logs {
+ parse := &types3.Log{
+ Address: log.Address,
+ Topics: log.Topics,
+ Data: log.Data,
+ }
+ parseLogs = append(parseLogs, parse)
+ }
+ return parseLogs
+}
+
func calculateTotalStateHash(overlay *overlaydb.OverlayDB) (result common.Uint256, err error) {
stateDiff := sha256.New()
@@ -841,7 +911,7 @@ func (this *LedgerStoreImp) saveBlockToStateStore(block *types.Block, result sto
blockHeight := block.Header.Height
for _, notify := range result.Notify {
- if err := SaveNotify(this.eventStore, notify.TxHash, notify); err != nil {
+ if err := SaveNotify(this.eventStore, notify.TxHash, notify, block); err != nil {
return err
}
}
@@ -943,6 +1013,10 @@ func (this *LedgerStoreImp) tryPruneBlock(header *types.Header) bool {
return true
}
+func (this *LedgerStoreImp) BloomStatus() (uint32, uint32) {
+ return BloomBitsBlocks, this.currBlockHeight / BloomBitsBlocks
+}
+
//saveBlock do the job of execution samrt contract and commit block to store.
func (this *LedgerStoreImp) submitBlock(block *types.Block, crossChainMsg *types.CrossChainMsg, result store.ExecuteResult) error {
blockHash := block.Hash()
@@ -956,7 +1030,8 @@ func (this *LedgerStoreImp) submitBlock(block *types.Block, crossChainMsg *types
this.blockStore.NewBatch()
this.stateStore.NewBatch()
this.eventStore.NewBatch()
- err := this.saveBlockToBlockStore(block)
+
+ err := this.saveBlockToBlockStore(block, result.Bloom)
if err != nil {
return fmt.Errorf("save to block store height:%d error:%s", blockHeight, err)
}
@@ -991,6 +1066,7 @@ func (this *LedgerStoreImp) submitBlock(block *types.Block, crossChainMsg *types
&message.SaveBlockCompleteMsg{
Block: block,
})
+ event.PushChainEvent(result.Notify, block, result.Bloom)
}
return nil
}
@@ -1026,16 +1102,17 @@ func (this *LedgerStoreImp) saveBlock(block *types.Block, ccMsg *types.CrossChai
}
func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cache *storage.CacheDB, gasTable map[string]uint64,
- block *types.Block, tx *types.Transaction, txIndex uint32, evmWitness common2.Address) (*event.ExecuteNotify, []common.Uint256, error) {
+ block *types.Block, tx *types.Transaction, txIndex uint32, evmWitness common2.Address) (*event.ExecuteNotify, []common.Uint256, *types.Receipt, error) {
txHash := tx.Hash()
notify := &event.ExecuteNotify{TxHash: txHash, State: event.CONTRACT_STATE_FAIL, TxIndex: txIndex}
var crossStateHashes []common.Uint256
var err error
+ var receipt *types.Receipt
switch tx.TxType {
case types.Deploy:
err = this.stateStore.HandleDeployTransaction(this, overlay, gasTable, cache, tx, block, notify)
if overlay.Error() != nil {
- return nil, nil, fmt.Errorf("HandleDeployTransaction tx %s error %s", txHash.ToHexString(), overlay.Error())
+ return nil, nil, nil, fmt.Errorf("HandleDeployTransaction tx %s error %s", txHash.ToHexString(), overlay.Error())
}
if err != nil {
log.Debugf("HandleDeployTransaction tx %s error %s", txHash.ToHexString(), err)
@@ -1043,7 +1120,7 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach
case types.InvokeNeo, types.InvokeWasm:
crossStateHashes, err = this.stateStore.HandleInvokeTransaction(this, overlay, gasTable, cache, tx, block, notify)
if overlay.Error() != nil {
- return nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error())
+ return nil, nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error())
}
if err != nil {
log.Debugf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err)
@@ -1051,7 +1128,7 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach
case types.EIP155:
eiptx, err := tx.GetEIP155Tx()
if err != nil {
- return nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err.Error())
+ return nil, nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err.Error())
}
ctx := Eip155Context{
@@ -1060,9 +1137,9 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach
Height: block.Header.Height,
Timestamp: block.Header.Timestamp,
}
- _, receipt, err := this.stateStore.HandleEIP155Transaction(this, cache, eiptx, ctx, notify, true)
+ _, receipt, err = this.stateStore.HandleEIP155Transaction(this, cache, eiptx, ctx, notify, true)
if overlay.Error() != nil {
- return nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error())
+ return nil, nil, nil, fmt.Errorf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), overlay.Error())
}
if err != nil {
log.Debugf("HandleInvokeTransaction tx %s error %s", txHash.ToHexString(), err)
@@ -1079,7 +1156,7 @@ func (this *LedgerStoreImp) handleTransaction(overlay *overlaydb.OverlayDB, cach
crossStateHashes = append(crossStateHashes, event.Hash)
}
}
- return notify, crossStateHashes, nil
+ return notify, crossStateHashes, receipt, nil
}
func (this *LedgerStoreImp) saveHeaderIndexList() error {
@@ -1278,7 +1355,7 @@ func (this *LedgerStoreImp) PreExecuteContractBatch(txes []*types.Transaction, a
return results, height, nil
}
-func (this *LedgerStoreImp) PreExecuteEIP155(tx *types3.Transaction, ctx Eip155Context) (*types4.ExecutionResult, *event.ExecuteNotify, error) {
+func (this *LedgerStoreImp) PreExecuteEIP155(tx *types3.Transaction, ctx Eip155Context) (*types5.ExecutionResult, *event.ExecuteNotify, error) {
overlay := this.stateStore.NewOverlayDB()
cache := storage.NewCacheDB(overlay)
@@ -1299,6 +1376,10 @@ func (this *LedgerStoreImp) GetEthAccount(address common2.Address) (*storage.Eth
return this.stateStore.GetEthAccount(address)
}
+func (this *LedgerStoreImp) GetBloomData(height uint32) (types3.Bloom, error) {
+ return this.blockStore.GetBloomData(height)
+}
+
//PreExecuteContract return the result of smart contract execution without commit to store
func (this *LedgerStoreImp) PreExecuteContractWithParam(tx *types.Transaction, preParam PrexecuteParam) (*sstate.PreExecResult, error) {
height := this.GetCurrentBlockHeight()
@@ -1432,7 +1513,7 @@ func (this *LedgerStoreImp) PreExecuteContract(tx *types.Transaction) (*sstate.P
return this.PreExecuteContractWithParam(tx, param)
}
-func (this *LedgerStoreImp) PreExecuteEip155Tx(msg types3.Message) (*types4.ExecutionResult, error) {
+func (this *LedgerStoreImp) PreExecuteEip155Tx(msg types3.Message) (*types5.ExecutionResult, error) {
height := this.GetCurrentBlockHeight()
// use previous block time to make it predictable for easy test
blockTime := uint32(time.Now().Unix())
@@ -1472,6 +1553,10 @@ func (this *LedgerStoreImp) Close() error {
if err != nil {
return fmt.Errorf("eventStore close error %s", err)
}
+ err = this.crossChainStore.Close()
+ if err != nil {
+ return fmt.Errorf("crossChainStore close error %s", err)
+ }
err = this.stateStore.Close()
if err != nil {
return fmt.Errorf("stateStore close error %s", err)
diff --git a/core/store/ledgerstore/tx_handler.go b/core/store/ledgerstore/tx_handler.go
index c0bfc1a196..e7d9f12f95 100644
--- a/core/store/ledgerstore/tx_handler.go
+++ b/core/store/ledgerstore/tx_handler.go
@@ -292,7 +292,7 @@ func (self *StateStore) HandleInvokeTransaction(store store.LedgerStore, overlay
return sc.CrossHashes, nil
}
-func SaveNotify(eventStore scommon.EventStore, txHash common.Uint256, notify *event.ExecuteNotify) error {
+func SaveNotify(eventStore scommon.EventStore, txHash common.Uint256, notify *event.ExecuteNotify, blk *types.Block) error {
if !sysconfig.DefConfig.Common.EnableEventLog {
return nil
}
@@ -300,6 +300,7 @@ func SaveNotify(eventStore scommon.EventStore, txHash common.Uint256, notify *ev
return fmt.Errorf("SaveEventNotifyByTx error %s", err)
}
event.PushSmartCodeEvent(txHash, 0, event.EVENT_NOTIFY, notify)
+ event.PushEthSmartCodeEvent(notify, blk)
return nil
}
diff --git a/core/store/store.go b/core/store/store.go
index 6fb62b4b0a..c04acdec16 100644
--- a/core/store/store.go
+++ b/core/store/store.go
@@ -25,6 +25,7 @@ import (
"github.com/ontio/ontology/common"
"github.com/ontio/ontology/core/payload"
"github.com/ontio/ontology/core/states"
+ "github.com/ontio/ontology/core/store/leveldbstore"
"github.com/ontio/ontology/core/store/overlaydb"
"github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/smartcontract/event"
@@ -40,6 +41,7 @@ type ExecuteResult struct {
CrossStates []common.Uint256
CrossStatesRoot common.Uint256
Notify []*event.ExecuteNotify
+ Bloom types2.Bloom
}
// LedgerStore provides func with store package.
@@ -54,6 +56,8 @@ type LedgerStore interface {
GetCurrentBlockHash() common.Uint256
GetCurrentBlockHeight() uint32
GetCurrentHeaderHeight() uint32
+ GetFilterStart() uint32
+ GetIndexStore() *leveldbstore.LevelDBStore
GetCurrentHeaderHash() common.Uint256
GetBlockHash(height uint32) common.Uint256
GetHeaderByHash(blockHash common.Uint256) (*types.Header, error)
@@ -62,6 +66,8 @@ type LedgerStore interface {
GetBlockByHash(blockHash common.Uint256) (*types.Block, error)
GetBlockByHeight(height uint32) (*types.Block, error)
GetTransaction(txHash common.Uint256) (*types.Transaction, uint32, error)
+ GetBloomData(height uint32) (types2.Bloom, error)
+ BloomStatus() (uint32, uint32)
IsContainBlock(blockHash common.Uint256) (bool, error)
IsContainTransaction(txHash common.Uint256) (bool, error)
GetBlockRootWithNewTxRoots(startHeight uint32, txRoots []common.Uint256) common.Uint256
diff --git a/events/message/message.go b/events/message/message.go
index dc64eb3ede..3683bba93d 100644
--- a/events/message/message.go
+++ b/events/message/message.go
@@ -19,12 +19,17 @@
package message
import (
+ "github.com/ethereum/go-ethereum/core"
+ types2 "github.com/ethereum/go-ethereum/core/types"
"github.com/ontio/ontology/core/types"
)
const (
TOPIC_SAVE_BLOCK_COMPLETE = "svblkcmp"
TOPIC_SMART_CODE_EVENT = "scevt"
+ TOPIC_PENDING_TX_EVENT = "pendingtx"
+ TOPIC_CHAIN_EVENT = "chainevt"
+ TOPIC_ETH_SC_EVENT = "ethscevt"
)
type SaveBlockCompleteMsg struct {
@@ -38,3 +43,18 @@ type SmartCodeEventMsg struct {
type BlockConsensusComplete struct {
Block *types.Block
}
+
+type EthSmartCodeEventMsg struct {
+ Event EthSmartCodeEvent
+}
+type PendingTxs []*types2.Transaction
+
+type PendingTxMsg struct {
+ Event PendingTxs
+}
+
+type EthSmartCodeEvent []*types2.Log
+
+type ChainEventMsg struct {
+ ChainEvent *core.ChainEvent
+}
diff --git a/http/base/actor/event.go b/http/base/actor/event.go
index 22171e72dc..2ffd0e6a42 100644
--- a/http/base/actor/event.go
+++ b/http/base/actor/event.go
@@ -27,6 +27,9 @@ import (
type EventActor struct {
blockPersistCompleted func(v interface{})
smartCodeEvt func(v interface{})
+ chainEvt func(v interface{})
+ ethSmartCodeEvt func(v interface{})
+ pendingTxEvt func(v interface{})
}
//receive from subscribed actor
@@ -36,6 +39,12 @@ func (t *EventActor) Receive(c actor.Context) {
t.blockPersistCompleted(*msg.Block)
case *message.SmartCodeEventMsg:
t.smartCodeEvt(*msg.Event)
+ case *message.ChainEventMsg:
+ t.chainEvt(*msg.ChainEvent)
+ case *message.PendingTxMsg:
+ t.pendingTxEvt(msg.Event)
+ case *message.EthSmartCodeEventMsg:
+ t.ethSmartCodeEvt(msg.Event)
default:
}
}
@@ -47,6 +56,12 @@ func SubscribeEvent(topic string, handler func(v interface{})) {
return &EventActor{blockPersistCompleted: handler}
} else if topic == message.TOPIC_SMART_CODE_EVENT {
return &EventActor{smartCodeEvt: handler}
+ } else if topic == message.TOPIC_PENDING_TX_EVENT {
+ return &EventActor{pendingTxEvt: handler}
+ } else if topic == message.TOPIC_ETH_SC_EVENT {
+ return &EventActor{ethSmartCodeEvt: handler}
+ } else if topic == message.TOPIC_CHAIN_EVENT {
+ return &EventActor{chainEvt: handler}
} else {
return &EventActor{}
}
diff --git a/http/base/actor/ledger.go b/http/base/actor/ledger.go
index c1ed1c4851..79c7407f25 100644
--- a/http/base/actor/ledger.go
+++ b/http/base/actor/ledger.go
@@ -24,6 +24,7 @@ import (
"github.com/ontio/ontology/common"
"github.com/ontio/ontology/core/ledger"
"github.com/ontio/ontology/core/payload"
+ "github.com/ontio/ontology/core/store/leveldbstore"
"github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/smartcontract/event"
types3 "github.com/ontio/ontology/smartcontract/service/evm/types"
@@ -141,3 +142,19 @@ func PreExecuteEip155Tx(msg types2.Message) (*types3.ExecutionResult, error) {
res, err := ledger.DefLedger.PreExecuteEip155Tx(msg)
return res, err
}
+
+func BloomStatus() (uint32, uint32) {
+ return ledger.DefLedger.BloomStatus()
+}
+
+func GetBloomData(height uint32) (types2.Bloom, error) {
+ return ledger.DefLedger.GetBloomData(height)
+}
+
+func GetFilterStart() uint32 {
+ return ledger.DefLedger.GetFilterStart()
+}
+
+func GetIndexStore() *leveldbstore.LevelDBStore {
+ return ledger.DefLedger.GetIndexStore()
+}
diff --git a/http/base/common/common.go b/http/base/common/common.go
index ea26434e3f..2d1dbc63da 100644
--- a/http/base/common/common.go
+++ b/http/base/common/common.go
@@ -27,16 +27,14 @@ import (
"strings"
"time"
- "github.com/ontio/ontology/core/states"
-
"github.com/laizy/bigint"
-
"github.com/ontio/ontology-crypto/keypair"
"github.com/ontio/ontology/common"
"github.com/ontio/ontology/common/constants"
"github.com/ontio/ontology/common/log"
"github.com/ontio/ontology/core/ledger"
"github.com/ontio/ontology/core/payload"
+ "github.com/ontio/ontology/core/states"
"github.com/ontio/ontology/core/types"
cutils "github.com/ontio/ontology/core/utils"
ontErrors "github.com/ontio/ontology/errors"
diff --git a/http/ethrpc/backend/backend.go b/http/ethrpc/backend/backend.go
new file mode 100644
index 0000000000..406385af47
--- /dev/null
+++ b/http/ethrpc/backend/backend.go
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package backend
+
+import (
+ "context"
+
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ontio/ontology/core/store/ledgerstore"
+ "github.com/ontio/ontology/core/store/leveldbstore"
+ "github.com/ontio/ontology/http/base/actor"
+)
+
+type BloomBackend struct {
+ bloomRequests chan chan *bloombits.Retrieval
+ closeBloomHandler chan struct{}
+}
+
+func NewBloomBackend() *BloomBackend {
+ return &BloomBackend{
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ closeBloomHandler: make(chan struct{}),
+ }
+}
+
+// Close
+func (b *BloomBackend) Close() {
+ close(b.closeBloomHandler)
+}
+
+func (b *BloomBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ for i := 0; i < ledgerstore.BloomFilterThreads; i++ {
+ go session.Multiplex(ledgerstore.BloomRetrievalBatch, ledgerstore.BloomRetrievalWait, b.bloomRequests)
+ }
+}
+
+func (b *BloomBackend) BloomStatus() (uint32, uint32) {
+ return actor.BloomStatus()
+}
+
+// startBloomHandlers starts a batch of goroutines to accept bloom bit database
+// retrievals from possibly a range of filters and serving the data to satisfy.
+func (b *BloomBackend) StartBloomHandlers(sectionSize uint32, db *leveldbstore.LevelDBStore) error {
+ for i := 0; i < ledgerstore.BloomServiceThreads; i++ {
+ go func() {
+ for {
+ select {
+ case <-b.closeBloomHandler:
+ return
+
+ case request := <-b.bloomRequests:
+ task := <-request
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if compVector, err := ledgerstore.ReadBloomBits(db, task.Bit, uint32(section)); err == nil {
+ if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
+ task.Bitsets[i] = blob
+ } else {
+ task.Error = err
+ }
+ } else {
+ task.Error = err
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+ }
+ return nil
+}
diff --git a/http/ethrpc/eth/api.go b/http/ethrpc/eth/api.go
index b9564b4f35..dffd1b3485 100644
--- a/http/ethrpc/eth/api.go
+++ b/http/ethrpc/eth/api.go
@@ -286,7 +286,7 @@ func (api *EthereumAPI) SendRawTransaction(data hexutil.Bytes) (common.Hash, err
}
func (api *EthereumAPI) Call(args types2.CallArgs, blockNumber types2.BlockNumber, _ *map[common.Address]types2.Account) (hexutil.Bytes, error) {
- log.Debugf("eth_call args %v ,block number %v ", args, blockNumber)
+ log.Debugf("eth_call block number %v ", blockNumber)
msg := args.AsMessage(RPCGasCap)
res, err := bactor.PreExecuteEip155Tx(msg)
if err != nil {
@@ -316,7 +316,6 @@ type revertError struct {
}
func (api *EthereumAPI) EstimateGas(args types2.CallArgs) (hexutil.Uint64, error) {
- log.Debugf("eth_estimateGas args %v", args)
var (
lo uint64 = params.TxGas
hi uint64
diff --git a/http/ethrpc/filters/api.go b/http/ethrpc/filters/api.go
new file mode 100644
index 0000000000..5e9a2baa06
--- /dev/null
+++ b/http/ethrpc/filters/api.go
@@ -0,0 +1,446 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package filters
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/common"
+ ethtypes "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/filters"
+ "github.com/ethereum/go-ethereum/rpc"
+)
+
+// consider a filter inactive if it has not been polled for within deadline
+var deadline = 5 * time.Minute
+
+// filter is a helper struct that holds meta information over the filter type
+// and associated subscription in the event system.
+type filter struct {
+ typ filters.Type
+ deadline *time.Timer // filter is inactive when deadline triggers
+ hashes []common.Hash
+ crit filters.FilterCriteria
+ logs []*ethtypes.Log
+ s *Subscription // associated subscription in event system
+}
+
+type PublicFilterAPI struct {
+ backend Backend
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
+ events *EventSystem
+ timeout time.Duration
+}
+
+func NewPublicFilterAPI(backend Backend) *PublicFilterAPI {
+ api := &PublicFilterAPI{
+ backend: backend,
+ events: NewEventSystem(backend),
+ filters: make(map[rpc.ID]*filter),
+ timeout: deadline,
+ }
+ go api.timeoutLoop()
+ return api
+}
+
+// timeoutLoop runs at the interval set by 'timeout' and deletes filters
+// that have not been recently used. It is started when the API is created.
+func (api *PublicFilterAPI) timeoutLoop() {
+ var toUninstall []*Subscription
+ ticker := time.NewTicker(deadline)
+ defer ticker.Stop()
+ for {
+ <-ticker.C
+ api.filtersMu.Lock()
+ for id, f := range api.filters {
+ select {
+ case <-f.deadline.C:
+ toUninstall = append(toUninstall)
+ delete(api.filters, id)
+ default:
+ continue
+ }
+ }
+ api.filtersMu.Unlock()
+
+ // Unsubscribes are processed outside the lock to avoid the following scenario:
+ // event loop attempts broadcasting events to still active filters while
+ // Unsubscribe is waiting for it to process the uninstall request.
+ for _, s := range toUninstall {
+ s.Unsubscribe()
+ }
+ toUninstall = nil
+ }
+}
+
+// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
+// as transactions enter the pending state.
+//
+// It is part of the filter package because this filter can be used through the
+// `eth_getFilterChanges` polling method that is also used for log filters.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newPendingTransactionFilter
+func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
+ var (
+ pendingTxs = make(chan []common.Hash)
+ pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[pendingTxSub.ID] = &filter{typ: filters.PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case ph := <-pendingTxs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[pendingTxSub.ID]; found {
+ f.hashes = append(f.hashes, ph...)
+ }
+ api.filtersMu.Unlock()
+ case <-pendingTxSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, pendingTxSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return pendingTxSub.ID
+}
+
+// NewPendingTransactions creates a subscription that is triggered each time a transaction
+// enters the transaction pool and was signed from one of the transactions this nodes manages.
+func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+ rpcSub := notifier.CreateSubscription()
+ go func() {
+ txHashes := make(chan []common.Hash, 128)
+ pendingTxSub := api.events.SubscribePendingTxs(txHashes)
+ for {
+ select {
+ case hashes := <-txHashes:
+ // To keep the original behaviour, send a single tx hash in one notification.
+ // TODO(rjl493456442) Send a batch of tx hashes in one notification
+ for _, h := range hashes {
+ notifier.Notify(rpcSub.ID, h)
+ }
+ case <-rpcSub.Err():
+ pendingTxSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ pendingTxSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
+// It is part of the filter package since polling goes with eth_getFilterChanges.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
+func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
+ var (
+ headers = make(chan *ethtypes.Header)
+ headerSub = api.events.SubscribeNewHeads(headers)
+ )
+
+ api.filtersMu.Lock()
+ api.filters[headerSub.ID] = &filter{typ: filters.BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case h := <-headers:
+ api.filtersMu.Lock()
+ if f, found := api.filters[headerSub.ID]; found {
+ f.hashes = append(f.hashes, h.Hash())
+ }
+ api.filtersMu.Unlock()
+ case <-headerSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, headerSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return headerSub.ID
+}
+
+// NewHeads send a notification each time a new (header) block is appended to the chain.
+func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ rpcSub := notifier.CreateSubscription()
+
+ go func() {
+ headers := make(chan *ethtypes.Header)
+ headersSub := api.events.SubscribeNewHeads(headers)
+
+ for {
+ select {
+ case h := <-headers:
+ notifier.Notify(rpcSub.ID, h)
+ case <-rpcSub.Err():
+ headersSub.Unsubscribe()
+ return
+ case <-notifier.Closed():
+ headersSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// Logs creates a subscription that fires for all new log that match the given filter criteria.
+func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
+ notifier, supported := rpc.NotifierFromContext(ctx)
+ if !supported {
+ return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
+ }
+
+ var (
+ rpcSub = notifier.CreateSubscription()
+ matchedLogs = make(chan []*ethtypes.Log)
+ )
+
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
+ if err != nil {
+ return nil, err
+ }
+
+ go func() {
+
+ for {
+ select {
+ case logs := <-matchedLogs:
+ for _, log := range logs {
+ notifier.Notify(rpcSub.ID, &log)
+ }
+ case <-rpcSub.Err(): // client send an unsubscribe request
+ logsSub.Unsubscribe()
+ return
+ case <-notifier.Closed(): // connection dropped
+ logsSub.Unsubscribe()
+ return
+ }
+ }
+ }()
+
+ return rpcSub, nil
+}
+
+// FilterCriteria represents a request to create a new filter.
+// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
+type FilterCriteria ethereum.FilterQuery
+
+// NewFilter creates a new filter and returns the filter id. It can be
+// used to retrieve logs when the state changes. This method cannot be
+// used to fetch logs that are already stored in the state.
+//
+// Default criteria for the from and to block are "latest".
+// Using "latest" as block number will return logs for mined blocks.
+// Using "pending" as block number returns logs for not yet mined (pending) blocks.
+// In case logs are removed (chain reorg) previously returned logs are returned
+// again but with the removed property set to true.
+//
+// In case "fromBlock" > "toBlock" an error is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
+func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, error) {
+ logs := make(chan []*ethtypes.Log)
+ logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(criteria), logs)
+ if err != nil {
+ return "", err
+ }
+
+ api.filtersMu.Lock()
+ api.filters[logsSub.ID] = &filter{typ: filters.LogsSubscription, crit: criteria, deadline: time.NewTimer(api.timeout), logs: make([]*ethtypes.Log, 0), s: logsSub}
+ api.filtersMu.Unlock()
+
+ go func() {
+ for {
+ select {
+ case l := <-logs:
+ api.filtersMu.Lock()
+ if f, found := api.filters[logsSub.ID]; found {
+ f.logs = append(f.logs, l...)
+ }
+ api.filtersMu.Unlock()
+ case <-logsSub.Err():
+ api.filtersMu.Lock()
+ delete(api.filters, logsSub.ID)
+ api.filtersMu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return logsSub.ID, nil
+}
+
+// GetLogs returns logs matching the given argument that are stored within the state.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getLogs
+func (api *PublicFilterAPI) GetLogs(ctx context.Context, criteria filters.FilterCriteria) ([]*ethtypes.Log, error) {
+ var filter *Filter
+
+ if criteria.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, criteria)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if criteria.FromBlock != nil {
+ begin = criteria.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if criteria.ToBlock != nil {
+ end = criteria.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, criteria.Addresses, criteria.Topics)
+ }
+
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return logs, err
+ }
+ return returnLogs(logs), nil
+}
+
+// UninstallFilter removes the filter with the given filter id.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
+func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ if found {
+ delete(api.filters, id)
+ }
+ api.filtersMu.Unlock()
+ if found {
+ f.s.Unsubscribe()
+ }
+
+ return found
+}
+
+// GetFilterLogs returns the logs for the filter with the given id.
+// If the filter could not be found an empty array of logs is returned.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
+func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ethtypes.Log, error) {
+ api.filtersMu.Lock()
+ f, found := api.filters[id]
+ api.filtersMu.Unlock()
+
+ if !found {
+ return returnLogs(nil), fmt.Errorf("filter %s not found", id)
+ }
+
+ if f.typ != filters.LogsSubscription {
+ return returnLogs(nil), fmt.Errorf("filter %s doesn't have a LogsSubscription type: got %d", id, f.typ)
+ }
+
+ var filter *Filter
+ var err error
+ if f.crit.BlockHash != nil {
+ // Block filter requested, construct a single-shot filter
+ filter = NewBlockFilter(api.backend, f.crit)
+ } else {
+ // Convert the RPC block numbers into internal representations
+ begin := rpc.LatestBlockNumber.Int64()
+ if f.crit.FromBlock != nil {
+ begin = f.crit.FromBlock.Int64()
+ }
+ end := rpc.LatestBlockNumber.Int64()
+ if f.crit.ToBlock != nil {
+ end = f.crit.ToBlock.Int64()
+ }
+ // Construct the range filter
+ filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
+ }
+ // Run the filter and return all the logs
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), nil
+}
+
+// GetFilterChanges returns the logs for the filter with the given id since
+// last time it was called. This can be used for polling.
+//
+// For pending transaction and block filters the result is []common.Hash.
+// (pending)Log filters return []Log.
+//
+// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
+ api.filtersMu.Lock()
+ defer api.filtersMu.Unlock()
+
+ f, found := api.filters[id]
+ if !found {
+ return nil, fmt.Errorf("filter %s not found", id)
+ }
+
+ if !f.deadline.Stop() {
+ // timer expired but filter is not yet removed in timeout loop
+ // receive timer value and reset timer
+ <-f.deadline.C
+ }
+ f.deadline.Reset(deadline)
+
+ switch f.typ {
+ case filters.PendingTransactionsSubscription, filters.BlocksSubscription:
+ hashes := f.hashes
+ f.hashes = nil
+ return returnHashes(hashes), nil
+ case filters.LogsSubscription, filters.MinedAndPendingLogsSubscription:
+ logs := make([]*ethtypes.Log, len(f.logs))
+ copy(logs, f.logs)
+ f.logs = []*ethtypes.Log{}
+ return returnLogs(logs), nil
+ default:
+ return nil, fmt.Errorf("invalid filter %s type %d", id, f.typ)
+ }
+}
diff --git a/http/ethrpc/filters/filter.go b/http/ethrpc/filters/filter.go
new file mode 100644
index 0000000000..6a3035f1b1
--- /dev/null
+++ b/http/ethrpc/filters/filter.go
@@ -0,0 +1,489 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package filters
+
+import (
+ "context"
+ "fmt"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ ethtypes "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/filters"
+ common2 "github.com/ontio/ontology/common"
+ common4 "github.com/ontio/ontology/core/store/common"
+ "github.com/ontio/ontology/core/types"
+ "github.com/ontio/ontology/http/base/actor"
+ utils2 "github.com/ontio/ontology/http/ethrpc/utils"
+ "github.com/ontio/ontology/smartcontract/event"
+)
+
+const MAX_SEARCH_RANGE = 100000
+
+type Backend interface {
+ BloomStatus() (uint32, uint32)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
+}
+
+// Filter can be used to retrieve and filter logs.
+type Filter struct {
+ backend Backend
+ criteria filters.FilterCriteria
+ matcher *bloombits.Matcher
+}
+
+// NewBlockFilter creates a new filter which directly inspects the contents of
+// a block to figure out whether it is interesting or not.
+func NewBlockFilter(backend Backend, criteria filters.FilterCriteria) *Filter {
+ // Create a generic filter and convert it into a block filter
+ return newFilter(backend, criteria, nil)
+}
+
+// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
+// figure out whether a particular block is interesting or not.
+func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single bloombits filter
+ // system. Since the bloombits are not positional, nil topics are permitted,
+ // which get flattened into a nil byte slice.
+ var filtersBz [][][]byte // nolint: prealloc
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filtersBz = append(filtersBz, filter)
+ }
+
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filtersBz = append(filtersBz, filter)
+ }
+
+ size, _ := actor.BloomStatus()
+
+ // Create a generic filter and convert it into a range filter
+ criteria := filters.FilterCriteria{
+ FromBlock: big.NewInt(begin),
+ ToBlock: big.NewInt(end),
+ Addresses: addresses,
+ Topics: topics,
+ }
+
+ return newFilter(backend, criteria, bloombits.NewMatcher(uint64(size), filtersBz))
+}
+
+// newFilter returns a new Filter
+func newFilter(backend Backend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter {
+ return &Filter{
+ backend: backend,
+ criteria: criteria,
+ matcher: matcher,
+ }
+}
+
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*ethtypes.Log, error) {
+ var logs []*ethtypes.Log
+ var err error
+
+ // If we're doing singleton block filtering, execute and return
+ if f.criteria.BlockHash != nil && f.criteria.BlockHash != (&common.Hash{}) {
+ block, err := actor.GetBlockFromStore(common2.Uint256(*f.criteria.BlockHash))
+ if err != nil {
+ return nil, err
+ }
+ if block.Header == nil {
+ return nil, fmt.Errorf("unknown block header %s", f.criteria.BlockHash.String())
+ }
+ bloom, err := actor.GetBloomData(block.Header.Height)
+ if err != nil {
+ return nil, err
+ }
+ return f.blockLogs(bloom, common2.Uint256(*f.criteria.BlockHash))
+ }
+
+ // Figure out the limits of the filter range
+ curHeight := actor.GetCurrentBlockHeight()
+ block, err := actor.GetBlockByHeight(curHeight)
+ if err != nil {
+ return nil, err
+ }
+
+ if block == nil {
+ return nil, nil
+ }
+
+ if f.criteria.FromBlock.Int64() == -1 {
+ f.criteria.FromBlock = big.NewInt(int64(curHeight))
+ }
+ if f.criteria.ToBlock.Int64() == -1 {
+ f.criteria.ToBlock = big.NewInt(int64(curHeight))
+ }
+
+ start := actor.GetFilterStart()
+
+ if f.criteria.FromBlock.Int64() < int64(start) ||
+ f.criteria.ToBlock.Int64() < int64(start) {
+ return nil, fmt.Errorf("from and to block height must greater than %d", int64(start))
+ }
+
+ if f.criteria.ToBlock.Int64()-f.criteria.FromBlock.Int64() > MAX_SEARCH_RANGE {
+ return nil, fmt.Errorf("the span between fromBlock and toBlock must be less than or equal to %d", MAX_SEARCH_RANGE)
+ }
+
+ begin := f.criteria.FromBlock.Uint64()
+ end := f.criteria.ToBlock.Uint64()
+ size, sections := actor.BloomStatus()
+
+ if indexed := uint64(sections * size); indexed > begin {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
+ return logs, err
+ }
+ }
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
+}
+
+// blockLogs returns the logs matching the filter criteria within a single block.
+func (f *Filter) blockLogs(bloom ethtypes.Bloom, hash common2.Uint256) ([]*ethtypes.Log, error) {
+ if !bloomFilter(bloom, f.criteria.Addresses, f.criteria.Topics) {
+ return []*ethtypes.Log{}, nil
+ }
+
+ logsList, err := getLogs(hash)
+ if err != nil {
+ return []*ethtypes.Log{}, err
+ }
+
+ var unfiltered []*ethtypes.Log // nolint: prealloc
+ for _, logs := range logsList {
+ unfiltered = append(unfiltered, logs...)
+ }
+ logs := FilterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics)
+ if len(logs) == 0 {
+ return []*ethtypes.Log{}, nil
+ }
+ return logs, nil
+}
+
+func getLogs(hash common2.Uint256) ([][]*ethtypes.Log, error) {
+ block, err := actor.GetBlockFromStore(hash)
+ if err != nil {
+ if err == common4.ErrNotFound {
+ return nil, err
+ }
+ return nil, err
+ }
+
+ var res [][]*ethtypes.Log
+ for _, tx := range block.Transactions {
+ if tx.TxType != types.EIP155 {
+ continue
+ }
+ notify, err := actor.GetEventNotifyByTxHash(tx.Hash())
+ if err != nil {
+ if err == common4.ErrNotFound {
+ continue
+ }
+ return nil, err
+ }
+ if notify != nil {
+ txLogs, err := generateLog(notify)
+ if err != nil {
+ return nil, err
+ }
+ if txLogs != nil && len(txLogs) != 0 {
+ res = append(res, txLogs)
+ }
+ }
+ }
+ return res, nil
+}
+
+func generateLog(rawNotify *event.ExecuteNotify) ([]*ethtypes.Log, error) {
+ var res []*ethtypes.Log
+ txHash := rawNotify.TxHash
+ height, _, err := actor.GetTxnWithHeightByTxHash(txHash)
+ if err != nil {
+ return nil, err
+ }
+ hash := actor.GetBlockHashFromStore(height)
+ ethHash := utils2.OntToEthHash(hash)
+ for idx, n := range rawNotify.Notify {
+ storageLog, err := event.NotifyEventInfoToEvmLog(n)
+ if err != nil {
+ return nil, err
+ }
+ res = append(res,
+ ðtypes.Log{
+ Address: storageLog.Address,
+ Topics: storageLog.Topics,
+ Data: storageLog.Data,
+ BlockNumber: uint64(height),
+ TxHash: utils2.OntToEthHash(txHash),
+ TxIndex: uint(rawNotify.TxIndex),
+ BlockHash: ethHash,
+ Index: uint(idx),
+ Removed: false,
+ })
+ }
+
+ return res, nil
+}
+
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(hash common2.Uint256) (logs []*ethtypes.Log, err error) {
+ // Get the logs of the block
+ logsList, err := getLogs(hash)
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*ethtypes.Log
+ for _, logs := range logsList {
+ unfiltered = append(unfiltered, logs...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics)
+ return logs, nil
+}
+
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits indexed available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*ethtypes.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(ctx, f.criteria.FromBlock.Uint64(), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close()
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*ethtypes.Log
+
+ bigEnd := big.NewInt(int64(end))
+ for {
+ select {
+ case number, ok := <-matches:
+
+ // Abort if all matches have been fulfilled
+ if !ok {
+ err := session.Error()
+ if err == nil {
+ f.criteria.FromBlock = bigEnd.Add(bigEnd, big.NewInt(1))
+ }
+ return logs, err
+ }
+ f.criteria.FromBlock = big.NewInt(int64(number)).Add(big.NewInt(int64(number)), big.NewInt(1))
+
+ // Retrieve the suggested block and pull any truly matching logs
+ block, err := actor.GetBlockByHeight(uint32(number))
+ if err != nil {
+ return nil, err
+ }
+ if block == nil {
+ return nil, fmt.Errorf("block %v not found", number)
+ }
+ found, err := f.checkMatches(block.Hash())
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
+ }
+ }
+}
+
+// unindexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*ethtypes.Log, error) {
+ var logs []*ethtypes.Log
+ begin := f.criteria.FromBlock.Int64()
+ beginPtr := &begin
+ defer f.criteria.FromBlock.SetInt64(*beginPtr)
+
+ for ; begin <= int64(end); begin++ {
+ block, err := actor.GetBlockByHeight(uint32(begin))
+ if err != nil {
+ return nil, err
+ }
+ if block == nil {
+ return logs, nil
+ }
+ if block.Header == nil {
+ return nil, fmt.Errorf("unknown block header %s", f.criteria.BlockHash.String())
+ }
+ bloom, err := actor.GetBloomData(block.Header.Height)
+ if err != nil {
+ return nil, err
+ }
+ found, err := f.blockLogs(bloom, block.Hash())
+ if err != nil {
+ return logs, err
+ }
+ logs = append(logs, found...)
+ }
+ return logs, nil
+}
+
+// filterLogs creates a slice of logs matching the given criteria.
+func filterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log {
+ var ret []*ethtypes.Log
+Logs:
+ for _, log := range logs {
+ if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
+ continue
+ }
+ if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
+ continue
+ }
+
+ if len(addresses) > 0 && !includes(addresses, log.Address) {
+ continue
+ }
+ // If the to filtered topics is greater than the amount of topics in logs, skip.
+ if len(topics) > len(log.Topics) {
+ continue Logs
+ }
+ for i, sub := range topics {
+ match := len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if log.Topics[i] == topic {
+ match = true
+ break
+ }
+ }
+ if !match {
+ continue Logs
+ }
+ }
+ ret = append(ret, log)
+ }
+ return ret
+}
+
+// filterLogs creates a slice of logs matching the given criteria.
+// [] -> anything
+// [A] -> A in first position of log topics, anything after
+// [null, B] -> anything in first position, B in second position
+// [A, B] -> A in first position and B in second position
+// [[A, B], [A, B]] -> A or B in first position, A or B in second position
+func FilterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log {
+ var ret []*ethtypes.Log
+Logs:
+ for _, log := range logs {
+ if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
+ continue
+ }
+ if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
+ continue
+ }
+ if len(addresses) > 0 && !includes(addresses, log.Address) {
+ continue
+ }
+ // If the to filtered topics is greater than the amount of topics in logs, skip.
+ if len(topics) > len(log.Topics) {
+ continue
+ }
+ for i, sub := range topics {
+ match := len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if log.Topics[i] == topic {
+ match = true
+ break
+ }
+ }
+ if !match {
+ continue Logs
+ }
+ }
+ ret = append(ret, log)
+ }
+ return ret
+}
+
+func includes(addresses []common.Address, a common.Address) bool {
+ for _, addr := range addresses {
+ if addr == a {
+ return true
+ }
+ }
+
+ return false
+}
+
+func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
+ var included bool = true
+ if len(addresses) > 0 {
+ included = false
+ for _, addr := range addresses {
+ if ethtypes.BloomLookup(bloom, addr) {
+ included = true
+ break
+ }
+ }
+ if !included {
+ return false
+ }
+ }
+
+ for _, sub := range topics {
+ included = len(sub) == 0 // empty rule set == wildcard
+ for _, topic := range sub {
+ if ethtypes.BloomLookup(bloom, topic) {
+ included = true
+ break
+ }
+ }
+ }
+ return included
+}
+
+// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
+// otherwise the given hashes array is returned.
+func returnHashes(hashes []common.Hash) []common.Hash {
+ if hashes == nil {
+ return []common.Hash{}
+ }
+ return hashes
+}
+
+// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
+// otherwise the given logs array is returned.
+func returnLogs(logs []*ethtypes.Log) []*ethtypes.Log {
+ if logs == nil {
+ return []*ethtypes.Log{}
+ }
+ return logs
+}
diff --git a/http/ethrpc/filters/filter_system.go b/http/ethrpc/filters/filter_system.go
new file mode 100644
index 0000000000..6eff2664be
--- /dev/null
+++ b/http/ethrpc/filters/filter_system.go
@@ -0,0 +1,316 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+package filters
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ontio/ontology/events/message"
+ bactor "github.com/ontio/ontology/http/base/actor"
+)
+
+// Type determines the kind of filter and is used to put the filter in to
+// the correct bucket when added.
+type Type byte
+
+const (
+ // UnknownSubscription indicates an unknown subscription type
+ UnknownSubscription Type = iota
+ // LogsSubscription queries for new or removed (chain reorg) logs
+ LogsSubscription
+ // PendingTransactionsSubscription queries tx hashes for pending
+ // transactions entering the pending state
+ PendingTransactionsSubscription
+ // BlocksSubscription queries hashes for blocks that are imported
+ BlocksSubscription
+ // LastSubscription keeps track of the last index
+ LastIndexSubscription
+)
+
+const (
+ // txChanSize is the size of channel listening to NewTxsEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
+type subscription struct {
+ id rpc.ID
+ typ Type
+ created time.Time
+ logsCrit ethereum.FilterQuery
+ logs chan []*types.Log
+ hashes chan []common.Hash
+ headers chan *types.Header
+ installed chan struct{} // closed when the filter is installed
+ err chan error // closed when the filter is uninstalled
+}
+
+// EventSystem creates subscriptions, processes events and broadcasts them to the
+// subscription which match the subscription criteria.
+type EventSystem struct {
+ backend Backend
+ lastHead *types.Header
+
+ // Channels
+ install chan *subscription // install filter for event notification
+ uninstall chan *subscription // remove filter for event notification
+ txsCh chan core.NewTxsEvent // Channel to receive new transactions event
+ logsCh chan []*types.Log // Channel to receive new log event
+ chainCh chan core.ChainEvent // Channel to receive new chain event
+}
+
+// NewEventSystem creates a new manager that listens for event on the given mux,
+// parses and filters them. It uses the all map to retrieve filter changes. The
+// work loop holds its own index that is used to forward events to filters.
+//
+// The returned manager has a loop that needs to be stopped with the Stop function
+// or by stopping the given mux.
+func NewEventSystem(backend Backend) *EventSystem {
+ m := &EventSystem{
+ backend: backend,
+ install: make(chan *subscription),
+ uninstall: make(chan *subscription),
+ txsCh: make(chan core.NewTxsEvent, txChanSize),
+ logsCh: make(chan []*types.Log, logsChanSize),
+ chainCh: make(chan core.ChainEvent, chainEvChanSize),
+ }
+
+ // Subscribe events
+ bactor.SubscribeEvent(message.TOPIC_PENDING_TX_EVENT, m.pushPendingTxEvent)
+ bactor.SubscribeEvent(message.TOPIC_ETH_SC_EVENT, m.pushSCEvent)
+ bactor.SubscribeEvent(message.TOPIC_CHAIN_EVENT, m.pushChainEvent)
+
+ go m.eventLoop()
+ return m
+}
+
+func (es *EventSystem) pushPendingTxEvent(v interface{}) {
+ rs, ok := v.(message.PendingTxs)
+ if !ok {
+ return
+ }
+ es.txsCh <- core.NewTxsEvent{
+ Txs: rs,
+ }
+}
+
+func (es *EventSystem) pushSCEvent(v interface{}) {
+ rs, ok := v.(message.EthSmartCodeEvent)
+ if !ok {
+ return
+ }
+ es.logsCh <- rs
+}
+
+func (es *EventSystem) pushChainEvent(v interface{}) {
+ rs, ok := v.(core.ChainEvent)
+ if !ok {
+ return
+ }
+ es.chainCh <- rs
+}
+
+// Subscription is created when the client registers itself for a particular event.
+type Subscription struct {
+ ID rpc.ID
+ f *subscription
+ es *EventSystem
+ unsubOnce sync.Once
+}
+
+// Err returns a channel that is closed when unsubscribed.
+func (sub *Subscription) Err() <-chan error {
+ return sub.f.err
+}
+
+// Unsubscribe uninstalls the subscription from the event broadcast loop.
+func (sub *Subscription) Unsubscribe() {
+ sub.unsubOnce.Do(func() {
+ uninstallLoop:
+ for {
+ // write uninstall request and consume logs/hashes. This prevents
+ // the eventLoop broadcast method to deadlock when writing to the
+ // filter event channel while the subscription loop is waiting for
+ // this method to return (and thus not reading these events).
+ select {
+ case sub.es.uninstall <- sub.f:
+ break uninstallLoop
+ case <-sub.f.logs:
+ case <-sub.f.hashes:
+ case <-sub.f.headers:
+ }
+ }
+
+ // wait for filter to be uninstalled in work loop before returning
+ // this ensures that the manager won't use the event channel which
+ // will probably be closed by the client asap after this method returns.
+ <-sub.Err()
+ })
+}
+
+// subscribe installs the subscription in the event broadcast loop.
+func (es *EventSystem) subscribe(sub *subscription) *Subscription {
+ es.install <- sub
+ <-sub.installed
+ return &Subscription{ID: sub.id, f: sub, es: es}
+}
+
+// SubscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel. Default value for the from and to
+// block is "latest". If the fromBlock > toBlock an error is returned.
+func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
+ var from, to rpc.BlockNumber
+ if crit.FromBlock == nil {
+ from = rpc.LatestBlockNumber
+ } else {
+ from = rpc.BlockNumber(crit.FromBlock.Int64())
+ }
+ if crit.ToBlock == nil {
+ to = rpc.LatestBlockNumber
+ } else {
+ to = rpc.BlockNumber(crit.ToBlock.Int64())
+ }
+
+ // only interested in new mined logs
+ if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // only interested in mined logs within a specific block range
+ if from >= 0 && to >= 0 && to >= from {
+ return es.subscribeLogs(crit, logs), nil
+ }
+
+ // interested in logs from a specific block number to new mined blocks
+ if from >= 0 && to == rpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ return nil, fmt.Errorf("invalid from and to block combination: from > to")
+}
+
+// subscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: LogsSubscription,
+ logsCrit: crit,
+ created: time.Now(),
+ logs: logs,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// SubscribeNewHeads creates a subscription that writes the header of a block that is
+// imported in the chain.
+func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: BlocksSubscription,
+ created: time.Now(),
+ headers: headers,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+// SubscribePendingTxs creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ hashes: hashes,
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+ return es.subscribe(sub)
+}
+
+type filterIndex map[Type]map[rpc.ID]*subscription
+
+func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
+ if len(ev) == 0 {
+ return
+ }
+ for _, f := range filters[LogsSubscription] {
+ matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
+ if len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
+ }
+}
+
+func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
+ hashes := make([]common.Hash, 0, len(ev.Txs))
+ for _, tx := range ev.Txs {
+ hashes = append(hashes, tx.Hash())
+ }
+ for _, f := range filters[PendingTransactionsSubscription] {
+ f.hashes <- hashes
+ }
+}
+
+func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
+ for _, f := range filters[BlocksSubscription] {
+ f.headers <- ev.Block.Header()
+ }
+}
+
+// eventLoop (un)installs filters and processes mux events.
+func (es *EventSystem) eventLoop() {
+ // Ensure all subscriptions get cleaned up
+
+ index := make(filterIndex)
+ for i := UnknownSubscription; i < LastIndexSubscription; i++ {
+ index[i] = make(map[rpc.ID]*subscription)
+ }
+
+ for {
+ select {
+ case ev := <-es.txsCh:
+ es.handleTxsEvent(index, ev)
+ case ev := <-es.logsCh:
+ es.handleLogs(index, ev)
+ case ev := <-es.chainCh:
+ es.handleChainEvent(index, ev)
+
+ case f := <-es.install:
+ index[f.typ][f.id] = f
+ close(f.installed)
+
+ case f := <-es.uninstall:
+ delete(index[f.typ], f.id)
+ close(f.err)
+ }
+ }
+}
diff --git a/http/ethrpc/filters/filter_system_test.go b/http/ethrpc/filters/filter_system_test.go
new file mode 100644
index 0000000000..7ceb6f1189
--- /dev/null
+++ b/http/ethrpc/filters/filter_system_test.go
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package filters
+
+import (
+ "fmt"
+ "math/big"
+ "testing"
+ "time"
+
+ common2 "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ontio/ontology/events"
+ "github.com/ontio/ontology/events/message"
+)
+
+func TestNewEventSystem(t *testing.T) {
+ events.Init()
+
+ es := NewEventSystem(nil)
+ fmt.Println("es:", es)
+
+ header := &types.Header{Number: big.NewInt(1)}
+ chainEvtMsg := &message.ChainEventMsg{
+ ChainEvent: &core.ChainEvent{
+ Block: types.NewBlockWithHeader(header),
+ Hash: common2.Hash{},
+ Logs: make([]*types.Log, 0),
+ },
+ }
+ events.DefActorPublisher.Publish(message.TOPIC_CHAIN_EVENT, chainEvtMsg)
+ time.Sleep(time.Second)
+
+ scEvt := make(message.EthSmartCodeEvent, 0)
+ scEvt = append(scEvt, &types.Log{})
+ ethLog := &message.EthSmartCodeEventMsg{Event: scEvt}
+ events.DefActorPublisher.Publish(message.TOPIC_ETH_SC_EVENT, ethLog)
+ time.Sleep(time.Second)
+
+ pendingTxEvt := &message.PendingTxMsg{Event: []*types.Transaction{&types.Transaction{}}}
+ events.DefActorPublisher.Publish(message.TOPIC_PENDING_TX_EVENT, pendingTxEvt)
+ time.Sleep(time.Second)
+}
diff --git a/http/ethrpc/rpc_server.go b/http/ethrpc/rpc_server.go
index b57553c369..d2771a04a0 100644
--- a/http/ethrpc/rpc_server.go
+++ b/http/ethrpc/rpc_server.go
@@ -25,7 +25,11 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
cfg "github.com/ontio/ontology/common/config"
+ "github.com/ontio/ontology/core/store/ledgerstore"
+ "github.com/ontio/ontology/http/base/actor"
+ backend2 "github.com/ontio/ontology/http/ethrpc/backend"
"github.com/ontio/ontology/http/ethrpc/eth"
+ filters2 "github.com/ontio/ontology/http/ethrpc/filters"
"github.com/ontio/ontology/http/ethrpc/net"
"github.com/ontio/ontology/http/ethrpc/utils"
"github.com/ontio/ontology/http/ethrpc/web3"
@@ -44,20 +48,24 @@ var (
func StartEthServer(txpool *tp.TXPoolServer) error {
log.Root().SetHandler(utils.OntLogHandler())
- ethAPI := eth.NewEthereumAPI(txpool)
server := rpc.NewServer()
- err := server.RegisterName("eth", ethAPI)
- if err != nil {
+ if err := server.RegisterName("eth", eth.NewEthereumAPI(txpool)); err != nil {
return err
}
- netRpcService := net.NewPublicNetAPI()
- err = server.RegisterName("net", netRpcService)
+
+ backend := backend2.NewBloomBackend()
+ err := backend.StartBloomHandlers(ledgerstore.BloomBitsBlocks, actor.GetIndexStore())
if err != nil {
return err
}
- web3API := web3.NewAPI()
- err = server.RegisterName("web3", web3API)
- if err != nil {
+
+ if err := server.RegisterName("eth", filters2.NewPublicFilterAPI(backend)); err != nil {
+ return err
+ }
+ if err := server.RegisterName("net", net.NewPublicNetAPI()); err != nil {
+ return err
+ }
+ if err := server.RegisterName("web3", web3.NewAPI()); err != nil {
return err
}
diff --git a/http/ethrpc/utils/utils.go b/http/ethrpc/utils/utils.go
index c82bfcf94b..d471fae3e2 100644
--- a/http/ethrpc/utils/utils.go
+++ b/http/ethrpc/utils/utils.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
types2 "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/trie"
oComm "github.com/ontio/ontology/common"
sysconfig "github.com/ontio/ontology/common/config"
"github.com/ontio/ontology/core/types"
@@ -47,6 +48,33 @@ func EthBlockFromOntology(block *types.Block, fullTx bool) map[string]interface{
return FormatBlock(*block, 0, gasUsed, blockTxs)
}
+func RawEthBlockFromOntology(block *types.Block, bloom types2.Bloom) *types2.Block {
+ if block == nil {
+ return nil
+ }
+ hash := block.Hash()
+ gasUsed, ethTxs := RawEthTransactionsFromOntology(block.Transactions, common.BytesToHash(hash.ToArray()), uint64(block.Header.Height))
+
+ h := &types2.Header{
+ ParentHash: common.Hash(block.Header.PrevBlockHash),
+ UncleHash: common.Hash{},
+ Coinbase: common.Address{},
+ Root: common.Hash{},
+ TxHash: common.Hash(block.Header.TransactionsRoot),
+ ReceiptHash: common.Hash{},
+ Bloom: bloom,
+ Difficulty: big.NewInt(0),
+ Number: big.NewInt(int64(block.Header.Height)),
+ GasLimit: 0,
+ GasUsed: gasUsed.Uint64(),
+ Time: uint64(block.Header.Timestamp),
+ Extra: []byte{},
+ MixDigest: common.Hash{},
+ Nonce: types2.BlockNonce{},
+ }
+ return types2.NewBlock(h, ethTxs, nil, nil, new(trie.Trie))
+}
+
func EthTransactionsFromOntology(txs []*types.Transaction, blockHash common.Hash, blockNumber uint64) ([]common.Hash, *big.Int, []*types3.Transaction) {
var transactionHashes []common.Hash
var transactions []*types3.Transaction
@@ -64,6 +92,22 @@ func EthTransactionsFromOntology(txs []*types.Transaction, blockHash common.Hash
return transactionHashes, gasUsed, transactions
}
+func RawEthTransactionsFromOntology(txs []*types.Transaction, blockHash common.Hash, blockNumber uint64) (*big.Int, []*types2.Transaction) {
+ var transactions []*types2.Transaction
+ gasUsed := big.NewInt(0)
+ for _, tx := range txs {
+ if tx.IsEipTx() {
+ eipTx, err := tx.GetEIP155Tx()
+ if err != nil {
+ continue
+ }
+ gasUsed.Add(gasUsed, big.NewInt(int64(eipTx.Gas())))
+ transactions = append(transactions, eipTx)
+ }
+ }
+ return gasUsed, transactions
+}
+
func OntTxToEthTx(tx types.Transaction, blockHash common.Hash, blockNumber, index uint64) (*types3.Transaction, error) {
eip155Tx, err := tx.GetEIP155Tx()
if err != nil {
@@ -81,7 +125,7 @@ func FormatBlock(block types.Block, gasLimit uint64, gasUsed *big.Int, transacti
"hash": hexutil.Bytes(hash[:]),
"parentHash": hexutil.Bytes(header.PrevBlockHash[:]),
"nonce": types2.BlockNonce{}, // PoW specific
- "sha3Uncles": common.Hash{}, // No uncles in Tendermint
+ "sha3Uncles": common.Hash{},
"logsBloom": types2.Bloom{},
"transactionsRoot": hexutil.Bytes(header.TransactionsRoot[:]),
"stateRoot": hexutil.Bytes{},
diff --git a/smartcontract/event/event.go b/smartcontract/event/event.go
index df6ce32a25..9eb9391f05 100644
--- a/smartcontract/event/event.go
+++ b/smartcontract/event/event.go
@@ -19,10 +19,14 @@
package event
import (
+ common2 "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ types3 "github.com/ethereum/go-ethereum/core/types"
"github.com/ontio/ontology/common"
"github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/events"
"github.com/ontio/ontology/events/message"
+ utils2 "github.com/ontio/ontology/http/ethrpc/utils"
)
const (
@@ -43,3 +47,78 @@ func PushSmartCodeEvent(txHash common.Uint256, errcode int64, action string, res
}
events.DefActorPublisher.Publish(message.TOPIC_SMART_CODE_EVENT, &message.SmartCodeEventMsg{Event: smartCodeEvt})
}
+
+// PushSmartCodeEvent push event content to socket.io
+func PushEthSmartCodeEvent(rawNotify *ExecuteNotify, blk *types.Block) {
+ if events.DefActorPublisher == nil {
+ return
+ }
+ msg := extractSingleEthLog(rawNotify, blk)
+ events.DefActorPublisher.Publish(message.TOPIC_ETH_SC_EVENT, &message.EthSmartCodeEventMsg{Event: msg})
+}
+
+// PushSmartCodeEvent push event content to socket.io
+func PushChainEvent(rawNotify []*ExecuteNotify, blk *types.Block, bloom types3.Bloom) {
+ if events.DefActorPublisher == nil {
+ return
+ }
+ events.DefActorPublisher.Publish(
+ message.TOPIC_CHAIN_EVENT,
+ &message.ChainEventMsg{
+ ChainEvent: &core.ChainEvent{
+ Block: utils2.RawEthBlockFromOntology(blk, bloom),
+ Hash: common2.Hash(blk.Hash()),
+ Logs: extractEthLog(rawNotify, blk),
+ },
+ })
+}
+
+func extractSingleEthLog(rawNotify *ExecuteNotify, blk *types.Block) []*types3.Log {
+ var res []*types3.Log
+ if isEIP155Tx(blk, rawNotify.TxHash) {
+ res = genEthLog(rawNotify, blk)
+ }
+ return res
+}
+
+func extractEthLog(rawNotify []*ExecuteNotify, blk *types.Block) []*types3.Log {
+ var res []*types3.Log
+ for _, rn := range rawNotify {
+ res = append(res, extractSingleEthLog(rn, blk)...)
+ }
+ return res
+}
+
+func genEthLog(rawNotify *ExecuteNotify, blk *types.Block) []*types3.Log {
+ var res []*types3.Log
+ txHash := rawNotify.TxHash
+ ethHash := utils2.OntToEthHash(txHash)
+ for idx, n := range rawNotify.Notify {
+ storageLog, err := NotifyEventInfoToEvmLog(n)
+ if err != nil {
+ return nil
+ }
+ res = append(res,
+ &types3.Log{
+ Address: storageLog.Address,
+ Topics: storageLog.Topics,
+ Data: storageLog.Data,
+ BlockNumber: uint64(blk.Header.Height),
+ TxHash: utils2.OntToEthHash(txHash),
+ TxIndex: uint(rawNotify.TxIndex),
+ BlockHash: ethHash,
+ Index: uint(idx),
+ Removed: false,
+ })
+ }
+ return res
+}
+
+func isEIP155Tx(block *types.Block, txHash common.Uint256) bool {
+ for _, tx := range block.Transactions {
+ if tx.Hash() == txHash {
+ return tx.IsEipTx()
+ }
+ }
+ return false
+}
diff --git a/smartcontract/event/notify_event_args.go b/smartcontract/event/notify_event_args.go
index 7582b108a7..0dfd04b75d 100644
--- a/smartcontract/event/notify_event_args.go
+++ b/smartcontract/event/notify_event_args.go
@@ -19,13 +19,11 @@
package event
import (
- "errors"
"fmt"
- "github.com/ontio/ontology/common/constants"
-
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ontio/ontology/common"
+ "github.com/ontio/ontology/common/constants"
"github.com/ontio/ontology/core/types"
)
@@ -80,23 +78,27 @@ func NotifyEventInfoFromEvmLog(log *types.StorageLog) *NotifyEventInfo {
}
}
-func NotifyEventInfoToEvmLog(info *NotifyEventInfo) (*types.StorageLog, error) {
- n := info
+func NotifyEventInfoToEvmLog(n *NotifyEventInfo) (*types.StorageLog, error) {
if !n.IsEvm {
return nil, fmt.Errorf("not evm event")
}
- states, ok := n.States.(string)
- if !ok {
- return nil, errors.New("event info states is not string")
- }
- data, err := hexutil.Decode(states)
- if err != nil {
- return nil, err
+
+ var data []byte
+ var err error
+ switch val := n.States.(type) {
+ case string:
+ if data, err = hexutil.Decode(val); err != nil {
+ return nil, err
+ }
+ case hexutil.Bytes:
+ data = val
+ default:
+ return nil, fmt.Errorf("not support such states type")
}
+
source := common.NewZeroCopySource(data)
var storageLog types.StorageLog
- err = storageLog.Deserialization(source)
- if err != nil {
+ if err = storageLog.Deserialization(source); err != nil {
return nil, err
}
diff --git a/smartcontract/event/notify_event_args_test.go b/smartcontract/event/notify_event_args_test.go
new file mode 100644
index 0000000000..2306cc059a
--- /dev/null
+++ b/smartcontract/event/notify_event_args_test.go
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package event
+
+import (
+ "encoding/hex"
+ "fmt"
+ "math/big"
+ "testing"
+
+ "github.com/ontio/ontology/common"
+ "github.com/ontio/ontology/core/types"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDeserialization(t *testing.T) {
+ data, err := hex.DecodeString("000000000000000000000000000000000000000203000000ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef000000000000000000000000a34886547e00d8f15eaf5a98f99f4f76aaeb3bd500000000000000000000000000000000000000000000000000000000000000072000000000000000000000000000000000000000000000000000ffa4f70a6cd800")
+ if err != nil {
+ panic(err)
+ }
+ source := common.NewZeroCopySource(data)
+ sl := &types.StorageLog{}
+ sl.Deserialization(source)
+ fmt.Println(sl.Address.String())
+ a := big.NewInt(0).SetBytes(sl.Data)
+ fmt.Println(a.String())
+ info := NotifyEventInfoFromEvmLog(sl)
+ sl2, err := NotifyEventInfoToEvmLog(info)
+ assert.Nil(t, err)
+ fmt.Println(sl2)
+}
diff --git a/txnpool/proc/txnpool_server.go b/txnpool/proc/txnpool_server.go
index 0c7a67e41b..d21a898364 100644
--- a/txnpool/proc/txnpool_server.go
+++ b/txnpool/proc/txnpool_server.go
@@ -33,6 +33,8 @@ import (
"github.com/ontio/ontology/core/ledger"
txtypes "github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/errors"
+ "github.com/ontio/ontology/events"
+ "github.com/ontio/ontology/events/message"
msgpack "github.com/ontio/ontology/p2pserver/message/msg_pack"
p2p "github.com/ontio/ontology/p2pserver/net/protocol"
tc "github.com/ontio/ontology/txnpool/common"
@@ -218,6 +220,10 @@ func (s *TXPoolServer) setPendingTx(tx *txtypes.Transaction, sender tc.SenderTyp
}
s.allPendingTxs[tx.Hash()] = pt
+ if ethTx, err := tx.GetEIP155Tx(); err == nil && events.DefActorPublisher != nil {
+ events.DefActorPublisher.Publish(message.TOPIC_PENDING_TX_EVENT,
+ &message.PendingTxMsg{Event: []*ethtype.Transaction{ethTx}})
+ }
return pt
}