-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds: Fix xDS Server in the case of dynamic RDS #6889
Conversation
7d784f1
to
5600ccf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made as detailed as pass as I could. Hopefully this is helpful while you wait for Doug to return.
// DefaultServerListenerWithRouteConfigName returns a basic xds Listener | ||
// resource to be used on the server side. The returned Listener resource | ||
// contains a RouteCongiguration resource name that needs to be resolved. | ||
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Existing function DefaultServerListener
calls defaultServerListenerCommon
whose last parameter indicates whether the route configuration is to be inlined or not? Can the implementation of this function be replaced with a one-liner return defaultServerListenerCommon(host, port, secLevel, routeName, false)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good catch. This was left over from when I wrote the e2e test to trigger failure.
// specifies to route to a route specfying non forwarding action. This is | ||
// intended to be used on the server side for RDS requests, and corresponds to | ||
// the inline route configuration in DefaultServerListener. | ||
func RouteConfigNonForwardingTarget(routeName string) *v3routepb.RouteConfiguration { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We currently have a RouteConfigOptions
struct that is passed to RouteConfigResourceWithOptions
. Can the options struct be extended to include a field which specifies the type of route action instead of having separate functions for each type of route action?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this, and I think what I have now is better/simpler. The issue is the route config's route action is currently coupeled to the cluster specifier type, and also has another layer of knobs for each specific route action. This is just the cluster specifier type cluster, but adding a knob that only applies to one cluster specifier plugin feels weird to me. Thank you for the suggestion though.
// CallbackConn is a conn with a callback function. | ||
type CallbackConn interface { | ||
Callback(ServerTransport) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See: go/go-style/decisions#interfaces
Go interfaces generally belong in the package that consumes values of the interface type,
not a package that implements the interface type.
Why does this interface have to be defined in the internal/transport
package? I don't see this package consuming this interface or implementing it.
Also, this interface is utterly underspecified. It does not explain any of the following:
- when is this method called?
- is it called only ever once or can it called multiple times?
- who calls this method?
- who implements the interface?
- what are implementations supposed to do in this callback?
- do implementations have to be non-blocking in this callback?
From the PR description:
server: Call a callback with the server transport once it’s created on the Conn.
This gives access to the server transport to xDS layer, which will be gracefully
closed on transitions into not serving and transitions to a new LDS configuration.
It also guarantees at some point the server transport will be gracefully drained.
This replaces the Drain() operation previously present.
I don't see the problem with the existing approach of draining server transports that this new method overcomes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method guarantees synchornization between the Conns closing, and a new Conn being accepted. Previously, there was no way in the case you yielded thread after an Accept and synchronized with the possibility of a new Conn being added to the map, so it races. This guarantees synchronization between those components. Will document this further. (You don't have the server transport object until it wraps the accepted conn with the http2_server, so we were doing it wrong). Previously, the server would drain all the conns.
// Accept() for all incoming connections, but writes happen rarely (when we | ||
// get a Listener resource update). | ||
// mu guards access to the current serving mode and the active filter chain | ||
// manager. | ||
mu sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that this is defined as a read-write mutex in existing code as well. But I don't see a reason why this needs to be a read-write mutex. Currently, the only place where a write lock is acquired is from the for
loop in Accept
. There cannot be more than one goroutine executing that for loop concurrently.
Also, using a read-write mutex needs to be done with extreme caution. We don't have tools that prevent us from writing to the data that is being protected by this mutex when holding only a read lock. I personally debugged a race due to a bug where we were writing while only holding the read lock, and it was not fun.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The write lock is also held in maybeUpdateFilterChains, called in both LDS and RDS flow, which can come in async. It also is potentially held at the beginning of LDS processing, and in an LDS Resource Not Found.
// Filter chains received as part of the last good update. | ||
filterChains *xdsresource.FilterChainManager | ||
// Filter chain manager currently serving. | ||
activeFilterChainManager *xdsresource.FilterChainManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that this field is being accessed from handleRDSUpdate
where we are not holding the lock.
It is also accessed from instantiateFilterChainRoutingConfigurations
which does not hold the lock. I see that it is currently being called with the lock held, but that doesn't guarantee that it will always be called with the lock held in the future as well. Please mark functions with a Locked
suffix to indicate that they need to be called while holding a mutex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added locked. The reason it is ok to read that in handleRDSUpdate, is it can only be written to in instantiateFilterChainRoutingConfigurations, which is guaranteed to be sync (as also called within an xDS update, which is all sync).
test/xds/xds_server_test.go
Outdated
waitForFailedRPCWithStatusCode(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding")) | ||
} | ||
|
||
// waitForFailedRPCWithStatusCode makes Unary RPC's until it receives the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: s/Unary/unary/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
test/xds/xds_server_test.go
Outdated
t.Helper() | ||
|
||
c := testgrpc.NewTestServiceClient(cc) | ||
ticker := time.NewTicker(1 * time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One second is a lot of time. Something like 10ms
should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched. This was one second in master, wonder why (I switched it to expect a certain status and error and to use a ticker).
test/xds/xds_server_test.go
Outdated
for { | ||
select { | ||
case <-ctx.Done(): | ||
t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", sts, ctx.Err(), err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error message puts want
before got
. See: go/go-style/decisions#got-before-want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sts is the want, so I think this is correct here.
_, err = c.EmptyCall(ctx, &testpb.Empty{}) | ||
for _, st := range sts { | ||
if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) { | ||
t.Logf("most recent error happy case: %v", err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this logline useful when the test fails?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but it is part of the t.Fatalf argument above.
// Close any new connections. After it has received the resource not found | ||
// error, the server should move to serving, successfully Accept Connections, | ||
// and fail at the L7 level with resource not found specified. | ||
func (s) TestResourceNotFoundRDS(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a lot of WIP tests from this point on. So, I'm stopping here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## master #6889 +/- ##
==========================================
+ Coverage 83.65% 83.76% +0.10%
==========================================
Files 286 287 +1
Lines 30756 30928 +172
==========================================
+ Hits 25730 25907 +177
- Misses 3963 3965 +2
+ Partials 1063 1056 -7
|
Continued in #6915. The main thing about this PR is the diff with respect to Easwar's comments, as I did it in one commit :). |
Fix the xDS Server to align with spec in https://github.com/grpc/proposal/blob/master/A36-xds-for-servers.md.
xds/internal/server/listener_wrapper:
xds/internal/server/conn_wrapper: Read routing configuration dynamically through an atomic load of a pointer, also add L7 error representation.
xds/internal/server/rds_handler: Persist rds updates and error conditions for each specific route resource being watched. Add a helper which determines whether all dynamic rds resources needed have been received. RDS errors mean the RDS has received configuration, trigger failure at L7 routing layer against incoming RPCs.
xds/server: Don’t block on a “good update”, immediately start serving on lis passed into Serve once wrapped, if xDS resources aren’t ready or LDS returns resource not found Accept() and Close() in not serving mode. Log any L7 routing errors due to xDS Configuration problems.
server: Call a callback with the server transport once it’s created on the Conn. This gives access to the server transport to xDS layer, which will be gracefully closed on transitions into not serving and transitions to a new LDS configuration. It also guarantees at some point the server transport will be gracefully drained. This replaces the Drain() operation previously present.
All LDS + RDS callbacks from the client are synchronous. The events that can happen asynchronous are the Accepting of a Conn (protected by mutex) and also an RPC on the Conn (protected with an atomic pointer). Thus dropped spawned event goroutines handleServingModeChanges() in the xDS Server, and run() on the listener_wrapper. Handled all the xDS resources inline and synced data structures/behaviors using above methods. L7 error = fail RPC with status code UNAVAILABLE (don’t give back more information due to security risk).
Fixes #6788.
RELEASE NOTES: