chore: remove multiGzipWriter (#5016)
Co-authored-by: Leonidas Vrachnis
Co-authored-by: Rohith BCS
Co-authored-by: Francesco Casula
Sidddddarth committed Sep 4, 2024
1 parent cf6cb7a commit bbbd07f
Showing 6 changed files with 86 additions and 139 deletions.
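The change drops the shared fileuploader.NewGzMultiFileWriter in favour of a plain per-file gzip writer obtained from misc.CreateGZ: each call site now builds its own target path, creates the parent directory, streams newline-delimited JSON through a single writer, and closes it before uploading. A minimal standalone sketch of that per-file pattern, using the standard library's compress/gzip in place of the repo's misc.CreateGZ helper (which is assumed to wrap an equivalent writer):

package main

import (
    "compress/gzip"
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
)

// writeGzippedJSONLines mirrors the pattern the commit converges on:
// one gzip file per batch, newline-delimited JSON, closed before upload.
func writeGzippedJSONLines(path string, records []any) error {
    if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
        return fmt.Errorf("creating gz file %q: mkdir error: %w", path, err)
    }
    f, err := os.Create(path)
    if err != nil {
        return fmt.Errorf("create file: %w", err)
    }
    gz := gzip.NewWriter(f)
    for _, r := range records {
        b, err := json.Marshal(r)
        if err != nil {
            _ = gz.Close()
            _ = f.Close()
            return fmt.Errorf("marshal record: %w", err)
        }
        if _, err := gz.Write(append(b, '\n')); err != nil {
            _ = gz.Close()
            _ = f.Close()
            return fmt.Errorf("write to file: %w", err)
        }
    }
    if err := gz.Close(); err != nil {
        _ = f.Close()
        return fmt.Errorf("close gzip writer: %w", err)
    }
    return f.Close()
}

func main() {
    path := filepath.Join(os.TempDir(), "rudder-backups", "example.json.gz")
    defer func() { _ = os.Remove(path) }()
    if err := writeGzippedJSONLines(path, []any{map[string]string{"hello": "world"}}); err != nil {
        panic(err)
    }
    fmt.Println("wrote", path)
}

Writing one gzip stream per destination keeps error handling local: a failed marshal or write only has to close the single writer it was using, which is exactly what the updated call sites below do.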
13 changes: 10 additions & 3 deletions archiver/worker.go
@@ -7,6 +7,7 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"time"

"github.com/google/uuid"
@@ -155,29 +156,35 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, e
lastJobCreatedAt := jobs[len(jobs)-1].CreatedAt.UTC()
workspaceID := jobs[0].WorkspaceId

gzWriter := fileuploader.NewGzMultiFileWriter()
filePath := path.Join(
lo.Must(misc.CreateTMPDIR()),
"rudder-backups",
w.sourceID,
fmt.Sprintf("%d_%d_%s_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceID, uuid.NewString()),
)
if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
return "", fmt.Errorf("creating gz file %q: mkdir error: %w", filePath, err)
}
gzWriter, err := misc.CreateGZ(filePath)
if err != nil {
return "", fmt.Errorf("create gz writer: %w", err)
}
defer func() { _ = os.Remove(filePath) }()

for _, job := range jobs {
j, err := marshalJob(job)
if err != nil {
_ = gzWriter.Close()
return "", fmt.Errorf("marshal job: %w", err)
}
if _, err := gzWriter.Write(filePath, append(j, '\n')); err != nil {
if _, err := gzWriter.Write(append(j, '\n')); err != nil {
_ = gzWriter.Close()
return "", fmt.Errorf("write to file: %w", err)
}
}
if err := gzWriter.Close(); err != nil {
return "", fmt.Errorf("close writer: %w", err)
}
defer func() { _ = os.Remove(filePath) }()

fileUploader, err := w.storageProvider.GetFileManager(w.lifecycle.ctx, workspaceID)
if err != nil {
19 changes: 12 additions & 7 deletions cmd/backupfilemigrator/file_migrator.go
@@ -25,7 +25,6 @@ import (

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/tidwall/gjson"
@@ -129,29 +128,35 @@ func (m *fileMigrator) uploadFile(ctx context.Context, jobs []*newFileFormat, so
firstJobCreatedAt := jobs[0].CreatedAt.UTC()
lastJobCreatedAt := jobs[len(jobs)-1].CreatedAt.UTC()

gzWriter := fileuploader.NewGzMultiFileWriter()
localFilePath := path.Join(
lo.Must(misc.CreateTMPDIR()),
"rudder-backups",
sourceId,
fmt.Sprintf("%d_%d_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceId),
)
if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
return fmt.Errorf("creating gz file %q: mkdir error: %w", localFilePath, err)
}
writer, err := misc.CreateGZ(localFilePath)
if err != nil {
return fmt.Errorf("failed to create gz writer: %w", err)
}
defer func() { _ = os.Remove(localFilePath) }()

for _, job := range jobs {
jobBytes, err := json.Marshal(job)
if err != nil {
_ = gzWriter.Close()
_ = writer.Close()
return fmt.Errorf("failed to marshal job: %w", err)
}
if _, err := gzWriter.Write(localFilePath, append(jobBytes, '\n')); err != nil {
_ = gzWriter.Close()
if _, err := writer.Write(append(jobBytes, '\n')); err != nil {
_ = writer.Close()
return fmt.Errorf("write to local file failed: %w", err)
}
}
if err := gzWriter.Close(); err != nil {
if err := writer.Close(); err != nil {
return fmt.Errorf("failed to close writer: %w", err)
}
defer func() { _ = os.Remove(localFilePath) }()

localFile, err := os.Open(localFilePath)
if err != nil {
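In both archiver/worker.go and file_migrator.go above, the defer that removes the temporary file now sits immediately after the writer is created rather than after a successful Close, so the file is cleaned up even when a marshal or write error causes an early return. A small sketch of that defer-early cleanup pattern (createAndFill and the demo path are illustrative, not from the repo):

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func createAndFill(path string) error {
    if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
        return err
    }
    f, err := os.Create(path)
    if err != nil {
        return err
    }
    // Register cleanup as soon as the file exists: every later return,
    // success or failure, leaves no stray temp file behind.
    defer func() { _ = os.Remove(path) }()
    defer func() { _ = f.Close() }()

    if _, err := f.WriteString("partial payload\n"); err != nil {
        return err // the deferred os.Remove still runs
    }
    return nil
}

func main() {
    path := filepath.Join(os.TempDir(), "defer-early-demo", "example.txt")
    fmt.Println(createAndFill(path))
}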
86 changes: 54 additions & 32 deletions processor/stash/stash.go
@@ -3,8 +3,10 @@ package stash
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"time"

@@ -17,6 +19,8 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/transientsource"
@@ -144,7 +148,7 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
localTmpDirName := "/rudder-processor-errors/"

uuid := uuid.New().String()
st.logger.Debug("[Processor: storeErrorsToObjectStorage]: Starting logging to object storage")
st.logger.Debugn("[Processor: storeErrorsToObjectStorage]: Starting logging to object storage")

tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
@@ -154,16 +158,15 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
jobsPerWorkspace := lo.GroupBy(jobs, func(job *jobsdb.JobT) string {
return job.WorkspaceId
})
gzWriter := fileuploader.NewGzMultiFileWriter()
dumps := make(map[string]string)
writerMap := make(map[string]string)

errorJobs := make([]ErrorJob, 0)

ctx := context.Background()
for workspaceID, jobsForWorkspace := range jobsPerWorkspace {
preferences, err := st.fileuploader.GetStoragePreferences(ctx, workspaceID)
if err != nil {
st.logger.Errorf("Skipping Storing errors for workspace: %s since no storage preferences are found", workspaceID)
st.logger.Errorn("Skipping Storing errors for workspace since no storage preferences are found", obskit.WorkspaceID(workspaceID), obskit.Error(err))
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsForWorkspace,
errorOutput: StoreErrorOutputT{
@@ -174,7 +177,7 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
continue
}
if !preferences.ProcErrors {
st.logger.Infof("Skipping Storing errors for workspace: %s since ProcErrors is set to false", workspaceID)
st.logger.Infon("Skipping Storing errors for workspace since ProcErrors is set to false", obskit.WorkspaceID(workspaceID))
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsForWorkspace,
errorOutput: StoreErrorOutputT{
@@ -184,48 +187,64 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
})
continue
}
path := fmt.Sprintf("%v%v.json.gz", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v.%v.%v", time.Now().Unix(), config.GetString("INSTANCE_ID", "1"), fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID), uuid, workspaceID))
dumps[workspaceID] = path
path := filepath.Join(
tmpDirPath,
localTmpDirName,
fmt.Sprintf(
"%v.%v.%v.%v.%v.json.gz",
time.Now().Unix(),
config.GetString("INSTANCE_ID", "1"),
fmt.Sprintf("%v-%v", jobs[0].JobID, jobs[len(jobs)-1].JobID),
uuid,
workspaceID,
),
)
if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
panic(fmt.Errorf("creating gz file %q: mkdir error: %w", path, err))
}
writer, err := misc.CreateGZ(path)
if err != nil {
panic(err)
}
writerMap[workspaceID] = path
newline := []byte("\n")
lo.ForEach(jobsForWorkspace, func(job *jobsdb.JobT, _ int) {
rawJob, err := json.Marshal(job)
if err != nil {
panic(err)
}
if _, err := gzWriter.Write(path, append(rawJob, newline...)); err != nil {
if _, err := writer.Write(append(rawJob, newline...)); err != nil {
_ = writer.Close()
panic(err)
}
})
if err := writer.Close(); err != nil {
panic(err)
}
}

err = gzWriter.Close()
if err != nil {
panic(err)
}
defer func() {
for _, path := range dumps {
os.Remove(path)
for _, path := range writerMap {
_ = os.Remove(path)
}
}()

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(config.GetInt("Processor.errorBackupWorkers", 100))
g, ctx := kitsync.NewEagerGroup(ctx, config.GetInt("Processor.errorBackupWorkers", 100))
var mu sync.Mutex
for workspaceID, filePath := range dumps {
wrkId := workspaceID
path := filePath
errFileUploader, err := st.fileuploader.GetFileManager(ctx, wrkId)
for workspaceID, path := range writerMap {
errFileUploader, err := st.fileuploader.GetFileManager(ctx, workspaceID)
if err != nil {
st.logger.Errorf("Skipping Storing errors for workspace: %s since no file manager is found", workspaceID)
mu.Lock()
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsPerWorkspace[workspaceID],
errorOutput: StoreErrorOutputT{
Location: "",
Error: err,
},
})
mu.Unlock()
st.logger.Errorn("Skipping Storing errors for workspace since no file manager is found", obskit.WorkspaceID(workspaceID), obskit.Error(err))
if !errors.Is(err, fileuploader.ErrNotSubscribed) {
mu.Lock()
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsPerWorkspace[workspaceID],
errorOutput: StoreErrorOutputT{
Error: err,
},
})
mu.Unlock()
}
continue
}
g.Go(crash.Wrapper(func() error {
@@ -235,10 +254,13 @@ func (st *HandleT) storeErrorsToObjectStorage(jobs []*jobsdb.JobT) (errorJob []E
}
prefixes := []string{"rudder-proc-err-logs", time.Now().Format("01-02-2006")}
uploadOutput, err := errFileUploader.Upload(ctx, outputFile, prefixes...)
st.logger.Infof("Uploaded error logs to %s for workspaceId %s", uploadOutput.Location, wrkId)
st.logger.Infon("Uploaded error logs for workspaceId",
logger.NewStringField("location", uploadOutput.Location),
obskit.WorkspaceID(workspaceID),
)
mu.Lock()
errorJobs = append(errorJobs, ErrorJob{
jobs: jobsPerWorkspace[wrkId],
jobs: jobsPerWorkspace[workspaceID],
errorOutput: StoreErrorOutputT{
Location: uploadOutput.Location,
Error: err,
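Besides the writer swap, stash.go replaces errgroup.WithContext plus SetLimit with kitsync.NewEagerGroup (same (ctx, limit) shape), drops the manual wrkId/path loop-variable copies, and no longer records an ErrorJob when GetFileManager fails with fileuploader.ErrNotSubscribed. The underlying fan-out idiom is unchanged: run at most N uploads concurrently and collect results under a mutex. A hedged sketch of that idiom using golang.org/x/sync/errgroup for illustration (uploadOne and the sample paths are placeholders):

package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

func uploadOne(_ context.Context, workspaceID, path string) (string, error) {
    // Stand-in for errFileUploader.Upload: returns a fake object-store location.
    return "s3://bucket/" + workspaceID + "/" + path, nil
}

func main() {
    ctx := context.Background()
    files := map[string]string{"ws-1": "/tmp/a.json.gz", "ws-2": "/tmp/b.json.gz"}

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(2) // bounded concurrency, like Processor.errorBackupWorkers

    var mu sync.Mutex
    results := make(map[string]string)

    for workspaceID, path := range files {
        workspaceID, path := workspaceID, path // copies not needed on Go 1.22+, kept for clarity
        g.Go(func() error {
            loc, err := uploadOne(ctx, workspaceID, path)
            if err != nil {
                return err
            }
            mu.Lock()
            results[workspaceID] = loc
            mu.Unlock()
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        panic(err)
    }
    fmt.Println(results)
}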
12 changes: 10 additions & 2 deletions router/batchrouter/handle.go
@@ -274,9 +274,17 @@ func (brt *Handle) upload(provider string, batchJobs *BatchedJobs, isWarehouse b
if err != nil {
panic(err)
}
path := fmt.Sprintf("%v%v.json", tmpDirPath+localTmpDirName, fmt.Sprintf("%v.%v.%v", time.Now().Unix(), batchJobs.Connection.Source.ID, uuid))
gzipFilePath := filepath.Join(
tmpDirPath,
localTmpDirName,
fmt.Sprintf(
"%v.%v.%v.json.gz",
time.Now().Unix(),
batchJobs.Connection.Source.ID,
uuid,
),
)

gzipFilePath := fmt.Sprintf(`%v.gz`, path)
err = os.MkdirAll(filepath.Dir(gzipFilePath), os.ModePerm)
if err != nil {
panic(err)
62 changes: 0 additions & 62 deletions services/fileuploader/gzwriter.go

This file was deleted.

33 changes: 0 additions & 33 deletions services/fileuploader/gzwriter_test.go

This file was deleted.
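The deleted gzwriter.go itself is not shown; judging only from the call sites removed above, its surface was roughly NewGzMultiFileWriter(), Write(path, data), and a single Close() that tore down every open stream (the new call sites also add os.MkdirAll, which suggests the old writer created parent directories internally). A rough, hypothetical reconstruction of such a multi-file gzip writer, not the original implementation:

package fileuploader

import (
    "compress/gzip"
    "os"
    "path/filepath"
)

// gzFileHandle pairs a file with its gzip writer so both can be closed together.
type gzFileHandle struct {
    file *os.File
    gz   *gzip.Writer
}

// MultiFileWriter is a hypothetical reconstruction of the removed writer:
// it lazily opens one gzip stream per path and closes them all at once.
type MultiFileWriter struct {
    handles map[string]gzFileHandle
}

func NewGzMultiFileWriter() *MultiFileWriter {
    return &MultiFileWriter{handles: make(map[string]gzFileHandle)}
}

func (m *MultiFileWriter) Write(path string, data []byte) (int, error) {
    h, ok := m.handles[path]
    if !ok {
        // Assumption: the original created parent directories; the new call
        // sites now call os.MkdirAll explicitly instead.
        if err := os.MkdirAll(filepath.Dir(path), os.ModePerm); err != nil {
            return 0, err
        }
        f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
        if err != nil {
            return 0, err
        }
        h = gzFileHandle{file: f, gz: gzip.NewWriter(f)}
        m.handles[path] = h
    }
    return h.gz.Write(data)
}

func (m *MultiFileWriter) Close() error {
    var firstErr error
    for _, h := range m.handles {
        if err := h.gz.Close(); err != nil && firstErr == nil {
            firstErr = err
        }
        if err := h.file.Close(); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    m.handles = make(map[string]gzFileHandle)
    return firstErr
}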
