From 5f87615e8347544bca6710417b408823038376c4 Mon Sep 17 00:00:00 2001 From: fako1024 Date: Thu, 3 Aug 2023 10:54:08 +0200 Subject: [PATCH] Actually support setting custom compressor and compression level for GPDir / GPFile --- pkg/goDB/db_writer.go | 25 +++++++++++++++++-------- pkg/goDB/storage/gpfile/gpfile.go | 17 ++++++++++++++--- pkg/goDB/storage/gpfile/options.go | 11 +++++++++++ 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/pkg/goDB/db_writer.go b/pkg/goDB/db_writer.go index ad71cf2d..22eb5744 100644 --- a/pkg/goDB/db_writer.go +++ b/pkg/goDB/db_writer.go @@ -29,14 +29,19 @@ type DBWriter struct { dbpath string iface string - dayTimestamp int64 encoderType encoders.Type + encoderLevel int permissions fs.FileMode } // NewDBWriter initializes a new DBWriter func NewDBWriter(dbpath string, iface string, encoderType encoders.Type) (w *DBWriter) { - return &DBWriter{dbpath, iface, 0, encoderType, DefaultPermissions} + return &DBWriter{ + dbpath: dbpath, + iface: iface, + encoderType: encoderType, + permissions: DefaultPermissions, + } } // Permissions overrides the default permissions for files / directories in the DB @@ -45,6 +50,12 @@ func (w *DBWriter) Permissions(permissions fs.FileMode) *DBWriter { return w } +// EncoderLevel overrides the default encoder / compressor level for files / directories in the DB +func (w *DBWriter) EncoderLevel(level int) *DBWriter { + w.encoderLevel = level + return w +} + // Write takes an aggregated flow map and its metadata and writes it to disk for a given timestamp func (w *DBWriter) Write(flowmap *hashmap.AggFlowMap, captureMeta CaptureMetadata, timestamp int64) error { var ( @@ -53,10 +64,9 @@ func (w *DBWriter) Write(flowmap *hashmap.AggFlowMap, captureMeta CaptureMetadat err error ) - dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), timestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions)) + dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), timestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel)) if err = dir.Open(); err != nil { - err = fmt.Errorf("Could not create / open daily directory: %w", err) - return err + return fmt.Errorf("failed to create / open daily directory: %w", err) } data, update = dbData(w.iface, timestamp, flowmap) @@ -85,10 +95,9 @@ func (w *DBWriter) WriteBulk(workloads []BulkWorkload, dirTimestamp int64) (err update gpfile.Stats ) - dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), dirTimestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions)) + dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), dirTimestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel)) if err = dir.Open(); err != nil { - err = fmt.Errorf("Could not create / open daily directory: %w", err) - return err + return fmt.Errorf("failed to create / open daily directory: %w", err) } for _, workload := range workloads { diff --git a/pkg/goDB/storage/gpfile/gpfile.go b/pkg/goDB/storage/gpfile/gpfile.go index c4d928f3..1e8ad7a4 100644 --- a/pkg/goDB/storage/gpfile/gpfile.go +++ b/pkg/goDB/storage/gpfile/gpfile.go @@ -57,9 +57,10 @@ type GPFile struct { lastSeekPos int64 // defaultEncoderType governs how data blocks are (de-)compressed by default - defaultEncoderType encoders.Type - defaultEncoder encoder.Encoder - freeEncoder bool + defaultEncoderType encoders.Type + defaultEncoderLevel int + defaultEncoder encoder.Encoder + freeEncoder bool // accessMode denotes if the file is opened for read or write operations (to avoid // race conditions and unpredictable behavior, only one mode is possible at a time) @@ -100,6 +101,9 @@ func New(filename string, header *storage.BlockHeader, accessMode int, options . if g.defaultEncoder, err = encoder.New(g.defaultEncoderType); err != nil { return nil, err } + if g.defaultEncoderLevel > 0 { + g.defaultEncoder.SetLevel(g.defaultEncoderLevel) + } } // Preallocate reusable buffers for uncompressed / block data from the global pool @@ -336,3 +340,10 @@ func (g *GPFile) setEncoder(e encoder.Encoder) { g.defaultEncoderType = e.Type() g.freeEncoder = false } + +func (g *GPFile) setEncoderTypeLevel(t encoders.Type, l int) { + g.defaultEncoderType = t + if l > 0 { + g.defaultEncoderLevel = l + } +} diff --git a/pkg/goDB/storage/gpfile/options.go b/pkg/goDB/storage/gpfile/options.go index 2d3cd26c..a2d1c5fd 100644 --- a/pkg/goDB/storage/gpfile/options.go +++ b/pkg/goDB/storage/gpfile/options.go @@ -4,6 +4,7 @@ import ( "io/fs" "github.com/els0r/goProbe/pkg/goDB/encoder" + "github.com/els0r/goProbe/pkg/goDB/encoder/encoders" ) // Option defines optional arguments to gpfile @@ -19,6 +20,7 @@ type optionSetterFile interface { optionSetterCommon setMemPool(MemPoolGCable) setEncoder(encoder.Encoder) + setEncoderTypeLevel(encoders.Type, int) } // WithEncoder allows to set the compression implementation @@ -30,6 +32,15 @@ func WithEncoder(e encoder.Encoder) Option { } } +// WithEncoderTypeLevel allows to set the compression type and level +func WithEncoderTypeLevel(t encoders.Type, l int) Option { + return func(o any) { + if obj, ok := o.(optionSetterFile); ok { + obj.setEncoderTypeLevel(t, l) + } + } +} + // WithReadAll triggers a full read of the underlying file from disk // upon first read access to minimize I/O load. // Seeking is handled by replacing the underlying file with a seekable