From 3076bb8768b663ba739adb2622fc088c1b69b261 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Wed, 16 May 2018 17:18:40 -0500 Subject: [PATCH] work in drazen changes --- archiver.go | 101 ++++++++++++++++++++++++++++++++++------ archiver_test.go | 53 +++++++++++++++------ cmd/rp-archiver/main.go | 98 ++++++++++++-------------------------- config.go | 6 +++ 4 files changed, 163 insertions(+), 95 deletions(-) diff --git a/archiver.go b/archiver.go index aa57930..35cb70d 100644 --- a/archiver.go +++ b/archiver.go @@ -25,7 +25,7 @@ import ( type ArchiveType string const ( - FlowRunType = ArchiveType("flowrun") + RunType = ArchiveType("run") MessageType = ArchiveType("message") SessionType = ArchiveType("session") ) @@ -91,10 +91,10 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB) ([]Org, error) { const lookupOrgArchives = `SELECT start_date, archive_type FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = 'D' ORDER BY start_date asc` -// GetArchiveTasks calculates what archives need to be generated for the passed in org this is calculated per day -func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, archiveType ArchiveType) ([]Archive, error) { - archives := []Archive{} - err := db.SelectContext(ctx, &archives, lookupOrgArchives, org.ID, archiveType) +// GetMissingArchives calculates what archives need to be generated for the passed in org this is calculated per day +func GetMissingArchives(ctx context.Context, db *sqlx.DB, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) { + existingArchives := []*Archive{} + err := db.SelectContext(ctx, &existingArchives, lookupOrgArchives, org.ID, archiveType) if err != nil && err != sql.ErrNoRows { return nil, err } @@ -104,7 +104,7 @@ func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, a orgUTC := org.CreatedOn.In(time.UTC) startDate := time.Date(orgUTC.Year(), orgUTC.Month(), orgUTC.Day(), 0, 0, 0, 0, time.UTC) - tasks := make([]Archive, 0, 1) + missingArchives := make([]*Archive, 0, 1) currentArchiveIdx := 0 // walk forwards until we are after our end date @@ -112,12 +112,12 @@ func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, a existing := false // advance our current archive idx until we are on our start date or later - for currentArchiveIdx < len(archives) && archives[currentArchiveIdx].StartDate.Before(startDate) { + for currentArchiveIdx < len(existingArchives) && existingArchives[currentArchiveIdx].StartDate.Before(startDate) { currentArchiveIdx++ } // do we already have this archive? - if currentArchiveIdx < len(archives) && archives[currentArchiveIdx].StartDate.Equal(startDate) { + if currentArchiveIdx < len(existingArchives) && existingArchives[currentArchiveIdx].StartDate.Equal(startDate) { existing = true } @@ -130,13 +130,13 @@ func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, a ArchiveType: archiveType, Period: Day, } - tasks = append(tasks, archive) + missingArchives = append(missingArchives, &archive) } startDate = startDate.Add(time.Hour * 24) } - return tasks, nil + return missingArchives, nil } const lookupMsgs = ` @@ -230,7 +230,7 @@ FROM ( ` // EnsureTempArchiveDirectory checks that we can write to our archive directory, creating it first if needbe -func EnsureTempArchiveDirectory(ctx context.Context, path string) error { +func EnsureTempArchiveDirectory(path string) error { if len(path) == 0 { return fmt.Errorf("path argument cannot be empty") } @@ -287,7 +287,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi var rows *sqlx.Rows if archive.ArchiveType == MessageType { rows, err = db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, endDate) - } else if archive.ArchiveType == FlowRunType { + } else if archive.ArchiveType == RunType { rows, err = db.QueryxContext(ctx, lookupFlowRuns, archive.Org.ID, archive.StartDate, endDate) } if err != nil { @@ -313,7 +313,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi if visibility == "deleted" { continue } - } else if archive.ArchiveType == FlowRunType { + } else if archive.ArchiveType == RunType { err = rows.Scan(&record) if err != nil { return err @@ -360,16 +360,31 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi return nil } -// UploadArchive upload file to S3 +// UploadArchive uploads the passed archive file to S3 func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, archive *Archive) error { // s3 wants a base64 encoded hash instead of our hex encoded hashBytes, _ := hex.DecodeString(archive.ArchiveHash) hashBase64 := base64.StdEncoding.EncodeToString(hashBytes) + archivePath := "" + if archive.Period == Day { + archivePath = fmt.Sprintf( + "/%d/%s_%s%d%02d%02d_%s.jsonl.gz", + archive.Org.ID, archive.ArchiveType, archive.Period, + archive.StartDate.Year(), archive.StartDate.Month(), archive.StartDate.Day(), + archive.ArchiveHash) + } else { + archivePath = fmt.Sprintf( + "/%d/%s_%s%d%02d_%s.jsonl.gz", + archive.Org.ID, archive.ArchiveType, archive.Period, + archive.StartDate.Year(), archive.StartDate.Month(), + archive.ArchiveHash) + } + url, err := s3.PutS3File( s3Client, bucket, - fmt.Sprintf("/%d/%s_%d_%02d_%s.jsonl.gz", archive.Org.ID, archive.ArchiveType, archive.StartDate.Year(), archive.StartDate.Month(), archive.ArchiveHash), + archivePath, "application/json", "gzip", archive.ArchiveFile, @@ -436,3 +451,59 @@ func DeleteArchiveFile(archive *Archive) error { log.WithField("filename", archive.ArchiveFile).Debug("deleted temporary archive file") return nil } + +// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives +func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { + log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) + records := 0 + + archives, err := GetMissingArchives(ctx, db, now, org, archiveType) + if err != nil { + return nil, fmt.Errorf("error calculating tasks for type '%s'", archiveType) + } + + for _, archive := range archives { + log = log.WithField("start_date", archive.StartDate).WithField("period", archive.Period).WithField("archive_type", archive.ArchiveType) + log.Info("starting archive") + err := CreateArchiveFile(ctx, db, archive, config.TempDir) + if err != nil { + log.WithError(err).Error("error writing archive file") + continue + } + + if config.UploadToS3 { + err = UploadArchive(ctx, s3Client, config.S3Bucket, archive) + if err != nil { + log.WithError(err).Error("error writing archive to s3") + continue + } + } + + err = WriteArchiveToDB(ctx, db, archive) + if err != nil { + log.WithError(err).Error("error writing record to db") + continue + } + + // purge records that were archived + + if config.DeleteAfterUpload == true { + err := DeleteArchiveFile(archive) + if err != nil { + log.WithError(err).Error("error deleting temporary file") + continue + } + } + + log.WithField("id", archive.ID).WithField("record_count", archive.RecordCount).WithField("elapsed", archive.BuildTime).Info("archive complete") + records += archive.RecordCount + } + + if len(archives) > 0 { + elapsed := time.Now().Sub(now) + rate := float32(records) / (float32(elapsed) / float32(time.Second)) + log.WithField("elapsed", elapsed).WithField("records_per_second", int(rate)).Info("completed archival for org") + } + + return archives, nil +} diff --git a/archiver_test.go b/archiver_test.go index 21f386a..56a9a3f 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -38,19 +38,19 @@ func TestGetArchiveTasks(t *testing.T) { now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) // org 1 is too new, no tasks - tasks, err := GetArchiveTasks(ctx, db, now, orgs[0], MessageType) + tasks, err := GetMissingArchives(ctx, db, now, orgs[0], MessageType) assert.NoError(t, err) assert.Equal(t, 0, len(tasks)) // org 2 should have some - tasks, err = GetArchiveTasks(ctx, db, now, orgs[1], MessageType) + tasks, err = GetMissingArchives(ctx, db, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 62, len(tasks)) assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), tasks[61].StartDate) // org 3 is the same as 2, but two of the tasks have already been built - tasks, err = GetArchiveTasks(ctx, db, now, orgs[2], MessageType) + tasks, err = GetMissingArchives(ctx, db, now, orgs[2], MessageType) assert.NoError(t, err) assert.Equal(t, 60, len(tasks)) assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) @@ -61,20 +61,20 @@ func TestCreateMsgArchive(t *testing.T) { db := setup(t) ctx := context.Background() - err := EnsureTempArchiveDirectory(ctx, "/tmp") + err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) orgs, err := GetActiveOrgs(ctx, db) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - tasks, err := GetArchiveTasks(ctx, db, now, orgs[1], MessageType) + tasks, err := GetMissingArchives(ctx, db, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 62, len(tasks)) task := tasks[0] // build our first task, should have no messages - err = CreateArchiveFile(ctx, db, &task, "/tmp") + err = CreateArchiveFile(ctx, db, task, "/tmp") assert.NoError(t, err) // should have no records and be an empty gzip file @@ -82,11 +82,11 @@ func TestCreateMsgArchive(t *testing.T) { assert.Equal(t, int64(23), task.ArchiveSize) assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", task.ArchiveHash) - DeleteArchiveFile(&task) + DeleteArchiveFile(task) - // build our second task, should have a single message + // build our third task, should have a single message task = tasks[2] - err = CreateArchiveFile(ctx, db, &task, "/tmp") + err = CreateArchiveFile(ctx, db, task, "/tmp") assert.NoError(t, err) // should have two records, second will have attachments @@ -94,7 +94,7 @@ func TestCreateMsgArchive(t *testing.T) { assert.Equal(t, int64(365), task.ArchiveSize) assert.Equal(t, "cc67ae0d1edb9caa4c8c56b3d4de58ee", task.ArchiveHash) - DeleteArchiveFile(&task) + DeleteArchiveFile(task) _, err = os.Stat(task.ArchiveFile) assert.True(t, os.IsNotExist(err)) } @@ -107,21 +107,48 @@ func TestWriteArchiveToDB(t *testing.T) { assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - tasks, err := GetArchiveTasks(ctx, db, now, orgs[1], MessageType) + tasks, err := GetMissingArchives(ctx, db, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 62, len(tasks)) assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) task := tasks[0] - err = WriteArchiveToDB(ctx, db, &task) + err = WriteArchiveToDB(ctx, db, task) assert.NoError(t, err) assert.Equal(t, 3, task.ID) assert.Equal(t, false, task.IsPurged) // if we recalculate our tasks, we should have one less now - tasks, err = GetArchiveTasks(ctx, db, now, orgs[1], MessageType) + tasks, err = GetMissingArchives(ctx, db, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 61, len(tasks)) assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) } + +func TestArchiveOrg(t *testing.T) { + db := setup(t) + ctx := context.Background() + + orgs, err := GetActiveOrgs(ctx, db) + assert.NoError(t, err) + now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) + + conf := NewConfig() + conf.UploadToS3 = false + conf.TempDir = "/tmp" + + archives, err := ArchiveOrg(ctx, now, conf, db, nil, orgs[1], MessageType) + + assert.Equal(t, 62, len(archives)) + assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), archives[0].StartDate) + assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), archives[61].StartDate) + + assert.Equal(t, 0, archives[0].RecordCount) + assert.Equal(t, int64(23), archives[0].ArchiveSize) + assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", archives[0].ArchiveHash) + + assert.Equal(t, 2, archives[2].RecordCount) + assert.Equal(t, int64(365), archives[2].ArchiveSize) + assert.Equal(t, "cc67ae0d1edb9caa4c8c56b3d4de58ee", archives[2].ArchiveHash) +} diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index b444679..0545001 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -16,7 +16,7 @@ import ( "github.com/nyaruka/ezconf" archiver "github.com/nyaruka/rp-archiver" "github.com/nyaruka/rp-archiver/s3" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) func main() { @@ -25,35 +25,35 @@ func main() { loader.MustLoad() if config.DeleteAfterUpload && !config.UploadToS3 { - log.Fatal("cannot delete archives and also not upload to s3") + logrus.Fatal("cannot delete archives and also not upload to s3") } // configure our logger - log.SetOutput(os.Stdout) - log.SetFormatter(&log.TextFormatter{}) + logrus.SetOutput(os.Stdout) + logrus.SetFormatter(&logrus.TextFormatter{}) - level, err := log.ParseLevel(config.LogLevel) + level, err := logrus.ParseLevel(config.LogLevel) if err != nil { - log.Fatalf("Invalid log level '%s'", level) + logrus.Fatalf("Invalid log level '%s'", level) } - log.SetLevel(level) + logrus.SetLevel(level) // if we have a DSN entry, try to initialize it if config.SentryDSN != "" { - hook, err := logrus_sentry.NewSentryHook(config.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel}) + hook, err := logrus_sentry.NewSentryHook(config.SentryDSN, []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel}) hook.Timeout = 0 hook.StacktraceConfiguration.Enable = true hook.StacktraceConfiguration.Skip = 4 hook.StacktraceConfiguration.Context = 5 if err != nil { - log.Fatalf("Invalid sentry DSN: '%s': %s", config.SentryDSN, err) + logrus.Fatalf("Invalid sentry DSN: '%s': %s", config.SentryDSN, err) } - log.StandardLogger().Hooks.Add(hook) + logrus.StandardLogger().Hooks.Add(hook) } db, err := sqlx.Open("postgres", config.DB) if err != nil { - log.Fatal(err) + logrus.Fatal(err) } // create our s3 client @@ -65,10 +65,10 @@ func main() { S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle), }) if err != nil { - log.WithError(err).Fatal("error creating s3 client") + logrus.WithError(err).Fatal("error creating s3 client") } s3Session.Handlers.Send.PushFront(func(r *request.Request) { - log.WithField("headers", r.HTTPRequest.Header).WithField("service", r.ClientInfo.ServiceName).WithField("operation", r.Operation).WithField("params", r.Params).Debug("making aws request") + logrus.WithField("headers", r.HTTPRequest.Header).WithField("service", r.ClientInfo.ServiceName).WithField("operation", r.Operation).WithField("params", r.Params).Debug("making aws request") }) s3Client := aws_s3.New(s3Session) @@ -77,75 +77,39 @@ func main() { // test out our S3 credentials err = s3.TestS3(s3Client, config.S3Bucket) if err != nil { - log.WithError(err).Fatal("s3 bucket not reachable") + logrus.WithError(err).Fatal("s3 bucket not reachable") } else { - log.Info("s3 bucket ok") + logrus.Info("s3 bucket ok") } } + // ensure that we can actually write to the temp directory ctx := context.Background() - orgs, err := archiver.GetActiveOrgs(ctx, db) + err = archiver.EnsureTempArchiveDirectory(config.TempDir) if err != nil { - log.Fatal(err) + logrus.WithError(err).Fatal("cannot write to temp directory") } - // ensure that we can actually write to the temp directory - dir_err := archiver.EnsureTempArchiveDirectory(ctx, config.TempDir) - if dir_err != nil { - log.Fatal(dir_err) + // get our active orgs + orgs, err := archiver.GetActiveOrgs(ctx, db) + if err != nil { + logrus.Fatal(err) } - now := time.Now() + // for each org, do our export for _, org := range orgs { - log := log.WithField("org", org.Name).WithField("org_id", org.ID) - orgStart := time.Now() - orgRecords := 0 - - tasks, err := archiver.GetArchiveTasks(ctx, db, now, org, archiver.MessageType) - if err != nil { - log.WithError(err).Error("error calculating message tasks") - continue - } - - for _, task := range tasks { - log = log.WithField("start_date", task.StartDate).WithField("period", task.Period).WithField("archive_type", task.ArchiveType) - log.Info("starting archive") - err := archiver.CreateArchiveFile(ctx, db, &task, config.TempDir) + log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) + if config.ArchiveMessages { + _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.MessageType) if err != nil { - log.WithError(err).Error("error writing archive file") - continue + log.WithError(err).Error() } - - if config.UploadToS3 { - err = archiver.UploadArchive(ctx, s3Client, config.S3Bucket, &task) - if err != nil { - log.WithError(err).Error("error writing archive to s3") - continue - } - } - - err = archiver.WriteArchiveToDB(ctx, db, &task) + } + if config.ArchiveRuns { + _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.RunType) if err != nil { - log.WithError(err).Error("error writing record to db") - continue + log.WithError(err).Error() } - - if config.DeleteAfterUpload { - err := archiver.DeleteArchiveFile(&task) - if err != nil { - log.WithError(err).Error("error deleting temporary file") - continue - } - } - - log.WithField("id", task.ID).WithField("record_count", task.RecordCount).WithField("elapsed", task.BuildTime).Info("archive complete") - orgRecords += task.RecordCount - } - - if len(tasks) > 0 { - elapsed := time.Now().Sub(orgStart) - rate := float32(orgRecords) / (float32(elapsed) / float32(time.Second)) - log.WithField("elapsed", elapsed).WithField("records_per_second", int(rate)).Info("completed archival for org") } } } diff --git a/config.go b/config.go index 62dab63..fbab5ec 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,9 @@ type Config struct { TempDir string `help:"directory where temporary archive files are written"` DeleteAfterUpload bool `help:"whether we should delete temporary archive file"` UploadToS3 bool `help:"whether we should upload archive to S3"` + + ArchiveMessages bool `help:"whether we should archive messages"` + ArchiveRuns bool `help:"whether we should archive runs"` } func NewConfig() *Config { @@ -36,6 +39,9 @@ func NewConfig() *Config { TempDir: "/tmp/archiver", DeleteAfterUpload: true, UploadToS3: true, + + ArchiveMessages: true, + ArchiveRuns: true, } return &config