Skip to content

Commit

Permalink
Merge pull request #337 from els0r/332-support-for-server-sent-events…
Browse files Browse the repository at this point in the history
…-sse

[feature] Support for server sent events (SSE) in global-query API
  • Loading branch information
fako1024 authored Jul 18, 2024
2 parents 4e21ce4 + 0857582 commit 552489b
Show file tree
Hide file tree
Showing 20 changed files with 6,255 additions and 5,839 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ go.work.sum
.db/
tmp/
bin/
goProbe
goQuery
gpctl
global-query
35 changes: 33 additions & 2 deletions cmd/global-query/pkg/distributed/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
type QueryRunner struct {
resolver hosts.Resolver
querier Querier

// onResult provides an event-based callback function to be executed upon reception
// of a new result from one of the queried hosts (allowing for dynamic / iterative
// handling of said results)
onResult func(*results.Result) error
}

// QueryOption configures the query runner
Expand All @@ -37,6 +42,21 @@ func NewQueryRunner(resolver hosts.Resolver, querier Querier, opts ...QueryOptio
return
}

// SetResultReceivedFn registers a callback to be executed for every results.Result that is
// read off the results channel
func (q *QueryRunner) SetResultReceivedFn(f func(*results.Result) error) *QueryRunner {
q.onResult = f
return q
}

// ResultReceived calls the result callback with res and
func (q *QueryRunner) ResultReceived(res *results.Result) error {
if q.onResult == nil {
return errors.New("no event callback provided (onResult)")
}
return q.onResult(res)
}

// Run executes / runs the query and creates the final result structure
func (q *QueryRunner) Run(ctx context.Context, args *query.Args) (*results.Result, error) {
// use a copy of the arguments, since some fields are modified by the querier
Expand Down Expand Up @@ -74,7 +94,7 @@ func (q *QueryRunner) Run(ctx context.Context, args *query.Args) (*results.Resul
logger.Info("reading query results from querier")

finalResult := aggregateResults(ctx, stmt,
q.querier.Query(ctx, hostList, &queryArgs),
q.querier.Query(ctx, hostList, &queryArgs), q.onResult,
)

finalResult.End()
Expand Down Expand Up @@ -115,7 +135,7 @@ func (q *QueryRunner) prepareHostList(ctx context.Context, queryHosts string) (h

// aggregateResults takes finished query workloads from the workloads channel, aggregates the result by merging the rows and summaries,
// and returns the final result. The `tracker` variable provides information about potential Run failures for individual hosts
func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <-chan *results.Result) (finalResult *results.Result) {
func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <-chan *results.Result, onResult func(*results.Result) error) (finalResult *results.Result) {
ctx, span := tracing.Start(ctx, "aggregateResults")
defer span.End()

Expand Down Expand Up @@ -193,6 +213,17 @@ func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <
// take the total from the query result. Since there may be overlap between the queries of two
// different systems, the overlap has to be deducted from the total
finalResult.Summary.Hits.Total += res.Summary.Hits.Total - merged

if onResult != nil {
// make sure the rows are set for the results callback
if len(rowMap) > 0 {
finalResult.Rows = rowMap.ToRowsSorted(results.By(stmt.SortBy, stmt.Direction, stmt.SortAscending))
}
err := onResult(finalResult)
if err != nil {
logger.With("error", err).Error("failed to call results callback")
}
}
}
}
}
31 changes: 27 additions & 4 deletions cmd/goQuery/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ goProbe.
pflags.String(conf.QueryLog, "", "Log query invocations to file\n")
pflags.DurationP(conf.QueryKeepAlive, "k", 0, "Interval to emit log messages showing that query processing is still ongoing\n")
pflags.Bool(conf.QueryStats, false, "Print query DB interaction statistics\n")
pflags.Bool(conf.QueryStreaming, false, "Stream results instead of waiting for the final result from a distributed query\n")

pflags.String(conf.LogLevel, logging.LevelWarn.String(), "log level (debug, info, warn, error, fatal, panic)")

