Skip to content

Commit

Permalink
xds: Process telemetry labels from CDS in xDS Balancers (#7116)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Apr 15, 2024
1 parent a4afd4d commit b37cd81
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 23 deletions.
42 changes: 42 additions & 0 deletions internal/stats/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package stats provides internal stats related functionality.
package stats

import "context"

// Labels are the labels for metrics.
type Labels struct {
// TelemetryLabels are the telemetry labels to record.
TelemetryLabels map[string]string
}

type labelsKey struct{}

// GetLabels returns the Labels stored in the context, or nil if there is one.
func GetLabels(ctx context.Context) *Labels {
labels, _ := ctx.Value(labelsKey{}).(*Labels)
return labels
}

// SetLabels sets the Labels in the context.
func SetLabels(ctx context.Context, labels *Labels) context.Context {
// could also append
return context.WithValue(ctx, labelsKey{}, labels)
}
139 changes: 139 additions & 0 deletions test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test

import (
"context"
"fmt"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/stats"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/protobuf/types/known/structpb"
)

const serviceNameKey = "service_name"
const serviceNamespaceKey = "service_namespace"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

// TestTelemetryLabels tests that telemetry labels from CDS make their way to
// the stats handler. The stats handler sets the mutable context value that the
// cluster impl picker will write telemetry labels to, and then the stats
// handler asserts that subsequent HandleRPC calls from the RPC lifecycle
// contain telemetry labels that it can see.
func (s) TestTelemetryLabels(t *testing.T) {
managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

server := stubserver.StartTestService(t, nil)
defer server.Stop()

const xdsServiceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: xdsServiceName,
NodeID: nodeID,
Host: "localhost",
Port: testutils.ParsePort(t, server.Address),
SecLevel: e2e.SecurityLevelNone,
})

resources.Clusters[0].Metadata = &v3corepb.Metadata{
FilterMetadata: map[string]*structpb.Struct{
"com.google.csm.telemetry_labels": {
Fields: map[string]*structpb.Value{
serviceNameKey: structpb.NewStringValue(serviceNameValue),
serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue),
},
},
},
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

fsh := &fakeStatsHandler{
t: t,
}

cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh))
if err != nil {
t.Fatalf("failed to create a new client to local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}

type fakeStatsHandler struct {
labels *istats.Labels

t *testing.T
}

func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
labels := &istats.Labels{
TelemetryLabels: make(map[string]string),
}
fsh.labels = labels
ctx = istats.SetLabels(ctx, labels) // ctx passed is immutable, however cluster_impl writes to the map of Telemetry Labels on the heap.
return ctx
}

func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
switch rs.(type) {
// stats.Begin won't get Telemetry Labels because happens after picker
// picks.

// These three stats callouts trigger all metrics for OpenTelemetry that
// aren't started. All of these should have access to the desired telemetry
// labels.
case *stats.OutPayload, *stats.InPayload, *stats.End:
if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
}
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
}

default:
// Nothing to assert for the other stats.Handler callouts.
}

}
2 changes: 2 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
}
dm.OutlierDetection = odJSON

dm.TelemetryLabels = cluster.TelemetryLabels

return append(dms, dm), true, nil
}

Expand Down
8 changes: 5 additions & 3 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type clusterImplBalancer struct {
requestCounterService string // The service name for the request counter.
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
telemetryLabels map[string]string
pickerUpdateCh *buffer.Unbounded
}

Expand Down Expand Up @@ -465,18 +466,19 @@ func (b *clusterImplBalancer) run() {
b.childState = u
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newPicker(b.childState, &dropConfigs{
Picker: b.newPicker(&dropConfigs{
drops: b.drops,
requestCounter: b.requestCounter,
requestCountMax: b.requestCountMax,
}, b.loadWrapper),
}),
})
case *LBConfig:
b.telemetryLabels = u.TelemetryLabels
dc := b.handleDropAndRequestCount(u)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: newPicker(b.childState, dc, b.loadWrapper),
Picker: b.newPicker(dc),
})
}
}
Expand Down
10 changes: 6 additions & 4 deletions xds/internal/balancer/clusterimpl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ type LBConfig struct {
EDSServiceName string `json:"edsServiceName,omitempty"`
// LoadReportingServer is the LRS server to send load reports to. If not
// present, load reporting will be disabled.
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
DropCategories []DropConfig `json:"dropCategories,omitempty"`
// TelemetryLabels are the telemetry Labels associated with this cluster.
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}

func parseConfig(c json.RawMessage) (*LBConfig, error) {
Expand Down
39 changes: 25 additions & 14 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
package clusterimpl

import (
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)

// NewRandomWRR is used when calculating drops. It's exported so that tests can
Expand Down Expand Up @@ -78,24 +77,36 @@ type loadReporter interface {

// Picker implements RPC drop, circuit breaking drop and load reporting.
type picker struct {
drops []*dropper
s balancer.State
loadStore loadReporter
counter *xdsclient.ClusterRequestsCounter
countMax uint32
drops []*dropper
s balancer.State
loadStore loadReporter
counter *xdsclient.ClusterRequestsCounter
countMax uint32
telemetryLabels map[string]string
}

func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
func (b *clusterImplBalancer) newPicker(config *dropConfigs) *picker {
return &picker{
drops: config.drops,
s: s,
loadStore: loadStore,
counter: config.requestCounter,
countMax: config.requestCountMax,
drops: config.drops,
s: b.childState,
loadStore: b.loadWrapper,
counter: config.requestCounter,
countMax: config.requestCountMax,
telemetryLabels: b.telemetryLabels,
}
}

func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// Unconditionally set labels if present, even dropped or queued RPC's can
// use these labels.
if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
for key, value := range d.telemetryLabels {
labels.TelemetryLabels[key] = value
}
}
}

// Don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.
if d.s.ConnectivityState == connectivity.Ready {
Expand Down
2 changes: 2 additions & 0 deletions xds/internal/balancer/clusterresolver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type DiscoveryMechanism struct {
// OutlierDetection is the Outlier Detection LB configuration for this
// priority.
OutlierDetection json.RawMessage `json:"outlierDetection,omitempty"`
// TelemetryLabels are the telemetry labels associated with this cluster.
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
outlierDetection outlierdetection.LBConfig
}

Expand Down
6 changes: 4 additions & 2 deletions xds/internal/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
}
return pName, &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
Cluster: mechanism.Cluster,
TelemetryLabels: mechanism.TelemetryLabels,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
}, retAddrs
}

Expand Down Expand Up @@ -283,6 +284,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
EDSServiceName: mechanism.EDSServiceName,
LoadReportingServer: mechanism.LoadReportingServer,
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
TelemetryLabels: mechanism.TelemetryLabels,
DropCategories: drops,
ChildPolicy: xdsLBPolicy,
}, addrs, nil
Expand Down

0 comments on commit b37cd81

Please sign in to comment.