diff --git a/services/datamanager/builtin/builtin_capture_test.go b/services/datamanager/builtin/builtin_capture_test.go index e310e0de3d0..6e15dc95c6d 100644 --- a/services/datamanager/builtin/builtin_capture_test.go +++ b/services/datamanager/builtin/builtin_capture_test.go @@ -2,15 +2,12 @@ package builtin import ( "context" - "os" "path/filepath" - "strings" "sync" "testing" "time" "github.com/golang/geo/r3" - "github.com/pkg/errors" v1 "go.viam.com/api/app/datasync/v1" "go.viam.com/test" @@ -148,6 +145,9 @@ func TestDataCaptureEnabled(t *testing.T) { c.CaptureDir = initCaptureDir c.CaptureDisabled = tc.initialServiceDisableStatus c.ScheduledSyncDisabled = true + // MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file + // and we can confidently read the capture file without it's contents being modified by the collector + c.MaximumCaptureFileSizeBytes = 1 // Build and start data manager. b, err := New(context.Background(), deps, config, datasync.NoOpCloudClientConstructor, connToConnectivityStateError, logger) @@ -176,6 +176,9 @@ func TestDataCaptureEnabled(t *testing.T) { c2.CaptureDisabled = tc.newServiceDisableStatus c2.ScheduledSyncDisabled = true c2.CaptureDir = updatedCaptureDir + // MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file + // and we can confidently read the capture file without it's contents being modified by the collector + c2.MaximumCaptureFileSizeBytes = 1 // Update to new config and let it run for a bit. err = b.Reconfigure(context.Background(), deps, updatedConfig) @@ -198,7 +201,7 @@ func TestDataCaptureEnabled(t *testing.T) { } } -func TestSwitchResource(t *testing.T) { +func TestReconfigureResource(t *testing.T) { logger := logging.NewTestLogger(t) captureDir := t.TempDir() @@ -218,6 +221,9 @@ func TestSwitchResource(t *testing.T) { c.CaptureDisabled = false c.ScheduledSyncDisabled = true c.CaptureDir = captureDir + // MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file + // and we can confidently read the capture file without it's contents being modified by the collector + c.MaximumCaptureFileSizeBytes = 1 // Build and start data manager. b, err := New(context.Background(), deps, config, datasync.NoOpCloudClientConstructor, connToConnectivityStateError, logger) @@ -245,63 +251,53 @@ func TestSwitchResource(t *testing.T) { err = b.Reconfigure(context.Background(), deps2, config) test.That(t, err, test.ShouldBeNil) - dataBeforeSwitch, err := getSensorData(captureDir) - test.That(t, err, test.ShouldBeNil) - - // Test that sensor data is captured from the new collector. + // wait for all the files on disk to waitForCaptureFilesToExceedNFiles(captureDir, len(getAllFileInfos(captureDir)), logger) testFilesContainSensorData(t, captureDir) - filePaths := getAllFilePaths(captureDir) - test.That(t, len(filePaths), test.ShouldEqual, 2) + // Test that sensor data is captured from the new collector. + var ( + captureDataHasZeroReadings bool + captureDataHasNonZeroReadings bool + ) + + for _, fp := range getAllFilePaths(captureDir) { + // ignore in progress files + if filepath.Ext(fp) == data.InProgressCaptureFileExt { + continue + } + initialData, err := data.SensorDataFromCaptureFilePath(fp) + test.That(t, err, test.ShouldBeNil) + for _, d := range initialData { + // Each resource's mocked capture method outputs a different value. + // Assert that we see the expected data captured by the initial arm1 resource. + pose := d.GetStruct().GetFields()["pose"].GetStructValue().GetFields() + if pose["x"].GetNumberValue() == 0 && pose["y"].GetNumberValue() == 0 && pose["z"].GetNumberValue() == 0 { + captureDataHasZeroReadings = true + } - initialData, err := data.SensorDataFromCaptureFilePath(filePaths[0]) - test.That(t, err, test.ShouldBeNil) - for _, d := range initialData { - // Each resource's mocked capture method outputs a different value. - // Assert that we see the expected data captured by the initial arm1 resource. - test.That( - t, - d.GetStruct().GetFields()["pose"].GetStructValue().GetFields()["x"].GetNumberValue(), - test.ShouldEqual, - float64(0), - ) + if pose["x"].GetNumberValue() == 888 && pose["y"].GetNumberValue() == 888 && pose["z"].GetNumberValue() == 888 { + captureDataHasNonZeroReadings = true + } + } } - // Assert that the initial arm1 resource isn't capturing any more data. - test.That(t, len(initialData), test.ShouldEqual, len(dataBeforeSwitch)) - newData, err := data.SensorDataFromCaptureFilePath(filePaths[1]) - test.That(t, err, test.ShouldBeNil) - for _, d := range newData { - // Assert that we see the expected data captured by the updated arm1 resource. - test.That( - t, - d.GetStruct().GetFields()["pose"].GetStructValue().GetFields()["x"].GetNumberValue(), - test.ShouldEqual, - float64(888), - ) - } - // Assert that the updated arm1 resource is capturing data. - test.That(t, len(newData), test.ShouldBeGreaterThan, 0) + // Assert that both the sensor data from the first instance of `arm1` was captured as well as data from the second instance + test.That(t, captureDataHasZeroReadings, test.ShouldBeTrue) + test.That(t, captureDataHasNonZeroReadings, test.ShouldBeTrue) } func getSensorData(dir string) ([]*v1.SensorData, error) { var sd []*v1.SensorData filePaths := getAllFilePaths(dir) for _, path := range filePaths { + if filepath.Ext(path) == data.InProgressCaptureFileExt { + continue + } d, err := data.SensorDataFromCaptureFilePath(path) - // It's possible a file was closed (and so its extension changed) in between the points where we gathered - // file names and here. So if the file does not exist, check if the extension has just been changed. - if errors.Is(err, os.ErrNotExist) { - path = strings.TrimSuffix(path, filepath.Ext(path)) + data.CompletedCaptureFileExt - d, err = data.SensorDataFromCaptureFilePath(path) - if err != nil { - return nil, err - } - } else if err != nil { + if err != nil { return nil, err } - sd = append(sd, d...) } return sd, nil @@ -327,16 +323,16 @@ func waitForCaptureFilesToExceedNFiles(captureDir string, n int, logger logging. start := time.Now() for { files := getAllFileInfos(captureDir) - nonEmptyFiles := 0 + captureFiles := 0 for idx := range files { - if files[idx].Size() > int64(emptyFileBytesSize) { + if files[idx].Size() > int64(emptyFileBytesSize) && filepath.Ext(files[idx].Name()) == data.CompletedCaptureFileExt { // Every datamanager file has at least 90 bytes of metadata. Wait for that to be // observed before considering the file as "existing". - nonEmptyFiles++ + captureFiles++ } // We have N+1 files. No need to count any more. - if nonEmptyFiles > n { + if captureFiles > n { return } } diff --git a/services/datamanager/builtin/builtin_sync_test.go b/services/datamanager/builtin/builtin_sync_test.go index 78950c94fb6..844b8772095 100644 --- a/services/datamanager/builtin/builtin_sync_test.go +++ b/services/datamanager/builtin/builtin_sync_test.go @@ -168,6 +168,9 @@ func TestSyncEnabled(t *testing.T) { c.ScheduledSyncDisabled = tc.syncStartDisabled c.CaptureDir = tmpDir c.SyncIntervalMins = syncIntervalMins + // MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file + // and we can confidently read the capture file without it's contents being modified by the collector + c.MaximumCaptureFileSizeBytes = 1 b, err := New(context.Background(), deps, config, dataSyncServiceClientConstructor, tc.connStateConstructor, logger) test.That(t, err, test.ShouldBeNil) @@ -771,6 +774,9 @@ func TestStreamingDCUpload(t *testing.T) { c.ScheduledSyncDisabled = true c.SyncIntervalMins = syncIntervalMins c.CaptureDir = tmpDir + // MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file + // and we can confidently read the capture file without it's contents being modified by the collector + c.MaximumCaptureFileSizeBytes = 1 b, err := New(context.Background(), deps, config, datasync.NoOpCloudClientConstructor, ConnToConnectivityStateReady, logger) test.That(t, err, test.ShouldBeNil) diff --git a/services/datamanager/builtin/builtin_test.go b/services/datamanager/builtin/builtin_test.go index 14bc6210043..8540e76f7eb 100644 --- a/services/datamanager/builtin/builtin_test.go +++ b/services/datamanager/builtin/builtin_test.go @@ -266,6 +266,9 @@ func TestFileDeletion(t *testing.T) { // create sync clock so we can control when a single iteration of file deltion happens c := config.ConvertedAttributes.(*Config) c.CaptureDir = tempDir + // MaximumCaptureFileSizeBytes is set to 1 so that each reading becomes its own capture file + // and we can confidently read the capture file without it's contents being modified by the collector + c.MaximumCaptureFileSizeBytes = 1 bSvc, err := New(ctx, deps, config, datasync.NoOpCloudClientConstructor, connToConnectivityStateError, logger) test.That(t, err, test.ShouldBeNil) b := bSvc.(*builtIn) @@ -839,7 +842,7 @@ func getAllFiles(dir string) ([]os.FileInfo, []string) { func waitForCaptureFilesToEqualNFiles(ctx context.Context, captureDir string, n int, logger logging.Logger) error { var diagnostics sync.Once start := time.Now() - nonEmptyFiles := 0 + captureFiles := 0 files := []fs.FileInfo{} i := 0 for { @@ -848,20 +851,20 @@ func waitForCaptureFilesToEqualNFiles(ctx context.Context, captureDir string, n for _, f := range files { fNames = append(fNames, f.Name()) } - logger.Errorf("target: %d, iterations: %d, nonEmptyFiles: %d, files: %v", n, i, nonEmptyFiles, fNames) + logger.Errorf("target: %d, iterations: %d, captureFiles: %d, files: %v", n, i, captureFiles, fNames) return err } files = getAllFileInfos(captureDir) - nonEmptyFiles = 0 + captureFiles = 0 for idx := range files { - if files[idx].Size() > int64(emptyFileBytesSize) { + if files[idx].Size() > int64(emptyFileBytesSize) && filepath.Ext(files[idx].Name()) == data.CompletedCaptureFileExt { // Every datamanager file has at least 90 bytes of metadata. Wait for that to be // observed before considering the file as "existing". - nonEmptyFiles++ + captureFiles++ } } - if nonEmptyFiles == n { + if captureFiles == n { return nil }