Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce counters #19

Merged
merged 4 commits into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/api-reference/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,26 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/counters:
put:
summary: Force the server to refresh all the counters
description: Force server to refresh all the counters. This operation is blocking and expensive, please use with care.
operationId: refreshcounters
tags:
- stats
responses:
'200':
description: A message
content:
application/json:
schema:
$ref: "#/components/schemas/Message"
default:
description: unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
components:
schemas:
Cloud:
Expand Down
6 changes: 4 additions & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package api

import (
"log"
"net/http"
"github.com/julienschmidt/httprouter"
"github.com/redhat-gpe/agnostics/internal/db"
"github.com/redhat-gpe/agnostics/internal/log"
"io"
)

Expand Down Expand Up @@ -38,6 +38,8 @@ func Serve() {
router.GET("/api/v1/placements", v1GetPlacements)
router.GET("/api/v1/placements/:uuid", v1GetPlacement)
router.DELETE("/api/v1/placements/:uuid", v1DeletePlacement)
router.PUT("/api/v1/counters", v1PutCounters)

log.Fatal(http.ListenAndServe(":8080", router))
log.Out.Println("API listen on port :8080")
log.Err.Fatal(http.ListenAndServe(":8080", router))
}
24 changes: 22 additions & 2 deletions internal/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func v1GetPlacements(w http.ResponseWriter, req *http.Request, params httprouter
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
if p, err := placement.GetAll() ; err == nil {
log.Debug.Println("GET placement", p)
log.Debug.Println("GET all placements")
if p, err := placement.GetAll(0) ; err == nil {
if err := enc.Encode(p); err != nil {
log.Err.Println("GET placement", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -291,3 +291,23 @@ func v1PullRepository(w http.ResponseWriter, req *http.Request, _ httprouter.Par
Message: "Request to update git repository received.",
})
}

func v1PutCounters(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
log.Out.Println("Refresh all counters")
err := placement.RefreshAllCounters()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Err.Println(err)
enc.Encode(v1.Error{
Code: http.StatusInternalServerError,
Message: "ERROR while refreshing counters.",
})

}
enc.Encode(v1.Message{
Message: "All counters updated",
})
}
6 changes: 6 additions & 0 deletions internal/api/v1/cloud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package v1

// NewCloud is the constructor for a new Cloud object.
func NewCloud() Cloud {
return Cloud{Enabled: true}
}
2 changes: 1 addition & 1 deletion internal/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type Cloud struct {
// by the scheduler later, depending on priorities configured.
// It's possible to add it to the config though, if needed.
Weight int `json:"weight"`
// Enabled defines if the cloud can be selected when loading the config. It's a top-level control. If it's set to false, then the cloud will not be loaded in the configuration. It takes precedence over scheduling, thus over taints and tolerations.
// Enabled defines if the cloud can be selected when loading the config. It's a top-level control. If it's set to false, then the cloud will not be used. It takes precedence over scheduling, thus over taints and tolerations.
// True by default.
Enabled bool `json:"enabled"`
// Taints are part of the mechanism to reduce the priority (effect=PreferNoSchedule)
Expand Down
6 changes: 2 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,14 @@ func loadClouds() map[string]v1.Cloud {
log.Err.Println("Error in loadClouds()")
log.Err.Fatal(err)
}
cloud := v1.Cloud{Enabled: true}
cloud := v1.NewCloud()
err = yaml.Unmarshal(content, &cloud)
if err != nil {
log.Err.Println("Cannot read configuration of clouds.yml")
log.Err.Fatalf("Cannot unmarshal data: %v", err)
} else {
log.Debug.Printf("Found cloud %s (enabled=%v)\n", cloud.Name, cloud.Enabled)
if cloud.Enabled {
clouds[cloud.Name] = cloud
}
clouds[cloud.Name] = cloud
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/modules/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ func LabelPredicates(clouds map[string]v1.Cloud, labels map[string]string) map[s

out:
for k, v := range clouds {
if v.Enabled == false {
continue out
}
// Check if all labels match
for lk, lv := range labels {
if v.Labels[lk] != lv {
Expand Down
14 changes: 13 additions & 1 deletion internal/modules/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func TestLabelPredicates(t *testing.T) {
clouds = map[string]v1.Cloud{
"openstack-1": v1.Cloud{
Name: "openstack-1",
Enabled: true,
Labels: map[string]string{
"type": "osp",
"region": "na",
Expand All @@ -22,6 +23,7 @@ func TestLabelPredicates(t *testing.T) {
},
"openstack-2": v1.Cloud{
Name: "openstack-2",
Enabled: true,
Labels: map[string]string{
"type": "osp",
"region": "emea",
Expand All @@ -30,19 +32,29 @@ func TestLabelPredicates(t *testing.T) {
},
"openstack-3": v1.Cloud{
Name: "openstack-3",
Enabled: true,
Labels: map[string]string{
"type": "osp",
"region": "emea",
"purpose": "ELT",
},
},
"openstack-disabled": v1.Cloud{
Name: "openstack-disabled",
Enabled: false,
Labels: map[string]string{
"type": "osp",
"region": "emea",
"purpose": "OFF",
},
},
}

labels = map[string]string{
"type": "osp",
}
result = LabelPredicates(clouds, labels)
if len(result) != len(clouds) {
if len(result) != len(clouds) - 1 {
t.Error(clouds, labels, result)
}

Expand Down
121 changes: 113 additions & 8 deletions internal/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ func get(key string) (v1.Placement, error) {
} else if reply == nil {
return v1.Placement{}, ErrPlacementNotFound
} else {
log.Debug.Println("reply Get(", key ,")=", string(reply))

var p v1.Placement
if err := json.Unmarshal(reply, &p); err != nil {
return v1.Placement{}, err
Expand All @@ -44,7 +42,9 @@ func Get(uuid string) (v1.Placement, error) {
}

// Get retrives a placement from the DB.
func GetAll() ([]v1.Placement, error) {
// The 'count' parameter is the maximum number of placements to be returned.
// Set 'count' to 0 if you want the function to return all placements without limit.
func GetAll(count int) ([]v1.Placement, error) {
result := []v1.Placement{}
conn, err := db.Dial()
if err != nil {
Expand All @@ -53,13 +53,34 @@ func GetAll() ([]v1.Placement, error) {
}
defer conn.Close()

keys, err := redis.Strings(conn.Do("KEYS", "placement:*"))
if err != nil {
log.Err.Println("placement.Get error", err)
return []v1.Placement{}, err
// here we'll store our iterator value
iter := 0

// this will store the keys of each iteration
var keys []string
for {

// we scan with our iter offset, starting at 0
arr, err := redis.Values(conn.Do("SCAN", iter, "MATCH", "placement:*"))
if err != nil {
log.Err.Println("placement.Get error", err)
return []v1.Placement{}, err
}
// now we get the iter and the keys from the multi-bulk reply
iter, _ = redis.Int(arr[0], nil)
k, _ := redis.Strings(arr[1], nil)

keys = append(keys, k...)

// check if we need to stop...
if count != 0 && len(keys) > count {
break
}
if iter == 0 {
break
}
}
for _, key := range keys {
log.Debug.Println(key)
if p, err := get(key); err == nil {
result = append(result, p)
}
Expand All @@ -76,6 +97,12 @@ func Save(p v1.Placement) error {
}
defer conn.Close()

newEntry := false
pOld, err := Get(p.UUID)
if err == ErrPlacementNotFound {
newEntry = true
}

jsonText, err := json.Marshal(p)

if reply, err := conn.Do("JSON.SET", "placement:"+p.UUID, ".", jsonText); err != nil {
Expand All @@ -86,10 +113,81 @@ func Save(p v1.Placement) error {
return nil
} else {
log.Debug.Println("placement.Set(", p.UUID,")", reply)
// Update counter
countPlacementsByCloud(conn, "INCR", p.Cloud.Name)
countPlacementsByCloud(conn, "INCR", "all")
if newEntry == false {
countPlacementsByCloud(conn, "DECR", pOld.Cloud.Name)
countPlacementsByCloud(conn, "DECR", "all")
}

return nil
}
}

func countPlacementsByCloud(conn redis.Conn, command string, name string) error {
if reply, err := conn.Do(command, "counter:placements:"+name); err != nil {
log.Err.Println(command, "(counter:placements:", name,")", err)
return err
} else {
log.Debug.Println(command, "(counter:placements:", name,")", reply)
return nil
}
}

// GetCountPlacementsByCloud return the counter for that cloud name.
func GetCountPlacementsByCloud(name string) (string, error) {
conn, err := db.Dial()
if err != nil {
log.Err.Println("Cannot connect to redis:", err)
return "", err
}
defer conn.Close()

if reply, err := redis.String(conn.Do("GET", "counter:placements:"+name)); err != nil {
if err.Error() == "redigo: nil returned" {
return "0", nil
}
log.Err.Println("GET", "(counter:placements:", name,")", err)
return reply, err
} else {
return reply, nil
}
}

// RefreshAllCounters calculates and refreshes all the counters
func RefreshAllCounters() error {
placements, err := GetAll(0)
if err != nil {
return err
}

byCloud := map[string]int{}

for _, p := range placements {
byCloud[p.Cloud.Name] = byCloud[p.Cloud.Name] + 1
}

conn, err := db.Dial()
if err != nil {
log.Err.Println("Cannot connect to redis:", err)
return err
}
defer conn.Close()

for k, v := range byCloud {
if _, err := conn.Do("SET", "counter:placements:"+k, v); err != nil {
log.Err.Println("SET", "(counter:placements:", k,")", err)
return err
}
}
if _, err := conn.Do("SET", "counter:placements:all", len(placements)); err != nil {
log.Err.Println("SET", "(counter:placements:all)", err)
return err
}
return nil
}

// Delete deletes a placement from the database.

func Delete(uuid string) error {
Expand All @@ -100,6 +198,11 @@ func Delete(uuid string) error {
}
defer conn.Close()

p, err := Get(uuid)
if err == ErrPlacementNotFound {
return ErrPlacementNotFound
}

if reply, err := conn.Do("JSON.DEL", "placement:"+uuid); err != nil {
log.Debug.Println("placement.Delete(", uuid ,")=", reply)
if err == redis.ErrNil {
Expand All @@ -110,6 +213,8 @@ func Delete(uuid string) error {
return ErrPlacementNotFound
} else {
log.Debug.Println("reply Delete(", uuid ,")=", reply)
countPlacementsByCloud(conn, "DECR", p.Cloud.Name)
countPlacementsByCloud(conn, "DECR", "all")
return nil
}
}