Skip to content

Commit

Permalink
Switch to atomic pointers
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Jan 11, 2024
1 parent 6ea0d50 commit 7f59c17
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 29 deletions.
6 changes: 2 additions & 4 deletions xds/internal/server/conn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"google.golang.org/grpc/credentials/tls/certprovider"
xdsinternal "google.golang.org/grpc/internal/credentials/xds"
Expand Down Expand Up @@ -68,14 +67,13 @@ type connWrapper struct {

// The virtual hosts with matchable routes and instantiated HTTP Filters per
// route, or an error.
urc *unsafe.Pointer // *xdsresource.UsableRouteConfiguration
urc *atomic.Pointer[xdsresource.UsableRouteConfiguration]
}

// UsableRouteConfiguration returns the UsableRouteConfiguration to be used for
// server side routing.
func (c *connWrapper) UsableRouteConfiguration() xdsresource.UsableRouteConfiguration {
uPtr := atomic.LoadPointer(c.urc)
return *(*xdsresource.UsableRouteConfiguration)(uPtr)
return *c.urc.Load()
}

// SetDeadline makes a copy of the passed in deadline and forwards the call to
Expand Down
16 changes: 6 additions & 10 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -229,12 +227,12 @@ func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate
continue

Check warning on line 227 in xds/internal/server/listener_wrapper.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/server/listener_wrapper.go#L227

Added line #L227 was not covered by tests
}
if rcu.err != nil && rcu.update == nil { // Either NACK before update, or resource not found triggers this conditional.
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(&xdsresource.UsableRouteConfiguration{
fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{
Err: rcu.err,
}))
})
continue
}
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*rcu.update)))
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.update))
}
}
if l.rdsHandler.determineRouteConfigurationReady() {
Expand All @@ -249,17 +247,15 @@ func (l *listenerWrapper) handleRDSUpdate(routeName string, rcu rdsWatcherUpdate
func (l *listenerWrapper) instantiateFilterChainRoutingConfigurationsLocked() {
for _, fc := range l.activeFilterChainManager.FilterChains() {
if fc.InlineRouteConfig != nil {
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig))) // Can't race with an RPC coming in but no harm making atomic.
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*fc.InlineRouteConfig)) // Can't race with an RPC coming in but no harm making atomic.
continue
} // Inline configuration constructed once here, will remain for lifetime of filter chain.
rcu := l.rdsHandler.updates[fc.RouteConfigName]
if rcu.err != nil && rcu.update == nil {
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(&xdsresource.UsableRouteConfiguration{
Err: rcu.err,
}))
fc.UsableRouteConfiguration.Store(&xdsresource.UsableRouteConfiguration{Err: rcu.err})
continue
}
atomic.StorePointer(fc.UsableRouteConfiguration, unsafe.Pointer(fc.ConstructUsableRouteConfiguration(*rcu.update))) // Can't race with an RPC coming in but no harm making atomic.
fc.UsableRouteConfiguration.Store(fc.ConstructUsableRouteConfiguration(*rcu.update)) // Can't race with an RPC coming in but no harm making atomic.
}
}

Expand Down
17 changes: 6 additions & 11 deletions xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -107,14 +108,11 @@ func init() {
}

var (
singletonClientForTestingMu sync.Mutex
singletonClientForTesting *clientRefCounted
singletonClientForTesting atomic.Pointer[clientRefCounted]
)

func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) error {
singletonClientForTestingMu.Lock()
c := singletonClientForTesting
singletonClientForTestingMu.Unlock()
c := singletonClientForTesting.Load()
return internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)(c.clientImpl.triggerResourceNotFoundForTesting, resourceType, resourceName)
}

Expand All @@ -141,18 +139,15 @@ func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), err
if err != nil {
return nil, nil, err
}
singletonClientForTestingMu.Lock()
singletonClientForTesting = c
singletonClientForTestingMu.Unlock()
singletonClientForTesting = atomic.Pointer[clientRefCounted]{}
singletonClientForTesting.Store(c)
return c, grpcsync.OnceFunc(func() {
clientsMu.Lock()
defer clientsMu.Unlock()
if c.decrRef() == 0 {
c.close()
delete(clients, string(contents))
singletonClientForTestingMu.Lock()
singletonClientForTesting = nil
singletonClientForTestingMu.Unlock()
singletonClientForTesting.Store(nil)
}
}), nil
}
Expand Down
9 changes: 5 additions & 4 deletions xds/internal/xdsclient/xdsresource/filter_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"
"fmt"
"net"
"unsafe"
"sync/atomic"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -70,7 +70,7 @@ type FilterChain struct {
InlineRouteConfig *RouteConfigUpdate
// UsableRouteConfiguration is the routing configuration for this filter
// chain (LDS + RDS).
UsableRouteConfiguration *unsafe.Pointer // *(UsableRouteConfiguration)
UsableRouteConfiguration *atomic.Pointer[UsableRouteConfiguration]
}

// VirtualHostWithInterceptors captures information present in a VirtualHost
Expand Down Expand Up @@ -614,10 +614,11 @@ func (fcm *FilterChainManager) Validate(f func(fc *FilterChain) error) error {
}

func processNetworkFilters(filters []*v3listenerpb.Filter) (*FilterChain, error) {
rc := unsafe.Pointer(&UsableRouteConfiguration{})
rc := &UsableRouteConfiguration{}
filterChain := &FilterChain{
UsableRouteConfiguration: &rc,
UsableRouteConfiguration: &atomic.Pointer[UsableRouteConfiguration]{},
}
filterChain.UsableRouteConfiguration.Store(rc)
seenNames := make(map[string]bool, len(filters))
seenHCM := false
for _, filter := range filters {
Expand Down

0 comments on commit 7f59c17

Please sign in to comment.