Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use minio to emulate S3 for testing #102

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ jobs:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

minio:
image: bitnami/minio:latest
env:
MINIO_ROOT_USER: root
MINIO_ROOT_PASSWORD: tembatemba
MINIO_DEFAULT_BUCKETS: temba-archives
ports:
- 9000:9000
options: --health-cmd "mc ready local" --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -29,9 +39,6 @@ jobs:

- name: Run tests
run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./...
env:
ARCHIVER_AWS_ACCESS_KEY_ID: ${{ secrets.ARCHIVER_AWS_ACCESS_KEY_ID }}
ARCHIVER_AWS_SECRET_ACCESS_KEY: ${{ secrets.ARCHIVER_AWS_SECRET_ACCESS_KEY }}

- name: Upload coverage
if: success()
Expand Down
198 changes: 92 additions & 106 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,34 @@ import (
"github.com/stretchr/testify/assert"
)

func setup(t *testing.T) *sqlx.DB {
func setup(t *testing.T) (*Config, *sqlx.DB) {
config := NewDefaultConfig()
config.DB = "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC"

// configure S3 to use a local minio instance
config.AWSAccessKeyID = "root"
config.AWSSecretAccessKey = "tembatemba"
config.S3Endpoint = "http://localhost:9000"
config.S3ForcePathStyle = true

testDB, err := os.ReadFile("../testdb.sql")
assert.NoError(t, err)

db, err := sqlx.Open("postgres", "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC")
db, err := sqlx.Open("postgres", config.DB)
assert.NoError(t, err)

_, err = db.Exec(string(testDB))
assert.NoError(t, err)

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

return db
return config, db
}

func TestGetMissingDayArchives(t *testing.T) {
db := setup(t)
config, db := setup(t)

// get the tasks for our org
ctx := context.Background()
config := NewDefaultConfig()

orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
Expand Down Expand Up @@ -83,11 +90,10 @@ func TestGetMissingDayArchives(t *testing.T) {
}

func TestGetMissingMonthArchives(t *testing.T) {
db := setup(t)
config, db := setup(t)

// get the tasks for our org
ctx := context.Background()
config := NewDefaultConfig()

orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
Expand Down Expand Up @@ -115,13 +121,12 @@ func TestGetMissingMonthArchives(t *testing.T) {
}

func TestCreateMsgArchive(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -192,13 +197,12 @@ func assertArchiveFile(t *testing.T, archive *Archive, truthName string) {
}

func TestCreateRunArchive(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -252,10 +256,9 @@ func TestCreateRunArchive(t *testing.T) {
}

func TestWriteArchiveToDB(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -305,11 +308,10 @@ func getCountInRange(db *sqlx.DB, query string, orgID int, start time.Time, end
}

func TestArchiveOrgMessages(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()
deleteTransactionSize = 1

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -425,103 +427,89 @@ func assertArchive(t *testing.T, a *Archive, startDate time.Time, period Archive
}

func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

os.Args = []string{"rp-archiver"}

loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
loader.MustLoad()

config.Delete = true

// AWS S3 config in the environment needed to download from S3
if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType)
assert.NoError(t, err)
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

assert.Equal(t, 10, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1953, "95475b968ceff15f2f90d539e1bd3d20")
dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType)
assert.NoError(t, err)

assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 465, "40abf2113ea7c25c5476ff3025d54b07")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assert.Equal(t, 10, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1953, "95475b968ceff15f2f90d539e1bd3d20")

assert.Equal(t, 12, len(deleted))
assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 465, "40abf2113ea7c25c5476ff3025d54b07")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

// no runs remaining
for _, d := range deleted {
count, err := getCountInRange(
db,
getRunCount,
orgs[2].ID,
d.StartDate,
d.endDate(),
)
assert.NoError(t, err)
assert.Equal(t, 0, count)
assert.Equal(t, 12, len(deleted))

assert.False(t, d.NeedsDeletion)
assert.NotNil(t, d.DeletedOn)
}

// other org runs unaffected
// no runs remaining
for _, d := range deleted {
count, err := getCountInRange(
db,
getRunCount,
orgs[1].ID,
time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 3, count)

// more recent run unaffected (even though it was parent)
count, err = getCountInRange(
db,
getRunCount,
orgs[2].ID,
time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC),
time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
d.StartDate,
d.endDate(),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 0, count)

// org 2 has a run that can't be archived because it's still active - as it has no existing archives
// this will manifest itself as a monthly which fails to save
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType)
assert.NoError(t, err)
assert.False(t, d.NeedsDeletion)
assert.NotNil(t, d.DeletedOn)
}

assert.Equal(t, 31, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
// other org runs unaffected
count, err := getCountInRange(
db,
getRunCount,
orgs[1].ID,
time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 3, count)

assert.Equal(t, 1, len(dailiesFailed))
assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "")
// more recent run unaffected (even though it was parent)
count, err = getCountInRange(
db,
getRunCount,
orgs[2].ID,
time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC),
time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

assert.Equal(t, 1, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
// org 2 has a run that can't be archived because it's still active - as it has no existing archives
// this will manifest itself as a monthly which fails to save
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType)
assert.NoError(t, err)

assert.Equal(t, 1, len(monthliesFailed))
assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "")
}
assert.Equal(t, 31, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 1, len(dailiesFailed))
assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "")

