Skip to content

Commit

Permalink
ledger: report catchpoint writing only when it actually started (#5413)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored May 24, 2023
1 parent 9d2bfef commit 3fb6964
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 67 deletions.
12 changes: 2 additions & 10 deletions ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -348,7 +347,7 @@ func (ao *onlineAccounts) consecutiveVersion(offset uint64) uint64 {
return offset
}

func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) {
func (ao *onlineAccounts) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
}

func (ao *onlineAccounts) maxBalLookback() uint64 {
Expand All @@ -366,7 +365,7 @@ func (ao *onlineAccounts) prepareCommit(dcc *deferredCommitContext) error {
return ao.voters.prepareCommit(dcc)
}

// prepareCommitInternal preforms preapreCommit's logic without locking the tracker's mutex.
// prepareCommitInternal preforms prepareCommit's logic without locking the tracker's mutex.
func (ao *onlineAccounts) prepareCommitInternal(dcc *deferredCommitContext) error {
offset := dcc.offset

Expand All @@ -381,13 +380,6 @@ func (ao *onlineAccounts) prepareCommitInternal(dcc *deferredCommitContext) erro
// Index that corresponds to the oldest round still in deltas
startIndex := len(ao.onlineRoundParamsData) - len(ao.deltas) - 1
if ao.onlineRoundParamsData[startIndex+1].CurrentProtocol != ao.onlineRoundParamsData[startIndex+int(offset)].CurrentProtocol {
// in scheduleCommit, we expect that this function to update the catchpointWriting when
// it's on a catchpoint round and the node is configured to generate catchpoints. Doing this in a deferred function
// here would prevent us from "forgetting" to update this variable later on.
// The same is repeated in commitRound on errors.
if dcc.catchpointFirstStage && dcc.enableGeneratingCatchpointFiles {
atomic.StoreInt32(dcc.catchpointDataWriting, 0)
}
return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions")
}

Expand Down
19 changes: 1 addition & 18 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -1602,7 +1601,7 @@ func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err erro
return off, nil
}

func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) {
func (au *accountUpdates) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
}

// prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly.
Expand All @@ -1625,14 +1624,6 @@ func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error {
// verify version correctness : all the entries in the au.versions[1:offset+1] should have the *same* version, and the committedUpTo should be enforcing that.
if au.versions[1] != au.versions[offset] {
au.accountsMu.RUnlock()

// in scheduleCommit, we expect that this function to update the catchpointWriting when
// it's on a catchpoint round and the node is configured to generate catchpoints. Doing this in a deferred function
// here would prevent us from "forgetting" to update this variable later on.
// The same is repeated in commitRound on errors.
if dcc.catchpointFirstStage && dcc.enableGeneratingCatchpointFiles {
atomic.StoreInt32(dcc.catchpointDataWriting, 0)
}
return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions")
}

Expand Down Expand Up @@ -1664,14 +1655,6 @@ func (au *accountUpdates) commitRound(ctx context.Context, tx trackerdb.Transact
offset := dcc.offset
dbRound := dcc.oldBase

defer func() {
if err != nil {
if dcc.catchpointFirstStage && dcc.enableGeneratingCatchpointFiles {
atomic.StoreInt32(dcc.catchpointDataWriting, 0)
}
}
}()

_, err = tx.ResetTransactionWarnDeadline(ctx, time.Now().Add(accountsUpdatePerRoundHighWatermark*time.Duration(offset)))
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (b *bulletin) handleUnorderedCommit(*deferredCommitContext) {
func (b *bulletin) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
18 changes: 10 additions & 8 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
ct.catchpointInterval, dcr.catchpointLookback)

// if we're still writing the previous balances, we can't move forward yet.
if ct.IsWritingCatchpointDataFile() {
if ct.isWritingCatchpointDataFile() {
// if we hit this path, it means that we're still writing a catchpoint.
// see if the new delta range contains another catchpoint.
if hasIntermediateFirstStageRound {
Expand All @@ -469,16 +469,13 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
dcr.catchpointFirstStage = true

if ct.enableGeneratingCatchpointFiles {
// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written ( or, queued to be written )
atomic.StoreInt32(&ct.catchpointDataWriting, int32(-1))
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
if hasMultipleIntermediateFirstStageRounds {
close(ct.catchpointDataSlowWriting)
}
}
}

dcr.catchpointDataWriting = &ct.catchpointDataWriting
dcr.enableGeneratingCatchpointFiles = ct.enableGeneratingCatchpointFiles

rounds := ct.calculateCatchpointRounds(dcr)
Expand All @@ -493,6 +490,11 @@ func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {
ct.catchpointsMu.RLock()
defer ct.catchpointsMu.RUnlock()

if ct.enableGeneratingCatchpointFiles && dcc.catchpointFirstStage {
// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written
atomic.StoreInt32(&ct.catchpointDataWriting, int32(-1))
}

dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
copy(dcc.committedRoundDigests, ct.roundDigest[:dcc.offset])

Expand Down Expand Up @@ -926,10 +928,10 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr
}
}

// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account catchpoint tracker cancels
// scheduled catchpoint writing that deferred commit.
func (ct *catchpointTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
// if the node is configured to generate catchpoint files, we might need to update the catchpointWriting variable.
if ct.enableGeneratingCatchpointFiles {
// determine if this was a catchpoint round
Expand Down Expand Up @@ -1085,9 +1087,9 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
return
}

// IsWritingCatchpointDataFile returns true iff a (first stage) catchpoint data file
// isWritingCatchpointDataFile returns true iff a (first stage) catchpoint data file
// is being generated.
func (ct *catchpointTracker) IsWritingCatchpointDataFile() bool {
func (ct *catchpointTracker) isWritingCatchpointDataFile() bool {
return atomic.LoadInt32(&ct.catchpointDataWriting) != 0
}

Expand Down
18 changes: 9 additions & 9 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func TestCatchpointIsWritingCatchpointFile(t *testing.T) {
ct := &catchpointTracker{}

ct.catchpointDataWriting = -1
ans := ct.IsWritingCatchpointDataFile()
ans := ct.isWritingCatchpointDataFile()
require.True(t, ans)

ct.catchpointDataWriting = 0
ans = ct.IsWritingCatchpointDataFile()
ans = ct.isWritingCatchpointDataFile()
require.False(t, ans)
}

Expand Down Expand Up @@ -569,7 +569,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
ml.trackers.waitAccountsWriting()

// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
ml2.trackers.committedUpTo(i)
ml2.trackers.waitAccountsWriting()
// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -694,8 +694,8 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// handleUnorderedCommit is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(*deferredCommitContext) {
// handleUnorderedCommitOrError is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

// close is not used by the blockingTracker
Expand Down Expand Up @@ -1089,7 +1089,7 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
ml.trackers.committedUpTo(i)
ml.trackers.waitAccountsWriting()
// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -1304,7 +1304,7 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
ml.trackers.committedUpTo(i)
ml.trackers.waitAccountsWriting()
// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -1509,7 +1509,7 @@ func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *t
ml.addToBlockQueue(blockEntry{block: blk}, delta)

// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (l *Ledger) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger eva
func (l *Ledger) IsWritingCatchpointDataFile() bool {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
return l.catchpoint.IsWritingCatchpointDataFile()
return l.catchpoint.isWritingCatchpointDataFile()
}

// VerifiedTransactionCache returns the verify.VerifiedTransactionCache
Expand Down
2 changes: 1 addition & 1 deletion ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitCon
func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (mt *metricsTracker) handleUnorderedCommit(*deferredCommitContext) {
func (mt *metricsTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
2 changes: 1 addition & 1 deletion ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitCont
func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (bn *blockNotifier) handleUnorderedCommit(*deferredCommitContext) {
func (bn *blockNotifier) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
2 changes: 1 addition & 1 deletion ledger/spverificationtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (spt *spVerificationTracker) postCommit(_ context.Context, dcc *deferredCom
func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) {
}

func (spt *spVerificationTracker) handleUnorderedCommit(*deferredCommitContext) {
func (spt *spVerificationTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (spt *spVerificationTracker) close() {
Expand Down
34 changes: 20 additions & 14 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ type ledgerTracker interface {
// An optional context is provided for long-running operations.
postCommitUnlocked(context.Context, *deferredCommitContext)

// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order
// or to handle errors reported by other trackers while committing a batch.
// Tracker might update own state in this case. For example, account updates tracker cancels
// scheduled catchpoint writing that deferred commit.
handleUnorderedCommit(*deferredCommitContext)
// scheduled catchpoint writing flag for this batch.
handleUnorderedCommitOrError(*deferredCommitContext)

// close terminates the tracker, reclaiming any resources
// like open database connections or goroutines. close may
Expand Down Expand Up @@ -214,12 +215,6 @@ type deferredCommitRange struct {
// a catchpoint data file, in this commit cycle iteration.
catchpointFirstStage bool

// catchpointDataWriting is a pointer to a variable with the same name in the
// catchpointTracker. It's used in order to reset the catchpointDataWriting flag from
// the acctupdates's prepareCommit/commitRound (which is called before the
// corresponding catchpoint tracker method.
catchpointDataWriting *int32

// enableGeneratingCatchpointFiles controls whether the node produces catchpoint files or not.
enableGeneratingCatchpointFiles bool

Expand Down Expand Up @@ -514,7 +509,7 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
if tr.dbRound < dbRound || offset < uint64(tr.dbRound-dbRound) {
tr.log.Warnf("out of order deferred commit: offset %d, dbRound %d but current tracker DB round is %d", offset, dbRound, tr.dbRound)
for _, lt := range tr.trackers {
lt.handleUnorderedCommit(dcc)
lt.handleUnorderedCommitOrError(dcc)
}
tr.mu.RUnlock()
return nil
Expand All @@ -538,19 +533,27 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
dcc.oldBase = dbRound
dcc.flushTime = time.Now()

var err error
for _, lt := range tr.trackers {
err := lt.prepareCommit(dcc)
err = lt.prepareCommit(dcc)
if err != nil {
tr.log.Errorf(err.Error())
tr.mu.RUnlock()
return err
break
}
}
if err != nil {
for _, lt := range tr.trackers {
lt.handleUnorderedCommitOrError(dcc)
}
tr.mu.RUnlock()
return err
}

tr.mu.RUnlock()

start := time.Now()
ledgerCommitroundCount.Inc(nil)
err := tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
arw, err := tx.MakeAccountsReaderWriter()
if err != nil {
return err
Expand All @@ -568,6 +571,9 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
ledgerCommitroundMicros.AddMicrosecondsSince(start, nil)

if err != nil {
for _, lt := range tr.trackers {
lt.handleUnorderedCommitOrError(dcc)
}
tr.log.Warnf("unable to advance tracker db snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (bt *producePrepareBlockingTracker) postCommit(ctx context.Context, dcc *de
func (bt *producePrepareBlockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// handleUnorderedCommit is not used by the blockingTracker
func (bt *producePrepareBlockingTracker) handleUnorderedCommit(*deferredCommitContext) {
// handleUnorderedCommitOrError is not used by the blockingTracker
func (bt *producePrepareBlockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

// close is not used by the blockingTracker
Expand Down
2 changes: 1 addition & 1 deletion ledger/txtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) {
func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (t *txTail) handleUnorderedCommit(*deferredCommitContext) {
func (t *txTail) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (t *txTail) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down

0 comments on commit 3fb6964

Please sign in to comment.