From d5d46f6ccf6d2e820017bdc867c0bbcd22033209 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 1 Oct 2021 11:37:48 -0500 Subject: [PATCH] Move code into archives package and split run and message specific code into their own modules --- archiver.go => archives/archives.go | 645 +----------------- archiver_test.go => archives/archives_test.go | 6 +- config.go => archives/config.go | 2 +- archives/messages.go | 398 +++++++++++ archives/runs.go | 268 ++++++++ s3.go => archives/s3.go | 2 +- .../testdata}/messages1.jsonl | 0 .../testdata}/messages2.jsonl | 0 {testdata => archives/testdata}/runs1.jsonl | 0 {testdata => archives/testdata}/runs2.jsonl | 0 cmd/rp-archiver/main.go | 18 +- testdb.sql | 2 +- 12 files changed, 683 insertions(+), 658 deletions(-) rename archiver.go => archives/archives.go (61%) rename archiver_test.go => archives/archives_test.go (99%) rename config.go => archives/config.go (99%) create mode 100644 archives/messages.go create mode 100644 archives/runs.go rename s3.go => archives/s3.go (99%) rename {testdata => archives/testdata}/messages1.jsonl (100%) rename {testdata => archives/testdata}/messages2.jsonl (100%) rename {testdata => archives/testdata}/runs1.jsonl (100%) rename {testdata => archives/testdata}/runs2.jsonl (100%) diff --git a/archiver.go b/archives/archives.go similarity index 61% rename from archiver.go rename to archives/archives.go index f0d770c..f72f9b5 100644 --- a/archiver.go +++ b/archives/archives.go @@ -1,4 +1,4 @@ -package archiver +package archives import ( "bufio" @@ -10,10 +10,9 @@ import ( "fmt" "io" "io/ioutil" - "time" - "os" "path/filepath" + "time" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/jmoiron/sqlx" @@ -462,173 +461,6 @@ func EnsureTempArchiveDirectory(path string) error { return err } -const lookupMsgs = ` -SELECT rec.visibility, row_to_json(rec) FROM ( - SELECT - mm.id, - broadcast_id as broadcast, - row_to_json(contact) as contact, - CASE WHEN oo.is_anon = False THEN ccu.identity ELSE null END as urn, - row_to_json(channel) as channel, - CASE WHEN direction = 'I' THEN 'in' - WHEN direction = 'O' THEN 'out' - ELSE NULL - END as direction, - CASE WHEN msg_type = 'F' - THEN 'flow' - WHEN msg_type = 'V' - THEN 'ivr' - WHEN msg_type = 'I' - THEN 'inbox' - ELSE NULL - END as "type", - CASE when status = 'I' then 'initializing' - WHEN status = 'P' then 'queued' - WHEN status = 'Q' then 'queued' - WHEN status = 'W' then 'wired' - WHEN status = 'D' then 'delivered' - WHEN status = 'H' then 'handled' - WHEN status = 'E' then 'errored' - WHEN status = 'F' then 'failed' - WHEN status = 'S' then 'sent' - WHEN status = 'R' then 'resent' - ELSE NULL - END as status, - - CASE WHEN visibility = 'V' THEN 'visible' - WHEN visibility = 'A' THEN 'archived' - WHEN visibility = 'D' THEN 'deleted' - ELSE NULL - END as visibility, - text, - (select coalesce(jsonb_agg(attach_row), '[]'::jsonb) FROM (select attach_data.attachment[1] as content_type, attach_data.attachment[2] as url FROM (select regexp_matches(unnest(attachments), '^(.*?):(.*)$') attachment) as attach_data) as attach_row) as attachments, - labels_agg.data as labels, - mm.created_on as created_on, - sent_on, - mm.modified_on as modified_on - FROM msgs_msg mm - JOIN orgs_org oo ON mm.org_id = oo.id - JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True - LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id - LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel 
ON True - LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True - - WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 - ORDER BY created_on ASC, id ASC) rec; -` - -// writeMessageRecords writes the messages in the archive's date range to the passed in writer -func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { - var rows *sqlx.Rows - recordCount := 0 - - // first write our normal records - var record, visibility string - - rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) - if err != nil { - return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID) - } - defer rows.Close() - - for rows.Next() { - err = rows.Scan(&visibility, &record) - if err != nil { - return 0, errors.Wrapf(err, "error scanning message row for org: %d", archive.Org.ID) - } - - if visibility == "deleted" { - continue - } - writer.WriteString(record) - writer.WriteString("\n") - recordCount++ - } - - logrus.WithField("record_count", recordCount).Debug("Done Writing") - return recordCount, nil -} - -const lookupFlowRuns = ` -SELECT rec.exited_on, row_to_json(rec) -FROM ( - SELECT - fr.id as id, - fr.uuid as uuid, - row_to_json(flow_struct) AS flow, - row_to_json(contact_struct) AS contact, - fr.responded, - (SELECT coalesce(jsonb_agg(path_data), '[]'::jsonb) from ( - SELECT path_row ->> 'node_uuid' AS node, (path_row ->> 'arrived_on')::timestamptz as time - FROM jsonb_array_elements(fr.path::jsonb) AS path_row) as path_data - ) as path, - (SELECT coalesce(jsonb_object_agg(values_data.key, values_data.value), '{}'::jsonb) from ( - SELECT key, jsonb_build_object('name', value -> 'name', 'value', value -> 'value', 'input', value -> 'input', 'time', (value -> 'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid') as value - FROM jsonb_each(fr.results::jsonb)) AS values_data - ) as values, - CASE - WHEN $1 - THEN '[]'::jsonb - ELSE - coalesce(fr.events, '[]'::jsonb) - END AS events, - fr.created_on, - fr.modified_on, - fr.exited_on, - CASE - WHEN exit_type = 'C' - THEN 'completed' - WHEN exit_type = 'I' - THEN 'interrupted' - WHEN exit_type = 'E' - THEN 'expired' - ELSE - null - END as exit_type, - a.username as submitted_by - - FROM flows_flowrun fr - LEFT JOIN auth_user a ON a.id = fr.submitted_by_id - JOIN LATERAL (SELECT uuid, name FROM flows_flow WHERE flows_flow.id = fr.flow_id) AS flow_struct ON True - JOIN LATERAL (SELECT uuid, name FROM contacts_contact cc WHERE cc.id = fr.contact_id) AS contact_struct ON True - - WHERE fr.org_id = $2 AND fr.modified_on >= $3 AND fr.modified_on < $4 - ORDER BY fr.modified_on ASC, id ASC -) as rec; -` - -// writeRunRecords writes the runs in the archive's date range to the passed in writer -func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { - var rows *sqlx.Rows - rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate()) - if err != nil { - return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID) - } - defer rows.Close() - - recordCount := 0 - var record string - var exitedOn *time.Time - for rows.Next() { - err = rows.Scan(&exitedOn, &record) - - // shouldn't be 
archiving an active run, that's an error - if exitedOn == nil { - return 0, fmt.Errorf("run still active, cannot archive: %s", record) - } - - if err != nil { - return 0, errors.Wrapf(err, "error scanning run record for org: %d", archive.Org.ID) - } - - writer.WriteString(record) - writer.WriteString("\n") - recordCount++ - } - - return recordCount, nil -} - // CreateArchiveFile is responsible for writing an archive file for the passed in archive from our database func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archivePath string) error { ctx, cancel := context.WithTimeout(ctx, time.Hour*3) @@ -1044,41 +876,6 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s return created, nil } -const selectOrgMessagesInRange = ` -SELECT mm.id, mm.visibility -FROM msgs_msg mm -LEFT JOIN contacts_contact cc ON cc.id = mm.contact_id -WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 -ORDER BY mm.created_on ASC, mm.id ASC -` - -const setMessageDeleteReason = ` -UPDATE msgs_msg -SET delete_reason = 'A' -WHERE id IN(?) -` - -const deleteMessageLogs = ` -DELETE FROM channels_channellog -WHERE msg_id IN(?) -` - -const deleteMessageLabels = ` -DELETE FROM msgs_msg_labels -WHERE msg_id IN(?) -` - -const unlinkResponses = ` -UPDATE msgs_msg -SET response_to_id = NULL -WHERE response_to_id IN(?) -` - -const deleteMessages = ` -DELETE FROM msgs_msg -WHERE id IN(?) -` - const setArchiveDeleted = ` UPDATE archives_archive SET needs_deletion = FALSE, deleted_on = $2 @@ -1102,444 +899,6 @@ func executeInQuery(ctx context.Context, tx *sqlx.Tx, query string, ids []int64) var deleteTransactionSize = 100 -// DeleteArchivedMessages takes the passed in archive, verifies the S3 file is still present (and correct), then selects -// all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time -// -// Upon completion it updates the needs_deletion flag on the archive -func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { - outer, cancel := context.WithTimeout(ctx, time.Hour*3) - defer cancel() - - start := time.Now() - log := logrus.WithFields(logrus.Fields{ - "id": archive.ID, - "org_id": archive.OrgID, - "start_date": archive.StartDate, - "end_date": archive.endDate(), - "archive_type": archive.ArchiveType, - "total_count": archive.RecordCount, - }) - log.Info("deleting messages") - - // first things first, make sure our file is present on S3 - md5, err := GetS3FileETAG(outer, s3Client, archive.URL) - if err != nil { - return err - } - - // if our etag and archive md5 don't match, that's an error, return - if md5 != archive.Hash { - return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) - } - - // ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, selectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) - if err != nil { - return err - } - defer rows.Close() - - visibleCount := 0 - var msgID int64 - var visibility string - msgIDs := make([]int64, 0, archive.RecordCount) - for rows.Next() { - err = rows.Scan(&msgID, &visibility) - if err != nil { - return err - } - msgIDs = append(msgIDs, msgID) - - // keep track of the number of visible messages, ie, not deleted - if visibility != "D" { - visibleCount++ - } - } - rows.Close() - - 
log.WithFields(logrus.Fields{ - "msg_count": len(msgIDs), - }).Debug("found messages") - - // verify we don't see more messages than there are in our archive (fewer is ok) - if visibleCount > archive.RecordCount { - return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount) - } - - // ok, delete our messages in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize { - // no single batch should take more than a few minutes - ctx, cancel := context.WithTimeout(ctx, time.Minute*15) - defer cancel() - - start := time.Now() - - endIdx := startIdx + deleteTransactionSize - if endIdx > len(msgIDs) { - endIdx = len(msgIDs) - } - batchIDs := msgIDs[startIdx:endIdx] - - // start our transaction - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - - // first update our delete_reason - err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs) - if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) - } - - // now delete any channel logs - err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs) - if err != nil { - return fmt.Errorf("error removing channel logs: %s", err.Error()) - } - - // then any labels - err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs) - if err != nil { - return fmt.Errorf("error removing message labels: %s", err.Error()) - } - - // unlink any responses - err = executeInQuery(ctx, tx, unlinkResponses, batchIDs) - if err != nil { - return fmt.Errorf("error unlinking responses: %s", err.Error()) - } - - // finally, delete our messages - err = executeInQuery(ctx, tx, deleteMessages, batchIDs) - if err != nil { - return fmt.Errorf("error deleting messages: %s", err.Error()) - } - - // commit our transaction - err = tx.Commit() - if err != nil { - return fmt.Errorf("error committing message delete transaction: %s", err.Error()) - } - - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of messages") - - cancel() - } - - outer, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() - - deletedOn := time.Now() - - // all went well! mark our archive as no longer needing deletion - _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) - if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) - } - archive.NeedsDeletion = false - archive.DeletedOn = &deletedOn - - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting messages") - - return nil -} - -// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them -func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { - start := time.Now() - threshhold := now.AddDate(0, 0, -org.RetentionPeriod) - - rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold) - if err != nil { - return err - } - defer rows.Close() - - count := 0 - for rows.Next() { - if count == 0 { - logrus.WithField("org_id", org.ID).Info("deleting broadcasts") - } - - // been deleting this org more than an hour? 
thats enough for today, exit out - if time.Since(start) > time.Hour { - break - } - - var broadcastID int64 - err := rows.Scan(&broadcastID) - if err != nil { - return errors.Wrap(err, "unable to get broadcast id") - } - - // make sure we have no active messages - var msgCount int64 - err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID) - if err != nil { - return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID) - } - - if msgCount != 0 { - logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still") - continue - } - - // we delete broadcasts in a transaction per broadcast - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID) - } - - // delete contacts M2M - _, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID) - } - - // delete groups M2M - _, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID) - } - - // delete URNs M2M - _, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID) - } - - // delete counts associated with this broadcast - _, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID) - } - - // finally, delete our broadcast - _, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) - } - - err = tx.Commit() - if err != nil { - return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID) - } - - count++ - } - - if count > 0 { - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": count, - "org_id": org.ID, - }).Info("completed deleting broadcasts") - } - - return nil -} - -const selectOldOrgBroadcasts = ` -SELECT - id -FROM - msgs_broadcast -WHERE - org_id = $1 AND - created_on < $2 AND - schedule_id IS NULL -ORDER BY - created_on ASC, - id ASC -LIMIT 1000000; -` - -const selectOrgRunsInRange = ` -SELECT fr.id, fr.is_active -FROM flows_flowrun fr -LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id -WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3 -ORDER BY fr.modified_on ASC, fr.id ASC -` - -const setRunDeleteReason = ` -UPDATE flows_flowrun -SET delete_reason = 'A' -WHERE id IN(?) -` - -const deleteRecentRuns = ` -DELETE FROM flows_flowpathrecentrun -WHERE run_id IN(?) -` - -const unlinkParents = ` -UPDATE flows_flowrun -SET parent_id = NULL -WHERE parent_id IN(?) -` - -const deleteRuns = ` -DELETE FROM flows_flowrun -WHERE id IN(?) 
-` - -// DeleteArchivedRuns takes the passed in archive, verifies the S3 file is still present (and correct), then selects -// all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time -// -// Upon completion it updates the needs_deletion flag on the archive -func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { - outer, cancel := context.WithTimeout(ctx, time.Hour*3) - defer cancel() - - start := time.Now() - log := logrus.WithFields(logrus.Fields{ - "id": archive.ID, - "org_id": archive.OrgID, - "start_date": archive.StartDate, - "end_date": archive.endDate(), - "archive_type": archive.ArchiveType, - "total_count": archive.RecordCount, - }) - log.Info("deleting runs") - - // first things first, make sure our file is present on S3 - md5, err := GetS3FileETAG(outer, s3Client, archive.URL) - if err != nil { - return err - } - - // if our etag and archive md5 don't match, that's an error, return - if md5 != archive.Hash { - return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) - } - - // ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, selectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) - if err != nil { - return err - } - defer rows.Close() - - var runID int64 - var isActive bool - runCount := 0 - runIDs := make([]int64, 0, archive.RecordCount) - for rows.Next() { - err = rows.Scan(&runID, &isActive) - if err != nil { - return err - } - - // if this run is still active, something has gone wrong, throw an error - if isActive { - return fmt.Errorf("run %d in archive is still active", runID) - } - - // increment our count - runCount++ - runIDs = append(runIDs, runID) - } - rows.Close() - - log.WithFields(logrus.Fields{ - "run_count": len(runIDs), - }).Debug("found runs") - - // verify we don't see more runs than there are in our archive (fewer is ok) - if runCount > archive.RecordCount { - return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount) - } - - // ok, delete our runs in batches, we do this in transactions as it spans a few different queries - for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize { - // no single batch should take more than a few minutes - ctx, cancel := context.WithTimeout(ctx, time.Minute*15) - defer cancel() - - start := time.Now() - - endIdx := startIdx + deleteTransactionSize - if endIdx > len(runIDs) { - endIdx = len(runIDs) - } - batchIDs := runIDs[startIdx:endIdx] - - // start our transaction - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - - // first update our delete_reason - err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs) - if err != nil { - return fmt.Errorf("error updating delete reason: %s", err.Error()) - } - - // any recent runs - err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs) - if err != nil { - return fmt.Errorf("error deleting recent runs: %s", err.Error()) - } - - // unlink any parents - err = executeInQuery(ctx, tx, unlinkParents, batchIDs) - if err != nil { - return fmt.Errorf("error unliking parent runs: %s", err.Error()) - } - - // finally, delete our runs - err = executeInQuery(ctx, tx, deleteRuns, batchIDs) - if err != nil { - return fmt.Errorf("error deleting runs: %s", err.Error()) - } - - // commit our transaction - err = tx.Commit() - if 
err != nil { - return fmt.Errorf("error committing run delete transaction: %s", err.Error()) - } - - log.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - "count": len(batchIDs), - }).Debug("deleted batch of runs") - - cancel() - } - - outer, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() - - deletedOn := time.Now() - - // all went well! mark our archive as no longer needing deletion - _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) - if err != nil { - return fmt.Errorf("error setting archive as deleted: %s", err.Error()) - } - archive.NeedsDeletion = false - archive.DeletedOn = &deletedOn - - logrus.WithFields(logrus.Fields{ - "elapsed": time.Since(start), - }).Info("completed deleting runs") - - return nil -} - // DeleteArchivedOrgRecords deletes all the records for the passeg in org based on archives already created func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) { // get all the archives that haven't yet been deleted diff --git a/archiver_test.go b/archives/archives_test.go similarity index 99% rename from archiver_test.go rename to archives/archives_test.go index 87cbf84..7997cc8 100644 --- a/archiver_test.go +++ b/archives/archives_test.go @@ -1,4 +1,4 @@ -package archiver +package archives import ( "compress/gzip" @@ -16,7 +16,7 @@ import ( ) func setup(t *testing.T) *sqlx.DB { - testDB, err := ioutil.ReadFile("testdb.sql") + testDB, err := ioutil.ReadFile("../testdb.sql") assert.NoError(t, err) db, err := sqlx.Open("postgres", "postgres://temba:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC") @@ -487,7 +487,7 @@ func TestArchiveOrgRuns(t *testing.T) { assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate) assert.Equal(t, DayPeriod, created[11].Period) assert.Equal(t, 1, created[11].RecordCount) - assert.Equal(t, int64(427), created[11].Size) + assert.Equal(t, int64(428), created[11].Size) assert.Equal(t, "bf08041cef314492fee2910357ec4189", created[11].Hash) assert.Equal(t, 12, len(deleted)) diff --git a/config.go b/archives/config.go similarity index 99% rename from config.go rename to archives/config.go index 6fe9af3..5ec635a 100644 --- a/config.go +++ b/archives/config.go @@ -1,4 +1,4 @@ -package archiver +package archives // Config is our top level configuration object type Config struct { diff --git a/archives/messages.go b/archives/messages.go new file mode 100644 index 0000000..a3a5119 --- /dev/null +++ b/archives/messages.go @@ -0,0 +1,398 @@ +package archives + +import ( + "bufio" + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const lookupMsgs = ` +SELECT rec.visibility, row_to_json(rec) FROM ( + SELECT + mm.id, + broadcast_id as broadcast, + row_to_json(contact) as contact, + CASE WHEN oo.is_anon = False THEN ccu.identity ELSE null END as urn, + row_to_json(channel) as channel, + CASE WHEN direction = 'I' THEN 'in' + WHEN direction = 'O' THEN 'out' + ELSE NULL + END as direction, + CASE WHEN msg_type = 'F' + THEN 'flow' + WHEN msg_type = 'V' + THEN 'ivr' + WHEN msg_type = 'I' + THEN 'inbox' + ELSE NULL + END as "type", + CASE when status = 'I' then 'initializing' + WHEN status = 'P' then 'queued' + WHEN status = 'Q' then 'queued' + WHEN status = 'W' then 'wired' + WHEN status = 'D' then 'delivered' + WHEN status = 'H' then 
'handled' + WHEN status = 'E' then 'errored' + WHEN status = 'F' then 'failed' + WHEN status = 'S' then 'sent' + WHEN status = 'R' then 'resent' + ELSE NULL + END as status, + + CASE WHEN visibility = 'V' THEN 'visible' + WHEN visibility = 'A' THEN 'archived' + WHEN visibility = 'D' THEN 'deleted' + ELSE NULL + END as visibility, + text, + (select coalesce(jsonb_agg(attach_row), '[]'::jsonb) FROM (select attach_data.attachment[1] as content_type, attach_data.attachment[2] as url FROM (select regexp_matches(unnest(attachments), '^(.*?):(.*)$') attachment) as attach_data) as attach_row) as attachments, + labels_agg.data as labels, + mm.created_on as created_on, + sent_on, + mm.modified_on as modified_on + FROM msgs_msg mm + JOIN orgs_org oo ON mm.org_id = oo.id + JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True + LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id + LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True + LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True + + WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 + ORDER BY created_on ASC, id ASC) rec; +` + +// writeMessageRecords writes the messages in the archive's date range to the passed in writer +func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) { + var rows *sqlx.Rows + recordCount := 0 + + // first write our normal records + var record, visibility string + + rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate()) + if err != nil { + return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID) + } + defer rows.Close() + + for rows.Next() { + err = rows.Scan(&visibility, &record) + if err != nil { + return 0, errors.Wrapf(err, "error scanning message row for org: %d", archive.Org.ID) + } + + if visibility == "deleted" { + continue + } + writer.WriteString(record) + writer.WriteString("\n") + recordCount++ + } + + logrus.WithField("record_count", recordCount).Debug("Done Writing") + return recordCount, nil +} + +const selectOrgMessagesInRange = ` +SELECT mm.id, mm.visibility +FROM msgs_msg mm +LEFT JOIN contacts_contact cc ON cc.id = mm.contact_id +WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3 +ORDER BY mm.created_on ASC, mm.id ASC +` + +const setMessageDeleteReason = ` +UPDATE msgs_msg +SET delete_reason = 'A' +WHERE id IN(?) +` + +const deleteMessageLogs = ` +DELETE FROM channels_channellog +WHERE msg_id IN(?) +` + +const deleteMessageLabels = ` +DELETE FROM msgs_msg_labels +WHERE msg_id IN(?) +` + +const unlinkResponses = ` +UPDATE msgs_msg +SET response_to_id = NULL +WHERE response_to_id IN(?) +` + +const deleteMessages = ` +DELETE FROM msgs_msg +WHERE id IN(?) 
+` + +// DeleteArchivedMessages takes the passed in archive, verifies the S3 file is still present (and correct), then selects +// all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time +// +// Upon completion it updates the needs_deletion flag on the archive +func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error { + outer, cancel := context.WithTimeout(ctx, time.Hour*3) + defer cancel() + + start := time.Now() + log := logrus.WithFields(logrus.Fields{ + "id": archive.ID, + "org_id": archive.OrgID, + "start_date": archive.StartDate, + "end_date": archive.endDate(), + "archive_type": archive.ArchiveType, + "total_count": archive.RecordCount, + }) + log.Info("deleting messages") + + // first things first, make sure our file is present on S3 + md5, err := GetS3FileETAG(outer, s3Client, archive.URL) + if err != nil { + return err + } + + // if our etag and archive md5 don't match, that's an error, return + if md5 != archive.Hash { + return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5) + } + + // ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big + rows, err := db.QueryxContext(outer, selectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) + if err != nil { + return err + } + defer rows.Close() + + visibleCount := 0 + var msgID int64 + var visibility string + msgIDs := make([]int64, 0, archive.RecordCount) + for rows.Next() { + err = rows.Scan(&msgID, &visibility) + if err != nil { + return err + } + msgIDs = append(msgIDs, msgID) + + // keep track of the number of visible messages, ie, not deleted + if visibility != "D" { + visibleCount++ + } + } + rows.Close() + + log.WithFields(logrus.Fields{ + "msg_count": len(msgIDs), + }).Debug("found messages") + + // verify we don't see more messages than there are in our archive (fewer is ok) + if visibleCount > archive.RecordCount { + return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount) + } + + // ok, delete our messages in batches, we do this in transactions as it spans a few different queries + for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize { + // no single batch should take more than a few minutes + ctx, cancel := context.WithTimeout(ctx, time.Minute*15) + defer cancel() + + start := time.Now() + + endIdx := startIdx + deleteTransactionSize + if endIdx > len(msgIDs) { + endIdx = len(msgIDs) + } + batchIDs := msgIDs[startIdx:endIdx] + + // start our transaction + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + + // first update our delete_reason + err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs) + if err != nil { + return fmt.Errorf("error updating delete reason: %s", err.Error()) + } + + // now delete any channel logs + err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs) + if err != nil { + return fmt.Errorf("error removing channel logs: %s", err.Error()) + } + + // then any labels + err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs) + if err != nil { + return fmt.Errorf("error removing message labels: %s", err.Error()) + } + + // unlink any responses + err = executeInQuery(ctx, tx, unlinkResponses, batchIDs) + if err != nil { + return fmt.Errorf("error unlinking responses: %s", err.Error()) + } + + // finally, delete our messages + err = 
executeInQuery(ctx, tx, deleteMessages, batchIDs)
+		if err != nil {
+			return fmt.Errorf("error deleting messages: %s", err.Error())
+		}
+
+		// commit our transaction
+		err = tx.Commit()
+		if err != nil {
+			return fmt.Errorf("error committing message delete transaction: %s", err.Error())
+		}
+
+		log.WithFields(logrus.Fields{
+			"elapsed": time.Since(start),
+			"count":   len(batchIDs),
+		}).Debug("deleted batch of messages")
+
+		cancel()
+	}
+
+	outer, cancel = context.WithTimeout(ctx, time.Minute)
+	defer cancel()
+
+	deletedOn := time.Now()
+
+	// all went well! mark our archive as no longer needing deletion
+	_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
+	if err != nil {
+		return fmt.Errorf("error setting archive as deleted: %s", err.Error())
+	}
+	archive.NeedsDeletion = false
+	archive.DeletedOn = &deletedOn
+
+	logrus.WithFields(logrus.Fields{
+		"elapsed": time.Since(start),
+	}).Info("completed deleting messages")
+
+	return nil
+}
+
+const selectOldOrgBroadcasts = `
+SELECT
+	id
+FROM
+	msgs_broadcast
+WHERE
+	org_id = $1 AND
+	created_on < $2 AND
+	schedule_id IS NULL
+ORDER BY
+	created_on ASC,
+	id ASC
+LIMIT 1000000;
+`
+
+// DeleteBroadcasts deletes all broadcasts older than the org's retention period which have no active messages on them
+func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error {
+	start := time.Now()
+	threshold := now.AddDate(0, 0, -org.RetentionPeriod)
+
+	rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshold)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	count := 0
+	for rows.Next() {
+		if count == 0 {
+			logrus.WithField("org_id", org.ID).Info("deleting broadcasts")
+		}
+
+		// been deleting this org for more than an hour? that's enough for today, exit out
+		if time.Since(start) > time.Hour {
+			break
+		}
+
+		var broadcastID int64
+		err := rows.Scan(&broadcastID)
+		if err != nil {
+			return errors.Wrap(err, "unable to get broadcast id")
+		}
+
+		// make sure we have no active messages
+		var msgCount int64
+		err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID)
+		if err != nil {
+			return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID)
+		}
+
+		if msgCount != 0 {
+			logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, still has messages")
+			continue
+		}
+
+		// we delete broadcasts in a transaction per broadcast
+		tx, err := db.BeginTx(ctx, nil)
+		if err != nil {
+			return errors.Wrapf(err, "error starting transaction while deleting broadcast: %d", broadcastID)
+		}
+
+		// delete contacts M2M
+		_, err = tx.Exec(`DELETE from msgs_broadcast_contacts WHERE broadcast_id = $1`, broadcastID)
+		if err != nil {
+			tx.Rollback()
+			return errors.Wrapf(err, "error deleting related contacts for broadcast: %d", broadcastID)
+		}
+
+		// delete groups M2M
+		_, err = tx.Exec(`DELETE from msgs_broadcast_groups WHERE broadcast_id = $1`, broadcastID)
+		if err != nil {
+			tx.Rollback()
+			return errors.Wrapf(err, "error deleting related groups for broadcast: %d", broadcastID)
+		}
+
+		// delete URNs M2M
+		_, err = tx.Exec(`DELETE from msgs_broadcast_urns WHERE broadcast_id = $1`, broadcastID)
+		if err != nil {
+			tx.Rollback()
+			return errors.Wrapf(err, "error deleting related urns for broadcast: %d", broadcastID)
+		}
+
+		// delete counts associated with this broadcast
+		_, err = tx.Exec(`DELETE from msgs_broadcastmsgcount WHERE broadcast_id = $1`, broadcastID)
+		if err != nil {
+			tx.Rollback()
+			return errors.Wrapf(err, "error deleting counts for broadcast: %d", broadcastID)
+		}
+
+		// finally, delete our broadcast
+		_, err = tx.Exec(`DELETE from msgs_broadcast WHERE id = $1`, broadcastID)
+		if err != nil {
+			tx.Rollback()
+			return errors.Wrapf(err, "error deleting broadcast: %d", broadcastID)
+		}
+
+		err = tx.Commit()
+		if err != nil {
+			return errors.Wrapf(err, "error committing delete of broadcast: %d", broadcastID)
+		}
+
+		count++
+	}
+
+	if count > 0 {
+		logrus.WithFields(logrus.Fields{
+			"elapsed": time.Since(start),
+			"count":   count,
+			"org_id":  org.ID,
+		}).Info("completed deleting broadcasts")
+	}
+
+	return nil
+}
diff --git a/archives/runs.go b/archives/runs.go
new file mode 100644
index 0000000..3602f69
--- /dev/null
+++ b/archives/runs.go
@@ -0,0 +1,268 @@
+package archives
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/jmoiron/sqlx"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+const lookupFlowRuns = `
+SELECT rec.exited_on, row_to_json(rec)
+FROM (
+	SELECT
+		fr.id as id,
+		fr.uuid as uuid,
+		row_to_json(flow_struct) AS flow,
+		row_to_json(contact_struct) AS contact,
+		fr.responded,
+		(SELECT coalesce(jsonb_agg(path_data), '[]'::jsonb) from (
+			SELECT path_row ->> 'node_uuid' AS node, (path_row ->> 'arrived_on')::timestamptz as time
+			FROM jsonb_array_elements(fr.path::jsonb) AS path_row) as path_data
+		) as path,
+		(SELECT coalesce(jsonb_object_agg(values_data.key, values_data.value), '{}'::jsonb) from (
+			SELECT key, jsonb_build_object('name', value -> 'name', 'value', value -> 'value', 'input', value -> 'input', 'time', (value -> 'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid') as value
+			FROM jsonb_each(fr.results::jsonb)) AS values_data
+		) as values,
+		CASE
+			WHEN $1
+				THEN '[]'::jsonb
+			ELSE
+				coalesce(fr.events, '[]'::jsonb)
+		END AS events,
+		fr.created_on,
+		fr.modified_on,
+		fr.exited_on,
+		CASE
+			WHEN exit_type = 'C'
+				THEN 'completed'
+			WHEN exit_type = 'I'
+				THEN 'interrupted'
+			WHEN exit_type = 'E'
+				THEN 'expired'
+			ELSE
+				null
+		END as exit_type,
+		a.username as submitted_by
+
+	FROM flows_flowrun fr
+		LEFT JOIN auth_user a ON a.id = fr.submitted_by_id
+		JOIN LATERAL (SELECT uuid, name FROM flows_flow WHERE flows_flow.id = fr.flow_id) AS flow_struct ON True
+		JOIN LATERAL (SELECT uuid, name FROM contacts_contact cc WHERE cc.id = fr.contact_id) AS contact_struct ON True
+
+	WHERE fr.org_id = $2 AND fr.modified_on >= $3 AND fr.modified_on < $4
+	ORDER BY fr.modified_on ASC, id ASC
+) as rec;
+`
+
+// writeRunRecords writes the runs in the archive's date range to the passed in writer
+func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) {
+	var rows *sqlx.Rows
+	rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate())
+	if err != nil {
+		return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID)
+	}
+	defer rows.Close()
+
+	recordCount := 0
+	var record string
+	var exitedOn *time.Time
+	for rows.Next() {
+		err = rows.Scan(&exitedOn, &record)
+		if err != nil {
+			return 0, errors.Wrapf(err, "error scanning run record for org: %d", archive.Org.ID)
+		}
+
+		// shouldn't be archiving an active run, that's an error
+		if exitedOn == nil {
+			return 0, fmt.Errorf("run still active, cannot archive: %s", record)
+		}
+
+		writer.WriteString(record)
+		writer.WriteString("\n")
+		recordCount++
+	}
+
+	return recordCount, nil
+}
+
+const selectOrgRunsInRange = `
+SELECT fr.id, fr.is_active
+FROM flows_flowrun fr
+LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id
+WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3
+ORDER BY fr.modified_on ASC, fr.id ASC
+`
+
+const setRunDeleteReason = `
+UPDATE flows_flowrun
+SET delete_reason = 'A'
+WHERE id IN(?)
+`
+
+const deleteRecentRuns = `
+DELETE FROM flows_flowpathrecentrun
+WHERE run_id IN(?)
+`
+
+const unlinkParents = `
+UPDATE flows_flowrun
+SET parent_id = NULL
+WHERE parent_id IN(?)
+`
+
+const deleteRuns = `
+DELETE FROM flows_flowrun
+WHERE id IN(?)
+`
+
+// DeleteArchivedRuns takes the passed in archive, verifies the S3 file is still present (and correct), then selects
+// all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
+//
+// Upon completion it updates the needs_deletion flag on the archive
+func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error {
+	outer, cancel := context.WithTimeout(ctx, time.Hour*3)
+	defer cancel()
+
+	start := time.Now()
+	log := logrus.WithFields(logrus.Fields{
+		"id":           archive.ID,
+		"org_id":       archive.OrgID,
+		"start_date":   archive.StartDate,
+		"end_date":     archive.endDate(),
+		"archive_type": archive.ArchiveType,
+		"total_count":  archive.RecordCount,
+	})
+	log.Info("deleting runs")
+
+	// first things first, make sure our file is present on S3
+	md5, err := GetS3FileETAG(outer, s3Client, archive.URL)
+	if err != nil {
+		return err
+	}
+
+	// if our etag and archive md5 don't match, that's an error, return
+	if md5 != archive.Hash {
+		return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, md5)
+	}
+
+	// ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big
+	rows, err := db.QueryxContext(outer, selectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate())
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	var runID int64
+	var isActive bool
+	runCount := 0
+	runIDs := make([]int64, 0, archive.RecordCount)
+	for rows.Next() {
+		err = rows.Scan(&runID, &isActive)
+		if err != nil {
+			return err
+		}
+
+		// if this run is still active, something has gone wrong, throw an error
+		if isActive {
+			return fmt.Errorf("run %d in archive is still active", runID)
+		}
+
+		// increment our count
+		runCount++
+		runIDs = append(runIDs, runID)
+	}
+	rows.Close()
+
+	log.WithFields(logrus.Fields{
+		"run_count": len(runIDs),
+	}).Debug("found runs")
+
+	// verify we don't see more runs than there are in our archive (fewer is ok)
+	if runCount > archive.RecordCount {
+		return fmt.Errorf("more runs in the database: %d than in archive: %d", runCount, archive.RecordCount)
+	}
+
+	// ok, delete our runs in batches, we do this in transactions as it spans a few different queries
+	for startIdx := 0; startIdx < len(runIDs); startIdx += deleteTransactionSize {
+		// no single batch should take more than a few minutes
+		ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
+		defer cancel()
+
+		start := time.Now()
+
+		endIdx := startIdx + deleteTransactionSize
+		if endIdx > len(runIDs) {
+			endIdx = len(runIDs)
+		}
+		batchIDs := runIDs[startIdx:endIdx]
+
+		// start our transaction
+		tx, err := db.BeginTxx(ctx, nil)
+		if err != nil {
+			return err
+		}
+
+		// first update our delete_reason
+		err = executeInQuery(ctx, tx, setRunDeleteReason, batchIDs)
+		if err != nil {
+			return fmt.Errorf("error updating delete reason: %s", err.Error())
+		}
+
+		// now delete any recent runs
+		err = executeInQuery(ctx, tx, deleteRecentRuns, batchIDs)
+		if err != nil {
+			return fmt.Errorf("error deleting recent runs: %s", err.Error())
+		}
+
+		// unlink any parents
+		err = executeInQuery(ctx, tx, unlinkParents, batchIDs)
+		if err != nil {
+			return fmt.Errorf("error unlinking parent runs: %s", err.Error())
+		}
+
+		// finally, delete our runs
+		err = executeInQuery(ctx, tx, deleteRuns, batchIDs)
+		if err != nil {
+			return fmt.Errorf("error deleting runs: %s", err.Error())
+		}
+
+		// commit our transaction
+		err = tx.Commit()
+		if 
err != nil { + return fmt.Errorf("error committing run delete transaction: %s", err.Error()) + } + + log.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + "count": len(batchIDs), + }).Debug("deleted batch of runs") + + cancel() + } + + outer, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() + + deletedOn := time.Now() + + // all went well! mark our archive as no longer needing deletion + _, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn) + if err != nil { + return fmt.Errorf("error setting archive as deleted: %s", err.Error()) + } + archive.NeedsDeletion = false + archive.DeletedOn = &deletedOn + + logrus.WithFields(logrus.Fields{ + "elapsed": time.Since(start), + }).Info("completed deleting runs") + + return nil +} diff --git a/s3.go b/archives/s3.go similarity index 99% rename from s3.go rename to archives/s3.go index 89f4392..2a755e6 100644 --- a/s3.go +++ b/archives/s3.go @@ -1,4 +1,4 @@ -package archiver +package archives import ( "context" diff --git a/testdata/messages1.jsonl b/archives/testdata/messages1.jsonl similarity index 100% rename from testdata/messages1.jsonl rename to archives/testdata/messages1.jsonl diff --git a/testdata/messages2.jsonl b/archives/testdata/messages2.jsonl similarity index 100% rename from testdata/messages2.jsonl rename to archives/testdata/messages2.jsonl diff --git a/testdata/runs1.jsonl b/archives/testdata/runs1.jsonl similarity index 100% rename from testdata/runs1.jsonl rename to archives/testdata/runs1.jsonl diff --git a/testdata/runs2.jsonl b/archives/testdata/runs2.jsonl similarity index 100% rename from testdata/runs2.jsonl rename to archives/testdata/runs2.jsonl diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index 230e614..c48f4d2 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -11,12 +11,12 @@ import ( "github.com/jmoiron/sqlx" _ "github.com/lib/pq" "github.com/nyaruka/ezconf" - archiver "github.com/nyaruka/rp-archiver" + "github.com/nyaruka/rp-archiver/archives" "github.com/sirupsen/logrus" ) func main() { - config := archiver.NewConfig() + config := archives.NewConfig() loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"}) loader.MustLoad() @@ -67,14 +67,14 @@ func main() { var s3Client s3iface.S3API if config.UploadToS3 { - s3Client, err = archiver.NewS3Client(config) + s3Client, err = archives.NewS3Client(config) if err != nil { logrus.WithError(err).Fatal("unable to initialize s3 client") } } // ensure that we can actually write to the temp directory - err = archiver.EnsureTempArchiveDirectory(config.TempDir) + err = archives.EnsureTempArchiveDirectory(config.TempDir) if err != nil { logrus.WithError(err).Fatal("cannot write to temp directory") } @@ -91,7 +91,7 @@ func main() { // get our active orgs ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - orgs, err := archiver.GetActiveOrgs(ctx, db, config) + orgs, err := archives.GetActiveOrgs(ctx, db, config) cancel() if err != nil { @@ -107,15 +107,15 @@ func main() { log := logrus.WithField("org", org.Name).WithField("org_id", org.ID) if config.ArchiveMessages { - _, _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.MessageType) + _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.MessageType) if err != nil { - log.WithError(err).WithField("archive_type", archiver.MessageType).Error("error archiving org messages") + 
log.WithError(err).WithField("archive_type", archives.MessageType).Error("error archiving org messages") } } if config.ArchiveRuns { - _, _, err = archiver.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archiver.RunType) + _, _, err = archives.ArchiveOrg(ctx, time.Now(), config, db, s3Client, org, archives.RunType) if err != nil { - log.WithError(err).WithField("archive_type", archiver.RunType).Error("error archiving org runs") + log.WithError(err).WithField("archive_type", archives.RunType).Error("error archiving org runs") } } diff --git a/testdb.sql b/testdb.sql index c24a175..e2632c3 100644 --- a/testdb.sql +++ b/testdb.sql @@ -319,7 +319,7 @@ INSERT INTO flows_flowrun(id, uuid, responded, contact_id, flow_id, org_id, resu '[{"uuid": "600ac5b4-4895-4161-ad97-6e2f1bb48bcb", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-08-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]', '[{"msg": {"urn": "tel:+12076661212", "text": "hola", "uuid": "9ea50923-0888-4596-9a9d-4890994934a9", "channel": {"name": "1223", "uuid": "d6597e08-8285-428c-8e7e-97c68adfa073"}}, "type": "msg_created", "step_uuid": "ae067248-df92-41c8-bb29-92506e984259", "created_on": "2018-01-22T15:06:47.357682+00:00"}]', '2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00','2017-08-10 21:11:59.890662+02:00', 'C', 'C', NULL, 1), -(4, 'de782b35-a398-46ed-8550-34c66053841b', TRUE, 7, 2, 3, +(4, 'b3d5a052-77af-4588-b32c-124a7e0e9d6f', TRUE, 7, 2, 3, '{"agree": {"category": "Disagree", "node_uuid": "084c8cf1-715d-4d0a-b38d-a616ed74e638", "name": "Agree", "value": "B", "created_on": "2017-10-10T12:25:21.714339+00:00", "input": "B"}}', '[{"uuid": "babf4fc8-e12c-4bb9-a9dd-61178a118b5a", "node_uuid": "accbc6e2-b0df-46cd-9a76-bff0fdf4d753", "arrived_on": "2017-10-12T15:07:24.049815+02:00", "exit_uuid": "8249e2dc-c893-4200-b6d2-398d07a459bc"}]', '[{"msg": {"urn": "tel:+12076661212", "text": "hi hi", "uuid": "543d2c4b-ff0b-4b87-a9a4-b2d6745cf470", "channel": {"name": "1223", "uuid": "d6597e08-8285-428c-8e7e-97c68adfa073"}}, "type": "msg_created", "step_uuid": "3a5014dd-7b14-4b7a-be52-0419c09340a6", "created_on": "2018-10-12T15:06:47.357682+00:00"}]',