From 7f343120a49507410ce0866d77246a72b66d0812 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:44:35 +0000 Subject: [PATCH 1/6] Rename WithTx to WithTxImmediate and add new WithTx that doesn't start an immediate transaction --- activation/handler_v1.go | 2 +- activation/handler_v2.go | 4 ++-- api/grpcserver/transaction_service_test.go | 2 +- api/grpcserver/v2alpha1/transaction_test.go | 2 +- blocks/certifier.go | 2 +- checkpoint/recovery.go | 4 ++-- cmd/merge-nodes/internal/merge_action.go | 2 +- malfeasance/handler.go | 2 +- mesh/ballotwriter/ballotwriter_test.go | 6 +++--- mesh/mesh.go | 6 +++--- sql/atxs/atxs_test.go | 4 ++-- sql/database_test.go | 2 +- sql/schema.go | 6 +++--- sql/transactions/iterator_test.go | 6 +++--- sql/transactions/transactions_test.go | 22 ++++++++++----------- syncer/atxsync/syncer.go | 2 +- syncer/malsync/syncer.go | 13 ++++++++---- txs/cache.go | 8 ++++---- 18 files changed, 50 insertions(+), 45 deletions(-) diff --git a/activation/handler_v1.go b/activation/handler_v1.go index 7ab8c25d54..5519573562 100644 --- a/activation/handler_v1.go +++ b/activation/handler_v1.go @@ -500,7 +500,7 @@ func (h *HandlerV1) storeAtx( proof *mwire.MalfeasanceProof malicious bool ) - if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error { + if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error { var err error malicious, err = identities.IsMalicious(tx, atx.SmesherID) if err != nil { diff --git a/activation/handler_v2.go b/activation/handler_v2.go index 7136dd46a1..3f3e6606a2 100644 --- a/activation/handler_v2.go +++ b/activation/handler_v2.go @@ -851,7 +851,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a // Store an ATX in the DB. func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error { - if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error { + if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error { if len(watx.marriages) != 0 { newMarriageID, err := marriage.NewID(tx) if err != nil { @@ -927,7 +927,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx atxs.AtxAdded(h.cdb, atx) malicious := false - err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error { + err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error { // malfeasance check happens after storing the ATX because storing updates the marriage set // that is needed for the malfeasance proof // TODO(mafa): don't store own ATX if it would mark the node as malicious diff --git a/api/grpcserver/transaction_service_test.go b/api/grpcserver/transaction_service_test.go index caa29e0a76..f6a6786154 100644 --- a/api/grpcserver/transaction_service_test.go +++ b/api/grpcserver/transaction_service_test.go @@ -37,7 +37,7 @@ func TestTransactionService_StreamResults(t *testing.T) { gen := fixture.NewTransactionResultGenerator(). WithAddresses(2) txs := make([]types.TransactionWithResult, 100) - require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error { for i := range txs { tx := gen.Next() diff --git a/api/grpcserver/v2alpha1/transaction_test.go b/api/grpcserver/v2alpha1/transaction_test.go index c743c6e842..6177c7e9b2 100644 --- a/api/grpcserver/v2alpha1/transaction_test.go +++ b/api/grpcserver/v2alpha1/transaction_test.go @@ -43,7 +43,7 @@ func TestTransactionService_List(t *testing.T) { gen := fixture.NewTransactionResultGenerator().WithAddresses(2) txsList := make([]types.TransactionWithResult, 100) - require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error { for i := range txsList { tx := gen.Next() diff --git a/blocks/certifier.go b/blocks/certifier.go index 0931b022ec..ab03ade84d 100644 --- a/blocks/certifier.go +++ b/blocks/certifier.go @@ -564,7 +564,7 @@ func (c *Certifier) save( if len(valid)+len(invalid) == 0 { return certificates.Add(c.db, lid, cert) } - return c.db.WithTx(ctx, func(dbtx sql.Transaction) error { + return c.db.WithTxImmediate(ctx, func(dbtx sql.Transaction) error { if err := certificates.Add(dbtx, lid, cert); err != nil { return err } diff --git a/checkpoint/recovery.go b/checkpoint/recovery.go index 7fb0cb775d..f4a5088940 100644 --- a/checkpoint/recovery.go +++ b/checkpoint/recovery.go @@ -138,7 +138,7 @@ func Recover( } defer localDB.Close() logger.Info("clearing atx and malfeasance sync metadata from local database") - if err := localDB.WithTx(ctx, func(tx sql.Transaction) error { + if err := localDB.WithTxImmediate(ctx, func(tx sql.Transaction) error { if err := atxsync.Clear(tx); err != nil { return err } @@ -274,7 +274,7 @@ func RecoverFromLocalFile( zap.Int("num accounts", len(data.accounts)), zap.Int("num atxs", len(data.atxs)), ) - if err = newDB.WithTx(ctx, func(tx sql.Transaction) error { + if err = newDB.WithTxImmediate(ctx, func(tx sql.Transaction) error { for _, acct := range data.accounts { if err = accounts.Update(tx, acct); err != nil { return fmt.Errorf("restore account snapshot: %w", err) diff --git a/cmd/merge-nodes/internal/merge_action.go b/cmd/merge-nodes/internal/merge_action.go index caa78b830b..7f8b3dbfb8 100644 --- a/cmd/merge-nodes/internal/merge_action.go +++ b/cmd/merge-nodes/internal/merge_action.go @@ -159,7 +159,7 @@ func MergeDBs(ctx context.Context, dbLog *zap.Logger, from, to string) error { } dbLog.Info("merging databases", zap.String("from", from), zap.String("to", to)) - err = dstDB.WithTx(ctx, func(tx sql.Transaction) error { + err = dstDB.WithTxImmediate(ctx, func(tx sql.Transaction) error { enc := func(stmt *sql.Statement) { stmt.BindText(1, filepath.Join(from, localDbFile)) } diff --git a/malfeasance/handler.go b/malfeasance/handler.go index a6e1d0f847..ded77bd507 100644 --- a/malfeasance/handler.go +++ b/malfeasance/handler.go @@ -195,7 +195,7 @@ func (h *Handler) validateAndSave(ctx context.Context, p *wire.MalfeasanceProof) return types.EmptyNodeID, errors.Join(err, pubsub.ErrValidationReject) } proofBytes := codec.MustEncode(p) - if err := h.cdb.WithTx(ctx, func(dbtx sql.Transaction) error { + if err := h.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error { malicious, err := identities.IsMalicious(dbtx, nodeID) if err != nil { return fmt.Errorf("check known malicious: %w", err) diff --git a/mesh/ballotwriter/ballotwriter_test.go b/mesh/ballotwriter/ballotwriter_test.go index 5da0eb4942..79c85aad74 100644 --- a/mesh/ballotwriter/ballotwriter_test.go +++ b/mesh/ballotwriter/ballotwriter_test.go @@ -123,7 +123,7 @@ func BenchmarkWriteCoalescing(b *testing.B) { db := newDiskSqlite(b) b.ResetTimer() for i := 0; i < b.N; i++ { - if err := db.WithTx(context.Background(), func(tx sql.Transaction) error { + if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { if err := writeFn(a[i], tx); err != nil { b.Fatal(err) } @@ -138,7 +138,7 @@ func BenchmarkWriteCoalescing(b *testing.B) { db := newDiskSqlite(b) b.ResetTimer() for j := 0; j < b.N/1000; j++ { - if err := db.WithTx(context.Background(), func(tx sql.Transaction) error { + if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { var err error for i := (j * 1000); i < (j*1000)+1000; i++ { if err = writeFn(a[i], tx); err != nil { @@ -156,7 +156,7 @@ func BenchmarkWriteCoalescing(b *testing.B) { db := newDiskSqlite(b) b.ResetTimer() for j := 0; j < b.N/5000; j++ { - if err := db.WithTx(context.Background(), func(tx sql.Transaction) error { + if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { var err error for i := (j * 5000); i < (j*5000)+5000; i++ { if err = writeFn(a[i], tx); err != nil { diff --git a/mesh/mesh.go b/mesh/mesh.go index d72dc954c3..1d509fc9f7 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -95,7 +95,7 @@ func NewMesh( } genesis := types.GetEffectiveGenesis() - if err = db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + if err = db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { if err = layers.SetProcessed(dbtx, genesis); err != nil { return fmt.Errorf("mesh init: %w", err) } @@ -385,7 +385,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error return fmt.Errorf("execute block %v/%v: %w", layer.Layer, target, err) } } - if err := msh.cdb.WithTx(ctx, func(dbtx sql.Transaction) error { + if err := msh.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error { if err := layers.SetApplied(dbtx, layer.Layer, target); err != nil { return fmt.Errorf("set applied for %v/%v: %w", layer.Layer, target, err) } @@ -440,7 +440,7 @@ func (msh *Mesh) saveHareOutput(ctx context.Context, lid types.LayerID, bid type certs []certificates.CertValidity err error ) - if err = msh.cdb.WithTx(ctx, func(tx sql.Transaction) error { + if err = msh.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error { // check if a certificate has been generated or sync'ed. // - node generated the certificate when it collected enough certify messages // - hare outputs are processed in layer order. i.e. when hare fails for a previous layer N, diff --git a/sql/atxs/atxs_test.go b/sql/atxs/atxs_test.go index 817e3f40d1..124b914d6f 100644 --- a/sql/atxs/atxs_test.go +++ b/sql/atxs/atxs_test.go @@ -433,7 +433,7 @@ func TestGetIDsByEpochCached(t *testing.T) { require.Equal(t, 11, db.QueryCount()) } - require.NoError(t, db.WithTx(context.Background(), func(tx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { atxs.Add(tx, atx5, types.AtxBlob{}) return nil })) @@ -445,7 +445,7 @@ func TestGetIDsByEpochCached(t *testing.T) { require.ElementsMatch(t, []types.ATXID{atx4.ID(), atx5.ID()}, ids3) require.Equal(t, 13, db.QueryCount()) // not incremented after Add - require.Error(t, db.WithTx(context.Background(), func(tx sql.Transaction) error { + require.Error(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { atxs.Add(tx, atx6, types.AtxBlob{}) return errors.New("fail") // rollback })) diff --git a/sql/database_test.go b/sql/database_test.go index 46bc62aff4..d197d5e497 100644 --- a/sql/database_test.go +++ b/sql/database_test.go @@ -520,7 +520,7 @@ func TestDBClosed(t *testing.T) { require.NoError(t, db.Close()) _, err := db.Exec("select 1", nil, nil) require.ErrorIs(t, err, ErrClosed) - err = db.WithTx(context.Background(), func(tx Transaction) error { return nil }) + err = db.WithTxImmediate(context.Background(), func(tx Transaction) error { return nil }) require.ErrorIs(t, err, ErrClosed) } diff --git a/sql/schema.go b/sql/schema.go index 20a949b324..f393d7534f 100644 --- a/sql/schema.go +++ b/sql/schema.go @@ -85,7 +85,7 @@ func (s *Schema) SkipMigrations(i ...int) { // Apply applies the schema to the database. func (s *Schema) Apply(db Database) error { - return db.WithTx(context.Background(), func(tx Transaction) error { + return db.WithTxImmediate(context.Background(), func(tx Transaction) error { scanner := bufio.NewScanner(strings.NewReader(s.Script)) scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { if i := bytes.Index(data, []byte(";")); i >= 0 { @@ -147,7 +147,7 @@ func (s *Schema) Migrate(logger *zap.Logger, db Database, before, vacuumState in if m.Order() <= before { continue } - if err := db.WithTx(context.Background(), func(tx Transaction) error { + if err := db.WithTxImmediate(context.Background(), func(tx Transaction) error { if _, ok := s.skipMigration[m.Order()]; !ok { if err := m.Apply(tx, logger); err != nil { for j := i; j >= 0 && s.Migrations[j].Order() > before; j-- { @@ -196,7 +196,7 @@ func (s *Schema) MigrateTempDB(logger *zap.Logger, db Database, before int) erro } if _, ok := s.skipMigration[m.Order()]; !ok { - if err := db.WithTx(context.Background(), func(tx Transaction) error { + if err := db.WithTxImmediate(context.Background(), func(tx Transaction) error { return m.Apply(tx, logger) }); err != nil { return fmt.Errorf("apply %s: %w", m.Name(), err) diff --git a/sql/transactions/iterator_test.go b/sql/transactions/iterator_test.go index 432e2f3c69..8988d9917d 100644 --- a/sql/transactions/iterator_test.go +++ b/sql/transactions/iterator_test.go @@ -64,7 +64,7 @@ func TestIterateResults(t *testing.T) { gen := fixture.NewTransactionResultGenerator() txs := make([]types.TransactionWithResult, 100) - require.NoError(t, db.WithTx(context.TODO(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { for i := range txs { tx := gen.Next() @@ -148,7 +148,7 @@ func TestIterateSnapshot(t *testing.T) { require.NoError(t, err) gen := fixture.NewTransactionResultGenerator() expect := 10 - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { for i := 0; i < expect; i++ { tx := gen.Next() @@ -176,7 +176,7 @@ func TestIterateSnapshot(t *testing.T) { }() <-initialized - require.NoError(t, db.WithTx(context.TODO(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { for i := 0; i < 10; i++ { tx := gen.Next() diff --git a/sql/transactions/transactions_test.go b/sql/transactions/transactions_test.go index ab4781874f..0bdac033b3 100644 --- a/sql/transactions/transactions_test.go +++ b/sql/transactions/transactions_test.go @@ -232,17 +232,17 @@ func TestApply_AlreadyApplied(t *testing.T) { require.NoError(t, transactions.Add(db, tx, time.Now())) bid := types.RandomBlockID() - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.AddResult(dtx, tx.ID, &types.TransactionResult{Layer: lid, Block: bid}) })) // same block applied again - require.Error(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.Error(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.AddResult(dtx, tx.ID, &types.TransactionResult{Layer: lid, Block: bid}) })) // different block applied again - require.Error(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.Error(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.AddResult( dtx, tx.ID, @@ -254,7 +254,7 @@ func TestApply_AlreadyApplied(t *testing.T) { func TestUndoLayers_Empty(t *testing.T) { db := statesql.InMemoryTest(t) - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.UndoLayers(dtx, types.LayerID(199)) })) } @@ -273,7 +273,7 @@ func TestApplyAndUndoLayers(t *testing.T) { require.NoError(t, transactions.Add(db, tx, time.Now())) bid := types.RandomBlockID() - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.AddResult(dtx, tx.ID, &types.TransactionResult{Layer: lid, Block: bid}) })) applied = append(applied, tx.ID) @@ -285,7 +285,7 @@ func TestApplyAndUndoLayers(t *testing.T) { require.Equal(t, types.APPLIED, mtx.State) } // revert to firstLayer - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.UndoLayers(dtx, firstLayer.Add(1)) })) @@ -349,7 +349,7 @@ func TestGetByAddress(t *testing.T) { createTX(t, signer1, signer2Address, 1, 191, 1), } received := time.Now() - require.NoError(t, db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { for _, tx := range txs { require.NoError(t, transactions.Add(dbtx, tx, received)) require.NoError(t, transactions.AddResult(dbtx, tx.ID, &types.TransactionResult{Layer: lid})) @@ -418,7 +418,7 @@ func TestAppliedLayer(t *testing.T) { for _, tx := range txs { require.NoError(t, transactions.Add(db, tx, time.Now())) } - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.AddResult(dtx, txs[0].ID, &types.TransactionResult{Layer: lid, Block: types.BlockID{1, 1}}) })) @@ -429,7 +429,7 @@ func TestAppliedLayer(t *testing.T) { _, err = transactions.GetAppliedLayer(db, txs[1].ID) require.ErrorIs(t, err, sql.ErrNotFound) - require.NoError(t, db.WithTx(context.Background(), func(dtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dtx sql.Transaction) error { return transactions.UndoLayers(dtx, lid) })) _, err = transactions.GetAppliedLayer(db, txs[0].ID) @@ -466,7 +466,7 @@ func TestAddressesWithPendingTransactions(t *testing.T) { {Address: principals[0], Nonce: txs[0].Nonce}, {Address: principals[1], Nonce: txs[2].Nonce}, }, rst) - require.NoError(t, db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { return transactions.AddResult(dbtx, txs[0].ID, &types.TransactionResult{Message: "hey"}) })) rst, err = transactions.AddressesWithPendingTransactions(db) @@ -475,7 +475,7 @@ func TestAddressesWithPendingTransactions(t *testing.T) { {Address: principals[0], Nonce: txs[1].Nonce}, {Address: principals[1], Nonce: txs[2].Nonce}, }, rst) - require.NoError(t, db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + require.NoError(t, db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { return transactions.AddResult(dbtx, txs[2].ID, &types.TransactionResult{Message: "hey"}) })) rst, err = transactions.AddressesWithPendingTransactions(db) diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index 99a47d741f..7bcea2a253 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -332,7 +332,7 @@ func (s *Syncer) downloadAtxs( } } - if err := s.localdb.WithTx(context.Background(), func(tx sql.Transaction) error { + if err := s.localdb.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { err := atxsync.SaveRequest(tx, publish, lastSuccess, int64(len(state)), int64(len(downloaded))) if err != nil { return fmt.Errorf("failed to save request time: %w", err) diff --git a/syncer/malsync/syncer.go b/syncer/malsync/syncer.go index f35e2175cb..687d4380ed 100644 --- a/syncer/malsync/syncer.go +++ b/syncer/malsync/syncer.go @@ -341,7 +341,7 @@ func (s *Syncer) downloadNodeIDs(ctx context.Context, initial bool, updates chan } func (s *Syncer) updateState(ctx context.Context) error { - if err := s.localdb.WithTx(ctx, func(tx sql.Transaction) error { + if err := s.localdb.WithTxImmediate(ctx, func(tx sql.Transaction) error { return malsync.UpdateSyncState(tx, s.clock.Now()) }); err != nil { if ctx.Err() != nil { @@ -382,7 +382,9 @@ func (s *Syncer) downloadMalfeasanceProofs(ctx context.Context, initial bool, up return ctx.Err() case update = <-updates: s.logger.Debug("malfeasance sync update", - log.ZContext(ctx), zap.Int("count", len(update.nodeIDs))) + log.ZContext(ctx), + zap.Int("count", len(update.nodeIDs)), + ) sst.update(update) gotUpdate = true } @@ -392,7 +394,9 @@ func (s *Syncer) downloadMalfeasanceProofs(ctx context.Context, initial bool, up return ctx.Err() case update = <-updates: s.logger.Debug("malfeasance sync update", - log.ZContext(ctx), zap.Int("count", len(update.nodeIDs))) + log.ZContext(ctx), + zap.Int("count", len(update.nodeIDs)), + ) sst.update(update) gotUpdate = true default: @@ -417,7 +421,8 @@ func (s *Syncer) downloadMalfeasanceProofs(ctx context.Context, initial bool, up if len(batch) != 0 { s.logger.Debug("retrieving malfeasant identities", log.ZContext(ctx), - zap.Int("count", len(batch))) + zap.Int("count", len(batch)), + ) err := s.fetcher.GetMalfeasanceProofs(ctx, batch) if err != nil { if errors.Is(err, context.Canceled) { diff --git a/txs/cache.go b/txs/cache.go index e23b92a9ff..ee49cedfca 100644 --- a/txs/cache.go +++ b/txs/cache.go @@ -688,7 +688,7 @@ func (c *Cache) ApplyLayer( // commit results before reporting them // TODO(dshulyak) save results in vm - if err := db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + if err := db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { for _, rst := range results { err := transactions.AddResult(dbtx, rst.ID, &rst.TransactionResult) if err != nil { @@ -835,7 +835,7 @@ func checkApplyOrder(logger *zap.Logger, db sql.StateDatabase, toApply types.Lay } func addToProposal(db sql.StateDatabase, lid types.LayerID, pid types.ProposalID, tids []types.TransactionID) error { - return db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + return db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { for _, tid := range tids { if err := transactions.AddToProposal(dbtx, tid, lid, pid); err != nil { return fmt.Errorf("add2prop %w", err) @@ -846,7 +846,7 @@ func addToProposal(db sql.StateDatabase, lid types.LayerID, pid types.ProposalID } func addToBlock(db sql.StateDatabase, lid types.LayerID, bid types.BlockID, tids []types.TransactionID) error { - return db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + return db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { for _, tid := range tids { if err := transactions.AddToBlock(dbtx, tid, lid, bid); err != nil { return fmt.Errorf("add2block %w", err) @@ -857,7 +857,7 @@ func addToBlock(db sql.StateDatabase, lid types.LayerID, bid types.BlockID, tids } func undoLayers(db sql.StateDatabase, from types.LayerID) error { - return db.WithTx(context.Background(), func(dbtx sql.Transaction) error { + return db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error { err := transactions.UndoLayers(dbtx, from) if err != nil { return fmt.Errorf("undo %w", err) From f4d9ce3879ec0f311211368c9ff1142cd2cee3ce Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:45:44 +0000 Subject: [PATCH 2/6] Fix flaky TestSyncer test by ensuring a received connection is always put back into the pool --- sql/activesets/activesets.go | 13 +++++-------- sql/database.go | 9 ++++----- syncer/malsync/syncer_test.go | 17 ++++++----------- 3 files changed, 15 insertions(+), 24 deletions(-) diff --git a/sql/activesets/activesets.go b/sql/activesets/activesets.go index 58d30339e7..7e95202ad7 100644 --- a/sql/activesets/activesets.go +++ b/sql/activesets/activesets.go @@ -18,7 +18,7 @@ func Add(db sql.Executor, id types.Hash32, set *types.EpochActiveSet) error { (id, epoch, active_set) values (?1, ?2, ?3);`, func(stmt *sql.Statement) { - stmt.BindBytes(1, id[:]) + stmt.BindBytes(1, id.Bytes()) stmt.BindInt64(2, int64(set.Epoch)) stmt.BindBytes(3, codec.MustEncode(set)) }, nil) @@ -100,9 +100,7 @@ func getBlob(ctx context.Context, db sql.Executor, id []byte) ([]byte, error) { func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error { _, err := db.Exec("delete from activesets where epoch < ?1;", - func(stmt *sql.Statement) { - stmt.BindInt64(1, int64(epoch)) - }, + func(stmt *sql.Statement) { stmt.BindInt64(1, int64(epoch)) }, nil, ) if err != nil { @@ -111,10 +109,9 @@ func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error { return nil } -func Has(db sql.Executor, id []byte) (bool, error) { - rows, err := db.Exec( - "select 1 from activesets where id = ?1;", - func(stmt *sql.Statement) { stmt.BindBytes(1, id) }, +func Has(db sql.Executor, id types.Hash32) (bool, error) { + rows, err := db.Exec("select 1 from activesets where id = ?1;", + func(stmt *sql.Statement) { stmt.BindBytes(1, id.Bytes()) }, nil, ) if err != nil { diff --git a/sql/database.go b/sql/database.go index 3006e1cda5..4f4224b710 100644 --- a/sql/database.go +++ b/sql/database.go @@ -617,6 +617,8 @@ func (db *sqliteDatabase) getTx(ctx context.Context, initstmt string) (*sqliteTx } tx := &sqliteTx{queryCache: db.queryCache, db: db, conn: conn, freeConn: cancel} if err := tx.begin(initstmt); err != nil { + cancel() + db.pool.Put(conn) return nil, err } return tx, nil @@ -686,7 +688,7 @@ func (db *sqliteDatabase) Tx(ctx context.Context) (Transaction, error) { // WithTx will pass initialized deferred transaction to exec callback. // Will commit only if error is nil. func (db *sqliteDatabase) WithTx(ctx context.Context, exec func(Transaction) error) error { - return db.withTx(ctx, beginImmediate, exec) + return db.withTx(ctx, beginDefault, exec) } // TxImmediate creates immediate transaction. @@ -700,10 +702,7 @@ func (db *sqliteDatabase) TxImmediate(ctx context.Context) (Transaction, error) // WithTxImmediate will pass initialized immediate transaction to exec callback. // Will commit only if error is nil. -func (db *sqliteDatabase) WithTxImmediate( - ctx context.Context, - exec func(Transaction) error, -) error { +func (db *sqliteDatabase) WithTxImmediate(ctx context.Context, exec func(Transaction) error) error { return db.withTx(ctx, beginImmediate, exec) } diff --git a/syncer/malsync/syncer_test.go b/syncer/malsync/syncer_test.go index b1397cd3e0..f3b9f53f03 100644 --- a/syncer/malsync/syncer_test.go +++ b/syncer/malsync/syncer_test.go @@ -225,8 +225,7 @@ func TestSyncer(t *testing.T) { tester.expectGetProofs(nil) epochStart := tester.clock.Now().Truncate(time.Second) epochEnd := epochStart.Add(10 * time.Minute) - require.NoError(t, - tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) + require.NoError(t, tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) require.ElementsMatch(t, []types.NodeID{ nid("1"), nid("2"), nid("3"), nid("4"), }, maps.Keys(tester.received)) @@ -238,8 +237,7 @@ func TestSyncer(t *testing.T) { }, tester.attempts) tester.clock.Advance(1 * time.Minute) // second call does nothing after recent sync - require.NoError(t, - tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) + require.NoError(t, tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) require.Zero(t, tester.peerErrCount.n) }) t.Run("EnsureInSync with no malfeasant identities", func(t *testing.T) { @@ -295,7 +293,7 @@ func TestSyncer(t *testing.T) { cancel() eg.Wait() }) - t.Run("gettings ids from MinSyncPeers peers is enough", func(t *testing.T) { + t.Run("getting ids from MinSyncPeers peers is enough", func(t *testing.T) { cfg := DefaultConfig() cfg.MinSyncPeers = 2 tester := newTester(t, cfg) @@ -324,8 +322,7 @@ func TestSyncer(t *testing.T) { }, tester.attempts) tester.clock.Advance(1 * time.Minute) // second call does nothing after recent sync - require.NoError(t, - tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) + require.NoError(t, tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) require.Equal(t, 1, tester.peerErrCount.n) }) t.Run("skip hashes after max retries", func(t *testing.T) { @@ -352,8 +349,7 @@ func TestSyncer(t *testing.T) { }, tester.attempts) tester.clock.Advance(1 * time.Minute) // second call does nothing after recent sync - require.NoError(t, - tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) + require.NoError(t, tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) }) t.Run("skip hashes after validation reject", func(t *testing.T) { tester := newTester(t, DefaultConfig()) @@ -379,7 +375,6 @@ func TestSyncer(t *testing.T) { }, tester.attempts) tester.clock.Advance(1 * time.Minute) // second call does nothing after recent sync - require.NoError(t, - tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) + require.NoError(t, tester.syncer.EnsureInSync(context.Background(), epochStart, epochEnd)) }) } From 7bdcff385b1608b6b2efe62a8cd7d074f2e7250f Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:45:54 +0000 Subject: [PATCH 3/6] Cleanup --- datastore/store.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datastore/store.go b/datastore/store.go index 256301e52e..db2923e09f 100644 --- a/datastore/store.go +++ b/datastore/store.go @@ -351,13 +351,11 @@ func (bs *BlobStore) Has(hint Hint, key []byte) (bool, error) { case TXDB: return transactions.Has(bs.DB, types.TransactionID(types.BytesToHash(key))) case POETDB: - var ref types.PoetProofRef - copy(ref[:], key) - return poets.Has(bs.DB, ref) + return poets.Has(bs.DB, types.PoetProofRef(key)) case Malfeasance: return identities.IsMalicious(bs.DB, types.BytesToNodeID(key)) case ActiveSet: - return activesets.Has(bs.DB, key) + return activesets.Has(bs.DB, types.BytesToHash(key)) } return false, fmt.Errorf("blob store not found %s", hint) } From 051246e3a931924c6e7db5c6ca48ca257cb67774 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:46:23 +0000 Subject: [PATCH 4/6] Persist active set in DB at the start of the epoch instead when creating the proposal --- miner/proposal_builder.go | 51 +++++++++++++++++----------------- miner/proposal_builder_test.go | 17 ------------ 2 files changed, 26 insertions(+), 42 deletions(-) diff --git a/miner/proposal_builder.go b/miner/proposal_builder.go index d0c7873f8f..f0ec54d16a 100644 --- a/miner/proposal_builder.go +++ b/miner/proposal_builder.go @@ -65,7 +65,7 @@ type ProposalBuilder struct { logger *zap.Logger cfg config - db sql.Executor + db sql.StateDatabase localdb sql.Executor atxsdata atxsData clock layerClock @@ -204,7 +204,7 @@ func WithLayerSize(size uint32) Opt { } } -// WithWorkersLimit configures paralelization factor for builder operation when working with +// WithWorkersLimit configures parallelization factor for builder operation when working with // more than one signer. func WithWorkersLimit(limit int) Opt { return func(pb *ProposalBuilder) { @@ -270,7 +270,7 @@ func WithActivesetPreparation(prep ActiveSetPreparation) Opt { // New creates a struct of block builder type. func New( clock layerClock, - db sql.Executor, + db sql.StateDatabase, localdb sql.Executor, atxsdata atxsData, publisher pubsub.Publisher, @@ -449,7 +449,7 @@ func (pb *ProposalBuilder) UpdateActiveSet(target types.EpochID, set []types.ATX pb.activeGen.updateFallback(target, set) } -func (pb *ProposalBuilder) initSharedData(current types.LayerID) error { +func (pb *ProposalBuilder) initSharedData(ctx context.Context, current types.LayerID) error { if pb.shared.epoch != current.GetEpoch() { pb.shared = sharedSession{epoch: current.GetEpoch()} } @@ -476,7 +476,27 @@ func (pb *ProposalBuilder) initSharedData(current types.LayerID) error { pb.shared.active.id = id pb.shared.active.set = set pb.shared.active.weight = weight - return nil + + // Ideally we only persist the active set when we are actually eligible with at least one identity in at least one + // layer, but since at the moment we use a bootstrapped activeset, `activesets.Has` will always return + // true anyways. + // + // Additionally all activesets that are older than 2 epochs are deleted at the beginning of an epoch anyway, but + // maybe we should revisit this when activesets are no longer bootstrapped. + return pb.db.WithTx(ctx, func(tx sql.Transaction) error { + yes, err := activesets.Has(tx, pb.shared.active.id) + if err != nil { + return err + } + if yes { + return nil + } + + return activesets.Add(tx, pb.shared.active.id, &types.EpochActiveSet{ + Epoch: pb.shared.epoch, + Set: pb.shared.active.set, + }) + }) } func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID) error { @@ -548,7 +568,7 @@ func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID) func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error { buildStartTime := time.Now() - if err := pb.initSharedData(lid); err != nil { + if err := pb.initSharedData(ctx, lid); err != nil { return err } @@ -578,17 +598,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error { return meshHash }) - persistActiveSetOnce := sync.OnceValue(func() error { - err := activesets.Add(pb.db, pb.shared.active.id, &types.EpochActiveSet{ - Epoch: pb.shared.epoch, - Set: pb.shared.active.set, - }) - if err != nil && !errors.Is(err, sql.ErrObjectExists) { - return err - } - return nil - }) - // Two stage pipeline, with the stages running in parallel. // 1. Initializes signers. Runs limited number of goroutines because the initialization is CPU and DB bound. // 2. Collects eligible signers' sessions from the stage 1 and creates and publishes proposals. @@ -662,14 +671,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error { ss.latency.hash = time.Since(start) eg2.Go(func() error { - // needs to be saved before publishing, as we will query it in handler - if ss.session.ref == types.EmptyBallotID { - start := time.Now() - if err := persistActiveSetOnce(); err != nil { - return err - } - ss.latency.activeSet = time.Since(start) - } proofs := ss.session.eligibilities.proofs[lid] start = time.Now() diff --git a/miner/proposal_builder_test.go b/miner/proposal_builder_test.go index c7fa9f74bf..2e9c4d368b 100644 --- a/miner/proposal_builder_test.go +++ b/miner/proposal_builder_test.go @@ -3,7 +3,6 @@ package miner import ( "bytes" "context" - "encoding/hex" "errors" "fmt" "math/rand" @@ -1272,19 +1271,3 @@ func BenchmarkDoubleCache(b *testing.B) { require.Equal(b, types.EmptyATXID, found) } - -func BenchmarkDB(b *testing.B) { - db, err := statesql.Open("file:state.sql") - require.NoError(b, err) - defer db.Close() - - bytes, err := hex.DecodeString("00003ce28800fadd692c522f7b1db219f675b49108aec7f818e2c4fd935573f6") - require.NoError(b, err) - nodeID := types.BytesToNodeID(bytes) - var found types.ATXID - b.ResetTimer() - for i := 0; i < b.N; i++ { - found, _ = atxs.GetByEpochAndNodeID(db, 30, nodeID) - } - require.NotEqual(b, types.EmptyATXID, found) -} From 5da57a5f7b8b95e83155cd01d3971a7640ac5c5f Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Tue, 29 Oct 2024 17:03:43 +0000 Subject: [PATCH 5/6] Update missed WithTx call --- mesh/ballotwriter/ballotwriter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mesh/ballotwriter/ballotwriter.go b/mesh/ballotwriter/ballotwriter.go index 35cb2ec43c..d3696e672d 100644 --- a/mesh/ballotwriter/ballotwriter.go +++ b/mesh/ballotwriter/ballotwriter.go @@ -78,7 +78,7 @@ func (w *BallotWriter) Start(ctx context.Context) { // we use a context.Background() because: on shutdown the canceling of the // context may exit the transaction halfway and leave the db in some state where it // causes crawshaw to panic on a "not all connections returned to pool". - if err := w.db.WithTx(context.Background(), func(tx sql.Transaction) error { + if err := w.db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { for _, ballot := range batch { if !ballot.IsMalicious() { layerBallotStart := time.Now() @@ -167,5 +167,5 @@ type batchResult struct { type db interface { sql.Executor - WithTx(context.Context, func(sql.Transaction) error) error + WithTxImmediate(context.Context, func(sql.Transaction) error) error } From f264397592ab6d5c4abfb3c9820cf53c59bbc266 Mon Sep 17 00:00:00 2001 From: Matthias <5011972+fasmat@users.noreply.github.com> Date: Tue, 29 Oct 2024 17:09:25 +0000 Subject: [PATCH 6/6] Replace custom interface with general --- mesh/ballotwriter/ballotwriter.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/mesh/ballotwriter/ballotwriter.go b/mesh/ballotwriter/ballotwriter.go index d3696e672d..991293f13e 100644 --- a/mesh/ballotwriter/ballotwriter.go +++ b/mesh/ballotwriter/ballotwriter.go @@ -20,7 +20,7 @@ import ( var writerDelay = 100 * time.Millisecond type BallotWriter struct { - db db + db sql.StateDatabase logger *zap.Logger atxMu sync.Mutex @@ -30,7 +30,7 @@ type BallotWriter struct { ballotBatchResult *batchResult } -func New(db db, logger *zap.Logger) *BallotWriter { +func New(db sql.StateDatabase, logger *zap.Logger) *BallotWriter { // create a stopped ticker that can be started later timer := time.NewTicker(writerDelay) timer.Stop() @@ -163,9 +163,3 @@ type batchResult struct { doneC chan struct{} err error } - -type db interface { - sql.Executor - - WithTxImmediate(context.Context, func(sql.Transaction) error) error -}