Skip to content

Commit

Permalink
fixup! Update processor/stash/stash.go
Browse files: browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Aug 27, 2024
1 parent 44d2e76 commit 8f94a6f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 13 deletions.
34 changes: 23 additions & 11 deletions processor/stash/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/transientsource"
Expand Down Expand Up @@ -156,15 +157,15 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
jobsPerWorkspace := lo.GroupBy(jobs, func(job *jobsdb.JobT) string {
return job.WorkspaceId
})
writerMap := make(map[string]misc.GZipWriter)
writerMap := make(map[string]string)

errorJobs := make([]ErrorJob, 0)

ctx := context.Background()
for workspaceID, jobsForWorkspace := range jobsPerWorkspace {
preferences, err := st.fileuploader.GetStoragePreferences(ctx, workspaceID)
if err != nil {
st.logger.Errorn("Skipping Storing errors for workspace since no storage preferences are found", logger.NewStringField("workspaceID", workspaceID))
st.logger.Errorn("Skipping Storing errors for workspace since no storage preferences are found", obskit.WorkspaceID(workspaceID))
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsForWorkspace,
errorOutput: StoreErrorOutputT{
Expand All @@ -185,15 +186,26 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
})
continue
}
path := fmt.Sprintf("%v%v.json.gz", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v.%v.%v", time.Now().Unix(), config.GetString("INSTANCE_ID", "1"), fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID), uuid, workspaceID))
path := filepath.Join(
tmpDirPath,
localTmpDirName,
fmt.Sprintf(
"%v.%v.%v.%v.%v.json.gz",
time.Now().Unix(),
config.GetString("INSTANCE_ID", "1"),
fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID),
uuid,
workspaceID,
),
)
if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
panic(fmt.Errorf("creating gz file %q: mkdir error: %w", path, err))

[Codecov / codecov/patch annotation — view check run] Check warning on line 202 in processor/stash/stash.go (processor/stash/stash.go#L202): added line #L202 was not covered by tests.
}
writer, err := misc.CreateGZ(path)
if err != nil {
panic(err)

[Codecov / codecov/patch annotation — view check run] Check warning on line 206 in processor/stash/stash.go (processor/stash/stash.go#L206): added line #L206 was not covered by tests.
}
writerMap[workspaceID] = writer
writerMap[workspaceID] = path
newline := []byte("\n")
lo.ForEach(jobsForWorkspace, func(job *jobsdb.JobT, _ int) {
rawJob, err := json.Marshal(job)
Expand All @@ -211,14 +223,14 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
}

defer func() {
for _, writer := range writerMap {
_ = os.Remove(writer.File.Name())
for _, path := range writerMap {
_ = os.Remove(path)
}
}()

g, ctx := kitsync.NewEagerGroup(ctx, config.GetInt("Processor.errorBackupWorkers", 100))
var mu sync.Mutex
for workspaceID, writer := range writerMap {
for workspaceID, path := range writerMap {
errFileUploader, err := st.fileuploader.GetFileManager(ctx, workspaceID)
if err != nil {
st.logger.Errorn("Skipping Storing errors for workspace since no file manager is found", logger.NewStringField("workspaceID", workspaceID))
Expand All @@ -236,15 +248,15 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
continue
}
g.Go(misc.WithBugsnag(func() error {
outputFile, err := os.Open(writer.File.Name())
outputFile, err := os.Open(path)
if err != nil {
panic(err)
}
prefixes := []string{"rudder-proc-err-logs", time.Now().Format("01-02-2006")}
uploadOutput, err := errFileUploader.Upload(ctx, outputFile, prefixes...)
st.logger.Infon("Uploaded error logs for workspaceId",
logger.NewStringField("location", uploadOutput.Location),
logger.NewStringField("workspaceID", workspaceID)
st.logger.Infon("Uploaded error logs for workspaceId",
logger.NewStringField("location", uploadOutput.Location),
obskit.WorkspaceID(workspaceID),
)
mu.Lock()
errorJobs = append(errorJobs, ErrorJob{
Expand Down
12 changes: 10 additions & 2 deletions router/batchrouter/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,17 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b
if err != nil {
panic(err)
}
path := fmt.Sprintf("%v%v.json", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v", time.Now().Unix(), batchJobs.Connection.Source.ID, uuid))
gzipFilePath := filepath.Join(
tmpDirPath,
localTmpDirName,
fmt.Sprintf(
"%v.%v.%v.json.gz",
time.Now().Unix(),
batchJobs.Connection.Source.ID,
uuid,
),
)

gzipFilePath := fmt.Sprintf(`%v.gz`, path)
err = os.MkdirAll(filepath.Dir(gzipFilePath), os.ModePerm)
if err != nil {
panic(err)
Expand Down

0 comments on commit 8f94a6f

Please sign in to comment.