Expand All @@ -203,7 +204,7 @@ func initLogger() {
opts := []logging.Option{
logging.WithVersion(version.Short()),
}
if cmdLineParams.Format == "json" {
if cmdLineParams.Format == types.FormatJSON {
format = logging.EncodingJSON
}
opts = append(opts, logging.WithOutput(os.Stdout), logging.WithErrorOutput(os.Stderr))
Expand Down Expand Up @@ -342,14 +343,36 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
// make sure that the hostname is present in the query type (and therefore output)
// The assumption being that a human will have better knowledge
// of hostnames than of their ID counterparts
if queryArgs.Format == "txt" {
if queryArgs.Format == types.FormatTXT {
if !strings.Contains(queryArgs.Query, types.HostnameName) {
queryArgs.Query += types.AttrSep + types.HostnameName
}
}

// query using query server
querier = gqclient.New(viper.GetString(conf.QueryServerAddr))
if viper.GetBool(conf.QueryStreaming) {
logger.Info("calling streaming API")

querier = gqclient.NewSSE(viper.GetString(conf.QueryServerAddr),

// TODO: this will become more informational in the future as in: printing partial results, etc.
func(ctx context.Context, r *results.Result) error {
if r == nil {
return nil
}
all := len(r.HostsStatuses)
errs := len(r.HostsStatuses.GetErrorStatuses())

logger := logging.FromContext(ctx)
logger.Infof("received update: %d total / %d done / %d errors", all, all-errs, errs)

return nil
},
func(ctx context.Context, r *results.Result) error { return nil },
)
} else {
querier = gqclient.New(viper.GetString(conf.QueryServerAddr))
}
} else {
// query using local goDB
querier = engine.NewQueryRunner(dbPathCfg, engine.WithKeepAlive(viper.GetDuration(conf.QueryKeepAlive)))
Expand Down Expand Up @@ -410,7 +433,7 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
}

// serialize raw results array if json is selected
if stmt.Format == "json" {
if stmt.Format == types.FormatJSON {
err = jsoniter.NewEncoder(stmt.Output).Encode(result)
if err != nil {
return fmt.Errorf("failed to serialize query results: %w", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/goQuery/pkg/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
QueryLog = queryKey + ".log"
QueryKeepAlive = queryKey + ".keepalive"
QueryStats = queryKey + ".stats"
QueryStreaming = queryKey + ".streaming"

dbKey = "db"
QueryDBPath = dbKey + ".path"
Expand Down
31 changes: 16 additions & 15 deletions cmd/goquery_completion/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
//
// On Debian derivatives, we suggest creating a file `goquery` in `/etc/bash_completion.d` with the following contents:
//
// _goquery() {
// case "$3" in
// -d) # the -d flag specifies the database directory.
// # we rely on bash's builtin directory completion.
// COMPREPLY=( $( compgen -d -- "$2" ) )
// ;;
// _goquery() {
// case "$3" in
// -d) # the -d flag specifies the database directory.
// # we rely on bash's builtin directory completion.
// COMPREPLY=( $( compgen -d -- "$2" ) )
// ;;
//
// *)
// if [ -x /usr/local/share/goquery_completion ]; then
// mapfile -t COMPREPLY < <( /usr/local/share/goquery_completion bash "${COMP_POINT}" "${COMP_LINE}" )
// fi
// ;;
// esac
// }
// *)
// if [ -x /usr/local/share/goquery_completion ]; then
// mapfile -t COMPREPLY < <( /usr/local/share/goquery_completion bash "${COMP_POINT}" "${COMP_LINE}" )
// fi
// ;;
// esac
// }
package main

import (
Expand All @@ -40,6 +40,7 @@ import (
"strconv"
"strings"

"github.com/els0r/goProbe/pkg/types"
"github.com/els0r/goProbe/pkg/version"
)

Expand All @@ -56,7 +57,7 @@ const (
// bashUnescape unescapes the given string according to bash's escaping rules
// for autocompletion. Note: the rules for escaping during completion seem
// to differ from those during 'normal' operation of the shell.
// For example, `'hello world''hello world'` is treated as ["hello world", "hello world"]
// For example, `'hello worldhello world'` is treated as ["hello world", "hello world"]
// during completion but would usually be treated as ["hello worldhello world"].
//
// weird is set to true iff we are at a weird position:
Expand Down Expand Up @@ -172,7 +173,7 @@ func bashCompletion(args []string) {
// handled by wrapper bash script
return
case "-e":
printlns(filterPrefix(last(args), "txt", "json", "csv", "influxdb"))
printlns(filterPrefix(last(args), types.FormatTXT, types.FormatJSON, types.FormatCSV, types.FormatInfluxDB))
return
case "-f", "-l", "-h", "--help":
return
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ const (

// ValidationRoute is the route to validate a goquery query
ValidationRoute = QueryRoute + "/validate"

// SSEQueryRoute runs a goquery query with a return channel for partial results
SSEQueryRoute = QueryRoute + "/sse"
)
14 changes: 8 additions & 6 deletions pkg/api/globalquery/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ package client

import (
"context"
"net/http"

"github.com/els0r/goProbe/pkg/api"
"github.com/els0r/goProbe/pkg/api/client"
"github.com/els0r/goProbe/pkg/query"
"github.com/els0r/goProbe/pkg/results"
"github.com/els0r/goProbe/pkg/types"
"github.com/fako1024/httpc"
)

const (
clientName = "global-query-client"
)

// Client denotes a global query client
type Client struct {
*client.DefaultClient
}

const (
clientName = "global-query-client"
)

// New creates a new client for the global-query API
func New(addr string, opts ...client.Option) *Client {
opts = append(opts, client.WithName(clientName))
Expand All @@ -38,7 +40,7 @@ func (c *Client) Query(ctx context.Context, args *query.Args) (*results.Result,
queryArgs := *args

// whatever happens, the results are expected to be returned in json
queryArgs.Format = "json"
queryArgs.Format = types.FormatJSON

if queryArgs.Caller == "" {
queryArgs.Caller = clientName
Expand All @@ -47,7 +49,7 @@ func (c *Client) Query(ctx context.Context, args *query.Args) (*results.Result,
var res = new(results.Result)

req := c.Modify(ctx,
httpc.NewWithClient("POST", c.NewURL(api.QueryRoute), c.Client()).
httpc.NewWithClient(http.MethodPost, c.NewURL(api.QueryRoute), c.Client()).
EncodeJSON(queryArgs).
ParseJSON(res),
)
Expand Down
Loading

0 comments on commit 552489b

Please sign in to comment.