Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Goquery: less verbose and frequent keep-alive messages #327

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/global-query/pkg/distributed/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func aggregateResults(ctx context.Context, stmt *query.Statement, queryResults <
finalResult.Summary.First = res.Summary.First
finalResult.Summary.Last = res.Summary.Last
finalResult.Summary.Totals.Add(res.Summary.Totals)
finalResult.Summary.Stats.Add(res.Summary.Stats)

// take the total from the query result. Since there may be overlap between the queries of two
// different systems, the overlap has to be deducted from the total
Expand Down
15 changes: 3 additions & 12 deletions cmd/goQuery/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ goProbe.
pflags.Duration(conf.QueryTimeout, query.DefaultQueryTimeout, "Abort query processing after timeout expires\n")
pflags.String(conf.QueryLog, "", "Log query invocations to file\n")
pflags.DurationP(conf.QueryKeepAlive, "k", 0, "Interval to emit log messages showing that query processing is still ongoing\n")
pflags.Bool(conf.QueryStats, false, "Print query DB interaction statistics\n")

pflags.String(conf.LogLevel, logging.LevelWarn.String(), "log level (debug, info, warn, error, fatal, panic)")

Expand All @@ -204,18 +205,8 @@ func initLogger() {
}
if cmdLineParams.Format == "json" {
format = logging.EncodingJSON

// if there is a query log, write log lines to that
queryLog := viper.GetString(conf.QueryLog)
fako1024 marked this conversation as resolved.
Show resolved Hide resolved
if queryLog != "" {
opts = append(opts, logging.WithFileOutput(queryLog))
} else {
// log to stderr so the json output can be parsed from stdout
opts = append(opts, logging.WithOutput(os.Stderr))
}
} else {
opts = append(opts, logging.WithOutput(os.Stdout), logging.WithErrorOutput(os.Stderr))
}
opts = append(opts, logging.WithOutput(os.Stdout), logging.WithErrorOutput(os.Stderr))

err := logging.Init(logging.LevelFromString(viper.GetString(conf.LogLevel)), format, opts...)
if err != nil {
Expand Down Expand Up @@ -452,7 +443,7 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
}
}

err = stmt.Print(ctx, result)
err = stmt.Print(ctx, result, results.WithQueryStats(viper.GetBool(conf.QueryStats)))
if err != nil {
return fmt.Errorf("failed to print query result: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/goQuery/pkg/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
QueryHostsResolution = queryKey + ".hosts-resolution"
QueryLog = queryKey + ".log"
QueryKeepAlive = queryKey + ".keepalive"
QueryStats = queryKey + ".stats"

dbKey = "db"
QueryDBPath = dbKey + ".path"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fako1024/gotools v1.0.5 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ github.com/els0r/telemetry/metrics v0.0.0-20231115132112-88976d9255a2 h1:Y/FmxE8
github.com/els0r/telemetry/metrics v0.0.0-20231115132112-88976d9255a2/go.mod h1:gUr4b0xsv04I3NRg0Dd5Rt019UQPb3elFNNprfUG6Ew=
github.com/els0r/telemetry/tracing v0.0.0-20231115132112-88976d9255a2 h1:cnctcxrSG2+vevCZCbTPzzU+S8xB+m+W/gO8RVlK9x0=
github.com/els0r/telemetry/tracing v0.0.0-20231115132112-88976d9255a2/go.mod h1:8Cl578dIQOTDB6DkPXFCOQ0Z5MQeXwhOzZB/mcCHDgA=
github.com/fako1024/gotools v1.0.5 h1:iq3u/hZILWYE3E9PaUTd3bKCjXCV5PjyIUOJytv7NPs=
github.com/fako1024/gotools v1.0.5/go.mod h1:foBo9vYEXD2f0S+NVi0/CL3z0geLsJ5tqv+cVmrE6ZU=
github.com/fako1024/gotools/bitpack v0.0.0-20240429125115-3a9469ff8610 h1:BIg6mOiuP1dtCD4/6Byakk2FNSX9avA8ckHv9swZ6TQ=
github.com/fako1024/gotools/bitpack v0.0.0-20240429125115-3a9469ff8610/go.mod h1:4wD0uVvDrCUyQd0YxxugT1GYeGtiWVIZ5qhrCWSiH6I=
github.com/fako1024/gotools/concurrency v0.0.0-20240611104736-1c5ad479f084 h1:6jG3xsJenrWe+mHe6jAVIpkAq6Jha24wEV89UM2fepo=
Expand Down
5 changes: 5 additions & 0 deletions pkg/e2etest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ func testE2E(t *testing.T, valFilterDescriptor int, datasets ...[]byte) {
t.Errorf("Mismatch on goQuery list target, want %+v, have %+v", listReference, resGoQueryList)
}

// Hack: make the deep equal disregard the DB load stats until we know exactly how they should look like
els0r marked this conversation as resolved.
Show resolved Hide resolved
// TODO: go through all pcaps and calculate these stats
resGoQuery.Summary.Stats = nil
resReference.Summary.Stats = nil

// Summary consistency check (do not fail yet to show details in the next check)
if !reflect.DeepEqual(resReference.Summary, resGoQuery.Summary) {
t.Errorf("Mismatch on goQuery summary, want %+v, have %+v", resReference.Summary, resGoQuery.Summary)
Expand Down
17 changes: 17 additions & 0 deletions pkg/formatting/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func Count(val uint64) string {
return fmt.Sprintf("%.2f %s", valF, units[count])
}

// CountSmall is like Count, but formats numbers smaller than 1000 as integers
func CountSmall(val uint64) string {
if val >= 1000 {
return Count(val)
}
return fmt.Sprintf("%d", val)
}

// Size prints out size in a human-readable format (e.g. 10 MB)
func Size(size uint64) string {
count := 0
Expand All @@ -68,6 +76,15 @@ func Size(size uint64) string {
return fmt.Sprintf("%.2f %s", sizeF, units[count])
}

// SizeSmall prints out size in a human-readable format (e.g. 10 MB) just like Size,
// but makes sure that sizes under 1024 Byte are formatted as integers
func SizeSmall(size uint64) string {
if size > 1024 {
return Size(size)
}
return fmt.Sprintf("%d %s", size, "B")
}

// Duration prints out d in a human-readable duration format
func Duration(d time.Duration) string {
// enhance the classic duration Stringer to print out days
Expand Down
149 changes: 18 additions & 131 deletions pkg/goDB/DBWorkManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"fmt"
"io/fs"
"log/slog"
"os"
"path/filepath"
"strconv"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/els0r/goProbe/pkg/goDB/storage/gpfile"
"github.com/els0r/goProbe/pkg/types"
"github.com/els0r/goProbe/pkg/types/hashmap"
"github.com/els0r/goProbe/pkg/types/workload"
"github.com/els0r/telemetry/logging"
"github.com/els0r/telemetry/tracing"
"github.com/fako1024/gotools/bitpack"
Expand All @@ -53,109 +53,21 @@ const (
keepAliveInterval = 5 * time.Second
)

// DBWorkload stores all relevant parameters to load a block and execute a query on it
type DBWorkload struct {
workDirs []*gpfile.GPDir

stats *WorkloadStats
statsFuncs StatsFuncs
}

func newDBWorkload(workDirs []*gpfile.GPDir) DBWorkload {
return DBWorkload{
workDirs: workDirs,
stats: &WorkloadStats{Workloads: 1},
}
}

// DBWorkManager schedules parallel processing of blocks relevant for a query
type DBWorkManager struct {
query *Query
dbIfaceDir string // path to interface directory in DB, e.g. /path/to/db/eth0
iface string
workloadChan chan DBWorkload
workloadChan chan *workload.Workload
numProcessingUnits int

tFirstCovered, tLastCovered int64
nWorkloads uint64

keepAlive time.Duration
stats *WorkloadStats
statsCallbacks StatsFuncs
}

// StatsFunc is a function which can be used to communicate running workload statistics
type StatsFunc func(stats *WorkloadStats)

// StatsCallbaacks is a list of StatsFuncs
type StatsFuncs []StatsFunc

func (sfc StatsFuncs) append(fn StatsFunc) StatsFuncs {
return append(sfc, fn)
}

// Execute runs all callbacks
func (scf StatsFuncs) Execute(stats *WorkloadStats) {
for _, fn := range scf {
fn(stats)
}
}

// WorkManagerOption configures the DBWorkManager
type WorkManagerOption func(*DBWorkManager)

// WithKeepAlive adds a logging callback to the StatsCallbacks
func WithKeepAlive(interval time.Duration) WorkManagerOption {
return func(w *DBWorkManager) {
w.keepAlive = interval
}
}

// WithStatsCallbacks configures the DBWorkManager to emit feedback on query processing. They will be called in
// the order they are provided
func WithStatsCallbacks(callbacks ...StatsFunc) WorkManagerOption {
return func(w *DBWorkManager) {
w.statsCallbacks = callbacks
}
}

// WorkloadStats tracks interactions with the underlying DB data
type WorkloadStats struct {
sync.RWMutex

BytesLoaded uint64 `json:"bytes_loaded" doc:"Bytes loaded from disk"`
BytesDecompressed uint64 `json:"bytes_decompressed" doc:"Effective block size after decompression"`
BlocksProcessed uint64 `json:"blocks_processed" doc:"Number of blocks loaded from disk"`
BlocksCorrupted uint64 `json:"blocks_corrupted" doc:"Blocks which could not be loaded or processed"`
DirectoriesProcessed uint64 `json:"directories_processed" doc:"Number of directories processed"`
Workloads uint64 `json:"workloads" doc:"Total number of workloads to be processed"`
}

// LogValue implements the slog.LogValuer interface
func (s *WorkloadStats) LogValue() slog.Value {
return slog.GroupValue(
slog.Uint64("bytes_loaded", s.BytesLoaded),
slog.Uint64("bytes_decompressed", s.BytesDecompressed),
slog.Uint64("blocks_processed", s.BlocksProcessed),
slog.Uint64("blocks_corrupted", s.BlocksCorrupted),
slog.Uint64("directories_processed", s.DirectoriesProcessed),
slog.Uint64("workloads", s.Workloads),
)
}

// Add adds the values of stats to s
func (s *WorkloadStats) Add(stats *WorkloadStats) {
if stats == nil {
return
}
s.BlocksProcessed += stats.BlocksProcessed
s.BytesDecompressed += stats.BytesDecompressed
s.BlocksProcessed += stats.BlocksProcessed
s.BlocksCorrupted += stats.BlocksCorrupted
s.DirectoriesProcessed += stats.DirectoriesProcessed
s.Workloads += stats.Workloads
}

// NewDBWorkManager sets up a new work manager for executing queries
func NewDBWorkManager(query *Query, dbpath string, iface string, numProcessingUnits int, opts ...WorkManagerOption) (*DBWorkManager, error) {
// Explicitly handle invalid number of processing units (to avoid deadlock)
Expand All @@ -167,9 +79,8 @@ func NewDBWorkManager(query *Query, dbpath string, iface string, numProcessingUn
query: query,
dbIfaceDir: filepath.Clean(filepath.Join(dbpath, iface)),
iface: iface,
workloadChan: make(chan DBWorkload, numProcessingUnits*64), // 64 is relatively arbitrary (but we're just sending quite basic objects)
workloadChan: make(chan *workload.Workload, numProcessingUnits*64), // 64 is relatively arbitrary (but we're just sending quite basic objects)
numProcessingUnits: numProcessingUnits,
stats: &WorkloadStats{},
}

for _, opt := range opts {
Expand Down Expand Up @@ -223,7 +134,7 @@ func (w *DBWorkManager) CreateWorkerJobs(tfirst int64, tlast int64) (nonempty bo
// create new workload for the directory
workloadBulk = append(workloadBulk, curDir)
if len(workloadBulk) == WorkBulkSize {
w.workloadChan <- newDBWorkload(workloadBulk)
w.workloadChan <- workload.New(workloadBulk)
w.nWorkloads++
workloadBulk = make([]*gpfile.GPDir, 0, WorkBulkSize)
}
Expand All @@ -236,7 +147,7 @@ func (w *DBWorkManager) CreateWorkerJobs(tfirst int64, tlast int64) (nonempty bo

// Flush any remaining work
if len(workloadBulk) > 0 {
w.workloadChan <- newDBWorkload(workloadBulk)
w.workloadChan <- workload.New(workloadBulk)
w.nWorkloads++
}

Expand Down Expand Up @@ -569,15 +480,8 @@ func (w *DBWorkManager) readMetadataAndEvaluate(workDir *gpfile.GPDir, blocks []
return aggMetadata, nil
}

func logWorkloadStats(logger *logging.L, msg string, stats *WorkloadStats) {
if stats == nil {
return
}
logger.With("stats", stats).Info(msg)
}

// main query processing
func (w *DBWorkManager) grabAndProcessWorkload(ctx context.Context, id int, wg *sync.WaitGroup, workloadChan <-chan DBWorkload, mapChan chan hashmap.AggFlowMapWithMetadata) {
func (w *DBWorkManager) grabAndProcessWorkload(ctx context.Context, id int, wg *sync.WaitGroup, workloadChan <-chan *workload.Workload, mapChan chan hashmap.AggFlowMapWithMetadata) {
go func() {
defer wg.Done()

Expand All @@ -602,17 +506,16 @@ func (w *DBWorkManager) grabAndProcessWorkload(ctx context.Context, id int, wg *
}
}()

lastStatsUpdate := time.Now()
for workload := range workloadChan {
stats := new(WorkloadStats)
for wl := range workloadChan {
stats := new(workload.Stats)
resultMap := hashmap.NewAggFlowMapWithMetadata()
for _, workDir := range workload.workDirs {
for _, workDir := range wl.WorkDirs() {
select {
case <-ctx.Done():
// query was cancelled, exit
w.stats.Lock()
logger.Infof("query cancelled (workload %d / %d)...", w.stats.DirectoriesProcessed, w.nWorkloads)
w.stats.Unlock()
wl.Stats().Lock()
logger.Infof("query cancelled (workload %d / %d)...", wl.Stats().DirectoriesProcessed, w.nWorkloads)
wl.Stats().Unlock()
return
default:
// check if a memory pool is available
Expand All @@ -629,28 +532,12 @@ func (w *DBWorkManager) grabAndProcessWorkload(ctx context.Context, id int, wg *
}
}

workload.stats.Add(stats)
if w.keepAlive > 0 {
if time.Since(lastStatsUpdate) > w.keepAlive {
statsFuncs := workload.statsFuncs.append(func(s *WorkloadStats) {
logWorkloadStats(logger.With("dir", workDir.Path()), "processed directory", s)
})
statsFuncs.Execute(workload.stats)
lastStatsUpdate = time.Now()
}
}
wl.AddStats(stats)
}

// Workload is counted, but we only add it to the final result if we got any entries
w.stats.Lock()
w.stats.Add(workload.stats)
w.stats.Unlock()
if resultMap.Len() > 0 {
if w.keepAlive > 0 {
logWorkloadStats(logger, "processed workload", w.stats)
}
mapChan <- resultMap
}
// we always send the map, even if the resultMap is empty. Filtering is left to the aggregate method
resultMap.Stats = wl.Stats()
mapChan <- resultMap
}
}()
}
Expand Down Expand Up @@ -678,7 +565,7 @@ func (w *DBWorkManager) ExecuteWorkerReadJobs(ctx context.Context, mapChan chan

// Block evaluation and aggregation -----------------------------------------------------
// this is where the actual reading and aggregation magic happens
func (w *DBWorkManager) readBlocksAndEvaluate(workDir *gpfile.GPDir, enc encoder.Encoder, resultMap *hashmap.AggFlowMapWithMetadata) (stats *WorkloadStats, err error) {
func (w *DBWorkManager) readBlocksAndEvaluate(workDir *gpfile.GPDir, enc encoder.Encoder, resultMap *hashmap.AggFlowMapWithMetadata) (stats *workload.Stats, err error) {
logger := logging.Logger()

var (
Expand All @@ -704,7 +591,7 @@ func (w *DBWorkManager) readBlocksAndEvaluate(workDir *gpfile.GPDir, enc encoder
return stats, fmt.Errorf("discovered invalid workload for mismatching interfaces, want `%s`, have `%s`", resultMap.Interface, w.iface)
}

stats = &WorkloadStats{
stats = &workload.Stats{
DirectoriesProcessed: 1,
}

Expand Down
Loading
Loading