Skip to content

Commit

Permalink
process-agent: use the new statsd component (#21108)
Browse files Browse the repository at this point in the history
  • Loading branch information
iksaif authored and pull[bot] committed Feb 28, 2024
1 parent fb18737 commit 6f8255b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 40 deletions.
7 changes: 6 additions & 1 deletion cmd/process-agent/main_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/sysprobeconfig/sysprobeconfigimpl"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors"
compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/utils"
"github.com/DataDog/datadog-agent/comp/process"
"github.com/DataDog/datadog-agent/comp/process/apiserver"
Expand Down Expand Up @@ -135,6 +136,9 @@ func runApp(ctx context.Context, globalParams *command.GlobalParams) error {
// Provide remote config client module
rcclient.Module(),

// Provide statsd client module
compstatsd.Module(),

// Provide the corresponding workloadmeta Params to configure the catalog
collectors.GetCatalog(),
fx.Provide(func(c config.Component) workloadmeta.Params {
Expand Down Expand Up @@ -236,6 +240,7 @@ type miscDeps struct {
Lc fx.Lifecycle

Config config.Component
Statsd compstatsd.Component
Syscfg sysprobeconfig.Component
HostInfo hostinfo.Component
WorkloadMeta workloadmeta.Component
Expand All @@ -245,7 +250,7 @@ type miscDeps struct {
// Todo: (Components) WorkloadMeta, remoteTagger, statsd
// Todo: move metadata/workloadmeta/collector to workloadmeta
func initMisc(deps miscDeps) error {
if err := statsd.Configure(ddconfig.GetBindHost(), deps.Config.GetInt("dogstatsd_port"), false); err != nil {
if err := statsd.Configure(ddconfig.GetBindHost(), deps.Config.GetInt("dogstatsd_port"), deps.Statsd.CreateForHostPort); err != nil {
log.Criticalf("Error configuring statsd: %s", err)
return err
}
Expand Down
18 changes: 10 additions & 8 deletions cmd/system-probe/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"errors"
"fmt"
"net/http"

//nolint:revive // TODO(EBPF) Fix revive linter
_ "net/http/pprof"
"os"
Expand All @@ -36,14 +35,15 @@ import (
"github.com/DataDog/datadog-agent/comp/core/sysprobeconfig"
"github.com/DataDog/datadog-agent/comp/core/sysprobeconfig/sysprobeconfigimpl"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
compstatsd "github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient"
"github.com/DataDog/datadog-agent/pkg/api/healthprobe"
ddconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/config/settings"
"github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/pidfile"
"github.com/DataDog/datadog-agent/pkg/process/statsd"
processstatsd "github.com/DataDog/datadog-agent/pkg/process/statsd"
ddruntime "github.com/DataDog/datadog-agent/pkg/runtime"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
Expand Down Expand Up @@ -77,6 +77,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
fx.Supply(config.NewAgentParams("", config.WithConfigMissingOK(true))),
fx.Supply(sysprobeconfigimpl.NewParams(sysprobeconfigimpl.WithSysProbeConfFilePath(globalParams.ConfFilePath))),
fx.Supply(logimpl.ForDaemon("SYS-PROBE", "log_file", common.DefaultLogFile)),
compstatsd.Module(),
config.Module(),
telemetry.Module(),
sysprobeconfigimpl.Module(),
Expand All @@ -94,7 +95,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
}

// run starts the main loop.
func run(log log.Component, _ config.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, cliParams *cliParams) error {
func run(log log.Component, _ config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component, cliParams *cliParams) error {
defer func() {
stopSystemProbe(cliParams)
}()
Expand Down Expand Up @@ -136,7 +137,7 @@ func run(log log.Component, _ config.Component, telemetry telemetry.Component, s
}
}()

if err := startSystemProbe(cliParams, log, telemetry, sysprobeconfig, rcclient); err != nil {
if err := startSystemProbe(cliParams, log, statsd, telemetry, sysprobeconfig, rcclient); err != nil {
if err == ErrNotEnabled {
// A sleep is necessary to ensure that supervisor registers this process as "STARTED"
// If the exit is "too quick", we enter a BACKOFF->FATAL loop even though this is an expected exit
Expand All @@ -163,9 +164,9 @@ func StartSystemProbeWithDefaults(ctxChan <-chan context.Context) (<-chan error,
// run startSystemProbe in an app, so that the log and config components get initialized
go func() {
err := fxutil.OneShot(
func(log log.Component, config config.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error {
func(log log.Component, config config.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error {
defer StopSystemProbeWithDefaults()
err := startSystemProbe(&cliParams{GlobalParams: &command.GlobalParams{}}, log, telemetry, sysprobeconfig, rcclient)
err := startSystemProbe(&cliParams{GlobalParams: &command.GlobalParams{}}, log, statsd, telemetry, sysprobeconfig, rcclient)
if err != nil {
return err
}
Expand Down Expand Up @@ -194,6 +195,7 @@ func StartSystemProbeWithDefaults(ctxChan <-chan context.Context) (<-chan error,
rcclient.Module(),
config.Module(),
telemetry.Module(),
compstatsd.Module(),
sysprobeconfigimpl.Module(),
// use system-probe config instead of agent config for logging
fx.Provide(func(lc fx.Lifecycle, params logimpl.Params, sysprobeconfig sysprobeconfig.Component) (log.Component, error) {
Expand Down Expand Up @@ -221,7 +223,7 @@ func StopSystemProbeWithDefaults() {
}

// startSystemProbe Initializes the system-probe process
func startSystemProbe(cliParams *cliParams, log log.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error {
func startSystemProbe(cliParams *cliParams, log log.Component, statsd compstatsd.Component, telemetry telemetry.Component, sysprobeconfig sysprobeconfig.Component, rcclient rcclient.Component) error {
var err error
var ctx context.Context
ctx, common.MainCtxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -279,7 +281,7 @@ func startSystemProbe(cliParams *cliParams, log log.Component, telemetry telemet
return log.Criticalf("unable to configure auto-exit: %s", err)
}

if err := statsd.Configure(cfg.StatsdHost, cfg.StatsdPort, true); err != nil {
if err := processstatsd.Configure(cfg.StatsdHost, cfg.StatsdPort, statsd.CreateForHostPort); err != nil {
return log.Criticalf("error configuring statsd: %s", err)
}

Expand Down
36 changes: 5 additions & 31 deletions pkg/process/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,16 @@
package statsd

import (
"net"
"os"
"strconv"

"github.com/DataDog/datadog-go/v5/statsd"
)

// Client is a global Statsd client. When a client is configured via Configure,
// that becomes the new global Statsd client in the package.
var Client *statsd.Client
var Client statsd.ClientInterface = &statsd.NoOpClient{}

// Configure creates a statsd client from a dogweb.ini style config file and set it to the global Statsd.
func Configure(host string, port int, lookInEnv bool) error {
var statsdAddr string
if lookInEnv {
statsdAddr = os.Getenv("STATSD_URL")
}

if statsdAddr == "" {
statsdAddr = net.JoinHostPort(host, strconv.Itoa(port))
}

options := []statsd.Option{
// Create a separate client for the telemetry to be sure we don't loose it.
statsd.WithTelemetryAddr(statsdAddr),
// Enable recommended settings to reduce the number of packets sent and reduce
// potential lock contention on the critical path.
statsd.WithChannelMode(),
statsd.WithClientSideAggregation(),
statsd.WithExtendedClientSideAggregation(),
}
client, err := statsd.New(statsdAddr, options...)
if err != nil {
return err
}

Client = client
return nil
func Configure(host string, port int, create func(string, int, ...statsd.Option) (statsd.ClientInterface, error)) error {
var err error
Client, err = create(host, port)
return err
}

0 comments on commit 6f8255b

Please sign in to comment.