Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrbrt committed Sep 27, 2024
1 parent 907123e commit c80099b
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 94 deletions.
26 changes: 9 additions & 17 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,12 @@ func (s *Server[T]) Start(ctx context.Context) error {
return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
s.logger.Info("starting gRPC server...", "address", s.config.Address)
errCh <- s.grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
err = <-errCh
if err != nil {
s.logger.Error("failed to start gRPC server", "err", err)
s.logger.Info("starting gRPC server...", "address", s.config.Address)
if err := s.grpcSrv.Serve(listener); err != nil {
return fmt.Errorf("failed to start gRPC server: %w", err)
}

return err
return nil
}

func (s *Server[T]) Stop(ctx context.Context) error {
Expand All @@ -199,6 +187,10 @@ func (s *Server[T]) Stop(ctx context.Context) error {

s.logger.Info("stopping gRPC server...", "address", s.config.Address)
s.grpcSrv.GracefulStop()

return nil
}

// GetGRPCServer returns the underlying gRPC server.
func (s *Server[T]) GetGRPCServer() *grpc.Server {
return s.grpcSrv
}
6 changes: 5 additions & 1 deletion server/v2/api/grpcgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package grpcgateway

func DefaultConfig() *Config {
return &Config{
Enable: true,
Enable: true,
Address: "localhost:1317",
}
}

type Config struct {
// Enable defines if the gRPC-gateway should be enabled.
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC-gateway should be enabled."`

// Address defines the address the gRPC-gateway server binds to.
Address string `mapstructure:"address" toml:"address" comment:"Address defines the address the gRPC-gateway server binds to."`
}

type CfgOption func(*Config)
Expand Down
74 changes: 36 additions & 38 deletions server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

gateway "github.com/cosmos/gogogateway"
"github.com/cosmos/gogoproto/jsonpb"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

Expand All @@ -18,28 +17,24 @@ import (
)

var (
_ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)
_ serverv2.HasConfig = (*GRPCGatewayServer[transaction.Tx])(nil)
_ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
_ serverv2.HasConfig = (*Server[transaction.Tx])(nil)
)

const (
ServerName = "grpc-gateway"
const ServerName = "grpc-gateway"

// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
)

type GRPCGatewayServer[T transaction.Tx] struct {
type Server[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption

GRPCSrv *grpc.Server
GRPCGatewayRouter *runtime.ServeMux
server *http.Server
gRPCSrv *grpc.Server
gRPCGatewayRouter *runtime.ServeMux
}

// New creates a new gRPC-gateway server.
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[T] {
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *Server[T] {
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
marshalerOption := &gateway.JSONPb{
Expand All @@ -49,9 +44,9 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
AnyResolver: ir,
}

return &GRPCGatewayServer[T]{
GRPCSrv: grpcSrv,
GRPCGatewayRouter: runtime.NewServeMux(
return &Server[T]{
gRPCSrv: grpcSrv,
gRPCGatewayRouter: runtime.NewServeMux(
// Custom marshaler option is required for gogo proto
runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption),

Expand All @@ -67,12 +62,12 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
}
}

func (g *GRPCGatewayServer[T]) Name() string {
func (s *Server[T]) Name() string {
return ServerName
}

func (s *GRPCGatewayServer[T]) Config() any {
if s.config == nil {
func (s *Server[T]) Config() any {
if s.config == nil || s.config.Address == "" {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
Expand All @@ -85,51 +80,51 @@ func (s *GRPCGatewayServer[T]) Config() any {
return s.config
}

func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
func (s *Server[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}

// Register the gRPC-Gateway server.
// appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv)
// TODO: register the gRPC-Gateway routes

s.logger = logger
s.logger = logger.With(log.ModuleKey, s.Name())
s.config = serverCfg

return nil
}

func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error {
func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
return nil
}

// TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98)
mux := http.NewServeMux()
mux.Handle("/", s.gRPCGatewayRouter)

return nil
}
s.server = &http.Server{
Addr: s.config.Address,
Handler: mux,
}

func (s *GRPCGatewayServer[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
s.logger.Info("starting gRPC-Gateway server...", "address", s.config.Address)
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("failed to start gRPC-Gateway server: %w", err)
}

return nil
}

// Register implements registers a grpc-gateway server
func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
// configure grpc-gatway server
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Fall back to grpc gateway server.
s.GRPCGatewayRouter.ServeHTTP(w, req)
}))
func (s *Server[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}

return nil
s.logger.Info("stopping gRPC-Gateway server...", "address", s.config.Address)
return s.server.Shutdown(ctx)
}

// CustomGRPCHeaderMatcher for mapping request headers to
Expand All @@ -138,6 +133,9 @@ func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
// gRPC metadata after removing prefix 'Grpc-Metadata-'. We can use this
// CustomGRPCHeaderMatcher if headers don't start with `Grpc-Metadata-`
func CustomGRPCHeaderMatcher(key string) (string, bool) {
// GRPCBlockHeightHeader is the gRPC header for block height.
const GRPCBlockHeightHeader = "x-cosmos-block-height"

switch strings.ToLower(key) {
case GRPCBlockHeightHeader:
return GRPCBlockHeightHeader, true
Expand Down
2 changes: 1 addition & 1 deletion server/v2/api/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package telemetry
func DefaultConfig() *Config {
return &Config{
Enable: true,
Address: "localhost:1338",
Address: "localhost:1318",
ServiceName: "",
EnableHostname: false,
EnableHostnameLabel: false,
Expand Down
26 changes: 14 additions & 12 deletions server/v2/api/telemetry/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@ import (
)

var (
_ serverv2.ServerComponent[transaction.Tx] = (*TelemetryServer[transaction.Tx])(nil)
_ serverv2.HasConfig = (*TelemetryServer[transaction.Tx])(nil)
_ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
_ serverv2.HasConfig = (*Server[transaction.Tx])(nil)
)

const ServerName = "telemetry"

type TelemetryServer[T transaction.Tx] struct {
type Server[T transaction.Tx] struct {
config *Config
logger log.Logger
server *http.Server
metrics *Metrics
}

// New creates a new telemetry server.
func New[T transaction.Tx]() *TelemetryServer[T] {
return &TelemetryServer[T]{}
func New[T transaction.Tx]() *Server[T] {
return &Server[T]{}
}

// Name returns the server name.
func (s *TelemetryServer[T]) Name() string {
func (s *Server[T]) Name() string {
return ServerName
}

func (s *TelemetryServer[T]) Config() any {
func (s *Server[T]) Config() any {
if s.config == nil || s.config.Address == "" {
return DefaultConfig()
}
Expand All @@ -45,15 +45,15 @@ func (s *TelemetryServer[T]) Config() any {
}

// Init implements serverv2.ServerComponent.
func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
s.config = serverCfg
s.logger = logger
s.logger = logger.With(log.ModuleKey, s.Name())

metrics, err := NewMetrics(s.config)
if err != nil {
Expand All @@ -64,7 +64,7 @@ func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, log
return nil
}

func (s *TelemetryServer[T]) Start(ctx context.Context) error {
func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
return nil
Expand All @@ -82,22 +82,24 @@ func (s *TelemetryServer[T]) Start(ctx context.Context) error {
Handler: mux,
}

s.logger.Info("starting telemetry server...", "address", s.config.Address)
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("failed to start telemetry server: %w", err)
}

return nil
}

func (s *TelemetryServer[T]) Stop(ctx context.Context) error {
func (s *Server[T]) Stop(ctx context.Context) error {
if !s.config.Enable || s.server == nil {
return nil
}

s.logger.Info("stopping telemetry server...", "address", s.config.Address)
return s.server.Shutdown(ctx)
}

func (s *TelemetryServer[T]) metricsHandler(w http.ResponseWriter, r *http.Request) {
func (s *Server[T]) metricsHandler(w http.ResponseWriter, r *http.Request) {
format := strings.TrimSpace(r.FormValue("format"))

// errorResponse defines the attributes of a JSON error response.
Expand Down
1 change: 0 additions & 1 deletion server/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
github.com/cosmos/gogogateway v1.2.0
github.com/cosmos/gogoproto v1.7.0
github.com/golang/protobuf v1.5.4
github.com/gorilla/mux v1.8.1
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-metrics v0.5.3
Expand Down
2 changes: 0 additions & 2 deletions server/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
Expand Down
2 changes: 1 addition & 1 deletion server/v2/store/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// QueryBlockResultsCmd implements the default command for a BlockResults query.
func (s *StoreComponent[T]) PrunesCmd() *cobra.Command {
func (s *Server[T]) PrunesCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "prune [pruning-method]",
Short: "Prune app history states by keeping the recent heights and deleting old heights",
Expand Down
Loading

0 comments on commit c80099b

Please sign in to comment.