diff --git a/pkg/api/globalquery/client/client.go b/pkg/api/globalquery/client/client.go index 28377608..237ff0f8 100644 --- a/pkg/api/globalquery/client/client.go +++ b/pkg/api/globalquery/client/client.go @@ -1,13 +1,16 @@ package client import ( + "bytes" "context" + "net/http" "github.com/els0r/goProbe/pkg/api" "github.com/els0r/goProbe/pkg/api/client" "github.com/els0r/goProbe/pkg/query" "github.com/els0r/goProbe/pkg/results" "github.com/fako1024/httpc" + jsoniter "github.com/json-iterator/go" ) // Client denotes a global query client @@ -47,7 +50,7 @@ func (c *Client) Query(ctx context.Context, args *query.Args) (*results.Result, var res = new(results.Result) req := c.Modify(ctx, - httpc.NewWithClient("POST", c.NewURL(api.QueryRoute), c.Client()). + httpc.NewWithClient(http.MethodPost, c.NewURL(api.QueryRoute), c.Client()). EncodeJSON(queryArgs). ParseJSON(res), ) @@ -59,3 +62,46 @@ func (c *Client) Query(ctx context.Context, args *query.Args) (*results.Result, return res, nil } + +// QuerySSE performs the global query and returns its result, while consuming updates +// to partial results +func (c *Client) QuerySSE(ctx context.Context, args *query.Args, onUpdate, onFinish func(*results.Result) error) (*results.Result, error) { + // use a copy of the arguments, since some fields are modified by the client + queryArgs := *args + + // whatever happens, the results are expected to be returned in json + queryArgs.Format = "json" + + if queryArgs.Caller == "" { + queryArgs.Caller = clientName + } + + var res = new(results.Result) + + buf := &bytes.Buffer{} + + err := jsoniter.NewEncoder(buf).Encode(queryArgs) + if err != nil { + return nil, err + } + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.NewURL(api.SSEQueryRoute), buf) + if err != nil { + return nil, err + } + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + + resp, err := c.Client().Do(req) + if err != nil { + return nil, err + } + + // parse events + + return res, nil +} diff --git a/pkg/api/globalquery/client/stream.go b/pkg/api/globalquery/client/stream.go new file mode 100644 index 00000000..94b4289c --- /dev/null +++ b/pkg/api/globalquery/client/stream.go @@ -0,0 +1,23 @@ +package client + +import ( + "bufio" + "io" +) + +type eventStream struct { + scanner *bufio.Scanner + wordScanner *bufio.Scanner + bufSize int +} + +func newEventStream(r io.Reader) *eventStream { + s := bufio.NewScanner(r) + wordScanner := bufio.NewScanner(r) + return &eventStream{scanner: s, wordScanner: wordScanner} +} + +func (e *eventStream) setBuffer(size int) *eventStream { + e.scanner.Buffer(make([]byte, size), bufio.MaxScanTokenSize) + return e +} diff --git a/pkg/api/query_api_ops.go b/pkg/api/query_api_ops.go index 3bfd3bae..61efde5b 100644 --- a/pkg/api/query_api_ops.go +++ b/pkg/api/query_api_ops.go @@ -70,14 +70,24 @@ func registerDistributedQueryAPI(a huma.API, caller string, qr *distributed.Quer Tags: queryTags, }, map[string]any{ - "queryError": &query.DetailError{}, - "partialResult": &PartialResult{}, - "finalResult": &FinalResult{}, + string(StreamEventQueryError): &query.DetailError{}, + string(StreamEventPartialResult): &PartialResult{}, + string(StreamEventFinalResult): &FinalResult{}, }, getSSEBodyQueryRunnerHandler(caller, qr), ) } +// StreamEventType describes the type of server sent event +type StreamEventType string + +// Different event types that the query server sends +const ( + StreamEventQueryError StreamEventType = "queryError" + StreamEventPartialResult StreamEventType = "partialResult" + StreamEventFinalResult StreamEventType = "finalResult" +) + // ArgsBodyInput stores the query args to be validated in the body type ArgsInput struct { Body *query.Args