Skip to content

Commit

Permalink
fix: send periodic keepalive packets on eventstream connections
Browse files Browse the repository at this point in the history
When Argo is operating behind load balancers / ingress controllers that have
an idle timeout configured, it's not uncommon to get disconnected and have
an error shown in the UI if you're looking at a relatively inactive workflow
or workflow list.

In the SSE spec, `:\n` is a sequence that you can send to the client which
should be ignored by the client, so we can use that to periodically send
something in the response without affecting the code in the UI at all.

Fixes argoproj#5006

Signed-off-by: Daniel Herman <dherman@factset.com>
  • Loading branch information
dcherman committed Feb 12, 2021
1 parent 75f08e2 commit 8a1a448
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 121 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/emicklei/go-restful v2.15.0+incompatible // indirect
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/fatih/structs v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.1
github.com/gavv/httpexpect/v2 v2.0.3
github.com/go-openapi/jsonreference v0.19.5
github.com/go-openapi/spec v0.20.0
Expand Down
122 changes: 2 additions & 120 deletions go.sum

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/http"
"os"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand Down Expand Up @@ -303,7 +304,15 @@ func (as *argoServer) newHTTPServer(ctx context.Context, port int, artifactServe
mustRegisterGWHandler(workflowarchivepkg.RegisterArchivedWorkflowServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dialOpts)
mustRegisterGWHandler(clusterwftemplatepkg.RegisterClusterWorkflowTemplateServiceHandlerFromEndpoint, ctx, gwmux, endpoint, dialOpts)

mux.HandleFunc("/api/", func(w http.ResponseWriter, r *http.Request) { webhookInterceptor(w, r, gwmux) })
apiHandler := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
webhookInterceptor(rw, r, gwmux)
})

if os.Getenv("ENABLE_SSE_KEEPALIVE") != "" {
apiHandler = serverSentEventKeepaliveMiddleware(apiHandler, time.Second*15)
}

mux.HandleFunc("/api/", apiHandler)
mux.HandleFunc("/artifacts/", artifactServer.GetArtifact)
mux.HandleFunc("/artifacts-by-uid/", artifactServer.GetArtifactByUID)
mux.HandleFunc("/oauth2/redirect", as.oAuth2Service.HandleRedirect)
Expand Down
86 changes: 86 additions & 0 deletions server/apiserver/keepalive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package apiserver

import (
"net/http"
"sync"
"time"

"github.com/argoproj/argo-workflows/v3/util/ticker"
"github.com/felixge/httpsnoop"
)

func isTextStreamRequest(r *http.Request) bool {
// We don't seem to be able to access the headers that are sent out in the response,
// so we're going to detect an SSE stream by looking at the Accept header instead
// and ensuring that it's the only valid response type accepted
acceptHeader, ok := r.Header["Accept"]
return ok && len(acceptHeader) == 1 && acceptHeader[0] == "text/event-stream"
}

type tickerFactoryFn func(time.Duration) ticker.Ticker

func serverSentEventKeepaliveMiddleware(next http.Handler, keepaliveInterval time.Duration) http.HandlerFunc {
return serverSentEventKeepaliveMiddlewareAux(next, keepaliveInterval, nil, func(d time.Duration) ticker.Ticker {
return ticker.NewTicker(d)
})
}

