Skip to content

Commit

Permalink
Actually support setting custom compressor and compression level for …
Browse files Browse the repository at this point in the history
…GPDir / GPFile
  • Loading branch information
fako1024 committed Aug 3, 2023
1 parent 8a2f5eb commit 5f87615
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
25 changes: 17 additions & 8 deletions pkg/goDB/db_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ type DBWriter struct {
dbpath string
iface string

dayTimestamp int64
encoderType encoders.Type
encoderLevel int
permissions fs.FileMode
}

// NewDBWriter initializes a new DBWriter
func NewDBWriter(dbpath string, iface string, encoderType encoders.Type) (w *DBWriter) {
return &DBWriter{dbpath, iface, 0, encoderType, DefaultPermissions}
return &DBWriter{
dbpath: dbpath,
iface: iface,
encoderType: encoderType,
permissions: DefaultPermissions,
}
}

// Permissions overrides the default permissions for files / directories in the DB
Expand All @@ -45,6 +50,12 @@ func (w *DBWriter) Permissions(permissions fs.FileMode) *DBWriter {
return w
}

// EncoderLevel overrides the default encoder / compressor level for files / directories in the DB
func (w *DBWriter) EncoderLevel(level int) *DBWriter {
w.encoderLevel = level
return w
}

// Write takes an aggregated flow map and its metadata and writes it to disk for a given timestamp
func (w *DBWriter) Write(flowmap *hashmap.AggFlowMap, captureMeta CaptureMetadata, timestamp int64) error {
var (
Expand All @@ -53,10 +64,9 @@ func (w *DBWriter) Write(flowmap *hashmap.AggFlowMap, captureMeta CaptureMetadat
err error
)

dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), timestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions))
dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), timestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel))
if err = dir.Open(); err != nil {
err = fmt.Errorf("Could not create / open daily directory: %w", err)
return err
return fmt.Errorf("failed to create / open daily directory: %w", err)
}

data, update = dbData(w.iface, timestamp, flowmap)
Expand Down Expand Up @@ -85,10 +95,9 @@ func (w *DBWriter) WriteBulk(workloads []BulkWorkload, dirTimestamp int64) (err
update gpfile.Stats
)

dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), dirTimestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions))
dir := gpfile.NewDir(filepath.Join(w.dbpath, w.iface), dirTimestamp, gpfile.ModeWrite, gpfile.WithPermissions(w.permissions), gpfile.WithEncoderTypeLevel(w.encoderType, w.encoderLevel))
if err = dir.Open(); err != nil {
err = fmt.Errorf("Could not create / open daily directory: %w", err)
return err
return fmt.Errorf("failed to create / open daily directory: %w", err)
}

for _, workload := range workloads {
Expand Down
17 changes: 14 additions & 3 deletions pkg/goDB/storage/gpfile/gpfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ type GPFile struct {
lastSeekPos int64

// defaultEncoderType governs how data blocks are (de-)compressed by default
defaultEncoderType encoders.Type
defaultEncoder encoder.Encoder
freeEncoder bool
defaultEncoderType encoders.Type
defaultEncoderLevel int
defaultEncoder encoder.Encoder
freeEncoder bool

// accessMode denotes if the file is opened for read or write operations (to avoid
// race conditions and unpredictable behavior, only one mode is possible at a time)
Expand Down Expand Up @@ -100,6 +101,9 @@ func New(filename string, header *storage.BlockHeader, accessMode int, options .
if g.defaultEncoder, err = encoder.New(g.defaultEncoderType); err != nil {
return nil, err
}
if g.defaultEncoderLevel > 0 {
g.defaultEncoder.SetLevel(g.defaultEncoderLevel)
}
}

// Preallocate reusable buffers for uncompressed / block data from the global pool
Expand Down Expand Up @@ -336,3 +340,10 @@ func (g *GPFile) setEncoder(e encoder.Encoder) {
g.defaultEncoderType = e.Type()
g.freeEncoder = false
}

func (g *GPFile) setEncoderTypeLevel(t encoders.Type, l int) {
g.defaultEncoderType = t
if l > 0 {
g.defaultEncoderLevel = l
}
}
11 changes: 11 additions & 0 deletions pkg/goDB/storage/gpfile/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io/fs"

"github.com/els0r/goProbe/pkg/goDB/encoder"
"github.com/els0r/goProbe/pkg/goDB/encoder/encoders"
)

// Option defines optional arguments to gpfile
Expand All @@ -19,6 +20,7 @@ type optionSetterFile interface {
optionSetterCommon
setMemPool(MemPoolGCable)
setEncoder(encoder.Encoder)
setEncoderTypeLevel(encoders.Type, int)
}

// WithEncoder allows to set the compression implementation
Expand All @@ -30,6 +32,15 @@ func WithEncoder(e encoder.Encoder) Option {
}
}

// WithEncoderTypeLevel allows to set the compression type and level
func WithEncoderTypeLevel(t encoders.Type, l int) Option {
return func(o any) {
if obj, ok := o.(optionSetterFile); ok {
obj.setEncoderTypeLevel(t, l)
}
}
}

// WithReadAll triggers a full read of the underlying file from disk
// upon first read access to minimize I/O load.
// Seeking is handled by replacing the underlying file with a seekable
Expand Down

0 comments on commit 5f87615

Please sign in to comment.