Skip to content

Commit

Permalink
Safe StreamEvents write loop (#14557)
Browse files Browse the repository at this point in the history
* new type for tests where errors are only logged

* StreamHandler waits for write loop exit

* add test case for writer timeout

* add changelog

* add missing file

* logging fix

* fix logging test to allow info logs

* naming/comments; make response controller private

* simplify cancel defers

* fix typo in test file name

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
  • Loading branch information
kasey and kasey authored Oct 24, 2024
1 parent 83ed320 commit 52cf3a1
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 95 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve

- Fixed mesh size by appending `gParams.Dhi = gossipSubDhi`
- Fix skipping partial withdrawals count.
- wait for the async StreamEvent writer to exit before leaving the http handler, avoiding race condition panics [pr](https://github.com/prysmaticlabs/prysm/pull/14557)

### Security

Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"events.go",
"log.go",
"server.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/events",
Expand Down Expand Up @@ -58,5 +59,6 @@ go_test(
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_r3labs_sse_v2//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
148 changes: 111 additions & 37 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
)

const DefaultEventFeedDepth = 1000
Expand Down Expand Up @@ -74,13 +73,6 @@ var (
errWriterUnusable = errors.New("http response writer is unusable")
)

// StreamingResponseWriter defines a type that can be used by the eventStreamer.
// This must be an http.ResponseWriter that supports flushing and hijacking.
type StreamingResponseWriter interface {
http.ResponseWriter
http.Flusher
}

// The eventStreamer uses lazyReaders to defer serialization until the moment the value is ready to be written to the client.
type lazyReader func() io.Reader

Expand Down Expand Up @@ -150,6 +142,7 @@ func newTopicRequest(topics []string) (*topicRequest, error) {
// Servers may send SSE comments beginning with ':' for any purpose,
// including to keep the event stream connection alive in the presence of proxy servers.
func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
log.Debug("Starting StreamEvents handler")
ctx, span := trace.StartSpan(r.Context(), "events.StreamEvents")
defer span.End()

Expand All @@ -159,47 +152,51 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
return
}

sw, ok := w.(StreamingResponseWriter)
if !ok {
msg := "beacon node misconfiguration: http stack may not support required response handling features, like flushing"
httputil.HandleError(w, msg, http.StatusInternalServerError)
return
timeout := s.EventWriteTimeout
if timeout == 0 {
timeout = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
depth := s.EventFeedDepth
if depth == 0 {
depth = DefaultEventFeedDepth
ka := s.KeepAliveInterval
if ka == 0 {
ka = timeout
}
es, err := newEventStreamer(depth, s.KeepAliveInterval)
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
buffSize := s.EventFeedDepth
if buffSize == 0 {
buffSize = DefaultEventFeedDepth
}

api.SetSSEHeaders(w)
sw := newStreamingResponseController(w, timeout)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
api.SetSSEHeaders(sw)
es := newEventStreamer(buffSize, ka)

go es.outboxWriteLoop(ctx, cancel, sw)
if err := es.recvEventLoop(ctx, cancel, topics, s); err != nil {
log.WithError(err).Debug("Shutting down StreamEvents handler.")
}
cleanupStart := time.Now()
es.waitForExit()
log.WithField("cleanup_wait", time.Since(cleanupStart)).Debug("streamEvents shutdown complete")
}

func newEventStreamer(buffSize int, ka time.Duration) (*eventStreamer, error) {
if ka == 0 {
ka = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
func newEventStreamer(buffSize int, ka time.Duration) *eventStreamer {
return &eventStreamer{
outbox: make(chan lazyReader, buffSize),
keepAlive: ka,
}, nil
outbox: make(chan lazyReader, buffSize),
keepAlive: ka,
openUntilExit: make(chan struct{}),
}
}

type eventStreamer struct {
outbox chan lazyReader
keepAlive time.Duration
outbox chan lazyReader
keepAlive time.Duration
openUntilExit chan struct{}
}

func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.CancelFunc, req *topicRequest, s *Server) error {
defer close(es.outbox)
defer cancel()
eventsChan := make(chan *feed.Event, len(es.outbox))
if req.needOpsFeed {
opsSub := s.OperationNotifier.OperationFeed().Subscribe(eventsChan)
Expand Down Expand Up @@ -228,7 +225,6 @@ func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.Cance
// channel should stay relatively empty, which gives this loop time to unsubscribe
// and cleanup before the event stream channel fills and disrupts other readers.
if err := es.safeWrite(ctx, lr); err != nil {
cancel()
// note: we could hijack the connection and close it here. Does that cause issues? What are the benefits?
// A benefit of hijack and close is that it may force an error on the remote end, however just closing the context of the
// http handler may be sufficient to cause the remote http response reader to close.
Expand Down Expand Up @@ -265,12 +261,13 @@ func newlineReader() io.Reader {

// outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to
// the client as fast as the client can read them.
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w StreamingResponseWriter) {
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w *streamingResponseWriterController) {
var err error
defer func() {
if err != nil {
log.WithError(err).Debug("Event streamer shutting down due to error.")
}
es.exit()
}()
defer cancel()
// Write a keepalive at the start to test the connection and simplify test setup.
Expand Down Expand Up @@ -310,18 +307,43 @@ func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.Can
}
}

func writeLazyReaderWithRecover(w StreamingResponseWriter, lr lazyReader) (err error) {
func (es *eventStreamer) exit() {
drained := 0
for range es.outbox {
drained += 1
}
log.WithField("undelivered_events", drained).Debug("Event stream outbox drained.")
close(es.openUntilExit)
}

// waitForExit blocks until the outboxWriteLoop has exited.
// While this function blocks, it is not yet safe to exit the http handler,
// because the outboxWriteLoop may still be writing to the http ResponseWriter.
func (es *eventStreamer) waitForExit() {
<-es.openUntilExit
}

func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyReader) (err error) {
defer func() {
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
err = errWriterUnusable
}
}()
_, err = io.Copy(w, lr())
r := lr()
out, err := io.ReadAll(r)
if err != nil {
return err
}
_, err = w.Write(out)
return err
}

func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWriter, first lazyReader) error {
func (es *eventStreamer) writeOutbox(ctx context.Context, w *streamingResponseWriterController, first lazyReader) error {
// The outboxWriteLoop is responsible for managing the keep-alive timer and toggling between reading from the outbox
// when it is ready, only allowing the keep-alive to fire when there hasn't been a write in the keep-alive interval.
// Since outboxWriteLoop will get either the first event or the keep-alive, we let it pass in the first event to write,
// either the event's lazyReader, or nil for a keep-alive.
needKeepAlive := true
if first != nil {
if err := writeLazyReaderWithRecover(w, first); err != nil {
Expand All @@ -337,6 +359,11 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWri
case <-ctx.Done():
return ctx.Err()
case rf := <-es.outbox:
// We don't want to call Flush until we've exhausted all the writes - it's always preferrable to
// just keep draining the outbox and rely on the underlying Write code to flush+block when it
// needs to based on buffering. Whenever we fill the buffer with a string of writes, the underlying
// code will flush on its own, so it's better to explicitly flush only once, after we've totally
// drained the outbox, to catch any dangling bytes stuck in a buffer.
if err := writeLazyReaderWithRecover(w, rf); err != nil {
return err
}
Expand All @@ -347,8 +374,7 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWri
return err
}
}
w.Flush()
return nil
return w.Flush()
}
}
}
Expand Down Expand Up @@ -638,3 +664,51 @@ func (s *Server) currentPayloadAttributes(ctx context.Context) (lazyReader, erro
})
}, nil
}

