From a6704acb606b7fa4754c298cfc89628982ff81f4 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 19 Mar 2024 15:08:26 -0500 Subject: [PATCH 1/8] addressing errors on events endpoint --- api/client/event/event_stream.go | 5 +- beacon-chain/rpc/eth/events/BUILD.bazel | 2 +- beacon-chain/rpc/eth/events/events.go | 198 +++++++++--------- .../rpc/prysm/v1alpha1/node/server.go | 6 +- .../beacon-api/beacon_api_validator_client.go | 3 +- 5 files changed, 104 insertions(+), 110 deletions(-) diff --git a/api/client/event/event_stream.go b/api/client/event/event_stream.go index 48a1951b2b90..2759bc06d3e1 100644 --- a/api/client/event/event_stream.go +++ b/api/client/event/event_stream.go @@ -106,7 +106,10 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) { var eventType, data string // Variables to store event type and data // Iterate over lines of the event stream - for scanner.Scan() { + for { + if ok := scanner.Scan(); !ok { + break + } select { case <-h.ctx.Done(): log.Info("Context canceled, stopping event stream") diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 44b89fcab35f..f795688bf5ce 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -25,7 +25,7 @@ go_library( "//runtime/version:go_default_library", "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", - "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_pkg_errors//:go_default_library", "@io_opencensus_go//trace:go_default_library", ], ) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index bc27e0bb033e..3fbc5120bd6a 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -8,6 +8,7 @@ import ( time2 "time" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/api" "github.com/prysmaticlabs/prysm/v5/api/server/structs" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" @@ -23,7 +24,6 @@ import ( ethpbv2 "github.com/prysmaticlabs/prysm/v5/proto/eth/v2" "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" - log "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -124,86 +124,97 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { // stalling while waiting for the first response chunk. // After that we send a keepalive dummy message every SECONDS_PER_SLOT // to prevent anyone (e.g. proxy servers) from closing connections. - sendKeepalive(w, flusher) + if err := sendKeepalive(w, flusher); err != nil { + httputil.HandleError(w, err.Error(), http.StatusInternalServerError) + return + } keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second) + for { select { case event := <-opsChan: - handleBlockOperationEvents(w, flusher, topicsMap, event) + if err := handleBlockOperationEvents(w, flusher, topicsMap, event); err != nil { + httputil.HandleError(w, err.Error(), http.StatusInternalServerError) + return + } case event := <-stateChan: - s.handleStateEvents(ctx, w, flusher, topicsMap, event) + if err := s.handleStateEvents(ctx, w, flusher, topicsMap, event); err != nil { + httputil.HandleError(w, err.Error(), http.StatusInternalServerError) + return + } case <-keepaliveTicker.C: - sendKeepalive(w, flusher) + if err := sendKeepalive(w, flusher); err != nil { + httputil.HandleError(w, err.Error(), http.StatusInternalServerError) + return + } case <-ctx.Done(): + httputil.HandleError(w, "context closed", http.StatusServiceUnavailable) return } } + } -func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) { +func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { switch event.Type { case operation.AggregatedAttReceived: if _, ok := requestedTopics[AttestationTopic]; !ok { - return + return nil } attData, ok := event.Data.(*operation.AggregatedAttReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) + } att := structs.AttFromConsensus(attData.Attestation.Aggregate) - send(w, flusher, AttestationTopic, att) + return send(w, flusher, AttestationTopic, att) case operation.UnaggregatedAttReceived: if _, ok := requestedTopics[AttestationTopic]; !ok { - return + return nil } attData, ok := event.Data.(*operation.UnAggregatedAttReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) + } att := structs.AttFromConsensus(attData.Attestation) - send(w, flusher, AttestationTopic, att) + return send(w, flusher, AttestationTopic, att) case operation.ExitReceived: if _, ok := requestedTopics[VoluntaryExitTopic]; !ok { - return + return nil } exitData, ok := event.Data.(*operation.ExitReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, VoluntaryExitTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, VoluntaryExitTopic) } exit := structs.SignedExitFromConsensus(exitData.Exit) - send(w, flusher, VoluntaryExitTopic, exit) + return send(w, flusher, VoluntaryExitTopic, exit) case operation.SyncCommitteeContributionReceived: if _, ok := requestedTopics[SyncCommitteeContributionTopic]; !ok { - return + return nil } contributionData, ok := event.Data.(*operation.SyncCommitteeContributionReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, SyncCommitteeContributionTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, SyncCommitteeContributionTopic) } contribution := structs.SignedContributionAndProofFromConsensus(contributionData.Contribution) - send(w, flusher, SyncCommitteeContributionTopic, contribution) + return send(w, flusher, SyncCommitteeContributionTopic, contribution) case operation.BLSToExecutionChangeReceived: if _, ok := requestedTopics[BLSToExecutionChangeTopic]; !ok { - return + return nil } changeData, ok := event.Data.(*operation.BLSToExecutionChangeReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, BLSToExecutionChangeTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, BLSToExecutionChangeTopic) } - send(w, flusher, BLSToExecutionChangeTopic, structs.SignedBLSChangeFromConsensus(changeData.Change)) + return send(w, flusher, BLSToExecutionChangeTopic, structs.SignedBLSChangeFromConsensus(changeData.Change)) case operation.BlobSidecarReceived: if _, ok := requestedTopics[BlobSidecarTopic]; !ok { - return + return nil } blobData, ok := event.Data.(*operation.BlobSidecarReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, BlobSidecarTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, BlobSidecarTopic) } versionedHash := blockchain.ConvertKzgCommitmentToVersionedHash(blobData.Blob.KzgCommitment) blobEvent := &structs.BlobSidecarEvent{ @@ -213,38 +224,36 @@ func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, req VersionedHash: versionedHash.String(), KzgCommitment: hexutil.Encode(blobData.Blob.KzgCommitment), } - send(w, flusher, BlobSidecarTopic, blobEvent) + return send(w, flusher, BlobSidecarTopic, blobEvent) case operation.AttesterSlashingReceived: if _, ok := requestedTopics[AttesterSlashingTopic]; !ok { - return + return nil } attesterSlashingData, ok := event.Data.(*operation.AttesterSlashingReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, AttesterSlashingTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, AttesterSlashingTopic) } - send(w, flusher, AttesterSlashingTopic, structs.AttesterSlashingFromConsensus(attesterSlashingData.AttesterSlashing)) + return send(w, flusher, AttesterSlashingTopic, structs.AttesterSlashingFromConsensus(attesterSlashingData.AttesterSlashing)) case operation.ProposerSlashingReceived: if _, ok := requestedTopics[ProposerSlashingTopic]; !ok { - return + return nil } proposerSlashingData, ok := event.Data.(*operation.ProposerSlashingReceivedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, ProposerSlashingTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, ProposerSlashingTopic) } - send(w, flusher, ProposerSlashingTopic, structs.ProposerSlashingFromConsensus(proposerSlashingData.ProposerSlashing)) + return send(w, flusher, ProposerSlashingTopic, structs.ProposerSlashingFromConsensus(proposerSlashingData.ProposerSlashing)) } + return nil } -func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) { +func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { switch event.Type { case statefeed.NewHead: if _, ok := requestedTopics[HeadTopic]; ok { headData, ok := event.Data.(*ethpb.EventHead) if !ok { - write(w, flusher, topicDataMismatch, event.Data, HeadTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, HeadTopic) } head := &structs.HeadEvent{ Slot: fmt.Sprintf("%d", headData.Slot), @@ -255,23 +264,22 @@ func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, f PreviousDutyDependentRoot: hexutil.Encode(headData.PreviousDutyDependentRoot), CurrentDutyDependentRoot: hexutil.Encode(headData.CurrentDutyDependentRoot), } - send(w, flusher, HeadTopic, head) + return send(w, flusher, HeadTopic, head) } if _, ok := requestedTopics[PayloadAttributesTopic]; ok { - s.sendPayloadAttributes(ctx, w, flusher) + return s.sendPayloadAttributes(ctx, w, flusher) } case statefeed.MissedSlot: if _, ok := requestedTopics[PayloadAttributesTopic]; ok { - s.sendPayloadAttributes(ctx, w, flusher) + return s.sendPayloadAttributes(ctx, w, flusher) } case statefeed.FinalizedCheckpoint: if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok { - return + return nil } checkpointData, ok := event.Data.(*ethpb.EventFinalizedCheckpoint) if !ok { - write(w, flusher, topicDataMismatch, event.Data, FinalizedCheckpointTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, FinalizedCheckpointTopic) } checkpoint := &structs.FinalizedCheckpointEvent{ Block: hexutil.Encode(checkpointData.Block), @@ -279,15 +287,14 @@ func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, f Epoch: fmt.Sprintf("%d", checkpointData.Epoch), ExecutionOptimistic: checkpointData.ExecutionOptimistic, } - send(w, flusher, FinalizedCheckpointTopic, checkpoint) + return send(w, flusher, FinalizedCheckpointTopic, checkpoint) case statefeed.LightClientFinalityUpdate: if _, ok := requestedTopics[LightClientFinalityUpdateTopic]; !ok { - return + return nil } updateData, ok := event.Data.(*ethpbv2.LightClientFinalityUpdateWithVersion) if !ok { - write(w, flusher, topicDataMismatch, event.Data, LightClientFinalityUpdateTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, LightClientFinalityUpdateTopic) } var finalityBranch []string @@ -318,15 +325,14 @@ func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, f SignatureSlot: fmt.Sprintf("%d", updateData.Data.SignatureSlot), }, } - send(w, flusher, LightClientFinalityUpdateTopic, update) + return send(w, flusher, LightClientFinalityUpdateTopic, update) case statefeed.LightClientOptimisticUpdate: if _, ok := requestedTopics[LightClientOptimisticUpdateTopic]; !ok { - return + return nil } updateData, ok := event.Data.(*ethpbv2.LightClientOptimisticUpdateWithVersion) if !ok { - write(w, flusher, topicDataMismatch, event.Data, LightClientOptimisticUpdateTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, LightClientOptimisticUpdateTopic) } update := &structs.LightClientOptimisticUpdateEvent{ Version: version.String(int(updateData.Version)), @@ -345,15 +351,14 @@ func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, f SignatureSlot: fmt.Sprintf("%d", updateData.Data.SignatureSlot), }, } - send(w, flusher, LightClientOptimisticUpdateTopic, update) + return send(w, flusher, LightClientOptimisticUpdateTopic, update) case statefeed.Reorg: if _, ok := requestedTopics[ChainReorgTopic]; !ok { - return + return nil } reorgData, ok := event.Data.(*ethpb.EventChainReorg) if !ok { - write(w, flusher, topicDataMismatch, event.Data, ChainReorgTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, ChainReorgTopic) } reorg := &structs.ChainReorgEvent{ Slot: fmt.Sprintf("%d", reorgData.Slot), @@ -365,78 +370,69 @@ func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, f Epoch: fmt.Sprintf("%d", reorgData.Epoch), ExecutionOptimistic: reorgData.ExecutionOptimistic, } - send(w, flusher, ChainReorgTopic, reorg) + return send(w, flusher, ChainReorgTopic, reorg) case statefeed.BlockProcessed: if _, ok := requestedTopics[BlockTopic]; !ok { - return + return nil } blkData, ok := event.Data.(*statefeed.BlockProcessedData) if !ok { - write(w, flusher, topicDataMismatch, event.Data, BlockTopic) - return + return write(w, flusher, topicDataMismatch, event.Data, BlockTopic) } blockRoot, err := blkData.SignedBlock.Block().HashTreeRoot() if err != nil { - write(w, flusher, "Could not get block root: "+err.Error()) - return + return write(w, flusher, "Could not get block root: "+err.Error()) } blk := &structs.BlockEvent{ Slot: fmt.Sprintf("%d", blkData.Slot), Block: hexutil.Encode(blockRoot[:]), ExecutionOptimistic: blkData.Optimistic, } - send(w, flusher, BlockTopic, blk) + return send(w, flusher, BlockTopic, blk) } + return nil } // This event stream is intended to be used by builders and relays. // Parent fields are based on state at N_{current_slot}, while the rest of fields are based on state of N_{current_slot + 1} -func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWriter, flusher http.Flusher) { +func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWriter, flusher http.Flusher) error { headRoot, err := s.HeadFetcher.HeadRoot(ctx) if err != nil { - write(w, flusher, "Could not get head root: "+err.Error()) - return + return write(w, flusher, "Could not get head root: "+err.Error()) } st, err := s.HeadFetcher.HeadState(ctx) if err != nil { - write(w, flusher, "Could not get head state: "+err.Error()) - return + return write(w, flusher, "Could not get head state: "+err.Error()) } // advance the head state headState, err := transition.ProcessSlotsIfPossible(ctx, st, s.ChainInfoFetcher.CurrentSlot()+1) if err != nil { - write(w, flusher, "Could not advance head state: "+err.Error()) - return + return write(w, flusher, "Could not advance head state: "+err.Error()) } headBlock, err := s.HeadFetcher.HeadBlock(ctx) if err != nil { - write(w, flusher, "Could not get head block: "+err.Error()) - return + return write(w, flusher, "Could not get head block: "+err.Error()) } headPayload, err := headBlock.Block().Body().Execution() if err != nil { - write(w, flusher, "Could not get execution payload: "+err.Error()) - return + return write(w, flusher, "Could not get execution payload: "+err.Error()) } t, err := slots.ToTime(headState.GenesisTime(), headState.Slot()) if err != nil { - write(w, flusher, "Could not get head state slot time: "+err.Error()) - return + return write(w, flusher, "Could not get head state slot time: "+err.Error()) } prevRando, err := helpers.RandaoMix(headState, time.CurrentEpoch(headState)) if err != nil { - write(w, flusher, "Could not get head state randao mix: "+err.Error()) - return + return write(w, flusher, "Could not get head state randao mix: "+err.Error()) } proposerIndex, err := helpers.BeaconProposerIndex(ctx, headState) if err != nil { - write(w, flusher, "Could not get head state proposer index: "+err.Error()) - return + return write(w, flusher, "Could not get head state proposer index: "+err.Error()) } var attributes interface{} @@ -450,8 +446,7 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite case version.Capella: withdrawals, err := headState.ExpectedWithdrawals() if err != nil { - write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) - return + return write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) } attributes = &structs.PayloadAttributesV2{ Timestamp: fmt.Sprintf("%d", t.Unix()), @@ -462,13 +457,11 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite case version.Deneb: withdrawals, err := headState.ExpectedWithdrawals() if err != nil { - write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) - return + return write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) } parentRoot, err := headBlock.Block().HashTreeRoot() if err != nil { - write(w, flusher, "Could not get head block root: "+err.Error()) - return + return write(w, flusher, "Could not get head block root: "+err.Error()) } attributes = &structs.PayloadAttributesV3{ Timestamp: fmt.Sprintf("%d", t.Unix()), @@ -478,14 +471,12 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite ParentBeaconBlockRoot: hexutil.Encode(parentRoot[:]), } default: - write(w, flusher, "Payload version %s is not supported", version.String(headState.Version())) - return + return write(w, flusher, "Payload version %s is not supported", version.String(headState.Version())) } attributesBytes, err := json.Marshal(attributes) if err != nil { - write(w, flusher, err.Error()) - return + return write(w, flusher, err.Error()) } eventData := structs.PayloadAttributesEventData{ ProposerIndex: fmt.Sprintf("%d", proposerIndex), @@ -497,32 +488,31 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite } eventDataBytes, err := json.Marshal(eventData) if err != nil { - write(w, flusher, err.Error()) - return + return write(w, flusher, err.Error()) } - send(w, flusher, PayloadAttributesTopic, &structs.PayloadAttributesEvent{ + return send(w, flusher, PayloadAttributesTopic, &structs.PayloadAttributesEvent{ Version: version.String(headState.Version()), Data: eventDataBytes, }) } -func send(w http.ResponseWriter, flusher http.Flusher, name string, data interface{}) { +func send(w http.ResponseWriter, flusher http.Flusher, name string, data interface{}) error { j, err := json.Marshal(data) if err != nil { - write(w, flusher, "Could not marshal event to JSON: "+err.Error()) - return + return write(w, flusher, "Could not marshal event to JSON: "+err.Error()) } - write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j)) + return write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j)) } -func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) { - write(w, flusher, ":\n\n") +func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) error { + return write(w, flusher, ":\n\n") } -func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) { +func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) error { _, err := fmt.Fprintf(w, format, a...) if err != nil { - log.WithError(err).Error("Could not write to response writer") + return errors.Wrap(err, "Could not write to response writer") } flusher.Flush() + return nil } diff --git a/beacon-chain/rpc/prysm/v1alpha1/node/server.go b/beacon-chain/rpc/prysm/v1alpha1/node/server.go index 67c3b863f8c1..26ba15bf8a2e 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/node/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/node/server.go @@ -55,9 +55,9 @@ func (ns *Server) GetHealth(ctx context.Context, request *ethpb.HealthRequest) ( defer span.End() // Set a timeout for the health check operation - timeoutDuration := 10 * time.Second - ctx, cancel := context.WithTimeout(ctx, timeoutDuration) - defer cancel() // Important to avoid a context leak + //timeoutDuration := 10 * time.Second + //ctx, cancel := context.WithTimeout(ctx, timeoutDuration) + //defer cancel() // Important to avoid a context leak if ns.SyncChecker.Synced() { return &empty.Empty{}, nil diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index 12463e05e5d3..6c71eac0efbd 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -2,6 +2,7 @@ package beacon_api import ( "context" + "net/http" "time" "github.com/ethereum/go-ethereum/common/hexutil" @@ -191,7 +192,7 @@ func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *emp } func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *event.Event) { - eventStream, err := event.NewEventStream(ctx, c.jsonRestHandler.HttpClient(), c.jsonRestHandler.Host(), topics) + eventStream, err := event.NewEventStream(ctx, &http.Client{}, c.jsonRestHandler.Host(), topics) // event stream should not be subject to the same settings as other api calls if err != nil { eventsChannel <- &event.Event{ EventType: event.EventError, From 1c0d421e147d87899b40ddfb557862efac11c428 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 19 Mar 2024 15:13:17 -0500 Subject: [PATCH 2/8] reverting timeout on get health --- beacon-chain/rpc/prysm/v1alpha1/node/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon-chain/rpc/prysm/v1alpha1/node/server.go b/beacon-chain/rpc/prysm/v1alpha1/node/server.go index 26ba15bf8a2e..67c3b863f8c1 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/node/server.go +++ b/beacon-chain/rpc/prysm/v1alpha1/node/server.go @@ -55,9 +55,9 @@ func (ns *Server) GetHealth(ctx context.Context, request *ethpb.HealthRequest) ( defer span.End() // Set a timeout for the health check operation - //timeoutDuration := 10 * time.Second - //ctx, cancel := context.WithTimeout(ctx, timeoutDuration) - //defer cancel() // Important to avoid a context leak + timeoutDuration := 10 * time.Second + ctx, cancel := context.WithTimeout(ctx, timeoutDuration) + defer cancel() // Important to avoid a context leak if ns.SyncChecker.Synced() { return &empty.Empty{}, nil From e2c577f0ed76d631cf24de0eca235a0170df7e1b Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 19 Mar 2024 15:50:09 -0500 Subject: [PATCH 3/8] fixing linting --- beacon-chain/rpc/eth/events/events.go | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 3fbc5120bd6a..2f5224604fac 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -152,7 +152,6 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { return } } - } func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { From 58d1ec6f321af14fadbf9f87e2ff2d721bf73221 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 19 Mar 2024 16:00:03 -0500 Subject: [PATCH 4/8] fixing more linting --- beacon-chain/rpc/eth/events/events.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 2f5224604fac..7b6da4c1b133 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -163,7 +163,6 @@ func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, req attData, ok := event.Data.(*operation.AggregatedAttReceivedData) if !ok { return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - } att := structs.AttFromConsensus(attData.Attestation.Aggregate) return send(w, flusher, AttestationTopic, att) @@ -174,7 +173,6 @@ func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, req attData, ok := event.Data.(*operation.UnAggregatedAttReceivedData) if !ok { return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - } att := structs.AttFromConsensus(attData.Attestation) return send(w, flusher, AttestationTopic, att) From 9c70989a1b3ab37a1003f27b53e40287b7529883 Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Tue, 19 Mar 2024 17:02:40 -0500 Subject: [PATCH 5/8] Update validator/client/beacon-api/beacon_api_validator_client.go Co-authored-by: Preston Van Loon --- validator/client/beacon-api/beacon_api_validator_client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index 570f10187a79..9fa6279ebb10 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -193,7 +193,8 @@ func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *emp } func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *event.Event) { - eventStream, err := event.NewEventStream(ctx, &http.Client{}, c.jsonRestHandler.Host(), topics) // event stream should not be subject to the same settings as other api calls + client := &http.Client{} // event stream should not be subject to the same settings as other api calls, so we won't use c.jsonRestHandler.HttpClient() + eventStream, err := event.NewEventStream(ctx, client, c.jsonRestHandler.Host(), topics) if err != nil { eventsChannel <- &event.Event{ EventType: event.EventError, From 28e00c7021d63b8dcdae28875bc5ab57945e193c Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Tue, 19 Mar 2024 17:02:46 -0500 Subject: [PATCH 6/8] Update beacon-chain/rpc/eth/events/events.go Co-authored-by: Preston Van Loon --- beacon-chain/rpc/eth/events/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 7b6da4c1b133..d03fa1ebd5a2 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -508,7 +508,7 @@ func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) error { func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) error { _, err := fmt.Fprintf(w, format, a...) if err != nil { - return errors.Wrap(err, "Could not write to response writer") + return errors.Wrap(err, "could not write to response writer") } flusher.Flush() return nil From 1d2690506dd147271747688d6bbaeb790e31a04c Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 19 Mar 2024 17:04:44 -0500 Subject: [PATCH 7/8] reverting change and removing line on context done which creates a superfluous response.WriteHeader error --- api/client/event/event_stream.go | 5 +---- beacon-chain/rpc/eth/events/events.go | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/api/client/event/event_stream.go b/api/client/event/event_stream.go index 2759bc06d3e1..48a1951b2b90 100644 --- a/api/client/event/event_stream.go +++ b/api/client/event/event_stream.go @@ -106,10 +106,7 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) { var eventType, data string // Variables to store event type and data // Iterate over lines of the event stream - for { - if ok := scanner.Scan(); !ok { - break - } + for scanner.Scan() { select { case <-h.ctx.Done(): log.Info("Context canceled, stopping event stream") diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index d03fa1ebd5a2..9382766d44ad 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -148,7 +148,6 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { return } case <-ctx.Done(): - httputil.HandleError(w, "context closed", http.StatusServiceUnavailable) return } } From 2007c6b5ea0b46a5dd6d1b8163fca435dba5e164 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Tue, 19 Mar 2024 17:11:34 -0500 Subject: [PATCH 8/8] gofmt --- validator/client/beacon-api/beacon_api_validator_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index 9fa6279ebb10..115053c5623d 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -194,7 +194,7 @@ func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *emp func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *event.Event) { client := &http.Client{} // event stream should not be subject to the same settings as other api calls, so we won't use c.jsonRestHandler.HttpClient() - eventStream, err := event.NewEventStream(ctx, client, c.jsonRestHandler.Host(), topics) + eventStream, err := event.NewEventStream(ctx, client, c.jsonRestHandler.Host(), topics) if err != nil { eventsChannel <- &event.Event{ EventType: event.EventError,