diff --git a/archiver/worker.go b/archiver/worker.go index a02abde756..4c4046a4a1 100644 --- a/archiver/worker.go +++ b/archiver/worker.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path" + "path/filepath" "time" "github.com/google/uuid" @@ -155,13 +156,20 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, e lastJobCreatedAt := jobs[len(jobs)-1].CreatedAt.UTC() workspaceID := jobs[0].WorkspaceId - gzWriter := fileuploader.NewGzMultiFileWriter() filePath := path.Join( lo.Must(misc.CreateTMPDIR()), "rudder-backups", w.sourceID, fmt.Sprintf("%d_%d_%s_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceID, uuid.NewString()), ) + if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { + return "", fmt.Errorf("creating gz file %q: mkdir error: %w", filePath, err) + } + gzWriter, err := misc.CreateGZ(filePath) + if err != nil { + return "", fmt.Errorf("create gz writer: %w", err) + } + defer func() { _ = os.Remove(filePath) }() for _, job := range jobs { j, err := marshalJob(job) @@ -169,7 +177,7 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, e _ = gzWriter.Close() return "", fmt.Errorf("marshal job: %w", err) } - if _, err := gzWriter.Write(filePath, append(j, '\n')); err != nil { + if _, err := gzWriter.Write(append(j, '\n')); err != nil { _ = gzWriter.Close() return "", fmt.Errorf("write to file: %w", err) } @@ -177,7 +185,6 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, e if err := gzWriter.Close(); err != nil { return "", fmt.Errorf("close writer: %w", err) } - defer func() { _ = os.Remove(filePath) }() fileUploader, err := w.storageProvider.GetFileManager(w.lifecycle.ctx, workspaceID) if err != nil { diff --git a/cmd/backupfilemigrator/file_migrator.go b/cmd/backupfilemigrator/file_migrator.go index 209bd63fcc..a2d808e96c 100644 --- a/cmd/backupfilemigrator/file_migrator.go +++ b/cmd/backupfilemigrator/file_migrator.go @@ -25,7 +25,6 @@ import ( "github.com/samber/lo" - "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/tidwall/gjson" @@ -129,29 +128,35 @@ func (m *fileMigrator) uploadFile(ctx context.Context, jobs []*newFileFormat, so firstJobCreatedAt := jobs[0].CreatedAt.UTC() lastJobCreatedAt := jobs[len(jobs)-1].CreatedAt.UTC() - gzWriter := fileuploader.NewGzMultiFileWriter() localFilePath := path.Join( lo.Must(misc.CreateTMPDIR()), "rudder-backups", sourceId, fmt.Sprintf("%d_%d_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceId), ) + if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil { + return fmt.Errorf("creating gz file %q: mkdir error: %w", localFilePath, err) + } + writer, err := misc.CreateGZ(localFilePath) + if err != nil { + return fmt.Errorf("failed to create gz writer: %w", err) + } + defer func() { _ = os.Remove(localFilePath) }() for _, job := range jobs { jobBytes, err := json.Marshal(job) if err != nil { - _ = gzWriter.Close() + _ = writer.Close() return fmt.Errorf("failed to marshal job: %w", err) } - if _, err := gzWriter.Write(localFilePath, append(jobBytes, '\n')); err != nil { - _ = gzWriter.Close() + if _, err := writer.Write(append(jobBytes, '\n')); err != nil { + _ = writer.Close() return fmt.Errorf("write to local file failed: %w", err) } } - if err := gzWriter.Close(); err != nil { + if err := writer.Close(); err != nil { return fmt.Errorf("failed to close writer: %w", err) } - defer func() { _ = os.Remove(localFilePath) }() localFile, err := os.Open(localFilePath) if err != nil { diff --git a/processor/stash/stash.go b/processor/stash/stash.go index 68d700d9f2..e0d533b46f 100644 --- a/processor/stash/stash.go +++ b/processor/stash/stash.go @@ -3,8 +3,10 @@ package stash import ( "context" "encoding/json" + "errors" "fmt" "os" + "path/filepath" "sync" "time" @@ -17,6 +19,8 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + kitsync "github.com/rudderlabs/rudder-go-kit/sync" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/services/fileuploader" "github.com/rudderlabs/rudder-server/services/transientsource" @@ -144,7 +148,7 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E localTmpDirName := "/rudder-processor-errors/" uuid := uuid.New().String() - st.logger.Debug("[Processor: storeErrorsToObjectStorage]: Starting logging to object storage") + st.logger.Debugn("[Processor: storeErrorsToObjectStorage]: Starting logging to object storage") tmpDirPath, err := misc.CreateTMPDIR() if err != nil { @@ -154,8 +158,7 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E jobsPerWorkspace := lo.GroupBy(jobs, func(job *jobsdb.JobT) string { return job.WorkspaceId }) - gzWriter := fileuploader.NewGzMultiFileWriter() - dumps := make(map[string]string) + writerMap := make(map[string]string) errorJobs := make([]ErrorJob, 0) @@ -163,7 +166,7 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E for workspaceID, jobsForWorkspace := range jobsPerWorkspace { preferences, err := st.fileuploader.GetStoragePreferences(ctx, workspaceID) if err != nil { - st.logger.Errorf("Skipping Storing errors for workspace: %s since no storage preferences are found", workspaceID) + st.logger.Errorn("Skipping Storing errors for workspace since no storage preferences are found", obskit.WorkspaceID(workspaceID), obskit.Error(err)) errorJobs = append(errorJobs, ErrorJob{ jobs: jobsForWorkspace, errorOutput: StoreErrorOutputT{ @@ -174,7 +177,7 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E continue } if !preferences.ProcErrors { - st.logger.Infof("Skipping Storing errors for workspace: %s since ProcErrors is set to false", workspaceID) + st.logger.Infon("Skipping Storing errors for workspace since ProcErrors is set to false", obskit.WorkspaceID(workspaceID)) errorJobs = append(errorJobs, ErrorJob{ jobs: jobsForWorkspace, errorOutput: StoreErrorOutputT{ @@ -184,48 +187,64 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E }) continue } - path := fmt.Sprintf("%v%v.json.gz", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v.%v.%v", time.Now().Unix(), config.GetString("INSTANCE_ID", "1"), fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID), uuid, workspaceID)) - dumps[workspaceID] = path + path := filepath.Join( + tmpDirPath, + localTmpDirName, + fmt.Sprintf( + "%v.%v.%v.%v.%v.json.gz", + time.Now().Unix(), + config.GetString("INSTANCE_ID", "1"), + fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID), + uuid, + workspaceID, + ), + ) + if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil { + panic(fmt.Errorf("creating gz file %q: mkdir error: %w", path, err)) + } + writer, err := misc.CreateGZ(path) + if err != nil { + panic(err) + } + writerMap[workspaceID] = path newline := []byte("\n") lo.ForEach(jobsForWorkspace, func(job *jobsdb.JobT, _ int) { rawJob, err := json.Marshal(job) if err != nil { panic(err) } - if _, err := gzWriter.Write(path, append(rawJob, newline...)); err != nil { + if _, err := writer.Write(append(rawJob, newline...)); err != nil { + _ = writer.Close() panic(err) } }) + if err := writer.Close(); err != nil { + panic(err) + } } - err = gzWriter.Close() - if err != nil { - panic(err) - } defer func() { - for _, path := range dumps { - os.Remove(path) + for _, path := range writerMap { + _ = os.Remove(path) } }() - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(config.GetInt("Processor.errorBackupWorkers", 100)) + g, ctx := kitsync.NewEagerGroup(ctx, config.GetInt("Processor.errorBackupWorkers", 100)) var mu sync.Mutex - for workspaceID, filePath := range dumps { - wrkId := workspaceID - path := filePath - errFileUploader, err := st.fileuploader.GetFileManager(ctx, wrkId) + for workspaceID, path := range writerMap { + errFileUploader, err := st.fileuploader.GetFileManager(ctx, workspaceID) if err != nil { - st.logger.Errorf("Skipping Storing errors for workspace: %s since no file manager is found", workspaceID) - mu.Lock() - errorJobs = append(errorJobs, ErrorJob{ - jobs: jobsPerWorkspace[workspaceID], - errorOutput: StoreErrorOutputT{ - Location: "", - Error: err, - }, - }) - mu.Unlock() + st.logger.Errorn("Skipping Storing errors for workspace since no file manager is found", obskit.WorkspaceID(workspaceID), obskit.Error(err)) + if !errors.Is(err, fileuploader.ErrNotSubscribed) { + mu.Lock() + errorJobs = append(errorJobs, ErrorJob{ + jobs: jobsPerWorkspace[workspaceID], + errorOutput: StoreErrorOutputT{ + Error: err, + }, + }) + mu.Unlock() + } continue } g.Go(crash.Wrapper(func() error { @@ -235,10 +254,13 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E } prefixes := []string{"rudder-proc-err-logs", time.Now().Format("01-02-2006")} uploadOutput, err := errFileUploader.Upload(ctx, outputFile, prefixes...) - st.logger.Infof("Uploaded error logs to %s for workspaceId %s", uploadOutput.Location, wrkId) + st.logger.Infon("Uploaded error logs for workspaceId", + logger.NewStringField("location", uploadOutput.Location), + obskit.WorkspaceID(workspaceID), + ) mu.Lock() errorJobs = append(errorJobs, ErrorJob{ - jobs: jobsPerWorkspace[wrkId], + jobs: jobsPerWorkspace[workspaceID], errorOutput: StoreErrorOutputT{ Location: uploadOutput.Location, Error: err, diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go index 5cc9996e22..3b55638667 100644 --- a/router/batchrouter/handle.go +++ b/router/batchrouter/handle.go @@ -274,9 +274,17 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b if err != nil { panic(err) } - path := fmt.Sprintf("%v%v.json", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v", time.Now().Unix(), batchJobs.Connection.Source.ID, uuid)) + gzipFilePath := filepath.Join( + tmpDirPath, + localTmpDirName, + fmt.Sprintf( + "%v.%v.%v.json.gz", + time.Now().Unix(), + batchJobs.Connection.Source.ID, + uuid, + ), + ) - gzipFilePath := fmt.Sprintf(`%v.gz`, path) err = os.MkdirAll(filepath.Dir(gzipFilePath), os.ModePerm) if err != nil { panic(err) diff --git a/services/fileuploader/gzwriter.go b/services/fileuploader/gzwriter.go deleted file mode 100644 index bf6e5d743b..0000000000 --- a/services/fileuploader/gzwriter.go +++ /dev/null @@ -1,62 +0,0 @@ -package fileuploader - -import ( - "fmt" - "os" - "path/filepath" - - "github.com/rudderlabs/rudder-server/utils/misc" -) - -// MultiFileWriter can write to multiple paths at the same time. -type MultiFileWriter interface { - // Write writes the given data to the file at the given path. - Write(path string, data []byte) (int, error) - // Close closes all open files. - Close() error - - // Count returns the number of open files. - Count() int -} - -type gzFileHandler struct { - // gzWriters is a map of path to GZipWriter. - gzWriters map[string]misc.GZipWriter -} - -// NewGzMultiFileWriter creates a new MultiFileWriter that writes to multiple gz-compressed files. -func NewGzMultiFileWriter() MultiFileWriter { - return &gzFileHandler{ - gzWriters: make(map[string]misc.GZipWriter), - } -} - -func (g *gzFileHandler) Write(path string, data []byte) (count int, err error) { - if _, ok := g.gzWriters[path]; !ok { - - err := os.MkdirAll(filepath.Dir(path), os.ModePerm) - if err != nil { - return 0, fmt.Errorf("creating gz file %q: mkdir error: %w", path, err) - } - - g.gzWriters[path], err = misc.CreateGZ(path) - if err != nil { - return 0, err - } - } - return g.gzWriters[path].Write(data) -} - -func (g *gzFileHandler) Close() error { - for path, writer := range g.gzWriters { - if err := writer.CloseGZ(); err != nil { - return fmt.Errorf("closing gz file %q: %w", path, err) - } - } - g.gzWriters = make(map[string]misc.GZipWriter) - return nil -} - -func (g *gzFileHandler) Count() int { - return len(g.gzWriters) -} diff --git a/services/fileuploader/gzwriter_test.go b/services/fileuploader/gzwriter_test.go deleted file mode 100644 index 4b8caf139a..0000000000 --- a/services/fileuploader/gzwriter_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package fileuploader - -import ( - "os" - "testing" - - . "github.com/onsi/gomega" -) - -func TestGzWriter(t *testing.T) { - RegisterTestingT(t) - gzWriter := NewGzMultiFileWriter() - err := os.MkdirAll("./temp", os.ModePerm) - Expect(err).To(BeNil()) - - _, err = gzWriter.Write("./temp/test", []byte("test1")) - Expect(err).To(BeNil()) - Expect(gzWriter.Count()).To(Equal(1)) - - _, err = gzWriter.Write("./temp/test", []byte("test2")) - Expect(err).To(BeNil()) - Expect(gzWriter.Count()).To(Equal(1)) - - _, err = gzWriter.Write("./temp/make", []byte("test3")) - Expect(err).To(BeNil()) - Expect(gzWriter.Count()).To(Equal(2)) - - err = gzWriter.Close() - Expect(err).To(BeNil()) - - Expect(gzWriter.Count()).To(Equal(0)) - os.RemoveAll("./temp/") -}