From 7f59c17fa3f3dad094924e68dfb38f2df1954ef4 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Thu, 11 Jan 2024 11:59:52 -0500 Subject: [PATCH] Switch to atomic pointers --- xds/internal/server/conn_wrapper.go | 6 ++---- xds/internal/server/listener_wrapper.go | 16 ++++++---------- xds/internal/xdsclient/client_new.go | 17 ++++++----------- .../xdsclient/xdsresource/filter_chain.go | 9 +++++---- 4 files changed, 19 insertions(+), 29 deletions(-) diff --git a/xds/internal/server/conn_wrapper.go b/xds/internal/server/conn_wrapper.go index ee766842c5aa..30fb74d32db6 100644 --- a/xds/internal/server/conn_wrapper.go +++ b/xds/internal/server/conn_wrapper.go @@ -24,7 +24,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "google.golang.org/grpc/credentials/tls/certprovider" xdsinternal "google.golang.org/grpc/internal/credentials/xds" @@ -68,14 +67,13 @@ type connWrapper struct { // The virtual hosts with matchable routes and instantiated HTTP Filters per // route, or an error. - urc *unsafe.Pointer // *xdsresource.UsableRouteConfiguration + urc *atomic.Pointer[xdsresource.UsableRouteConfiguration] } // UsableRouteConfiguration returns the UsableRouteConfiguration to be used for // server side routing. func (c *connWrapper) UsableRouteConfiguration() xdsresource.UsableRouteConfiguration { - uPtr := atomic.LoadPointer(c.urc) - return *(*xdsresource.UsableRouteConfiguration)(uPtr) + return *c.urc.Load() } // SetDeadline makes a copy of the passed in deadline and forwards the call to diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 78643ab0bfa0..4657925835d5 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -24,9 +24,7 @@ import ( "fmt" "net" "sync" - "sync/atomic" "time" - "unsafe" "google.golang.org/grpc/backoff" "google.golang.org/grpc/connectivity" @@ -229,12 +227,12 @@ func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate continue } if rcu.err != nil && rcu.update == nil { // Either NACK before update, or resource not found triggers this conditional. - atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(&xdsresource.UsableRouteConfiguration{ + fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{ Err: rcu.err, - })) + }) continue } - atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*rcu.update))) + fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.update)) } } if l.rdsHandler.determineRouteConfigurationReady() { @@ -249,17 +247,15 @@ func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate func (l *listenerWrapper) instantiateFilterChainRoutingConfigurationsLocked() { for _, fc := range l.activeFilterChainManager.FilterChains() { if fc.InlineRouteConfig != nil { - atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig))) // Can't race with an RPC coming in but no harm making atomic. + fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig)) // Can't race with an RPC coming in but no harm making atomic. continue } // Inline configuration constructed once here, will remain for lifetime of filter chain. rcu := l.rdsHandler.updates[fc.RouteConfigName] if rcu.err != nil && rcu.update == nil { - atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(&xdsresource.UsableRouteConfiguration{ - Err: rcu.err, - })) + fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{Err: rcu.err}) continue } - atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*rcu.update))) // Can't race with an RPC coming in but no harm making atomic. + fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.update)) // Can't race with an RPC coming in but no harm making atomic. } } diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go index 5b19e10b9884..a3e33315b19b 100644 --- a/xds/internal/xdsclient/client_new.go +++ b/xds/internal/xdsclient/client_new.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "sync" + "sync/atomic" "time" "google.golang.org/grpc/internal" @@ -107,14 +108,11 @@ func init() { } var ( - singletonClientForTestingMu sync.Mutex - singletonClientForTesting *clientRefCounted + singletonClientForTesting atomic.Pointer[clientRefCounted] ) func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) error { - singletonClientForTestingMu.Lock() - c := singletonClientForTesting - singletonClientForTestingMu.Unlock() + c := singletonClientForTesting.Load() return internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)(c.clientImpl.triggerResourceNotFoundForTesting, resourceType, resourceName) } @@ -141,18 +139,15 @@ func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), err if err != nil { return nil, nil, err } - singletonClientForTestingMu.Lock() - singletonClientForTesting = c - singletonClientForTestingMu.Unlock() + singletonClientForTesting = atomic.Pointer[clientRefCounted]{} + singletonClientForTesting.Store(c) return c, grpcsync.OnceFunc(func() { clientsMu.Lock() defer clientsMu.Unlock() if c.decrRef() == 0 { c.close() delete(clients, string(contents)) - singletonClientForTestingMu.Lock() - singletonClientForTesting = nil - singletonClientForTestingMu.Unlock() + singletonClientForTesting.Store(nil) } }), nil } diff --git a/xds/internal/xdsclient/xdsresource/filter_chain.go b/xds/internal/xdsclient/xdsresource/filter_chain.go index 861c45da5e4e..cb63a450acb9 100644 --- a/xds/internal/xdsclient/xdsresource/filter_chain.go +++ b/xds/internal/xdsclient/xdsresource/filter_chain.go @@ -21,7 +21,7 @@ import ( "errors" "fmt" "net" - "unsafe" + "sync/atomic" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" @@ -70,7 +70,7 @@ type FilterChain struct { InlineRouteConfig *RouteConfigUpdate // UsableRouteConfiguration is the routing configuration for this filter // chain (LDS + RDS). - UsableRouteConfiguration *unsafe.Pointer // *(UsableRouteConfiguration) + UsableRouteConfiguration *atomic.Pointer[UsableRouteConfiguration] } // VirtualHostWithInterceptors captures information present in a VirtualHost @@ -614,10 +614,11 @@ func (fcm *FilterChainManager) Validate(f func(fc *FilterChain) error) error { } func processNetworkFilters(filters []*v3listenerpb.Filter) (*FilterChain, error) { - rc := unsafe.Pointer(&UsableRouteConfiguration{}) + rc := &UsableRouteConfiguration{} filterChain := &FilterChain{ - UsableRouteConfiguration: &rc, + UsableRouteConfiguration: &atomic.Pointer[UsableRouteConfiguration]{}, } + filterChain.UsableRouteConfiguration.Store(rc) seenNames := make(map[string]bool, len(filters)) seenHCM := false for _, filter := range filters {