From 0f2db7abb843cd00178c5f69c420c7240f944984 Mon Sep 17 00:00:00 2001 From: jokuniew Date: Tue, 7 Feb 2023 16:45:27 +0100 Subject: [PATCH] feat(inputs.p4runtime): Implementation of P4Runtime input plugin (#12473) Co-Authored-By: Jakub Sikorski --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 7 +- go.sum | 13 +- plugins/inputs/all/p4runtime.go | 5 + plugins/inputs/p4runtime/README.md | 95 +++ plugins/inputs/p4runtime/p4runtime.go | 230 +++++++ .../p4runtime/p4runtime_fake_client_test.go | 128 ++++ plugins/inputs/p4runtime/p4runtime_test.go | 650 ++++++++++++++++++ plugins/inputs/p4runtime/sample.conf | 23 + 9 files changed, 1146 insertions(+), 6 deletions(-) create mode 100644 plugins/inputs/all/p4runtime.go create mode 100644 plugins/inputs/p4runtime/README.md create mode 100644 plugins/inputs/p4runtime/p4runtime.go create mode 100644 plugins/inputs/p4runtime/p4runtime_fake_client_test.go create mode 100644 plugins/inputs/p4runtime/p4runtime_test.go create mode 100644 plugins/inputs/p4runtime/sample.conf diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index d3a523ef1c416..83cdf327cbdae 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -259,6 +259,7 @@ following works: - github.com/opencontainers/runc [Apache License 2.0](https://github.com/opencontainers/runc/blob/main/LICENSE) - github.com/opensearch-project/opensearch-go [Apache License 2.0](https://github.com/opensearch-project/opensearch-go/blob/main/LICENSE.txt) - github.com/opentracing/opentracing-go [Apache License 2.0](https://github.com/opentracing/opentracing-go/blob/master/LICENSE) +- github.com/p4lang/p4runtime [Apache License 2.0](https://github.com/p4lang/p4runtime/blob/main/LICENSE) - github.com/pborman/ansi [BSD 3-Clause "New" or "Revised" License](https://github.com/pborman/ansi/blob/master/LICENSE) - github.com/philhofer/fwd [MIT License](https://github.com/philhofer/fwd/blob/master/LICENSE.md) - github.com/pierrec/lz4 [BSD 3-Clause "New" or "Revised" License](https://github.com/pierrec/lz4/blob/master/LICENSE) diff --git a/go.mod b/go.mod index b9b70383536de..c805193bdea6e 100644 --- a/go.mod +++ b/go.mod @@ -132,6 +132,7 @@ require ( github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5 github.com/openzipkin/zipkin-go v0.2.5 + github.com/p4lang/p4runtime v1.3.0 github.com/pborman/ansi v1.0.0 github.com/pion/dtls/v2 v2.1.5 github.com/pkg/errors v0.9.1 @@ -172,7 +173,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0 go.opentelemetry.io/otel/sdk/metric v0.34.0 go.starlark.net v0.0.0-20220328144851-d1966c6b9fcd - golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 + golang.org/x/mod v0.6.0 golang.org/x/net v0.5.0 golang.org/x/oauth2 v0.3.0 golang.org/x/sync v0.1.0 @@ -423,9 +424,9 @@ require ( go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.3.0 // indirect - golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect + golang.org/x/exp v0.0.0-20230202163644-54bba9f4231b golang.org/x/time v0.1.0 // indirect - golang.org/x/tools v0.1.12 // indirect + golang.org/x/tools v0.2.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 1894cc67e80b2..8908aae269803 100644 --- a/go.sum +++ b/go.sum @@ -2132,6 +2132,8 @@ github.com/openzipkin/zipkin-go v0.2.5 h1:UwtQQx2pyPIgWYHRg+epgdx1/HnBQTgN3/oIYE github.com/openzipkin/zipkin-go v0.2.5/go.mod h1:KpXfKdgRDnnhsxw4pNIH9Md5lyFqKUa4YDFlwRYAMyE= github.com/ory/go-acc v0.2.6/go.mod h1:4Kb/UnPcT8qRAk3IAxta+hvVapdxTLWtrr7bFLlEgpw= github.com/ory/viper v1.7.5/go.mod h1:ypOuyJmEUb3oENywQZRgeAMwqgOyDqwboO1tj3DjTaM= +github.com/p4lang/p4runtime v1.3.0 h1:3fUhHj0JtsGcL2Bh0uxpACdBJBDqpZyLgj93tqKzoJY= +github.com/p4lang/p4runtime v1.3.0/go.mod h1:voPsRsgz/TDEhcaFvBxfMbI++hSKR/QGJusJveEs9Jg= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= @@ -2883,8 +2885,9 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= -golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20230202163644-54bba9f4231b h1:EqBVA+nNsObCwQoBEHy4wLU0pi7i8a4AL3pbItPdPkE= +golang.org/x/exp v0.0.0-20230202163644-54bba9f4231b/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -2918,8 +2921,9 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I= +golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/net v0.0.0-20150829230318-ea47fc708ee3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -3396,8 +3400,9 @@ golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyj golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -3510,6 +3515,7 @@ google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200413115906-b5235f65be36/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= @@ -3597,6 +3603,7 @@ google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= +google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= diff --git a/plugins/inputs/all/p4runtime.go b/plugins/inputs/all/p4runtime.go new file mode 100644 index 0000000000000..84d8b923b21bc --- /dev/null +++ b/plugins/inputs/all/p4runtime.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.p4runtime + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/p4runtime" // register plugin diff --git a/plugins/inputs/p4runtime/README.md b/plugins/inputs/p4runtime/README.md new file mode 100644 index 0000000000000..25e7476ed5e5f --- /dev/null +++ b/plugins/inputs/p4runtime/README.md @@ -0,0 +1,95 @@ +# P4 Runtime Input Plugin + +P4 is a language for programming the data plane of network devices, +such as Programmable Switches or Programmable Network Interface Cards. +The P4Runtime API is a control plane specification to manage +the data plane elements of those devices dynamically by a P4 program. + +The `p4runtime` plugin gathers metrics about `Counter` values +present in P4 Program loaded onto networking device. +Metrics are collected through gRPC connection with +[P4Runtime](https://github.com/p4lang/p4runtime) server. + +P4Runtime Plugin uses `PkgInfo.Name` field. +If user wants to gather information about program name, please follow +[6.2.1. Annotating P4 code with PkgInfo] instruction and apply changes +to your P4 program. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# P4Runtime telemetry input plugin +[[inputs.p4runtime]] + ## Define the endpoint of P4Runtime gRPC server to collect metrics. + # endpoint = "127.0.0.1:9559" + ## Set DeviceID required for Client Arbitration. + ## https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html#sec-client-arbitration-and-controller-replication + # device_id = 1 + ## Filter counters by their names that should be observed. + ## Example: counter_names_include=["ingressCounter", "egressCounter"] + # counter_names_include = [] + + ## Optional TLS Config. + ## Enable client-side TLS and define CA to authenticate the device. + # enable_tls = false + # tls_ca = "/etc/telegraf/ca.crt" + ## Set minimal TLS version to accept by the client. + # tls_min_version = "TLS12" + ## Use TLS but skip chain & host verification. + # insecure_skip_verify = true + + ## Define client-side TLS certificate & key to authenticate to the device. + # tls_cert = "/etc/telegraf/client.crt" + # tls_key = "/etc/telegraf/client.key" +``` + +## Metrics + +P4Runtime gRPC server communicates using [p4runtime.proto] Protocol Buffer. +Static information about P4 program loaded into programmable switch +are collected by `GetForwardingPipelineConfigRequest` message. +Plugin gathers dynamic metrics with `Read` method. +`Readrequest` is defined with single `Entity` of type `CounterEntry`. +Since P4 Counter is array, plugin collects values of every cell of array +by [wildcard query]. + +Counters defined in P4 Program have unique ID and name. +Counters are arrays, thus `counter_index` informs +which cell value of array is described in metric. + +Tags are constructed in given manner: + +- `p4program_name`: P4 program name provided by user. +If user wants to gather information about program name, please follow +[6.2.1. Annotating P4 code with PkgInfo] instruction and apply changes +to your P4 program. +- `counter_name`: Name of given counter in P4 program. +- `counter_type`: Type of counter (BYTES, PACKETS, BOTH). + +Fields are constructed in given manner: + +- `bytes`: Number of bytes gathered in counter. +- `packets` Number of packets gathered in counter. +- `counter_index`: Index at which metrics are collected in P4 counter. + +## Example Output + +Expected output for p4runtime plugin instance +running on host named `p4runtime-host`: + +```shell +p4_runtime,counter_name=MyIngress.egressTunnelCounter,counter_type=BOTH,host=p4 bytes=408i,packets=4i,counter_index=200i 1675175030000000000 +``` + +[6.2.1. Annotating P4 code with PkgInfo]: https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html#sec-annotating-p4-code-with-pkginfo +[p4runtime.proto]: https://github.com/p4lang/p4runtime/blob/main/proto/p4/v1/p4runtime.proto +[wildcard query]: https://github.com/p4lang/p4runtime/blob/main/proto/p4/v1/p4runtime.proto#L379 diff --git a/plugins/inputs/p4runtime/p4runtime.go b/plugins/inputs/p4runtime/p4runtime.go new file mode 100644 index 0000000000000..00b0ff0e178cc --- /dev/null +++ b/plugins/inputs/p4runtime/p4runtime.go @@ -0,0 +1,230 @@ +package p4runtime + +import ( + "context" + "crypto/tls" + _ "embed" + "fmt" + "io" + "sync" + + p4ConfigV1 "github.com/p4lang/p4runtime/go/p4/config/v1" + p4v1 "github.com/p4lang/p4runtime/go/p4/v1" + "golang.org/x/exp/slices" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + "github.com/influxdata/telegraf" + internaltls "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +const ( + defaultDeviceID = 1 + defaultEndpoint = "127.0.0.1:9559" +) + +type P4runtime struct { + Endpoint string `toml:"endpoint"` + DeviceID uint64 `toml:"device_id"` + CounterNamesInclude []string `toml:"counter_names_include"` + Log telegraf.Logger `toml:"-"` + EnableTLS bool `toml:"enable_tls"` + internaltls.ClientConfig + + conn *grpc.ClientConn + client p4v1.P4RuntimeClient + wg sync.WaitGroup +} + +func (*P4runtime) SampleConfig() string { + return sampleConfig +} + +func (p *P4runtime) Init() error { + if p.Endpoint == "" { + p.Log.Debugf("Using default Endpoint: %v", defaultEndpoint) + p.Endpoint = defaultEndpoint + } + + return p.newP4RuntimeClient() +} + +func (p *P4runtime) Gather(acc telegraf.Accumulator) error { + p4Info, err := p.getP4Info() + if err != nil { + return err + } + + if len(p4Info.Counters) == 0 { + p.Log.Warn("No counters available in P4 Program!") + return nil + } + + filteredCounters := filterCounters(p4Info.Counters, p.CounterNamesInclude) + if len(filteredCounters) == 0 { + p.Log.Warn("No filtered counters available in P4 Program!") + return nil + } + + for _, counter := range filteredCounters { + p.wg.Add(1) + go func(counter *p4ConfigV1.Counter) { + defer p.wg.Done() + entries, err := p.readAllEntries(counter.Preamble.Id) + if err != nil { + acc.AddError( + fmt.Errorf( + "reading counter entries with ID=%v failed with error: %w", + counter.Preamble.Id, + err, + ), + ) + return + } + + for _, entry := range entries { + ce := entry.GetCounterEntry() + + if ce == nil { + acc.AddError(fmt.Errorf("reading counter entry from entry %v failed", entry)) + continue + } + + if ce.Data.ByteCount == 0 && ce.Data.PacketCount == 0 { + continue + } + + fields := map[string]interface{}{ + "bytes": ce.Data.ByteCount, + "packets": ce.Data.PacketCount, + "counter_index": ce.Index.Index, + } + + tags := map[string]string{ + "p4program_name": p4Info.PkgInfo.Name, + "counter_name": counter.Preamble.Name, + "counter_type": counter.Spec.Unit.String(), + } + + acc.AddFields("p4_runtime", fields, tags) + } + }(counter) + } + p.wg.Wait() + return nil +} + +func (p *P4runtime) Stop() { + p.conn.Close() + p.wg.Wait() +} + +func initConnection(endpoint string, tlscfg *tls.Config) (*grpc.ClientConn, error) { + var creds credentials.TransportCredentials + if tlscfg != nil { + creds = credentials.NewTLS(tlscfg) + } else { + creds = insecure.NewCredentials() + } + return grpc.Dial(endpoint, grpc.WithTransportCredentials(creds)) +} + +func (p *P4runtime) getP4Info() (*p4ConfigV1.P4Info, error) { + req := &p4v1.GetForwardingPipelineConfigRequest{ + DeviceId: p.DeviceID, + ResponseType: p4v1.GetForwardingPipelineConfigRequest_ALL, + } + resp, err := p.client.GetForwardingPipelineConfig(context.Background(), req) + if err != nil { + return nil, fmt.Errorf("error when retrieving forwarding pipeline config: %w", err) + } + + config := resp.GetConfig() + if config == nil { + return nil, fmt.Errorf( + "error when retrieving config from forwarding pipeline - pipeline doesn't have a config yet: %w", + err, + ) + } + + p4info := config.GetP4Info() + if p4info == nil { + return nil, fmt.Errorf( + "error when retrieving P4Info from config - config doesn't have a P4Info: %w", + err, + ) + } + + return p4info, nil +} + +func filterCounters(counters []*p4ConfigV1.Counter, counterNamesInclude []string) []*p4ConfigV1.Counter { + if len(counterNamesInclude) == 0 { + return counters + } + + var filteredCounters []*p4ConfigV1.Counter + for _, counter := range counters { + if counter == nil { + continue + } + if slices.Contains(counterNamesInclude, counter.Preamble.Name) { + filteredCounters = append(filteredCounters, counter) + } + } + return filteredCounters +} + +func (p *P4runtime) newP4RuntimeClient() error { + var tlscfg *tls.Config + var err error + + if p.EnableTLS { + if tlscfg, err = p.ClientConfig.TLSConfig(); err != nil { + return err + } + } + + conn, err := initConnection(p.Endpoint, tlscfg) + if err != nil { + return fmt.Errorf("cannot connect to the server: %w", err) + } + p.conn = conn + p.client = p4v1.NewP4RuntimeClient(conn) + return nil +} + +func (p *P4runtime) readAllEntries(counterID uint32) ([]*p4v1.Entity, error) { + readRequest := &p4v1.ReadRequest{ + DeviceId: p.DeviceID, + Entities: []*p4v1.Entity{{ + Entity: &p4v1.Entity_CounterEntry{ + CounterEntry: &p4v1.CounterEntry{ + CounterId: counterID}}}}} + + stream, err := p.client.Read(context.Background(), readRequest) + if err != nil { + return nil, err + } + + rep, err := stream.Recv() + if err != nil && err != io.EOF { + return nil, err + } + + return rep.Entities, nil +} + +func init() { + inputs.Add("p4runtime", func() telegraf.Input { + p4runtime := &P4runtime{ + DeviceID: defaultDeviceID, + } + return p4runtime + }) +} diff --git a/plugins/inputs/p4runtime/p4runtime_fake_client_test.go b/plugins/inputs/p4runtime/p4runtime_fake_client_test.go new file mode 100644 index 0000000000000..204c39c1a74b4 --- /dev/null +++ b/plugins/inputs/p4runtime/p4runtime_fake_client_test.go @@ -0,0 +1,128 @@ +package p4runtime + +import ( + "context" + + p4v1 "github.com/p4lang/p4runtime/go/p4/v1" + "google.golang.org/grpc" +) + +type fakeP4RuntimeClient struct { + writeFn func( + ctx context.Context, + in *p4v1.WriteRequest, + opts ...grpc.CallOption, + ) (*p4v1.WriteResponse, error) + + readFn func( + ctx context.Context, + in *p4v1.ReadRequest, + opts ...grpc.CallOption, + ) (p4v1.P4Runtime_ReadClient, error) + + setForwardingPipelineConfigFn func( + ctx context.Context, + in *p4v1.SetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.SetForwardingPipelineConfigResponse, error) + + getForwardingPipelineConfigFn func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) + + streamChannelFn func( + ctx context.Context, + opts ...grpc.CallOption, + ) (p4v1.P4Runtime_StreamChannelClient, error) + + capabilitiesFn func( + ctx context.Context, + in *p4v1.CapabilitiesRequest, + opts ...grpc.CallOption, + ) (*p4v1.CapabilitiesResponse, error) +} + +// fakeP4RuntimeClient implements the p4v1.P4RuntimeClient interface +var _ p4v1.P4RuntimeClient = &fakeP4RuntimeClient{} + +func (c *fakeP4RuntimeClient) Write( + ctx context.Context, + in *p4v1.WriteRequest, + opts ...grpc.CallOption, +) (*p4v1.WriteResponse, error) { + if c.writeFn == nil { + panic("No mock defined for Write RPC") + } + return c.writeFn(ctx, in, opts...) +} + +func (c *fakeP4RuntimeClient) Read( + ctx context.Context, + in *p4v1.ReadRequest, + opts ...grpc.CallOption, +) (p4v1.P4Runtime_ReadClient, error) { + if c.readFn == nil { + panic("No mock defined for Read RPC") + } + return c.readFn(ctx, in, opts...) +} + +func (c *fakeP4RuntimeClient) SetForwardingPipelineConfig( + ctx context.Context, + in *p4v1.SetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, +) (*p4v1.SetForwardingPipelineConfigResponse, error) { + if c.setForwardingPipelineConfigFn == nil { + panic("No mock defined for SetForwardingPipelineConfig RPC") + } + return c.setForwardingPipelineConfigFn(ctx, in, opts...) +} + +func (c *fakeP4RuntimeClient) GetForwardingPipelineConfig( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, +) (*p4v1.GetForwardingPipelineConfigResponse, error) { + if c.getForwardingPipelineConfigFn == nil { + panic("No mock defined for GetForwardingPipelineConfig RPC") + } + return c.getForwardingPipelineConfigFn(ctx, in, opts...) +} + +func (c *fakeP4RuntimeClient) StreamChannel( + ctx context.Context, + opts ...grpc.CallOption, +) (p4v1.P4Runtime_StreamChannelClient, error) { + if c.streamChannelFn == nil { + panic("No mock defined for StreamChannel") + } + return c.streamChannelFn(ctx, opts...) +} + +func (c *fakeP4RuntimeClient) Capabilities( + ctx context.Context, + in *p4v1.CapabilitiesRequest, + opts ...grpc.CallOption, +) (*p4v1.CapabilitiesResponse, error) { + if c.capabilitiesFn == nil { + panic("No mock defined for Capabilities RPC") + } + return c.capabilitiesFn(ctx, in, opts...) +} + +type fakeP4RuntimeReadClient struct { + grpc.ClientStream + recvFn func() (*p4v1.ReadResponse, error) +} + +// fakeP4RuntimeReadClient implements the p4v1.P4Runtime_ReadClient interface +var _ p4v1.P4Runtime_ReadClient = &fakeP4RuntimeReadClient{} + +func (c *fakeP4RuntimeReadClient) Recv() (*p4v1.ReadResponse, error) { + if c.recvFn == nil { + panic("No mock provided for Recv function") + } + return c.recvFn() +} diff --git a/plugins/inputs/p4runtime/p4runtime_test.go b/plugins/inputs/p4runtime/p4runtime_test.go new file mode 100644 index 0000000000000..0348b37a2ec4d --- /dev/null +++ b/plugins/inputs/p4runtime/p4runtime_test.go @@ -0,0 +1,650 @@ +package p4runtime + +import ( + "context" + "errors" + "fmt" + "net" + "testing" + "time" + + p4ConfigV1 "github.com/p4lang/p4runtime/go/p4/config/v1" + p4v1 "github.com/p4lang/p4runtime/go/p4/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +// CounterSpec available here https://github.com/p4lang/p4runtime/blob/main/proto/p4/config/v1/p4info.proto#L289 +func createCounter( + name string, + id uint32, + unit p4ConfigV1.CounterSpec_Unit, +) *p4ConfigV1.Counter { + return &p4ConfigV1.Counter{ + Preamble: &p4ConfigV1.Preamble{Name: name, Id: id}, + Spec: &p4ConfigV1.CounterSpec{Unit: unit}, + } +} + +func createEntityCounterEntry( + counterID uint32, + index int64, + data *p4v1.CounterData, +) *p4v1.Entity_CounterEntry { + return &p4v1.Entity_CounterEntry{ + CounterEntry: &p4v1.CounterEntry{ + CounterId: counterID, + Index: &p4v1.Index{Index: index}, + Data: data, + }, + } +} + +func NewTestP4RuntimeClient( + p4RuntimeClient *fakeP4RuntimeClient, + addr string, +) *P4runtime { + conn, _ := grpc.Dial( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + return &P4runtime{ + Endpoint: addr, + DeviceID: uint64(1), + Log: testutil.Logger{}, + conn: conn, + client: p4RuntimeClient, + } +} + +func TestInitDefault(t *testing.T) { + plugin := &P4runtime{Log: testutil.Logger{}} + require.NoError(t, plugin.Init()) + require.Equal(t, "127.0.0.1:9559", plugin.Endpoint) + require.Equal(t, uint64(0), plugin.DeviceID) + require.Empty(t, plugin.CounterNamesInclude) + require.False(t, plugin.EnableTLS) +} + +func TestErrorGetP4Info(t *testing.T) { + responses := []struct { + getForwardingPipelineConfigResponse *p4v1.GetForwardingPipelineConfigResponse + getForwardingPipelineConfigResponseError error + }{ + { + getForwardingPipelineConfigResponse: nil, + getForwardingPipelineConfigResponseError: fmt.Errorf( + "error when retrieving forwarding pipeline config", + ), + }, { + getForwardingPipelineConfigResponse: &p4v1.GetForwardingPipelineConfigResponse{ + Config: nil, + }, + getForwardingPipelineConfigResponseError: nil, + }, { + getForwardingPipelineConfigResponse: &p4v1.GetForwardingPipelineConfigResponse{ + Config: &p4v1.ForwardingPipelineConfig{P4Info: nil}, + }, + getForwardingPipelineConfigResponseError: nil, + }, + } + + for _, response := range responses { + p4RtClient := &fakeP4RuntimeClient{ + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return response.getForwardingPipelineConfigResponse, response.getForwardingPipelineConfigResponseError + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.Error(t, plugin.Gather(&acc)) + } +} + +func TestOneCounterRead(t *testing.T) { + tests := []struct { + forwardingPipelineConfig *p4v1.ForwardingPipelineConfig + EntityCounterEntry *p4v1.Entity_CounterEntry + expected []telegraf.Metric + }{ + { + forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter("foo", 1111, p4ConfigV1.CounterSpec_BOTH), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + }, + EntityCounterEntry: createEntityCounterEntry( + 1111, + 5, + &p4v1.CounterData{ByteCount: 5, PacketCount: 1}, + ), + expected: []telegraf.Metric{testutil.MustMetric( + "p4_runtime", + map[string]string{ + "p4program_name": "P4Program", + "counter_name": "foo", + "counter_type": "BOTH", + }, + map[string]interface{}{ + "bytes": int64(5), + "packets": int64(1), + "counter_index": 5}, + time.Unix(0, 0)), + }, + }, { + forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter( + "foo", + 2222, + p4ConfigV1.CounterSpec_BYTES, + ), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + }, + EntityCounterEntry: createEntityCounterEntry( + 2222, + 5, + &p4v1.CounterData{ByteCount: 5}, + ), + expected: []telegraf.Metric{testutil.MustMetric( + "p4_runtime", + map[string]string{ + "p4program_name": "P4Program", + "counter_name": "foo", + "counter_type": "BYTES", + }, + map[string]interface{}{ + "bytes": int64(5), + "packets": int64(0), + "counter_index": 5}, + time.Unix(0, 0)), + }, + }, { + forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter( + "foo", + 3333, + p4ConfigV1.CounterSpec_PACKETS, + ), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + }, + EntityCounterEntry: createEntityCounterEntry( + 3333, + 5, + &p4v1.CounterData{PacketCount: 1}, + ), + expected: []telegraf.Metric{testutil.MustMetric( + "p4_runtime", + map[string]string{ + "p4program_name": "P4Program", + "counter_name": "foo", + "counter_type": "PACKETS", + }, + map[string]interface{}{ + "bytes": int64(0), + "packets": int64(1), + "counter_index": 5}, + time.Unix(0, 0)), + }, + }, { + forwardingPipelineConfig: &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter("foo", 4444, p4ConfigV1.CounterSpec_BOTH), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + }, + EntityCounterEntry: createEntityCounterEntry( + 4444, + 5, + &p4v1.CounterData{}, + ), + expected: nil, + }, + } + + for _, tt := range tests { + p4RtReadClient := &fakeP4RuntimeReadClient{ + recvFn: func() (*p4v1.ReadResponse, error) { + return &p4v1.ReadResponse{ + Entities: []*p4v1.Entity{{Entity: tt.EntityCounterEntry}}, + }, nil + }, + } + + p4RtClient := &fakeP4RuntimeClient{ + readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) { + return p4RtReadClient, nil + }, + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: tt.forwardingPipelineConfig, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + + testutil.RequireMetricsEqual( + t, + tt.expected, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) + } +} + +func TestMultipleEntitiesSingleCounterRead(t *testing.T) { + totalNumOfEntriesArr := [3]int{2, 10, 100} + + for _, totalNumOfEntries := range totalNumOfEntriesArr { + var expected []telegraf.Metric + + fmt.Println( + "Running TestMultipleEntitiesSingleCounterRead with ", + totalNumOfEntries, + "totalNumOfCounters", + ) + entities := make([]*p4v1.Entity, 0, totalNumOfEntries) + p4InfoCounters := make([]*p4ConfigV1.Counter, 0, totalNumOfEntries) + p4InfoCounters = append( + p4InfoCounters, + createCounter("foo", 0, p4ConfigV1.CounterSpec_BOTH), + ) + + for i := 0; i < totalNumOfEntries; i++ { + counterEntry := &p4v1.Entity{ + Entity: createEntityCounterEntry( + 0, + int64(i), + &p4v1.CounterData{ + ByteCount: int64(10), + PacketCount: int64(10), + }, + ), + } + + entities = append(entities, counterEntry) + expected = append(expected, testutil.MustMetric( + "p4_runtime", + map[string]string{ + "p4program_name": "P4Program", + "counter_name": "foo", + "counter_type": "BOTH", + }, + map[string]interface{}{ + "bytes": int64(10), + "packets": int64(10), + "counter_index": i, + }, + time.Unix(0, 0), + )) + } + + forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: p4InfoCounters, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + } + + p4RtReadClient := &fakeP4RuntimeReadClient{ + recvFn: func() (*p4v1.ReadResponse, error) { + return &p4v1.ReadResponse{Entities: entities}, nil + }, + } + + p4RtClient := &fakeP4RuntimeClient{ + readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) { + return p4RtReadClient, nil + }, + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: forwardingPipelineConfig, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + acc.Wait(totalNumOfEntries) + + testutil.RequireMetricsEqual( + t, + expected, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) + } +} + +func TestSingleEntitiesMultipleCounterRead(t *testing.T) { + totalNumOfCountersArr := [3]int{2, 10, 100} + + for _, totalNumOfCounters := range totalNumOfCountersArr { + var expected []telegraf.Metric + + fmt.Println( + "Running TestSingleEntitiesMultipleCounterRead with ", + totalNumOfCounters, + "totalNumOfCounters", + ) + p4InfoCounters := make([]*p4ConfigV1.Counter, 0, totalNumOfCounters) + + for i := 1; i <= totalNumOfCounters; i++ { + counterName := fmt.Sprintf("foo%v", i) + p4InfoCounters = append( + p4InfoCounters, + createCounter( + counterName, + uint32(i), + p4ConfigV1.CounterSpec_BOTH, + ), + ) + + expected = append(expected, testutil.MustMetric( + "p4_runtime", + map[string]string{ + "p4program_name": "P4Program", + "counter_name": counterName, + "counter_type": "BOTH", + }, + map[string]interface{}{ + "bytes": int64(10), + "packets": int64(10), + "counter_index": 1, + }, + time.Unix(0, 0), + )) + } + + forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: p4InfoCounters, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + } + + p4RtClient := &fakeP4RuntimeClient{ + readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) { + counterID := in.Entities[0].GetCounterEntry().CounterId + return &fakeP4RuntimeReadClient{ + recvFn: func() (*p4v1.ReadResponse, error) { + return &p4v1.ReadResponse{ + Entities: []*p4v1.Entity{{ + Entity: createEntityCounterEntry( + counterID, + 1, + &p4v1.CounterData{ + ByteCount: 10, + PacketCount: 10, + }, + ), + }}, + }, nil + }, + }, nil + }, + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: forwardingPipelineConfig, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + acc.Wait(totalNumOfCounters) + + testutil.RequireMetricsEqual( + t, + expected, + acc.GetTelegrafMetrics(), + testutil.SortMetrics(), + testutil.IgnoreTime(), + ) + } +} + +func TestNoCountersAvailable(t *testing.T) { + forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{Counters: []*p4ConfigV1.Counter{}}, + } + + p4RtClient := &fakeP4RuntimeClient{ + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: forwardingPipelineConfig, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) +} + +func TestFilterCounters(t *testing.T) { + forwardingPipelineConfig := &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter("foo", 1, p4ConfigV1.CounterSpec_BOTH), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + } + + p4RtClient := &fakeP4RuntimeClient{ + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: forwardingPipelineConfig, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + plugin.CounterNamesInclude = []string{"oof"} + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + testutil.RequireMetricsEqual( + t, + nil, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} + +func TestFailReadCounterEntryFromEntry(t *testing.T) { + p4RtReadClient := &fakeP4RuntimeReadClient{ + recvFn: func() (*p4v1.ReadResponse, error) { + return &p4v1.ReadResponse{ + Entities: []*p4v1.Entity{{ + Entity: &p4v1.Entity_TableEntry{ + TableEntry: &p4v1.TableEntry{}, + }}}}, nil + }, + } + + p4RtClient := &fakeP4RuntimeClient{ + readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) { + return p4RtReadClient, nil + }, + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter( + "foo", + 1111, + p4ConfigV1.CounterSpec_BOTH, + ), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + }, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + assert.Equal( + t, + acc.Errors[0], + errors.New("reading counter entry from entry table_entry:<> failed"), + ) + testutil.RequireMetricsEqual( + t, + nil, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} + +func TestFailReadAllEntries(t *testing.T) { + p4RtClient := &fakeP4RuntimeClient{ + readFn: func(ctx context.Context, in *p4v1.ReadRequest, opts ...grpc.CallOption) (p4v1.P4Runtime_ReadClient, error) { + return nil, errors.New("Connection error") + }, + getForwardingPipelineConfigFn: func( + ctx context.Context, + in *p4v1.GetForwardingPipelineConfigRequest, + opts ...grpc.CallOption, + ) (*p4v1.GetForwardingPipelineConfigResponse, error) { + return &p4v1.GetForwardingPipelineConfigResponse{ + Config: &p4v1.ForwardingPipelineConfig{ + P4Info: &p4ConfigV1.P4Info{ + Counters: []*p4ConfigV1.Counter{ + createCounter( + "foo", + 1111, + p4ConfigV1.CounterSpec_BOTH, + ), + }, + PkgInfo: &p4ConfigV1.PkgInfo{Name: "P4Program"}, + }, + }, + }, nil + }, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + plugin := NewTestP4RuntimeClient(p4RtClient, listener.Addr().String()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Gather(&acc)) + assert.Equal( + t, + acc.Errors[0], + fmt.Errorf("reading counter entries with ID=1111 failed with error: %w", + errors.New("Connection error")), + ) + testutil.RequireMetricsEqual( + t, + nil, + acc.GetTelegrafMetrics(), + testutil.IgnoreTime(), + ) +} + +func TestFilterCounterNamesInclude(t *testing.T) { + counters := []*p4ConfigV1.Counter{ + createCounter("foo", 1, p4ConfigV1.CounterSpec_BOTH), + createCounter("bar", 2, p4ConfigV1.CounterSpec_BOTH), + nil, + createCounter("", 3, p4ConfigV1.CounterSpec_BOTH), + } + + counterNamesInclude := []string{"bar"} + + filteredCounters := filterCounters(counters, counterNamesInclude) + assert.Equal( + t, + filteredCounters, + []*p4ConfigV1.Counter{ + createCounter("bar", 2, p4ConfigV1.CounterSpec_BOTH), + }, + ) +} diff --git a/plugins/inputs/p4runtime/sample.conf b/plugins/inputs/p4runtime/sample.conf new file mode 100644 index 0000000000000..66df7a7e13d16 --- /dev/null +++ b/plugins/inputs/p4runtime/sample.conf @@ -0,0 +1,23 @@ +# P4Runtime telemetry input plugin +[[inputs.p4runtime]] + ## Define the endpoint of P4Runtime gRPC server to collect metrics. + # endpoint = "127.0.0.1:9559" + ## Set DeviceID required for Client Arbitration. + ## https://p4.org/p4-spec/p4runtime/main/P4Runtime-Spec.html#sec-client-arbitration-and-controller-replication + # device_id = 1 + ## Filter counters by their names that should be observed. + ## Example: counter_names_include=["ingressCounter", "egressCounter"] + # counter_names_include = [] + + ## Optional TLS Config. + ## Enable client-side TLS and define CA to authenticate the device. + # enable_tls = false + # tls_ca = "/etc/telegraf/ca.crt" + ## Set minimal TLS version to accept by the client. + # tls_min_version = "TLS12" + ## Use TLS but skip chain & host verification. + # insecure_skip_verify = true + + ## Define client-side TLS certificate & key to authenticate to the device. + # tls_cert = "/etc/telegraf/client.crt" + # tls_key = "/etc/telegraf/client.key"