diff --git a/internal/stats/labels.go b/internal/stats/labels.go new file mode 100644 index 000000000000..fd33af51ae89 --- /dev/null +++ b/internal/stats/labels.go @@ -0,0 +1,42 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package stats provides internal stats related functionality. +package stats + +import "context" + +// Labels are the labels for metrics. +type Labels struct { + // TelemetryLabels are the telemetry labels to record. + TelemetryLabels map[string]string +} + +type labelsKey struct{} + +// GetLabels returns the Labels stored in the context, or nil if there is none. +func GetLabels(ctx context.Context) *Labels { + labels, _ := ctx.Value(labelsKey{}).(*Labels) + return labels +} + +// SetLabels returns a context derived from ctx that carries labels. +func SetLabels(ctx context.Context, labels *Labels) context.Context { + // Shadows any Labels already stored in ctx; appending to them instead is a possible alternative. + return context.WithValue(ctx, labelsKey{}, labels) +} diff --git a/test/xds/xds_telemetry_labels_test.go b/test/xds/xds_telemetry_labels_test.go new file mode 100644 index 000000000000..8607b6179816 --- /dev/null +++ b/test/xds/xds_telemetry_labels_test.go @@ -0,0 +1,139 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package xds_test + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + istats "google.golang.org/grpc/internal/stats" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/stats" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/protobuf/types/known/structpb" +) + +const serviceNameKey = "service_name" +const serviceNamespaceKey = "service_namespace" +const serviceNameValue = "grpc-service" +const serviceNamespaceValue = "grpc-service-namespace" + +// TestTelemetryLabels tests that telemetry labels from CDS make their way to +// the stats handler. The stats handler sets the mutable context value that the +// cluster impl picker will write telemetry labels to, and then the stats +// handler asserts that subsequent HandleRPC calls from the RPC lifecycle +// contain telemetry labels that it can see. 
+func (s) TestTelemetryLabels(t *testing.T) { + managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + + server := stubserver.StartTestService(t, nil) + defer server.Stop() + + const xdsServiceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: xdsServiceName, + NodeID: nodeID, + Host: "localhost", + Port: testutils.ParsePort(t, server.Address), + SecLevel: e2e.SecurityLevelNone, + }) + + resources.Clusters[0].Metadata = &v3corepb.Metadata{ + FilterMetadata: map[string]*structpb.Struct{ + "com.google.csm.telemetry_labels": { + Fields: map[string]*structpb.Value{ + serviceNameKey: structpb.NewStringValue(serviceNameValue), + serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue), + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + fsh := &fakeStatsHandler{ + t: t, + } + + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh)) + if err != nil { + t.Fatalf("failed to create a new client to local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} + +type fakeStatsHandler struct { + labels *istats.Labels + + t *testing.T +} + +func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {} + +func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + labels := &istats.Labels{ + TelemetryLabels: 
+ make(map[string]string), + } + fsh.labels = labels + ctx = istats.SetLabels(ctx, labels) // The ctx itself is immutable, but it carries the same *Labels kept in fsh.labels, so entries cluster_impl writes into its TelemetryLabels map are visible to HandleRPC. + return ctx +} + +func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + switch rs.(type) { + // stats.Begin won't see the telemetry labels: they are only written when the + // picker picks. + + // These three stats callouts trigger all metrics for OpenTelemetry that + // aren't started. All of these should have access to the desired telemetry + // labels. + case *stats.OutPayload, *stats.InPayload, *stats.End: + if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue { + fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label) + } + if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue { + fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label) + } + + default: + // Nothing to assert for the other stats.Handler callouts. + } + +} diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index b0f38aff8ee8..cf3434d09992 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -645,6 +645,8 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste } dm.OutlierDetection = odJSON + dm.TelemetryLabels = cluster.TelemetryLabels + return append(dms, dm), true, nil } diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index bee5d2c97a33..9e747ba0892f 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -123,6 +123,7 @@ type clusterImplBalancer struct { requestCounterService string // The service name for the request counter. 
requestCounter *xdsclient.ClusterRequestsCounter requestCountMax uint32 + telemetryLabels map[string]string pickerUpdateCh *buffer.Unbounded } @@ -465,18 +466,19 @@ func (b *clusterImplBalancer) run() { b.childState = u b.ClientConn.UpdateState(balancer.State{ ConnectivityState: b.childState.ConnectivityState, - Picker: newPicker(b.childState, &dropConfigs{ + Picker: b.newPicker(&dropConfigs{ drops: b.drops, requestCounter: b.requestCounter, requestCountMax: b.requestCountMax, - }, b.loadWrapper), + }), }) case *LBConfig: + b.telemetryLabels = u.TelemetryLabels dc := b.handleDropAndRequestCount(u) if dc != nil && b.childState.Picker != nil { b.ClientConn.UpdateState(balancer.State{ ConnectivityState: b.childState.ConnectivityState, - Picker: newPicker(b.childState, dc, b.loadWrapper), + Picker: b.newPicker(dc), }) } } diff --git a/xds/internal/balancer/clusterimpl/config.go b/xds/internal/balancer/clusterimpl/config.go index cfddc6fb2a1b..70f7265c15a5 100644 --- a/xds/internal/balancer/clusterimpl/config.go +++ b/xds/internal/balancer/clusterimpl/config.go @@ -40,10 +40,12 @@ type LBConfig struct { EDSServiceName string `json:"edsServiceName,omitempty"` // LoadReportingServer is the LRS server to send load reports to. If not // present, load reporting will be disabled. - LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"` - MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` - DropCategories []DropConfig `json:"dropCategories,omitempty"` - ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` + LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"` + MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` + DropCategories []DropConfig `json:"dropCategories,omitempty"` + // TelemetryLabels are the telemetry Labels associated with this cluster. 
+ TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"` + ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"` } func parseConfig(c json.RawMessage) (*LBConfig, error) { diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go index 0788d22481d8..d8cb8df1a81c 100644 --- a/xds/internal/balancer/clusterimpl/picker.go +++ b/xds/internal/balancer/clusterimpl/picker.go @@ -19,15 +19,14 @@ package clusterimpl import ( + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/xdsclient" - "google.golang.org/grpc/xds/internal/xdsclient/load" - - v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) // NewRandomWRR is used when calculating drops. It's exported so that tests can @@ -78,24 +77,36 @@ type loadReporter interface { // Picker implements RPC drop, circuit breaking drop and load reporting. 
type picker struct { - drops []*dropper - s balancer.State - loadStore loadReporter - counter *xdsclient.ClusterRequestsCounter - countMax uint32 + drops []*dropper + s balancer.State + loadStore loadReporter + counter *xdsclient.ClusterRequestsCounter + countMax uint32 + telemetryLabels map[string]string } -func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker { +func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker { return &picker{ - drops: config.drops, - s: s, - loadStore: loadStore, - counter: config.requestCounter, - countMax: config.requestCountMax, + drops: config.drops, + s: b.childState, + loadStore: b.loadWrapper, + counter: config.requestCounter, + countMax: config.requestCountMax, + telemetryLabels: b.telemetryLabels, } } func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + // Unconditionally set labels if present, even dropped or queued RPC's can + // use these labels. + if info.Ctx != nil { + if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil { + for key, value := range d.telemetryLabels { + labels.TelemetryLabels[key] = value + } + } + } + // Don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. if d.s.ConnectivityState == connectivity.Ready { diff --git a/xds/internal/balancer/clusterresolver/config.go b/xds/internal/balancer/clusterresolver/config.go index c67608819185..7f88bfacd295 100644 --- a/xds/internal/balancer/clusterresolver/config.go +++ b/xds/internal/balancer/clusterresolver/config.go @@ -103,6 +103,8 @@ type DiscoveryMechanism struct { // OutlierDetection is the Outlier Detection LB configuration for this // priority. OutlierDetection json.RawMessage `json:"outlierDetection,omitempty"` + // TelemetryLabels are the telemetry labels associated with this cluster. 
+ TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"` outlierDetection outlierdetection.LBConfig } diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index 4a878e6da42c..8740360eefdd 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -146,8 +146,9 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) } return pName, &clusterimpl.LBConfig{ - Cluster: mechanism.Cluster, - ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}, + Cluster: mechanism.Cluster, + TelemetryLabels: mechanism.TelemetryLabels, + ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}, }, retAddrs } @@ -283,6 +284,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority EDSServiceName: mechanism.EDSServiceName, LoadReportingServer: mechanism.LoadReportingServer, MaxConcurrentRequests: mechanism.MaxConcurrentRequests, + TelemetryLabels: mechanism.TelemetryLabels, DropCategories: drops, ChildPolicy: xdsLBPolicy, }, addrs, nil