[feature] Implement metadata storage in daily directory names and fast path for list target #313

Merged
Changes from 2 commits
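Note on the feature as a whole (not part of the diff): daily directory names can now embed an encoded form of the directory's metadata, and goQuery's list target gains a fast path that reads this summary information without opening the directory contents. A minimal sketch of the parsing contract implied by the new gpfile helper — the import path and the example directory name are assumptions, since the suffix encoding itself is not shown in this diff:

package main

import (
	"fmt"
	"log"

	"github.com/els0r/goProbe/pkg/goDB/storage/gpfile"
)

func main() {
	// Hypothetical directory name: an epoch timestamp, optionally followed by
	// an encoded metadata suffix ("_AAAB" is a made-up placeholder here)
	timestamp, suffix, err := gpfile.ExtractTimestampMetadataSuffix("1714348800_AAAB")
	if err != nil {
		log.Fatalf("failed to parse directory name: %v", err)
	}
	fmt.Println(timestamp, suffix)
}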
30 changes: 27 additions & 3 deletions cmd/goQuery/cmd/list.go
@@ -1,9 +1,11 @@
package cmd

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"os"
"runtime"
"strings"
@@ -13,6 +15,7 @@ import (
"github.com/els0r/goProbe/pkg/goDB"
"github.com/els0r/goProbe/pkg/goDB/info"
"github.com/els0r/goProbe/pkg/query"
"github.com/els0r/telemetry/logging"
jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@@ -47,16 +50,37 @@ be broken up into IPv4 and IPv6 flows and the drops for that interface will be s
}

func listInterfacesEntrypoint(_ *cobra.Command, args []string) error {
return listInterfaces(viper.GetString(conf.QueryDBPath), args...)
return listInterfaces(context.Background(), viper.GetString(conf.QueryDBPath), args...)
}

// List interfaces for which data is available and show how many flows and
// how much traffic was observed for each one.
func listInterfaces(dbPath string, ifaces ...string) error {
func listInterfaces(ctx context.Context, dbPath string, ifaces ...string) error {
queryArgs := cmdLineParams

// TODO: consider making this configurable
output := os.Stdout
var ifacesMetadata []*goDB.InterfaceMetadata

logger := logging.FromContext(ctx)

// create query logger
queryLogFile := viper.GetString(conf.QueryLog)
var qlogger *logging.L
if queryLogFile != "" {
var err error

logger := logger.With("file", queryLogFile)
logger.Debugf("logging interface list query")

qlogger, err = logging.New(slog.LevelInfo, logging.EncodingJSON, logging.WithFileOutput(queryLogFile))
if err != nil {
logger.Errorf("failed to initialize query logger: %v", err)
} else {
qlogger.With("args", queryArgs).Infof("preparing interface list query")
defer qlogger.Info("interface list query finished")
}
}

first, last, err := query.ParseTimeRange(queryArgs.First, queryArgs.Last)
if err != nil {
@@ -96,7 +120,7 @@ func listInterfaces(dbPath string, ifaces ...string) error {
dbWorkerManagers = append(dbWorkerManagers, wm)
}

var ifacesMetadata = make([]*goDB.InterfaceMetadata, 0, len(dbWorkerManagers))
ifacesMetadata = make([]*goDB.InterfaceMetadata, 0, len(dbWorkerManagers))
for _, manager := range dbWorkerManagers {
manager := manager

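The query-log setup above is deliberately non-fatal: if the log file cannot be opened, the listing still runs. Condensed into a hypothetical helper (logging.New, logging.L and Errorf as used in the diff):

// newQueryLogger returns a file-backed JSON query logger, or nil if no file
// was configured or setup failed; failures must not block the query itself
func newQueryLogger(logger *logging.L, path string) *logging.L {
	if path == "" {
		return nil
	}
	qlogger, err := logging.New(slog.LevelInfo, logging.EncodingJSON, logging.WithFileOutput(path))
	if err != nil {
		logger.Errorf("failed to initialize query logger: %v", err)
		return nil
	}
	return qlogger
}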
8 changes: 4 additions & 4 deletions cmd/goQuery/cmd/root.go
@@ -237,10 +237,13 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
// in the arguments
dbPathCfg := viper.GetString(conf.QueryDBPath)

queryCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
defer stop()

// run commands that don't require any argument
// handle list flag
if cmdLineParams.List {
err := listInterfaces(dbPathCfg)
err := listInterfaces(queryCtx, dbPathCfg)
if err != nil {
return fmt.Errorf("failed to retrieve list of available databases: %w", err)
}
@@ -288,9 +291,6 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
// make sure there's protection against unbounded time intervals
queryArgs = setDefaultTimeRange(&queryArgs)

queryCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
defer stop()

// get logger
logger := logging.FromContext(queryCtx)

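The signal-aware context is now created before the list fast path instead of only ahead of full query execution, so interface listing honors SIGINT/SIGTERM as well. The pattern, for reference (standard library; the surrounding command wiring is sketched):

// queryCtx is cancelled on SIGINT/SIGTERM or os.Interrupt; stop() releases
// the signal registration when the command returns
queryCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
defer stop()

// the list fast path now runs under the same cancellable context as full queries
if cmdLineParams.List {
	if err := listInterfaces(queryCtx, dbPathCfg); err != nil {
		return fmt.Errorf("failed to retrieve list of available databases: %w", err)
	}
}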
7 changes: 3 additions & 4 deletions cmd/legacy/main.go
@@ -1,7 +1,6 @@
package main

import (
"errors"
"flag"
"fmt"
"io/fs"
@@ -196,9 +195,9 @@ func main() {

// Skip input if output already exists (unless -overwrite is specified)
if !overwrite {
destPath := gpfile.GenPathForTimestamp(filepath.Join(outPath, iface), epochTS)
if _, err := os.Stat(destPath); !errors.Is(err, fs.ErrNotExist) {
logger.Debugf("skipping already converted dir %s", destPath)
match, exists := gpfile.FindDirForTimestamp(filepath.Join(outPath, iface), epochTS)
if exists {
logger.Debugf("skipping already converted dir %s", match)
continue
}
}
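Since directory names may now carry a metadata suffix, the converter can no longer derive the destination path from the timestamp alone (the old GenPathForTimestamp + os.Stat check). The assumed contract of the replacement, per the call site above:

// FindDirForTimestamp scans the interface directory for a daily directory
// matching this epoch timestamp, with or without a metadata suffix, and
// reports its path and whether it exists (assumed semantics, per the diff)
match, exists := gpfile.FindDirForTimestamp(filepath.Join(outPath, iface), epochTS)
if exists {
	logger.Debugf("skipping already converted dir %s", match)
}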
29 changes: 24 additions & 5 deletions examples/analyze-meta/main.go
@@ -7,7 +7,6 @@ import (
"io"
"os"
"path/filepath"
"strconv"
"strings"
"text/tabwriter"
"time"
@@ -20,7 +19,8 @@ import (
)

var (
pathMetaFile string
pathMetaFile string
rewriteMetadata bool
)

func main() {
@@ -35,21 +35,38 @@
logger := logging.Logger()

flag.StringVar(&pathMetaFile, "path", "", "Path to meta file")
flag.BoolVar(&rewriteMetadata, "rewrite-metadata", false, "Rewrite metadata on disk")
flag.Parse()

pathMetaFile = filepath.Clean(pathMetaFile)
dirPath := filepath.Dir(pathMetaFile)
timestamp, err := strconv.ParseInt(filepath.Base(dirPath), 10, 64)

timestamp, suffix, err := gpfile.ExtractTimestampMetadataSuffix(filepath.Base(dirPath))
if err != nil {
logger.Fatalf("failed to extract timestamp: %s", err)
}
baseDirPath := filepath.Dir(filepath.Dir(filepath.Dir(dirPath)))

gpDir := gpfile.NewDir(baseDirPath, timestamp, gpfile.ModeRead)
if rewriteMetadata {
gpDir := gpfile.NewDirWriter(baseDirPath, timestamp)
if err := gpDir.Open(); err != nil {
logger.Fatalf("failed to open GPF dir: %v", err)
}
if err := gpDir.Close(); err != nil {
logger.Fatalf("failed to close GPF dir: %v", err)
}
return
}

gpDir := gpfile.NewDirReader(baseDirPath, timestamp, suffix)
if err := gpDir.Open(); err != nil {
logger.Fatalf("failed to open GPF dir: %v", err)
}
defer gpDir.Close()
defer func() {
if err := gpDir.Close(); err != nil {
logger.Fatalf("failed to close GPF dir: %v", err)
}
}()

for i := types.ColumnIndex(0); i < types.ColIdxCount; i++ {
err = PrintMetaTable(gpDir, i, os.Stdout)
@@ -59,6 +76,8 @@
}
}

// PrintMetaTable displays a comprehensive table with all metadata information read from the
// metadata file
func PrintMetaTable(gpDir *gpfile.GPDir, column types.ColumnIndex, w io.Writer) error {

blockMetadata := gpDir.BlockMetadata[column]
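The reader/writer split is visible here: NewDirReader takes the suffix recovered from the on-disk name, while NewDirWriter backs the new -rewrite-metadata flag with a plain open/close cycle. A sketch of that cycle as a standalone helper (hypothetical function; the assumption is that closing a writer re-serializes the metadata and refreshes the name suffix):

// rewriteDirMetadata mirrors the -rewrite-metadata branch above
func rewriteDirMetadata(baseDirPath string, timestamp int64) error {
	gpDir := gpfile.NewDirWriter(baseDirPath, timestamp)
	if err := gpDir.Open(); err != nil {
		return fmt.Errorf("failed to open GPF dir: %w", err)
	}
	// Close presumably flushes the metadata back to disk, updating the
	// suffix embedded in the directory name
	return gpDir.Close()
}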
2 changes: 1 addition & 1 deletion go.mod
@@ -9,7 +9,7 @@ require (
github.com/els0r/telemetry/logging v0.0.0-20231115132112-88976d9255a2
github.com/els0r/telemetry/metrics v0.0.0-20231115132112-88976d9255a2
github.com/els0r/telemetry/tracing v0.0.0-20231115132112-88976d9255a2
github.com/fako1024/gotools/bitpack v0.0.0-20240419120819-e06ca9ce9e88
github.com/fako1024/gotools/bitpack v0.0.0-20240429125115-3a9469ff8610
github.com/fako1024/gotools/concurrency v0.0.0-20230905084243-56aa6a34fb53
github.com/fako1024/httpc v1.0.18
github.com/fako1024/slimcap v1.0.4
2 changes: 2 additions & 0 deletions go.sum
@@ -29,6 +29,8 @@ github.com/els0r/telemetry/tracing v0.0.0-20231115132112-88976d9255a2 h1:cnctcxr
github.com/els0r/telemetry/tracing v0.0.0-20231115132112-88976d9255a2/go.mod h1:8Cl578dIQOTDB6DkPXFCOQ0Z5MQeXwhOzZB/mcCHDgA=
github.com/fako1024/gotools/bitpack v0.0.0-20240419120819-e06ca9ce9e88 h1:nTZAuKai0dR/olhLiojPnF6dHA749iM/xr2Xg8mtnXo=
github.com/fako1024/gotools/bitpack v0.0.0-20240419120819-e06ca9ce9e88/go.mod h1:4wD0uVvDrCUyQd0YxxugT1GYeGtiWVIZ5qhrCWSiH6I=
github.com/fako1024/gotools/bitpack v0.0.0-20240429125115-3a9469ff8610 h1:BIg6mOiuP1dtCD4/6Byakk2FNSX9avA8ckHv9swZ6TQ=
github.com/fako1024/gotools/bitpack v0.0.0-20240429125115-3a9469ff8610/go.mod h1:4wD0uVvDrCUyQd0YxxugT1GYeGtiWVIZ5qhrCWSiH6I=
github.com/fako1024/gotools/concurrency v0.0.0-20230905084243-56aa6a34fb53 h1:TUa4dGmK5t0ndG3sWocAN6+DIszheg9iHJSgb4P/dus=
github.com/fako1024/gotools/concurrency v0.0.0-20230905084243-56aa6a34fb53/go.mod h1:4mhi2XPVVG6s5zDqL5brEXq2sVZSb4K96zKEJiWrSFs=
github.com/fako1024/httpc v1.0.18 h1:8Eqd4dm1CL9v+y4nugFh9kC4DpvK3sA5NP0QzxA72dY=
73 changes: 48 additions & 25 deletions pkg/goDB/DBWorkManager.go
@@ -16,6 +16,7 @@ package goDB
import (
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"strconv"
@@ -108,8 +109,8 @@ func (w *DBWorkManager) CreateWorkerJobs(tfirst int64, tlast int64) (nonempty bo
var curDir *gpfile.GPDir
workloadBulk := make([]*gpfile.GPDir, 0, WorkBulkSize)

walkFunc := func(numDirs int, dayTimestamp int64) error {
curDir = gpfile.NewDir(w.dbIfaceDir, dayTimestamp, gpfile.ModeRead)
walkFunc := func(numDirs int, dayTimestamp int64, suffix string) error {
curDir = gpfile.NewDirReader(w.dbIfaceDir, dayTimestamp, suffix)

// For the first and last item, check out the GPDir metadata for the actual first and
// last block timestamp to cover (and adapt variables accordingly)
@@ -169,11 +170,11 @@
return 0 < numDirs, nil
}

func skipNonMatching(isDir bool) bool {
return !isDir
func skipNonMatchingDir(entry fs.DirEntry) bool {
return !entry.IsDir()
}

type dbWalkFunc func(numDirs int, dayTimestamp int64) error
type dbWalkFunc func(numDirs int, dayTimestamp int64, suffix string) error

func (w *DBWorkManager) walkDB(tfirst, tlast int64, fn dbWalkFunc) (numDirs int, err error) {
// Get list of years in main directory (ordered by directory name, i.e. time)
@@ -186,8 +187,8 @@
var unixFirst, unixLast = time.Unix(tfirst, 0), time.Unix(tlast+DBWriteInterval, 0)
for _, year := range yearList {

// Skip obvious non-matching entries
if skipNonMatching(year.IsDir()) {
// Skip obvious non-matching entries (anything not a directory)
if skipNonMatchingDir(year) {
continue
}

@@ -206,8 +207,9 @@
return numDirs, err
}
for _, month := range monthList {
// Skip obvious non-matching entries
if skipNonMatching(month.IsDir()) {

// Skip obvious non-matching entries (anything not a directory)
if skipNonMatchingDir(month) {
continue
}

@@ -227,19 +229,24 @@
return numDirs, err
}

for _, file := range dirList {
if skipNonMatching(file.IsDir()) {
for _, timestamp := range dirList {

// Skip obvious non-matching entries (anything not a directory)
if skipNonMatchingDir(timestamp) {
continue
}
dayTimestamp, err := strconv.ParseInt(file.Name(), 10, 64)

// Extract the timestamp (and potentially metadata suffix) from the directory name
dayTimestamp, suffix, err := gpfile.ExtractTimestampMetadataSuffix(timestamp.Name())
if err != nil {
return numDirs, fmt.Errorf("failed to parse epoch timestamp from directory `%s`: %w", file.Name(), err)
return numDirs, fmt.Errorf("failed to parse timestamp / suffix from directory `%s`: %w", timestamp.Name(), err)
}

// check if the directory is within time frame of interest
if tfirst < dayTimestamp+gpfile.EpochDay && dayTimestamp < tlast+DBWriteInterval {

// actual processing upon a match
err := fn(numDirs, dayTimestamp)
err := fn(numDirs, dayTimestamp, suffix)
if err != nil {
return numDirs, err
}
@@ -267,17 +274,23 @@ func (w *DBWorkManager) ReadMetadata(tfirst int64, tlast int64) (*InterfaceMetad

// make sure to start with zero workloads as the number of assigned
// workloads depends on how many directories have to be read
var curDir *gpfile.GPDir
var (
curDir *gpfile.GPDir
currentTimestamp int64
currentSuffix string
)

var currentTimestamp int64
walkFunc := func(numDirs int, dayTimestamp int64, suffix string) error {

walkFunc := func(numDirs int, dayTimestamp int64) error {
currentTimestamp = dayTimestamp
curDir = gpfile.NewDir(w.dbIfaceDir, dayTimestamp, gpfile.ModeRead, gpFileOptions...)
var err error
currentTimestamp, currentSuffix = dayTimestamp, suffix
curDir = gpfile.NewDirReader(w.dbIfaceDir, dayTimestamp, suffix, gpFileOptions...)

err := curDir.Open()
if err != nil {
return fmt.Errorf("failed to open first GPDir %s to ascertain query block timing: %w", curDir.Path(), err)
if curDir.Metadata == nil {
err = curDir.Open()
if err != nil {
return fmt.Errorf("failed to open GPDir %s to ascertain query block timing: %w", curDir.Path(), err)
}
}

// do the metadata computation based on the metadata
@@ -286,6 +299,14 @@
// compute the metadata for the first day. If a "first" time argument is given,
// the partial day has to be computed
if numDirs == 0 {

if !curDir.IsOpen() {
err = curDir.Open()
if err != nil {
return fmt.Errorf("failed to open GPDir %s to ascertain query block timing: %w", curDir.Path(), err)
}
}

dirFirst, _ := curDir.TimeRange()
if tfirst >= dirFirst {
// subtract all entries that are smaller than w.tFirstCovered because they were added in the day loop
Expand Down Expand Up @@ -313,8 +334,10 @@ func (w *DBWorkManager) ReadMetadata(tfirst int64, tlast int64) (*InterfaceMetad
w.tFirstCovered = dirFirst
}
}
if err := curDir.Close(); err != nil {
return fmt.Errorf("failed to close first GPDir %s after ascertaining query block timing: %w", curDir.Path(), err)
if curDir.IsOpen() {
if err := curDir.Close(); err != nil {
return fmt.Errorf("failed to close first GPDir %s after ascertaining query block timing: %w", curDir.Path(), err)
}
}
return nil
}
@@ -327,7 +350,7 @@ func (w *DBWorkManager) ReadMetadata(tfirst int64, tlast int64) (*InterfaceMetad
// compute the metadata for the last block. This will be partial if the last timestamp is smaller than the last
// block captured for the day
if curDir != nil {
curDir = gpfile.NewDir(w.dbIfaceDir, currentTimestamp, gpfile.ModeRead, gpFileOptions...)
curDir = gpfile.NewDirReader(w.dbIfaceDir, currentTimestamp, currentSuffix, gpFileOptions...)

if err := curDir.Open(); err != nil {
return nil, fmt.Errorf("failed to open last GPDir %s to ascertain query block timing: %w", curDir.Path(), err)
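This file carries the actual fast path: walkDB now hands the parsed name suffix to its callback, and ReadMetadata only opens a directory when the suffix did not already yield decoded metadata, or when a partial first/last day forces a block-level look. Condensed into a hypothetical helper (NewDirReader, Metadata and Path as used above; the per-query gpFileOptions are omitted for brevity):

// dirForDay returns a GPDir for the given day, opening it only when its
// metadata could not be decoded from the directory-name suffix
func (w *DBWorkManager) dirForDay(dayTimestamp int64, suffix string) (*gpfile.GPDir, error) {
	curDir := gpfile.NewDirReader(w.dbIfaceDir, dayTimestamp, suffix)
	if curDir.Metadata == nil {
		// Slow path: no usable suffix, read the metadata from disk instead
		if err := curDir.Open(); err != nil {
			return nil, fmt.Errorf("failed to open GPDir %s: %w", curDir.Path(), err)
		}
	}
	return curDir, nil
}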
4 changes: 2 additions & 2 deletions pkg/goDB/db_writer.go
@@ -66,7 +66,7 @@ func (w *DBWriter) Write(flowmap *hashmap.AggFlowMap, captureStats capturetypes.
err error
)

dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), timestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel))
dir := gpfile.NewDirWriter(filepath.Join(w.dbpath, w.iface), timestamp, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel))
if err = dir.Open(); err != nil {
return fmt.Errorf("failed to create / open daily directory: %w", err)
}
@@ -97,7 +97,7 @@ func (w *DBWriter) WriteBulk(workloads []BulkWorkload, dirTimestamp int64) (err
update gpfile.Stats
)

dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), dirTimestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel))
dir := gpfile.NewDirWriter(filepath.Join(w.dbpath, w.iface), dirTimestamp, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel))
if err = dir.Open(); err != nil {
return fmt.Errorf("failed to create / open daily directory: %w", err)
}
2 changes: 1 addition & 1 deletion pkg/goDB/godb_test.go
@@ -135,7 +135,7 @@ func populateTestDir(t *testing.T, basePath, iface string, timestamp time.Time)

testPath := filepath.Join(basePath, iface)

f := gpfile.NewDir(testPath, timestamp.Unix(), gpfile.ModeWrite)
f := gpfile.NewDirWriter(testPath, timestamp.Unix())
require.Nil(t, f.Open())

data, update := dbData(generateFlows())