diff --git a/proxy/receiver_api.go b/proxy/receiver_api.go index fc46a2a..8286028 100644 --- a/proxy/receiver_api.go +++ b/proxy/receiver_api.go @@ -33,6 +33,8 @@ var ( errUUIDParse = errors.New("failed to parse UUID") apiNow = time.Now + + handleParsedRequestTimeout = time.Second * 1 ) func (prx *ReceiverProxy) PublicJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserver.JSONRPCHandler, error) { @@ -79,12 +81,15 @@ func (prx *ReceiverProxy) ValidateSigner(ctx context.Context, req *ParsedRequest return nil } + prx.Log.Debug("Received signed request on a public endpoint", slog.Any("signer", req.signer)) + if req.signer == prx.FlashbotsSignerAddress { req.peerName = FlashbotsPeerName return nil } prx.peersMu.RLock() + defer prx.peersMu.RUnlock() found := false peerName := "" for _, peer := range prx.lastFetchedPeers { @@ -97,7 +102,6 @@ func (prx *ReceiverProxy) ValidateSigner(ctx context.Context, req *ParsedRequest if !found { return errUnknownPeer } - prx.peersMu.RUnlock() req.peerName = peerName return nil } @@ -304,6 +308,9 @@ type ParsedRequest struct { } func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error { + ctx, cancel := context.WithTimeout(ctx, handleParsedRequestTimeout) + defer cancel() + parsedRequest.receivedAt = apiNow() prx.Log.Debug("Received request", slog.Bool("isPublicEndpoint", parsedRequest.publicEndpoint), slog.String("method", parsedRequest.method)) if parsedRequest.publicEndpoint { @@ -325,11 +332,13 @@ func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest } select { case <-ctx.Done(): + prx.Log.Error("Shared queue is stalling") case prx.shareQueue <- &parsedRequest: } if !parsedRequest.publicEndpoint { select { case <-ctx.Done(): + prx.Log.Error("Archive queue is stalling") case prx.archiveQueue <- &parsedRequest: } } diff --git a/proxy/servers.go b/proxy/receiver_servers.go similarity index 100% rename from proxy/servers.go rename to proxy/receiver_servers.go