Skip to content

Commit

Permalink
go-algorand 3.23.0-beta Release PR (#5950)
Browse files Browse the repository at this point in the history
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Gary <982483+gmalouf@users.noreply.github.com>
Co-authored-by: John Jannotti <jannotti@gmail.com>
Co-authored-by: nullun <nullun@users.noreply.github.com>
Co-authored-by: John Lee <64482439+algojohnlee@users.noreply.github.com>
Co-authored-by: Halimao <1065621723@qq.com>
Co-authored-by: John Lee <john.lee@algorand.com>
Co-authored-by: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>
Co-authored-by: cce <51567+cce@users.noreply.github.com>
Co-authored-by: ohill <145173879+ohill@users.noreply.github.com>
  • Loading branch information
11 people authored Mar 8, 2024
1 parent eb702d4 commit 58fea01
Show file tree
Hide file tree
Showing 40 changed files with 1,290 additions and 638 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ubuntu:20.04 as builder

ARG GO_VERSION="1.20.7"
ARG GO_VERSION="1.20.14"

ARG CHANNEL
ARG URL
Expand Down
2 changes: 1 addition & 1 deletion agreement/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ type ensureAction struct {
Payload proposal
// the certificate proving commitment
Certificate Certificate
// The time that the winning proposal-vote was validated for round credentialRoundLag back from the current one
// The time that the lowest proposal-vote was validated for `credentialRoundLag` rounds ago (R-credentialRoundLag). This may not have been the winning proposal, since we wait `credentialRoundLag` rounds to see if there was a better one.
voteValidatedAt time.Duration
// The dynamic filter timeout calculated for this round, even if not enabled, for reporting to telemetry.
dynamicFilterTimeout time.Duration
Expand Down
2 changes: 1 addition & 1 deletion buildnumber.dat
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1
0
89 changes: 36 additions & 53 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package catchup

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -69,7 +70,7 @@ type CatchpointCatchupStats struct {
type CatchpointCatchupService struct {
// stats is the statistics object, updated async while downloading the ledger
stats CatchpointCatchupStats
// statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state
// statsMu synchronizes access to stats, as we could attempt to update it while querying for its current state
statsMu deadlock.Mutex
node CatchpointCatchupNodeServices
// ctx is the node cancellation context, used when the node is being stopped.
Expand Down Expand Up @@ -98,7 +99,7 @@ type CatchpointCatchupService struct {
abortCtx context.Context
abortCtxFunc context.CancelFunc
// blocksDownloadPeerSelector is the peer selector used for downloading blocks.
blocksDownloadPeerSelector *peerSelector
blocksDownloadPeerSelector peerSelector
}

// MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode
Expand Down Expand Up @@ -280,51 +281,50 @@ func (cs *CatchpointCatchupService) processStageInactive() (err error) {
}

// processStageLedgerDownload is the second catchpoint catchup stage. It downloads the ledger.
func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
func (cs *CatchpointCatchupService) processStageLedgerDownload() error {

Check warning on line 284 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L284

Added line #L284 was not covered by tests
cs.statsMu.Lock()
label := cs.stats.CatchpointLabel
cs.statsMu.Unlock()
round, _, err0 := ledgercore.ParseCatchpointLabel(label)
round, _, err := ledgercore.ParseCatchpointLabel(label)

Check warning on line 288 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L288

Added line #L288 was not covered by tests

if err0 != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0))
if err != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err))

Check warning on line 291 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L290-L291

Added lines #L290 - L291 were not covered by tests
}

// download balances file.
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
lf := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)

Check warning on line 295 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L295

Added line #L295 was not covered by tests
attemptsCount := 0

for {
attemptsCount++

err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err != nil {
err0 := cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err0 != nil {

Check warning on line 302 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L301-L302

Added lines #L301 - L302 were not covered by tests
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err0))

Check warning on line 306 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L306

Added line #L306 was not covered by tests
}
psp, err := peerSelector.getNextPeer()
if err != nil {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err)
psp, err0 := cs.blocksDownloadPeerSelector.getNextPeer()
if err0 != nil {
err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err0)

Check warning on line 311 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L308-L311

Added lines #L308 - L311 were not covered by tests
}
peer := psp.Peer
start := time.Now()
err = ledgerFetcher.downloadLedger(cs.ctx, peer, round)
if err == nil {
err0 = lf.downloadLedger(cs.ctx, peer, round)
if err0 == nil {

Check warning on line 316 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L315-L316

Added lines #L315 - L316 were not covered by tests
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err == nil {
err0 = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err0 == nil {

Check warning on line 320 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L319-L320

Added lines #L319 - L320 were not covered by tests
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
}
// failed to build the merkle trie for the above catchpoint file.
peerSelector.rankPeer(psp, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)

Check warning on line 325 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L325

Added line #L325 was not covered by tests
} else {
peerSelector.rankPeer(psp, peerRankDownloadFailed)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)

Check warning on line 327 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L327

Added line #L327 was not covered by tests
}

// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
Expand All @@ -335,10 +335,10 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
}

