Skip to content

Commit

Permalink
fix: data race issues with api.Server (backport cosmos#11724) (cosmos…
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored and JeancarloBarrios committed Sep 28, 2024
1 parent b5693bb commit 2a573f0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 46 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Bug Fixes

* [\#11724](https://github.com/cosmos/cosmos-sdk/pull/11724) Fix data race issues with `api.Server`.

### Improvements

* [\#11693](https://github.com/cosmos/cosmos-sdk/pull/11693) Add validation for gentx cmd.
Expand Down
75 changes: 29 additions & 46 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ type Server struct {
Router *mux.Router
GRPCGatewayRouter *runtime.ServeMux
ClientCtx client.Context
GRPCSrv *grpc.Server
logger log.Logger
metrics *telemetry.Metrics

logger log.Logger
metrics *telemetry.Metrics
// Start() is blocking and generally called from a separate goroutine.
// Close() can be called asynchronously and access shared memory
// via the listener. Therefore, we sync access to Start and Close with
Expand Down Expand Up @@ -90,21 +89,28 @@ func New(clientCtx client.Context, logger log.Logger, grpcSrv *grpc.Server) *Ser

// Start starts the API server. Internally, the API server leverages CometBFT's
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the CometBFT JSON RPC server.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
// and are delegated to the Tendermint JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
s.mtx.Lock()
if cfg.Telemetry.Enabled {
m, err := telemetry.New(cfg.Telemetry)
if err != nil {
s.mtx.Unlock()
return err
}

s.metrics = m
s.registerMetrics()
}

cmtCfg := tmrpcserver.DefaultConfig()
cmtCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections)
cmtCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second
cmtCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second
cmtCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes)
tmCfg := tmrpcserver.DefaultConfig()
tmCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections)
tmCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second
tmCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second
tmCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes)

listener, err := tmrpcserver.Listen(cfg.API.Address, cmtCfg.MaxOpenConnections)
listener, err := tmrpcserver.Listen(cfg.API.Address, tmCfg)
if err != nil {
s.mtx.Unlock()
return err
Expand All @@ -113,38 +119,15 @@ func (s *Server) Start(ctx context.Context, cfg config.Config) error {
s.listener = listener
s.mtx.Unlock()

// register grpc-gateway routes
s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter)

errCh := make(chan error)

// Start the API in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func(enableUnsafeCORS bool) {
s.logger.Info("starting API server...", "address", cfg.API.Address)

if enableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), servercmtlog.CometLoggerWrapper{Logger: s.logger}, cmtCfg)
} else {
errCh <- tmrpcserver.Serve(s.listener, s.Router, servercmtlog.CometLoggerWrapper{Logger: s.logger}, cmtCfg)
}
}(cfg.API.EnableUnsafeCORS)

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process canceled or closed the provided context, so we must
// gracefully stop the API server.
s.logger.Info("stopping API server...", "address", cfg.API.Address)
return s.Close()

case err := <-errCh:
s.logger.Error("failed to start API server", "err", err)
return err
if cfg.API.EnableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
s.mtx.Unlock()
return tmrpcserver.Serve(s.listener, allowAllCORS(h), s.logger, tmCfg)
}

s.logger.Info("starting API server...")
s.mtx.Unlock()
return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
}

// Close closes the API server.
Expand Down

0 comments on commit 2a573f0

Please sign in to comment.