Implement A72: OpenTelemetry Tracing #9

Open
wants to merge 57 commits into
base: master
c63aeef
transport: add send operations to ClientStream and ServerStream (#7808)
dfawley Nov 20, 2024
87f0254
xdsclient: fix new watcher hang when registering for removed resource…
purnesh42H Nov 21, 2024
44a5eb9
xdsclient: fix new watcher to get both old good update and nack error…
purnesh42H Nov 21, 2024
93f1cc1
credentials/alts: avoid SRV and TXT lookups for handshaker service (#…
townba Nov 22, 2024
13d5a16
balancer/weightedroundrobin: Switch Weighted Round Robin to use pick …
zasweq Nov 23, 2024
8b70aeb
stats/opentelemetry: introduce tracing propagator and carrier (#7677)
purnesh42H Nov 25, 2024
dcba136
test/xds: remove redundant server when using stubserver in tests (#7846)
janardhanvissa Nov 25, 2024
bb7ae0a
Change logger to avoid Printf when disabled (#7471)
rob05c Nov 26, 2024
967ba46
balancer/pickfirst: Add pick first metrics (#7839)
zasweq Nov 26, 2024
4c07bca
stream: add jitter to retry backoff in accordance with gRFC A6 (#7869)
isgj Nov 26, 2024
3c0586a
stats/opentelemetry: Cleanup OpenTelemetry API's before stabilization…
zasweq Dec 2, 2024
3ce87dd
credentials/google: Add cloud-platform scope for ADC (#7887)
halvards Dec 2, 2024
ab189b0
examples/features/csm_observability: Add xDS Credentials (#7875)
zasweq Dec 2, 2024
17d08f7
scripts/gen-deps: filter out grpc modules (#7890)
dfawley Dec 2, 2024
00272e8
dns: Support link local IPv6 addresses (#7889)
arjan-bal Dec 3, 2024
78aa51b
pickfirst: Stop test servers without closing listeners (#7872)
arjan-bal Dec 3, 2024
634497b
test: Split import paths for generated message and service code (#7891)
arjan-bal Dec 3, 2024
5565631
balancer/pickfirst: replace grpc.Dial with grpc.NewClient in tests (#…
hanut19 Dec 4, 2024
317271b
pickfirst: Register a health listener when used as a leaf policy (#7832)
arjan-bal Dec 5, 2024
d7286fb
Change version to 1.70.0-dev (#7903)
purnesh42H Dec 5, 2024
645aadf
deps: update dependencies for all modules (#7904)
purnesh42H Dec 5, 2024
f53724d
serviceconfig: Return errors instead of skipping invalid retry policy…
dfawley Dec 5, 2024
adad26d
test/kokoro: Add psm-fallback build config (#7899)
easwars Dec 5, 2024
66ba4b2
examples/features/gracefulstop: add example to demonstrate server gra…
purnesh42H Dec 6, 2024
0027558
internal/transport: replace integer status codes with http constants …
JovialYip Dec 10, 2024
b1f70ce
test: replace grpc.Dial with grpc.NewClient
janardhanvissa Dec 10, 2024
e4d084a
examples: replace printf with print for log message in gracefulstop (…
eshitachandwani Dec 10, 2024
c1b6b37
Update README.md (#7921)
ashupednekar Dec 11, 2024
38a8b9a
health, grpc: Deliver health service updates through the health liste…
arjan-bal Dec 12, 2024
7ee073d
experimental/stats: re-add type aliases for migration (#7929)
dfawley Dec 13, 2024
3f76275
xdsclient: stop caching xdsChannels for potential reuse, after all re…
easwars Dec 13, 2024
cc161de
xds: Add support for multiple addresses per endpoint (#7858)
arjan-bal Dec 16, 2024
d0716f9
examples/features/csm_observability: Make CSM Observability example s…
zasweq Dec 16, 2024
e8055ea
grpcs: update `WithContextDialer` documentation to include using pass…
purnesh42H Dec 17, 2024
b3bdacb
test: switching to stubserver in tests instead of testservice (#7925)
pvsravani Dec 18, 2024
56a14ba
cleanup: replace dial with newclient (#7920)
janardhanvissa Dec 18, 2024
fc09e2c
final rebase with master
aranjans Dec 9, 2024
2747371
update e2e tests
aranjans Dec 9, 2024
96f92a0
fix e2e tests
aranjans Dec 10, 2024
f650e37
remove fmt logs
aranjans Dec 10, 2024
175d4c5
move grpc_trace_bin_propagator to experimental
aranjans Dec 11, 2024
c30c44a
move TraceOptions api to experimental
aranjans Dec 11, 2024
90ffa23
addressed purnesh's comments
aranjans Dec 16, 2024
2083830
don't change opentelemetry/e2e_test.go package name from opentelemetr…
aranjans Dec 16, 2024
e8e9d53
make vet happy
aranjans Dec 16, 2024
57fd38a
update TestServerWithMetricsAndTraceOptions
aranjans Dec 17, 2024
b0aad8a
fixed nits
aranjans Dec 17, 2024
8680ae8
fixed nits
aranjans Dec 17, 2024
98d11b7
fix: small nits
aranjans Dec 18, 2024
d0e1a0a
Add test with metrics and traces disabled.
aranjans Dec 19, 2024
b6503f7
make vet happy
aranjans Dec 19, 2024
b2831f1
pull out logic to find name resolution delay
aranjans Dec 20, 2024
1cb4396
remove experimental notice from grpc_trace_bin_propagator
aranjans Dec 20, 2024
b8fe8db
refactor and addressed comments from doug
aranjans Dec 23, 2024
d4ae3ab
Add copyright notice to client_tracing.go
aranjans Dec 23, 2024
c705b97
let client set the propagator and trace provider
aranjans Dec 24, 2024
5571e3b
fix breaking tests
aranjans Dec 24, 2024
27 changes: 19 additions & 8 deletions balancer/endpointsharding/endpointsharding.go
@@ -28,19 +28,33 @@ package endpointsharding
import (
"encoding/json"
"errors"
"fmt"
rand "math/rand/v2"
"sync"
"sync/atomic"

rand "math/rand/v2"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// PickFirstConfig is a pick first config without shuffling enabled.
var PickFirstConfig string

func init() {
name := pickfirst.Name
if !envconfig.NewPickFirstEnabled {
name = pickfirstleaf.Name
}
PickFirstConfig = fmt.Sprintf("[{%q: {}}]", name)
}

// ChildState is the balancer state of a child along with the endpoint which
// identifies the child balancer.
type ChildState struct {
@@ -100,9 +114,6 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState

// Update/Create new children.
for _, endpoint := range state.ResolverState.Endpoints {
if len(endpoint.Addresses) == 0 {
continue
}
if _, ok := newChildren.Get(endpoint); ok {
// Endpoint child was already created, continue to avoid duplicate
// update.
@@ -143,6 +154,9 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
}
}
es.children.Store(newChildren)
if newChildren.Len() == 0 {
return balancer.ErrBadResolverState
}
return ret
}
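
The hunk above adds a check that returns `balancer.ErrBadResolverState` when an update leaves the balancer with no children. A minimal sketch of that control flow follows, using only the standard library. The names `endpoint`, `updateChildren`, and `errBadResolverState` are hypothetical stand-ins and not the grpc-go types, and keying children by a joined address string is an assumption made to keep the example small.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errBadResolverState is a stand-in for balancer.ErrBadResolverState.
var errBadResolverState = errors.New("bad resolver state")

// endpoint is a simplified, hypothetical stand-in for resolver.Endpoint.
type endpoint struct{ Addresses []string }

// updateChildren builds one child per distinct endpoint and reports an
// error when the update yields no children at all. Like the diff, it does
// not filter endpoints by address count.
func updateChildren(endpoints []endpoint) (map[string]endpoint, error) {
	children := make(map[string]endpoint)
	for _, ep := range endpoints {
		key := strings.Join(ep.Addresses, ",")
		if _, ok := children[key]; ok {
			// Child already created for this endpoint; skip the duplicate.
			continue
		}
		children[key] = ep
	}
	// The new child set is kept either way; only the return value changes.
	if len(children) == 0 {
		return children, errBadResolverState
	}
	return children, nil
}

func main() {
	_, err := updateChildren(nil)
	fmt.Println("empty update:", err)

	kids, err := updateChildren([]endpoint{
		{Addresses: []string{"10.0.0.1:443"}},
		{Addresses: []string{"10.0.0.1:443"}},
	})
	fmt.Println("children:", len(kids), "err:", err)
}
```

In this sketch the error is reported after the (empty) child set has been recorded, which mirrors the order shown in the hunk: `es.children.Store(newChildren)` runs before the length check.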

@@ -306,6 +320,3 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return gracefulswitch.ParseConfig(cfg)
}

// PickFirstConfig is a pick first config without shuffling enabled.
const PickFirstConfig = "[{\"pick_first\": {}}]"
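
In this file, `PickFirstConfig` changes from a string constant to a variable built in `init` with `fmt.Sprintf("[{%q: {}}]", name)`. The sketch below isolates that formatting step to show what string it produces. `buildPickFirstConfig` is a hypothetical helper and `example_leaf_policy` is a made-up policy name; neither exists in grpc-go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildPickFirstConfig is a hypothetical helper that mirrors the
// fmt.Sprintf call from the init function in the diff.
func buildPickFirstConfig(name string) string {
	// %q emits the name as a double-quoted Go string literal, which is also
	// a valid JSON string for plain ASCII policy names.
	return fmt.Sprintf("[{%q: {}}]", name)
}

func main() {
	// "example_leaf_policy" is an illustrative name, not a registered policy.
	for _, name := range []string{"pick_first", "example_leaf_policy"} {
		cfg := buildPickFirstConfig(name)

		// Check that the result parses as a JSON list of single-key objects.
		var parsed []map[string]json.RawMessage
		if err := json.Unmarshal([]byte(cfg), &parsed); err != nil {
			fmt.Println("invalid config:", err)
			continue
		}
		fmt.Println(cfg)
	}
}
```

With the name `pick_first`, the helper yields the same text as the removed constant, so only the policy name varies with the environment setting.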
2 changes: 1 addition & 1 deletion balancer/grpclb/grpc_lb_v1/load_balancer.pb.go

Some generated files are not rendered by default.

39 changes: 21 additions & 18 deletions balancer/grpclb/grpclb_test.go
@@ -460,7 +460,7 @@ func (s) TestGRPCLB_Basic(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()

@@ -517,7 +517,7 @@ func (s) TestGRPCLB_Weighted(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()

@@ -597,7 +597,7 @@ func (s) TestGRPCLB_DropRequest(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
@@ -769,7 +769,7 @@ func (s) TestGRPCLB_BalancerDisconnects(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
@@ -940,7 +940,7 @@ func (s) TestGRPCLB_ExplicitFallback(t *testing.T) {
}
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
@@ -1008,11 +1008,12 @@ func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
cc.Connect()
testC := testgrpc.NewTestServiceClient(cc)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -1102,10 +1103,11 @@ func (s) TestGRPCLB_PickFirst(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend: %v", err)
}
cc.Connect()
defer cc.Close()

// Push a service config with grpclb as the load balancing policy and
@@ -1198,7 +1200,7 @@ func (s) TestGRPCLB_BackendConnectionErrorPropagation(t *testing.T) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to create new client to the backend %v", err)
t.Fatalf("Failed to create a client for the backend: %v", err)
}
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)
@@ -1241,10 +1243,11 @@ func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
}
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
cc.Connect()
defer cc.Close()
testC := testgrpc.NewTestServiceClient(cc)

@@ -1311,15 +1314,16 @@ func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
// Push the backend address to the remote balancer.
tss.ls.sls <- sl

cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName,
grpc.WithResolvers(r),
grpc.WithTransportCredentials(&serverNameCheckCreds{}),
grpc.WithContextDialer(fakeNameDialer),
grpc.WithUserAgent(testUserAgent))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
defer cc.Close()
cc.Connect()
testC := testgrpc.NewTestServiceClient(cc)

// Push a resolver update with grpclb configuration which does not contain the
@@ -1418,15 +1422,14 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,
tss.ls.statsDura = 100 * time.Millisecond
creds := serverNameCheckCreds{}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds),
grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
t.Fatalf("Failed to create a client for the backend %v", err)
}
cc.Connect()
defer cc.Close()

rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
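
Most hunks in this file replace `grpc.Dial` or `grpc.DialContext` with `grpc.NewClient`, and several add an explicit `cc.Connect()`. The likely reason is that a channel created by `grpc.NewClient` stays idle until an RPC is made or `Connect` is called, while `grpc.Dial` begins connecting right away. The toy model below illustrates that difference with the standard library only. `toyClient`, `newToyClient`, and `dialToyClient` are hypothetical names and do not reproduce the `grpc.ClientConn` API.

```go
package main

import "fmt"

// state is a toy stand-in for connectivity.State.
type state string

const (
	idle       state = "IDLE"
	connecting state = "CONNECTING"
)

// toyClient is a hypothetical model of a channel, not grpc.ClientConn.
type toyClient struct{ st state }

// newToyClient models grpc.NewClient: no I/O is started, the channel is idle.
func newToyClient() *toyClient { return &toyClient{st: idle} }

// Connect models cc.Connect(): leave idle and begin connecting.
func (c *toyClient) Connect() {
	if c.st == idle {
		c.st = connecting
	}
}

// dialToyClient models the older grpc.Dial behavior: create the channel and
// start connecting immediately.
func dialToyClient() *toyClient {
	c := newToyClient()
	c.Connect()
	return c
}

func main() {
	c := newToyClient()
	fmt.Println("after newToyClient:", c.st)

	c.Connect()
	fmt.Println("after Connect:", c.st)

	fmt.Println("after dialToyClient:", dialToyClient().st)
}
```

Under this model, a test that waits for a connectivity state change without issuing an RPC needs the explicit `Connect` call after migrating, which matches where the diff adds it.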
18 changes: 9 additions & 9 deletions balancer/pickfirst/pickfirst_ext_test.go
@@ -376,12 +376,12 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
},
}),
}
cc, err := grpc.Dial(lis.Addr().String(), dopts...)
cc, err := grpc.NewClient(lis.Addr().String(), dopts...)
if err != nil {
t.Fatalf("Failed to dial server at %q: %v", lis.Addr(), err)
t.Fatalf("Failed to create new client: %v", err)
}
t.Cleanup(func() { cc.Close() })

cc.Connect()
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

// Spawn a goroutine to ensure that the channel stays in TransientFailure.
@@ -837,12 +837,12 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T)
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

cc.Connect()
addrs := []resolver.Address{{Addr: lis.Addr().String()}}
r.UpdateState(resolver.State{Addresses: addrs})
testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
@@ -892,12 +892,12 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *test
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
t.Fatalf("grpc.NewClient() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

cc.Connect()
addrs := []resolver.Address{{Addr: lis.Addr().String()}}
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5 changes: 3 additions & 2 deletions balancer/pickfirst/pickfirst_test.go
@@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)

@@ -55,7 +56,7 @@ func (s) TestPickFirst_InitialResolverError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
defer bal.Close()
bal.ResolverError(errors.New("resolution failed: test error"))

@@ -88,7 +89,7 @@ func (s) TestPickFirst_ResolverErrorinTF(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
defer bal.Close()

// After sending a valid update, the LB policy should report CONNECTING.
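
Both hunks in this file pass `MetricsRecorder: &stats.NoopMetricsRecorder{}` in `balancer.BuildOptions`. A plausible motive, inferred from the "Add pick first metrics" commit rather than stated in the diff, is that the policy now records metrics and would otherwise call through a nil recorder. The sketch below shows the general no-op recorder pattern. The `recorder` interface, `policy` type, and metric name are hypothetical and much smaller than the real grpc-go interfaces.

```go
package main

import "fmt"

// recorder is a hypothetical, reduced metrics interface.
type recorder interface {
	RecordCount(name string, incr int64)
}

// noopRecorder discards every measurement, so callers never need a nil check.
type noopRecorder struct{}

func (noopRecorder) RecordCount(string, int64) {}

// countingRecorder keeps a running total, for comparison with the no-op.
type countingRecorder struct{ n int64 }

func (c *countingRecorder) RecordCount(_ string, incr int64) { c.n += incr }

// policy is a toy stand-in for a balancer that records on every attempt.
type policy struct{ rec recorder }

func (p *policy) onConnectionAttempt() {
	// "example.attempts" is an illustrative metric name.
	p.rec.RecordCount("example.attempts", 1)
}

func main() {
	// With the no-op recorder the call is safe and has no visible effect.
	quiet := &policy{rec: noopRecorder{}}
	quiet.onConnectionAttempt()

	counter := &countingRecorder{}
	counted := &policy{rec: counter}
	counted.onConnectionAttempt()
	fmt.Println("recorded attempts:", counter.n)
}
```

Supplying a no-op implementation in tests keeps the recording code path unconditional, at the cost of one extra field in each `BuildOptions` literal.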