RSDK-8872 - flakey test fix (#4407)
nicksanford authored Oct 1, 2024
1 parent 59e3c5c commit d8a1647
Showing 3 changed files with 61 additions and 56 deletions.
96 changes: 46 additions & 50 deletions services/datamanager/builtin/builtin_capture_test.go
@@ -2,15 +2,12 @@ package builtin

import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/golang/geo/r3"
"github.com/pkg/errors"
v1 "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"

@@ -148,6 +145,9 @@ func TestDataCaptureEnabled(t *testing.T) {
c.CaptureDir = initCaptureDir
c.CaptureDisabled = tc.initialServiceDisableStatus
c.ScheduledSyncDisabled = true
// MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file
// and we can confidently read the capture file without its contents being modified by the collector
c.MaximumCaptureFileSizeBytes = 1

// Build and start data manager.
b, err := New(context.Background(), deps, config, datasync.NoOpCloudClientConstructor, connToConnectivityStateError, logger)
@@ -176,6 +176,9 @@ func TestDataCaptureEnabled(t *testing.T) {
c2.CaptureDisabled = tc.newServiceDisableStatus
c2.ScheduledSyncDisabled = true
c2.CaptureDir = updatedCaptureDir
// MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file
// and we can confidently read the capture file without its contents being modified by the collector
c2.MaximumCaptureFileSizeBytes = 1

// Update to new config and let it run for a bit.
err = b.Reconfigure(context.Background(), deps, updatedConfig)
@@ -198,7 +201,7 @@ func TestDataCaptureEnabled(t *testing.T) {
}
}

func TestSwitchResource(t *testing.T) {
func TestReconfigureResource(t *testing.T) {
logger := logging.NewTestLogger(t)
captureDir := t.TempDir()

@@ -218,6 +221,9 @@ func TestSwitchResource(t *testing.T) {
c.CaptureDisabled = false
c.ScheduledSyncDisabled = true
c.CaptureDir = captureDir
// MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file
// and we can confidently read the capture file without its contents being modified by the collector
c.MaximumCaptureFileSizeBytes = 1

// Build and start data manager.
b, err := New(context.Background(), deps, config, datasync.NoOpCloudClientConstructor, connToConnectivityStateError, logger)
@@ -245,63 +251,53 @@ func TestSwitchResource(t *testing.T) {
err = b.Reconfigure(context.Background(), deps2, config)
test.That(t, err, test.ShouldBeNil)

dataBeforeSwitch, err := getSensorData(captureDir)
test.That(t, err, test.ShouldBeNil)

// Test that sensor data is captured from the new collector.
// Wait for new capture files to be written to disk.
waitForCaptureFilesToExceedNFiles(captureDir, len(getAllFileInfos(captureDir)), logger)
testFilesContainSensorData(t, captureDir)

filePaths := getAllFilePaths(captureDir)
test.That(t, len(filePaths), test.ShouldEqual, 2)
// Test that sensor data is captured from the new collector.
var (
captureDataHasZeroReadings bool
captureDataHasNonZeroReadings bool
)

for _, fp := range getAllFilePaths(captureDir) {
// ignore in-progress files
if filepath.Ext(fp) == data.InProgressCaptureFileExt {
continue
}
initialData, err := data.SensorDataFromCaptureFilePath(fp)
test.That(t, err, test.ShouldBeNil)
for _, d := range initialData {
// Each resource's mocked capture method outputs a different value.
// Assert that we see the expected data captured by the initial arm1 resource.
pose := d.GetStruct().GetFields()["pose"].GetStructValue().GetFields()
if pose["x"].GetNumberValue() == 0 && pose["y"].GetNumberValue() == 0 && pose["z"].GetNumberValue() == 0 {
captureDataHasZeroReadings = true
}

initialData, err := data.SensorDataFromCaptureFilePath(filePaths[0])
test.That(t, err, test.ShouldBeNil)
for _, d := range initialData {
// Each resource's mocked capture method outputs a different value.
// Assert that we see the expected data captured by the initial arm1 resource.
test.That(
t,
d.GetStruct().GetFields()["pose"].GetStructValue().GetFields()["x"].GetNumberValue(),
test.ShouldEqual,
float64(0),
)
if pose["x"].GetNumberValue() == 888 && pose["y"].GetNumberValue() == 888 && pose["z"].GetNumberValue() == 888 {
captureDataHasNonZeroReadings = true
}
}
}
// Assert that the initial arm1 resource isn't capturing any more data.
test.That(t, len(initialData), test.ShouldEqual, len(dataBeforeSwitch))

newData, err := data.SensorDataFromCaptureFilePath(filePaths[1])
test.That(t, err, test.ShouldBeNil)
for _, d := range newData {
// Assert that we see the expected data captured by the updated arm1 resource.
test.That(
t,
d.GetStruct().GetFields()["pose"].GetStructValue().GetFields()["x"].GetNumberValue(),
test.ShouldEqual,
float64(888),
)
}
// Assert that the updated arm1 resource is capturing data.
test.That(t, len(newData), test.ShouldBeGreaterThan, 0)
// Assert that sensor data from both the first and the second instance of `arm1` was captured
test.That(t, captureDataHasZeroReadings, test.ShouldBeTrue)
test.That(t, captureDataHasNonZeroReadings, test.ShouldBeTrue)
}

func getSensorData(dir string) ([]*v1.SensorData, error) {
var sd []*v1.SensorData
filePaths := getAllFilePaths(dir)
for _, path := range filePaths {
if filepath.Ext(path) == data.InProgressCaptureFileExt {
continue
}
d, err := data.SensorDataFromCaptureFilePath(path)
// It's possible a file was closed (and so its extension changed) in between the points where we gathered
// file names and here. So if the file does not exist, check if the extension has just been changed.
if errors.Is(err, os.ErrNotExist) {
path = strings.TrimSuffix(path, filepath.Ext(path)) + data.CompletedCaptureFileExt
d, err = data.SensorDataFromCaptureFilePath(path)
if err != nil {
return nil, err
}
} else if err != nil {
if err != nil {
return nil, err
}

sd = append(sd, d...)
}
return sd, nil
@@ -327,16 +323,16 @@ func waitForCaptureFilesToExceedNFiles(captureDir string, n int, logger logging.Logger) {
start := time.Now()
for {
files := getAllFileInfos(captureDir)
nonEmptyFiles := 0
captureFiles := 0
for idx := range files {
if files[idx].Size() > int64(emptyFileBytesSize) {
if files[idx].Size() > int64(emptyFileBytesSize) && filepath.Ext(files[idx].Name()) == data.CompletedCaptureFileExt {
// Every datamanager file has at least 90 bytes of metadata. Wait for that to be
// observed before considering the file as "existing".
nonEmptyFiles++
captureFiles++
}

// We have N+1 files. No need to count any more.
if nonEmptyFiles > n {
if captureFiles > n {
return
}
}
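Every test in this commit pins MaximumCaptureFileSizeBytes to 1. A size-capped capture writer rotates to a fresh file once the current file's size exceeds the cap, so with a 1-byte cap every reading lands in its own file and a completed file is never appended to again. The sketch below illustrates that rotation idea only — it is not the datamanager's actual writer; rotatingWriter, writeReading, and the file naming are invented for illustration.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// rotatingWriter is a hypothetical stand-in for a size-capped capture writer.
type rotatingWriter struct {
	dir      string
	maxBytes int64 // plays the role of MaximumCaptureFileSizeBytes
	cur      *os.File
	curSize  int64
	seq      int
}

func (w *rotatingWriter) writeReading(b []byte) error {
	// Rotate when no file is open yet or the current file already exceeds
	// the cap; with maxBytes == 1 that is true after every reading.
	if w.cur == nil || w.curSize > w.maxBytes {
		if w.cur != nil {
			// The closed file is now safe to read: nothing appends to it again.
			w.cur.Close()
		}
		f, err := os.Create(filepath.Join(w.dir, fmt.Sprintf("capture-%d.prog", w.seq)))
		if err != nil {
			return err
		}
		w.seq++
		w.cur, w.curSize = f, 0
	}
	n, err := w.cur.Write(b)
	w.curSize += int64(n)
	return err
}

func main() {
	w := &rotatingWriter{dir: os.TempDir(), maxBytes: 1}
	for _, reading := range []string{"reading-1", "reading-2"} {
		if err := w.writeReading([]byte(reading)); err != nil {
			panic(err)
		}
	}
	w.cur.Close()
	// reading-1 and reading-2 each ended up in their own capture file.
}

Because the tests only read completed files, this removes the race between the collector appending to a file and the test reading it — the source of the flakiness being fixed.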
6 changes: 6 additions & 0 deletions services/datamanager/builtin/builtin_sync_test.go
@@ -168,6 +168,9 @@ func TestSyncEnabled(t *testing.T) {
c.ScheduledSyncDisabled = tc.syncStartDisabled
c.CaptureDir = tmpDir
c.SyncIntervalMins = syncIntervalMins
// MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file
// and we can confidently read the capture file without its contents being modified by the collector
c.MaximumCaptureFileSizeBytes = 1

b, err := New(context.Background(), deps, config, dataSyncServiceClientConstructor, tc.connStateConstructor, logger)
test.That(t, err, test.ShouldBeNil)
@@ -771,6 +774,9 @@ func TestStreamingDCUpload(t *testing.T) {
c.ScheduledSyncDisabled = true
c.SyncIntervalMins = syncIntervalMins
c.CaptureDir = tmpDir
// MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file
// and we can confidently read the capture file without its contents being modified by the collector
c.MaximumCaptureFileSizeBytes = 1
b, err := New(context.Background(), deps, config, datasync.NoOpCloudClientConstructor, ConnToConnectivityStateReady, logger)
test.That(t, err, test.ShouldBeNil)

15 changes: 9 additions & 6 deletions services/datamanager/builtin/builtin_test.go
@@ -266,6 +266,9 @@ func TestFileDeletion(t *testing.T) {
// create a sync clock so we can control when a single iteration of file deletion happens
c := config.ConvertedAttributes.(*Config)
c.CaptureDir = tempDir
// MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file
// and we can confidently read the capture file without its contents being modified by the collector
c.MaximumCaptureFileSizeBytes = 1
bSvc, err := New(ctx, deps, config, datasync.NoOpCloudClientConstructor, connToConnectivityStateError, logger)
test.That(t, err, test.ShouldBeNil)
b := bSvc.(*builtIn)
@@ -839,7 +842,7 @@ func getAllFiles(dir string) ([]os.FileInfo, []string) {
func waitForCaptureFilesToEqualNFiles(ctx context.Context, captureDir string, n int, logger logging.Logger) error {
var diagnostics sync.Once
start := time.Now()
nonEmptyFiles := 0
captureFiles := 0
files := []fs.FileInfo{}
i := 0
for {
@@ -848,20 +851,20 @@ func waitForCaptureFilesToEqualNFiles(ctx context.Context, captureDir string, n int, logger logging.Logger) error {
for _, f := range files {
fNames = append(fNames, f.Name())
}
logger.Errorf("target: %d, iterations: %d, nonEmptyFiles: %d, files: %v", n, i, nonEmptyFiles, fNames)
logger.Errorf("target: %d, iterations: %d, captureFiles: %d, files: %v", n, i, captureFiles, fNames)
return err
}
files = getAllFileInfos(captureDir)
nonEmptyFiles = 0
captureFiles = 0
for idx := range files {
if files[idx].Size() > int64(emptyFileBytesSize) {
if files[idx].Size() > int64(emptyFileBytesSize) && filepath.Ext(files[idx].Name()) == data.CompletedCaptureFileExt {
// Every datamanager file has at least 90 bytes of metadata. Wait for that to be
// observed before considering the file as "existing".
nonEmptyFiles++
captureFiles++
}
}

if nonEmptyFiles == n {
if captureFiles == n {
return nil
}

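Both wait helpers now count a file only when it exceeds the ~90-byte metadata floor and carries the completed-capture extension, so a large in-progress file can no longer satisfy the wait prematurely. A minimal sketch of that predicate, assuming a ".capture" extension standing in for data.CompletedCaptureFileExt:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// emptyFileBytesSize mirrors the tests' metadata floor: every datamanager
// capture file carries at least ~90 bytes of metadata.
const emptyFileBytesSize = 90

// isCompletedCaptureFile reports whether a file should count as a finished
// capture file. ".capture" is an assumption for data.CompletedCaptureFileExt.
func isCompletedCaptureFile(fi os.FileInfo) bool {
	return fi.Size() > emptyFileBytesSize && filepath.Ext(fi.Name()) == ".capture"
}

func main() {
	entries, err := os.ReadDir(os.TempDir())
	if err != nil {
		panic(err)
	}
	completed := 0
	for _, e := range entries {
		fi, err := e.Info()
		if err != nil {
			continue // the file may have been renamed or removed since listing
		}
		if isCompletedCaptureFile(fi) {
			completed++
		}
	}
	fmt.Println("completed capture files:", completed)
}

Filtering by extension up front is also what lets getSensorData drop its old handling of the rename race, since in-progress files are skipped before they are ever opened.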
