From 6f8255b3c6d029c1cc60439ffc7c4387ac1d26e6 Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Tue, 12 Dec 2023 15:48:35 +0100 Subject: [PATCH] process-agent: use the new statsd component (#21108) --- cmd/process-agent/main_common.go | 7 +++- cmd/system-probe/subcommands/run/command.go | 18 ++++++----- pkg/process/statsd/statsd.go | 36 +++------------------ 3 files changed, 21 insertions(+), 40 deletions(-) diff --git a/cmd/process-agent/main_common.go b/cmd/process-agent/main_common.go index 10bf59ea8a30b3..a6e6f432068b63 100644 --- a/cmd/process-agent/main_common.go +++ b/cmd/process-agent/main_common.go @@ -27,6 +27,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig/sysprobeconfigimpl" "github.com/DataDog/datadog-agent/comp/core/workloadmeta" "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors" + compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/utils" "github.com/DataDog/datadog-agent/comp/process" "github.com/DataDog/datadog-agent/comp/process/apiserver" @@ -135,6 +136,9 @@ func runApp(ctx context.Context, globalParams *command.GlobalParams) error { // Provide remote config client module rcclient.Module(), + // Provide statsd client module + compstatsd.Module(), + // Provide the corresponding workloadmeta Params to configure the catalog collectors.GetCatalog(), fx.Provide(func(c config.Component) workloadmeta.Params { @@ -236,6 +240,7 @@ type miscDeps struct { Lc fx.Lifecycle Config config.Component + Statsd compstatsd.Component Syscfg sysprobeconfig.Component HostInfo hostinfo.Component WorkloadMeta workloadmeta.Component @@ -245,7 +250,7 @@ type miscDeps struct { // Todo: (Components) WorkloadMeta, remoteTagger, statsd // Todo: move metadata/workloadmeta/collector to workloadmeta func initMisc(deps miscDeps) error { - if err := statsd.Configure(ddconfig.GetBindHost(), deps.Config.GetInt("dogstatsd_port"), false); err != nil { + if err := statsd.Configure(ddconfig.GetBindHost(), deps.Config.GetInt("dogstatsd_port"), deps.Statsd.CreateForHostPort); err != nil { log.Criticalf("Error configuring statsd: %s", err) return err } diff --git a/cmd/system-probe/subcommands/run/command.go b/cmd/system-probe/subcommands/run/command.go index efca253127762b..dc8996f25c48d1 100644 --- a/cmd/system-probe/subcommands/run/command.go +++ b/cmd/system-probe/subcommands/run/command.go @@ -11,7 +11,6 @@ import ( "errors" "fmt" "net/http" - //nolint:revive // TODO(EBPF) Fix revive linter _ "net/http/pprof" "os" @@ -36,6 +35,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig" "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig/sysprobeconfigimpl" "github.com/DataDog/datadog-agent/comp/core/telemetry" + compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd" "github.com/DataDog/datadog-agent/comp/remote-config/rcclient" "github.com/DataDog/datadog-agent/pkg/api/healthprobe" ddconfig "github.com/DataDog/datadog-agent/pkg/config" @@ -43,7 +43,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/config/settings" "github.com/DataDog/datadog-agent/pkg/ebpf" "github.com/DataDog/datadog-agent/pkg/pidfile" - "github.com/DataDog/datadog-agent/pkg/process/statsd" + processstatsd "github.com/DataDog/datadog-agent/pkg/process/statsd" ddruntime "github.com/DataDog/datadog-agent/pkg/runtime" "github.com/DataDog/datadog-agent/pkg/util" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -77,6 +77,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { fx.Supply(config.NewAgentParams("", config.WithConfigMissingOK(true))), fx.Supply(sysprobeconfigimpl.NewParams(sysprobeconfigimpl.WithSysProbeConfFilePath(globalParams.ConfFilePath))), fx.Supply(logimpl.ForDaemon("SYS-PROBE", "log_file", common.DefaultLogFile)), + compstatsd.Module(), config.Module(), telemetry.Module(), sysprobeconfigimpl.Module(), @@ -94,7 +95,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { } // run starts the main loop. -func run(log log.Component, _ config.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, cliParams *cliParams) error { +func run(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, cliParams *cliParams) error { defer func() { stopSystemProbe(cliParams) }() @@ -136,7 +137,7 @@ func run(log log.Component, _ config.Component, telemetry telemetry.Component, s } }() - if err := startSystemProbe(cliParams, log, telemetry, sysprobeconfig, rcclient); err != nil { + if err := startSystemProbe(cliParams, log, statsd, telemetry, sysprobeconfig, rcclient); err != nil { if err == ErrNotEnabled { // A sleep is necessary to ensure that supervisor registers this process as "STARTED" // If the exit is "too quick", we enter a BACKOFF->FATAL loop even though this is an expected exit @@ -163,9 +164,9 @@ func StartSystemProbeWithDefaults(ctxChan <-chan context.Context) (<-chan error, // run startSystemProbe in an app, so that the log and config components get initialized go func() { err := fxutil.OneShot( - func(log log.Component, config config.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error { + func(log log.Component, config config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error { defer StopSystemProbeWithDefaults() - err := startSystemProbe(&cliParams{GlobalParams: &command.GlobalParams{}}, log, telemetry, sysprobeconfig, rcclient) + err := startSystemProbe(&cliParams{GlobalParams: &command.GlobalParams{}}, log, statsd, telemetry, sysprobeconfig, rcclient) if err != nil { return err } @@ -194,6 +195,7 @@ func StartSystemProbeWithDefaults(ctxChan <-chan context.Context) (<-chan error, rcclient.Module(), config.Module(), telemetry.Module(), + compstatsd.Module(), sysprobeconfigimpl.Module(), // use system-probe config instead of agent config for logging fx.Provide(func(lc fx.Lifecycle, params logimpl.Params, sysprobeconfig sysprobeconfig.Component) (log.Component, error) { @@ -221,7 +223,7 @@ func StopSystemProbeWithDefaults() { } // startSystemProbe Initializes the system-probe process -func startSystemProbe(cliParams *cliParams, log log.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error { +func startSystemProbe(cliParams *cliParams, log log.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error { var err error var ctx context.Context ctx, common.MainCtxCancel = context.WithCancel(context.Background()) @@ -279,7 +281,7 @@ func startSystemProbe(cliParams *cliParams, log log.Component, telemetry telemet return log.Criticalf("unable to configure auto-exit: %s", err) } - if err := statsd.Configure(cfg.StatsdHost, cfg.StatsdPort, true); err != nil { + if err := processstatsd.Configure(cfg.StatsdHost, cfg.StatsdPort, statsd.CreateForHostPort); err != nil { return log.Criticalf("error configuring statsd: %s", err) } diff --git a/pkg/process/statsd/statsd.go b/pkg/process/statsd/statsd.go index 9d83f1e81a3316..8583bd78dd4e6a 100644 --- a/pkg/process/statsd/statsd.go +++ b/pkg/process/statsd/statsd.go @@ -7,42 +7,16 @@ package statsd import ( - "net" - "os" - "strconv" - "github.com/DataDog/datadog-go/v5/statsd" ) // Client is a global Statsd client. When a client is configured via Configure, // that becomes the new global Statsd client in the package. -var Client *statsd.Client +var Client statsd.ClientInterface = &statsd.NoOpClient{} // Configure creates a statsd client from a dogweb.ini style config file and set it to the global Statsd. -func Configure(host string, port int, lookInEnv bool) error { - var statsdAddr string - if lookInEnv { - statsdAddr = os.Getenv("STATSD_URL") - } - - if statsdAddr == "" { - statsdAddr = net.JoinHostPort(host, strconv.Itoa(port)) - } - - options := []statsd.Option{ - // Create a separate client for the telemetry to be sure we don't loose it. - statsd.WithTelemetryAddr(statsdAddr), - // Enable recommended settings to reduce the number of packets sent and reduce - // potential lock contention on the critical path. - statsd.WithChannelMode(), - statsd.WithClientSideAggregation(), - statsd.WithExtendedClientSideAggregation(), - } - client, err := statsd.New(statsdAddr, options...) - if err != nil { - return err - } - - Client = client - return nil +func Configure(host string, port int, create func(string, int, ...statsd.Option) (statsd.ClientInterface, error)) error { + var err error + Client, err = create(host, port) + return err }