Skip to content

Commit

Permalink
WIP: SSE client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Lennart Elsen committed Jun 25, 2024
1 parent d8af524 commit 0340ff2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
48 changes: 47 additions & 1 deletion pkg/api/globalquery/client/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package client

import (
"bytes"
"context"
"net/http"

"github.com/els0r/goProbe/pkg/api"
"github.com/els0r/goProbe/pkg/api/client"
"github.com/els0r/goProbe/pkg/query"
"github.com/els0r/goProbe/pkg/results"
"github.com/fako1024/httpc"
jsoniter "github.com/json-iterator/go"
)

// Client denotes a global query client
Expand Down Expand Up @@ -47,7 +50,7 @@ func (c *Client) Query(ctx context.Context, args *query.Args) (*results.Result,
var res = new(results.Result)

req := c.Modify(ctx,
httpc.NewWithClient("POST", c.NewURL(api.QueryRoute), c.Client()).
httpc.NewWithClient(http.MethodPost, c.NewURL(api.QueryRoute), c.Client()).
EncodeJSON(queryArgs).
ParseJSON(res),
)
Expand All @@ -59,3 +62,46 @@ func (c *Client) Query(ctx context.Context, args *query.Args) (*results.Result,

return res, nil
}

// QuerySSE performs the global query and returns its result, while consuming updates
// to partial results
func (c *Client) QuerySSE(ctx context.Context, args *query.Args, onUpdate, onFinish func(*results.Result) error) (*results.Result, error) {
// use a copy of the arguments, since some fields are modified by the client
queryArgs := *args

// whatever happens, the results are expected to be returned in json
queryArgs.Format = "json"

if queryArgs.Caller == "" {
queryArgs.Caller = clientName
}

var res = new(results.Result)

buf := &bytes.Buffer{}

err := jsoniter.NewEncoder(buf).Encode(queryArgs)
if err != nil {
return nil, err
}

streamCtx, cancel := context.WithCancel(ctx)

Check failure on line 88 in pkg/api/globalquery/client/client.go

View workflow job for this annotation

GitHub Actions / Build / Test on Linux

streamCtx declared and not used

Check failure on line 88 in pkg/api/globalquery/client/client.go

View workflow job for this annotation

GitHub Actions / Build / Test on Linux

streamCtx declared and not used
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.NewURL(api.SSEQueryRoute), buf)
if err != nil {
return nil, err
}
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Connection", "keep-alive")

resp, err := c.Client().Do(req)

Check failure on line 99 in pkg/api/globalquery/client/client.go

View workflow job for this annotation

GitHub Actions / Build / Test on Linux

resp declared and not used

Check failure on line 99 in pkg/api/globalquery/client/client.go

View workflow job for this annotation

GitHub Actions / Build / Test on Linux

resp declared and not used
if err != nil {
return nil, err
}

// parse events

return res, nil
}
23 changes: 23 additions & 0 deletions pkg/api/globalquery/client/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package client

import (
"bufio"
"io"
)

type eventStream struct {
scanner *bufio.Scanner
wordScanner *bufio.Scanner
bufSize int
}

func newEventStream(r io.Reader) *eventStream {
s := bufio.NewScanner(r)
wordScanner := bufio.NewScanner(r)
return &eventStream{scanner: s, wordScanner: wordScanner}
}

func (e *eventStream) setBuffer(size int) *eventStream {
e.scanner.Buffer(make([]byte, size), bufio.MaxScanTokenSize)
return e
}
16 changes: 13 additions & 3 deletions pkg/api/query_api_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,24 @@ func registerDistributedQueryAPI(a huma.API, caller string, qr *distributed.Quer
Tags: queryTags,
},
map[string]any{
"queryError": &query.DetailError{},
"partialResult": &PartialResult{},
"finalResult": &FinalResult{},
string(StreamEventQueryError): &query.DetailError{},
string(StreamEventPartialResult): &PartialResult{},
string(StreamEventFinalResult): &FinalResult{},
},
getSSEBodyQueryRunnerHandler(caller, qr),
)
}

// StreamEventType describes the type of server sent event
type StreamEventType string

// Different event types that the query server sends
const (
StreamEventQueryError StreamEventType = "queryError"
StreamEventPartialResult StreamEventType = "partialResult"
StreamEventFinalResult StreamEventType = "finalResult"
)

// ArgsBodyInput stores the query args to be validated in the body
type ArgsInput struct {
Body *query.Args
Expand Down

0 comments on commit 0340ff2

Please sign in to comment.