Skip to content

Commit

Permalink
fix: handle backpressure correctly in event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Apr 12, 2024
1 parent 834b74f commit 4e2bf1f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
30 changes: 21 additions & 9 deletions internal/api/backresthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ func (s *BackrestHandler) ListSnapshotFiles(ctx context.Context, req *connect.Re

// GetOperationEvents implements GET /v1/events/operations
func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error {
errorChan := make(chan error)
defer close(errorChan)

errChan := make(chan error, 1)
events := make(chan *v1.OperationEvent, 100)

callback := func(oldOp *v1.Operation, newOp *v1.Operation) {
var event *v1.OperationEvent
if oldOp == nil && newOp != nil {
Expand All @@ -212,17 +214,27 @@ func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.R
return
}

if err := resp.Send(event); err != nil {
errorChan <- fmt.Errorf("failed to send event: %w", err)
select {
case events <- event:
default:
errChan <- errors.New("event buffer overflow, closing stream for client retry and catchup")
}
}
s.oplog.Subscribe(&callback)
defer s.oplog.Unsubscribe(&callback)
select {
case <-ctx.Done():
return nil
case err := <-errorChan:
return err

for {
select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
case event := <-events:
zap.S().Infof("sending event %v", event)
if err := resp.Send(event); err != nil {
return fmt.Errorf("failed to write event: %w", err)
}
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions proto/v1/hostinfo.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
syntax = "proto3";

package v1;

0 comments on commit 4e2bf1f

Please sign in to comment.