Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/state: Expanding the eth_subscribe mode #27617

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ import (
)

// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }
type NewTxsEvent struct{
Txs []*types.Transaction
}

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }
type NewMinedBlockEvent struct{
Block *types.Block
}

// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }
type RemovedLogsEvent struct{
Logs []*types.Log
}

type ChainEvent struct {
Block *types.Block
Expand All @@ -40,4 +46,10 @@ type ChainSideEvent struct {
Block *types.Block
}

type ChainHeadEvent struct{ Block *types.Block }
type ChainHeadEvent struct{
Block *types.Block
}

type AccountStateEvent struct {
Addresses []common.Hash
}
26 changes: 26 additions & 0 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -170,6 +171,7 @@ func (h storageBloomHasher) Sum64() uint64 {
// newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low
// level persistent database or a hierarchical diff already.
func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer {
var feed event.Feed
// Create the new layer with some pre-allocated data segments
dl := &diffLayer{
parent: parent,
Expand All @@ -187,6 +189,11 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
default:
panic("unknown parent type")
}

// We use a map to collect the addresses. The reason for using a map is because the keys in a map are unique.
// This means if we try to add the same address to the map twice, it will just overwrite the existing entry,
// effectively ignoring any duplicates. The `common.Hash` type is the type of the account addresses.
addresses := make(map[common.Hash]struct{})
// Sanity check that accounts or storage slots are never nil
for accountHash, blob := range accounts {
if blob == nil {
Expand All @@ -195,6 +202,9 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
// Determine memory size and track the dirty writes
dl.memory += uint64(common.HashLength + len(blob))
snapshotDirtyAccountWriteMeter.Mark(int64(len(blob)))
// Add the account address to our map. We use an empty struct{} as the value in the map because it doesn't
// take any additional space. We only care about the keys in the map (the addresses), not the values.
addresses[accountHash] = struct{}{}
}
for accountHash, slots := range storage {
if slots == nil {
Expand All @@ -205,8 +215,24 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s
dl.memory += uint64(common.HashLength + len(data))
snapshotDirtyStorageWriteMeter.Mark(int64(len(data)))
}
// Add the account address to our map. We use an empty struct{} as the value in the map because it doesn't
// take any additional space. We only care about the keys in the map (the addresses), not the values.
addresses[accountHash] = struct{}{}
}
dl.memory += uint64(len(destructs) * common.HashLength)
// Create an event and add all addresses from the addresses map to event.Addresses slice
// todo: use this type from core/events.go directly(fix import cycle)
type AccountStateEvent struct {
Addresses []common.Hash
}
event := AccountStateEvent{
Addresses: make([]common.Hash, 0, len(addresses)),
}
for address := range addresses {
event.Addresses = append(event.Addresses, address)
}
// Send the event
feed.Send(event)
return dl
}

Expand Down
6 changes: 5 additions & 1 deletion eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeAccountStateEvent(ch chan<- core.AccountStateEvent) event.Subscription


BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
Expand Down Expand Up @@ -204,6 +206,7 @@ type EventSystem struct {
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
accountSub event.Subscription // Subscription for account state change event

// Channels
install chan *subscription // install filter for event notification
Expand All @@ -213,6 +216,7 @@ type EventSystem struct {
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
accountCh chan core.AccountStateEvent// Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand Down Expand Up @@ -241,7 +245,7 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)

m.accountSub = m.backend.SubscribeAccountStateEvent(m.accountCh)
// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
Expand Down