all: use timer instead of time.After in loops, to avoid memleaks #29241

Merged: 9 commits, Apr 9, 2024

Changes from 5 commits
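Why the change: every call to `time.After(d)` allocates a fresh one-shot timer, and (at least up to Go 1.22) that timer is not reclaimed until it fires, even if the surrounding `select` has long since moved on. In loops where another case usually wins the `select`, these timers pile up until their timeouts elapse. The diffs below all apply the same fix: allocate a single `time.Timer` before the loop, `defer` its `Stop`, and `Reset` it before each wait. A minimal sketch of the pattern, with the function name, `quit` channel, and interval chosen here for illustration only:

```go
package main

import (
	"fmt"
	"time"
)

// pollUntilQuit illustrates the pattern applied throughout this PR:
// one timer allocated outside the loop and Reset before each wait,
// instead of a fresh time.After timer on every iteration.
func pollUntilQuit(quit <-chan struct{}, interval time.Duration) {
	timer := time.NewTimer(interval)
	defer timer.Stop() // release the timer when the loop exits

	for {
		timer.Reset(interval) // reuse the same timer on every pass
		select {
		case <-quit:
			return
		case <-timer.C:
			fmt.Println("tick") // periodic work goes here
		}
	}
}

func main() {
	quit := make(chan struct{})
	go pollUntilQuit(quit, 100*time.Millisecond)
	time.Sleep(350 * time.Millisecond)
	close(quit)
	time.Sleep(50 * time.Millisecond) // let the goroutine observe quit
}
```

The sketch mirrors the shape used in the files below: `Stop` on exit releases the timer regardless of which case ends the loop.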
6 changes: 5 additions & 1 deletion core/bloombits/matcher.go
@@ -596,6 +596,9 @@ func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets []
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()

for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
@@ -604,14 +607,15 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return

case <-time.After(wait):
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
6 changes: 5 additions & 1 deletion eth/downloader/beaconsync.go
@@ -289,6 +289,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
localHeaders = d.readHeaderRange(tail, int(count))
log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
}
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()

for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones
@@ -381,8 +384,9 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
}
// State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
case <-d.cancelCh:
return errCanceled
}
29 changes: 21 additions & 8 deletions eth/downloader/downloader.go
@@ -1015,11 +1015,15 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e

// Start pulling the header chain skeleton until all is done
var (
skeleton = true // Skeleton assembly phase or finishing up
pivoting = false // Whether the next request is pivot verification
ancestor = from
mode = d.getMode()
skeleton = true // Skeleton assembly phase or finishing up
pivoting = false // Whether the next request is pivot verification
ancestor = from
mode = d.getMode()
fsHeaderContCheckTimer = time.NewTimer(fsHeaderContCheck)
)

defer fsHeaderContCheckTimer.Stop()

for {
// Pull the next batch of headers, it either:
// - Pivot check to see if the chain moved too far
@@ -1124,8 +1128,9 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// Don't abort header fetches while the pivot is downloading
if !d.committed.Load() && pivot <= from {
p.log.Debug("No headers, waiting for pivot commit")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
continue
case <-d.cancelCh:
return errCanceled
@@ -1194,9 +1199,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// sleep a bit and retry. Take care with headers already consumed during
// skeleton filling
if len(headers) == 0 && !progressed {
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
p.log.Trace("All headers delayed, waiting")
select {
case <-time.After(fsHeaderContCheck):
case <-fsHeaderContCheckTimer.C:
continue
case <-d.cancelCh:
return errCanceled
@@ -1276,7 +1282,10 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
var (
mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process
timer = time.NewTimer(time.Second)
)
defer timer.Stop()

for {
select {
case <-d.cancelCh:
Expand Down Expand Up @@ -1397,10 +1406,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-time.After(time.Second):
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
@@ -1567,7 +1577,9 @@ func (d *Downloader) processSnapSyncContent() error {
var (
oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot
timer = time.NewTimer(time.Second)
)
defer timer.Stop()
for {
// Wait for the next batch of downloaded data to be available. If we have
// not yet reached the pivot point, wait blockingly as there's no need to
@@ -1650,6 +1662,7 @@ func (d *Downloader) processSnapSyncContent() error {
oldPivot = P
}
// Wait for completion, occasionally checking for pivot staleness
timer.Reset(time.Second)
select {
case <-sync.done:
if sync.err != nil {
@@ -1660,7 +1673,7 @@ func (d *Downloader) processSnapSyncContent() error {
}
oldPivot = nil

case <-time.After(time.Second):
case <-timer.C:
oldTail = afterP
continue
}
5 changes: 4 additions & 1 deletion ethstats/ethstats.go
@@ -544,10 +544,13 @@ func (s *Service) reportLatency(conn *connWrapper) error {
return err
}
// Wait for the pong request to arrive back
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case <-s.pongCh:
// Pong delivered, report the latency
case <-time.After(5 * time.Second):
case <-timer.C:
// Ping timeout, abort
return errors.New("ping timed out")
}
5 changes: 4 additions & 1 deletion p2p/simulations/adapters/exec.go
@@ -303,10 +303,13 @@ func (n *ExecNode) Stop() error {
go func() {
waitErr <- n.Cmd.Wait()
}()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

select {
case err := <-waitErr:
return err
case <-time.After(5 * time.Second):
case <-timer.C:
return n.Cmd.Process.Kill()
}
}
10 changes: 8 additions & 2 deletions p2p/simulations/mocker.go
@@ -65,8 +65,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
if err != nil {
panic("Could not startup node network for mocker")
}
tick := time.NewTicker(10 * time.Second)
var (
tick = time.NewTicker(10 * time.Second)
timer = time.NewTimer(3 * time.Second)
)
defer tick.Stop()
defer timer.Stop()

for {
select {
case <-quit:
@@ -80,11 +85,12 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) {
return
}

timer.Reset(3 * time.Second)
select {
case <-quit:
log.Info("Terminating simulation loop")
return
case <-time.After(3 * time.Second):
case <-timer.C:
}

log.Debug("starting node", "id", id)
5 changes: 4 additions & 1 deletion p2p/simulations/network.go
@@ -1028,11 +1028,14 @@ func (net *Network) Load(snap *Snapshot) error {
}
}

timeout := time.NewTimer(snapshotLoadTimeout)
defer timeout.Stop()

select {
// Wait until all connections from the snapshot are established.
case <-allConnected:
// Make sure that we do not wait forever.
case <-time.After(snapshotLoadTimeout):
case <-timeout.C:
return errors.New("snapshot connections not established")
}
return nil