func newStreamingResponseController(rw http.ResponseWriter, timeout time.Duration) *streamingResponseWriterController {
rc := http.NewResponseController(rw)
return &streamingResponseWriterController{
timeout: timeout,
rw: rw,
rc: rc,
}
}

// streamingResponseWriterController provides an interface similar to an http.ResponseWriter,
// wrapping an http.ResponseWriter and an http.ResponseController, using the ResponseController
// to set and clear deadlines for Write and Flush methods, and delegating to the underlying
// types to Write and Flush.
type streamingResponseWriterController struct {
timeout time.Duration
rw http.ResponseWriter
rc *http.ResponseController
}

func (c *streamingResponseWriterController) Write(b []byte) (int, error) {
if err := c.setDeadline(); err != nil {
return 0, err
}
out, err := c.rw.Write(b)
if err != nil {
return out, err
}
return out, c.clearDeadline()
}

func (c *streamingResponseWriterController) setDeadline() error {
return c.rc.SetWriteDeadline(time.Now().Add(c.timeout))
}

func (c *streamingResponseWriterController) clearDeadline() error {
return c.rc.SetWriteDeadline(time.Time{})
}

func (c *streamingResponseWriterController) Flush() error {
if err := c.setDeadline(); err != nil {
return err
}
if err := c.rc.Flush(); err != nil {
return err
}
return c.clearDeadline()
}
Loading

0 comments on commit 52cf3a1

Please sign in to comment.