Skip to content

Commit

Permalink
Redo
Browse files Browse the repository at this point in the history
  • Loading branch information
saolyn committed Jan 23, 2024
1 parent 206207b commit 5437c44
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 88 deletions.
1 change: 0 additions & 1 deletion validator/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"log.go",
"metrics.go",
"multiple_endpoints_grpc_resolver.go",
"multiple_endpoints_http_resolver.go",
"propose.go",
"propose_protect.go",
"registration.go",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,14 @@ func (c beaconApiBeaconChainClient) GetValidatorParticipation(ctx context.Contex
panic("beaconApiBeaconChainClient.GetValidatorParticipation is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiBeaconChainClientWithFallback.")
}

func (c beaconApiBeaconChainClient) MultipleEndpointResolver(ctx context.Context, beaconApiUrls []string) {
c.jsonRestHandler.SwitchBeaconEndpoint(ctx, beaconApiUrls)
}

func NewBeaconApiBeaconChainClientWithFallback(host string, timeout time.Duration, fallbackClient iface.BeaconChainClient) iface.BeaconChainClient {
jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: timeout},
host: host,
host: func() string { return host },
}

return &beaconApiBeaconChainClient{
Expand Down
2 changes: 1 addition & 1 deletion validator/client/beacon-api/beacon_api_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *beaconApiNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*
func NewNodeClientWithFallback(host string, timeout time.Duration, fallbackClient iface.NodeClient) iface.NodeClient {
jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: timeout},
host: host,
host: func() string { return host },
}

