Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added FadeIn data inside endpointregistry #2611

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions filters/fadein/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (
EndpointCreatedName = filters.EndpointCreatedName
)

type Options struct {
EndpointRegistry *routing.EndpointRegistry
}

type (
fadeIn struct {
duration time.Duration
Expand All @@ -39,7 +43,8 @@ type (

postProcessor struct {
// "http://10.2.1.53:1234": {t0 60s t0-10s}
detected map[string]detectedFadeIn
detected map[string]detectedFadeIn
endpointRegistry *routing.EndpointRegistry
}
)

Expand Down Expand Up @@ -192,14 +197,20 @@ func (endpointCreated) CreateFilter(args []interface{}) (filters.Filter, error)
func (endpointCreated) Request(filters.FilterContext) {}
func (endpointCreated) Response(filters.FilterContext) {}

// NewPostProcessor creates post-processor for maintaining the detection time of LB endpoints with fade-in
// behavior.
func NewPostProcessor() routing.PostProcessor {
// NewPostProcessorWithOptions creates post-processor for maintaining
// the fade-in behavior.
func NewPostProcessorWithOptions(o Options) routing.PostProcessor {
return &postProcessor{
detected: make(map[string]detectedFadeIn),
detected: make(map[string]detectedFadeIn),
endpointRegistry: o.EndpointRegistry,
}
}

// Deprecated: use NewPostProcessorWithOptions
RomanZavodskikh marked this conversation as resolved.
Show resolved Hide resolved
func NewPostProcessor() routing.PostProcessor {
return NewPostProcessorWithOptions(Options{})
}

func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
now := time.Now()

Expand Down Expand Up @@ -232,9 +243,15 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
detected := p.detected[key].when
if detected.IsZero() || endpointsCreated[key].After(detected) {
detected = now
if p.endpointRegistry != nil {
p.endpointRegistry.SetDetectedTime(ep.Host, detected)
}
}

ep.Detected = detected
if p.endpointRegistry != nil {
p.endpointRegistry.SetFadeIn(ep.Host, ri.Id, ri.LBFadeInDuration, ri.LBFadeInExponent)
}
p.detected[key] = detectedFadeIn{
when: detected,
duration: ri.LBFadeInDuration,
Expand Down
57 changes: 48 additions & 9 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestCreateEndpointCreated(t *testing.T) {
}

func TestPostProcessor(t *testing.T) {
createRouting := func(t *testing.T, routes string) (*routing.Routing, func(string)) {
createRouting := func(t *testing.T, routes string, registry *routing.EndpointRegistry) (*routing.Routing, func(string)) {
dc, err := testdataclient.NewDoc(routes)
if err != nil {
t.Fatal(err)
Expand All @@ -197,7 +197,7 @@ func TestPostProcessor(t *testing.T) {
},
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
NewPostProcessor(),
NewPostProcessorWithOptions(Options{EndpointRegistry: registry}),
},
SignalFirstLoad: true,
})
Expand Down Expand Up @@ -234,36 +234,55 @@ func TestPostProcessor(t *testing.T) {
baz: Path("/baz") -> <"http://10.0.1.1:8080", "http://10.0.1.2:8080">
`

rt, _ := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, registry)

foo := route(rt, "/foo")
if foo == nil || foo.LBFadeInDuration != 0 {
t.Fatal("failed to preserve non-LB route")
}
if registry.GetMetrics("www.example.org:443").FadeInDuration("foo") != 0 {
t.Fatal("failed to preserve non-LB route")
}

bar := route(rt, "/bar")
if bar == nil || bar.LBFadeInDuration != time.Minute {
t.Fatal("failed to postprocess LB route")
}
if registry.GetMetrics("10.0.0.1:8080").FadeInDuration("bar") != time.Minute {
t.Fatal("failed to postprocess LB route")
}
if registry.GetMetrics("10.0.0.2:8080").FadeInDuration("bar") != time.Minute {
t.Fatal("failed to postprocess LB route")
}

for _, ep := range bar.LBEndpoints {
if ep.Detected.IsZero() {
t.Fatal("failed to set detection time")
}
if registry.GetMetrics(ep.Host).DetectedTime().IsZero() {
t.Fatal("failed to set detection time")
}
}

baz := route(rt, "/baz")
if baz == nil || baz.LBFadeInDuration != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
if registry.GetMetrics("10.0.1.1:8080").FadeInDuration("baz") != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
if registry.GetMetrics("10.0.1.2:8080").FadeInDuration("baz") != 0 {
t.Fatal("failed to preserve non-fade LB route")
}
})

t.Run("invalid endpoint address", func(t *testing.T) {
const routes = `
* -> fadeIn("1m") -> <"http://::">
`

rt, _ := createRouting(t, routes)
rt, _ := createRouting(t, routes, nil)
r := route(rt, "/")
if r != nil {
t.Fatal("created invalid LB endpoint")
Expand All @@ -275,19 +294,24 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("-1m") -> <"http://10.0.0.1:8080">
`

rt, _ := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, registry)
r := route(rt, "/")
if r == nil || len(r.LBEndpoints) == 0 || !r.LBEndpoints[0].Detected.IsZero() {
t.Fatal("failed to ignore negative duration")
}
if !registry.GetMetrics("10.0.0.1:8080").DetectedTime().IsZero() {
t.Fatal("failed to ignore negative duration")
}
})

t.Run("endpoint already detected", func(t *testing.T) {
const routes = `
* -> fadeIn("1m") -> <"http://10.0.0.1:8080">
`

rt, update := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, routes, registry)
firstDetected := time.Now()

const nextRoutes = `
Expand All @@ -303,6 +327,9 @@ func TestPostProcessor(t *testing.T) {
if ep.Detected.After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
if registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}

found = true
}
Expand All @@ -318,7 +345,8 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("1m") -> <"http://10.0.0.1:8080", "http://10.0.0.2:8080">
`

rt, update := createRouting(t, initialRoutes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, initialRoutes, registry)
firstDetected := time.Now()

const nextRoutes = `
Expand All @@ -336,6 +364,9 @@ func TestPostProcessor(t *testing.T) {
if ep.Detected.After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
if registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}

found = true
}
Expand All @@ -351,7 +382,8 @@ func TestPostProcessor(t *testing.T) {
* -> fadeIn("15ms") -> <"http://10.0.0.1:8080", "http://10.0.0.2:8080">
`

rt, update := createRouting(t, initialRoutes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, initialRoutes, registry)
firstDetected := time.Now()

const nextRoutes = `
Expand All @@ -370,6 +402,9 @@ func TestPostProcessor(t *testing.T) {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to clear detection time.")
}
if !registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to clear detection time.")
}

found = true
}
Expand All @@ -386,7 +421,8 @@ func TestPostProcessor(t *testing.T) {
`

routes := fmt.Sprintf(routesFmt, nows(t))
rt, update := createRouting(t, routes)
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, update := createRouting(t, routes, registry)
firstDetected := time.Now()

const nextRoutesFmt = `
Expand All @@ -407,6 +443,9 @@ func TestPostProcessor(t *testing.T) {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}
if !registry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}

found = true
}
Expand Down
2 changes: 2 additions & 0 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat
Detected: detectionTimes[i],
})
ctx.Registry.SetDetectedTime(eps[i], detectionTimes[i])
ctx.Registry.SetFadeIn(eps[i], "dummy", fadeInDuration, 1)
}

return ctx, eps
Expand Down Expand Up @@ -332,6 +333,7 @@ func benchmarkFadeIn(
Detected: detectionTimes[i],
})
registry.SetDetectedTime(eps[i], detectionTimes[i])
registry.SetFadeIn(eps[i], "dummy", fadeInDuration, 1)
}

var wg sync.WaitGroup
Expand Down
4 changes: 3 additions & 1 deletion proxy/fadeintesting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (p *fadeInProxy) addInstances(n int) {
p.mx.Lock()
defer p.mx.Unlock()

endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
for i := 0; i < n; i++ {
client := p.backend.createDataClient()
fr := make(filters.Registry)
Expand All @@ -270,7 +271,8 @@ func (p *fadeInProxy) addInstances(n int) {
DataClients: []routing.DataClient{client},
PostProcessors: []routing.PostProcessor{
loadbalancer.NewAlgorithmProvider(),
fadein.NewPostProcessor(),
endpointRegistry,
fadein.NewPostProcessorWithOptions(fadein.Options{EndpointRegistry: endpointRegistry}),
},
})

Expand Down
31 changes: 30 additions & 1 deletion routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@ const lastSeenTimeout = 1 * time.Minute
type Metrics interface {
DetectedTime() time.Time
InflightRequests() int64
FadeInDuration(routeId string) time.Duration
}

type entry struct {
detected time.Time
inflightRequests int64
fadeIn map[string]fadeInData
}

type fadeInData struct {
fadeInDuration time.Duration
fadeInExponent float64
}

var _ Metrics = &entry{}
Expand All @@ -31,6 +38,15 @@ func (e *entry) InflightRequests() int64 {
return e.inflightRequests
}

func (e *entry) FadeInDuration(routeId string) time.Duration {
d, ok := e.fadeIn[routeId]
if !ok {
return 0
}

return d.fadeInDuration
}

type EndpointRegistry struct {
lastSeen map[string]time.Time
now func() time.Time
Expand Down Expand Up @@ -90,6 +106,11 @@ func (r *EndpointRegistry) GetMetrics(key string) Metrics {
e := r.getOrInitEntryLocked(key)
copy := &entry{}
*copy = *e

copy.fadeIn = make(map[string]fadeInData)
for k, v := range e.fadeIn {
copy.fadeIn[k] = v
}
return copy
}

Expand Down Expand Up @@ -117,6 +138,14 @@ func (r *EndpointRegistry) DecInflightRequest(key string) {
e.inflightRequests--
}

func (r *EndpointRegistry) SetFadeIn(key string, routeId string, duration time.Duration, exponent float64) {
r.mu.Lock()
defer r.mu.Unlock()

e := r.getOrInitEntryLocked(key)
e.fadeIn[routeId] = fadeInData{fadeInDuration: duration, fadeInExponent: exponent}
}

// getOrInitEntryLocked returns pointer to endpoint registry entry
// which contains the information about endpoint representing the
// following key. r.mu must be held while calling this function and
Expand All @@ -125,7 +154,7 @@ func (r *EndpointRegistry) DecInflightRequest(key string) {
func (r *EndpointRegistry) getOrInitEntryLocked(key string) *entry {
e, ok := r.data[key]
if !ok {
e = &entry{}
e = &entry{fadeIn: map[string]fadeInData{}}
r.data[key] = e
}
return e
Expand Down
2 changes: 1 addition & 1 deletion skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,7 +1900,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
endpointRegistry,
schedulerRegistry,
builtin.NewRouteCreationMetrics(mtr),
fadein.NewPostProcessor(),
fadein.NewPostProcessorWithOptions(fadein.Options{EndpointRegistry: endpointRegistry}),
admissionControlSpec.PostProcessor(),
},
SignalFirstLoad: o.WaitFirstRouteLoad,
Expand Down