Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filter: check nil pointer when unsubscribe #16682

Merged
merged 4 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 58 additions & 41 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

Expand Down Expand Up @@ -91,8 +92,21 @@ type EventSystem struct {
backend Backend
lightMode bool
lastHead *types.Header
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification

// Subscriptions
txSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txCh chan core.TxPreEvent // Channel to receive new transaction event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -108,10 +122,27 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txCh: make(chan core.TxPreEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}

go m.eventLoop()
// Subscribe events
m.txSub = m.backend.SubscribeTxPreEvent(m.txCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})

// Make sure none of the subscriptions are empty
if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
m.pendingLogSub.Closed() {
log.Crit("Subscribe for event system failed")
}

go m.eventLoop()
return m
}

Expand Down Expand Up @@ -411,50 +442,35 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
sub = es.mux.Subscribe(core.PendingLogsEvent{})
// Subscribe TxPreEvent form txpool
txCh = make(chan core.TxPreEvent, txChanSize)
txSub = es.backend.SubscribeTxPreEvent(txCh)
// Subscribe RemovedLogsEvent
rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
// Subscribe []*types.Log
logsCh = make(chan []*types.Log, logsChanSize)
logsSub = es.backend.SubscribeLogsEvent(logsCh)
// Subscribe ChainEvent
chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)

// Unsubscribe all events
defer sub.Unsubscribe()
defer txSub.Unsubscribe()
defer rmLogsSub.Unsubscribe()
defer logsSub.Unsubscribe()
defer chainEvSub.Unsubscribe()

// Ensure all subscriptions get cleaned up
defer func() {
es.pendingLogSub.Unsubscribe()
es.txSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestions:

  • Put the // Unsubscribe all events commen outside of the defer, on the line before it
  • Instead of var index = make(filterIndex), use index := make(filterIndex)
  • Move the index creation down below the defers, to keep things clean and everywhere where they are used.
func (es *EventSystem) eventLoop() {
	// Ensure all subscriptions get cleaned up
	defer func() {
		es.pendingLogSub.Unsubscribe()
		es.txSub.Unsubscribe()
		es.logsSub.Unsubscribe()
		es.rmLogsSub.Unsubscribe()
		es.chainSub.Unsubscribe()
	}()

	index := make(filterIndex)
	for i := UnknownSubscription; i < LastIndexSubscription; i++ {
		index[i] = make(map[rpc.ID]*subscription)
	}


index := make(filterIndex)
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}

for {
select {
case ev, active := <-sub.Chan():
if !active { // system stopped
return
}
es.broadcast(index, ev)

// Handle subscribed events
case ev := <-txCh:
case ev := <-es.txCh:
es.broadcast(index, ev)
case ev := <-rmLogsCh:
case ev := <-es.logsCh:
es.broadcast(index, ev)
case ev := <-logsCh:
case ev := <-es.rmLogsCh:
es.broadcast(index, ev)
case ev := <-chainEvCh:
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
}
es.broadcast(index, ev)

case f := <-es.install:
Expand All @@ -466,6 +482,7 @@ func (es *EventSystem) eventLoop() {
index[f.typ][f.id] = f
}
close(f.installed)

case f := <-es.uninstall:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand All @@ -477,13 +494,13 @@ func (es *EventSystem) eventLoop() {
close(f.err)

// System stopped
case <-txSub.Err():
case <-es.txSub.Err():
return
case <-rmLogsSub.Err():
case <-es.logsSub.Err():
return
case <-logsSub.Err():
case <-es.rmLogsSub.Err():
return
case <-chainEvSub.Err():
case <-es.chainSub.Err():
return
}
}
Expand Down
6 changes: 6 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ func (s *TypeMuxSubscription) Unsubscribe() {
s.closewait()
}

func (s *TypeMuxSubscription) Closed() bool {
s.closeMu.Lock()
defer s.closeMu.Unlock()
return s.closed
}

func (s *TypeMuxSubscription) closewait() {
s.closeMu.Lock()
defer s.closeMu.Unlock()
Expand Down