diff --git a/cmd/necoperf-daemon/cmd/daemon.go b/cmd/necoperf-daemon/cmd/daemon.go index a180311..44b3c9e 100644 --- a/cmd/necoperf-daemon/cmd/daemon.go +++ b/cmd/necoperf-daemon/cmd/daemon.go @@ -13,6 +13,7 @@ var ( port int runtimeEndpoint string workDir string + metricsPort int ) func NewDaemonCommand() *cobra.Command { @@ -22,7 +23,7 @@ func NewDaemonCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { handler := slog.NewTextHandler(os.Stderr, nil) logger := slog.New(handler) - daemon, err := daemon.New(logger, port, runtimeEndpoint, workDir) + daemon, err := daemon.New(logger, port, metricsPort, runtimeEndpoint, workDir) if err != nil { return err } @@ -31,6 +32,7 @@ func NewDaemonCommand() *cobra.Command { }, } cmd.Flags().IntVar(&port, "port", constants.NecoPerfGrpcServerPort, "Port number on which the grpc server runs") + cmd.Flags().IntVar(&metricsPort, "metrics-port", constants.NecoPerfMetricsPort, "Port number on which the metrics server runs") cmd.Flags().StringVar(&runtimeEndpoint, "runtime-endpoint", "unix:///run/containerd/containerd.sock", "Container runtime endpoint to connect to") cmd.Flags().StringVar(&workDir, "work-dir", "/var/necoperf", "Directory for storing profiling result") diff --git a/docs/necoperf-daemon.md b/docs/necoperf-daemon.md index 9965309..6e4a14d 100644 --- a/docs/necoperf-daemon.md +++ b/docs/necoperf-daemon.md @@ -13,5 +13,6 @@ Start necoperf-daemon on the server. | Option | Default value |Description | |:-------|:--------------|:-----------| | `--port` | `6543` | Port number on which the grpc server runs | +| `--metrics-port` | `6541` | Port number on which the metrics server runs | | `--runtime-endpoint` | `unix:///run/containerd/containerd.sock` | Container runtime endpoint to connect to | | `--work-dir` | `/var/necoperf` | Directory for storing profiling results | diff --git a/go.mod b/go.mod index b6e17a4..85fda9d 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,12 @@ toolchain go1.21.1 require ( github.com/google/uuid v1.3.0 + github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0 github.com/oklog/run v1.1.0 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 + github.com/prometheus/client_golang v1.16.0 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 golang.org/x/sync v0.3.0 @@ -57,7 +59,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/go.sum b/go.sum index e13813f..5349f85 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 h1:f4tggROQKKcnh4eItay6z/HbHLqghBxS8g7pyMhmDio= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0/go.mod h1:hKAkSgNkL0FII46ZkJcpVEAai4KV+swlIWCKfekd1pA= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0 h1:2cz5kSrxzMYHiWOBbKj8itQm+nRykkB8aMv4ThcHYHA= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 8b8e28f..08344c1 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -15,6 +15,7 @@ const ( ) const ( + NecoPerfMetricsPort = 6541 NecoPerfGrpcServerPort = 6543 NecoperfGrpcPortName = "necoperf-grpc" ) diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 66656b1..3260c20 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -5,13 +5,17 @@ import ( "fmt" "log/slog" "net" + "net/http" "os" "time" "github.com/cybozu-go/necoperf/internal/resource" "github.com/cybozu-go/necoperf/internal/rpc" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -22,12 +26,13 @@ import ( ) type DaemonServer struct { - logger *slog.Logger - server *grpc.Server - port int - endpoint string - workDir string - semaphore *semaphore.Weighted + logger *slog.Logger + server *grpc.Server + port int + metricsPort int + endpoint string + workDir string + semaphore *semaphore.Weighted rpc.UnimplementedNecoPerfServer container *resource.Container perfExecuter *resource.PerfExecuter @@ -39,7 +44,17 @@ const ( maxWorkers = 2 ) -func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer, error) { +var ( + reg = prometheus.NewRegistry() + metricsHandler = promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + ErrorHandling: promhttp.ContinueOnError, + }, + ) +) + +func New(logger *slog.Logger, port, metricsPort int, endpoint, workDir string) (*DaemonServer, error) { opts := []logging.Option{ logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), } @@ -48,27 +63,34 @@ func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer MinTime: minTime, } + srvMetrics := grpcprom.NewServerMetrics() + reg.MustRegister(srvMetrics) + serv := grpc.NewServer( grpc.ChainUnaryInterceptor( + srvMetrics.UnaryServerInterceptor(), logging.UnaryServerInterceptor(InterceptorLogger(logger), opts...), ), grpc.ChainStreamInterceptor( + srvMetrics.StreamServerInterceptor(), logging.StreamServerInterceptor(InterceptorLogger(logger), opts...), ), grpc.KeepaliveEnforcementPolicy( kep, ), ) + srvMetrics.InitializeMetrics(serv) semaphore := semaphore.NewWeighted(maxWorkers) return &DaemonServer{ - logger: logger, - server: serv, - port: port, - endpoint: endpoint, - workDir: workDir, - semaphore: semaphore, + logger: logger, + server: serv, + port: port, + metricsPort: metricsPort, + endpoint: endpoint, + workDir: workDir, + semaphore: semaphore, }, nil } @@ -114,6 +136,20 @@ func (d *DaemonServer) Start() error { d.server.Stop() }) + addr := fmt.Sprintf(":%d", d.metricsPort) + metricsServer := &http.Server{Addr: addr} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", metricsHandler) + metricsServer.Handler = m + d.logger.Info("metrics server is running", "port", d.metricsPort) + return metricsServer.ListenAndServe() + }, func(err error) { + if err := metricsServer.Close(); err != nil { + d.logger.Error("metrics server shutdown is failed", "error", err) + } + }) + return g.Run() }