Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: start using the newly added transport and channel functionalities #7773

Merged
merged 6 commits into from
Nov 1, 2024

Conversation

easwars
Copy link
Contributor

@easwars easwars commented Oct 24, 2024

#a71-xds-fallback
#xdsclient-refactor
Addresses #6902

This PR is the last of the refactor PRs. Contains the following:

  • xDS client owns a bunch of xdsChannels and authorities
  • xdsChannel owns a transport and an LRS and ADS stream implementations
  • every authority owns one xdsChannel (although it could contain config for multiple)

In a follow-up PR, the TODOs mentioned for fallback will be filled in with e2e style tests.

RELEASE NOTES: none

@easwars easwars requested review from zasweq and purnesh42H October 24, 2024 12:22
@easwars easwars added the Type: Internal Cleanup Refactors, etc label Oct 24, 2024
@easwars easwars added this to the 1.68 Release milestone Oct 24, 2024
closed bool
resources map[xdsresource.Type]map[string]*resourceState

// An ordered list of xdsChannels along with their server configuration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: may be mention the order criteria? or is it just order of addition? In that case, do we even need to mention the order as list is implicit to maintain the order in which items are added.

Also, may be An ordered list of xdsChannels along with their server configuration to which authority has subscribed to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the ordered list of xds channels corresponding to the ordered list of server configuration present in the authority config in the bootstrap.

The ordering here specifies the priority, i.e. the first entry is preferred over everything that comes after it, the second entry is preferred over everything that comes after it etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded the comment a little. Maybe it is a little more clear now. Thanks.

// The current active xdsChannel. Here, active does not mean that the
// channel has a working connection to the server. It simply points to the
// channel that we are trying to work with, based on fallback logic.
activeChannel *xdsChannelWithConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: activeXdsChannel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// server config. The error will be forwarded to all the resource watchers.
//
// Errors of type xdsresource.ErrTypeStreamFailedAfterRecv are ignored.
func (a *authority) adsStreamFailure(serverConfig *bootstrap.ServerConfig, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarification: may be need to mention who calls adsStreamFailure and adsResourceUpdate called by xdsclient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for _, rType := range a.resources {
for _, state := range rType {
for watcher := range state.watchers {
watcher := watcher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why watcher := watcher?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the callback passed to the serializer access this loop variable. See https://fuchsoria.medium.com/shadowing-or-how-to-fix-loop-variable-i-captured-by-func-literal-f365c0ee984e

a.updateResourceStateAndScheduleCallbacks(rType, updates, md, onDone)
return err

// TODO(easwars-fallback): Trigger fallback here if conditions for fallback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should still report error if fallback is being triggered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no fallback support in this PR. It will be added in the next PR.

But what error do you want to report during fallback and to whom?

for watcher := range state.watchers {
watcher := watcher
watcherCnt.Add(1)
funcsToSchedule = append(funcsToSchedule, func(context.Context) { watcher.OnResourceDoesNotExist(done) })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where are these funcsToSchedule are scheduled on watcherCallbackSerializer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the top of this function via a defer.

}
cleanup = a.unwatchResource(rType, resourceName, watcher)
})
<-done
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to spawn a separate thread for watcher if we need to wait for completion anyways?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if we don't schedule it on the serializer, we would have to guard the fields with a mutex.

}
a.transport.SendRequest(rType.TypeURL(), resourcesToRequest)
a.xdsChannelConfigs[0].xc = xc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why always use first configuration? What are other configurations there for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only use the first configuration prior to supporting fallback. The other configurations will be used when fallback support is added as part of the next PR.

Copy link
Contributor

@zasweq zasweq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Midway through first pass; super light so far but figured I'd send part way since I'll finish the pass tomorrow so you can have something to in parallel work on.

xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
// the management server. If an active channel is available, it returns that.
// Otherwise, it creates a new channel using the first server configuration in
// the list of configurations, and returns that.
func (a *authority) xdsChannelToUseLocked() *xdsChannelWithConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the onLocked method here and below; mention what mu needs to be held while calling these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no mutex in the authority type any more. So, just added a note that this needs to be run in the context of a serializer callback.

Copy link
Contributor

@zasweq zasweq Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Should we drop the "Locked" suffix then (similar to how handle functions don't have locked suffix)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
Comment on lines 138 to 140
watcherCallbackSerializer: args.serializer,
getChannelForADS: args.getChannelForADS,
serializer: grpcsync.NewCallbackSerializer(ctx),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the xDS Client serializer created inline but the watcherCallback is a knob? I see the watcher callback serializer getting passed from newClient as the client serializer? Is that to make all watches across all authorities serial?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the xdsClientSerializer is an implementation detail of the authority struct, while the serializer used in the xDS client to serialize watch callbacks is something that is shared across all watchers. For example, a single watcher implementation could register watches on multiple authorities. In such a case, we still need to guarantee that callbacks across the watches are not invoked concurrently. So, we need a single serializer for all watchers.

xds/internal/xdsclient/authority.go Show resolved Hide resolved
}

for name, cfg := range config.Authorities() {
// If server configus are specified in the authorities map, use that. Else,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: configus

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

for name, cfg := range config.Authorities() {
// If server configus are specified in the authorities map, use that. Else,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my understanding, config.XDSServers() will be there only in case of fallback servers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

logPrefix: clientPrefix(c),
})
}
c.topLevelAuthority = newAuthority(authorityArgs{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my understanding, what is the difference between top level authority and authority map ones?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top-level server configuration is used when an authority configuration does not contain server configuration.

authorities map[string]*authority // Map from authority names in bootstrap to authority struct.
config *bootstrap.Config // Complete bootstrap configuration.
watchExpiryTimeout time.Duration // Expiry timeout for ADS watch.
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be uncommon to have different backoffs for ADS and LRS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't know of a scenario where that would be useful.

backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
transportBuilder transport.Builder // Builder to create transports to the xDS server.
resourceTypes *resourceTypeRegistry // Registry of resource types, for parsing incoming ADS responses.
serializer *grpcsync.CallbackSerializer // Serializer for invoking watcher callbacks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: `Serializer for invoking resource watcher callbacks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DOne.

return
}

cs.parent.channelsMu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer cs.parent.channelsMu.Unlock()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return
}