if attemptsCount >= cs.config.CatchupLedgerDownloadRetryAttempts {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err)
err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err0)

Check warning on line 339 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L338-L339

Added lines #L338 - L339 were not covered by tests
}
cs.log.Warnf("unable to download ledger : %v", err)
cs.log.Warnf("unable to download ledger : %v", err0)

Check warning on line 341 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L341

Added line #L341 was not covered by tests
}

err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload)
Expand Down Expand Up @@ -506,14 +506,14 @@ func lookbackForStateproofsSupport(topBlock *bookkeeping.Block) uint64 {
return uint64(topBlock.Round().SubSaturate(lowestStateProofRound))
}

// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against it's predecessor.
// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against its predecessor.
func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
topBlock, err := cs.ledgerAccessor.EnsureFirstBlock(cs.ctx)
if err != nil {
return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err))
}

// pick the lookback with the greater of
// pick the lookback with the greatest of
// either (MaxTxnLife+DeeperBlockHeaderHistory+CatchpointLookback) or MaxBalLookback
// Explanation:
// 1. catchpoint snapshots accounts at round X-CatchpointLookback
Expand All @@ -531,13 +531,13 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}

// in case the effective lookback is going before our rounds count, trim it there.
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife)
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback...MaxTxnLife)
if lookback >= uint64(topBlock.Round()) {
lookback = uint64(topBlock.Round() - 1)
}

