Skip to content

Commit

Permalink
Add metrics to grpc server
Browse files Browse the repository at this point in the history
Signed-off-by: zeroalphat <taichi-takemura@cybozu.co.jp>
  • Loading branch information
zeroalphat committed Oct 23, 2023
1 parent ff78541 commit 39333a6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 15 deletions.
4 changes: 3 additions & 1 deletion cmd/necoperf-daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
port int
runtimeEndpoint string
workDir string
metricsPort int
)

func NewDaemonCommand() *cobra.Command {
Expand All @@ -22,7 +23,7 @@ func NewDaemonCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
handler := slog.NewTextHandler(os.Stderr, nil)
logger := slog.New(handler)
daemon, err := daemon.New(logger, port, runtimeEndpoint, workDir)
daemon, err := daemon.New(logger, port, metricsPort, runtimeEndpoint, workDir)
if err != nil {
return err
}
Expand All @@ -31,6 +32,7 @@ func NewDaemonCommand() *cobra.Command {
},
}
cmd.Flags().IntVar(&port, "port", constants.NecoPerfGrpcServerPort, "Port number on which the grpc server runs")
cmd.Flags().IntVar(&metricsPort, "metrics-port", constants.NecoPerfMetricsPort, "Port number on which the metrics server runs")
cmd.Flags().StringVar(&runtimeEndpoint, "runtime-endpoint", "unix:///run/containerd/containerd.sock", "Container runtime endpoint to connect to")
cmd.Flags().StringVar(&workDir, "work-dir", "/var/necoperf", "Directory for storing profiling result")

Expand Down
1 change: 1 addition & 0 deletions docs/necoperf-daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ Start necoperf-daemon on the server.
| Option | Default value |Description |
|:-------|:--------------|:-----------|
| `--port` | `6543` | Port number on which the grpc server runs |
| `--metrics-port` | `6541` | Port number on which the metrics server runs |
| `--runtime-endpoint` | `unix:///run/containerd/containerd.sock` | Container runtime endpoint to connect to |
| `--work-dir` | `/var/necoperf` | Directory for storing profiling results |
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ toolchain go1.21.1

require (
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0
github.com/oklog/run v1.1.0
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
golang.org/x/sync v0.3.0
Expand Down Expand Up @@ -57,7 +59,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0 h1:f4tggROQKKcnh4eItay6z/HbHLqghBxS8g7pyMhmDio=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0/go.mod h1:hKAkSgNkL0FII46ZkJcpVEAai4KV+swlIWCKfekd1pA=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0 h1:2cz5kSrxzMYHiWOBbKj8itQm+nRykkB8aMv4ThcHYHA=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down
1 change: 1 addition & 0 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
)

const (
NecoPerfMetricsPort = 6541
NecoPerfGrpcServerPort = 6543
NecoperfGrpcPortName = "necoperf-grpc"
)
62 changes: 49 additions & 13 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ import (
"fmt"
"log/slog"
"net"
"net/http"
"os"
"time"

"github.com/cybozu-go/necoperf/internal/resource"
"github.com/cybozu-go/necoperf/internal/rpc"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
Expand All @@ -22,12 +26,13 @@ import (
)

type DaemonServer struct {
logger *slog.Logger
server *grpc.Server
port int
endpoint string
workDir string
semaphore *semaphore.Weighted
logger *slog.Logger
server *grpc.Server
port int
metricsPort int
endpoint string
workDir string
semaphore *semaphore.Weighted
rpc.UnimplementedNecoPerfServer
container *resource.Container
perfExecuter *resource.PerfExecuter
Expand All @@ -39,7 +44,17 @@ const (
maxWorkers = 2
)

func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer, error) {
var (
reg = prometheus.NewRegistry()
metricsHandler = promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
ErrorHandling: promhttp.ContinueOnError,
},
)
)

func New(logger *slog.Logger, port, metricsPort int, endpoint, workDir string) (*DaemonServer, error) {
opts := []logging.Option{
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
}
Expand All @@ -48,27 +63,34 @@ func New(logger *slog.Logger, port int, endpoint, workDir string) (*DaemonServer
MinTime: minTime,
}

srvMetrics := grpcprom.NewServerMetrics()
reg.MustRegister(srvMetrics)

serv := grpc.NewServer(
grpc.ChainUnaryInterceptor(
srvMetrics.UnaryServerInterceptor(),
logging.UnaryServerInterceptor(InterceptorLogger(logger), opts...),
),
grpc.ChainStreamInterceptor(
srvMetrics.StreamServerInterceptor(),
logging.StreamServerInterceptor(InterceptorLogger(logger), opts...),
),
grpc.KeepaliveEnforcementPolicy(
kep,
),
)
srvMetrics.InitializeMetrics(serv)

semaphore := semaphore.NewWeighted(maxWorkers)

return &DaemonServer{
logger: logger,
server: serv,
port: port,
endpoint: endpoint,
workDir: workDir,
semaphore: semaphore,
logger: logger,
server: serv,
port: port,
metricsPort: metricsPort,
endpoint: endpoint,
workDir: workDir,
semaphore: semaphore,
}, nil
}

Expand Down Expand Up @@ -114,6 +136,20 @@ func (d *DaemonServer) Start() error {
d.server.Stop()
})

addr := fmt.Sprintf(":%d", d.metricsPort)
metricsServer := &http.Server{Addr: addr}
g.Add(func() error {
m := http.NewServeMux()
m.Handle("/metrics", metricsHandler)
metricsServer.Handler = m
d.logger.Info("metrics server is running", "port", d.metricsPort)
return metricsServer.ListenAndServe()
}, func(err error) {
if err := metricsServer.Close(); err != nil {
d.logger.Error("metrics server shutdown is failed", "error", err)
}
})

return g.Run()
}

Expand Down

0 comments on commit 39333a6

Please sign in to comment.