From cf133fed7d2f90cf15b9e4d002dc3ad5d61eee74 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Fri, 4 May 2018 20:12:32 +0800 Subject: [PATCH 1/4] eth/filter: check nil pointer when unsubscribe --- eth/filters/filter_system.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index f8097c7b95ea..7a7eb76c20da 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -428,12 +428,24 @@ func (es *EventSystem) eventLoop() { chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) ) - // Unsubscribe all events - defer sub.Unsubscribe() - defer txSub.Unsubscribe() - defer rmLogsSub.Unsubscribe() - defer logsSub.Unsubscribe() - defer chainEvSub.Unsubscribe() + defer func() { + // Unsubscribe all events + if sub != nil { + sub.Unsubscribe() + } + if txSub != nil { + txSub.Unsubscribe() + } + if rmLogsSub != nil { + rmLogsSub.Unsubscribe() + } + if logsSub != nil { + logsSub.Unsubscribe() + } + if chainEvSub != nil { + chainEvSub.Unsubscribe() + } + }() for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) From 96a79b96f2535a49379c23ad60e5225dd84c887f Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 7 May 2018 18:37:55 +0800 Subject: [PATCH 2/4] eth/filters, accounts, rpc: abort system if subscribe failed --- eth/filters/filter_system.go | 110 ++++++++++++++++++++--------------- event/event.go | 4 ++ 2 files changed, 66 insertions(+), 48 deletions(-) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 7a7eb76c20da..ff319df41e74 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -91,8 +91,21 @@ type EventSystem struct { backend Backend lightMode bool lastHead *types.Header - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification + + // Subscriptions + txSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event + pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + + // Channels + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txCh chan core.TxPreEvent // Channel to receive new transaction event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -108,10 +121,36 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), + txCh: make(chan core.TxPreEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), } - go m.eventLoop() + // Subscribe events + m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + if m.txSub == nil { + return nil + } + m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) + if m.logsSub == nil { + return nil + } + m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) + if m.rmLogsSub == nil { + return nil + } + m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + if m.chainSub == nil { + return nil + } + // TODO(rjl493456442): use feed to subscribe pending log event + m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + if m.pendingLogSub.Closed() { + return nil + } + go m.eventLoop() return m } @@ -411,40 +450,15 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { - var ( - index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}) - // Subscribe TxPreEvent form txpool - txCh = make(chan core.TxPreEvent, txChanSize) - txSub = es.backend.SubscribeTxPreEvent(txCh) - // Subscribe RemovedLogsEvent - rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) - rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) - // Subscribe []*types.Log - logsCh = make(chan []*types.Log, logsChanSize) - logsSub = es.backend.SubscribeLogsEvent(logsCh) - // Subscribe ChainEvent - chainEvCh = make(chan core.ChainEvent, chainEvChanSize) - chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) - ) + var index = make(filterIndex) defer func() { // Unsubscribe all events - if sub != nil { - sub.Unsubscribe() - } - if txSub != nil { - txSub.Unsubscribe() - } - if rmLogsSub != nil { - rmLogsSub.Unsubscribe() - } - if logsSub != nil { - logsSub.Unsubscribe() - } - if chainEvSub != nil { - chainEvSub.Unsubscribe() - } + es.pendingLogSub.Unsubscribe() + es.txSub.Unsubscribe() + es.logsSub.Unsubscribe() + es.rmLogsSub.Unsubscribe() + es.chainSub.Unsubscribe() }() for i := UnknownSubscription; i < LastIndexSubscription; i++ { @@ -453,20 +467,19 @@ func (es *EventSystem) eventLoop() { for { select { - case ev, active := <-sub.Chan(): - if !active { // system stopped - return - } - es.broadcast(index, ev) - // Handle subscribed events - case ev := <-txCh: + case ev := <-es.txCh: es.broadcast(index, ev) - case ev := <-rmLogsCh: + case ev := <-es.logsCh: es.broadcast(index, ev) - case ev := <-logsCh: + case ev := <-es.rmLogsCh: es.broadcast(index, ev) - case ev := <-chainEvCh: + case ev := <-es.chainCh: + es.broadcast(index, ev) + case ev, active := <-es.pendingLogSub.Chan(): + if !active { // system stopped + return + } es.broadcast(index, ev) case f := <-es.install: @@ -478,6 +491,7 @@ func (es *EventSystem) eventLoop() { index[f.typ][f.id] = f } close(f.installed) + case f := <-es.uninstall: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -489,13 +503,13 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-txSub.Err(): + case <-es.txSub.Err(): return - case <-rmLogsSub.Err(): + case <-es.logsSub.Err(): return - case <-logsSub.Err(): + case <-es.rmLogsSub.Err(): return - case <-chainEvSub.Err(): + case <-es.chainSub.Err(): return } } diff --git a/event/event.go b/event/event.go index 20d20d1f57aa..c933f1e84b44 100644 --- a/event/event.go +++ b/event/event.go @@ -180,6 +180,10 @@ func (s *TypeMuxSubscription) Unsubscribe() { s.closewait() } +func (s *TypeMuxSubscription) Closed() bool { + return s.closed +} + func (s *TypeMuxSubscription) closewait() { s.closeMu.Lock() defer s.closeMu.Unlock() From f11ac1ecccfb4666385614777f76aa297d770fab Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 7 May 2018 21:39:17 +0800 Subject: [PATCH 3/4] eth/filter: add crit log before exit --- eth/filters/filter_system.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ff319df41e74..05bc55959dc2 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -129,25 +130,16 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS // Subscribe events m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) - if m.txSub == nil { - return nil - } m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) - if m.logsSub == nil { - return nil - } m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) - if m.rmLogsSub == nil { - return nil - } m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) - if m.chainSub == nil { - return nil - } // TODO(rjl493456442): use feed to subscribe pending log event m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) - if m.pendingLogSub.Closed() { - return nil + + // Make sure all the subscriptions are not empty + if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + m.pendingLogSub.Closed() { + log.Crit("Subscribe for event system failed") } go m.eventLoop() From dec5843fedb426090f08d7b6489558af87879377 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 9 May 2018 15:57:32 +0800 Subject: [PATCH 4/4] eth/filter, event: minor fixes --- eth/filters/filter_system.go | 7 +++---- event/event.go | 2 ++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 05bc55959dc2..16b452aa32f2 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -136,7 +136,7 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS // TODO(rjl493456442): use feed to subscribe pending log event m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) - // Make sure all the subscriptions are not empty + // Make sure none of the subscriptions are empty if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogSub.Closed() { log.Crit("Subscribe for event system failed") @@ -442,10 +442,8 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { - var index = make(filterIndex) - + // Ensure all subscriptions get cleaned up defer func() { - // Unsubscribe all events es.pendingLogSub.Unsubscribe() es.txSub.Unsubscribe() es.logsSub.Unsubscribe() @@ -453,6 +451,7 @@ func (es *EventSystem) eventLoop() { es.chainSub.Unsubscribe() }() + index := make(filterIndex) for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } diff --git a/event/event.go b/event/event.go index c933f1e84b44..423278731483 100644 --- a/event/event.go +++ b/event/event.go @@ -181,6 +181,8 @@ func (s *TypeMuxSubscription) Unsubscribe() { } func (s *TypeMuxSubscription) Closed() bool { + s.closeMu.Lock() + defer s.closeMu.Unlock() return s.closed }