-
Notifications
You must be signed in to change notification settings - Fork 517
/
server.go
226 lines (189 loc) · 6.6 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package delta
import (
"context"
"errors"
"strconv"
"sync/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
// Server is a wrapper interface which is meant to hold the proper stream handler for each xDS protocol.
type Server interface {
DeltaStreamHandler(stream stream.DeltaStream, typeURL string) error
}
type Callbacks interface {
// OnDeltaStreamOpen is called once an incremental xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnDeltaStreamOpen(context.Context, int64, string) error
// OnDeltaStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnDeltaStreamClosed(int64)
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
// OnStreamDelatResponse is called immediately prior to sending a response on a stream.
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
}
var deltaErrorResponse = &cache.RawDeltaResponse{}
type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
// total stream count for counting bi-di streams
streamCount int64
ctx context.Context
}
// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
return &server{
cache: config,
callbacks: callbacks,
ctx: ctx,
}
}
func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
streamID := atomic.AddInt64(&s.streamCount, 1)
// streamNonce holds a unique nonce for req-resp pairs per xDS stream.
var streamNonce int64
// a collection of stack allocated watches per request type
watches := newWatches()
defer func() {
watches.Cancel()
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID)
}
}()
// Sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
return "", errors.New("missing response")
}
response, err := resp.GetDeltaDiscoveryResponse()
if err != nil {
return "", err
}
streamNonce = streamNonce + 1
response.Nonce = strconv.FormatInt(streamNonce, 10)
if s.callbacks != nil {
s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response)
}
return response.Nonce, str.Send(response)
}
if s.callbacks != nil {
if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
}
}
var node = &core.Node{}
for {
select {
case <-s.ctx.Done():
return nil
case resp, more := <-watches.deltaMuxedResponses:
if !more {
break
}
typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}
nonce, err := send(resp)
if err != nil {
return err
}
watch := watches.deltaWatches[typ]
watch.nonce = nonce
watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
if s.callbacks != nil {
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil {
return err
}
}
// The node information might only be set on the first incoming delta discovery request, so store it here so we can
// reset it on subsequent requests that omit it.
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}
// type URL is required for ADS but is implicit for any other xDS stream
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
typeURL := req.GetTypeUrl()
// cancel existing watch to (re-)request a newer version
watch, ok := watches.deltaWatches[typeURL]
if !ok {
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode.
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
watch.Cancel()
}
s.subscribe(req.GetResourceNamesSubscribe(), watch.state.GetResourceVersions())
s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.GetResourceVersions())
watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch
go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
}
}
func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) error {
// a channel for receiving incoming delta requests
reqCh := make(chan *discovery.DeltaDiscoveryRequest)
// we need to concurrently handle incoming requests since we kick off processDelta as a return
go func() {
for {
select {
case <-str.Context().Done():
close(reqCh)
return
default:
req, err := str.Recv()
if err != nil {
close(reqCh)
return
}
reqCh <- req
}
}
}()
return s.processDelta(str, reqCh, typeURL)
}
// When we subscribe, we just want to make the cache know we are subscribing to a resource.
// Providing a name with an empty version is enough to make that happen.
func (s *server) subscribe(resources []string, sv map[string]string) {
for _, resource := range resources {
sv[resource] = ""
}
}
// Unsubscriptions remove resources from the stream state to
// indicate to the cache that we don't care about the resource anymore
func (s *server) unsubscribe(resources []string, sv map[string]string) {
for _, resource := range resources {
delete(sv, resource)
}
}