Skip to content

Commit

Permalink
[feature] Keep Alive for goQuery: log progress during query execution (
Browse files Browse the repository at this point in the history
…#319)

This commit provides a _simple_ (!) keep-alive mechanism, which can be
used to inform the caller of `goQuery` that a query is still running.

The `--query.keepalive` flag (or its `-k` shorthand) specifies an interval
after which the caller gets a query processing update. By default, it is 0,
which equates to no keep-alive (i.e. the feature is disabled).

### Example Output

```nothing
ts=2024-05-23T14:39:01.715Z level=info msg="processed work dir" version=devel worker.id=7 iface=eth0 dir=.db/eth0/2024/03/1710460800 stats.bytes_loaded=0 stats.bytes_decompressed=3848214 stats.blocks_processed=17860 stats.blocks_corrupted=0 stats.directories_processed=31 stats.workloads=1
ts=2024-05-23T14:39:01.931Z level=info msg="processed workload" version=devel iface=eth0 dir=.db/eth0 stats.bytes_loaded=0 stats.bytes_decompressed=7357134 stats.blocks_processed=68980 stats.blocks_corrupted=0 stats.directories_processed=60 stats.workloads=2
ts=2024-05-23T14:39:01.933Z level=info msg="processed workload" version=devel iface=eth0 dir=.db/eth0 stats.bytes_loaded=0 stats.bytes_decompressed=10069197 stats.blocks_processed=105844 stats.blocks_corrupted=0 stats.directories_processed=92 stats.workloads=3
ts=2024-05-23T14:39:01.933Z level=info msg="processed workload" version=devel iface=eth0 dir=.db/eth0 stats.bytes_loaded=0 stats.bytes_decompressed=12768653 stats.blocks_processed=142716 stats.blocks_corrupted=0 stats.directories_processed=124 stats.workloads=4
ts=2024-05-23T14:39:01.968Z level=info msg="processed workload" version=devel iface=eth0 dir=.db/eth0 stats.bytes_loaded=0 stats.bytes_decompressed=16810721 stats.blocks_processed=179588 stats.blocks_corrupted=0 stats.directories_processed=156 stats.workloads=5
ts=2024-05-23T14:39:01.969Z level=info msg="processed workload" version=devel iface=eth0 dir=.db/eth0 stats.bytes_loaded=0 stats.bytes_decompressed=19428092 stats.blocks_processed=216456 stats.blocks_corrupted=0 stats.directories_processed=188 stats.workloads=6
ts=2024-05-23T14:39:01.970Z level=info msg="processed workload" version=devel iface=eth0 dir=.db/eth0 stats.bytes_loaded=0 stats.bytes_decompressed=22129050 stats.blocks_processed=253324 stats.blocks_corrupted=0 stats.directories_processed=220 stats.workloads=7

                           packets   packets            bytes     bytes       
         sip         dip        in       out      %        in       out      %
  200.13.1.2  200.24.1.2   41.38 M   49.49 M  25.26   5.09 GB   6.09 GB  22.47
                               ...       ...              ...       ...       
                          191.16 M  168.53 M         28.11 GB  21.62 GB       
                                                                       
     Totals:                        359.69 M                   49.73 GB  

Timespan    : [2023-09-06 23:58:54, 2024-04-13 20:35:00] (219d20h36m0s)
Interface   : eth0
Sorted by   : accumulated data volume (sent and received)
Query stats : displayed top 1 hits out of 506 in 7.448s
```

### Other Changes

The `stats` label in the log output stems from a `WorkloadStats` struct,
which tracks interactions with the DB engine, mainly how much data is
loaded and processed.

It is meant to be extended in future iterations.

---------

Co-authored-by: Lennart Elsen <lelsen@open-systems.com>
  • Loading branch information
els0r and Lennart Elsen authored May 23, 2024
1 parent 220343f commit 0c2bddf
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 43 deletions.
36 changes: 27 additions & 9 deletions cmd/goQuery/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ goProbe.
pflags.String(conf.StoredQuery, "", "Load JSON serialized query arguments from disk and run them\n")
pflags.Duration(conf.QueryTimeout, query.DefaultQueryTimeout, "Abort query processing after timeout expires\n")
pflags.String(conf.QueryLog, "", "Log query invocations to file\n")
pflags.DurationP(conf.QueryKeepAlive, "k", 0, "Interval to emit log messages showing that query processing is still ongoing\n")

pflags.String(conf.LogLevel, logging.LevelWarn.String(), "log level (debug, info, warn, error, fatal, panic)")

Expand All @@ -194,13 +195,29 @@ goProbe.
}

func initLogger() {
format := logging.EncodingLogfmt

// since this is a command line tool, only warnings and errors should be printed and they
// shouldn't go to a dedicated file
err := logging.Init(logging.LevelFromString(viper.GetString(conf.LogLevel)), logging.EncodingLogfmt,
// shouldn't go to a dedicated file unless json output is requested
opts := []logging.Option{
logging.WithVersion(version.Short()),
logging.WithOutput(os.Stdout),
logging.WithErrorOutput(os.Stderr),
)
}
if cmdLineParams.Format == "json" {
format = logging.EncodingJSON

// if there is a query log, write log lines to that
queryLog := viper.GetString(conf.QueryLog)
if queryLog != "" {
opts = append(opts, logging.WithFileOutput(queryLog))
} else {
// log to stderr so the json output can be parsed from stdout
opts = append(opts, logging.WithOutput(os.Stderr))
}
} else {
opts = append(opts, logging.WithOutput(os.Stdout), logging.WithErrorOutput(os.Stderr))
}

err := logging.Init(logging.LevelFromString(viper.GetString(conf.LogLevel)), format, opts...)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to initialize logger: %v\n", err)
os.Exit(1)
Expand Down Expand Up @@ -297,7 +314,7 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
// initialize tracing
shutdown, err := tracing.InitFromFlags(queryCtx)
if err != nil {
logger.Error("failed to set up tracing", "error", err)
logger.With("error", err).Error("failed to set up tracing")
}
defer shutdown(context.Background())

Expand All @@ -311,7 +328,9 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
ctx = queryCtx
}

queryArgs.Caller = os.Args[0] // take the full path of called binary
if queryArgs.Caller == "" {
queryArgs.Caller = os.Args[0] // take the full path of called binary
}

// run the query
var result *results.Result
Expand Down Expand Up @@ -342,7 +361,7 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
querier = gqclient.New(viper.GetString(conf.QueryServerAddr))
} else {
// query using local goDB
querier = engine.NewQueryRunner(dbPathCfg)
querier = engine.NewQueryRunner(dbPathCfg, engine.WithKeepAlive(viper.GetDuration(conf.QueryKeepAlive)))
}

// check if the traceparent is set
Expand Down Expand Up @@ -380,7 +399,6 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
// convert the command line parameters
stmt, err := queryArgs.Prepare()
if err != nil {

// if there's an args error, try to print it in a user-friendly way
return types.ShouldPretty(err, queryPrepFailureMsg)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/goQuery/pkg/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
QueryTimeout = queryKey + ".timeout"
QueryHostsResolution = queryKey + ".hosts-resolution"
QueryLog = queryKey + ".log"
QueryKeepAlive = queryKey + ".keepalive"

dbKey = "db"
QueryDBPath = dbKey + ".path"
Expand Down
Loading

0 comments on commit 0c2bddf

Please sign in to comment.