Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2): wire telemetry + server refactors #21746

Merged
merged 11 commits into from
Sep 27, 2024
9 changes: 2 additions & 7 deletions server/v2/api/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ import "math"

func DefaultConfig() *Config {
return &Config{
Enable: true,
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
Address: "localhost:9090",
// DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in
// bytes the server can receive.
Enable: true,
Address: "localhost:9090",
MaxRecvMsgSize: 1024 * 1024 * 10,
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
MaxSendMsgSize: math.MaxInt32,
}
}
Expand Down
29 changes: 11 additions & 18 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (s *Server[T]) Name() string {
}

func (s *Server[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
if s.config == nil || s.config.Address == "" {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
Expand All @@ -163,6 +163,7 @@ func (s *Server[T]) Config() any {

func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
return nil
}

Expand All @@ -171,24 +172,12 @@ func (s *Server[T]) Start(ctx context.Context) error {
return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
s.logger.Info("starting gRPC server...", "address", s.config.Address)
errCh <- s.grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
err = <-errCh
if err != nil {
s.logger.Error("failed to start gRPC server", "err", err)
s.logger.Info("starting gRPC server...", "address", s.config.Address)
if err := s.grpcSrv.Serve(listener); err != nil {
return fmt.Errorf("failed to start gRPC server: %w", err)
}

return err
return nil
}

func (s *Server[T]) Stop(ctx context.Context) error {
Expand All @@ -198,6 +187,10 @@ func (s *Server[T]) Stop(ctx context.Context) error {

s.logger.Info("stopping gRPC server...", "address", s.config.Address)
s.grpcSrv.GracefulStop()

return nil
}

// GetGRPCServer returns the underlying gRPC server.
func (s *Server[T]) GetGRPCServer() *grpc.Server {
return s.grpcSrv
}
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 5 additions & 1 deletion server/v2/api/grpcgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package grpcgateway

func DefaultConfig() *Config {
return &Config{
Enable: true,
Enable: true,
Address: "localhost:1317",
}
}

type Config struct {
// Enable defines if the gRPC-gateway should be enabled.
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC-gateway should be enabled."`

// Address defines the address the gRPC-gateway server binds to.
Address string `mapstructure:"address" toml:"address" comment:"Address defines the address the gRPC-gateway server binds to."`
}

type CfgOption func(*Config)
Expand Down
76 changes: 39 additions & 37 deletions server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

gateway "github.com/cosmos/gogogateway"
"github.com/cosmos/gogoproto/jsonpb"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

Expand All @@ -17,26 +16,25 @@ import (
serverv2 "cosmossdk.io/server/v2"
)

var _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)

const (
ServerName = "grpc-gateway"

// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
var (
_ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil)
_ serverv2.HasConfig = (*Server[transaction.Tx])(nil)
)

type GRPCGatewayServer[T transaction.Tx] struct {
const ServerName = "grpc-gateway"

type Server[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption

GRPCSrv *grpc.Server
GRPCGatewayRouter *runtime.ServeMux
server *http.Server
gRPCSrv *grpc.Server
gRPCGatewayRouter *runtime.ServeMux
}

// New creates a new gRPC-gateway server.
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[T] {
func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *Server[T] {
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
marshalerOption := &gateway.JSONPb{
Expand All @@ -46,9 +44,9 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
AnyResolver: ir,
}

return &GRPCGatewayServer[T]{
GRPCSrv: grpcSrv,
GRPCGatewayRouter: runtime.NewServeMux(
return &Server[T]{
gRPCSrv: grpcSrv,
gRPCGatewayRouter: runtime.NewServeMux(
// Custom marshaler option is required for gogo proto
runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption),

Expand All @@ -64,12 +62,12 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio
}
}

func (g *GRPCGatewayServer[T]) Name() string {
func (s *Server[T]) Name() string {
return ServerName
}

func (s *GRPCGatewayServer[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
func (s *Server[T]) Config() any {
if s.config == nil || s.config.Address == "" {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
Expand All @@ -82,50 +80,51 @@ func (s *GRPCGatewayServer[T]) Config() any {
return s.config
}

func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
func (s *Server[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}

// Register the gRPC-Gateway server.
// appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv)
// TODO: register the gRPC-Gateway routes

s.logger = logger
s.logger = logger.With(log.ModuleKey, s.Name())
s.config = serverCfg

return nil
}
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved

func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error {
func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
return nil
}

// TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98)
mux := http.NewServeMux()
mux.Handle("/", s.gRPCGatewayRouter)

return nil
}
s.server = &http.Server{
Addr: s.config.Address,
Handler: mux,
}

func (s *GRPCGatewayServer[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
s.logger.Info("starting gRPC-Gateway server...", "address", s.config.Address)
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("failed to start gRPC-Gateway server: %w", err)
}

return nil
}

// Register implements registers a grpc-gateway server
func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
// configure grpc-gatway server
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Fall back to grpc gateway server.
s.GRPCGatewayRouter.ServeHTTP(w, req)
}))
func (s *Server[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}

return nil
s.logger.Info("stopping gRPC-Gateway server...", "address", s.config.Address)
return s.server.Shutdown(ctx)
}

// CustomGRPCHeaderMatcher for mapping request headers to
Expand All @@ -134,6 +133,9 @@ func (s *GRPCGatewayServer[T]) Register(r mux.Router) error {
// gRPC metadata after removing prefix 'Grpc-Metadata-'. We can use this
// CustomGRPCHeaderMatcher if headers don't start with `Grpc-Metadata-`
func CustomGRPCHeaderMatcher(key string) (string, bool) {
// GRPCBlockHeightHeader is the gRPC header for block height.
const GRPCBlockHeightHeader = "x-cosmos-block-height"

switch strings.ToLower(key) {
case GRPCBlockHeightHeader:
return GRPCBlockHeightHeader, true
Expand Down
29 changes: 24 additions & 5 deletions server/v2/api/telemetry/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
package telemetry

type Config struct {
// Prefixed with keys to separate services
ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."`
func DefaultConfig() *Config {
return &Config{
Enable: true,
Address: "localhost:1318",
ServiceName: "",
EnableHostname: false,
EnableHostnameLabel: false,
EnableServiceLabel: false,
PrometheusRetentionTime: 0,
GlobalLabels: nil,
MetricsSink: "",
StatsdAddr: "",
DatadogHostname: "",
}
}

// Enabled enables the application telemetry functionality. When enabled,
type Config struct {
// Enable enables the application telemetry functionality. When enabled,
// an in-memory sink is also enabled by default. Operators may also enabled
// other sinks such as Prometheus.
Enabled bool `mapstructure:"enabled" toml:"enabled" comment:"Enabled enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."`
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."`

// Address defines the API server to listen on
Address string `mapstructure:"address" toml:"address" comment:"Address defines the metrics server address to bind to."`

// Prefixed with keys to separate services
ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."`

// Enable prefixing gauge values with hostname
EnableHostname bool `mapstructure:"enable-hostname" toml:"enable-hostname" comment:"Enable prefixing gauge values with hostname."`
Expand Down
9 changes: 2 additions & 7 deletions server/v2/api/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ type GatherResponse struct {
}

// New creates a new instance of Metrics
func New(cfg Config) (_ *Metrics, rerr error) {
if !cfg.Enabled {
return nil, nil
}

func NewMetrics(cfg *Config) (*Metrics, error) {
if numGlobalLabels := len(cfg.GlobalLabels); numGlobalLabels > 0 {
parsedGlobalLabels := make([]metrics.Label, numGlobalLabels)
for i, gl := range cfg.GlobalLabels {
Expand All @@ -89,12 +85,11 @@ func New(cfg Config) (_ *Metrics, rerr error) {
sink = memSink
inMemSig := metrics.DefaultInmemSignal(memSink)
defer func() {
if rerr != nil {
if err != nil {
inMemSig.Stop()
}
}()
}

if err != nil {
return nil, err
}
Expand Down
Loading
Loading