Skip to content

Commit

Permalink
[SHIP-3903]Remove WS requirement for data feeds soak tests (#15003)
Browse files Browse the repository at this point in the history
* Add events polling

* Sort endless loop

* Increase polling time

* Update logic

* Adjust

* Update

* Add todo

* Improve readability

* Track goroutine

* Update event logic
  • Loading branch information
davidcauchi authored Nov 5, 2024
1 parent f8a6218 commit 0b38fca
Showing 1 changed file with 175 additions and 65 deletions.
240 changes: 175 additions & 65 deletions integration-tests/testsetups/ocr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os/signal"
"sort"
"strings"
"sync"
"syscall"
"testing"
"time"
Expand All @@ -20,7 +21,6 @@ import (

geth "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pelletier/go-toml/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -615,12 +615,19 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {
interruption := make(chan os.Signal, 1)
//nolint:staticcheck //ignore SA1016 we need to send the os.Kill signal
signal.Notify(interruption, os.Kill, os.Interrupt, syscall.SIGTERM)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// Channel to signal polling to reset round event counter
resetEventCounter := make(chan struct{})
defer close(resetEventCounter)

lastValue := 0
newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP
defer newRoundTrigger.Stop()
o.setFilterQuery()
err := o.observeOCREvents()
require.NoError(o.t, err, "Error subscribing to OCR events")
wg.Add(1)
go o.pollingOCREvents(ctx, &wg, resetEventCounter)

n := o.Config.GetNetworkConfig()

Expand Down Expand Up @@ -709,6 +716,8 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {
o.deleteChaosSimulations()
os.Exit(interruptedExitCode) // Exit with interrupted code to indicate test was interrupted, not just a normal failure
case <-endTest:
cancel()
wg.Wait() // Wait for polling to complete
return
case <-newRoundTrigger.C:
err := o.triggerNewRound(newValue)
Expand All @@ -719,6 +728,8 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) {
Str("Waiting", timerReset.String()).
Msg("Error triggering new round, waiting and trying again. Possible connection issues with mockserver")
}
// Signal polling to reset event counter
resetEventCounter <- struct{}{}
newRoundTrigger.Reset(timerReset)

// Change value for the next round
Expand Down Expand Up @@ -824,75 +835,171 @@ func (o *OCRSoakTest) setFilterQuery() {
Msg("Filter Query Set")
}

// observeOCREvents subscribes to OCR events and logs them to the test logger
// WARNING: Should only be used for observation and logging. This is not a reliable way to collect events.
func (o *OCRSoakTest) observeOCREvents() error {
eventLogs := make(chan types.Log)
ctx, cancel := context.WithTimeout(testcontext.Get(o.t), 5*time.Second)
eventSub, err := o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs)
cancel()
if err != nil {
return err
}
// pollingOCREvents Polls the blocks for OCR events and logs them to the test logger
func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, wg *sync.WaitGroup, resetEventCounter <-chan struct{}) {
defer wg.Done()
// Keep track of the last processed block number
processedBlockNum := o.startingBlockNum - 1
// TODO: Make this configurable
pollInterval := time.Second * 30
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

// Retrieve expected number of events per round from configuration
expectedEventsPerRound := *o.Config.GetActiveOCRConfig().Common.NumberOfContracts
eventCounter := 0
roundTimeout := o.Config.GetActiveOCRConfig().Soak.TimeBetweenRounds.Duration
timeoutTimer := time.NewTimer(roundTimeout)
round := 0
defer timeoutTimer.Stop()

o.log.Info().Msg("Start Polling for Answer Updated Events")

go func() {
for {
select {
case event := <-eventLogs:
if o.OCRVersion == "1" {
answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else if o.OCRVersion == "2" {
answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
for {
select {
case <-resetEventCounter:
if round != 0 {
if eventCounter == expectedEventsPerRound {
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
Int("Events found", eventCounter).
Int("Events Expected", expectedEventsPerRound).
Msg("All expected events found")
} else if eventCounter < expectedEventsPerRound {
o.log.Warn().
Int("Events found", eventCounter).
Int("Events Expected", expectedEventsPerRound).
Msg("Expected to find more events")
}
case err = <-eventSub.Err():
backoff := time.Second
for err != nil {
o.log.Info().
Err(err).
Str("Backoff", backoff.String()).
Interface("Query", o.filterQuery).
Msg("Error while subscribed to OCR Logs. Resubscribing")
ctx, cancel = context.WithTimeout(testcontext.Get(o.t), backoff)
eventSub, err = o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs)
cancel()
if err != nil {
time.Sleep(backoff)
backoff = time.Duration(math.Min(float64(backoff)*2, float64(30*time.Second)))
}
}
// Reset event counter and timer for new round
eventCounter = 0
// Safely stop and drain the timer if a value is present
if !timeoutTimer.Stop() {
<-timeoutTimer.C
}
timeoutTimer.Reset(roundTimeout)
o.log.Info().Msg("Polling for new round, event counter reset")
round++
case <-ctx.Done():
o.log.Info().Msg("Test duration ended, finalizing event polling")
timeoutTimer.Reset(roundTimeout)
// Wait until expected events are fetched or until timeout
for eventCounter < expectedEventsPerRound {
select {
case <-timeoutTimer.C:
o.log.Warn().Msg("Timeout reached while waiting for final events")
return
case <-ticker.C:
o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum)
}
}
o.log.Info().
Int("Events found", eventCounter).
Int("Events Expected", expectedEventsPerRound).
Msg("Stop polling.")
return
case <-ticker.C:
o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum)
}
}()
}
}

return nil
// Helper function to poll events and update eventCounter
func (o *OCRSoakTest) fetchAndProcessEvents(eventCounter *int, expectedEvents int, processedBlockNum *uint64) {
latestBlock, err := o.seth.Client.BlockNumber(context.Background())
if err != nil {
o.log.Error().Err(err).Msg("Error getting latest block number")
return
}

if *processedBlockNum == latestBlock {
o.log.Debug().
Uint64("Latest Block", latestBlock).
Uint64("Last Processed Block Number", *processedBlockNum).
Msg("No new blocks since last poll")
return
}

// Check if the latest block is behind processedBlockNum due to possible reorgs
if *processedBlockNum > latestBlock {
o.log.Error().
Uint64("From Block", *processedBlockNum).
Uint64("To Block", latestBlock).
Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg")
*processedBlockNum = latestBlock
return
}

fromBlock := *processedBlockNum + 1
o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock)
o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock)

o.log.Debug().
Uint64("From Block", fromBlock).
Uint64("To Block", latestBlock).
Msg("Fetching logs for the specified range")

logs, err := o.seth.Client.FilterLogs(context.Background(), o.filterQuery)
if err != nil {
o.log.Error().Err(err).Msg("Error fetching logs")
return
}

for _, event := range logs {
*eventCounter++
if o.OCRVersion == "1" {
answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
if *eventCounter <= expectedEvents {
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else {
o.log.Error().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Excess event detected, beyond expected count")
}
} else if o.OCRVersion == "2" {
answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event)
if err != nil {
o.log.Warn().
Err(err).
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Msg("Error parsing event as AnswerUpdated")
continue
}
if *eventCounter <= expectedEvents {
o.log.Info().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Answer Updated Event")
} else {
o.log.Error().
Str("Address", event.Address.Hex()).
Uint64("Block Number", event.BlockNumber).
Uint64("Round ID", answerUpdated.RoundId.Uint64()).
Int64("Answer", answerUpdated.Current.Int64()).
Msg("Excess event detected, beyond expected count")
}
}
}
*processedBlockNum = latestBlock
}

// triggers a new OCR round by setting a new mock adapter value
Expand Down Expand Up @@ -941,6 +1048,9 @@ func (o *OCRSoakTest) collectEvents() error {
o.ocrRoundStates[len(o.ocrRoundStates)-1].EndTime = start // Set end time for last expected event
o.log.Info().Msg("Collecting on-chain events")

// Set from block to be starting block before filtering
o.filterQuery.FromBlock = big.NewInt(0).SetUint64(o.startingBlockNum)

// We must retrieve the events, use exponential backoff for timeout to retry
timeout := time.Second * 15
o.log.Info().Interface("Filter Query", o.filterQuery).Str("Timeout", timeout.String()).Msg("Retrieving on-chain events")
Expand Down

0 comments on commit 0b38fca

Please sign in to comment.