Skip to content

Commit

Permalink
Fix nonce handling (#339)
Browse files Browse the repository at this point in the history
The access to the pointer was not thread-safe which was triggering the
race tests.

I moved the initialization out of the hot section and sped things up.
  • Loading branch information
mkysel authored Dec 19, 2024
1 parent a8e6fae commit 1f90582
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 25 deletions.
1 change: 1 addition & 0 deletions cmd/replication/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func main() {
}

blockchainPublisher, err := blockchain.NewBlockchainPublisher(
ctx,
logger,
ethclient,
signer,
Expand Down
46 changes: 22 additions & 24 deletions pkg/blockchain/blockchainPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ type BlockchainPublisher struct {
identityUpdateContract *abis.IdentityUpdates
logger *zap.Logger
mutexNonce sync.Mutex
nonce *uint64
nonce uint64
}

func NewBlockchainPublisher(
ctx context.Context,
logger *zap.Logger,
client *ethclient.Client,
signer TransactionSigner,
Expand All @@ -55,13 +56,26 @@ func NewBlockchainPublisher(
if err != nil {
return nil, err
}

nonce, err := client.PendingNonceAt(ctx, signer.FromAddress())
if err != nil {
return nil, err
}

// The nonce is the next ID to be used, not the current highest
// The nonce member variable represents the last recenty used, so it is pending-1
nonce = max(nonce-1, uint64(0))

logger.Info(fmt.Sprintf("Starting server with blockchain nonce: %d", nonce))

return &BlockchainPublisher{
signer: signer,
logger: logger.Named("GroupBlockchainPublisher").
With(zap.String("contractAddress", contractOptions.MessagesContractAddress)),
messagesContract: messagesContract,
identityUpdateContract: identityUpdateContract,
client: client,
nonce: nonce,
}, nil
}

Expand Down Expand Up @@ -154,32 +168,16 @@ func (m *BlockchainPublisher) PublishIdentityUpdate(
)
}

// once fetchNonce returns a nonce, it must be used
// otherwise the chain might see a gap and deadlock
func (m *BlockchainPublisher) fetchNonce(ctx context.Context) (uint64, error) {
// NOTE:since pendingNonce starts at 0, and we have to return that value exactly,
// we can't easily use Once with unsigned integers
if m.nonce == nil {
m.mutexNonce.Lock()
defer m.mutexNonce.Unlock()
if m.nonce == nil {
// PendingNonceAt gives the next nonce that should be used
// if we are the first thread to initialize the nonce, we want to return PendingNonce+0
nonce, err := m.client.PendingNonceAt(ctx, m.signer.FromAddress())
if err != nil {
return 0, err
}
m.nonce = &nonce
m.logger.Info(fmt.Sprintf("Starting server with blockchain nonce: %d", *m.nonce))
return *m.nonce, nil
}
}
// Once the nonce has been initialized we can depend on Atomic to return the next value
next := atomic.AddUint64(m.nonce, 1)

pending, err := m.client.PendingNonceAt(ctx, m.signer.FromAddress())
if err != nil {
return 0, err
}

next := atomic.AddUint64(&m.nonce, 1)

m.logger.Debug(
"Generated nonce",
zap.Uint64("pending_nonce", pending),
Expand All @@ -196,18 +194,18 @@ func (m *BlockchainPublisher) fetchNonce(ctx context.Context) (uint64, error) {
// this won't catch all possible timing scenarios, but it should self-heal if the chain jumps
m.mutexNonce.Lock()
defer m.mutexNonce.Unlock()
currentNonce := atomic.LoadUint64(m.nonce)
currentNonce := atomic.LoadUint64(&m.nonce)
if currentNonce < pending {
m.logger.Info(
"Nonce skew detected",
zap.Uint64("pending_nonce", pending),
zap.Uint64("current_nonce", currentNonce),
)
atomic.StoreUint64(m.nonce, pending)
atomic.StoreUint64(&m.nonce, pending)
return pending, nil
}

return atomic.AddUint64(m.nonce, 1), nil
return atomic.AddUint64(&m.nonce, 1), nil
}

func findLog[T any](
Expand Down
30 changes: 29 additions & 1 deletion pkg/blockchain/blockchainPublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockchain

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -24,7 +25,7 @@ func buildPublisher(t *testing.T) (*BlockchainPublisher, func()) {
client, err := NewClient(ctx, contractsOptions.RpcUrl)
require.NoError(t, err)

publisher, err := NewBlockchainPublisher(logger, client, signer, contractsOptions)
publisher, err := NewBlockchainPublisher(ctx, logger, client, signer, contractsOptions)
require.NoError(t, err)

return publisher, func() {
Expand Down Expand Up @@ -140,3 +141,30 @@ func TestPublishGroupMessage(t *testing.T) {
})
}
}

func TestPublishGroupMessageConcurrent(t *testing.T) {
publisher, cleanup := buildPublisher(t)
defer cleanup()

const parallelRuns = 100

var wg sync.WaitGroup
wg.Add(parallelRuns)

for i := 0; i < parallelRuns; i++ {
go func() {
defer wg.Done()

_, err := publisher.PublishGroupMessage(
context.Background(),
testutils.RandomGroupID(),
testutils.RandomBytes(100),
)
require.NoError(t, err)
}()
}

// Wait for all goroutines to finish
wg.Wait()

}
1 change: 1 addition & 0 deletions pkg/indexer/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func messagePublisher(t *testing.T, ctx context.Context) *blockchain.BlockchainP
require.NoError(t, err)

publisher, err := blockchain.NewBlockchainPublisher(
ctx,
testutils.NewLog(t),
client,
signer,
Expand Down

0 comments on commit 1f90582

Please sign in to comment.