func serverSentEventKeepaliveMiddlewareAux(next http.Handler, keepaliveInterval time.Duration, wg *sync.WaitGroup, tickerFactory tickerFactoryFn) http.HandlerFunc {
return func(wr http.ResponseWriter, r *http.Request) {
if !isTextStreamRequest(r) {
next.ServeHTTP(wr, r)
return
}

ticker := tickerFactory(keepaliveInterval)
stopCh := r.Context().Done()

var writeLock sync.Mutex

writeKeepalive := func() {
writeLock.Lock()
defer writeLock.Unlock()

// Per https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation,
// lines that start with a `:` must be ignored by the client.
wr.Write([]byte(":\n"))

if f, ok := wr.(http.Flusher); ok {
f.Flush()
}

// The waitgroup is purely intended for unit tests and is always nil in production use cases
if wg != nil {
wg.Done()
}
}

go func() {
defer ticker.Stop()

for {
select {
case <-stopCh:
return

case <-ticker.C():
writeKeepalive()
}
}
}()

wrappedWr := httpsnoop.Wrap(wr, httpsnoop.Hooks{
Write: func(next httpsnoop.WriteFunc) httpsnoop.WriteFunc {
return func(p []byte) (int, error) {
writeLock.Lock()
defer writeLock.Unlock()

ticker.Reset(keepaliveInterval)
return next(p)
}
},
})

next.ServeHTTP(wrappedWr, r)
}
}
118 changes: 118 additions & 0 deletions server/apiserver/keepalive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package apiserver

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/argoproj/argo-workflows/v3/util/ticker"
"github.com/stretchr/testify/assert"
)

type fakeTicker struct {
c chan time.Time
resetCalls int
}

func (ft *fakeTicker) Stop() {
}

func (ft *fakeTicker) Reset(time.Duration) {
ft.resetCalls++
}

func (ft *fakeTicker) C() <-chan time.Time {
return ft.c
}

func (ft *fakeTicker) tick() {
ft.c <- time.Now()
}

func newFakeTicker(time.Duration) *fakeTicker {
return &fakeTicker{
c: make(chan time.Time, 1),
resetCalls: 0,
}
}

func Test_serverSentEventKeepaliveMiddleware(t *testing.T) {
rr := httptest.NewRecorder()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "/api/workflows", nil)
req.Header["Accept"] = []string{"text/event-stream"}

if !assert.Nil(t, err) {
return
}

var wrapped http.ResponseWriter

handler := func(rw http.ResponseWriter, r *http.Request) {
wrapped = rw
}

ft := newFakeTicker(time.Second * 1)

var wg sync.WaitGroup

mw := serverSentEventKeepaliveMiddlewareAux(http.HandlerFunc(handler), time.Second, &wg, func(time.Duration) ticker.Ticker {
return ft
})

mw(rr, req)

wg.Add(1)
ft.tick()
wg.Wait()

wrapped.Write([]byte("data: 1\n"))
assert.Equal(t, 1, ft.resetCalls)
wrapped.Write([]byte("data: 1\n"))
assert.Equal(t, 2, ft.resetCalls)

wg.Add(1)
ft.tick()
wg.Wait()

assert.Equal(t, 2, ft.resetCalls)
assert.Equal(t, ":\ndata: 1\ndata: 1\n:\n", string(rr.Body.Bytes()))
}

func Test_serverSentEventKeepaliveMiddleware_NonEventstream(t *testing.T) {
rr := httptest.NewRecorder()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", "/api/workflows", nil)
req.Header["Accept"] = []string{"text/plain"}

if !assert.Nil(t, err) {
return
}

var wrapped http.ResponseWriter

handler := func(rw http.ResponseWriter, r *http.Request) {
wrapped = rw
}

ft := newFakeTicker(time.Second * 1)
mw := serverSentEventKeepaliveMiddlewareAux(http.HandlerFunc(handler), time.Second, nil, func(time.Duration) ticker.Ticker {
return ft
})

mw(rr, req)

ft.tick()

wrapped.Write([]byte("foobar"))

assert.Equal(t, 0, ft.resetCalls)
assert.Equal(t, "foobar", string(rr.Body.Bytes()))
}
33 changes: 33 additions & 0 deletions util/ticker/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package ticker

import (
"time"
)

type Ticker interface {
C() <-chan time.Time
Stop()
Reset(time.Duration)
}

type realTicker struct {
t *time.Ticker
}

func (rt *realTicker) Stop() {
rt.t.Stop()
}

func (rt *realTicker) Reset(d time.Duration) {
rt.t.Reset(d)
}

func (rt *realTicker) C() <-chan time.Time {
return rt.t.C
}

func NewTicker(d time.Duration) Ticker {
return &realTicker{
t: time.NewTicker(d),
}
}

0 comments on commit 8a1a448

Please sign in to comment.