Skip to content

Commit

Permalink
fix: close init after assigning storage settings (#2678)
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC authored Nov 9, 2022
1 parent 8ddfb8e commit 4986595
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 18 deletions.
38 changes: 31 additions & 7 deletions processor/stash/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,30 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
gzWriter := fileuploader.NewGzMultiFileWriter()
dumps := make(map[string]string)

errorJobs := make([]ErrorJob, 0)

for workspaceID, jobsForWorkspace := range jobsPerWorkspace {
preferences, err := st.fileuploader.GetStoragePreferences(workspaceID)
if err != nil {
panic(err)
st.logger.Errorf("Skipping Storing errors for workspace: %s since no storage preferences are found", workspaceID)
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsForWorkspace,
errorOutput: StoreErrorOutputT{
Location: "",
Error: err,
},
})
continue
}
if !preferences.ProcErrors {
st.logger.Infof("Skipping Storing errors for workspace: %s since ProcErrors is set to false", workspaceID)
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsForWorkspace,
errorOutput: StoreErrorOutputT{
Location: "",
Error: nil,
},
})
continue
}
path := fmt.Sprintf("%v%v.json.gz", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v.%v.%v", time.Now().Unix(), config.GetString("INSTANCE_ID", "1"), fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID), uuid, workspaceID))
Expand All @@ -188,23 +206,29 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
}
}()

errorJobs := make([]ErrorJob, 0)

g, _ := errgroup.WithContext(context.Background())
g.SetLimit(config.GetInt("Processor.errorBackupWorkers", 100))
for workspaceID, filePath := range dumps {
wrkId := workspaceID
path := filePath
errFileUploader, err := st.fileuploader.GetFileManager(wrkId)
if err != nil {
st.logger.Errorf("Skipping Storing errors for workspace: %s since no file manager is found", workspaceID)
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsPerWorkspace[workspaceID],
errorOutput: StoreErrorOutputT{
Location: "",
Error: err,
},
})
continue
}
g.Go(misc.WithBugsnag(func() error {
outputFile, err := os.Open(path)
if err != nil {
panic(err)
}
prefixes := []string{"rudder-proc-err-logs", time.Now().Format("01-02-2006")}
errFileUploader, err := st.fileuploader.GetFileManager(wrkId)
if err != nil {
panic(err)
}
uploadOutput, err := errFileUploader.Upload(context.TODO(), outputFile, prefixes...)
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsPerWorkspace[workspaceID],
Expand Down
31 changes: 21 additions & 10 deletions processor/stash/stash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func TestStoreErrorsToObjectStorage(t *testing.T) {
tmpDir := t.TempDir()
uniqueWorkspaces := 3
uniqueWorkspaces := 4

t.Setenv("RUDDER_TMPDIR", tmpDir)

Expand Down Expand Up @@ -90,11 +90,11 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
Bucket: backendconfig.StorageBucket{
Type: "MINIO",
Config: map[string]interface{}{
"bucketName": minioResource[2].BucketName,
"bucketName": minioResource[3].BucketName,
"prefix": prefix,
"endPoint": minioResource[2].Endpoint,
"accessKeyID": minioResource[2].AccessKey,
"secretAccessKey": minioResource[2].SecretKey,
"endPoint": minioResource[3].Endpoint,
"accessKeyID": minioResource[3].AccessKey,
"secretAccessKey": minioResource[3].SecretKey,
},
},
Preferences: backendconfig.StoragePreferences{
Expand Down Expand Up @@ -148,7 +148,9 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
var file []*filemanager.FileObject
require.Eventually(t, func() bool {
file, err = fm.ListFilesWithPrefix(context.Background(), "", "", 5)

if !storageSettings[workspace].Preferences.ProcErrors {
return true
}
if len(file) != 1 {
t.Log("file list: ", file, " err: ", err, "len: ", len(file))
fm, err = fileuploaderProvider.GetFileManager(workspace)
Expand All @@ -158,15 +160,24 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
return true
}, 20*time.Second, 1*time.Second, "no backup files found in backup store: ", err)

f := downloadFile(t, fm, file[0].Key, cleanup)
jobsFromFile, err := readGzipJobFile(f.Name())
if storageSettings[workspace].Preferences.ProcErrors {
f := downloadFile(t, fm, file[0].Key, cleanup)
jobsFromFile, err := readGzipJobFile(f.Name())
require.NoError(t, err)
require.NotZero(t, jobsCount[workspace], "jobsCount for workspace: ", workspace, " is zero")
require.Equal(t, jobsCount[workspace], len(jobsFromFile))
} else {
require.Zero(t, jobsCount[workspace], "jobsCount for workspace: ", workspace, " is not zero")
}
}

jobsToFail := []*jobsdb.JobT{
{
WorkspaceId: "defaultWorkspaceID-5",
},
}

errJobs = st.storeErrorsToObjectStorage(jobsToFail)
require.Equal(t, 1, len(errJobs))
require.Equal(t, errJobs[0].errorOutput.Error.Error(), "no storage settings found for workspace: defaultWorkspaceID-5")
}

func countJobsByWorkspace(jobs []*jobsdb.JobT) map[string]int {
Expand Down
2 changes: 1 addition & 1 deletion services/fileuploader/fileuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func (p *provider) updateLoop(ctx context.Context, backendConfig backendconfig.B
Preferences: preferences,
}
}
p.storageSettings = settings
p.onceInit.Do(func() {
close(p.init)
})
p.storageSettings = settings
}

p.onceInit.Do(func() {
Expand Down

0 comments on commit 4986595

Please sign in to comment.