assert.Equal(t, 1, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 1, len(monthliesFailed))
assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "")
}

func TestArchiveActiveOrgs(t *testing.T) {
db := setup(t)
config := NewDefaultConfig()

os.Args = []string{"rp-archiver"}
loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
loader.MustLoad()
config, db := setup(t)

mockAnalytics := analytics.NewMock()
analytics.RegisterBackend(mockAnalytics)
Expand All @@ -530,28 +518,26 @@ func TestArchiveActiveOrgs(t *testing.T) {
dates.SetNowSource(dates.NewSequentialNowSource(time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)))
defer dates.SetNowSource(dates.DefaultNowSource)

if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

err = ArchiveActiveOrgs(db, config, s3Client)
assert.NoError(t, err)
err = ArchiveActiveOrgs(db, config, s3Client)
assert.NoError(t, err)

assert.Equal(t, map[string][]float64{
"archiver.archive_elapsed": {848.0},
"archiver.orgs_archived": {3},
"archiver.msgs_records_archived": {5},
"archiver.msgs_archives_created": {92},
"archiver.msgs_archives_failed": {0},
"archiver.msgs_rollups_created": {3},
"archiver.msgs_rollups_failed": {0},
"archiver.runs_records_archived": {4},
"archiver.runs_archives_created": {41},
"archiver.runs_archives_failed": {1},
"archiver.runs_rollups_created": {3},
"archiver.runs_rollups_failed": {1},
}, mockAnalytics.Gauges)
}
assert.Equal(t, map[string][]float64{
"archiver.archive_elapsed": {848.0},
"archiver.orgs_archived": {3},
"archiver.msgs_records_archived": {5},
"archiver.msgs_archives_created": {92},
"archiver.msgs_archives_failed": {0},
"archiver.msgs_rollups_created": {3},
"archiver.msgs_rollups_failed": {0},
"archiver.runs_records_archived": {4},
"archiver.runs_archives_created": {41},
"archiver.runs_archives_failed": {1},
"archiver.runs_rollups_created": {3},
"archiver.runs_rollups_failed": {1},
}, mockAnalytics.Gauges)

analytics.Stop()
}
10 changes: 6 additions & 4 deletions archives/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ type Config struct {
AWSSecretAccessKey string `help:"secret access key to use for AWS services"`
AWSRegion string `help:"region to use for AWS services, e.g. us-east-1"`

S3Endpoint string `help:"the S3 endpoint we will write archives to"`
S3Bucket string `help:"the S3 bucket we will write archives to"`
S3Endpoint string `help:"the S3 endpoint we will write archives to"`
S3Bucket string `help:"the S3 bucket we will write archives to"`
S3ForcePathStyle bool `help:"S3 should used /bucket/path style URLs"`

TempDir string `help:"directory where temporary archive files are written"`
KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"`
Expand Down Expand Up @@ -43,8 +44,9 @@ func NewDefaultConfig() *Config {
AWSSecretAccessKey: "",
AWSRegion: "us-east-1",

S3Endpoint: "https://s3.amazonaws.com",
S3Bucket: "dl-archiver-test",
S3Endpoint: "https://s3.amazonaws.com",
S3Bucket: "temba-archives",
S3ForcePathStyle: false,

TempDir: "/tmp",
KeepFiles: false,
Expand Down
5 changes: 3 additions & 2 deletions archives/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const chunkSizeBytes = 1e9 // 1GB
// NewS3Client creates a new s3 client from the passed in config, testing it as necessary
func NewS3Client(config *Config) (s3iface.S3API, error) {
s3config := &aws.Config{
Region: aws.String(config.AWSRegion),
Endpoint: aws.String(config.S3Endpoint),
Region: aws.String(config.AWSRegion),
Endpoint: aws.String(config.S3Endpoint),
S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle),
}
if config.AWSAccessKeyID != "" {
s3config.Credentials = credentials.NewStaticCredentials(config.AWSAccessKeyID, config.AWSSecretAccessKey, "")
Expand Down
Loading