Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

completely delete WatchListener and WatchRouteConfig APIs #6849

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 0 additions & 153 deletions xds/internal/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// Client is a fake implementation of an xds client. It exposes a bunch of
Expand All @@ -38,151 +37,10 @@ type Client struct {
xdsclient.XDSClient

name string
ldsWatchCh *testutils.Channel
rdsWatchCh *testutils.Channel
cdsWatchCh *testutils.Channel
edsWatchCh *testutils.Channel
ldsCancelCh *testutils.Channel
rdsCancelCh *testutils.Channel
cdsCancelCh *testutils.Channel
edsCancelCh *testutils.Channel
loadReportCh *testutils.Channel
lrsCancelCh *testutils.Channel
loadStore *load.Store
bootstrapCfg *bootstrap.Config

ldsCb func(xdsresource.ListenerUpdate, error)
rdsCbs map[string]func(xdsresource.RouteConfigUpdate, error)
cdsCbs map[string]func(xdsresource.ClusterUpdate, error)
edsCbs map[string]func(xdsresource.EndpointsUpdate, error)
}

// WatchListener registers a LDS watch.
func (xdsC *Client) WatchListener(serviceName string, callback func(xdsresource.ListenerUpdate, error)) func() {
xdsC.ldsCb = callback
xdsC.ldsWatchCh.Send(serviceName)
return func() {
xdsC.ldsCancelCh.Send(nil)
}
}

// WaitForWatchListener waits for WatchCluster to be invoked on this client and
// returns the serviceName being watched.
func (xdsC *Client) WaitForWatchListener(ctx context.Context) (string, error) {
val, err := xdsC.ldsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchListenerCallback invokes the registered ldsWatch callback.
//
// Not thread safe with WatchListener. Only call this after
// WaitForWatchListener.
func (xdsC *Client) InvokeWatchListenerCallback(update xdsresource.ListenerUpdate, err error) {
xdsC.ldsCb(update, err)
}

// WaitForCancelListenerWatch waits for a LDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error {
_, err := xdsC.ldsCancelCh.Receive(ctx)
return err
}

// WatchRouteConfig registers a RDS watch.
func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsresource.RouteConfigUpdate, error)) func() {
xdsC.rdsCbs[routeName] = callback
xdsC.rdsWatchCh.Send(routeName)
return func() {
xdsC.rdsCancelCh.Send(routeName)
}
}

// WaitForWatchRouteConfig waits for WatchCluster to be invoked on this client and
// returns the routeName being watched.
func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error) {
val, err := xdsC.rdsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchRouteConfigCallback invokes the registered rdsWatch callback.
//
// Not thread safe with WatchRouteConfig. Only call this after
// WaitForWatchRouteConfig.
func (xdsC *Client) InvokeWatchRouteConfigCallback(name string, update xdsresource.RouteConfigUpdate, err error) {
if len(xdsC.rdsCbs) != 1 {
xdsC.rdsCbs[name](update, err)
return
}
// Keeps functionality with previous usage of this on client side, if single
// callback call that callback.
var routeName string
for route := range xdsC.rdsCbs {
routeName = route
}
xdsC.rdsCbs[routeName](update, err)
}

// WaitForCancelRouteConfigWatch waits for a RDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, error) {
val, err := xdsC.rdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// WatchEndpoints registers an EDS watch for provided clusterName.
func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
xdsC.edsCbs[clusterName] = callback
xdsC.edsWatchCh.Send(clusterName)
return func() {
xdsC.edsCancelCh.Send(clusterName)
}
}

// WaitForWatchEDS waits for WatchEndpoints to be invoked on this client and
// returns the clusterName being watched.
func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) {
val, err := xdsC.edsWatchCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}

// InvokeWatchEDSCallback invokes the registered edsWatch callback.
//
// Not thread safe with WatchEndpoints. Only call this after
// WaitForWatchEDS.
func (xdsC *Client) InvokeWatchEDSCallback(name string, update xdsresource.EndpointsUpdate, err error) {
if len(xdsC.edsCbs) != 1 {
// This may panic if name isn't found. But it's fine for tests.
xdsC.edsCbs[name](update, err)
return
}
// Keeps functionality with previous usage of this, if single callback call
// that callback.
for n := range xdsC.edsCbs {
name = n
}
xdsC.edsCbs[name](update, err)
}

// WaitForCancelEDSWatch waits for a EDS watch to be cancelled and returns
// context.DeadlineExceeded otherwise.
func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) {
edsNameReceived, err := xdsC.edsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return edsNameReceived.(string), err
}