cs.parent.channelsMu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer cs.parent.channelsMu.Unlock()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -69,22 +145,57 @@ func (c *clientImpl) BootstrapConfig() *bootstrap.Config {
return c.config
}

// close closes the gRPC connection to the management server.
// close closes the xDS client and releases all resources.
func (c *clientImpl) close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i asked this above. Will user of the xds client call this or its internally called on some condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called when the reference count on the xDS client comes down to 0. See client_refcounted.go for details.

c.logger.Infof("Reviving an xdsChannel from the idle cache for server config %q", serverConfig)
}
state, _ := state.(*channelState)
c.xdsActiveChannels[serverConfig.String()] = state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.xdsActiveChannels[serverConfig.String()] = state. Shouldn't this need to be synchronized too like inside lock/unlock critical section? Actually may be the whole if block starting 266 needs to be in critical section?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I grab the lock at the top of this method. Are you saying that is not enough?

return nil, func() {}, fmt.Errorf("failed to create xdsChannel for server config %s: %v", serverConfig, err)
}
state.channel = channel
c.xdsActiveChannels[serverConfig.String()] = state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here. why 304 and 305 don't need to be within critical section?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I grab the lock at the top of this method. Are you saying that is not enough?

@zasweq zasweq assigned easwars and unassigned zasweq Oct 29, 2024
@easwars easwars assigned zasweq and unassigned easwars Oct 29, 2024
Copy link

codecov bot commented Oct 29, 2024

Codecov Report

Attention: Patch coverage is 78.33656% with 112 lines in your changes missing coverage. Please review.

Project coverage is 81.94%. Comparing base (8212cf0) to head (75d4c70).
Report is 18 commits behind head on master.

