Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filter: check nil pointer when unsubscribe #16682

Merged
merged 4 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 66 additions & 40 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,21 @@ type EventSystem struct {
backend Backend
lightMode bool
lastHead *types.Header
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification

// Subscriptions
txSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txCh chan core.TxPreEvent // Channel to receive new transaction event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -108,10 +121,36 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txCh: make(chan core.TxPreEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}

go m.eventLoop()
// Subscribe events
m.txSub = m.backend.SubscribeTxPreEvent(m.txCh)
if m.txSub == nil {
return nil
}
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
if m.logsSub == nil {
return nil
}
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
if m.rmLogsSub == nil {
return nil
}
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
if m.chainSub == nil {
return nil
}
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
if m.pendingLogSub.Closed() {
return nil
}
Copy link
Member

@karalabe karalabe May 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return nil-s here are problematic, because they promote bad use. It says to the caller that they should check whether the return is nil, and handle it as an error. If we want to handle error cases, we need to return an error. If we don't want to handle error cases, we need to abort.

Returning nil is furthermore problematic because it doesn't clean up previous successful subscriptions.

In theory these code paths cannot get called, at least if the initialization code is correct. As such we should either not care about them becoming nil (and panic if they do and find the culprit further up the stack), or if we want to handle things a bit more gracefully, we could do a log.Crit to abort.


go m.eventLoop()
return m
}

Expand Down Expand Up @@ -411,50 +450,36 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
sub = es.mux.Subscribe(core.PendingLogsEvent{})
// Subscribe TxPreEvent form txpool
txCh = make(chan core.TxPreEvent, txChanSize)
txSub = es.backend.SubscribeTxPreEvent(txCh)
// Subscribe RemovedLogsEvent
rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
// Subscribe []*types.Log
logsCh = make(chan []*types.Log, logsChanSize)
logsSub = es.backend.SubscribeLogsEvent(logsCh)
// Subscribe ChainEvent
chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)

// Unsubscribe all events
defer sub.Unsubscribe()
defer txSub.Unsubscribe()
defer rmLogsSub.Unsubscribe()
defer logsSub.Unsubscribe()
defer chainEvSub.Unsubscribe()
var index = make(filterIndex)

defer func() {
// Unsubscribe all events
es.pendingLogSub.Unsubscribe()
es.txSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestions:

  • Put the // Unsubscribe all events commen outside of the defer, on the line before it
  • Instead of var index = make(filterIndex), use index := make(filterIndex)
  • Move the index creation down below the defers, to keep things clean and everywhere where they are used.
func (es *EventSystem) eventLoop() {
	// Ensure all subscriptions get cleaned up
	defer func() {
		es.pendingLogSub.Unsubscribe()
		es.txSub.Unsubscribe()
		es.logsSub.Unsubscribe()
		es.rmLogsSub.Unsubscribe()
		es.chainSub.Unsubscribe()
	}()

	index := make(filterIndex)
	for i := UnknownSubscription; i < LastIndexSubscription; i++ {
		index[i] = make(map[rpc.ID]*subscription)
	}


for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}

for {
select {
case ev, active := <-sub.Chan():
if !active { // system stopped
return
}
es.broadcast(index, ev)

// Handle subscribed events
case ev := <-txCh:
case ev := <-es.txCh:
es.broadcast(index, ev)
case ev := <-rmLogsCh:
case ev := <-es.logsCh:
es.broadcast(index, ev)
case ev := <-logsCh:
case ev := <-es.rmLogsCh:
es.broadcast(index, ev)
case ev := <-chainEvCh:
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
}
es.broadcast(index, ev)

case f := <-es.install:
Expand All @@ -466,6 +491,7 @@ func (es *EventSystem) eventLoop() {
index[f.typ][f.id] = f
}
close(f.installed)

case f := <-es.uninstall:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
Expand All @@ -477,13 +503,13 @@ func (es *EventSystem) eventLoop() {
close(f.err)

// System stopped
case <-txSub.Err():
case <-es.txSub.Err():
return
case <-rmLogsSub.Err():
case <-es.logsSub.Err():
return
case <-logsSub.Err():
case <-es.rmLogsSub.Err():
return
case <-chainEvSub.Err():
case <-es.chainSub.Err():
return
}
}
Expand Down
4 changes: 4 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func (s *TypeMuxSubscription) Unsubscribe() {
s.closewait()
}

func (s *TypeMuxSubscription) Closed() bool {
return s.closed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racey. You need to add a read lock for s.closeMu.

}

func (s *TypeMuxSubscription) closewait() {
s.closeMu.Lock()
defer s.closeMu.Unlock()
Expand Down