Skip to content

Commit

Permalink
fix(core/listener): wait listener to shutdown before exit (#3775)
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored Sep 25, 2024
1 parent 7eca603 commit 98a127b
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Listener struct {

listenerTimeout time.Duration
cancel context.CancelFunc
closed chan struct{}
}

func NewListener(
Expand Down Expand Up @@ -97,6 +98,7 @@ func (cl *Listener) Start(context.Context) error {

ctx, cancel := context.WithCancel(context.Background())
cl.cancel = cancel
cl.closed = make(chan struct{})

sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx)
if err != nil {
Expand All @@ -114,13 +116,25 @@ func (cl *Listener) Stop(ctx context.Context) error {
}

cl.cancel()
cl.cancel = nil
return cl.metrics.Close()
select {
case <-cl.closed:
cl.cancel = nil
cl.closed = nil
case <-ctx.Done():
return ctx.Err()
}

err = cl.metrics.Close()
if err != nil {
log.Warnw("listener: closing metrics", "err", err)
}
return nil
}

// runSubscriber runs a subscriber to receive event data of new signed blocks. It will attempt to
// resubscribe in case error happens during listening of subscription
func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDataSignedBlock) {
defer close(cl.closed)
for {
err := cl.listen(ctx, sub)
if ctx.Err() != nil {
Expand All @@ -129,7 +143,7 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat
}
if errors.Is(err, errInvalidSubscription) {
// stop node if there is a critical issue with the block subscription
log.Fatalf("listener: %v", err)
log.Fatalf("listener: %v", err) //nolint:gocritic
}

log.Warnw("listener: subscriber error, resubscribing...", "err", err)
Expand Down

0 comments on commit 98a127b

Please sign in to comment.