Skip to content

Commit

Permalink
p2p: handle txns in pubsub validator (#6070)
Browse files Browse the repository at this point in the history
Co-authored-by: cce <51567+cce@users.noreply.github.com>
Co-authored-by: Jason Paulos <jasonpaulos@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 8, 2024
1 parent c6a433b commit 602d950
Show file tree
Hide file tree
Showing 15 changed files with 558 additions and 250 deletions.
4 changes: 2 additions & 2 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan
func (network *MockNetwork) ClearHandlers() {
}

// RegisterProcessors - empty implementation.
func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) {
// RegisterValidatorHandlers - empty implementation.
func (network *MockNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) {
}

// ClearProcessors - empty implementation
Expand Down
54 changes: 41 additions & 13 deletions data/transactions/verify/txnBatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package verify

import (
"errors"
"fmt"
"sync/atomic"

"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -98,10 +98,16 @@ func (bl *batchLoad) addLoad(txngrp []transactions.SignedTxn, gctx *GroupContext

}

// TxnGroupBatchSigVerifier provides Verify method to synchronously verify a group of transactions
// It starts a new block listener to receive latests block headers for the sig verification
type TxnGroupBatchSigVerifier struct {
cache VerifiedTransactionCache
nbw *NewBlockWatcher
ledger logic.LedgerForSignature
}

type txnSigBatchProcessor struct {
cache VerifiedTransactionCache
nbw *NewBlockWatcher
ledger logic.LedgerForSignature
TxnGroupBatchSigVerifier
resultChan chan<- *VerificationResult
droppedChan chan<- *UnverifiedTxnSigJob
}
Expand Down Expand Up @@ -142,27 +148,49 @@ func (tbp txnSigBatchProcessor) sendResult(veTxnGroup []transactions.SignedTxn,
}
}

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob) (svp execpool.BatchProcessor, err error) {
// MakeSigVerifier creats a new TxnGroupBatchSigVerifier for synchronous verification of transactions
func MakeSigVerifier(ledger LedgerForStreamVerifier, cache VerifiedTransactionCache) (TxnGroupBatchSigVerifier, error) {
latest := ledger.Latest()
latestHdr, err := ledger.BlockHdr(latest)
if err != nil {
return nil, errors.New("MakeStreamVerifier: Could not get header for previous block")
return TxnGroupBatchSigVerifier{}, fmt.Errorf("MakeSigVerifier: Could not get header for previous block: %w", err)
}

nbw := MakeNewBlockWatcher(latestHdr)
ledger.RegisterBlockListeners([]ledgercore.BlockListener{nbw})

verifier := TxnGroupBatchSigVerifier{
cache: cache,
nbw: nbw,
ledger: ledger,
}

return verifier, nil
}

// MakeSigVerifyJobProcessor returns the object implementing the stream verifier Helper interface
func MakeSigVerifyJobProcessor(
ledger LedgerForStreamVerifier, cache VerifiedTransactionCache,
resultChan chan<- *VerificationResult, droppedChan chan<- *UnverifiedTxnSigJob,
) (svp execpool.BatchProcessor, err error) {
sigVerifier, err := MakeSigVerifier(ledger, cache)
if err != nil {
return nil, err
}
return &txnSigBatchProcessor{
cache: cache,
nbw: nbw,
ledger: ledger,
droppedChan: droppedChan,
resultChan: resultChan,
TxnGroupBatchSigVerifier: sigVerifier,
droppedChan: droppedChan,
resultChan: resultChan,
}, nil
}

// Verify synchronously verifies the signatures of the transactions in the group
func (sv *TxnGroupBatchSigVerifier) Verify(stxs []transactions.SignedTxn) error {
blockHeader := sv.nbw.getBlockHeader()
_, err := txnGroup(stxs, blockHeader, sv.cache, sv.ledger, nil)
return err
}

func (tbp *txnSigBatchProcessor) ProcessBatch(txns []execpool.InputJob) {
batchVerifier, ctx := tbp.preProcessUnverifiedTxns(txns)
failed, err := batchVerifier.VerifyWithFeedback()
Expand Down
48 changes: 41 additions & 7 deletions data/transactions/verify/txnBatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ func verifyResults(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64
require.GreaterOrEqual(t, len(unverifiedGroups), badSigResultCounter)
for _, txn := range unverifiedGroups {
u, _ := binary.Uvarint(txn[0].Txn.Note)
if _, has := badTxnGroups[u]; has {
delete(badTxnGroups, u)
}
delete(badTxnGroups, u)
}
require.Empty(t, badTxnGroups, "unverifiedGroups should have all the transactions with invalid sigs")
}
Expand Down Expand Up @@ -301,6 +299,7 @@ func TestGetNumberOfBatchableSigsInGroup(t *testing.T) {
txnGroups[mod][0].Sig = crypto.Signature{}
batchSigs, err := UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems()
require.ErrorIs(t, err, errTxnSigHasNoSig)
require.Equal(t, uint64(0), batchSigs)
mod++

_, signedTxns, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50)
Expand Down Expand Up @@ -353,6 +352,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E=
txnGroups[mod][0].Msig = mSigTxn[0].Msig
batchSigs, err = UnverifiedTxnSigJob{TxnGroup: txnGroups[mod]}.GetNumberOfBatchableItems()
require.ErrorIs(t, err, errTxnSigNotWellFormed)
require.Equal(t, uint64(0), batchSigs)
}

// TestStreamToBatchPoolShutdown tests what happens when the exec pool shuts down
Expand Down Expand Up @@ -437,10 +437,11 @@ func TestStreamToBatchPoolShutdown(t *testing.T) { //nolint:paralleltest // Not
// send txn groups to be verified
go func() {
defer wg.Done()
outer:
for _, tg := range txnGroups {
select {
case <-ctx.Done():
break
break outer
case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}:
}
}
Expand Down Expand Up @@ -493,6 +494,7 @@ func TestStreamToBatchRestart(t *testing.T) {
// send txn groups to be verified
go func() {
defer wg.Done()
outer:
for i, tg := range txnGroups {
if (i+1)%10 == 0 {
cancel()
Expand All @@ -502,7 +504,7 @@ func TestStreamToBatchRestart(t *testing.T) {
}
select {
case <-ctx2.Done():
break
break outer
case inputChan <- &UnverifiedTxnSigJob{TxnGroup: tg, BacklogMessage: nil}:
}
}
Expand Down Expand Up @@ -798,7 +800,10 @@ func TestStreamToBatchPostVBlocked(t *testing.T) {

func TestStreamToBatchMakeStreamToBatchErr(t *testing.T) {
partitiontest.PartitionTest(t)
_, err := MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil)
_, err := MakeSigVerifier(&DummyLedgerForSignature{badHdr: true}, nil)
require.Error(t, err)

_, err = MakeSigVerifyJobProcessor(&DummyLedgerForSignature{badHdr: true}, nil, nil, nil)
require.Error(t, err)
}

Expand Down Expand Up @@ -863,11 +868,40 @@ func TestGetErredUnprocessed(t *testing.T) {

droppedChan := make(chan *UnverifiedTxnSigJob, 1)
svh := txnSigBatchProcessor{
resultChan: make(chan<- *VerificationResult, 0),
resultChan: make(chan<- *VerificationResult),
droppedChan: droppedChan,
}

svh.GetErredUnprocessed(&UnverifiedTxnSigJob{}, nil)
dropped := <-droppedChan
require.Equal(t, *dropped, UnverifiedTxnSigJob{})
}

func TestSigVerifier(t *testing.T) {
partitiontest.PartitionTest(t)

numOfTxns := 16
txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, numOfTxns, 0, 0)
require.GreaterOrEqual(t, len(txnGroups), 1)
require.Equal(t, len(badTxnGroups), 0)
txnGroup := txnGroups[0]

verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t)
defer verificationPool.Shutdown()

cache := MakeVerifiedTransactionCache(50000)

verifier, err := MakeSigVerifier(&DummyLedgerForSignature{}, cache)
require.NoError(t, err)

err = verifier.Verify(txnGroup)
require.NoError(t, err)

txnGroups, badTxnGroups = getSignedTransactions(numOfTxns, numOfTxns, 0, 1)
require.GreaterOrEqual(t, len(txnGroups), 1)
require.Greater(t, len(badTxnGroups), 0)
txnGroup = txnGroups[0]

err = verifier.Verify(txnGroup)
require.Error(t, err)
}
Loading

0 comments on commit 602d950

Please sign in to comment.