cs.statsMu.Lock()
cs.stats.TotalBlocks = uint64(lookback)
cs.stats.TotalBlocks = lookback
cs.stats.AcquiredBlocks = 0
cs.stats.VerifiedBlocks = 0
cs.statsMu.Unlock()
Expand All @@ -558,8 +558,9 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
blk = &ledgerBlock
cert = &ledgerCert
} else {
switch err0.(type) {
case ledgercore.ErrNoEntry:
var errNoEntry ledgercore.ErrNoEntry
switch {
case errors.As(err0, &errNoEntry):

Check warning on line 563 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L561-L563

Added lines #L561 - L563 were not covered by tests
// this is expected, ignore this one.
default:
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0)
Expand Down Expand Up @@ -658,7 +659,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {

Check warning on line 662 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L662

Added line #L662 was not covered by tests
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
time.Sleep(noPeersAvailableSleepInterval)
Expand Down Expand Up @@ -718,7 +719,7 @@ func (cs *CatchpointCatchupService) processStageSwitch() (err error) {
// stopOrAbort is called when any of the stage processing function sees that cs.ctx has been canceled. It can be
// due to the end user attempting to abort the current catchpoint catchup operation or due to a node shutdown.
func (cs *CatchpointCatchupService) stopOrAbort() error {
if cs.abortCtx.Err() == context.Canceled {
if errors.Is(cs.abortCtx.Err(), context.Canceled) {

Check warning on line 722 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L722

Added line #L722 was not covered by tests
return cs.abort(context.Canceled)
}
return nil
Expand Down Expand Up @@ -749,7 +750,7 @@ func (cs *CatchpointCatchupService) updateStage(newStage ledger.CatchpointCatchu
return nil
}

// updateNodeCatchupMode requests the node to change it's operational mode from
// updateNodeCatchupMode requests the node to change its operational mode from
// catchup mode to normal mode and vice versa.
func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) {
newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
Expand Down Expand Up @@ -802,24 +803,7 @@ func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(acquiredBlock
}

func (cs *CatchpointCatchupService) initDownloadPeerSelector() {
cs.blocksDownloadPeerSelector = cs.makeCatchpointPeerSelector()
}

func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() *peerSelector {
if cs.config.EnableCatchupFromArchiveServers {
return makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivers},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
})
} else {
return makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
})
}
cs.blocksDownloadPeerSelector = makeCatchpointPeerSelector(cs.net)
}

// checkLedgerDownload sends a HEAD request to the ledger endpoint of peers to validate the catchpoint's availability
Expand All @@ -830,10 +814,9 @@ func (cs *CatchpointCatchupService) checkLedgerDownload() error {
if err != nil {
return fmt.Errorf("failed to parse catchpoint label : %v", err)
}
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
for i := 0; i < cs.config.CatchupLedgerDownloadRetryAttempts; i++ {
psp, peerError := peerSelector.getNextPeer()
psp, peerError := cs.blocksDownloadPeerSelector.getNextPeer()

Check warning on line 819 in catchup/catchpointService.go

View check run for this annotation

Codecov / codecov/patch

catchup/catchpointService.go#L819

Added line #L819 was not covered by tests
if peerError != nil {
return err
}
Expand Down
156 changes: 156 additions & 0 deletions catchup/classBasedPeerSelector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"errors"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-deadlock"
"time"
)

// classBasedPeerSelector is a rankPooledPeerSelector that tracks and ranks classes of peers based on their response behavior.
// It is used to select the most appropriate peers to download blocks from - this is most useful when catching up
// and needing to figure out whether the blocks can be retrieved from relay nodes or require archive nodes.
// The ordering of the peerSelectors directly determines the priority of the classes of peers.
type classBasedPeerSelector struct {
mu deadlock.Mutex
peerSelectors []*wrappedPeerSelector
}

func makeClassBasedPeerSelector(peerSelectors []*wrappedPeerSelector) *classBasedPeerSelector {
return &classBasedPeerSelector{
peerSelectors: peerSelectors,
}
}

func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
c.mu.Lock()
defer c.mu.Unlock()

oldRank, newRank := -1, -1
for _, wp := range c.peerSelectors {
// See if the peer is in the class, ranking it appropriately if so
if psp.peerClass != wp.peerClass {
continue
}

oldRank, newRank = wp.peerSelector.rankPeer(psp, rank)
if oldRank < 0 || newRank < 0 {
// Peer not found in this selector
continue
}

// Peer was in this class, if there was any kind of download issue, we increment the failure count
if rank >= peerRankNoBlockForRound {
wp.downloadFailures++
}

break
}

return oldRank, newRank
}

func (c *classBasedPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
c.mu.Lock()
defer c.mu.Unlock()

for _, wp := range c.peerSelectors {
rank = wp.peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
// If rank is peerRankInvalidDownload, we check the next class's rankPooledPeerSelector
if rank >= peerRankInvalidDownload {
continue
}
// Should be a legit ranking, we return it
return rank
}
// If we reached here, we have exhausted all classes without finding the peer
return peerRankInvalidDownload
}

func (c *classBasedPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.internalGetNextPeer(0)
}

// internalGetNextPeer is a helper function that should be called with the lock held
func (c *classBasedPeerSelector) internalGetNextPeer(recurseCount int8) (psp *peerSelectorPeer, err error) {
// Safety check to prevent infinite recursion
if recurseCount > 1 {
return nil, errPeerSelectorNoPeerPoolsAvailable

Check warning on line 96 in catchup/classBasedPeerSelector.go

View check run for this annotation

Codecov / codecov/patch

catchup/classBasedPeerSelector.go#L96

Added line #L96 was not covered by tests
}
selectorDisabledCount := 0
for _, wp := range c.peerSelectors {
if wp.downloadFailures > wp.toleranceFactor {
// peerSelector is disabled for now, we move to the next one
selectorDisabledCount++
continue
}
psp, err = wp.peerSelector.getNextPeer()

if err != nil {
// This is mostly just future-proofing, as we don't expect any other errors from getNextPeer
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {
// We penalize this class the equivalent of one download failure (in case this is transient)
wp.downloadFailures++
}
continue
}
return psp, nil
}
// If we reached here, we have exhausted all classes and still have no peers
// IFF all classes are disabled, we reset the downloadFailures for all classes and start over
if len(c.peerSelectors) != 0 && selectorDisabledCount == len(c.peerSelectors) {
for _, wp := range c.peerSelectors {
wp.downloadFailures = 0
}
// Recurse to try again, we should have at least one class enabled now
return c.internalGetNextPeer(recurseCount + 1)
}
// If we reached here, we have exhausted all classes without finding a peer, not due to all classes being disabled
return nil, errPeerSelectorNoPeerPoolsAvailable
}

type wrappedPeerSelector struct {
peerSelector peerSelector // The underlying peerSelector for this class
peerClass network.PeerOption // The class of peers the peerSelector is responsible for
toleranceFactor int // The number of times we can net fail for any reason before we move to the next class's rankPooledPeerSelector
downloadFailures int // The number of times we have failed to download a block from this class's rankPooledPeerSelector since it was last reset
}

// makeCatchpointPeerSelector returns a classBasedPeerSelector that selects peers based on their class and response behavior.
// These are the preferred configurations for the catchpoint service.
func makeCatchpointPeerSelector(net peersRetriever) peerSelector {
wrappedPeerSelectors := []*wrappedPeerSelector{
{
peerClass: network.PeersPhonebookRelays,
peerSelector: makeRankPooledPeerSelector(net,
[]peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays}}),
toleranceFactor: 3,
},
{
peerClass: network.PeersPhonebookArchivalNodes,
peerSelector: makeRankPooledPeerSelector(net,
[]peerClass{{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes}}),
toleranceFactor: 10,
},
}

return makeClassBasedPeerSelector(wrappedPeerSelectors)
}
Loading

0 comments on commit 58fea01

Please sign in to comment.