Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ledger: report catchpoint writing only when it actually started #5413

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -348,7 +347,7 @@ func (ao *onlineAccounts) consecutiveVersion(offset uint64) uint64 {
return offset
}

func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) {
func (ao *onlineAccounts) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
}

func (ao *onlineAccounts) maxBalLookback() uint64 {
Expand All @@ -366,7 +365,7 @@ func (ao *onlineAccounts) prepareCommit(dcc *deferredCommitContext) error {
return ao.voters.prepareCommit(dcc)
}

// prepareCommitInternal preforms preapreCommit's logic without locking the tracker's mutex.
// prepareCommitInternal preforms prepareCommit's logic without locking the tracker's mutex.
func (ao *onlineAccounts) prepareCommitInternal(dcc *deferredCommitContext) error {
offset := dcc.offset

Expand All @@ -381,13 +380,6 @@ func (ao *onlineAccounts) prepareCommitInternal(dcc *deferredCommitContext) erro
// Index that corresponds to the oldest round still in deltas
startIndex := len(ao.onlineRoundParamsData) - len(ao.deltas) - 1
if ao.onlineRoundParamsData[startIndex+1].CurrentProtocol != ao.onlineRoundParamsData[startIndex+int(offset)].CurrentProtocol {
// in scheduleCommit, we expect that this function to update the catchpointWriting when
// it's on a catchpoint round and the node is configured to generate catchpoints. Doing this in a deferred function
// here would prevent us from "forgetting" to update this variable later on.
// The same is repeated in commitRound on errors.
if dcc.catchpointFirstStage && dcc.enableGeneratingCatchpointFiles {
atomic.StoreInt32(dcc.catchpointDataWriting, 0)
}
return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions")
}

Expand Down
19 changes: 1 addition & 18 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-deadlock"
Expand Down Expand Up @@ -1602,7 +1601,7 @@ func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err erro
return off, nil
}

func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) {
func (au *accountUpdates) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
}

// prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly.
Expand All @@ -1625,14 +1624,6 @@ func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error {
// verify version correctness : all the entries in the au.versions[1:offset+1] should have the *same* version, and the committedUpTo should be enforcing that.
if au.versions[1] != au.versions[offset] {
au.accountsMu.RUnlock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is the same as catchpointtracker's handleUnorderedCommitOrError and called after commitRound / prepareCommit for all trackers in case of errors.

// in scheduleCommit, we expect that this function to update the catchpointWriting when
// it's on a catchpoint round and the node is configured to generate catchpoints. Doing this in a deferred function
// here would prevent us from "forgetting" to update this variable later on.
// The same is repeated in commitRound on errors.
if dcc.catchpointFirstStage && dcc.enableGeneratingCatchpointFiles {
atomic.StoreInt32(dcc.catchpointDataWriting, 0)
}
return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions")
}

Expand Down Expand Up @@ -1664,14 +1655,6 @@ func (au *accountUpdates) commitRound(ctx context.Context, tx trackerdb.Transact
offset := dcc.offset
dbRound := dcc.oldBase

defer func() {
if err != nil {
if dcc.catchpointFirstStage && dcc.enableGeneratingCatchpointFiles {
atomic.StoreInt32(dcc.catchpointDataWriting, 0)
}
}
}()

_, err = tx.ResetTransactionWarnDeadline(ctx, time.Now().Add(accountsUpdatePerRoundHighWatermark*time.Duration(offset)))
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) {
func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (b *bulletin) handleUnorderedCommit(*deferredCommitContext) {
func (b *bulletin) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
18 changes: 10 additions & 8 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
ct.catchpointInterval, dcr.catchpointLookback)

// if we're still writing the previous balances, we can't move forward yet.
if ct.IsWritingCatchpointDataFile() {
if ct.isWritingCatchpointDataFile() {
// if we hit this path, it means that we're still writing a catchpoint.
// see if the new delta range contains another catchpoint.
if hasIntermediateFirstStageRound {
Expand All @@ -469,16 +469,13 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
dcr.catchpointFirstStage = true

if ct.enableGeneratingCatchpointFiles {
// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written ( or, queued to be written )
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the fix: moved to prepareCommit

atomic.StoreInt32(&ct.catchpointDataWriting, int32(-1))
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
if hasMultipleIntermediateFirstStageRounds {
close(ct.catchpointDataSlowWriting)
}
}
}

dcr.catchpointDataWriting = &ct.catchpointDataWriting
dcr.enableGeneratingCatchpointFiles = ct.enableGeneratingCatchpointFiles

rounds := ct.calculateCatchpointRounds(dcr)
Expand All @@ -493,6 +490,11 @@ func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {
ct.catchpointsMu.RLock()
defer ct.catchpointsMu.RUnlock()

if ct.enableGeneratingCatchpointFiles && dcc.catchpointFirstStage {
// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written
atomic.StoreInt32(&ct.catchpointDataWriting, int32(-1))
}

dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
copy(dcc.committedRoundDigests, ct.roundDigest[:dcc.offset])

Expand Down Expand Up @@ -926,10 +928,10 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr
}
}

// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order.
// Tracker might update own state in this case. For example, account catchpoint tracker cancels
// scheduled catchpoint writing that deferred commit.
func (ct *catchpointTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
func (ct *catchpointTracker) handleUnorderedCommitOrError(dcc *deferredCommitContext) {
// if the node is configured to generate catchpoint files, we might need to update the catchpointWriting variable.
if ct.enableGeneratingCatchpointFiles {
// determine if this was a catchpoint round
Expand Down Expand Up @@ -1085,9 +1087,9 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
return
}

// IsWritingCatchpointDataFile returns true iff a (first stage) catchpoint data file
// isWritingCatchpointDataFile returns true iff a (first stage) catchpoint data file
// is being generated.
func (ct *catchpointTracker) IsWritingCatchpointDataFile() bool {
func (ct *catchpointTracker) isWritingCatchpointDataFile() bool {
return atomic.LoadInt32(&ct.catchpointDataWriting) != 0
}

Expand Down
18 changes: 9 additions & 9 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func TestCatchpointIsWritingCatchpointFile(t *testing.T) {
ct := &catchpointTracker{}

ct.catchpointDataWriting = -1
ans := ct.IsWritingCatchpointDataFile()
ans := ct.isWritingCatchpointDataFile()
require.True(t, ans)

ct.catchpointDataWriting = 0
ans = ct.IsWritingCatchpointDataFile()
ans = ct.isWritingCatchpointDataFile()
require.False(t, ans)
}

Expand Down Expand Up @@ -569,7 +569,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
ml.trackers.waitAccountsWriting()

// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -612,7 +612,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
ml2.trackers.committedUpTo(i)
ml2.trackers.waitAccountsWriting()
// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -694,8 +694,8 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// handleUnorderedCommit is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(*deferredCommitContext) {
// handleUnorderedCommitOrError is not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

// close is not used by the blockingTracker
Expand Down Expand Up @@ -1089,7 +1089,7 @@ func TestCatchpointFirstStageInfoPruning(t *testing.T) {
ml.trackers.committedUpTo(i)
ml.trackers.waitAccountsWriting()
// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -1304,7 +1304,7 @@ func TestCatchpointSecondStagePersistence(t *testing.T) {
ml.trackers.committedUpTo(i)
ml.trackers.waitAccountsWriting()
// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down Expand Up @@ -1509,7 +1509,7 @@ func TestCatchpointSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *t
ml.addToBlockQueue(blockEntry{block: blk}, delta)

// Let catchpoint data generation finish so that nothing gets skipped.
for ct.IsWritingCatchpointDataFile() {
for ct.isWritingCatchpointDataFile() {
time.Sleep(time.Millisecond)
}
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (l *Ledger) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger eva
func (l *Ledger) IsWritingCatchpointDataFile() bool {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
return l.catchpoint.IsWritingCatchpointDataFile()
return l.catchpoint.isWritingCatchpointDataFile()
}

// VerifiedTransactionCache returns the verify.VerifiedTransactionCache
Expand Down
2 changes: 1 addition & 1 deletion ledger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitCon
func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (mt *metricsTracker) handleUnorderedCommit(*deferredCommitContext) {
func (mt *metricsTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
2 changes: 1 addition & 1 deletion ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitCont
func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (bn *blockNotifier) handleUnorderedCommit(*deferredCommitContext) {
func (bn *blockNotifier) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down
2 changes: 1 addition & 1 deletion ledger/spverificationtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (spt *spVerificationTracker) postCommit(_ context.Context, dcc *deferredCom
func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) {
}

func (spt *spVerificationTracker) handleUnorderedCommit(*deferredCommitContext) {
func (spt *spVerificationTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (spt *spVerificationTracker) close() {
Expand Down
34 changes: 20 additions & 14 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ type ledgerTracker interface {
// An optional context is provided for long-running operations.
postCommitUnlocked(context.Context, *deferredCommitContext)

// handleUnorderedCommit is a special method for handling deferred commits that are out of order.
// handleUnorderedCommitOrError is a special method for handling deferred commits that are out of order
// or to handle errors reported by other trackers while committing a batch.
// Tracker might update own state in this case. For example, account updates tracker cancels
// scheduled catchpoint writing that deferred commit.
handleUnorderedCommit(*deferredCommitContext)
// scheduled catchpoint writing flag for this batch.
handleUnorderedCommitOrError(*deferredCommitContext)

// close terminates the tracker, reclaiming any resources
// like open database connections or goroutines. close may
Expand Down Expand Up @@ -214,12 +215,6 @@ type deferredCommitRange struct {
// a catchpoint data file, in this commit cycle iteration.
catchpointFirstStage bool

// catchpointDataWriting is a pointer to a variable with the same name in the
// catchpointTracker. It's used in order to reset the catchpointDataWriting flag from
// the acctupdates's prepareCommit/commitRound (which is called before the
// corresponding catchpoint tracker method.
catchpointDataWriting *int32

// enableGeneratingCatchpointFiles controls whether the node produces catchpoint files or not.
enableGeneratingCatchpointFiles bool

Expand Down Expand Up @@ -514,7 +509,7 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
if tr.dbRound < dbRound || offset < uint64(tr.dbRound-dbRound) {
tr.log.Warnf("out of order deferred commit: offset %d, dbRound %d but current tracker DB round is %d", offset, dbRound, tr.dbRound)
for _, lt := range tr.trackers {
lt.handleUnorderedCommit(dcc)
lt.handleUnorderedCommitOrError(dcc)
}
tr.mu.RUnlock()
return nil
Expand All @@ -538,19 +533,27 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
dcc.oldBase = dbRound
dcc.flushTime = time.Now()

var err error
for _, lt := range tr.trackers {
err := lt.prepareCommit(dcc)
err = lt.prepareCommit(dcc)
if err != nil {
tr.log.Errorf(err.Error())
tr.mu.RUnlock()
return err
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if another tracker overwrites the err with nil it won't be passed to handleUnorderedCommitOrError?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe nest the error handling right inside the iteration, is this crazy?

	for _, lt := range tr.trackers {
		err := lt.prepareCommit(dcc)
		if err != nil {
			tr.log.Errorf(err.Error())
			for _, lt := range tr.trackers {
				lt.handleUnorderedCommitOrError(dcc)
			}
			tr.mu.RUnlock()
			return err
		}
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this before and did not really liked so moved out of the outer loop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current approach is pretty straightforward to follow now.

for _, lt := range tr.trackers {
lt.handleUnorderedCommitOrError(dcc)
}
tr.mu.RUnlock()
return err
}

tr.mu.RUnlock()

start := time.Now()
ledgerCommitroundCount.Inc(nil)
err := tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
arw, err := tx.MakeAccountsReaderWriter()
if err != nil {
return err
Expand All @@ -568,6 +571,9 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
ledgerCommitroundMicros.AddMicrosecondsSince(start, nil)

if err != nil {
for _, lt := range tr.trackers {
lt.handleUnorderedCommitOrError(dcc)
}
tr.log.Warnf("unable to advance tracker db snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (bt *producePrepareBlockingTracker) postCommit(ctx context.Context, dcc *de
func (bt *producePrepareBlockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

// handleUnorderedCommit is not used by the blockingTracker
func (bt *producePrepareBlockingTracker) handleUnorderedCommit(*deferredCommitContext) {
// handleUnorderedCommitOrError is not used by the blockingTracker
func (bt *producePrepareBlockingTracker) handleUnorderedCommitOrError(*deferredCommitContext) {
}

// close is not used by the blockingTracker
Expand Down
2 changes: 1 addition & 1 deletion ledger/txtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) {
func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
}

func (t *txTail) handleUnorderedCommit(*deferredCommitContext) {
func (t *txTail) handleUnorderedCommitOrError(*deferredCommitContext) {
}

func (t *txTail) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
Expand Down