Files with missing lines Patch % Lines
xds/internal/xdsclient/authority.go 81.62% 29 Missing and 14 partials ⚠️
xds/internal/xdsclient/clientimpl.go 76.33% 26 Missing and 14 partials ⚠️
xds/internal/xdsclient/channel.go 57.14% 8 Missing and 4 partials ⚠️
xds/internal/xdsclient/clientimpl_watchers.go 78.78% 5 Missing and 2 partials ⚠️
xds/internal/xdsclient/transport/lrs/lrs_stream.go 0.00% 5 Missing and 1 partial ⚠️
xds/internal/xdsclient/client_new.go 92.10% 2 Missing and 1 partial ⚠️
xds/internal/xdsclient/clientimpl_loadreport.go 83.33% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7773      +/-   ##
==========================================
+ Coverage   81.40%   81.94%   +0.53%     
==========================================
  Files         368      371       +3     
  Lines       36768    37702     +934     
==========================================
+ Hits        29932    30895     +963     
+ Misses       5607     5528      -79     
- Partials     1229     1279      +50     
Files with missing lines Coverage Δ
xds/internal/xdsclient/client_refcounted.go 83.72% <100.00%> (ø)
xds/internal/xdsclient/clientimpl_dump.go 100.00% <100.00%> (ø)
xds/internal/xdsclient/logging.go 100.00% <ø> (ø)
xds/internal/xdsclient/clientimpl_loadreport.go 70.00% <83.33%> (-3.34%) ⬇️
xds/internal/xdsclient/client_new.go 85.88% <92.10%> (+2.54%) ⬆️
xds/internal/xdsclient/transport/lrs/lrs_stream.go 70.55% <0.00%> (+31.81%) ⬆️
xds/internal/xdsclient/clientimpl_watchers.go 78.87% <78.78%> (+2.94%) ⬆️
xds/internal/xdsclient/channel.go 78.16% <57.14%> (-0.61%) ⬇️
xds/internal/xdsclient/clientimpl.go 77.60% <76.33%> (-11.69%) ⬇️
xds/internal/xdsclient/authority.go 82.50% <81.62%> (-7.25%) ⬇️

... and 44 files with indirect coverage changes

// a channel to the first server configuration is created when the first watch
// is registered, and more channels are created as needed by the fallback logic.
func newAuthority(args authorityBuildOptions) *authority {
ctx, cancel := context.WithCancel(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not applicable to this PR; but just to remind about context scoping discussion for the future when you refactor xDS Client @purnesh42H

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/authority.go Show resolved Hide resolved
@@ -1,295 +0,0 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a pure refactor; can we not keep some of these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are already moved out to e2e style tests in xds/internal/xdsclient/tests/authority_test.go, xds/internal/xdsclient/tests/ads_stream_watch_test.go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh those cover the same functionality? The diffs in those e2e tests seemed pretty minor so I thought we just deleted these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following tests old tests TestTimerAndWatchStateOnSendCallback, TestTimerAndWatchStateOnErrorCallback and TestWatchResourceTimerCanRestartOnIgnoredADSRecvError are now covered in the new test TestADS_WatchState_StreamBreaks

And we also have another new test TestADS_WatchState_TimerFires which explicitly tests timer firing scenario.

xds/internal/xdsclient/clientimpl.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/clientimpl.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/clientimpl.go Show resolved Hide resolved
xds/internal/xdsclient/client_new.go Outdated Show resolved Hide resolved
xds/internal/xdsclient/client_new.go Show resolved Hide resolved
@zasweq zasweq assigned easwars and unassigned zasweq Oct 30, 2024
@easwars easwars assigned zasweq and unassigned easwars Oct 30, 2024
Copy link
Contributor

@zasweq zasweq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@easwars easwars merged commit ef0f617 into grpc:master Nov 1, 2024
14 of 15 checks passed
@easwars easwars deleted the xdsclient_with_newly_added_components branch November 1, 2024 15:50
misvivek pushed a commit to misvivek/grpc-go that referenced this pull request Nov 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants