diff --git a/.gitignore b/.gitignore index f56185d..0e43e16 100644 --- a/.gitignore +++ b/.gitignore @@ -12,5 +12,8 @@ cloudscale-metrics-collector .public/ node_modules/ +# IDE +.idea/ + # Configuration env diff --git a/accumulate.go b/accumulate.go new file mode 100644 index 0000000..4d02d4b --- /dev/null +++ b/accumulate.go @@ -0,0 +1,104 @@ +package main + +import ( + "context" + "fmt" + "github.com/cloudscale-ch/cloudscale-go-sdk/v2" + "os" + "time" +) + +// AccumulateKey represents one data point ("fact") in the billing database. +// The actual value for the data point is not present in this type, as this type is just a map key, and the corresponding value is stored as a map value. +type AccumulateKey struct { + Query string + Zone string + Tenant string + Namespace string + Start time.Time +} + +// String returns the full "source" string as used by the appuio-cloud-reporting +func (this AccumulateKey) String() string { + return this.Query + ":" + this.Zone + ":" + this.Tenant + ":" + this.Namespace +} + +/* +accumulateBucketMetrics gets all the bucket metrics from cloudscale and puts them into a map. The map key is the "AccumulateKey", +and the value is the raw value of the data returned by cloudscale (e.g. bytes, requests). In order to construct the +correct AccumulateKey, this function needs to fetch the ObjectUsers's tags, because that's where the zone, tenant and +namespace are stored. +This method is "accumulating" data because it collects data from possibly multiple ObjectsUsers under the same +AccumulateKey. This is because the billing system can't handle multiple ObjectsUsers per namespace. +*/ +func accumulateBucketMetrics(ctx context.Context, date time.Time, cloudscaleClient *cloudscale.Client) (map[AccumulateKey]uint64, error) { + bucketMetricsRequest := cloudscale.BucketMetricsRequest{Start: date, End: date} + bucketMetrics, err := cloudscaleClient.Metrics.GetBucketMetrics(ctx, &bucketMetricsRequest) + if err != nil { + return nil, err + } + + accumulated := make(map[AccumulateKey]uint64) + + for _, bucketMetricsData := range bucketMetrics.Data { + objectsUser, err := cloudscaleClient.ObjectsUsers.Get(ctx, bucketMetricsData.Subject.ObjectsUserID) + if err != nil || objectsUser == nil { + fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s, objects user %s not found\n", bucketMetricsData.Subject.BucketName, bucketMetricsData.Subject.ObjectsUserID) + continue + } + err = accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, objectsUser) + if err != nil { + fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s: %v\n", bucketMetricsData.Subject.BucketName, err) + continue + } + } + + return accumulated, nil +} + +func accumulateBucketMetricsForObjectsUser(accumulated map[AccumulateKey]uint64, bucketMetricsData cloudscale.BucketMetricsData, objectsUser *cloudscale.ObjectsUser) error { + if len(bucketMetricsData.TimeSeries) != 1 { + return fmt.Errorf("There must be exactly one metrics data point, found %d", len(bucketMetricsData.TimeSeries)) + } + + tenantStr := objectsUser.Tags["tenant"] + if tenantStr == "" { + return fmt.Errorf("no tenant information found on objectsUser") + } + namespace := objectsUser.Tags["namespace"] + if namespace == "" { + return fmt.Errorf("no namespace information found on objectsUser") + } + zone := objectsUser.Tags["zone"] + if zone == "" { + return fmt.Errorf("no zone information found on objectsUser") + } + + sourceStorage := AccumulateKey{ + Query: sourceQueryStorage, + Zone: zone, + Tenant: tenantStr, + Namespace: namespace, + Start: bucketMetricsData.TimeSeries[0].Start, + } + sourceTrafficOut := AccumulateKey{ + Query: sourceQueryTrafficOut, + Zone: zone, + Tenant: tenantStr, + Namespace: namespace, + Start: bucketMetricsData.TimeSeries[0].Start, + } + sourceRequests := AccumulateKey{ + Query: sourceQueryRequests, + Zone: zone, + Tenant: tenantStr, + Namespace: namespace, + Start: bucketMetricsData.TimeSeries[0].Start, + } + + accumulated[sourceStorage] += uint64(bucketMetricsData.TimeSeries[0].Usage.StorageBytes) + accumulated[sourceTrafficOut] += uint64(bucketMetricsData.TimeSeries[0].Usage.SentBytes) + accumulated[sourceRequests] += uint64(bucketMetricsData.TimeSeries[0].Usage.Requests) + + return nil +} diff --git a/accumulate_test.go b/accumulate_test.go new file mode 100644 index 0000000..076f645 --- /dev/null +++ b/accumulate_test.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "github.com/cloudscale-ch/cloudscale-go-sdk/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +// assertEqualfUint64 implements the functionality of assert.Equalf for uint64, because assert.Equalf cannot print uint64 correctly. +// See https://github.com/stretchr/testify/issues/400 +func assertEqualfUint64(t *testing.T, expected uint64, actual uint64, msg string, args ...interface{}) bool { + if expected != actual { + return assert.Fail(t, fmt.Sprintf("Not equal: \n"+ + "expected: %d\n"+ + "actual : %d", expected, actual)) + } + return true +} + +func TestAccumulateBucketMetricsForObjectsUser(t *testing.T) { + zone := "appuio-cloudscale-ch-lpg" + tenant := "inity" + namespace := "testnamespace" + + // get the correct date for a data set that was created yesterday + location, err := time.LoadLocation("Europe/Zurich") + require.NoError(t, err, "could not load location Europe/Zurich") + now := time.Now().In(location) + date := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()) + + // build input data structure + bucketMetricsInterval := []cloudscale.BucketMetricsInterval{ + { + Start: date, + End: date, + Usage: cloudscale.BucketMetricsIntervalUsage{ + Requests: 5, + StorageBytes: 1000000, + SentBytes: 2000000, + }, + }, + } + bucketMetricsData := cloudscale.BucketMetricsData{ + TimeSeries: bucketMetricsInterval, + } + objectsUser := cloudscale.ObjectsUser{} + objectsUser.Tags = cloudscale.TagMap{"zone": zone, "tenant": tenant, "namespace": namespace} + + accumulated := make(map[AccumulateKey]uint64) + accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, &objectsUser) + + require.Len(t, accumulated, 3, "incorrect amount of values 'accumulated'") + + key := AccumulateKey{ + Zone: zone, + Tenant: tenant, + Namespace: namespace, + Start: date, + } + + key.Query = "object-storage-requests" + assertEqualfUint64(t, uint64(5), accumulated[key], "incorrect value in %s", key) + + key.Query = "object-storage-storage" + assertEqualfUint64(t, uint64(1000000), accumulated[key], "incorrect value in %s", key) + + key.Query = "object-storage-traffic-out" + assertEqualfUint64(t, uint64(2000000), accumulated[key], "incorrect value in %s", key) +} diff --git a/go.mod b/go.mod index e754d2b..57e9bd7 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/go-logr/logr v1.2.3 github.com/go-logr/zapr v1.2.3 github.com/jmoiron/sqlx v1.3.5 + github.com/stretchr/testify v1.8.0 github.com/urfave/cli/v2 v2.10.3 go.uber.org/zap v1.21.0 ) @@ -15,6 +16,7 @@ require ( require ( github.com/benbjohnson/clock v1.1.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.12.0 // indirect github.com/jackc/pgio v1.0.0 // indirect @@ -24,12 +26,13 @@ require ( github.com/jackc/pgtype v1.11.0 // indirect github.com/jackc/pgx/v4 v4.16.0 // indirect github.com/lopezator/migrator v0.3.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/stretchr/testify v1.8.0 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect golang.org/x/text v0.3.7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4922661..26b7bc0 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -229,6 +231,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= diff --git a/main.go b/main.go index 147b944..4e4aef5 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "context" "database/sql" + "errors" "fmt" "github.com/appuio/appuio-cloud-reporting/pkg/db" "github.com/cloudscale-ch/cloudscale-go-sdk/v2" @@ -16,6 +17,7 @@ import ( "github.com/vshn/cloudscale-metrics-collector/pkg/tenantsmodel" "net/http" "os" + "strconv" "time" ) @@ -27,14 +29,15 @@ var ( appName = "cloudscale-metrics-collector" // constants + daysEnvVariable = "DAYS" tokenEnvVariable = "CLOUDSCALE_API_TOKEN" dbUrlEnvVariable = "ACR_DB_URL" // source format: 'query:zone:tenant:namespace' or 'query:zone:tenant:namespace:class' // We do not have real (prometheus) queries here, just random hardcoded strings. - sourceQueryStorage = "s3-storage" - sourceQueryTrafficOut = "s3-traffic-out" - sourceQueryRequests = "s3-requests" + sourceQueryStorage = "object-storage-storage" + sourceQueryTrafficOut = "object-storage-traffic-out" + sourceQueryRequests = "object-storage-requests" // we must use the correct zones, otherwise the appuio-odoo-adapter will not work correctly sourceZones = []string{"c-appuio-cloudscale-lpg-2"} @@ -47,7 +50,7 @@ var ( Source: sourceQueryStorage + ":" + sourceZones[0], Target: sql.NullString{String: "1401", Valid: true}, Amount: 0.003, - Unit: "GB/day", // SI GB according to cloudscale + Unit: "GBDay", // SI GB according to cloudscale During: db.InfiniteRange(), }, { @@ -87,21 +90,21 @@ var ( queriesData = []*db.Query{ { Name: sourceQueryStorage + ":" + sourceZones[0], - Description: "S3 Storage", + Description: "Object Storage - Storage (cloudscale.ch)", Query: "", - Unit: "GB/day", + Unit: "GBDay", During: db.InfiniteRange(), }, { Name: sourceQueryTrafficOut + ":" + sourceZones[0], - Description: "S3 Traffic Out", + Description: "Object Storage - Traffic Out (cloudscale.ch)", Query: "", Unit: "GB", During: db.InfiniteRange(), }, { Name: sourceQueryRequests + ":" + sourceZones[0], - Description: "S3 Requests", + Description: "Object Storage - Requests (cloudscale.ch)", Query: "", Unit: "KReq", During: db.InfiniteRange(), @@ -109,7 +112,7 @@ var ( } ) -func cfg() (string, string) { +func cfg() (string, string, int) { cloudscaleApiToken := os.Getenv(tokenEnvVariable) if cloudscaleApiToken == "" { fmt.Fprintf(os.Stderr, "ERROR: Environment variable %s must be set\n", tokenEnvVariable) @@ -122,7 +125,17 @@ func cfg() (string, string) { os.Exit(1) } - return cloudscaleApiToken, dbUrl + daysStr := os.Getenv(daysEnvVariable) + if daysStr == "" { + daysStr = "1" + } + days, err := strconv.Atoi(daysStr) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: Environment variable %s must contain an integer\n", daysEnvVariable) + os.Exit(1) + } + + return cloudscaleApiToken, dbUrl, days } func initDb(ctx context.Context, tx *sqlx.Tx) error { @@ -149,131 +162,124 @@ func initDb(ctx context.Context, tx *sqlx.Tx) error { return nil } -func checkErrExit(err error) { +func main() { + ctx := context.Background() + err := sync(ctx) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: %v\n", err) os.Exit(1) } + os.Exit(0) } -func main() { - ctx := context.Background() - cloudscaleApiToken, dbUrl := cfg() +func sync(ctx context.Context) error { + cloudscaleApiToken, dbUrl, days := cfg() cloudscaleClient := cloudscale.NewClient(http.DefaultClient) cloudscaleClient.AuthToken = cloudscaleApiToken // The cloudscale API works in Europe/Zurich, so we have to use the same, regardless of where this code runs location, err := time.LoadLocation("Europe/Zurich") - checkErrExit(err) + if err != nil { + return err + } // Fetch statistics of yesterday (as per Europe/Zurich). The metrics will cover the entire day. now := time.Now().In(location) - date := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location()) - bucketMetricsRequest := cloudscale.BucketMetricsRequest{Start: date, End: date} - bucketMetrics, err := cloudscaleClient.Metrics.GetBucketMetrics(ctx, &bucketMetricsRequest) - checkErrExit(err) + date := time.Date(now.Year(), now.Month(), now.Day()-days, 0, 0, 0, 0, now.Location()) + if err != nil { + return err + } rdb, err := db.Openx(dbUrl) - checkErrExit(err) + if err != nil { + return err + } defer rdb.Close() // initialize DB tx, err := rdb.BeginTxx(ctx, &sql.TxOptions{}) - checkErrExit(err) + if err != nil { + return err + } defer tx.Rollback() err = initDb(ctx, tx) - checkErrExit(err) + if err != nil { + return err + } err = tx.Commit() - checkErrExit(err) + if err != nil { + return err + } - for _, bucketMetricsData := range bucketMetrics.Data { - // start new transaction for actual work - tx, err = rdb.BeginTxx(ctx, &sql.TxOptions{}) - checkErrExit(err) + accumulated, err := accumulateBucketMetrics(ctx, date, cloudscaleClient) + if err != nil { + return err + } - objectsUser, err := cloudscaleClient.ObjectsUsers.Get(ctx, bucketMetricsData.Subject.ObjectsUserID) - if err != nil || objectsUser == nil { - fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s, objects user %s not found\n", bucketMetricsData.Subject.BucketName, bucketMetricsData.Subject.ObjectsUserID) + for source, value := range accumulated { + if value == 0 { continue } - tenantStr := objectsUser.Tags["tenant"] - if tenantStr == "" { - fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s, no tenant information found on objectsUser\n", bucketMetricsData.Subject.BucketName) - continue - } - namespace := objectsUser.Tags["namespace"] - if namespace == "" { - fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s, no namespace information found on objectsUser\n", bucketMetricsData.Subject.BucketName) - continue + fmt.Printf("syncing %s\n", source) + + // start new transaction for actual work + tx, err = rdb.BeginTxx(ctx, &sql.TxOptions{}) + if err != nil { + return err } - zone := objectsUser.Tags["zone"] - if zone == "" { - fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s, no zone information found on objectsUser\n", bucketMetricsData.Subject.BucketName) - continue + + tenant, err := tenantsmodel.Ensure(ctx, tx, &db.Tenant{Source: source.Tenant}) + if err != nil { + return err } - sourceStorage := sourceQueryStorage + ":" + zone + ":" + tenantStr + ":" + namespace - sourceTrafficOut := sourceQueryTrafficOut + ":" + zone + ":" + tenantStr + ":" + namespace - sourceRequests := sourceQueryRequests + ":" + zone + ":" + tenantStr + ":" + namespace + category, err := categoriesmodel.Ensure(ctx, tx, &db.Category{Source: source.Zone + ":" + source.Namespace}) + if err != nil { + return err + } - tenant, err := tenantsmodel.Ensure(ctx, tx, &db.Tenant{Source: tenantStr}) - checkErrExit(err) + dateTime := datetimesmodel.New(source.Start) + dateTime, err = datetimesmodel.Ensure(ctx, tx, dateTime) + if err != nil { + return err + } - category, err := categoriesmodel.Ensure(ctx, tx, &db.Category{Source: zone + ":" + objectsUser.DisplayName}) - checkErrExit(err) + product, err := productsmodel.GetBestMatch(ctx, tx, source.String(), source.Start) + if err != nil { + return err + } - // Ensure a suitable dateTime object - dateTime := datetimesmodel.New(bucketMetricsData.TimeSeries[0].Start) - dateTime, err = datetimesmodel.Ensure(ctx, tx, dateTime) - checkErrExit(err) - - if bucketMetricsData.TimeSeries[0].Usage.StorageBytes > 0 { - fmt.Printf("syncing %s\n", sourceStorage) - product, err := productsmodel.GetBestMatch(ctx, tx, sourceStorage, bucketMetricsData.TimeSeries[0].Start) - checkErrExit(err) - discount, err := discountsmodel.GetBestMatch(ctx, tx, sourceStorage, bucketMetricsData.TimeSeries[0].Start) - checkErrExit(err) - query, err := queriesmodel.GetByName(ctx, tx, sourceQueryStorage+":"+zone) - checkErrExit(err) - storageQuantity := float64(bucketMetricsData.TimeSeries[0].Usage.StorageBytes) / 1000 / 1000 / 1000 - storageFact := factsmodel.New(dateTime, query, tenant, category, product, discount, storageQuantity) - _, err = factsmodel.Ensure(ctx, tx, storageFact) - checkErrExit(err) + discount, err := discountsmodel.GetBestMatch(ctx, tx, source.String(), source.Start) + if err != nil { + return err } - if bucketMetricsData.TimeSeries[0].Usage.SentBytes > 0 { - fmt.Printf("syncing %s\n", sourceTrafficOut) - product, err := productsmodel.GetBestMatch(ctx, tx, sourceTrafficOut, bucketMetricsData.TimeSeries[0].Start) - checkErrExit(err) - discount, err := discountsmodel.GetBestMatch(ctx, tx, sourceTrafficOut, bucketMetricsData.TimeSeries[0].Start) - checkErrExit(err) - query, err := queriesmodel.GetByName(ctx, tx, sourceQueryTrafficOut+":"+zone) - checkErrExit(err) - trafficOutQuantity := float64(bucketMetricsData.TimeSeries[0].Usage.SentBytes) / 1000 / 1000 / 1000 - trafficOutFact := factsmodel.New(dateTime, query, tenant, category, product, discount, trafficOutQuantity) - _, err = factsmodel.Ensure(ctx, tx, trafficOutFact) - checkErrExit(err) + query, err := queriesmodel.GetByName(ctx, tx, source.Query+":"+source.Zone) + if err != nil { + return err } - if bucketMetricsData.TimeSeries[0].Usage.Requests > 0 { - fmt.Printf("syncing %s\n", sourceTrafficOut) - product, err := productsmodel.GetBestMatch(ctx, tx, sourceRequests, bucketMetricsData.TimeSeries[0].Start) - checkErrExit(err) - discount, err := discountsmodel.GetBestMatch(ctx, tx, sourceRequests, bucketMetricsData.TimeSeries[0].Start) - checkErrExit(err) - query, err := queriesmodel.GetByName(ctx, tx, sourceQueryRequests+":"+zone) - checkErrExit(err) - requestsQuantity := float64(bucketMetricsData.TimeSeries[0].Usage.Requests) / 1000 - requestsFact := factsmodel.New(dateTime, query, tenant, category, product, discount, requestsQuantity) - _, err = factsmodel.Ensure(ctx, tx, requestsFact) - checkErrExit(err) + var quantity float64 + if query.Unit == "GB" || query.Unit == "GBDay" { + quantity = float64(value) / 1000 / 1000 / 1000 + } else if query.Unit == "KReq" { + quantity = float64(value) / 1000 + } else { + return errors.New("Unknown query unit " + query.Unit) + } + storageFact := factsmodel.New(dateTime, query, tenant, category, product, discount, quantity) + _, err = factsmodel.Ensure(ctx, tx, storageFact) + if err != nil { + return err } err = tx.Commit() - checkErrExit(err) + if err != nil { + return err + } } - os.Exit(0) + return nil }