Skip to content

Commit

Permalink
SSE support for global-query (server implementation)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lennart Elsen committed Jun 24, 2024
1 parent fc4b41a commit d8af524
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 2 deletions.
23 changes: 21 additions & 2 deletions cmd/global-query/pkg/distributed/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
type QueryRunner struct {
// resolver has the same type as the resolver argument of NewQueryRunner
// (presumably stored from it — confirm against the constructor body)
resolver hosts.Resolver
// querier provides the Query method that Run calls to obtain the channel of
// per-host results
querier Querier

// onResult is the callback set via SetResultReceivedFn. Run passes it on to
// aggregateResults; ResultReceived invokes it directly
onResult func(*results.Result) error
}

// QueryOption configures the query runner
Expand All @@ -37,6 +39,18 @@ func NewQueryRunner(resolver hosts.Resolver, querier Querier, opts ...QueryOptio
return
}

// SetResultReceivedFn registers a callback to be executed for every result that is
// read off the results channel. It stores f on the receiver, replacing any callback
// registered earlier, and returns the receiver so that calls can be chained.
//
// NOTE(review): the callback lives on the runner itself and no locking is visible
// here — confirm the runner is not shared between concurrently running queries.
func (q *QueryRunner) SetResultReceivedFn(f func(*results.Result) error) *QueryRunner {
q.onResult = f
return q
}

// ResultReceived calls the result callback with res and returns the error
// reported by that callback.
//
// If no callback has been registered via SetResultReceivedFn, the call is a
// no-op and nil is returned. Without this guard, invoking the nil function
// value would panic at run time.
func (q *QueryRunner) ResultReceived(res *results.Result) error {
	if q.onResult == nil {
		return nil
	}
	return q.onResult(res)
}

// Run executes / runs the query and creates the final result structure
func (q *QueryRunner) Run(ctx context.Context, args *query.Args) (*results.Result, error) {
// use a copy of the arguments, since some fields are modified by the querier
Expand Down Expand Up @@ -74,7 +88,7 @@ func (q *QueryRunner) Run(ctx context.Context, args *query.Args) (*results.Resul
logger.Info("reading query results from querier")

finalResult := aggregateResults(ctx, stmt,
q.querier.Query(ctx, hostList, &queryArgs),
q.querier.Query(ctx, hostList, &queryArgs), q.onResult,
)

finalResult.End()
Expand Down Expand Up @@ -115,7 +129,7 @@ func (q *QueryRunner) prepareHostList(ctx context.Context, queryHosts string) (h

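// aggregateResults takes finished query results from the queryResults channel, aggregates them by merging the rows and summaries,
// and returns the final result. The `tracker` variable provides information about potential Run failures for individual hosts.
// onResult is called with the running aggregate after a result has been merged; an error it returns is only logged.
// NOTE(review): no nil check on onResult is visible before that call — confirm callers always pass a non-nil callback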
func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <-chan *results.Result) (finalResult *results.Result) {
func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <-chan *results.Result, onResult func(*results.Result) error) (finalResult *results.Result) {
ctx, span := tracing.Start(ctx, "aggregateResults")
defer span.End()

Expand Down Expand Up @@ -193,6 +207,11 @@ func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <
// take the total from the query result. Since there may be overlap between the queries of two
// different systems, the overlap has to be deducted from the total
finalResult.Summary.Hits.Total += res.Summary.Hits.Total - merged

err := onResult(finalResult)
if err != nil {
logger.With("error", err).Error("failed to call results callback")
}
}
}
}
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ const (

// ValidationRoute is the route to validate a goquery query
ValidationRoute = QueryRoute + "/validate"

// SSEQueryRoute runs a goquery query with a return channel for partial results
SSEQueryRoute = QueryRoute + "/sse"
)
17 changes: 17 additions & 0 deletions pkg/api/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package api
import (
"context"

"github.com/danielgtaylor/huma/v2/sse"
"github.com/els0r/goProbe/cmd/global-query/pkg/distributed"
"github.com/els0r/goProbe/pkg/query"
"github.com/els0r/goProbe/pkg/results"
"github.com/els0r/telemetry/logging"
Expand All @@ -22,6 +24,21 @@ func getBodyQueryRunnerHandler(caller string, querier query.Runner) func(context
}
}

// getSSEBodyQueryRunnerHandler returns the handler for the SSE query route.
//
// For every request, the handler runs the query described by input.Body and
// streams events to the client through send:
//   - a PartialResult each time the runner reports an intermediate result,
//   - a FinalResult once runQuery has returned successfully, or
//   - the error returned by runQuery if the query failed.
//
// caller is passed through to runQuery unchanged.
func getSSEBodyQueryRunnerHandler(caller string, querier *distributed.QueryRunner) func(context.Context, *ArgsInput, sse.Sender) {
	return func(ctx context.Context, input *ArgsInput, send sse.Sender) {
		// Work on a per-request shallow copy of the runner. Registering the
		// callback on the shared instance would let concurrent requests
		// overwrite each other's sender and would leave a stale sender
		// installed after this request has finished.
		runner := *querier
		runner.SetResultReceivedFn(func(res *results.Result) error {
			return send.Data(&PartialResult{res})
		})

		res, err := runQuery(ctx, caller, input.Body, &runner)
		if err != nil {
			// Best effort: if the error event cannot be delivered there is
			// nothing left to report it to, so the send error is dropped.
			_ = send.Data(err)
			return
		}
		// Best effort as well: the stream ends after the final event.
		_ = send.Data(&FinalResult{res})
	}
}

func runQuery(ctx context.Context, caller string, args *query.Args, querier query.Runner) (*results.Result, error) {
// make sure all defaults are available if they weren't set explicitly
args.SetDefaults()
Expand Down
38 changes: 38 additions & 0 deletions pkg/api/query_api_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net/http"

"github.com/danielgtaylor/huma/v2"
"github.com/danielgtaylor/huma/v2/sse"
"github.com/els0r/goProbe/cmd/global-query/pkg/distributed"
"github.com/els0r/goProbe/pkg/query"
"github.com/els0r/goProbe/pkg/results"
)
Expand Down Expand Up @@ -48,6 +50,32 @@ func RegisterQueryAPI(a huma.API, caller string, querier query.Runner, middlewar
},
getBodyQueryRunnerHandler(caller, querier),
)

// register routes specific to distributed querying
dqr, ok := querier.(*distributed.QueryRunner)
if ok {
registerDistributedQueryAPI(a, caller, dqr, middlewares)
}
}

// registerDistributedQueryAPI registers the routes that are only available when
// the query runner is a *distributed.QueryRunner. Currently that is the POST
// route at SSEQueryRoute, which streams partial and final results (or a query
// error) back to the client as server-sent events.
func registerDistributedQueryAPI(a huma.API, caller string, qr *distributed.QueryRunner, middlewares huma.Middlewares) {
	// description of the SSE query operation
	operation := huma.Operation{
		OperationID: "query-post-run-sse",
		Method:      http.MethodPost,
		Path:        SSEQueryRoute,
		Summary:     "Run query with server sent events (SSE)",
		Description: "Runs a query based on the parameters provided in the body. Pushes back partial results via SSE",
		Middlewares: middlewares,
		Tags:        queryTags,
	}

	// event name -> payload type emitted under that name
	eventTypes := map[string]any{
		"queryError":    &query.DetailError{},
		"partialResult": &PartialResult{},
		"finalResult":   &FinalResult{},
	}

	sse.Register(a, operation, eventTypes, getSSEBodyQueryRunnerHandler(caller, qr))
}

// ArgsBodyInput stores the query args to be validated in the body
Expand All @@ -66,3 +94,13 @@ type ArgsParamsInput struct {
type QueryResultOutput struct {
// Body holds the query result (presumably serialized by huma as the response
// body — confirm against the huma output struct conventions)
Body *results.Result
}

// PartialResult represents an update to the results structure. It SHOULD only be used if the
// results.Result object will be further modified / aggregated. This data structure is relevant
// only in the context of SSE, where registerDistributedQueryAPI maps it to the "partialResult"
// event. It embeds *results.Result and adds no fields of its own
type PartialResult struct{ *results.Result }

// FinalResult represents the result which is sent after all aggregation of partial results has
// completed. It SHOULD only be sent at the end of a streaming operation. This data structure is relevant
// only in the context of SSE, where registerDistributedQueryAPI maps it to the "finalResult"
// event. It embeds *results.Result and adds no fields of its own
type FinalResult struct{ *results.Result }

0 comments on commit d8af524

Please sign in to comment.