Skip to content

Commit

Permalink
Try #6422:
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemesh-bors[bot] authored Oct 29, 2024
2 parents 258e0f7 + f264397 commit d1ccb48
Show file tree
Hide file tree
Showing 25 changed files with 96 additions and 124 deletions.
2 changes: 1 addition & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *HandlerV1) storeAtx(
proof *mwire.MalfeasanceProof
malicious bool
)
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
var err error
malicious, err = identities.IsMalicious(tx, atx.SmesherID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a

// Store an ATX in the DB.
func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error {
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if len(watx.marriages) != 0 {
newMarriageID, err := marriage.NewID(tx)
if err != nil {
Expand Down Expand Up @@ -927,7 +927,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
atxs.AtxAdded(h.cdb, atx)

malicious := false
err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// malfeasance check happens after storing the ATX because storing updates the marriage set
// that is needed for the malfeasance proof
// TODO(mafa): don't store own ATX if it would mark the node as malicious
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/transaction_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestTransactionService_StreamResults(t *testing.T) {
gen := fixture.NewTransactionResultGenerator().
WithAddresses(2)
txs := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txs {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/v2alpha1/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTransactionService_List(t *testing.T) {

gen := fixture.NewTransactionResultGenerator().WithAddresses(2)
txsList := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txsList {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion blocks/certifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (c *Certifier) save(
if len(valid)+len(invalid) == 0 {
return certificates.Add(c.db, lid, cert)
}
return c.db.WithTx(ctx, func(dbtx sql.Transaction) error {
return c.db.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := certificates.Add(dbtx, lid, cert); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func Recover(
}
defer localDB.Close()
logger.Info("clearing atx and malfeasance sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx sql.Transaction) error {
if err := localDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if err := atxsync.Clear(tx); err != nil {
return err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func RecoverFromLocalFile(
zap.Int("num accounts", len(data.accounts)),
zap.Int("num atxs", len(data.atxs)),
)
if err = newDB.WithTx(ctx, func(tx sql.Transaction) error {
if err = newDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
for _, acct := range data.accounts {
if err = accounts.Update(tx, acct); err != nil {
return fmt.Errorf("restore account snapshot: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/merge-nodes/internal/merge_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func MergeDBs(ctx context.Context, dbLog *zap.Logger, from, to string) error {
}

dbLog.Info("merging databases", zap.String("from", from), zap.String("to", to))
err = dstDB.WithTx(ctx, func(tx sql.Transaction) error {
err = dstDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
enc := func(stmt *sql.Statement) {
stmt.BindText(1, filepath.Join(from, localDbFile))
}
Expand Down
6 changes: 2 additions & 4 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,11 @@ func (bs *BlobStore) Has(hint Hint, key []byte) (bool, error) {
case TXDB:
return transactions.Has(bs.DB, types.TransactionID(types.BytesToHash(key)))
case POETDB:
var ref types.PoetProofRef
copy(ref[:], key)
return poets.Has(bs.DB, ref)
return poets.Has(bs.DB, types.PoetProofRef(key))
case Malfeasance:
return identities.IsMalicious(bs.DB, types.BytesToNodeID(key))
case ActiveSet:
return activesets.Has(bs.DB, key)
return activesets.Has(bs.DB, types.BytesToHash(key))
}
return false, fmt.Errorf("blob store not found %s", hint)
}
2 changes: 1 addition & 1 deletion malfeasance/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *Handler) validateAndSave(ctx context.Context, p *wire.MalfeasanceProof)
return types.EmptyNodeID, errors.Join(err, pubsub.ErrValidationReject)
}
proofBytes := codec.MustEncode(p)
if err := h.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
malicious, err := identities.IsMalicious(dbtx, nodeID)
if err != nil {
return fmt.Errorf("check known malicious: %w", err)
Expand Down
12 changes: 3 additions & 9 deletions mesh/ballotwriter/ballotwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var writerDelay = 100 * time.Millisecond

type BallotWriter struct {
db db
db sql.StateDatabase
logger *zap.Logger

atxMu sync.Mutex
Expand All @@ -30,7 +30,7 @@ type BallotWriter struct {
ballotBatchResult *batchResult
}

func New(db db, logger *zap.Logger) *BallotWriter {
func New(db sql.StateDatabase, logger *zap.Logger) *BallotWriter {
// create a stopped ticker that can be started later
timer := time.NewTicker(writerDelay)
timer.Stop()
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w *BallotWriter) Start(ctx context.Context) {
// we use a context.Background() because: on shutdown the canceling of the
// context may exit the transaction halfway and leave the db in some state where it
// causes crawshaw to panic on a "not all connections returned to pool".
if err := w.db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := w.db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
for _, ballot := range batch {
if !ballot.IsMalicious() {
layerBallotStart := time.Now()
Expand Down Expand Up @@ -163,9 +163,3 @@ type batchResult struct {
doneC chan struct{}
err error
}

type db interface {
sql.Executor

WithTx(context.Context, func(sql.Transaction) error) error
}
6 changes: 3 additions & 3 deletions mesh/ballotwriter/ballotwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
if err := writeFn(a[i], tx); err != nil {
b.Fatal(err)
}
Expand All @@ -138,7 +138,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/1000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 1000); i < (j*1000)+1000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand All @@ -156,7 +156,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/5000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 5000); i < (j*5000)+5000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewMesh(
}

genesis := types.GetEffectiveGenesis()
if err = db.WithTx(context.Background(), func(dbtx sql.Transaction) error {
if err = db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error {
if err = layers.SetProcessed(dbtx, genesis); err != nil {
return fmt.Errorf("mesh init: %w", err)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error
return fmt.Errorf("execute block %v/%v: %w", layer.Layer, target, err)
}
}
if err := msh.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := msh.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := layers.SetApplied(dbtx, layer.Layer, target); err != nil {
return fmt.Errorf("set applied for %v/%v: %w", layer.Layer, target, err)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (msh *Mesh) saveHareOutput(ctx context.Context, lid types.LayerID, bid type
certs []certificates.CertValidity
err error
)
if err = msh.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err = msh.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// check if a certificate has been generated or sync'ed.
// - node generated the certificate when it collected enough certify messages
// - hare outputs are processed in layer order. i.e. when hare fails for a previous layer N,
Expand Down
51 changes: 26 additions & 25 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ProposalBuilder struct {
logger *zap.Logger
cfg config

db sql.Executor
db sql.StateDatabase
localdb sql.Executor
atxsdata atxsData
clock layerClock
Expand Down Expand Up @@ -204,7 +204,7 @@ func WithLayerSize(size uint32) Opt {
}
}

// WithWorkersLimit configures paralelization factor for builder operation when working with
// WithWorkersLimit configures parallelization factor for builder operation when working with
// more than one signer.
func WithWorkersLimit(limit int) Opt {
return func(pb *ProposalBuilder) {
Expand Down Expand Up @@ -270,7 +270,7 @@ func WithActivesetPreparation(prep ActiveSetPreparation) Opt {
// New creates a struct of block builder type.
func New(
clock layerClock,
db sql.Executor,
db sql.StateDatabase,
localdb sql.Executor,
atxsdata atxsData,
publisher pubsub.Publisher,
Expand Down Expand Up @@ -449,7 +449,7 @@ func (pb *ProposalBuilder) UpdateActiveSet(target types.EpochID, set []types.ATX
pb.activeGen.updateFallback(target, set)
}

func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
func (pb *ProposalBuilder) initSharedData(ctx context.Context, current types.LayerID) error {
if pb.shared.epoch != current.GetEpoch() {
pb.shared = sharedSession{epoch: current.GetEpoch()}
}
Expand All @@ -476,7 +476,27 @@ func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
pb.shared.active.id = id
pb.shared.active.set = set
pb.shared.active.weight = weight
return nil

// Ideally we only persist the active set when we are actually eligible with at least one identity in at least one
// layer, but since at the moment we use a bootstrapped activeset, `activesets.Has` will always return
// true anyways.
//
// Additionally all activesets that are older than 2 epochs are deleted at the beginning of an epoch anyway, but
// maybe we should revisit this when activesets are no longer bootstrapped.
return pb.db.WithTx(ctx, func(tx sql.Transaction) error {
yes, err := activesets.Has(tx, pb.shared.active.id)
if err != nil {
return err
}
if yes {
return nil
}

return activesets.Add(tx, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
})
}

func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID) error {
Expand Down Expand Up @@ -548,7 +568,7 @@ func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID)

func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
buildStartTime := time.Now()
if err := pb.initSharedData(lid); err != nil {
if err := pb.initSharedData(ctx, lid); err != nil {
return err
}

Expand Down Expand Up @@ -578,17 +598,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
return meshHash
})

persistActiveSetOnce := sync.OnceValue(func() error {
err := activesets.Add(pb.db, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return err
}
return nil
})

// Two stage pipeline, with the stages running in parallel.
// 1. Initializes signers. Runs limited number of goroutines because the initialization is CPU and DB bound.
// 2. Collects eligible signers' sessions from the stage 1 and creates and publishes proposals.
Expand Down Expand Up @@ -662,14 +671,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
ss.latency.hash = time.Since(start)

eg2.Go(func() error {
// needs to be saved before publishing, as we will query it in handler
if ss.session.ref == types.EmptyBallotID {
start := time.Now()
if err := persistActiveSetOnce(); err != nil {
return err
}
ss.latency.activeSet = time.Since(start)
}
proofs := ss.session.eligibilities.proofs[lid]

start = time.Now()
Expand Down
17 changes: 0 additions & 17 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package miner
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -1272,19 +1271,3 @@ func BenchmarkDoubleCache(b *testing.B) {

require.Equal(b, types.EmptyATXID, found)
}

func BenchmarkDB(b *testing.B) {
db, err := statesql.Open("file:state.sql")
require.NoError(b, err)
defer db.Close()

bytes, err := hex.DecodeString("00003ce28800fadd692c522f7b1db219f675b49108aec7f818e2c4fd935573f6")
require.NoError(b, err)
nodeID := types.BytesToNodeID(bytes)
var found types.ATXID
b.ResetTimer()
for i := 0; i < b.N; i++ {
found, _ = atxs.GetByEpochAndNodeID(db, 30, nodeID)
}
require.NotEqual(b, types.EmptyATXID, found)
}
13 changes: 5 additions & 8 deletions sql/activesets/activesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Add(db sql.Executor, id types.Hash32, set *types.EpochActiveSet) error {
(id, epoch, active_set)
values (?1, ?2, ?3);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, id[:])
stmt.BindBytes(1, id.Bytes())
stmt.BindInt64(2, int64(set.Epoch))
stmt.BindBytes(3, codec.MustEncode(set))
}, nil)
Expand Down Expand Up @@ -100,9 +100,7 @@ func getBlob(ctx context.Context, db sql.Executor, id []byte) ([]byte, error) {

func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error {
_, err := db.Exec("delete from activesets where epoch < ?1;",
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
},
func(stmt *sql.Statement) { stmt.BindInt64(1, int64(epoch)) },
nil,
)
if err != nil {
Expand All @@ -111,10 +109,9 @@ func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error {
return nil
}

func Has(db sql.Executor, id []byte) (bool, error) {
rows, err := db.Exec(
"select 1 from activesets where id = ?1;",
func(stmt *sql.Statement) { stmt.BindBytes(1, id) },
func Has(db sql.Executor, id types.Hash32) (bool, error) {
rows, err := db.Exec("select 1 from activesets where id = ?1;",
func(stmt *sql.Statement) { stmt.BindBytes(1, id.Bytes()) },
nil,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions sql/atxs/atxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func TestGetIDsByEpochCached(t *testing.T) {
require.Equal(t, 11, db.QueryCount())
}

require.NoError(t, db.WithTx(context.Background(), func(tx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx5, types.AtxBlob{})
return nil
}))
Expand All @@ -445,7 +445,7 @@ func TestGetIDsByEpochCached(t *testing.T) {
require.ElementsMatch(t, []types.ATXID{atx4.ID(), atx5.ID()}, ids3)
require.Equal(t, 13, db.QueryCount()) // not incremented after Add

require.Error(t, db.WithTx(context.Background(), func(tx sql.Transaction) error {
require.Error(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx6, types.AtxBlob{})
return errors.New("fail") // rollback
}))
Expand Down
Loading

0 comments on commit d1ccb48

Please sign in to comment.