// ReportLoadArgs wraps the arguments passed to ReportLoad.
Expand Down Expand Up @@ -247,20 +105,9 @@ func NewClient() *Client {
func NewClientWithName(name string) *Client {
return &Client{
name: name,
ldsWatchCh: testutils.NewChannelWithSize(10),
rdsWatchCh: testutils.NewChannelWithSize(10),
cdsWatchCh: testutils.NewChannelWithSize(10),
edsWatchCh: testutils.NewChannelWithSize(10),
ldsCancelCh: testutils.NewChannelWithSize(10),
rdsCancelCh: testutils.NewChannelWithSize(10),
cdsCancelCh: testutils.NewChannelWithSize(10),
edsCancelCh: testutils.NewChannelWithSize(10),
loadReportCh: testutils.NewChannel(),
lrsCancelCh: testutils.NewChannel(),
loadStore: load.NewStore(),
bootstrapCfg: &bootstrap.Config{ClientDefaultListenerResourceNameTemplate: "%s"},
rdsCbs: make(map[string]func(xdsresource.RouteConfigUpdate, error)),
cdsCbs: make(map[string]func(xdsresource.ClusterUpdate, error)),
edsCbs: make(map[string]func(xdsresource.EndpointsUpdate, error)),
}
}
2 changes: 1 addition & 1 deletion xds/internal/testutils/resource_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
// used to receive updates on watches registered with the xDS client, when using
// the resource-type agnostic WatchResource API.
//
// Tests can the channels provided by this tyep to get access to updates and
// Tests can use the channels provided by this type to get access to updates and
// errors sent by the xDS client.
type TestResourceWatcher struct {
// UpdateCh is the channel on which xDS client updates are delivered.
Expand Down
6 changes: 0 additions & 6 deletions xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources.
type XDSClient interface {
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()

// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
// xDS requests are sent out and how responses are deserialized and
Expand All @@ -47,9 +44,6 @@ type XDSClient interface {
// During a race (e.g. an xDS response is received while the user is calling
// cancel()), there's a small window where the callback can be called after
// the watcher is canceled. Callers need to handle this case.
//
// TODO: Once this generic client API is fully implemented and integrated,
// delete the resource type specific watch APIs on this interface.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())

// DumpResources returns the status of the xDS resources. Returns a map of
Expand Down
56 changes: 0 additions & 56 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,62 +25,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// This is only required temporarily, while we modify the
// clientImpl.WatchListener API to be implemented via the wrapper
// WatchListener() API which calls the WatchResource() API.
type listenerWatcher struct {
resourceName string
cb func(xdsresource.ListenerUpdate, error)
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.cb(update.Resource, nil)
}

func (l *listenerWatcher) OnError(err error) {
l.cb(xdsresource.ListenerUpdate{}, err)
}

func (l *listenerWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", l.resourceName)
l.cb(xdsresource.ListenerUpdate{}, err)
}

// WatchListener uses LDS to discover information about the Listener resource
// identified by resourceName.
func (c *clientImpl) WatchListener(resourceName string, cb func(xdsresource.ListenerUpdate, error)) (cancel func()) {
watcher := &listenerWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchListener(c, resourceName, watcher)
}

// This is only required temporarily, while we modify the
// clientImpl.WatchRouteConfig API to be implemented via the wrapper
// WatchRouteConfig() API which calls the WatchResource() API.
type routeConfigWatcher struct {
resourceName string
cb func(xdsresource.RouteConfigUpdate, error)
}

func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.cb(update.Resource, nil)
}

func (r *routeConfigWatcher) OnError(err error) {
r.cb(xdsresource.RouteConfigUpdate{}, err)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist() {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", r.resourceName)
r.cb(xdsresource.RouteConfigUpdate{}, err)
}

// WatchRouteConfig uses RDS to discover information about the
// RouteConfiguration resource identified by resourceName.
func (c *clientImpl) WatchRouteConfig(resourceName string, cb func(xdsresource.RouteConfigUpdate, error)) (cancel func()) {
watcher := &routeConfigWatcher{resourceName: resourceName, cb: cb}
return xdsresource.WatchRouteConfig(c, resourceName, watcher)
}

// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt
Expand Down
6 changes: 0 additions & 6 deletions xds/internal/xdsclient/tests/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,6 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
return lisDefault, lisNonDefault, client, close
}

type noopClusterWatcher struct{}

func (noopClusterWatcher) OnUpdate(update *xdsresource.ClusterResourceData) {}
func (noopClusterWatcher) OnError(err error) {}
func (noopClusterWatcher) OnResourceDoesNotExist() {}

// TestAuthorityShare tests the authority sharing logic. The test verifies the
// following scenarios:
// - A watch for a resource name with an authority matching an existing watch
Expand Down
Loading
Loading