Skip to content

Commit

Permalink
all: use timer instead of time.After in loops, to avoid memleaks (#29241
Browse files Browse the repository at this point in the history
)

time.After is equivalent to NewTimer(d).C, and does not call Stop if the timer is no longer needed. This can cause memory leaks. This change changes many such occations to use NewTimer instead, and calling Stop once the timer is no longer needed.
  • Loading branch information
songzhibin97 authored Apr 9, 2024
1 parent 1126c6d commit 0bbd88b
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 9 deletions.
6 changes: 5 additions & 1 deletion core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()

for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
Expand All @@ -604,14 +607,15 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return

case <-time.After(wait):
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
Expand Down
6 changes: 5 additions & 1 deletion eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
localHeaders = d.readHeaderRange(tail, int(count))
log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
}
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()

for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones
Expand Down Expand Up @@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
}
// State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
case <-d.cancelCh:
return errCanceled
}
Expand Down
12 changes: 10 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
var (
mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

for {
select {
case <-d.cancelCh:
Expand Down Expand Up @@ -1397,10 +1400,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-time.After(time.Second):
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
Expand Down Expand Up @@ -1567,7 +1571,10 @@ func (d *Downloader) processSnapSyncContent() error {
var (
oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

for {
// Wait for the next batch of downloaded data to be available. If we have
// not yet reached the pivot point, wait blockingly as there's no need to
Expand Down Expand Up @@ -1650,6 +1657,7 @@ func (d *Downloader) processSnapSyncContent() error {
oldPivot = P
}
// Wait for completion, occasionally checking for pivot staleness
timer.Reset(time.Second)
select {
case <-sync.done:
if sync.err != nil {
Expand All @@ -1660,7 +1668,7 @@ func (d *Downloader) processSnapSyncContent() error {
}
oldPivot = nil

case <-time.After(time.Second):
case <-timer.C:
oldTail = afterP
continue
}
Expand Down
5 changes: 4 additions & 1 deletion ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
return err
}
// Wait for the pong request to arrive back
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case <-s.pongCh:
// Pong delivered, report the latency
case <-time.After(5 * time.Second):
case <-timer.C:
// Ping timeout, abort
return errors.New("ping timed out")
}
Expand Down
5 changes: 4 additions & 1 deletion p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
go func() {
waitErr <- n.Cmd.Wait()
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case err := <-waitErr:
return err
case <-time.After(5 * time.Second):
case <-timer.C:
return n.Cmd.Process.Kill()
}
}
Expand Down
10 changes: 8 additions & 2 deletions p2p/simulations/mocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
if err != nil {
panic("Could not startup node network for mocker")
}
tick := time.NewTicker(10 * time.Second)
var (
tick = time.NewTicker(10 * time.Second)
timer = time.NewTimer(3 * time.Second)
)
defer tick.Stop()
defer timer.Stop()

for {
select {
case <-quit:
Expand All @@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
return
}

timer.Reset(3 * time.Second)
select {
case <-quit:
log.Info("Terminating simulation loop")
return
case <-time.After(3 * time.Second):
case <-timer.C:
}

log.Debug("starting node", "id", id)
Expand Down
5 changes: 4 additions & 1 deletion p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error {
}
}

timeout := time.NewTimer(snapshotLoadTimeout)
defer timeout.Stop()

select {
// Wait until all connections from the snapshot are established.
case <-allConnected:
// Make sure that we do not wait forever.
case <-time.After(snapshotLoadTimeout):
case <-timeout.C:
return errors.New("snapshot connections not established")
}
return nil
Expand Down

0 comments on commit 0bbd88b

Please sign in to comment.