From cf942ebcdb42c6ef2e1530ead7ef17c8819d9304 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Thu, 25 Jul 2019 07:36:04 -0700 Subject: [PATCH] update table references, use wrapf for errors --- archiver.go | 230 ++++++++++++---------------------------- cmd/rp-archiver/main.go | 4 +- testdb.sql | 26 +---- 3 files changed, 73 insertions(+), 187 deletions(-) diff --git a/archiver.go b/archiver.go index ee4fb95..a97549d 100644 --- a/archiver.go +++ b/archiver.go @@ -111,7 +111,7 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB) ([]Org, error) { rows, err := db.QueryxContext(ctx, lookupActiveOrgs) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error fetching active orgs") } defer rows.Close() @@ -120,7 +120,7 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB) ([]Org, error) { org := Org{ActiveDays: 90} err = rows.StructScan(&org) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error scanning active org") } orgs = append(orgs, org) } @@ -142,7 +142,7 @@ func GetCurrentArchives(ctx context.Context, db *sqlx.DB, org Org, archiveType A archives := make([]*Archive, 0, 1) err := db.SelectContext(ctx, &archives, lookupOrgArchives, org.ID, archiveType) if err != nil && err != sql.ErrNoRows { - return nil, err + return nil, errors.Wrapf(err, "error selecting current archives for org: %d and type: %s", org.ID, archiveType) } return archives, nil @@ -162,7 +162,7 @@ func GetArchivesNeedingDeletion(ctx context.Context, db *sqlx.DB, org Org, archi archives := make([]*Archive, 0, 1) err := db.SelectContext(ctx, &archives, lookupArchivesNeedingDeletion, org.ID, archiveType) if err != nil && err != sql.ErrNoRows { - return nil, err + return nil, errors.Wrapf(err, "error selecting archives needing deletion for org: %d and type: %s", org.ID, archiveType) } return archives, nil @@ -174,21 +174,17 @@ FROM archives_archive WHERE org_id = $1 AND archive_type = $2 ` -// GetCurrentArchiveCount returns all the current archives for the passed in org and record type +// GetCurrentArchiveCount returns the archive count for the passed in org and record type func GetCurrentArchiveCount(ctx context.Context, db *sqlx.DB, org Org, archiveType ArchiveType) (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() var archiveCount int - rows, err := db.QueryxContext(ctx, lookupCountOrgArchives, org.ID, archiveType) + err := db.GetContext(ctx, &archiveCount, lookupCountOrgArchives, org.ID, archiveType) if err != nil { - return 0, err + return 0, errors.Wrapf(err, "error querying archive count for org: %d and type: %s", org.ID, archiveType) } - defer rows.Close() - - rows.Next() - err = rows.Scan(&archiveCount) return archiveCount, nil } @@ -210,7 +206,7 @@ func GetDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, org Org, arc err := db.SelectContext(ctx, &existingArchives, lookupOrgDailyArchivesForDateRange, org.ID, archiveType, DayPeriod, startDate, endDate) if err != nil && err != sql.ErrNoRows { - return nil, err + return nil, errors.Wrapf(err, "error selecting daily archives for org: %d and type: %s", org.ID, archiveType) } return existingArchives, nil @@ -252,7 +248,7 @@ func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, start rows, err := db.QueryxContext(ctx, lookupMissingDailyArchive, startDate, endDate, org.ID, DayPeriod, archiveType) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error getting missing daily archives for org: %d and type: %s", org.ID, archiveType) } defer rows.Close() @@ -261,7 +257,7 @@ func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, start err = rows.Scan(&missingDay) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error scanning missing daily archive for org: %d and type: %s", org.ID, archiveType) } archive := Archive{ Org: org, @@ -304,7 +300,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time, rows, err := db.QueryxContext(ctx, lookupMissingMonthlyArchive, startDate, endDate, org.ID, MonthPeriod, archiveType) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error getting missing monthly archive for org: %d and type: %s", org.ID, archiveType) } defer rows.Close() @@ -313,7 +309,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time, err = rows.Scan(&missingMonth) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error scanning missing monthly archive for org: %d and type: %s", org.ID, archiveType) } archive := Archive{ Org: org, @@ -336,13 +332,6 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client start := time.Now() - log := logrus.WithFields(logrus.Fields{ - "org_id": monthlyArchive.Org.ID, - "archive_type": monthlyArchive.ArchiveType, - "start_date": monthlyArchive.StartDate, - "period": monthlyArchive.Period, - }) - // figure out the first day in the monthlyArchive we'll archive startDate := monthlyArchive.StartDate endDate := startDate.AddDate(0, 1, 0).Add(time.Nanosecond * -1) @@ -365,8 +354,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client filename := fmt.Sprintf("%s_%d_%s_%d_%02d_", monthlyArchive.ArchiveType, monthlyArchive.Org.ID, monthlyArchive.Period, monthlyArchive.StartDate.Year(), monthlyArchive.StartDate.Month()) file, err := ioutil.TempFile(conf.TempDir, filename) if err != nil { - log.WithError(err).Error("error creating temp file") - return err + return errors.Wrapf(err, "error creating temp file: %s", filename) } writerHash := md5.New() gzWriter := gzip.NewWriter(io.MultiWriter(file, writerHash)) @@ -395,8 +383,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client reader, err := GetS3File(ctx, s3Client, daily.URL) if err != nil { - log.WithError(err).Error("error getting daily S3 file") - return err + return errors.Wrapf(err, "error reading S3 URL: %s", daily.URL) } // set up our reader to calculate our hash along the way @@ -404,15 +391,13 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client teeReader := io.TeeReader(reader, readerHash) gzipReader, err := gzip.NewReader(teeReader) if err != nil { - log.WithError(err).Error("error creating gzip reader") - return err + return errors.Wrapf(err, "error creating gzip reader") } // copy this daily file (uncompressed) to our new monthly file _, err = io.Copy(writer, gzipReader) if err != nil { - log.WithError(err).Error("error copying from s3 to disk") - return err + return errors.Wrapf(err, "error copying from s3 to disk for URL: %s", daily.URL) } reader.Close() @@ -442,7 +427,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client monthlyArchive.Hash = hex.EncodeToString(writerHash.Sum(nil)) stat, err := file.Stat() if err != nil { - return err + return errors.Wrapf(err, "error statting file: %s", file.Name()) } monthlyArchive.Size = stat.Size() monthlyArchive.RecordCount = recordCount @@ -464,7 +449,7 @@ func EnsureTempArchiveDirectory(path string) error { if os.IsNotExist(err) { return os.MkdirAll(path, 0700) } else if err != nil { - return err + return errors.Wrapf(err, "error statting temp dir: %s", path) } // is path a directory @@ -549,14 +534,14 @@ func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, wri rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) if err != nil { - return 0, err + return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID) } defer rows.Close() for rows.Next() { err = rows.Scan(&visibility, &record) if err != nil { - return 0, err + return 0, errors.Wrapf(err, "error scanning message row for org: %d", archive.Org.ID) } if visibility == "deleted" { @@ -624,35 +609,23 @@ func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer var rows *sqlx.Rows rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate()) if err != nil { - return 0, err + return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID) } defer rows.Close() recordCount := 0 - var record, visibility string + var record string var exitedOn *time.Time for rows.Next() { - if archive.ArchiveType == MessageType { - err = rows.Scan(&visibility, &record) - if err != nil { - return 0, err - } - - // skip over deleted rows - if visibility == "deleted" { - continue - } - } else if archive.ArchiveType == RunType { - err = rows.Scan(&exitedOn, &record) + err = rows.Scan(&exitedOn, &record) - // shouldn't be archiving an active run, that's an error - if exitedOn == nil { - return 0, fmt.Errorf("run still active, cannot archive: %s", record) - } + // shouldn't be archiving an active run, that's an error + if exitedOn == nil { + return 0, fmt.Errorf("run still active, cannot archive: %s", record) + } - if err != nil { - return 0, err - } + if err != nil { + return 0, errors.Wrapf(err, "error scanning run record for org: %d", archive.Org.ID) } writer.WriteString(record) @@ -681,7 +654,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi filename := fmt.Sprintf("%s_%d_%s%d%02d%02d_", archive.ArchiveType, archive.Org.ID, archive.Period, archive.StartDate.Year(), archive.StartDate.Month(), archive.StartDate.Day()) file, err := ioutil.TempFile(archivePath, filename) if err != nil { - return err + return errors.Wrapf(err, "error creating temp file: %s", file.Name()) } hash := md5.New() gzWriter := gzip.NewWriter(io.MultiWriter(file, hash)) @@ -703,25 +676,25 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi } if err != nil { - return err + return errors.Wrapf(err, "error writing archive") } archive.ArchiveFile = file.Name() err = writer.Flush() if err != nil { - return err + return errors.Wrapf(err, "error flushing archive file") } err = gzWriter.Close() if err != nil { - return err + return errors.Wrapf(err, "error closing archive gzip writer") } // calculate our size and hash archive.Hash = hex.EncodeToString(hash.Sum(nil)) stat, err := file.Stat() if err != nil { - return err + return errors.Wrapf(err, "error calculating archive hash") } if stat.Size() > 5e9 { @@ -764,7 +737,7 @@ func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, a err := UploadToS3(ctx, s3Client, bucket, archivePath, archive) if err != nil { - return err + return errors.Wrapf(err, "error uploading archive to S3") } archive.NeedsDeletion = true @@ -804,23 +777,20 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error tx, err := db.BeginTxx(ctx, nil) if err != nil { - logrus.WithError(err).Error("error starting transaction") - return err + return errors.Wrapf(err, "error starting transaction") } rows, err := tx.NamedQuery(insertArchive, archive) if err != nil { - logrus.WithError(err).Error("error inserting archive") tx.Rollback() - return err + return errors.Wrapf(err, "error inserting archive") } rows.Next() err = rows.Scan(&archive.ID) if err != nil { - logrus.WithError(err).Error("error reading new archive id") tx.Rollback() - return err + return errors.Wrapf(err, "error reading new archive id") } rows.Close() @@ -834,29 +804,26 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error result, err := tx.ExecContext(ctx, updateRollups, archive.ID, pq.Array(childIDs)) if err != nil { - logrus.WithError(err).Error("error updating rollup ids") tx.Rollback() - return err + return errors.Wrapf(err, "error updating rollup ids") } affected, err := result.RowsAffected() if err != nil { - logrus.WithError(err).Error("error getting number rollup ids updated") tx.Rollback() - return err + return errors.Wrapf(err, "error getting number of rollup ids updated") } if int(affected) != len(childIDs) { - logrus.Error("mismatch in number of children and number of rows updated") tx.Rollback() - return fmt.Errorf("mismatch in number of children updated") + return fmt.Errorf("mismatch in number of children updated and number of rows updated") } } err = tx.Commit() if err != nil { - logrus.WithError(err).Error("error comitting new archive") tx.Rollback() + return errors.Wrapf(err, "error committing new archive transaction") } - return err + return nil } // DeleteArchiveFile removes our own disk archive file @@ -864,7 +831,7 @@ func DeleteArchiveFile(archive *Archive) error { err := os.Remove(archive.ArchiveFile) if err != nil { - return err + return errors.Wrapf(err, "error deleting temp archive file: %s", archive.ArchiveFile) } logrus.WithFields(logrus.Fields{ @@ -889,54 +856,40 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType) if err != nil { - return nil, fmt.Errorf("error getting current archives") + return nil, errors.Wrapf(err, "error getting current archive count") } - var archives []*Archive + archives := make([]*Archive, 0) + + // no existing archives means this might be a backfill, figure out if there are full months we can build first if archiveCount == 0 { - // no existing archives means this might be a backfill, figure out if there are full monthes we can build first archives, err = GetMissingMonthlyArchives(ctx, db, now, org, archiveType) if err != nil { - log.WithError(err).Error("error calculating missing monthly archives") - return nil, err + return nil, errors.Wrapf(err, "error getting missing monthly archives") } // we first create monthly archives err = createArchives(ctx, db, config, s3Client, org, archives) if err != nil { - return nil, err - } - - // then add in daily archives taking into account the monthly that have been built - daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType) - if err != nil { - log.WithError(err).Error("error calculating missing daily archives") - return nil, err - } - // we then create missing daily archives - err = createArchives(ctx, db, config, s3Client, org, daily) - if err != nil { - return nil, err - } - - // append daily archives to the monthly archives - archives = append(archives, daily...) - defer ctx.Done() - } else { - // figure out any missing day archives - archives, err = GetMissingDailyArchives(ctx, db, now, org, archiveType) - if err != nil { - log.WithError(err).Error("error calculating missing daily archives") - return nil, err - } - - err = createArchives(ctx, db, config, s3Client, org, archives) - if err != nil { - return nil, err + return nil, errors.Wrapf(err, "error creating new monthly archives") } + } + // then add in daily archives taking into account the monthly that have been built + daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType) + if err != nil { + return nil, errors.Wrapf(err, "error getting missing daily archives") + } + // we then create missing daily archives + err = createArchives(ctx, db, config, s3Client, org, daily) + if err != nil { + return nil, errors.Wrapf(err, "error creating new daily archives") } + // append daily archives to any monthly archives + archives = append(archives, daily...) + defer ctx.Done() + // sum all records in the archives for _, archive := range archives { records += archive.RecordCount @@ -980,7 +933,7 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s err = UploadArchive(ctx, s3Client, config.S3Bucket, archive) if err != nil { log.WithError(err).Error("error writing archive to s3") - return err + continue } } @@ -1023,7 +976,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s // get our missing monthly archives archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) if err != nil { - return nil, fmt.Errorf("error calculating missing monthly archives for type '%s'", archiveType) + return nil, err } // build them from rollups @@ -1409,27 +1362,11 @@ SET delete_reason = 'A' WHERE id IN(?) ` -const deleteWebhookEvents = ` -DELETE FROM api_webhookevent -WHERE run_id IN(?) -RETURNING id -` - -const deleteWebhookResults = ` -DELETE FROM api_webhookresult -WHERE event_id IN(?) -` - const deleteRecentRuns = ` DELETE FROM flows_flowpathrecentrun WHERE run_id IN(?) ` -const deleteActionLogs = ` -DELETE FROM flows_actionlog -WHERE run_id IN(?) -` - const unlinkParents = ` UPDATE flows_flowrun SET parent_id = NULL @@ -1534,33 +1471,6 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie return fmt.Errorf("error updating delete reason: %s", err.Error()) } - // delete our webhooks, getting back any dependent results - var hookIDs []int64 - q, vs, err := sqlx.In(deleteWebhookEvents, batchIDs) - if err != nil { - return err - } - q = tx.Rebind(q) - err = tx.SelectContext(ctx, &hookIDs, q, vs...) - if err != nil { - tx.Rollback() - return fmt.Errorf("error removing webhook events: %s", err.Error()) - } - - // if there are associated results, delete those too - if len(hookIDs) > 0 { - err = executeInQuery(ctx, tx, deleteWebhookResults, hookIDs) - if err != nil { - return fmt.Errorf("error removing webhook results: %s", err.Error()) - } - } - - // any action logs - err = executeInQuery(ctx, tx, deleteActionLogs, batchIDs) - if err != nil { - return fmt.Errorf("error removing action logs: %s", err.Error()) - } - // any recent runs err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs) if err != nil { @@ -1649,7 +1559,7 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config } if err != nil { - log.WithError(err).Error("Error deleting archive") + log.WithError(err).Error("error deleting archive") continue } @@ -1666,12 +1576,12 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { created, err := CreateOrgArchives(ctx, now, config, db, s3Client, org, archiveType) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrapf(err, "error creating archives") } monthlies, err := RollupOrgArchives(ctx, now, config, db, s3Client, org, archiveType) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrapf(err, "error rolling up archives") } for _, m := range monthlies { @@ -1683,7 +1593,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, if config.Delete { deleted, err = DeleteArchivedOrgRecords(ctx, now, config, db, s3Client, org, archiveType) if err != nil { - return created, deleted, err + return created, deleted, errors.Wrapf(err, "error deleting archived records") } } diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index 758b8d4..a6e3693 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -102,13 +102,13 @@ func main() { if config.ArchiveMessages { _, _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.MessageType) if err != nil { - log.WithError(err).Error() + log.WithError(err).WithField("archive_type", archiver.MessageType).Error("error archiving org messages") } } if config.ArchiveRuns { _, _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.RunType) if err != nil { - log.WithError(err).Error() + log.WithError(err).WithField("archive_type", archiver.RunType).Error("error archiving org runs") } } diff --git a/testdb.sql b/testdb.sql index b82213e..b25d8a7 100644 --- a/testdb.sql +++ b/testdb.sql @@ -214,26 +214,11 @@ CREATE TABLE channels_channellog ( msg_id integer NOT NULL references msgs_msg(id) ); -CREATE TABLE api_webhookevent ( - id serial primary key, - run_id integer NOT NULL references flows_flowrun(id) DEFERRABLE INITIALLY DEFERRED -); - -CREATE TABLE api_webhookresult ( - id serial primary key, - event_id integer NOT NULL references api_webhookevent(id) DEFERRABLE INITIALLY DEFERRED -); - CREATE TABLE flows_flowpathrecentrun ( id serial primary key, run_id integer NOT NULL references flows_flowrun(id) DEFERRABLE INITIALLY DEFERRED ); -CREATE TABLE flows_actionlog ( - id serial primary key, - run_id integer NOT NULL references flows_flowrun(id) DEFERRABLE INITIALLY DEFERRED -); - INSERT INTO orgs_language(id, iso_code) VALUES (1, 'eng'); @@ -352,14 +337,5 @@ INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, resu (6, '6262eefe-a6e9-4201-9b76-a7f25e3b7f29', TRUE, 7, 2, 3, '{}', '[]', '[]', '2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00','2017-12-12 21:11:59.890662+02:00', 'C', 4, NULL); -INSERT INTO api_webhookevent(id, run_id) VALUES -(1, 3); - -INSERT INTO api_webhookresult(id, event_id) VALUES -(1, 1); - INSERT INTO flows_flowpathrecentrun(id, run_id) VALUES -(1, 3); - -INSERT INTO flows_actionlog(id, run_id) VALUES -(1, 3); +(1, 3); \ No newline at end of file