diff --git a/integration-tests/testsetups/ocr.go b/integration-tests/testsetups/ocr.go index 73b142b6297..259cf6bfec3 100644 --- a/integration-tests/testsetups/ocr.go +++ b/integration-tests/testsetups/ocr.go @@ -11,6 +11,7 @@ import ( "os/signal" "sort" "strings" + "sync" "syscall" "testing" "time" @@ -20,7 +21,6 @@ import ( geth "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/pelletier/go-toml/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -615,12 +615,19 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { interruption := make(chan os.Signal, 1) //nolint:staticcheck //ignore SA1016 we need to send the os.Kill signal signal.Notify(interruption, os.Kill, os.Interrupt, syscall.SIGTERM) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + // Channel to signal polling to reset round event counter + resetEventCounter := make(chan struct{}) + defer close(resetEventCounter) + lastValue := 0 newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP defer newRoundTrigger.Stop() o.setFilterQuery() - err := o.observeOCREvents() - require.NoError(o.t, err, "Error subscribing to OCR events") + wg.Add(1) + go o.pollingOCREvents(ctx, &wg, resetEventCounter) n := o.Config.GetNetworkConfig() @@ -709,6 +716,8 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { o.deleteChaosSimulations() os.Exit(interruptedExitCode) // Exit with interrupted code to indicate test was interrupted, not just a normal failure case <-endTest: + cancel() + wg.Wait() // Wait for polling to complete return case <-newRoundTrigger.C: err := o.triggerNewRound(newValue) @@ -719,6 +728,8 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { Str("Waiting", timerReset.String()). Msg("Error triggering new round, waiting and trying again. Possible connection issues with mockserver") } + // Signal polling to reset event counter + resetEventCounter <- struct{}{} newRoundTrigger.Reset(timerReset) // Change value for the next round @@ -824,75 +835,171 @@ func (o *OCRSoakTest) setFilterQuery() { Msg("Filter Query Set") } -// observeOCREvents subscribes to OCR events and logs them to the test logger -// WARNING: Should only be used for observation and logging. This is not a reliable way to collect events. -func (o *OCRSoakTest) observeOCREvents() error { - eventLogs := make(chan types.Log) - ctx, cancel := context.WithTimeout(testcontext.Get(o.t), 5*time.Second) - eventSub, err := o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs) - cancel() - if err != nil { - return err - } +// pollingOCREvents Polls the blocks for OCR events and logs them to the test logger +func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, wg *sync.WaitGroup, resetEventCounter <-chan struct{}) { + defer wg.Done() + // Keep track of the last processed block number + processedBlockNum := o.startingBlockNum - 1 + // TODO: Make this configurable + pollInterval := time.Second * 30 + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + // Retrieve expected number of events per round from configuration + expectedEventsPerRound := *o.Config.GetActiveOCRConfig().Common.NumberOfContracts + eventCounter := 0 + roundTimeout := o.Config.GetActiveOCRConfig().Soak.TimeBetweenRounds.Duration + timeoutTimer := time.NewTimer(roundTimeout) + round := 0 + defer timeoutTimer.Stop() + + o.log.Info().Msg("Start Polling for Answer Updated Events") - go func() { - for { - select { - case event := <-eventLogs: - if o.OCRVersion == "1" { - answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } else if o.OCRVersion == "2" { - answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } + for { + select { + case <-resetEventCounter: + if round != 0 { + if eventCounter == expectedEventsPerRound { o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") + Int("Events found", eventCounter). + Int("Events Expected", expectedEventsPerRound). + Msg("All expected events found") + } else if eventCounter < expectedEventsPerRound { + o.log.Warn(). + Int("Events found", eventCounter). + Int("Events Expected", expectedEventsPerRound). + Msg("Expected to find more events") } - case err = <-eventSub.Err(): - backoff := time.Second - for err != nil { - o.log.Info(). - Err(err). - Str("Backoff", backoff.String()). - Interface("Query", o.filterQuery). - Msg("Error while subscribed to OCR Logs. Resubscribing") - ctx, cancel = context.WithTimeout(testcontext.Get(o.t), backoff) - eventSub, err = o.seth.Client.SubscribeFilterLogs(ctx, o.filterQuery, eventLogs) - cancel() - if err != nil { - time.Sleep(backoff) - backoff = time.Duration(math.Min(float64(backoff)*2, float64(30*time.Second))) - } + } + // Reset event counter and timer for new round + eventCounter = 0 + // Safely stop and drain the timer if a value is present + if !timeoutTimer.Stop() { + <-timeoutTimer.C + } + timeoutTimer.Reset(roundTimeout) + o.log.Info().Msg("Polling for new round, event counter reset") + round++ + case <-ctx.Done(): + o.log.Info().Msg("Test duration ended, finalizing event polling") + timeoutTimer.Reset(roundTimeout) + // Wait until expected events are fetched or until timeout + for eventCounter < expectedEventsPerRound { + select { + case <-timeoutTimer.C: + o.log.Warn().Msg("Timeout reached while waiting for final events") + return + case <-ticker.C: + o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum) } } + o.log.Info(). + Int("Events found", eventCounter). + Int("Events Expected", expectedEventsPerRound). + Msg("Stop polling.") + return + case <-ticker.C: + o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum) } - }() + } +} - return nil +// Helper function to poll events and update eventCounter +func (o *OCRSoakTest) fetchAndProcessEvents(eventCounter *int, expectedEvents int, processedBlockNum *uint64) { + latestBlock, err := o.seth.Client.BlockNumber(context.Background()) + if err != nil { + o.log.Error().Err(err).Msg("Error getting latest block number") + return + } + + if *processedBlockNum == latestBlock { + o.log.Debug(). + Uint64("Latest Block", latestBlock). + Uint64("Last Processed Block Number", *processedBlockNum). + Msg("No new blocks since last poll") + return + } + + // Check if the latest block is behind processedBlockNum due to possible reorgs + if *processedBlockNum > latestBlock { + o.log.Error(). + Uint64("From Block", *processedBlockNum). + Uint64("To Block", latestBlock). + Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg") + *processedBlockNum = latestBlock + return + } + + fromBlock := *processedBlockNum + 1 + o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock) + o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock) + + o.log.Debug(). + Uint64("From Block", fromBlock). + Uint64("To Block", latestBlock). + Msg("Fetching logs for the specified range") + + logs, err := o.seth.Client.FilterLogs(context.Background(), o.filterQuery) + if err != nil { + o.log.Error().Err(err).Msg("Error fetching logs") + return + } + + for _, event := range logs { + *eventCounter++ + if o.OCRVersion == "1" { + answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event) + if err != nil { + o.log.Warn(). + Err(err). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Msg("Error parsing event as AnswerUpdated") + continue + } + if *eventCounter <= expectedEvents { + o.log.Info(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Answer Updated Event") + } else { + o.log.Error(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Excess event detected, beyond expected count") + } + } else if o.OCRVersion == "2" { + answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event) + if err != nil { + o.log.Warn(). + Err(err). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Msg("Error parsing event as AnswerUpdated") + continue + } + if *eventCounter <= expectedEvents { + o.log.Info(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Answer Updated Event") + } else { + o.log.Error(). + Str("Address", event.Address.Hex()). + Uint64("Block Number", event.BlockNumber). + Uint64("Round ID", answerUpdated.RoundId.Uint64()). + Int64("Answer", answerUpdated.Current.Int64()). + Msg("Excess event detected, beyond expected count") + } + } + } + *processedBlockNum = latestBlock } // triggers a new OCR round by setting a new mock adapter value @@ -941,6 +1048,9 @@ func (o *OCRSoakTest) collectEvents() error { o.ocrRoundStates[len(o.ocrRoundStates)-1].EndTime = start // Set end time for last expected event o.log.Info().Msg("Collecting on-chain events") + // Set from block to be starting block before filtering + o.filterQuery.FromBlock = big.NewInt(0).SetUint64(o.startingBlockNum) + // We must retrieve the events, use exponential backoff for timeout to retry timeout := time.Second * 15 o.log.Info().Interface("Filter Query", o.filterQuery).Str("Timeout", timeout.String()).Msg("Retrieving on-chain events")