Skip to content

Commit

Permalink
work in drazen changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed May 16, 2018
1 parent 651c54b commit 3076bb8
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 95 deletions.
101 changes: 86 additions & 15 deletions archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
type ArchiveType string

const (
FlowRunType = ArchiveType("flowrun")
RunType = ArchiveType("run")
MessageType = ArchiveType("message")
SessionType = ArchiveType("session")
)
Expand Down Expand Up @@ -91,10 +91,10 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB) ([]Org, error) {

const lookupOrgArchives = `SELECT start_date, archive_type FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND period = 'D' ORDER BY start_date asc`

// GetArchiveTasks calculates what archives need to be generated for the passed in org this is calculated per day
func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, archiveType ArchiveType) ([]Archive, error) {
archives := []Archive{}
err := db.SelectContext(ctx, &archives, lookupOrgArchives, org.ID, archiveType)
// GetMissingArchives calculates what archives need to be generated for the passed in org this is calculated per day
func GetMissingArchives(ctx context.Context, db *sqlx.DB, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) {
existingArchives := []*Archive{}
err := db.SelectContext(ctx, &existingArchives, lookupOrgArchives, org.ID, archiveType)
if err != nil && err != sql.ErrNoRows {
return nil, err
}
Expand All @@ -104,20 +104,20 @@ func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, a
orgUTC := org.CreatedOn.In(time.UTC)
startDate := time.Date(orgUTC.Year(), orgUTC.Month(), orgUTC.Day(), 0, 0, 0, 0, time.UTC)

tasks := make([]Archive, 0, 1)
missingArchives := make([]*Archive, 0, 1)
currentArchiveIdx := 0

// walk forwards until we are after our end date
for !startDate.After(endDate) {
existing := false

// advance our current archive idx until we are on our start date or later
for currentArchiveIdx < len(archives) && archives[currentArchiveIdx].StartDate.Before(startDate) {
for currentArchiveIdx < len(existingArchives) && existingArchives[currentArchiveIdx].StartDate.Before(startDate) {
currentArchiveIdx++
}

// do we already have this archive?
if currentArchiveIdx < len(archives) && archives[currentArchiveIdx].StartDate.Equal(startDate) {
if currentArchiveIdx < len(existingArchives) && existingArchives[currentArchiveIdx].StartDate.Equal(startDate) {
existing = true
}

Expand All @@ -130,13 +130,13 @@ func GetArchiveTasks(ctx context.Context, db *sqlx.DB, now time.Time, org Org, a
ArchiveType: archiveType,
Period: Day,
}
tasks = append(tasks, archive)
missingArchives = append(missingArchives, &archive)
}

startDate = startDate.Add(time.Hour * 24)
}

return tasks, nil
return missingArchives, nil
}

const lookupMsgs = `
Expand Down Expand Up @@ -230,7 +230,7 @@ FROM (
`

// EnsureTempArchiveDirectory checks that we can write to our archive directory, creating it first if needbe
func EnsureTempArchiveDirectory(ctx context.Context, path string) error {
func EnsureTempArchiveDirectory(path string) error {
if len(path) == 0 {
return fmt.Errorf("path argument cannot be empty")
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
var rows *sqlx.Rows
if archive.ArchiveType == MessageType {
rows, err = db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, endDate)
} else if archive.ArchiveType == FlowRunType {
} else if archive.ArchiveType == RunType {
rows, err = db.QueryxContext(ctx, lookupFlowRuns, archive.Org.ID, archive.StartDate, endDate)
}
if err != nil {
Expand All @@ -313,7 +313,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
if visibility == "deleted" {
continue
}
} else if archive.ArchiveType == FlowRunType {
} else if archive.ArchiveType == RunType {
err = rows.Scan(&record)
if err != nil {
return err
Expand Down Expand Up @@ -360,16 +360,31 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
return nil
}

// UploadArchive upload file to S3
// UploadArchive uploads the passed archive file to S3
func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, archive *Archive) error {
// s3 wants a base64 encoded hash instead of our hex encoded
hashBytes, _ := hex.DecodeString(archive.ArchiveHash)
hashBase64 := base64.StdEncoding.EncodeToString(hashBytes)

archivePath := ""
if archive.Period == Day {
archivePath = fmt.Sprintf(
"/%d/%s_%s%d%02d%02d_%s.jsonl.gz",
archive.Org.ID, archive.ArchiveType, archive.Period,
archive.StartDate.Year(), archive.StartDate.Month(), archive.StartDate.Day(),
archive.ArchiveHash)
} else {
archivePath = fmt.Sprintf(
"/%d/%s_%s%d%02d_%s.jsonl.gz",
archive.Org.ID, archive.ArchiveType, archive.Period,
archive.StartDate.Year(), archive.StartDate.Month(),
archive.ArchiveHash)
}

url, err := s3.PutS3File(
s3Client,
bucket,
fmt.Sprintf("/%d/%s_%d_%02d_%s.jsonl.gz", archive.Org.ID, archive.ArchiveType, archive.StartDate.Year(), archive.StartDate.Month(), archive.ArchiveHash),
archivePath,
"application/json",
"gzip",
archive.ArchiveFile,
Expand Down Expand Up @@ -436,3 +451,59 @@ func DeleteArchiveFile(archive *Archive) error {
log.WithField("filename", archive.ArchiveFile).Debug("deleted temporary archive file")
return nil
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)
records := 0

archives, err := GetMissingArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, fmt.Errorf("error calculating tasks for type '%s'", archiveType)
}

for _, archive := range archives {
log = log.WithField("start_date", archive.StartDate).WithField("period", archive.Period).WithField("archive_type", archive.ArchiveType)
log.Info("starting archive")
err := CreateArchiveFile(ctx, db, archive, config.TempDir)
if err != nil {
log.WithError(err).Error("error writing archive file")
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if err != nil {
log.WithError(err).Error("error writing archive to s3")
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
if err != nil {
log.WithError(err).Error("error writing record to db")
continue
}

// purge records that were archived

if config.DeleteAfterUpload == true {
err := DeleteArchiveFile(archive)
if err != nil {
log.WithError(err).Error("error deleting temporary file")
continue
}
}

log.WithField("id", archive.ID).WithField("record_count", archive.RecordCount).WithField("elapsed", archive.BuildTime).Info("archive complete")
records += archive.RecordCount
}

if len(archives) > 0 {
elapsed := time.Now().Sub(now)
rate := float32(records) / (float32(elapsed) / float32(time.Second))
log.WithField("elapsed", elapsed).WithField("records_per_second", int(rate)).Info("completed archival for org")
}

return archives, nil
}
53 changes: 40 additions & 13 deletions archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ func TestGetArchiveTasks(t *testing.T) {
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

// org 1 is too new, no tasks
tasks, err := GetArchiveTasks(ctx, db, now, orgs[0], MessageType)
tasks, err := GetMissingArchives(ctx, db, now, orgs[0], MessageType)
assert.NoError(t, err)
assert.Equal(t, 0, len(tasks))

// org 2 should have some
tasks, err = GetArchiveTasks(ctx, db, now, orgs[1], MessageType)
tasks, err = GetMissingArchives(ctx, db, now, orgs[1], MessageType)
assert.NoError(t, err)
assert.Equal(t, 62, len(tasks))
assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)
assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), tasks[61].StartDate)

// org 3 is the same as 2, but two of the tasks have already been built
tasks, err = GetArchiveTasks(ctx, db, now, orgs[2], MessageType)
tasks, err = GetMissingArchives(ctx, db, now, orgs[2], MessageType)
assert.NoError(t, err)
assert.Equal(t, 60, len(tasks))
assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)
Expand All @@ -61,40 +61,40 @@ func TestCreateMsgArchive(t *testing.T) {
db := setup(t)
ctx := context.Background()

err := EnsureTempArchiveDirectory(ctx, "/tmp")
err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

orgs, err := GetActiveOrgs(ctx, db)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

tasks, err := GetArchiveTasks(ctx, db, now, orgs[1], MessageType)
tasks, err := GetMissingArchives(ctx, db, now, orgs[1], MessageType)
assert.NoError(t, err)
assert.Equal(t, 62, len(tasks))
task := tasks[0]

// build our first task, should have no messages
err = CreateArchiveFile(ctx, db, &task, "/tmp")
err = CreateArchiveFile(ctx, db, task, "/tmp")
assert.NoError(t, err)

// should have no records and be an empty gzip file
assert.Equal(t, 0, task.RecordCount)
assert.Equal(t, int64(23), task.ArchiveSize)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", task.ArchiveHash)

DeleteArchiveFile(&task)
DeleteArchiveFile(task)

// build our second task, should have a single message
// build our third task, should have a single message
task = tasks[2]
err = CreateArchiveFile(ctx, db, &task, "/tmp")
err = CreateArchiveFile(ctx, db, task, "/tmp")
assert.NoError(t, err)

// should have two records, second will have attachments
assert.Equal(t, 2, task.RecordCount)
assert.Equal(t, int64(365), task.ArchiveSize)
assert.Equal(t, "cc67ae0d1edb9caa4c8c56b3d4de58ee", task.ArchiveHash)

DeleteArchiveFile(&task)
DeleteArchiveFile(task)
_, err = os.Stat(task.ArchiveFile)
assert.True(t, os.IsNotExist(err))
}
Expand All @@ -107,21 +107,48 @@ func TestWriteArchiveToDB(t *testing.T) {
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

tasks, err := GetArchiveTasks(ctx, db, now, orgs[1], MessageType)
tasks, err := GetMissingArchives(ctx, db, now, orgs[1], MessageType)
assert.NoError(t, err)
assert.Equal(t, 62, len(tasks))
assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)

task := tasks[0]
err = WriteArchiveToDB(ctx, db, &task)
err = WriteArchiveToDB(ctx, db, task)

assert.NoError(t, err)
assert.Equal(t, 3, task.ID)
assert.Equal(t, false, task.IsPurged)

// if we recalculate our tasks, we should have one less now
tasks, err = GetArchiveTasks(ctx, db, now, orgs[1], MessageType)
tasks, err = GetMissingArchives(ctx, db, now, orgs[1], MessageType)
assert.NoError(t, err)
assert.Equal(t, 61, len(tasks))
assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)
}

func TestArchiveOrg(t *testing.T) {
db := setup(t)
ctx := context.Background()

orgs, err := GetActiveOrgs(ctx, db)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

conf := NewConfig()
conf.UploadToS3 = false
conf.TempDir = "/tmp"

archives, err := ArchiveOrg(ctx, now, conf, db, nil, orgs[1], MessageType)

assert.Equal(t, 62, len(archives))
assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), archives[0].StartDate)
assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), archives[61].StartDate)

assert.Equal(t, 0, archives[0].RecordCount)
assert.Equal(t, int64(23), archives[0].ArchiveSize)
assert.Equal(t, "f0d79988b7772c003d04a28bd7417a62", archives[0].ArchiveHash)

assert.Equal(t, 2, archives[2].RecordCount)
assert.Equal(t, int64(365), archives[2].ArchiveSize)
assert.Equal(t, "cc67ae0d1edb9caa4c8c56b3d4de58ee", archives[2].ArchiveHash)
}
Loading

0 comments on commit 3076bb8

Please sign in to comment.