Skip to content

Commit

Permalink
reduce fluentd connection churn during high event-handler load (#48909)…
Browse files Browse the repository at this point in the history
… (#49035)
  • Loading branch information
fspmarshall authored Nov 15, 2024
1 parent a0c6f1b commit ead745a
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 44 deletions.
10 changes: 10 additions & 0 deletions integrations/event-handler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type FluentdConfig struct {

// FluentdCA is a path to fluentd CA
FluentdCA string `help:"fluentd TLS CA file" type:"existingfile" env:"FDWRD_FLUENTD_CA"`

// FluentdMaxConnections caps the number of connections to fluentd. Defaults to a dynamic value
// calculated relative to app-level concurrency.
FluentdMaxConnections int `help:"Maximum number of connections to fluentd" env:"FDWRD_MAX_CONNECTIONS"`
}

// TeleportConfig is Teleport instance configuration
Expand Down Expand Up @@ -237,6 +241,11 @@ func (c *StartCmdConfig) Validate() error {
c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw)
c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw)

if c.FluentdMaxConnections < 1 {
// 2x concurrency is effectively uncapped.
c.FluentdMaxConnections = c.Concurrency * 2
}

return nil
}

Expand All @@ -256,6 +265,7 @@ func (c *StartCmdConfig) Dump(ctx context.Context) {
log.WithField("ca", c.FluentdCA).Info("Using Fluentd ca")
log.WithField("cert", c.FluentdCert).Info("Using Fluentd cert")
log.WithField("key", c.FluentdKey).Info("Using Fluentd key")
log.WithField("max_connections", c.FluentdMaxConnections).Info("Using Fluentd max connections")
log.WithField("window-size", c.WindowSize).Info("Using window size")

if c.TeleportIdentityFile != "" {
Expand Down
33 changes: 18 additions & 15 deletions integrations/event-handler/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: false,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down Expand Up @@ -83,11 +84,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: true,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down Expand Up @@ -121,11 +123,12 @@ func TestStartCmdConfig(t *testing.T) {
Debug: true,
Start: StartCmdConfig{
FluentdConfig: FluentdConfig{
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdURL: "https://localhost:8888/test.log",
FluentdSessionURL: "https://localhost:8888/session",
FluentdCert: path.Join(wd, "testdata", "fake-file"),
FluentdKey: path.Join(wd, "testdata", "fake-file"),
FluentdCA: path.Join(wd, "testdata", "fake-file"),
FluentdMaxConnections: 10,
},
TeleportConfig: TeleportConfig{
TeleportAddr: "localhost:3025",
Expand Down
7 changes: 4 additions & 3 deletions integrations/event-handler/fake_fluentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ func (f *FakeFluentd) createServer() error {
// GetClientConfig returns FlientdConfig to connect to this fake fluentd server instance
func (f *FakeFluentd) GetClientConfig() FluentdConfig {
return FluentdConfig{
FluentdCA: f.caCertPath,
FluentdCert: f.clientCertPath,
FluentdKey: f.clientKeyPath,
FluentdCA: f.caCertPath,
FluentdCert: f.clientCertPath,
FluentdKey: f.clientKeyPath,
FluentdMaxConnections: 3,
}
}

Expand Down
35 changes: 27 additions & 8 deletions integrations/event-handler/fluentd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/net/http2"

tlib "github.com/gravitational/teleport/integrations/lib"
)
Expand All @@ -40,6 +41,7 @@ const (
type FluentdClient struct {
// client HTTP client to send requests
client *http.Client
sem chan struct{}
}

// NewFluentdClient creates new FluentdClient
Expand All @@ -55,22 +57,34 @@ func NewFluentdClient(c *FluentdConfig) (*FluentdClient, error) {
return nil, trace.BadParameter("both fluentd_cert and fluentd_key should be specified")
}

if c.FluentdMaxConnections <= 0 {
return nil, trace.BadParameter("fluentd_max_connections should be greater than 0")
}

ca, err := getCertPool(c)
if err != nil {
return nil, trace.Wrap(err)
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: ca,
Certificates: certs,
},
transport := &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: ca,
Certificates: certs,
},
Timeout: httpTimeout,
MaxIdleConnsPerHost: c.FluentdMaxConnections,
IdleConnTimeout: httpTimeout,
}

if err := http2.ConfigureTransport(transport); err != nil {
return nil, trace.Wrap(err)
}

return &FluentdClient{client: client}, nil
client := &http.Client{
Transport: transport,
Timeout: httpTimeout,
}

return &FluentdClient{client: client, sem: make(chan struct{}, c.FluentdMaxConnections)}, nil
}

// getCertPool reads CA certificate and returns CA cert pool if passed
Expand All @@ -91,6 +105,11 @@ func getCertPool(c *FluentdConfig) (*x509.CertPool, error) {

// Send sends event to fluentd
func (f *FluentdClient) Send(ctx context.Context, url string, b []byte) error {
f.sem <- struct{}{}
defer func() {
<-f.sem
}()

log.WithField("payload", string(b)).Debug("Sending event to Fluentd")

req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(b))
Expand Down
12 changes: 6 additions & 6 deletions integrations/event-handler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/sethvargo/go-limiter v0.7.2
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.30.0
golang.org/x/time v0.5.0
google.golang.org/protobuf v1.34.2
)
Expand Down Expand Up @@ -282,15 +283,14 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/api v0.177.0 // indirect
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6 // indirect
Expand Down
24 changes: 12 additions & 12 deletions integrations/event-handler/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,8 @@ golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 h1:mchzmB1XO2pMaKFRqk/+MV3mgGG96aqaPXaMifQU47w=
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE=
Expand Down Expand Up @@ -1041,8 +1041,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
Expand All @@ -1056,8 +1056,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1098,8 +1098,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand All @@ -1111,8 +1111,8 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA=
golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -1125,8 +1125,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
Expand Down

0 comments on commit ead745a

Please sign in to comment.