Amend PR feedback
fako1024 committed May 10, 2024
1 parent 8a6e135 commit fe57523
Showing 4 changed files with 38 additions and 33 deletions.
5 changes: 2 additions & 3 deletions cmd/goQuery/cmd/list.go
@@ -50,12 +50,12 @@ be broken up into IPv4 and IPv6 flows and the drops for that interface will be s
}

func listInterfacesEntrypoint(_ *cobra.Command, args []string) error {
return listInterfaces(context.Background(), viper.GetString(conf.QueryDBPath), args...)
return listInterfaces(context.Background(), viper.GetString(conf.QueryDBPath), viper.GetString(conf.QueryLog), args...)
}

// List interfaces for which data is available and show how many flows and
// how much traffic was observed for each one.
func listInterfaces(ctx context.Context, dbPath string, ifaces ...string) error {
func listInterfaces(ctx context.Context, dbPath, queryLogFile string, ifaces ...string) error {
queryArgs := cmdLineParams

// TODO: consider making this configurable
@@ -65,7 +65,6 @@ func listInterfaces(ctx context.Context, dbPath string, ifaces ...string) error
logger := logging.FromContext(ctx)

// create query logger
queryLogFile := viper.GetString(conf.QueryLog)
var qlogger *logging.L
if queryLogFile != "" {
var err error
2 changes: 1 addition & 1 deletion cmd/goQuery/cmd/root.go
@@ -243,7 +243,7 @@ func entrypoint(cmd *cobra.Command, args []string) (err error) {
// run commands that don't require any argument
// handle list flag
if cmdLineParams.List {
err := listInterfaces(queryCtx, dbPathCfg)
err := listInterfaces(queryCtx, dbPathCfg, viper.GetString(conf.QueryLog))
if err != nil {
return fmt.Errorf("failed to retrieve list of available databases: %w", err)
}
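
Note on the list.go and root.go changes above: the query log path is now passed into listInterfaces explicitly instead of being read via viper.GetString(conf.QueryLog) inside the function, leaving the viper lookup to the cobra entrypoints only. A minimal sketch of what that enables follows; the test name, the use of t.TempDir() as the database path, and the assumption that the package's cmdLineParams global is initialized as during normal startup are illustrative, not part of this commit (imports assumed: context, testing).

// Hypothetical in-package test sketch: the query log destination is now an
// argument, so a test can disable query logging by passing "" without touching
// global viper configuration.
func TestListInterfacesNoQueryLog(t *testing.T) {
	// t.TempDir() stands in for a real goDB path. An empty (or missing)
	// database directory may legitimately make listInterfaces return an error,
	// so the sketch only logs the result instead of asserting on it.
	if err := listInterfaces(context.Background(), t.TempDir(), ""); err != nil {
		t.Logf("listInterfaces returned: %v", err)
	}
}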
13 changes: 9 additions & 4 deletions pkg/goDB/storage/gpfile/gpdir.go
@@ -235,6 +235,11 @@ func (d *GPDir) TimeRange() (first int64, last int64) {
d.BlockMetadata[0].Blocks()[d.BlockMetadata[0].NBlocks()-1].Timestamp
}

const (
minMetadataFileSize = 73
minMetadataFileSizePos = minMetadataFileSize - 1
)

// Unmarshal reads and unmarshals a serialized metadata set into the GPDir instance
func (d *GPDir) Unmarshal(r concurrency.ReadWriteSeekCloser) error {

@@ -245,10 +250,10 @@ func (d *GPDir) Unmarshal(r concurrency.ReadWriteSeekCloser) error {
}

data := memFile.Data()
if len(data) < 73 {
if len(data) < minMetadataFileSize {
return fmt.Errorf("%w (len: %d)", ErrInputSizeTooSmall, len(data))
}
_ = data[72] // Compiler hint
_ = data[minMetadataFileSizePos] // Compiler hint

d.Metadata = newMetadata()

@@ -261,7 +266,7 @@ func (d *GPDir) Unmarshal(r concurrency.ReadWriteSeekCloser) error {
d.Metadata.Counts.BytesSent = binary.BigEndian.Uint64(data[48:56]) // Get global Counters (BytesSent)
d.Metadata.Counts.PacketsRcvd = binary.BigEndian.Uint64(data[56:64]) // Get global Counters (PacketsRcvd)
d.Metadata.Counts.PacketsSent = binary.BigEndian.Uint64(data[64:72]) // Get global Counters (PacketsSent)
pos := 72
pos := minMetadataFileSizePos

// Get block information
for i := 0; i < int(types.ColIdxCount); i++ {
@@ -337,7 +342,7 @@ func (d *GPDir) Marshal(w concurrency.ReadWriteSeekCloser) error {
binary.BigEndian.PutUint64(data[48:56], d.Metadata.Counts.BytesSent) // Store global Counters (BytesSent)
binary.BigEndian.PutUint64(data[56:64], d.Metadata.Counts.PacketsRcvd) // Store global Counters (PacketsRcvd)
binary.BigEndian.PutUint64(data[64:72], d.Metadata.Counts.PacketsSent) // Store global Counters (PacketsSent)
pos := 72
pos := minMetadataFileSizePos

if nBlocks > 0 {

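
Note on the gpdir.go change above: the line "_ = data[minMetadataFileSizePos]" is the common Go bounds-check-elimination hint. After the explicit length check, touching the highest fixed index once lets the compiler drop the bounds checks on the subsequent fixed-offset reads. A standalone sketch of the pattern; the 16-byte two-field header, its field names, and the constants are illustrative, not the goDB metadata format.

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Illustrative constants mirroring the minMetadataFileSize / minMetadataFileSizePos
// pair in gpdir.go, but for a made-up 16-byte header.
const (
	minHeaderSize    = 16
	minHeaderSizePos = minHeaderSize - 1
)

// decodeHeader sketches the pattern used in GPDir.Unmarshal: validate the
// minimum input size once, touch the highest fixed index once (the "compiler
// hint"), then read fixed-offset fields without further bounds checks.
func decodeHeader(data []byte) (version, count uint64, err error) {
	if len(data) < minHeaderSize {
		return 0, 0, errors.New("input size too small")
	}
	_ = data[minHeaderSizePos] // compiler hint: proves the reads below are in range

	version = binary.BigEndian.Uint64(data[0:8])  // bytes 0-7
	count = binary.BigEndian.Uint64(data[8:16])   // bytes 8-15
	return version, count, nil
}

func main() {
	buf := make([]byte, minHeaderSize)
	binary.BigEndian.PutUint64(buf[0:8], 1)
	binary.BigEndian.PutUint64(buf[8:16], 42)
	fmt.Println(decodeHeader(buf))
}

Naming the threshold and deriving the hint index from it keeps the length check and the hint from drifting apart, which the bare literals 73 and 72 previously risked.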
51 changes: 26 additions & 25 deletions pkg/goDB/storage/gpfile/gpfile_test.go
@@ -24,6 +24,7 @@ const (
)

var (
testDirPath = filepath.Join(testBasePath, "test_db")
testFilePath = filepath.Join(testBasePath, "test.gpf")

testEncoders = []encoders.Type{
@@ -59,7 +60,7 @@ func TestDirPermissions(t *testing.T) {
0644,
} {
t.Run(perm.String(), func(t *testing.T) {
require.Nil(t, os.RemoveAll("/tmp/test_db"))
require.Nil(t, os.RemoveAll(testDirPath))

// default (no option provided) should amount to default permissions
opts := []Option{WithPermissions(perm)}
@@ -68,15 +69,15 @@ }
perm = defaultPermissions
}

testDir := NewDirWriter("/tmp/test_db", 1000, opts...)
testDir := NewDirWriter(testDirPath, 1000, opts...)
require.Nil(t, testDir.Open(), "error opening test dir for writing")
require.Nil(t, testDir.Close(), "error closing test dir")

stat, err := os.Stat("/tmp/test_db")
stat, err := os.Stat(testDirPath)
require.Nil(t, err, "failed to call Stat() on new GPDir")
require.Equal(t, stat.Mode().Perm(), calculateDirPerm(perm), stat.Mode().String())

stat, err = os.Stat(filepath.Join("/tmp/test_db/1970/01/0_0-0-0-0-0-0-0", metadataFileName))
stat, err = os.Stat(filepath.Join(testDirPath, "1970/01/0_0-0-0-0-0-0-0", metadataFileName))
require.Nil(t, err, "failed to call Stat() on block metadata file")
require.Equal(t, stat.Mode().Perm(), perm, stat.Mode().String())
})
@@ -220,23 +221,23 @@ func testRoundtrip(t *testing.T, encType encoders.Type) {

func TestInvalidMetadata(t *testing.T) {

require.Nil(t, os.RemoveAll("/tmp/test_db"))
require.Nil(t, os.MkdirAll("/tmp/test_db/1970/01/0", 0750), "error creating test dir for reading")
require.Nil(t, os.WriteFile("/tmp/test_db/1970/01/0/.blockmeta", []byte{0x1}, 0600), "error creating test metdadata for reading")
require.Nil(t, os.RemoveAll(testDirPath))
require.Nil(t, os.MkdirAll(filepath.Join(testDirPath, "1970/01/0"), 0750), "error creating test dir for reading")
require.Nil(t, os.WriteFile(filepath.Join(testDirPath, "1970/01/0/.blockmeta"), []byte{0x1}, 0600), "error creating test metdadata for reading")

testDir := NewDirReader("/tmp/test_db", 1000, "")
testDir := NewDirReader(testDirPath, 1000, "")
require.ErrorIs(t, testDir.Open(), ErrInputSizeTooSmall)
}

func TestEmptyMetadata(t *testing.T) {

require.Nil(t, os.RemoveAll("/tmp/test_db"))
require.Nil(t, os.RemoveAll(testDirPath))

testDir := NewDirWriter("/tmp/test_db", 1000)
testDir := NewDirWriter(testDirPath, 1000)
require.Nil(t, testDir.Open(), "error opening test dir for writing")
require.Nil(t, testDir.Close(), "error writing test dir")

testDir = NewDirReader("/tmp/test_db", 1000, "0-0-0-0-0-0-0")
testDir = NewDirReader(testDirPath, 1000, "0-0-0-0-0-0-0")
require.Nil(t, testDir.Open(), "error opening test dir for reading")

for i := 0; i < int(types.ColIdxCount); i++ {
@@ -248,9 +249,9 @@ func TestEmptyMetadata(t *testing.T) {

func TestMetadataRoundTrip(t *testing.T) {

require.Nil(t, os.RemoveAll("/tmp/test_db"))
require.Nil(t, os.RemoveAll(testDirPath))

testDir := NewDirWriter("/tmp/test_db", 1000)
testDir := NewDirWriter(testDirPath, 1000)
require.Nil(t, testDir.Open(), "error opening test dir for writing")

for i := 0; i < int(types.ColIdxCount); i++ {
@@ -299,11 +300,11 @@ func TestMetadataRoundTrip(t *testing.T) {
require.Nil(t, jsoniter.NewDecoder(buf).Decode(&refMetadata), "error decoding reference data for later comparison")
require.Nil(t, testDir.Close(), "error writing test dir")

_, fullPath := genWritePathForTimestamp("/tmp/test_db", 1000)
_, fullPath := genWritePathForTimestamp(testDirPath, 1000)
ts, suffix, err := ExtractTimestampMetadataSuffix(filepath.Base(fullPath))
require.Nil(t, err)

testDir = NewDirReader("/tmp/test_db", ts, suffix)
testDir = NewDirReader(testDirPath, ts, suffix)
require.Nil(t, testDir.Open(), "error opening test dir for reading")

require.Equal(t, testDir.Metadata.BlockTraffic, refMetadata.BlockTraffic, "mismatched global block metadata")
@@ -326,18 +327,18 @@

func TestBrokenAccess(t *testing.T) {

require.Nil(t, os.RemoveAll("/tmp/test_db"))
require.Nil(t, os.RemoveAll(testDirPath))

// Write some blocks and flush the data to disk
testDir := NewDirWriter("/tmp/test_db", 1000)
testDir := NewDirWriter(testDirPath, 1000)
require.Nil(t, testDir.Open(), "error opening test dir for writing")
require.Nil(t, writeDummyBlock(1, testDir, 1), "failed to write blocks")
require.Nil(t, writeDummyBlock(2, testDir, 2), "failed to write blocks")
expectedOffset := testDir.BlockMetadata[0].CurrentOffset
require.Nil(t, testDir.Close(), "error writing test dir")

// Append another block, flush the GPFiles but "fail" to write the metadata
testDir = NewDirWriter("/tmp/test_db", 1000)
testDir = NewDirWriter(testDirPath, 1000)
require.Nil(t, testDir.Open(), "error opening test dir for writing")
require.Nil(t, writeDummyBlock(3, testDir, 3), "failed to write blocks")
require.NotEqual(t, expectedOffset, testDir.BlockMetadata[0].CurrentOffset)
@@ -347,12 +348,12 @@ func TestBrokenAccess(t *testing.T) {
}
}

_, fullPath := genWritePathForTimestamp("/tmp/test_db", 1000)
_, fullPath := genWritePathForTimestamp(testDirPath, 1000)
ts, suffix, err := ExtractTimestampMetadataSuffix(filepath.Base(fullPath))
require.Nil(t, err)

// Read the directory and validate that we only "see" two blocks on metadata level
testDir = NewDirReader("/tmp/test_db", ts, suffix)
testDir = NewDirReader(testDirPath, ts, suffix)
require.Nil(t, testDir.Open(), "error opening test dir for reading")
require.Equal(t, expectedOffset, testDir.BlockMetadata[0].CurrentOffset)
require.Equal(t, testDir.BlockMetadata[0].NBlocks(), 2)
@@ -372,7 +373,7 @@ func TestBrokenAccess(t *testing.T) {
})

// Append another two blocks and write normally
testDir = NewDirWriter("/tmp/test_db", 1000)
testDir = NewDirWriter(testDirPath, 1000)
require.Nil(t, testDir.Open(), "error opening test dir for writing")
require.Equal(t, expectedOffset, testDir.BlockMetadata[0].CurrentOffset)
require.Nil(t, writeDummyBlock(4, testDir, 4), "failed to write blocks")
@@ -381,12 +382,12 @@ func TestBrokenAccess(t *testing.T) {
require.Equal(t, testDir.BlockMetadata[0].NBlocks(), 4)
require.Nil(t, testDir.Close(), "error writing test dir")

_, fullPath = genWritePathForTimestamp("/tmp/test_db", 1000)
_, fullPath = genWritePathForTimestamp(testDirPath, 1000)
ts, suffix, err = ExtractTimestampMetadataSuffix(filepath.Base(fullPath))
require.Nil(t, err)

// Read the directory and validate that we only "see" four blocks on metadata level
testDir = NewDirReader("/tmp/test_db", ts, suffix)
testDir = NewDirReader(testDirPath, ts, suffix)
require.Nil(t, testDir.Open(), "error opening test dir for reading")
require.Equal(t, expectedOffset, testDir.BlockMetadata[0].CurrentOffset)
require.Equal(t, testDir.BlockMetadata[0].NBlocks(), 4)
@@ -416,8 +417,8 @@ func TestDailyDirectoryPathLayers(t *testing.T) {
for year := 1970; year < 2200; year++ {
for month := time.January; month <= time.December; month++ {
for day := 1; day <= time.Date(year, month+1, 0, 0, 0, 0, 0, time.UTC).Day(); day++ {
gpDir := NewDirReader("/tmp/test_db", time.Date(year, month, day, 0, 0, 0, 0, time.UTC).Unix(), "")
require.Equal(t, gpDir.dirPath, filepath.Join("/tmp/test_db", fmt.Sprintf("%d", year), fmt.Sprintf("%02d", month), fmt.Sprintf("%d", DirTimestamp(time.Date(year, month, day, 0, 0, 0, 0, time.UTC).Unix()))))
gpDir := NewDirReader(testDirPath, time.Date(year, month, day, 0, 0, 0, 0, time.UTC).Unix(), "")
require.Equal(t, gpDir.dirPath, filepath.Join(testDirPath, fmt.Sprintf("%d", year), fmt.Sprintf("%02d", month), fmt.Sprintf("%d", DirTimestamp(time.Date(year, month, day, 0, 0, 0, 0, time.UTC).Unix()))))
}
}
}
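
Note on the test changes above: the hard-coded "/tmp/test_db" literals are consolidated into a single testDirPath built from testBasePath with filepath.Join. A related pattern, shown here only as a sketch and not something this commit does, is to let each test own its directory via t.TempDir(), which the test framework cleans up automatically; the test name and the reuse of the empty-metadata suffix "0-0-0-0-0-0-0" from TestEmptyMetadata are illustrative assumptions.

// Sketch only: an in-package variant of TestEmptyMetadata that uses t.TempDir()
// instead of the shared testDirPath, so no manual os.RemoveAll is needed.
// NewDirWriter / NewDirReader are used exactly as in the diff above.
func TestEmptyMetadataTempDir(t *testing.T) {
	dbPath := filepath.Join(t.TempDir(), "test_db")

	testDir := NewDirWriter(dbPath, 1000)
	require.Nil(t, testDir.Open(), "error opening test dir for writing")
	require.Nil(t, testDir.Close(), "error writing test dir")

	testDir = NewDirReader(dbPath, 1000, "0-0-0-0-0-0-0")
	require.Nil(t, testDir.Open(), "error opening test dir for reading")
}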
