Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v3.14.3 #506

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,17 @@ func Command(depsFn func() CommandDeps) *cli.Command {
Usage: "date to archive in GMT+8, in format of YYYY-MM-DD",
Required: true,
},
&cli.BoolFlag{
Name: "delete-after-archive",
Aliases: []string{"D"},
Usage: "delete the archived drop reports and extras after archiving",
Required: true,
},
},
Action: func(ctx *cli.Context) error {
date := ctx.String("date")
return run(ctx, depsFn(), date)
deleteAfterArchive := ctx.Bool("delete-after-archive")
return run(ctx, depsFn(), date, deleteAfterArchive)
},
}
}
4 changes: 2 additions & 2 deletions cmd/app/cli/runscript/scripts/archive_drop_reports/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/urfave/cli/v2"
)

func run(ctx *cli.Context, deps CommandDeps, dateStr string) error {
func run(ctx *cli.Context, deps CommandDeps, dateStr string, deleteAfterArchive bool) error {
log.Info().Str("date", dateStr).Msg("running script")

var err error
Expand All @@ -19,7 +19,7 @@ func run(ctx *cli.Context, deps CommandDeps, dateStr string) error {
return errors.Wrap(err, "failed to parse date")
}

if err = deps.DropReportArchiveService.ArchiveByDate(ctx.Context, date); err != nil {
if err = deps.DropReportArchiveService.ArchiveByDate(ctx.Context, date, deleteAfterArchive); err != nil {
return errors.Wrap(err, "failed to run archiveDropReports")
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/avast/retry-go/v4 v4.3.4
github.com/aws/aws-sdk-go-v2 v1.23.1
github.com/aws/aws-sdk-go-v2/config v1.25.4
github.com/aws/aws-sdk-go-v2/credentials v1.16.3
github.com/aws/aws-sdk-go-v2/credentials v1.16.4
github.com/aws/aws-sdk-go-v2/service/s3 v1.43.1
github.com/aws/smithy-go v1.17.0
github.com/davecgh/go-spew v1.1.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ github.com/aws/aws-sdk-go-v2/config v1.25.4 h1:r+X1x8QI6FEPdJDWCNBDZHyAcyFwSjHN8
github.com/aws/aws-sdk-go-v2/config v1.25.4/go.mod h1:8GTjImECskr7D88P/Nn9uM4M4rLY9i77hLJZgkZEWV8=
github.com/aws/aws-sdk-go-v2/credentials v1.16.3 h1:8PeI2krzzjDJ5etmgaMiD1JswsrLrWvKKu/uBUtNy1g=
github.com/aws/aws-sdk-go-v2/credentials v1.16.3/go.mod h1:Kdh/okh+//vQ/AjEt81CjvkTo64+/zIE4OewP7RpfXk=
github.com/aws/aws-sdk-go-v2/credentials v1.16.4 h1:i7UQYYDSJrtc30RSwJwfBKwLFNnBTiICqAJ0pPdum8E=
github.com/aws/aws-sdk-go-v2/credentials v1.16.4/go.mod h1:Kdh/okh+//vQ/AjEt81CjvkTo64+/zIE4OewP7RpfXk=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5 h1:KehRNiVzIfAcj6gw98zotVbb/K67taJE0fkfgM6vzqU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.5/go.mod h1:VhnExhw6uXy9QzetvpXDolo1/hjhx4u9qukBGkuUwjs=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.4 h1:LAm3Ycm9HJfbSCd5I+wqC2S9Ej7FPrgr5CQoOljJZcE=
Expand Down
2 changes: 2 additions & 0 deletions internal/app/appconfig/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type ConfigSpec struct {
AWSSecretKey string `required:"true" split_words:"true"`

NoArchiveDays int `split_words:"true" default:"60"`

DeleteDropReportAfterArchive bool `split_words:"true" default:"false"`
}

type Config struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/meta/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ func (c *AdminController) ArchiveDropReports(ctx *fiber.Ctx) error {
return ctx.Status(fiber.StatusBadRequest).SendString("invalid date")
}

err = c.DropReportArchiveService.ArchiveByDate(ctx.UserContext(), date)
err = c.DropReportArchiveService.ArchiveByDate(ctx.UserContext(), date, request.DeleteAfterArchive)
if err != nil {
flog.ErrorFrom(ctx, "archive.drop_report").
Err(err).
Expand Down
3 changes: 2 additions & 1 deletion internal/model/types/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ type CloneFromCNRequest struct {
}

type ArchiveDropReportRequest struct {
Date string `json:"date" validate:"required" required:"true"`
Date string `json:"date" validate:"required" required:"true"`
DeleteAfterArchive bool `json:"deleteAfterArchive" validate:"required" required:"true"`
}

type ForeignTimeRange struct {
Expand Down
16 changes: 16 additions & 0 deletions internal/repo/drop_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,22 @@ func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model
return results, newCursor(results), nil
}

// DeleteDropReportsForArchive deletes drop reports for archive.
// returns number of rows affected and error
func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) (int64, error) {
start := time.UnixMilli(util.GetDayStartTime(&date, "CN")) // we use CN server's day start time across all servers for archive
end := start.Add(time.Hour * 24)
r, err := tx.NewDelete().
Model((*model.DropReport)(nil)).
Where("created_at >= to_timestamp(?)", start.Unix()).
Where("created_at < to_timestamp(?)", end.Unix()).
Exec(ctx)
if err != nil {
return -1, err
}
return r.RowsAffected()
}

func (s *DropReport) handleStagesAndItems(query *bun.SelectQuery, stageIdItemIdMap map[int][]int) {
stageConditions := make([]string, 0)
for stageId, itemIds := range stageIdItemIdMap {
Expand Down
15 changes: 15 additions & 0 deletions internal/repo/drop_report_extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@ func (c *DropReportExtra) GetDropReportExtraForArchive(ctx context.Context, curs
return dropReportExtras, newCursor, nil
}

// DeleteDropReportExtrasForArchive deletes all drop report extras with report_id between idInclusiveStart and idInclusiveEnd.
// Returns the number of rows affected and an error if any.
func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) (int64, error) {
r, err := tx.NewDelete().
Model((*model.DropReportExtra)(nil)).
Where("report_id >= ?", idInclusiveStart).
Where("report_id <= ?", idInclusiveEnd).
Exec(ctx)
if err != nil {
return -1, err
}

return r.RowsAffected()
}

func (c *DropReportExtra) IsDropReportExtraMD5Exist(ctx context.Context, md5 string) bool {
var dropReportExtra model.DropReportExtra

Expand Down
5 changes: 5 additions & 0 deletions internal/service/drop_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/uptrace/bun"
"gopkg.in/guregu/null.v3"

"exusiai.dev/backend-next/internal/model"
Expand Down Expand Up @@ -114,3 +115,7 @@ func (s *DropReport) GetDropReports(
func (s *DropReport) GetDropReportsForArchive(ctx context.Context, cursor *model.Cursor, date time.Time, limit int) ([]*model.DropReport, model.Cursor, error) {
return s.DropReportRepo.GetDropReportsForArchive(ctx, cursor, date, limit)
}

func (s *DropReport) DeleteDropReportsForArchive(ctx context.Context, tx bun.Tx, date time.Time) (int64, error) {
return s.DropReportRepo.DeleteDropReportsForArchive(ctx, tx, date)
}
75 changes: 72 additions & 3 deletions internal/service/drop_report_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-redsync/redsync/v4"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/uptrace/bun"
"golang.org/x/sync/errgroup"

"exusiai.dev/backend-next/internal/app/appconfig"
Expand All @@ -31,12 +32,13 @@ type Archive struct {

s3Client *s3.Client
lock *redsync.Mutex
db *bun.DB

dropReportsArchiver *archiver.Archiver
dropReportExtrasArchiver *archiver.Archiver
}

func NewArchive(dropReportService *DropReport, dropReportExtraService *DropReportExtra, conf *appconfig.Config, lock *redsync.Redsync) (*Archive, error) {
func NewArchive(dropReportService *DropReport, dropReportExtraService *DropReportExtra, conf *appconfig.Config, lock *redsync.Redsync, db *bun.DB) (*Archive, error) {
cfg, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(conf.DropReportArchiveS3Region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(conf.AWSAccessKey, conf.AWSSecretKey, "")),
Expand All @@ -52,6 +54,7 @@ func NewArchive(dropReportService *DropReport, dropReportExtraService *DropRepor
Config: conf,
s3Client: s3Client,
lock: lock.NewMutex("mutex:archiver", redsync.WithExpiry(30*time.Minute), redsync.WithTries(2)),
db: db,
dropReportsArchiver: &archiver.Archiver{
S3Client: s3Client,
S3Bucket: conf.DropReportArchiveS3Bucket,
Expand All @@ -69,10 +72,10 @@ func NewArchive(dropReportService *DropReport, dropReportExtraService *DropRepor

func (s *Archive) ArchiveByGlobalConfig(ctx context.Context) error {
targetDay := time.Now().AddDate(0, 0, -1*s.Config.NoArchiveDays)
return s.ArchiveByDate(ctx, targetDay)
return s.ArchiveByDate(ctx, targetDay, s.Config.DeleteDropReportAfterArchive)
}

func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time) error {
func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time, deleteAfterArchive bool) error {
if err := s.lock.Lock(); err != nil {
return errors.Wrap(err, "failed to acquire lock")
}
Expand Down Expand Up @@ -125,6 +128,21 @@ func (s *Archive) ArchiveByDate(ctx context.Context, date time.Time) error {
Err(err).
Msg("finished archiving")

if deleteAfterArchive {
log.Info().
Str("evt.name", "archive.delete").
Msg("deleting drop reports and extras")

err = s.DeleteReportsAndExtras(ctx, date, firstId, lastId)
if err != nil {
return errors.Wrap(err, "failed to delete drop reports and extras")
}

log.Info().
Str("evt.name", "archive.delete").
Msg("finished deleting drop reports and extras")
}

return err
}

Expand Down Expand Up @@ -208,3 +226,54 @@ func (s *Archive) populateDropReportExtrasToArchiver(ctx context.Context, idIncl
Msg("finished populating drop report extras")
return nil
}

func (s *Archive) DeleteReportsAndExtras(ctx context.Context, date time.Time, idInclusiveStart int, idInclusiveEnd int) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return errors.Wrap(err, "failed to start transaction")
}

log.Info().
Str("evt.name", "archive.deletion").
Str("date", date.Format("2006-01-02")).
Int("first_id", idInclusiveStart).
Int("last_id", idInclusiveEnd).
Msg("start deleting drop reports and extras")

var rowsAffected int64
rowsAffected, err = s.DropReportService.DeleteDropReportsForArchive(ctx, tx, date)
if err != nil {
return errors.Wrap(err, "failed to delete drop reports")
}

log.Info().
Int64("rows_affected", rowsAffected).
Str("evt.name", "archive.deletion.drop_report").
Str("date", date.Format("2006-01-02")).
Msg("finished deleting drop reports")

rowsAffected, err = s.DropReportExtraService.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd)
if err != nil {
return errors.Wrap(err, "failed to delete drop report extras")
}

log.Info().
Str("evt.name", "archive.deletion.drop_report_extra").
Int("first_id", idInclusiveStart).
Int("last_id", idInclusiveEnd).
Int64("rows_affected", rowsAffected).
Msg("finished deleting drop report extras")

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "failed to commit transaction")
}

log.Info().
Str("evt.name", "archive.deletion.success").
Str("date", date.Format("2006-01-02")).
Int("first_id", idInclusiveStart).
Int("last_id", idInclusiveEnd).
Msg("finished committing the transaction of deleting drop reports and extras")

return nil
}
6 changes: 6 additions & 0 deletions internal/service/drop_report_extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package service
import (
"context"

"github.com/uptrace/bun"

"exusiai.dev/backend-next/internal/model"
"exusiai.dev/backend-next/internal/repo"
)
Expand All @@ -20,3 +22,7 @@ func NewDropReportExtra(dropReportExtraRepo *repo.DropReportExtra) *DropReportEx
func (s *DropReportExtra) GetDropReportExtraForArchive(ctx context.Context, cursor *model.Cursor, idInclusiveStart int, idInclusiveEnd int, limit int) ([]*model.DropReportExtra, model.Cursor, error) {
return s.DropReportExtraRepo.GetDropReportExtraForArchive(ctx, cursor, idInclusiveStart, idInclusiveEnd, limit)
}

func (c *DropReportExtra) DeleteDropReportExtrasForArchive(ctx context.Context, tx bun.Tx, idInclusiveStart int, idInclusiveEnd int) (int64, error) {
return c.DropReportExtraRepo.DeleteDropReportExtrasForArchive(ctx, tx, idInclusiveStart, idInclusiveEnd)
}
Loading