return &beaconApiNodeClient{
Expand Down
2 changes: 1 addition & 1 deletion validator/client/beacon-api/beacon_api_validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type beaconApiValidatorClient struct {
func NewBeaconApiValidatorClient(host string, timeout time.Duration) iface.ValidatorClient {
jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: timeout},
host: host,
host: func() string { return host },
}

return &beaconApiValidatorClient{
Expand Down
40 changes: 36 additions & 4 deletions validator/client/beacon-api/json_rest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"net/http"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/api"
Expand All @@ -15,11 +16,12 @@ import (
type JsonRestHandler interface {
Get(ctx context.Context, query string, resp interface{}) error
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp interface{}) error
SwitchBeaconEndpoint(ctx context.Context, beaconApiUrls []string)
}

type beaconApiJsonRestHandler struct {
httpClient http.Client
host string
host func() string
}

// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
Expand All @@ -29,7 +31,7 @@ func (c beaconApiJsonRestHandler) Get(ctx context.Context, endpoint string, resp
return errors.New("resp is nil")
}

url := c.host + endpoint
url := c.host() + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
Expand Down Expand Up @@ -60,8 +62,7 @@ func (c beaconApiJsonRestHandler) Post(
if data == nil {
return errors.New("data is nil")
}

url := c.host + apiEndpoint
url := c.host() + apiEndpoint
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
if err != nil {
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
Expand Down Expand Up @@ -115,3 +116,34 @@ func decodeResp(httpResp *http.Response, resp interface{}) error {

return nil
}

// SwitchBeaconEndpoint switches to the next available endpoint, this is circular.
func (c beaconApiJsonRestHandler) SwitchBeaconEndpoint(ctx context.Context, beaconApiUrls []string) {
const endpoint = "/eth/v1/node/health"
ticker := time.NewTicker(5 * time.Second) // Check every 5 seconds
go func() {
for {
select {
case <-ticker.C:
// GET request to the health endpoint using the current host
err := c.Get(ctx, endpoint, nil)
if err != nil {
for i, url := range beaconApiUrls {
if url == c.host() {
next := (i + 1) % len(beaconApiUrls)
c.changeHost(beaconApiUrls[next])
break
}
}
}
}
}
}()
defer ticker.Stop()
select {}
}

// changeHost updates the host function in beaconApiJsonRestHandler
func (c beaconApiJsonRestHandler) changeHost(newHost string) {
c.host = func() string { return newHost }
}
4 changes: 2 additions & 2 deletions validator/client/beacon-api/json_rest_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGet(t *testing.T) {

jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: time.Second * 5},
host: server.URL,
host: func() string { return server.URL },
}
resp := &beacon.GetGenesisResponse{}
require.NoError(t, jsonRestHandler.Get(ctx, endpoint+"?arg1=abc&arg2=def", resp))
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestPost(t *testing.T) {

jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: time.Second * 5},
host: server.URL,
host: func() string { return server.URL },
}
resp := &beacon.GetGenesisResponse{}
require.NoError(t, jsonRestHandler.Post(ctx, endpoint, headers, bytes.NewBuffer(dataBytes), resp))
Expand Down
2 changes: 1 addition & 1 deletion validator/client/beacon-api/prysm_beacon_chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func NewPrysmBeaconChainClient(host string, timeout time.Duration, nodeClient iface.NodeClient) iface.PrysmBeaconChainClient {
jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: timeout},
host: host,
host: func() string { return host },
}

return prysmBeaconChainClient{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package validator_client_factory

import (
"context"
"strings"

"github.com/prysmaticlabs/prysm/v4/config/features"
beaconApi "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api"
grpcApi "github.com/prysmaticlabs/prysm/v4/validator/client/grpc-api"
Expand All @@ -9,19 +12,21 @@ import (
validatorHelpers "github.com/prysmaticlabs/prysm/v4/validator/helpers"
)

func NewBeaconChainClient(validatorConn validatorHelpers.NodeConnection) iface.BeaconChainClient {
func NewBeaconChainClient(ctx context.Context, validatorConn validatorHelpers.NodeConnection) iface.BeaconChainClient {
grpcClient := grpcApi.NewGrpcBeaconChainClient(validatorConn.GetGrpcClientConn())
featureFlags := features.Get()

if featureFlags.EnableBeaconRESTApi {
return beaconApi.NewBeaconApiBeaconChainClientWithFallback(
validatorConn.GetBeaconApiUrl(),
urls := strings.Split(validatorConn.GetBeaconApiUrl(), ",")
bc := beaconApi.NewBeaconApiBeaconChainClientWithFallback(
urls[0],
validatorConn.GetBeaconApiTimeout(),
grpcClient,
)
} else {
return grpcClient
bc.MultipleEndpointResolver(ctx)
return bc
}
return grpcClient
}

func NewPrysmBeaconClient(validatorConn validatorHelpers.NodeConnection) iface.PrysmBeaconChainClient {
Expand Down
1 change: 1 addition & 0 deletions validator/client/iface/beacon_chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type BeaconChainClient interface {
GetValidatorQueue(ctx context.Context, in *empty.Empty) (*ethpb.ValidatorQueue, error)
GetValidatorPerformance(ctx context.Context, in *ethpb.ValidatorPerformanceRequest) (*ethpb.ValidatorPerformanceResponse, error)
GetValidatorParticipation(ctx context.Context, in *ethpb.GetValidatorParticipationRequest) (*ethpb.ValidatorParticipationResponse, error)
MultipleEndpointResolver(ctx context.Context, beaconApiUrls []string)
}
70 changes: 0 additions & 70 deletions validator/client/multiple_endpoints_http_resolver.go

This file was deleted.

2 changes: 1 addition & 1 deletion validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (v *ValidatorService) Start() {
}

validatorClient := validatorClientFactory.NewValidatorClient(v.conn)
beaconClient := beaconChainClientFactory.NewBeaconChainClient(v.conn)
beaconClient := beaconChainClientFactory.NewBeaconChainClient(v.ctx, v.conn)
prysmBeaconClient := beaconChainClientFactory.NewPrysmBeaconClient(v.conn)

valStruct := &validator{
Expand Down
2 changes: 1 addition & 1 deletion validator/rpc/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *Server) registerBeaconClient() error {
s.beaconApiTimeout,
)

s.beaconChainClient = beaconChainClientFactory.NewBeaconChainClient(conn)
s.beaconChainClient = beaconChainClientFactory.NewBeaconChainClient(s.ctx, conn)
s.beaconNodeClient = nodeClientFactory.NewNodeClient(conn)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn)
return nil
Expand Down

0 comments on commit 5437c44

Please sign in to comment.