Skip to content

Commit

Permalink
feat(mads) add support for HTTP long polling (#2121)
Browse files Browse the repository at this point in the history
* feat(cache) implement a blocking Fetch

Signed-off-by: austin ce <austin.cawley@gmail.com>

* feat(mads) use timeout in fetch request

Signed-off-by: austin ce <austin.cawley@gmail.com>

* test(mads) add HTTP blocking update test

Signed-off-by: austin ce <austin.cawley@gmail.com>

* chore(mads) correct param name

Signed-off-by: austin ce <austin.cawley@gmail.com>

* chore(mads) change 'fetchTimeout' to 'defaultFetchTimeout' in config

Signed-off-by: austin ce <austin.cawley@gmail.com>

* chore(config) update api-server config test

Signed-off-by: austin ce <austin.cawley@gmail.com>
  • Loading branch information
austince authored Jun 10, 2021
1 parent 26726ff commit dbba76c
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/api-server/config_ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ var _ = Describe("Config WS", func() {
"v1"
],
"assignmentRefreshInterval": "1s",
"fetchTimeout": "30s",
"defaultFetchTimeout": "30s",
"grpcPort": 0,
"port": 5676
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ store:
maxOpenConnections: 0 # ENV: KUMA_STORE_POSTGRES_MAX_OPEN_CONNECTIONS
# Maximum number of connections in the idle connection pool
# <0 value means no idle connections and 0 means default max idle connections
maxIdleConnections: 0 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS
maxIdleConnections: 0 # ENV: KUMA_STORE_POSTGRES_MAX_IDLE_CONNECTIONS
# TLS settings
tls:
# Mode of TLS connection. Available values (disable, verifyNone, verifyCa, verifyFull)
Expand Down Expand Up @@ -100,8 +100,8 @@ monitoringAssignmentServer:
- v1
# Interval for re-generating monitoring assignments for clients connected to the Control Plane.
assignmentRefreshInterval: 1s # ENV: KUMA_MONITORING_ASSIGNMENT_SERVER_ASSIGNMENT_REFRESH_INTERVAL
# Timeout for a single HTTP fetch-based discovery
fetchTimeout: 30s # ENV: KUMA_MONITORING_ASSIGNMENT_SERVER_FETCH_TIMEOUT
# The default timeout for a single fetch-based discovery request, if not specified
defaultFetchTimeout: 30s # ENV: KUMA_MONITORING_ASSIGNMENT_SERVER_DEFAULT_FETCH_TIMEOUT

# Envoy XDS server configuration
xdsServer:
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.MonitoringAssignmentServer.GrpcPort).To(Equal(uint32(3333)))
Expect(cfg.MonitoringAssignmentServer.Port).To(Equal(uint32(2222)))
Expect(cfg.MonitoringAssignmentServer.AssignmentRefreshInterval).To(Equal(12 * time.Second))
Expect(cfg.MonitoringAssignmentServer.FetchTimeout).To(Equal(45 * time.Second))
Expect(cfg.MonitoringAssignmentServer.DefaultFetchTimeout).To(Equal(45 * time.Second))
Expect(cfg.MonitoringAssignmentServer.ApiVersions).To(HaveLen(1))
Expect(cfg.MonitoringAssignmentServer.ApiVersions).To(ContainElements("v1"))

Expand Down Expand Up @@ -298,7 +298,7 @@ apiServer:
monitoringAssignmentServer:
grpcPort: 3333
port: 2222
fetchTimeout: 45s
defaultFetchTimeout: 45s
apiVersions: [v1]
assignmentRefreshInterval: 12s
runtime:
Expand Down Expand Up @@ -473,7 +473,7 @@ sdsServer:
"KUMA_API_SERVER_AUTH_ALLOW_FROM_LOCALHOST": "false",
"KUMA_MONITORING_ASSIGNMENT_SERVER_GRPC_PORT": "3333",
"KUMA_MONITORING_ASSIGNMENT_SERVER_PORT": "2222",
"KUMA_MONITORING_ASSIGNMENT_SERVER_FETCH_TIMEOUT": "45s",
"KUMA_MONITORING_ASSIGNMENT_SERVER_DEFAULT_FETCH_TIMEOUT": "45s",
"KUMA_MONITORING_ASSIGNMENT_SERVER_API_VERSIONS": "v1",
"KUMA_MONITORING_ASSIGNMENT_SERVER_ASSIGNMENT_REFRESH_INTERVAL": "12s",
"KUMA_REPORTS_ENABLED": "false",
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/mads/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var log = core.Log.WithName("mads-config")
func DefaultMonitoringAssignmentServerConfig() *MonitoringAssignmentServerConfig {
return &MonitoringAssignmentServerConfig{
Port: 5676,
FetchTimeout: 30 * time.Second,
DefaultFetchTimeout: 30 * time.Second,
ApiVersions: []mads.ApiVersion{mads.API_V1_ALPHA1, mads.API_V1},
AssignmentRefreshInterval: 1 * time.Second,
}
Expand All @@ -32,8 +32,8 @@ type MonitoringAssignmentServerConfig struct {
// Port of the server that serves Monitoring Assignment Discovery Service (MADS)
// over both grpc and http.
Port uint32 `yaml:"port" envconfig:"kuma_monitoring_assignment_server_port"`
// The timeout for a single fetch-based discovery request.
FetchTimeout time.Duration `yaml:"fetchTimeout" envconfig:"kuma_monitoring_assignment_server_fetch_timeout"`
// The default timeout for a single fetch-based discovery request, if not specified.
DefaultFetchTimeout time.Duration `yaml:"defaultFetchTimeout" envconfig:"kuma_monitoring_assignment_server_default_fetch_timeout"`
// Which observability apiVersions to serve
ApiVersions []string `yaml:"apiVersions" envconfig:"kuma_monitoring_assignment_server_api_versions"`
// Interval for re-generating monitoring assignments for clients connected to the Control Plane.
Expand Down
9 changes: 5 additions & 4 deletions pkg/mads/v1/service/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"io/ioutil"
"net/http"

"github.com/golang/protobuf/jsonpb"

"github.com/emicklei/go-restful"
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
cache_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/golang/protobuf/jsonpb"

rest_errors "github.com/kumahq/kuma/pkg/core/rest/errors"
rest_error_types "github.com/kumahq/kuma/pkg/core/rest/errors/types"
Expand Down Expand Up @@ -60,8 +59,10 @@ func (s *service) handleDiscovery(req *restful.Request, res *restful.Response) {

discoveryReq.TypeUrl = mads_v1.MonitoringAssignmentType

ctx, cancel := context.WithTimeout(context.Background(), s.config.FetchTimeout)
defer cancel()
timeout := s.config.DefaultFetchTimeout

ctx, cancelFunc := context.WithTimeout(req.Request.Context(), timeout)
defer cancelFunc()

discoveryRes, err := s.server.FetchMonitoringAssignments(ctx, discoveryReq)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mads/v1/service/mads.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type Server interface {
observability_v1.MonitoringAssignmentDiscoveryServiceServer
}

func NewServer(config envoy_cache.Cache, callbacks envoy_server.Callbacks) Server {
sotwServer := sotw.NewServer(context.Background(), config, callbacks)
restServer := rest.NewServer(config, callbacks)
func NewServer(cache envoy_cache.Cache, callbacks envoy_server.Callbacks) Server {
sotwServer := sotw.NewServer(context.Background(), cache, callbacks)
restServer := rest.NewServer(cache, callbacks)
return &server{stream: sotwServer, rest: restServer}
}

Expand Down
94 changes: 93 additions & 1 deletion pkg/mads/v1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ var _ = Describe("MADS http service", func() {

var resManager core_manager.ResourceManager

const refreshInterval = time.Millisecond * 500
// the refresh timeout should be smaller than the default fetch timeout so
// a refresh can happen during a single request

const refreshInterval = 500 * time.Millisecond

const defaultFetchTimeout = 5 * time.Second

BeforeEach(func() {
resManager = core_manager.NewResourceManager(memory.NewStore())

cfg := mads_config.DefaultMonitoringAssignmentServerConfig()
cfg.AssignmentRefreshInterval = refreshInterval
cfg.DefaultFetchTimeout = defaultFetchTimeout

svc := service.NewService(cfg, resManager, testing.NullLogger{})

Expand Down Expand Up @@ -365,5 +371,91 @@ var _ = Describe("MADS http service", func() {
Expect(discoveryRes.VersionInfo).ToNot(BeEmpty())
Expect(discoveryRes.Resources).To(HaveLen(2))
})

It("should block until there are updates", func() {
// given
discoveryReq := envoy_v3.DiscoveryRequest{
VersionInfo: "",
ResponseNonce: "",
TypeUrl: mads_v1.MonitoringAssignmentType,
ResourceNames: []string{},
Node: &envoy_core.Node{
Id: "test",
},
}
reqBytes, err := pbMarshaller.MarshalToString(&discoveryReq)
Expect(err).ToNot(HaveOccurred())

// when
req, err := http.NewRequest("POST", monitoringAssignmentPath, strings.NewReader(reqBytes))
Expect(err).ToNot(HaveOccurred())
req.Header.Add("content-type", "application/json")

resp, err := http.DefaultClient.Do(req)

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))

// when
respBody, err := ioutil.ReadAll(resp.Body)

// then
Expect(err).ToNot(HaveOccurred())

// when
discoveryRes := &envoy_v3.DiscoveryResponse{}
err = jsonpb.Unmarshal(bytes.NewReader(respBody), discoveryRes)
// then
Expect(err).ToNot(HaveOccurred())
Expect(discoveryRes.TypeUrl).To(Equal(mads_v1.MonitoringAssignmentType))
Expect(discoveryRes.VersionInfo).ToNot(BeEmpty())
Expect(discoveryRes.Resources).To(HaveLen(1))

// and given the same version
discoveryReq.VersionInfo = discoveryRes.VersionInfo
reqBytes, err = pbMarshaller.MarshalToString(&discoveryReq)
Expect(err).ToNot(HaveOccurred())

// when
req, err = http.NewRequest("POST", monitoringAssignmentPath, strings.NewReader(reqBytes))
Expect(err).ToNot(HaveOccurred())
req.Header.Add("content-type", "application/json")

respChan := make(chan *http.Response, 1)

go func() {
resp2, err := http.DefaultClient.Do(req)
Expect(err).ToNot(HaveOccurred())

respChan <- resp2
}()

// given an updated mesh while the request is in progress
time.Sleep(defaultFetchTimeout / 2)

err = createDataPlane(dp2)
Expect(err).ToNot(HaveOccurred())

resp2 := <-respChan

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp2.StatusCode).To(Equal(http.StatusOK))

// when
respBody, err = ioutil.ReadAll(resp2.Body)

// then
Expect(err).ToNot(HaveOccurred())

// when
err = jsonpb.Unmarshal(bytes.NewReader(respBody), discoveryRes)
// then
Expect(err).ToNot(HaveOccurred())
Expect(discoveryRes.TypeUrl).To(Equal(mads_v1.MonitoringAssignmentType))
Expect(discoveryRes.VersionInfo).ToNot(BeEmpty())
Expect(discoveryRes.Resources).To(HaveLen(2))
})
})
})
22 changes: 22 additions & 0 deletions pkg/util/xds/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,13 @@ func createResponse(request *envoy_cache.Request, resources map[string]types.Res

// Fetch implements the envoy_cache fetch function.
// Fetch is called on multiple streams, so responding to individual names with the same version works.
// If there is a Deadline set on the context, the call will block until either the context is terminated
// or there is a new update.
func (cache *snapshotCache) Fetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) {
if _, hasDeadline := ctx.Deadline(); hasDeadline {
return cache.blockingFetch(ctx, request)
}

nodeID := cache.hash.ID(request.Node)

cache.mu.RLock()
Expand All @@ -354,6 +360,22 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *envoy_cache.Requ
return nil, fmt.Errorf("missing snapshot for %q", nodeID)
}

// blockingFetch will wait until either the context is terminated or new resources become available
func (cache *snapshotCache) blockingFetch(ctx context.Context, request *envoy_cache.Request) (envoy_cache.Response, error) {
watchChan, cancelFunc := cache.CreateWatch(request)
if cancelFunc != nil {
defer cancelFunc()
}

select {
case <-ctx.Done():
// finished without an update
return nil, &types.SkipFetchError{}
case resp := <-watchChan:
return resp, nil
}
}

// GetStatusInfo retrieves the status info for the node.
func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo {
cache.mu.RLock()
Expand Down

0 comments on commit dbba76c

Please sign in to comment.