Skip to content
This repository has been archived by the owner on Jan 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #4 from vshn/accumulate-first
Browse files Browse the repository at this point in the history
Accumulate data first & fix error handling
  • Loading branch information
davidgubler authored Jul 28, 2022
2 parents b07d50d + 83693ff commit fbcca51
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 94 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,8 @@ cloudscale-metrics-collector
.public/
node_modules/

# IDE
.idea/

# Configuration
env
104 changes: 104 additions & 0 deletions accumulate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package main

import (
"context"
"fmt"
"github.com/cloudscale-ch/cloudscale-go-sdk/v2"
"os"
"time"
)

// AccumulateKey represents one data point ("fact") in the billing database.
// The actual value for the data point is not present in this type, as this type is just a map key, and the corresponding value is stored as a map value.
type AccumulateKey struct {
Query string
Zone string
Tenant string
Namespace string
Start time.Time
}

// String returns the full "source" string as used by the appuio-cloud-reporting
func (this AccumulateKey) String() string {
return this.Query + ":" + this.Zone + ":" + this.Tenant + ":" + this.Namespace
}

/*
accumulateBucketMetrics gets all the bucket metrics from cloudscale and puts them into a map. The map key is the "AccumulateKey",
and the value is the raw value of the data returned by cloudscale (e.g. bytes, requests). In order to construct the
correct AccumulateKey, this function needs to fetch the ObjectUsers's tags, because that's where the zone, tenant and
namespace are stored.
This method is "accumulating" data because it collects data from possibly multiple ObjectsUsers under the same
AccumulateKey. This is because the billing system can't handle multiple ObjectsUsers per namespace.
*/
func accumulateBucketMetrics(ctx context.Context, date time.Time, cloudscaleClient *cloudscale.Client) (map[AccumulateKey]uint64, error) {
bucketMetricsRequest := cloudscale.BucketMetricsRequest{Start: date, End: date}
bucketMetrics, err := cloudscaleClient.Metrics.GetBucketMetrics(ctx, &bucketMetricsRequest)
if err != nil {
return nil, err
}

accumulated := make(map[AccumulateKey]uint64)

for _, bucketMetricsData := range bucketMetrics.Data {
objectsUser, err := cloudscaleClient.ObjectsUsers.Get(ctx, bucketMetricsData.Subject.ObjectsUserID)
if err != nil || objectsUser == nil {
fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s, objects user %s not found\n", bucketMetricsData.Subject.BucketName, bucketMetricsData.Subject.ObjectsUserID)
continue
}
err = accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, objectsUser)
if err != nil {
fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s: %v\n", bucketMetricsData.Subject.BucketName, err)
continue
}
}

return accumulated, nil
}

func accumulateBucketMetricsForObjectsUser(accumulated map[AccumulateKey]uint64, bucketMetricsData cloudscale.BucketMetricsData, objectsUser *cloudscale.ObjectsUser) error {
if len(bucketMetricsData.TimeSeries) != 1 {
return fmt.Errorf("There must be exactly one metrics data point, found %d", len(bucketMetricsData.TimeSeries))
}

tenantStr := objectsUser.Tags["tenant"]
if tenantStr == "" {
return fmt.Errorf("no tenant information found on objectsUser")
}
namespace := objectsUser.Tags["namespace"]
if namespace == "" {
return fmt.Errorf("no namespace information found on objectsUser")
}
zone := objectsUser.Tags["zone"]
if zone == "" {
return fmt.Errorf("no zone information found on objectsUser")
}

sourceStorage := AccumulateKey{
Query: sourceQueryStorage,
Zone: zone,
Tenant: tenantStr,
Namespace: namespace,
Start: bucketMetricsData.TimeSeries[0].Start,
}
sourceTrafficOut := AccumulateKey{
Query: sourceQueryTrafficOut,
Zone: zone,
Tenant: tenantStr,
Namespace: namespace,
Start: bucketMetricsData.TimeSeries[0].Start,
}
sourceRequests := AccumulateKey{
Query: sourceQueryRequests,
Zone: zone,
Tenant: tenantStr,
Namespace: namespace,
Start: bucketMetricsData.TimeSeries[0].Start,
}

accumulated[sourceStorage] += uint64(bucketMetricsData.TimeSeries[0].Usage.StorageBytes)
accumulated[sourceTrafficOut] += uint64(bucketMetricsData.TimeSeries[0].Usage.SentBytes)
accumulated[sourceRequests] += uint64(bucketMetricsData.TimeSeries[0].Usage.Requests)

return nil
}
72 changes: 72 additions & 0 deletions accumulate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"fmt"
"github.com/cloudscale-ch/cloudscale-go-sdk/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// assertEqualfUint64 implements the functionality of assert.Equalf for uint64, because assert.Equalf cannot print uint64 correctly.
// See https://github.com/stretchr/testify/issues/400
func assertEqualfUint64(t *testing.T, expected uint64, actual uint64, msg string, args ...interface{}) bool {
if expected != actual {
return assert.Fail(t, fmt.Sprintf("Not equal: \n"+
"expected: %d\n"+
"actual : %d", expected, actual))
}
return true
}

func TestAccumulateBucketMetricsForObjectsUser(t *testing.T) {
zone := "appuio-cloudscale-ch-lpg"
tenant := "inity"
namespace := "testnamespace"

// get the correct date for a data set that was created yesterday
location, err := time.LoadLocation("Europe/Zurich")
require.NoError(t, err, "could not load location Europe/Zurich")
now := time.Now().In(location)
date := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())

// build input data structure
bucketMetricsInterval := []cloudscale.BucketMetricsInterval{
{
Start: date,
End: date,
Usage: cloudscale.BucketMetricsIntervalUsage{
Requests: 5,
StorageBytes: 1000000,
SentBytes: 2000000,
},
},
}
bucketMetricsData := cloudscale.BucketMetricsData{
TimeSeries: bucketMetricsInterval,
}
objectsUser := cloudscale.ObjectsUser{}
objectsUser.Tags = cloudscale.TagMap{"zone": zone, "tenant": tenant, "namespace": namespace}

accumulated := make(map[AccumulateKey]uint64)
accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, &objectsUser)

require.Len(t, accumulated, 3, "incorrect amount of values 'accumulated'")

key := AccumulateKey{
Zone: zone,
Tenant: tenant,
Namespace: namespace,
Start: date,
}

key.Query = "object-storage-requests"
assertEqualfUint64(t, uint64(5), accumulated[key], "incorrect value in %s", key)

key.Query = "object-storage-storage"
assertEqualfUint64(t, uint64(1000000), accumulated[key], "incorrect value in %s", key)

key.Query = "object-storage-traffic-out"
assertEqualfUint64(t, uint64(2000000), accumulated[key], "incorrect value in %s", key)
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ require (
github.com/go-logr/logr v1.2.3
github.com/go-logr/zapr v1.2.3
github.com/jmoiron/sqlx v1.3.5
github.com/stretchr/testify v1.8.0
github.com/urfave/cli/v2 v2.10.3
go.uber.org/zap v1.21.0
)

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.12.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
Expand All @@ -24,12 +26,13 @@ require (
github.com/jackc/pgtype v1.11.0 // indirect
github.com/jackc/pgx/v4 v4.16.0 // indirect
github.com/lopezator/migrator v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand Down Expand Up @@ -229,6 +231,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
Expand Down
Loading

0 comments on commit fbcca51

Please sign in to comment.