From bb76d1ea499d618e9f5ddcd32805904148c4a0a3 Mon Sep 17 00:00:00 2001 From: GalvinGao Date: Tue, 21 Nov 2023 13:58:52 -0800 Subject: [PATCH 1/7] chore: improve log --- internal/controller/meta/admin.go | 6 +++++ internal/pkg/archiver/archiver.go | 36 +++++++++++++++++++------ internal/repo/drop_report.go | 2 +- internal/service/drop_report_archive.go | 2 ++ 4 files changed, 37 insertions(+), 9 deletions(-) diff --git a/internal/controller/meta/admin.go b/internal/controller/meta/admin.go index 522b2c13..08720c04 100644 --- a/internal/controller/meta/admin.go +++ b/internal/controller/meta/admin.go @@ -26,6 +26,7 @@ import ( "exusiai.dev/backend-next/internal/model/cache" "exusiai.dev/backend-next/internal/model/gamedata" "exusiai.dev/backend-next/internal/model/types" + "exusiai.dev/backend-next/internal/pkg/flog" "exusiai.dev/backend-next/internal/pkg/pgerr" "exusiai.dev/backend-next/internal/repo" "exusiai.dev/backend-next/internal/server/svr" @@ -721,6 +722,11 @@ func (c *AdminController) ArchiveDropReports(ctx *fiber.Ctx) error { err = c.DropReportArchiveService.ArchiveByDate(ctx.UserContext(), date) if err != nil { + flog.ErrorFrom(ctx, "archive.drop_report"). + Err(err). + Time("targetDay", date). + Msg("failed to archive drop report") + return err } return ctx.SendStatus(fiber.StatusOK) diff --git a/internal/pkg/archiver/archiver.go b/internal/pkg/archiver/archiver.go index a4ba09a7..207dcf23 100644 --- a/internal/pkg/archiver/archiver.go +++ b/internal/pkg/archiver/archiver.go @@ -62,19 +62,29 @@ func (a *Archiver) canonicalFilePath() string { func (a *Archiver) Prepare(ctx context.Context, date time.Time) error { a.initLogger() - a.logger.Info().Str("date", date.Format("2006-01-02")).Msg("preparing archiver") + a.logger.Info(). + Str("evt.name", "archiver.prepare"). + Str("date", date.Format("2006-01-02")). + Msg("preparing archiver") + a.date = date a.writerCh = make(chan interface{}, ArchiverChanBufferSize) if err := a.assertS3FileNonExistence(ctx); err != nil { return errors.Wrap(err, "failed to assertFileNonExistence") } - a.logger.Trace().Msg("asserted S3 file non-existence") + a.logger.Debug(). + Str("evt.name", "archiver.prepare.assertFileNonExistence"). + Str("canonicalFilePath", a.canonicalFilePath()). + Msg("asserted S3 file non-existence") if err := a.createLocalTempDir(); err != nil { return errors.Wrap(err, "failed to createLocalTempDir") } - a.logger.Trace().Str("localTempDir", a.localTempDir).Msg("created local temp dir") + a.logger.Debug(). + Str("evt.name", "archiver.prepare.createLocalTempDir"). + Str("localTempDir", a.localTempDir). + Msg("created local temp dir") return nil } @@ -132,12 +142,16 @@ func (a *Archiver) Collect(ctx context.Context) error { if err := a.archiveToLocalFile(ctx); err != nil { return errors.Wrap(err, "failed to archiveToLocalFile") } - a.logger.Trace().Msg("archived to local file") + a.logger.Debug(). + Str("evt.name", "archiver.collect.archiveToLocalFile"). + Msg("archived to local file") if err := a.uploadToS3(ctx); err != nil { return errors.Wrap(err, "failed to uploadToS3") } - a.logger.Trace().Msg("uploaded to S3") + a.logger.Debug(). + Str("evt.name", "archiver.collect.uploadToS3"). + Msg("uploaded to S3") if err := a.Cleanup(); err != nil { return errors.Wrap(err, "failed to Cleanup") @@ -150,14 +164,18 @@ func (a *Archiver) archiveToLocalFile(ctx context.Context) error { if err := a.ensureFileBaseDir(localTempFilePath); err != nil { return errors.Wrap(err, "failed to ensureFileBaseDir") } - a.logger.Trace().Str("localTempFilePath", localTempFilePath).Msg("ensured file base dir") + a.logger.Debug(). + Str("evt.name", "archiver.collect.archiveToLocalFile.ensureFileBaseDir"). + Str("localTempFilePath", localTempFilePath).Msg("ensured file base dir") file, err := os.OpenFile(localTempFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) if err != nil { return errors.Wrap(err, "failed to open file") } defer file.Close() - a.logger.Trace().Str("localTempFilePath", localTempFilePath).Msg("opened file, ready to write gzip stream") + a.logger.Debug(). + Str("evt.name", "archiver.collect.archiveToLocalFile.openFile"). + Str("localTempFilePath", localTempFilePath).Msg("opened file, ready to write gzip stream") gzipWriter := gzip.NewWriter(file) defer gzipWriter.Close() @@ -170,7 +188,9 @@ func (a *Archiver) archiveToLocalFile(ctx context.Context) error { return nil case item, ok := <-a.writerCh: if !ok { - a.logger.Trace().Msg("writerCh closed, exiting archiveToLocalFile (closing gzipWriter and file)") + a.logger.Debug(). + Str("evt.name", "archiver.collect.archiveToLocalFile.writerChClosed"). + Msg("writerCh closed, exiting archiveToLocalFile (closing gzipWriter and file)") return nil } if err := jsonEncoder.Encode(item); err != nil { diff --git a/internal/repo/drop_report.go b/internal/repo/drop_report.go index b48647bb..23aca856 100644 --- a/internal/repo/drop_report.go +++ b/internal/repo/drop_report.go @@ -312,7 +312,7 @@ func (s *DropReport) GetDropReports(ctx context.Context, queryCtx *model.DropRep func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model.Cursor, date time.Time, limit int) ([]*model.DropReport, model.Cursor, error) { start := time.UnixMilli(util.GetDayStartTime(&date, "CN")) // we use CN server's day start time across all servers for archive end := start.Add(time.Hour * 24) - results := make([]*model.DropReport, 0) + results := make([]*model.DropReport, 0, limit) query := s.DB.NewSelect(). Model(&results). Where("created_at >= to_timestamp(?)", start.Unix()). diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index 29da4f8f..a790886d 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -131,6 +131,7 @@ func (s *Archive) populateDropReportsToArchiver(ctx context.Context, date time.T break } log.Info(). + Str("evt.name", "archive.populate.drop_reports"). Int("page", page). Int("cursor_start", cursor.Start). Int("cursor_end", cursor.End). @@ -166,6 +167,7 @@ func (s *Archive) populateDropReportExtrasToArchiver(ctx context.Context, idIncl break } log.Info(). + Str("evt.name", "archive.populate.drop_report_extras"). Int("page", page). Int("cursor_start", cursor.Start). Int("cursor_end", cursor.End). From d5aae82508ec254d1233a60b2eab5f819f0726aa Mon Sep 17 00:00:00 2001 From: GalvinGao Date: Tue, 21 Nov 2023 14:16:52 -0800 Subject: [PATCH 2/7] fix: reduce batchsize --- internal/service/drop_report_archive.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index a790886d..3e87de09 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -22,6 +22,7 @@ const ( RealmDropReportExtras = "drop_report_extras" ArchiveS3Prefix = "v1/" + BatchSize = 1000 ) type Archive struct { @@ -117,7 +118,7 @@ func (s *Archive) populateDropReportsToArchiver(ctx context.Context, date time.T var err error var page, totalCount, firstId, lastId int for { - dropReports, cursor, err = s.DropReportService.GetDropReportsForArchive(ctx, &cursor, date, 10000) + dropReports, cursor, err = s.DropReportService.GetDropReportsForArchive(ctx, &cursor, date, 1000) if err != nil { return 0, 0, errors.Wrap(err, "failed to extract drop reports") } @@ -159,7 +160,7 @@ func (s *Archive) populateDropReportExtrasToArchiver(ctx context.Context, idIncl var err error var page, totalCount int for { - extras, cursor, err = s.DropReportExtraService.GetDropReportExtraForArchive(ctx, &cursor, idInclusiveStart, idInclusiveEnd, 10000) + extras, cursor, err = s.DropReportExtraService.GetDropReportExtraForArchive(ctx, &cursor, idInclusiveStart, idInclusiveEnd, 1000) if err != nil { return errors.Wrap(err, "failed to extract drop report extras") } From ac61e207e6704dd65ce46b2799fd981152b8fd6e Mon Sep 17 00:00:00 2001 From: GalvinGao Date: Tue, 21 Nov 2023 14:19:46 -0800 Subject: [PATCH 3/7] feat: add batchsize to config --- internal/app/appconfig/spec.go | 2 ++ internal/service/drop_report_archive.go | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/app/appconfig/spec.go b/internal/app/appconfig/spec.go index a54d0e28..c6be7401 100644 --- a/internal/app/appconfig/spec.go +++ b/internal/app/appconfig/spec.go @@ -131,6 +131,8 @@ type ConfigSpec struct { // We don't want to show all patterns because it will be too many. So we set a limit here (default 19) PatternMatrixLimit int `split_words:"true" default:"19"` + DropReportArchiveBatchSize int `split_words:"true" default:"1000"` + DropReportArchiveS3Bucket string `required:"true" split_words:"true"` DropReportArchiveS3Region string `required:"true" split_words:"true"` AWSAccessKey string `required:"true" split_words:"true"` diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index 3e87de09..310b0fbb 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -22,7 +22,6 @@ const ( RealmDropReportExtras = "drop_report_extras" ArchiveS3Prefix = "v1/" - BatchSize = 1000 ) type Archive struct { @@ -118,7 +117,7 @@ func (s *Archive) populateDropReportsToArchiver(ctx context.Context, date time.T var err error var page, totalCount, firstId, lastId int for { - dropReports, cursor, err = s.DropReportService.GetDropReportsForArchive(ctx, &cursor, date, 1000) + dropReports, cursor, err = s.DropReportService.GetDropReportsForArchive(ctx, &cursor, date, s.Config.DropReportArchiveBatchSize) if err != nil { return 0, 0, errors.Wrap(err, "failed to extract drop reports") } @@ -160,7 +159,7 @@ func (s *Archive) populateDropReportExtrasToArchiver(ctx context.Context, idIncl var err error var page, totalCount int for { - extras, cursor, err = s.DropReportExtraService.GetDropReportExtraForArchive(ctx, &cursor, idInclusiveStart, idInclusiveEnd, 1000) + extras, cursor, err = s.DropReportExtraService.GetDropReportExtraForArchive(ctx, &cursor, idInclusiveStart, idInclusiveEnd, s.Config.DropReportArchiveBatchSize) if err != nil { return errors.Wrap(err, "failed to extract drop report extras") } From 668e75c17609224491fcbe7822e64df06b7256e6 Mon Sep 17 00:00:00 2001 From: GalvinGao Date: Tue, 21 Nov 2023 16:53:25 -0800 Subject: [PATCH 4/7] fix: add ErrFileAlreadyExists support --- internal/service/drop_report_archive.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index 310b0fbb..ec333456 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -81,9 +81,25 @@ func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time) error { eg := errgroup.Group{} if err := s.dropReportsArchiver.Prepare(ctx, date); err != nil { + if errors.Is(err, archiver.ErrFileAlreadyExists) { + log.Info(). + Str("evt.name", "archive.drop_reports"). + Str("realm", RealmDropReports). + Msg("already archived") + + return nil + } return errors.Wrap(err, "failed to prepare drop reports archiver") } if err := s.dropReportExtrasArchiver.Prepare(ctx, date); err != nil { + if errors.Is(err, archiver.ErrFileAlreadyExists) { + log.Info(). + Str("evt.name", "archive.drop_report_extras"). + Str("realm", RealmDropReportExtras). + Msg("already archived") + + return nil + } return errors.Wrap(err, "failed to prepare drop report extras archiver") } @@ -104,9 +120,12 @@ func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time) error { } err = eg.Wait() - log.Info().Err(err).Msg("finished archiving") + log.Info(). + Str("evt.name", "archive.finished"). + Err(err). + Msg("finished archiving") - return nil + return err } func (s *Archive) populateDropReportsToArchiver(ctx context.Context, date time.Time) (int, int, error) { From df439839f075f4b738504a2a75b47b08c4d18de2 Mon Sep 17 00:00:00 2001 From: AlvISsReimu Date: Tue, 21 Nov 2023 22:36:34 -0800 Subject: [PATCH 5/7] feat: add deletion of reports and extras after archiving --- .../scripts/archive_drop_reports/command.go | 9 ++- .../scripts/archive_drop_reports/script.go | 4 +- go.mod | 2 +- go.sum | 2 + internal/app/appconfig/spec.go | 2 + internal/controller/meta/admin.go | 2 +- internal/model/types/admin.go | 3 +- internal/repo/drop_report.go | 11 ++++ internal/repo/drop_report_extra.go | 10 +++ internal/service/drop_report.go | 5 ++ internal/service/drop_report_archive.go | 61 ++++++++++++++++++- internal/service/drop_report_extra.go | 6 ++ 12 files changed, 108 insertions(+), 9 deletions(-) diff --git a/cmd/app/cli/runscript/scripts/archive_drop_reports/command.go b/cmd/app/cli/runscript/scripts/archive_drop_reports/command.go index 1ec02ada..bb36a9d9 100644 --- a/cmd/app/cli/runscript/scripts/archive_drop_reports/command.go +++ b/cmd/app/cli/runscript/scripts/archive_drop_reports/command.go @@ -24,10 +24,17 @@ func Command(depsFn func() CommandDeps) *cli.Command { Usage: "date to archive in GMT+8, in format of YYYY-MM-DD", Required: true, }, + &cli.BoolFlag{ + Name: "delete-after-archive", + Aliases: []string{"D"}, + Usage: "delete the archived drop reports and extras after archiving", + Required: true, + }, }, Action: func(ctx *cli.Context) error { date := ctx.String("date") - return run(ctx, depsFn(), date) + deleteAfterArchive := ctx.Bool("delete-after-archive") + return run(ctx, depsFn(), date, deleteAfterArchive) }, } } diff --git a/cmd/app/cli/runscript/scripts/archive_drop_reports/script.go b/cmd/app/cli/runscript/scripts/archive_drop_reports/script.go index 1c95f88a..53a7161c 100644 --- a/cmd/app/cli/runscript/scripts/archive_drop_reports/script.go +++ b/cmd/app/cli/runscript/scripts/archive_drop_reports/script.go @@ -9,7 +9,7 @@ import ( "github.com/urfave/cli/v2" ) -func run(ctx *cli.Context, deps CommandDeps, dateStr string) error { +func run(ctx *cli.Context, deps CommandDeps, dateStr string, deleteAfterArchive bool) error { log.Info().Str("date", dateStr).Msg("running script") var err error @@ -19,7 +19,7 @@ func run(ctx *cli.Context, deps CommandDeps, dateStr string) error { return errors.Wrap(err, "failed to parse date") } - if err = deps.DropReportArchiveService.ArchiveByDate(ctx.Context, date); err != nil { + if err = deps.DropReportArchiveService.ArchiveByDate(ctx.Context, date, deleteAfterArchive); err != nil { return errors.Wrap(err, "failed to run archiveDropReports") } diff --git a/go.mod b/go.mod index 4ba7a405..d6235da6 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/avast/retry-go/v4 v4.3.4 github.com/aws/aws-sdk-go-v2 v1.23.1 github.com/aws/aws-sdk-go-v2/config v1.25.4 - github.com/aws/aws-sdk-go-v2/credentials v1.16.3 + github.com/aws/aws-sdk-go-v2/credentials v1.16.4 github.com/aws/aws-sdk-go-v2/service/s3 v1.43.1 github.com/aws/smithy-go v1.17.0 github.com/davecgh/go-spew v1.1.1 diff --git a/go.sum b/go.sum index 92d328cd..4fceefde 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/aws/aws-sdk-go-v2/config v1.25.4 h1:r+X1x8QI6FEPdJDWCNBDZHyAcyFwSjHN8 github.com/aws/aws-sdk-go-v2/config v1.25.4/go.mod h1:8GTjImECskr7D88P/Nn9uM4M4rLY9i77hLJZgkZEWV8= github.com/aws/aws-sdk-go-v2/credentials v1.16.3 h1:8PeI2krzzjDJ5etmgaMiD1JswsrLrWvKKu/uBUtNy1g= github.com/aws/aws-sdk-go-v2/credentials v1.16.3/go.mod h1:Kdh/okh+//vQ/AjEt81CjvkTo64+/zIE4OewP7RpfXk= +github.com/aws/aws-sdk-go-v2/credentials v1.16.4 h1:i7UQYYDSJrtc30RSwJwfBKwLFNnBTiICqAJ0pPdum8E= +github.com/aws/aws-sdk-go-v2/credentials v1.16.4/go.mod h1:Kdh/okh+//vQ/AjEt81CjvkTo64+/zIE4OewP7RpfXk= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5 h1:KehRNiVzIfAcj6gw98zotVbb/K67taJE0fkfgM6vzqU= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5/go.mod h1:VhnExhw6uXy9QzetvpXDolo1/hjhx4u9qukBGkuUwjs= github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4 h1:LAm3Ycm9HJfbSCd5I+wqC2S9Ej7FPrgr5CQoOljJZcE= diff --git a/internal/app/appconfig/spec.go b/internal/app/appconfig/spec.go index c6be7401..f920a5de 100644 --- a/internal/app/appconfig/spec.go +++ b/internal/app/appconfig/spec.go @@ -139,6 +139,8 @@ type ConfigSpec struct { AWSSecretKey string `required:"true" split_words:"true"` NoArchiveDays int `split_words:"true" default:"60"` + + DeleteDropReportAfterArchive bool `split_words:"true" default:"false"` } type Config struct { diff --git a/internal/controller/meta/admin.go b/internal/controller/meta/admin.go index 08720c04..9018404b 100644 --- a/internal/controller/meta/admin.go +++ b/internal/controller/meta/admin.go @@ -720,7 +720,7 @@ func (c *AdminController) ArchiveDropReports(ctx *fiber.Ctx) error { return ctx.Status(fiber.StatusBadRequest).SendString("invalid date") } - err = c.DropReportArchiveService.ArchiveByDate(ctx.UserContext(), date) + err = c.DropReportArchiveService.ArchiveByDate(ctx.UserContext(), date, request.DeleteAfterArchive) if err != nil { flog.ErrorFrom(ctx, "archive.drop_report"). Err(err). diff --git a/internal/model/types/admin.go b/internal/model/types/admin.go index 12783036..8446bb08 100644 --- a/internal/model/types/admin.go +++ b/internal/model/types/admin.go @@ -56,7 +56,8 @@ type CloneFromCNRequest struct { } type ArchiveDropReportRequest struct { - Date string `json:"date" validate:"required" required:"true"` + Date string `json:"date" validate:"required" required:"true"` + DeleteAfterArchive bool `json:"deleteAfterArchive validate:"required" required:"true"` } type ForeignTimeRange struct { diff --git a/internal/repo/drop_report.go b/internal/repo/drop_report.go index 23aca856..d6b68e7b 100644 --- a/internal/repo/drop_report.go +++ b/internal/repo/drop_report.go @@ -329,6 +329,17 @@ func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model return results, newCursor(results), nil } +func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) error { + start := time.UnixMilli(util.GetDayStartTime(&date, "CN")) // we use CN server's day start time across all servers for archive + end := start.Add(time.Hour * 24) + _, err := tx.NewDelete(). + Model((*model.DropReport)(nil)). + Where("created_at >= to_timestamp(?)", start.Unix()). + Where("created_at < to_timestamp(?)", end.Unix()). + Exec(ctx) + return err +} + func (s *DropReport) handleStagesAndItems(query *bun.SelectQuery, stageIdItemIdMap map[int][]int) { stageConditions := make([]string, 0) for stageId, itemIds := range stageIdItemIdMap { diff --git a/internal/repo/drop_report_extra.go b/internal/repo/drop_report_extra.go index 802307b2..671cf099 100644 --- a/internal/repo/drop_report_extra.go +++ b/internal/repo/drop_report_extra.go @@ -67,6 +67,16 @@ func (c *DropReportExtra) GetDropReportExtraForArchive(ctx context.Context, curs return dropReportExtras, newCursor, nil } +func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) error { + _, err := tx.NewDelete(). + Model((*model.DropReportExtra)(nil)). + Where("report_id >= ?", idInclusiveStart). + Where("report_id <= ?", idInclusiveEnd). + Exec(ctx) + + return err +} + func (c *DropReportExtra) IsDropReportExtraMD5Exist(ctx context.Context, md5 string) bool { var dropReportExtra model.DropReportExtra diff --git a/internal/service/drop_report.go b/internal/service/drop_report.go index af0fde1f..7670d58b 100644 --- a/internal/service/drop_report.go +++ b/internal/service/drop_report.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/uptrace/bun" "gopkg.in/guregu/null.v3" "exusiai.dev/backend-next/internal/model" @@ -114,3 +115,7 @@ func (s *DropReport) GetDropReports( func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model.Cursor, date time.Time, limit int) ([]*model.DropReport, model.Cursor, error) { return s.DropReportRepo.GetDropReportsForArchive(ctx, cursor, date, limit) } + +func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) error { + return s.DropReportRepo.DeleteDropReportsForArchive(ctx, tx, date) +} diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index ec333456..2fefd66e 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -10,6 +10,7 @@ import ( "github.com/go-redsync/redsync/v4" "github.com/pkg/errors" "github.com/rs/zerolog/log" + "github.com/uptrace/bun" "golang.org/x/sync/errgroup" "exusiai.dev/backend-next/internal/app/appconfig" @@ -31,12 +32,13 @@ type Archive struct { s3Client *s3.Client lock *redsync.Mutex + db *bun.DB dropReportsArchiver *archiver.Archiver dropReportExtrasArchiver *archiver.Archiver } -func NewArchive(dropReportService *DropReport, dropReportExtraService *DropReportExtra, conf *appconfig.Config, lock *redsync.Redsync) (*Archive, error) { +func NewArchive(dropReportService *DropReport, dropReportExtraService *DropReportExtra, conf *appconfig.Config, lock *redsync.Redsync, db *bun.DB) (*Archive, error) { cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(conf.DropReportArchiveS3Region), config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(conf.AWSAccessKey, conf.AWSSecretKey, "")), @@ -52,6 +54,7 @@ func NewArchive(dropReportService *DropReport, dropReportExtraService *DropRepor Config: conf, s3Client: s3Client, lock: lock.NewMutex("mutex:archiver", redsync.WithExpiry(30*time.Minute), redsync.WithTries(2)), + db: db, dropReportsArchiver: &archiver.Archiver{ S3Client: s3Client, S3Bucket: conf.DropReportArchiveS3Bucket, @@ -69,10 +72,10 @@ func NewArchive(dropReportService *DropReport, dropReportExtraService *DropRepor func (s *Archive) ArchiveByGlobalConfig(ctx context.Context) error { targetDay := time.Now().AddDate(0, 0, -1*s.Config.NoArchiveDays) - return s.ArchiveByDate(ctx, targetDay) + return s.ArchiveByDate(ctx, targetDay, s.Config.DeleteDropReportAfterArchive) } -func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time) error { +func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time, deleteAfterArchive bool) error { if err := s.lock.Lock(); err != nil { return errors.Wrap(err, "failed to acquire lock") } @@ -125,6 +128,10 @@ func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time) error { Err(err). Msg("finished archiving") + if deleteAfterArchive { + s.DeleteReportsAndExtras(ctx, date, firstId, lastId) + } + return err } @@ -208,3 +215,51 @@ func (s *Archive) populateDropReportExtrasToArchiver(ctx context.Context, idIncl Msg("finished populating drop report extras") return nil } + +func (s *Archive) DeleteReportsAndExtras(ctx context.Context, date time.Time, idInclusiveStart int, idInclusiveEnd int) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return errors.Wrap(err, "failed to start transaction") + } + + log.Info(). + Str("evt.name", "archive.deletion"). + Str("date", date.Format("2006-01-02")). + Int("first_id", idInclusiveStart). + Int("last_id", idInclusiveEnd). + Msg("start deleting drop reports and extras") + + err = s.DropReportService.DeleteDropReportsForArchive(ctx, tx, date) + if err != nil { + return errors.Wrap(err, "failed to delete drop reports") + } + + log.Info(). + Str("evt.name", "archive.deletion.drop_report"). + Str("date", date.Format("2006-01-02")). + Msg("finished deleting drop reports") + + err = s.DropReportExtraService.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd) + if err != nil { + return errors.Wrap(err, "failed to delete drop report extras") + } + + log.Info(). + Str("evt.name", "archive.deletion.drop_report_extra"). + Int("first_id", idInclusiveStart). + Int("last_id", idInclusiveEnd). + Msg("finished deleting drop report extras") + + if err := tx.Commit(); err != nil { + return errors.Wrap(err, "failed to commit transaction") + } + + log.Info(). + Str("evt.name", "archive.deletion.success"). + Str("date", date.Format("2006-01-02")). + Int("first_id", idInclusiveStart). + Int("last_id", idInclusiveEnd). + Msg("finished committing the transaction of deleting drop reports and extras") + + return nil +} diff --git a/internal/service/drop_report_extra.go b/internal/service/drop_report_extra.go index 5961a332..9b4c1d5d 100644 --- a/internal/service/drop_report_extra.go +++ b/internal/service/drop_report_extra.go @@ -3,6 +3,8 @@ package service import ( "context" + "github.com/uptrace/bun" + "exusiai.dev/backend-next/internal/model" "exusiai.dev/backend-next/internal/repo" ) @@ -20,3 +22,7 @@ func NewDropReportExtra(dropReportExtraRepo *repo.DropReportExtra) *DropReportEx func (s *DropReportExtra) GetDropReportExtraForArchive(ctx context.Context, cursor *model.Cursor, idInclusiveStart int, idInclusiveEnd int, limit int) ([]*model.DropReportExtra, model.Cursor, error) { return s.DropReportExtraRepo.GetDropReportExtraForArchive(ctx, cursor, idInclusiveStart, idInclusiveEnd, limit) } + +func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) error { + return c.DropReportExtraRepo.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd) +} From 308a9b033da2fd4f1cc44b7f0671431cb8e46ff3 Mon Sep 17 00:00:00 2001 From: GalvinGao Date: Wed, 22 Nov 2023 23:28:27 -0800 Subject: [PATCH 6/7] feat: add rowsAffected & fix json struct tag --- internal/model/types/admin.go | 2 +- internal/repo/drop_report.go | 11 ++++++++--- internal/repo/drop_report_extra.go | 11 ++++++++--- internal/service/drop_report.go | 2 +- internal/service/drop_report_archive.go | 7 +++++-- internal/service/drop_report_extra.go | 2 +- 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/internal/model/types/admin.go b/internal/model/types/admin.go index 8446bb08..1db243c9 100644 --- a/internal/model/types/admin.go +++ b/internal/model/types/admin.go @@ -57,7 +57,7 @@ type CloneFromCNRequest struct { type ArchiveDropReportRequest struct { Date string `json:"date" validate:"required" required:"true"` - DeleteAfterArchive bool `json:"deleteAfterArchive validate:"required" required:"true"` + DeleteAfterArchive bool `json:"deleteAfterArchive" validate:"required" required:"true"` } type ForeignTimeRange struct { diff --git a/internal/repo/drop_report.go b/internal/repo/drop_report.go index d6b68e7b..b2be801f 100644 --- a/internal/repo/drop_report.go +++ b/internal/repo/drop_report.go @@ -329,15 +329,20 @@ func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model return results, newCursor(results), nil } -func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) error { +// DeleteDropReportsForArchive deletes drop reports for archive. +// returns number of rows affected and error +func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) (int64, error) { start := time.UnixMilli(util.GetDayStartTime(&date, "CN")) // we use CN server's day start time across all servers for archive end := start.Add(time.Hour * 24) - _, err := tx.NewDelete(). + r, err := tx.NewDelete(). Model((*model.DropReport)(nil)). Where("created_at >= to_timestamp(?)", start.Unix()). Where("created_at < to_timestamp(?)", end.Unix()). Exec(ctx) - return err + if err != nil { + return -1, err + } + return r.RowsAffected() } func (s *DropReport) handleStagesAndItems(query *bun.SelectQuery, stageIdItemIdMap map[int][]int) { diff --git a/internal/repo/drop_report_extra.go b/internal/repo/drop_report_extra.go index 671cf099..b4c565f1 100644 --- a/internal/repo/drop_report_extra.go +++ b/internal/repo/drop_report_extra.go @@ -67,14 +67,19 @@ func (c *DropReportExtra) GetDropReportExtraForArchive(ctx context.Context, curs return dropReportExtras, newCursor, nil } -func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) error { - _, err := tx.NewDelete(). +// DeleteDropReportExtrasForArchive deletes all drop report extras with report_id between idInclusiveStart and idInclusiveEnd. +// Returns the number of rows affected and an error if any. +func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) (int64, error) { + r, err := tx.NewDelete(). Model((*model.DropReportExtra)(nil)). Where("report_id >= ?", idInclusiveStart). Where("report_id <= ?", idInclusiveEnd). Exec(ctx) + if err != nil { + return -1, err + } - return err + return r.RowsAffected() } func (c *DropReportExtra) IsDropReportExtraMD5Exist(ctx context.Context, md5 string) bool { diff --git a/internal/service/drop_report.go b/internal/service/drop_report.go index 7670d58b..5c78f9c1 100644 --- a/internal/service/drop_report.go +++ b/internal/service/drop_report.go @@ -116,6 +116,6 @@ func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model return s.DropReportRepo.GetDropReportsForArchive(ctx, cursor, date, limit) } -func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) error { +func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) (int64, error) { return s.DropReportRepo.DeleteDropReportsForArchive(ctx, tx, date) } diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index 2fefd66e..0f99e4e3 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -229,17 +229,19 @@ func (s *Archive) DeleteReportsAndExtras(ctx context.Context, date time.Time, id Int("last_id", idInclusiveEnd). Msg("start deleting drop reports and extras") - err = s.DropReportService.DeleteDropReportsForArchive(ctx, tx, date) + var rowsAffected int64 + rowsAffected, err = s.DropReportService.DeleteDropReportsForArchive(ctx, tx, date) if err != nil { return errors.Wrap(err, "failed to delete drop reports") } log.Info(). + Int64("rows_affected", rowsAffected). Str("evt.name", "archive.deletion.drop_report"). Str("date", date.Format("2006-01-02")). Msg("finished deleting drop reports") - err = s.DropReportExtraService.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd) + rowsAffected, err = s.DropReportExtraService.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd) if err != nil { return errors.Wrap(err, "failed to delete drop report extras") } @@ -248,6 +250,7 @@ func (s *Archive) DeleteReportsAndExtras(ctx context.Context, date time.Time, id Str("evt.name", "archive.deletion.drop_report_extra"). Int("first_id", idInclusiveStart). Int("last_id", idInclusiveEnd). + Int64("rows_affected", rowsAffected). Msg("finished deleting drop report extras") if err := tx.Commit(); err != nil { diff --git a/internal/service/drop_report_extra.go b/internal/service/drop_report_extra.go index 9b4c1d5d..79e8fe03 100644 --- a/internal/service/drop_report_extra.go +++ b/internal/service/drop_report_extra.go @@ -23,6 +23,6 @@ func (s *DropReportExtra) GetDropReportExtraForArchive(ctx context.Context, curs return s.DropReportExtraRepo.GetDropReportExtraForArchive(ctx, cursor, idInclusiveStart, idInclusiveEnd, limit) } -func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) error { +func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) (int64, error) { return c.DropReportExtraRepo.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd) } From 968ae8d5799ff4f294b858c87b083d2f66803755 Mon Sep 17 00:00:00 2001 From: GalvinGao Date: Wed, 22 Nov 2023 23:29:19 -0800 Subject: [PATCH 7/7] feat: add log --- internal/service/drop_report_archive.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/internal/service/drop_report_archive.go b/internal/service/drop_report_archive.go index 0f99e4e3..7d010c1f 100644 --- a/internal/service/drop_report_archive.go +++ b/internal/service/drop_report_archive.go @@ -129,7 +129,18 @@ func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time, deleteAfter Msg("finished archiving") if deleteAfterArchive { - s.DeleteReportsAndExtras(ctx, date, firstId, lastId) + log.Info(). + Str("evt.name", "archive.delete"). + Msg("deleting drop reports and extras") + + err = s.DeleteReportsAndExtras(ctx, date, firstId, lastId) + if err != nil { + return errors.Wrap(err, "failed to delete drop reports and extras") + } + + log.Info(). + Str("evt.name", "archive.delete"). + Msg("finished deleting drop reports and extras") } return err