Skip to content

Commit

Permalink
Added FadeIn data inside endpointregistry
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Sep 25, 2023
1 parent eba1761 commit 3135b50
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 17 deletions.
27 changes: 22 additions & 5 deletions filters/fadein/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (
EndpointCreatedName = filters.EndpointCreatedName
)

type Options struct {
EndpointRegistry *routing.EndpointRegistry
}

type (
fadeIn struct {
duration time.Duration
Expand All @@ -39,7 +43,8 @@ type (

postProcessor struct {
// "http://10.2.1.53:1234": {t0 60s t0-10s}
detected map[string]detectedFadeIn
detected map[string]detectedFadeIn
endpointRegistry *routing.EndpointRegistry
}
)

Expand Down Expand Up @@ -192,14 +197,20 @@ func (endpointCreated) CreateFilter(args []interface{}) (filters.Filter, error)
func (endpointCreated) Request(filters.FilterContext) {}
func (endpointCreated) Response(filters.FilterContext) {}

// NewPostProcessor creates post-processor for maintaining the detection time of LB endpoints with fade-in
// behavior.
func NewPostProcessor() routing.PostProcessor {
// NewPostProcessorWithOptions creates post-processor for maintaining
// the fade-in behavior.
func NewPostProcessorWithOptions(o Options) routing.PostProcessor {
return &postProcessor{
detected: make(map[string]detectedFadeIn),
detected: make(map[string]detectedFadeIn),
endpointRegistry: o.EndpointRegistry,
}
}

// Deprecated: use NewPostProcessorWithOptions
func NewPostProcessor() routing.PostProcessor {
return NewPostProcessorWithOptions(Options{})
}

func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
now := time.Now()

Expand Down Expand Up @@ -232,9 +243,15 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
detected := p.detected[key].when
if detected.IsZero() || endpointsCreated[key].After(detected) {
detected = now
if p.endpointRegistry != nil {
p.endpointRegistry.SetDetectedTime(ep.Host, detected)
}
}

ep.Detected = detected
if p.endpointRegistry != nil {
p.endpointRegistry.SetFadeIn(ep.Host, ri.Id, ri.LBFadeInDuration, ri.LBFadeInExponent)
}
p.detected[key] = detectedFadeIn{
when: detected,
duration: ri.LBFadeInDuration,
Expand Down
57 changes: 48 additions & 9 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestCreateEndpointCreated(t *testing.T) {
}

func TestPostProcessor(t *testing.T) {
createRouting := func(t *testing.T, routes string) (*routing.Routing, func(string)) {
createRouting := func(t *testing.T, routes string, registry *routing.EndpointRegistry) (*routing.Routing, func(string)) {
dc, err := testdataclient.NewDoc(routes)
if err != nil {
t.Fatal(err)
Expand All @@ -197,7 +197,7 @@ func TestPostProcessor(t *testing.T) {
},
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
NewPostProcessor(),
NewPostProcessorWithOptions(Options{EndpointRegistry: registry}),
},
SignalFirstLoad: true,
})
Expand Down Expand Up @@ -234,36 +234,55 @@ func TestPostProcessor(t *testing.T) {
baz: Path("/baz") -> <"http://10.0.1.1:8080", "http://10.0.1.2:8080">
`

rt, _ := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, registry)

foo := route(rt, "/foo")
if foo == nil || foo.LBFadeInDuration != 0 {
t.Fatal("failed to preserve non-LB route")
}
if registry.GetMetrics("www.example.org:443").FadeInDuration("foo") != 0 {
t.Fatal("failed to preserve non-LB route")
}

bar := route(rt, "/bar")
if bar == nil || bar.LBFadeInDuration != time.Minute {
t.Fatal("failed to postprocess LB route")
}
if registry.GetMetrics("10.0.0.1:8080").FadeInDuration("bar") != time.Minute {
t.Fatal("failed to postprocess LB route")
}
if registry.GetMetrics("10.0.0.2:8080").FadeInDuration("bar") != time.Minute {
t.Fatal("failed to postprocess LB route")
}

for _, ep := range bar.LBEndpoints {
if ep.Detected.IsZero() {
t.Fatal("failed to set detection time")
}
if registry.GetMetrics(ep.Host).DetectedTime().IsZero() {
t.Fatal("failed to set detection time")
}
}

baz := route(rt, "/baz")
if baz == nil || baz.LBFadeInDuration != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
if registry.GetMetrics("10.0.1.1:8080").FadeInDuration("baz") != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
if registry.GetMetrics("10.0.1.2:8080").FadeInDuration("baz") != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
})

t.Run("invalid endpoint address", func(t *testing.T) {
const routes = `
* -> fadeIn("1m") -> <"http://::">
`

rt, _ := createRouting(t, routes)
rt, _ := createRouting(t, routes, nil)
r := route(rt, "/")
if r != nil {
t.Fatal("created invalid LB endpoint")
Expand All @@ -275,19 +294,24 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("-1m") -> <"http://10.0.0.1:8080">
`

rt, _ := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, registry)
r := route(rt, "/")
if r == nil || len(r.LBEndpoints) == 0 || !r.LBEndpoints[0].Detected.IsZero() {
t.Fatal("failed to ignore negative duration")
}
if !registry.GetMetrics("10.0.0.1:8080").DetectedTime().IsZero() {
t.Fatal("failed to ignore negative duration")
}
})

t.Run("endpoint already detected", func(t *testing.T) {
const routes = `
* -> fadeIn("1m") -> <"http://10.0.0.1:8080">
`

rt, update := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, routes, registry)
firstDetected := time.Now()

const nextRoutes = `
Expand All @@ -303,6 +327,9 @@ func TestPostProcessor(t *testing.T) {
if ep.Detected.After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
if registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}

found = true
}
Expand All @@ -318,7 +345,8 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("1m") -> <"http://10.0.0.1:8080", "http://10.0.0.2:8080">
`

rt, update := createRouting(t, initialRoutes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, initialRoutes, registry)
firstDetected := time.Now()

const nextRoutes = `
Expand All @@ -336,6 +364,9 @@ func TestPostProcessor(t *testing.T) {
if ep.Detected.After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
if registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}

found = true
}
Expand All @@ -351,7 +382,8 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("15ms") -> <"http://10.0.0.1:8080", "http://10.0.0.2:8080">
`

rt, update := createRouting(t, initialRoutes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, initialRoutes, registry)
firstDetected := time.Now()

const nextRoutes = `
Expand All @@ -370,6 +402,9 @@ func TestPostProcessor(t *testing.T) {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to clear detection time.")
}
if !registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to clear detection time.")
}

found = true
}
Expand All @@ -386,7 +421,8 @@ func TestPostProcessor(t *testing.T) {
`

routes := fmt.Sprintf(routesFmt, nows(t))
rt, update := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, routes, registry)
firstDetected := time.Now()

const nextRoutesFmt = `
Expand All @@ -407,6 +443,9 @@ func TestPostProcessor(t *testing.T) {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}
if !registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}

found = true
}
Expand Down
2 changes: 2 additions & 0 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat
Detected: detectionTimes[i],
})
ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i])
ctx.Registry.SetFadeIn(eps[i], "dummy", fadeInDuration, 1)
}

return ctx, eps
Expand Down Expand Up @@ -332,6 +333,7 @@ func benchmarkFadeIn(
Detected: detectionTimes[i],
})
registry.SetDetectedTime(eps[i], detectionTimes[i])
registry.SetFadeIn(eps[i], "dummy", fadeInDuration, 1)
}

var wg sync.WaitGroup
Expand Down
4 changes: 3 additions & 1 deletion proxy/fadeintesting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (p *fadeInProxy) addInstances(n int) {
p.mx.Lock()
defer p.mx.Unlock()

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
for i := 0; i < n; i++ {
client := p.backend.createDataClient()
fr := make(filters.Registry)
Expand All @@ -270,7 +271,8 @@ func (p *fadeInProxy) addInstances(n int) {
DataClients: []routing.DataClient{client},
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
fadein.NewPostProcessor(),
endpointRegistry,
fadein.NewPostProcessorWithOptions(fadein.Options{EndpointRegistry: endpointRegistry}),
},
})

Expand Down
31 changes: 30 additions & 1 deletion routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@ const lastSeenTimeout = 1 * time.Minute
type Metrics interface {
DetectedTime() time.Time
InflightRequests() int64
FadeInDuration(routeId string) time.Duration
}

type entry struct {
detected time.Time
inflightRequests int64
fadeIn map[string]fadeInData
}

type fadeInData struct {
fadeInDuration time.Duration
fadeInExponent float64
}

var _ Metrics = &entry{}
Expand All @@ -31,6 +38,15 @@ func (e *entry) InflightRequests() int64 {
return e.inflightRequests
}

func (e *entry) FadeInDuration(routeId string) time.Duration {
d, ok := e.fadeIn[routeId]
if !ok {
return 0
}

return d.fadeInDuration
}

type EndpointRegistry struct {
lastSeen map[string]time.Time
now func() time.Time
Expand Down Expand Up @@ -90,6 +106,11 @@ func (r *EndpointRegistry) GetMetrics(key string) Metrics {
e := r.getOrInitEntryLocked(key)
copy := &entry{}
*copy = *e

copy.fadeIn = make(map[string]fadeInData)
for k, v := range e.fadeIn {
copy.fadeIn[k] = v
}
return copy
}

Expand Down Expand Up @@ -117,6 +138,14 @@ func (r *EndpointRegistry) DecInflightRequest(key string) {
e.inflightRequests--
}

func (r *EndpointRegistry) SetFadeIn(key string, routeId string, duration time.Duration, exponent float64) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.fadeIn[routeId] = fadeInData{fadeInDuration: duration, fadeInExponent: exponent}
}

// getOrInitEntryLocked returns pointer to endpoint registry entry
// which contains the information about endpoint representing the
// following key. r.mu must be held while calling this function and
Expand All @@ -125,7 +154,7 @@ func (r *EndpointRegistry) DecInflightRequest(key string) {
func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry {
e, ok := r.data[key]
if !ok {
e = &entry{}
e = &entry{fadeIn: map[string]fadeInData{}}
r.data[key] = e
}
return e
Expand Down
2 changes: 1 addition & 1 deletion skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,7 +1900,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
endpointRegistry,
schedulerRegistry,
builtin.NewRouteCreationMetrics(mtr),
fadein.NewPostProcessor(),
fadein.NewPostProcessorWithOptions(fadein.Options{EndpointRegistry: endpointRegistry}),
admissionControlSpec.PostProcessor(),
},
SignalFirstLoad: o.WaitFirstRouteLoad,
Expand Down

0 comments on commit 3135b50

Please sign in to comment.