Implement basic priority calculation. #24

Merged · 6 commits · Jul 23, 2019
1 change: 1 addition & 0 deletions config/armada/config.yaml
@@ -1,5 +1,6 @@
grpcPort: ":50051"
redis:
addr: "localhost:6379"
priorityHalfTime: 20m
password: ""
db: 0
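
Note on the new setting: priorityHalfTime is a Go-style duration string. This diff does not show how the config file is decoded, but assuming the loader uses standard duration parsing, "20m" ends up as 20 * time.Minute in the new PriorityHalfTime field, roughly like this sketch:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "20m" is the literal added to config/armada/config.yaml above.
	halfTime, err := time.ParseDuration("20m")
	if err != nil {
		panic(err)
	}
	fmt.Println(halfTime == 20*time.Minute) // true
}
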
7 changes: 5 additions & 2 deletions internal/armada/configuration/types.go
@@ -1,8 +1,11 @@
package configuration

import "time"

type ArmadaConfig struct {
GrpcPort string
Redis RedisConfig
GrpcPort string
Redis RedisConfig
PriorityHalfTime time.Duration
}

type RedisConfig struct {
9 changes: 5 additions & 4 deletions internal/armada/repository/jobs.go
@@ -10,8 +10,9 @@ import (
//"github.com/golang/protobuf/ptypes/timestamp"
)

const jobObjectPrefix = "job:"
const queuePrefix = "Job:Queue:"
const jobObjectPrefix = "Job:"
const jobQueuePrefix = "Job:Queue:"


type JobRepository interface {
AddJob(request *api.JobRequest) (string, error)
@@ -32,7 +33,7 @@ func (repo RedisJobRepository) AddJob(request *api.JobRequest) (string, error) {
return "", e
}

pipe.ZAdd(queuePrefix+job.Queue, redis.Z{
pipe.ZAdd(jobQueuePrefix+job.Queue, redis.Z{
Member: job.Id,
Score: job.Priority})

@@ -45,7 +46,7 @@ func (repo RedisJobRepository) AddJob(request *api.JobRequest) (string, error) {
}

func (repo RedisJobRepository) PeekQueue(queue string, limit int64) ([]*api.Job, error) {
ids, e := repo.Db.ZRange(queuePrefix+queue, 0, limit-1).Result()
ids, e := repo.Db.ZRange(jobQueuePrefix+queue, 0, limit-1).Result()
if e != nil {
return nil, e
}
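
With the renamed constants, jobs for a queue live in a Redis sorted set keyed Job:Queue:<queue name>, scored by the job's priority, and PeekQueue reads the lowest scores first via ZRange. A minimal go-redis sketch of the same pattern (the queue and job names here are illustrative, not taken from the diff):

package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Each job is a member of the queue's sorted set; its priority is the score.
	client.ZAdd("Job:Queue:test", redis.Z{Member: "job-a", Score: 1})
	client.ZAdd("Job:Queue:test", redis.Z{Member: "job-b", Score: 0.5})

	// ZRange returns members in ascending score order, so the lowest
	// priorities come back first, as in PeekQueue.
	ids, err := client.ZRange("Job:Queue:test", 0, 1).Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(ids) // [job-b job-a]
}
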
20 changes: 20 additions & 0 deletions internal/armada/repository/queues.go
@@ -0,0 +1,20 @@
package repository

import (
"github.com/go-redis/redis"
)

type Queue struct {
}

type QueueRepository interface {
GetQueues() ([]string, error)
}

type RedisQueueRepository struct {
Db *redis.Client
}

func (RedisQueueRepository) GetQueues() ([]string, error) {
panic("implement me")
}
87 changes: 87 additions & 0 deletions internal/armada/repository/usage.go
@@ -0,0 +1,87 @@
package repository

import (
"github.com/G-Research/k8s-batch/internal/armada/api"
"github.com/go-redis/redis"
"github.com/gogo/protobuf/proto"
"strconv"
)

type Usage struct {
PriorityPerQueue map[string]float64
CurrentUsagePerQueue map[string]float64
}

const clusterReportKey = "Cluster:Report"
const clusterPrioritiesPrefix = "Cluster:Priority:"

type UsageRepository interface {

GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error)
GetClusterPriority(clusterId string) (map[string]float64, error)

UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error
}

type RedisUsageRepository struct {
Db *redis.Client
}


func (r RedisUsageRepository) GetClusterUsageReports() (map[string]*api.ClusterUsageReport, error) {
result, err := r.Db.HGetAll(clusterReportKey).Result()
if err != nil {
return nil, err
}
reports := make(map[string]*api.ClusterUsageReport)

for k, v := range result {
report := &api.ClusterUsageReport{}
e := proto.Unmarshal([]byte(v), report)
if e != nil {
return nil, e
}
reports[k] = report
}
return reports, nil
}

func (r RedisUsageRepository) GetClusterPriority(clusterId string) (map[string]float64, error) {
result, err := r.Db.HGetAll(clusterPrioritiesPrefix+clusterId).Result()
if err != nil {
return nil, err
}
return toFloat64Map(result)
}

func (r RedisUsageRepository) UpdateCluster(report *api.ClusterUsageReport, priorities map[string]float64) error {

pipe := r.Db.TxPipeline()

data, e := proto.Marshal(report)
if e != nil {
return e
}
pipe.HSet(clusterReportKey, report.ClusterId, data)

untyped := make(map[string]interface{})
for k, v := range priorities {
untyped[k] = v
}
pipe.HMSet(clusterPrioritiesPrefix+report.ClusterId, untyped)

_, err := pipe.Exec()
return err
}

func toFloat64Map(result map[string]string) (map[string]float64, error) {
reports := make(map[string]float64)
for k, v := range result {
priority, e := strconv.ParseFloat(v, 64)
if e != nil {
return nil, e
}
reports[k] = priority
}
return reports, nil
}
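
Taken together, UpdateCluster stores the marshalled report in the Cluster:Report hash (field = cluster id) and the per-queue priorities under Cluster:Priority:<clusterId>, both within one transaction pipeline; GetClusterPriority reads the latter back and parses the values as float64. A rough usage sketch, assuming a local Redis and made-up cluster/queue names (only the ClusterId field of api.ClusterUsageReport is set here):

package main

import (
	"fmt"

	"github.com/G-Research/k8s-batch/internal/armada/api"
	"github.com/G-Research/k8s-batch/internal/armada/repository"
	"github.com/go-redis/redis"
)

func main() {
	db := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	repo := repository.RedisUsageRepository{Db: db}

	report := &api.ClusterUsageReport{ClusterId: "cluster-1"} // other fields omitted
	priorities := map[string]float64{"queue-a": 1.5, "queue-b": 0.25}

	// Writes Cluster:Report[cluster-1] and Cluster:Priority:cluster-1 in one pipeline.
	if err := repo.UpdateCluster(report, priorities); err != nil {
		panic(err)
	}

	// Reads the priorities back out of Cluster:Priority:cluster-1.
	stored, err := repo.GetClusterPriority("cluster-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(stored) // map[queue-a:1.5 queue-b:0.25]
}
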
21 changes: 13 additions & 8 deletions internal/armada/server.go
@@ -4,18 +4,19 @@ import (
"github.com/G-Research/k8s-batch/internal/armada/api"
"github.com/G-Research/k8s-batch/internal/armada/configuration"
"github.com/G-Research/k8s-batch/internal/armada/repository"
"github.com/G-Research/k8s-batch/internal/armada/service"
"github.com/G-Research/k8s-batch/internal/armada/server"
"github.com/go-redis/redis"
"google.golang.org/grpc"
"log"
"net"
"sync"
"time"
)

func Serve(config *configuration.ArmadaConfig) (*grpc.Server, *sync.WaitGroup) {
wg := &sync.WaitGroup{}
wg.Add(1)
server := grpc.NewServer()
grpcServer := grpc.NewServer()
go func () {
log.Printf("Grpc listening on %s", config.GrpcPort)
defer log.Println("Stopping server.")
@@ -27,22 +28,26 @@ func Serve(config *configuration.ArmadaConfig) (*grpc.Server, *sync.WaitGroup) {
})

jobRepository := &repository.RedisJobRepository{ Db: db }
submitServer := &service.SubmitServer{ JobRepository: jobRepository }
aggregatedQueueServer := &service.AggregatedQueueServer{ JobRepository: jobRepository }
usageRepository := &repository.RedisUsageRepository{ Db: db }

submitServer := &server.SubmitServer{ JobRepository: jobRepository }
usageServer := &server.UsageServer { UsageRepository: usageRepository, PriorityHalfTime: time.Minute }
aggregatedQueueServer := &server.AggregatedQueueServer{ JobRepository: jobRepository, UsageRepository: usageRepository }

lis, err := net.Listen("tcp", config.GrpcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

api.RegisterSubmitServer(server, submitServer)
api.RegisterAggregatedQueueServer(server, aggregatedQueueServer)
api.RegisterSubmitServer(grpcServer, submitServer)
api.RegisterUsageServer(grpcServer, usageServer)
api.RegisterAggregatedQueueServer(grpcServer, aggregatedQueueServer)

if err := server.Serve(lis); err != nil {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}

wg.Done()
} ()
return server, wg
return grpcServer, wg
}
@@ -1,4 +1,4 @@
package service
package server

import (
"context"
@@ -10,9 +10,11 @@ import (

type AggregatedQueueServer struct {
JobRepository repository.JobRepository
UsageRepository repository.UsageRepository
}

func (AggregatedQueueServer) LeaseJobs(context.Context, *api.LeaseRequest) (*api.JobLease, error) {

//TODO Implement me
fmt.Println("Lease jobs called")
jobLease := api.JobLease{
@@ -1,4 +1,4 @@
package service
package server

import (
"context"
125 changes: 125 additions & 0 deletions internal/armada/server/usage.go
@@ -0,0 +1,125 @@

package server

import (
"context"
"github.com/G-Research/k8s-batch/internal/armada/api"
"github.com/G-Research/k8s-batch/internal/armada/repository"
"github.com/G-Research/k8s-batch/internal/common"
"github.com/gogo/protobuf/types"
"k8s.io/apimachinery/pkg/api/resource"
"math"
"math/big"
"time"
)

type UsageServer struct {
PriorityHalfTime time.Duration
UsageRepository repository.UsageRepository
}

func (s UsageServer) ReportUsage(ctx context.Context, report *api.ClusterUsageReport) (*types.Empty, error) {

reports, err := s.UsageRepository.GetClusterUsageReports()
if err != nil {
return nil, err
}

previousPriority, err := s.UsageRepository.GetClusterPriority(report.ClusterId)
if err != nil {
return nil, err
}

previousReport := reports[report.ClusterId]
timeChange := time.Minute
if previousReport != nil {
timeChange = report.ReportTime.Sub(previousReport.ReportTime)
}

reports[report.ClusterId] = report
availableResources := sumResources(reports)
resourceScarcity := calculateResourceScarcity(availableResources)
usage := calculateUsage(resourceScarcity, report.Queues)
newPriority := calculatePriority(usage, previousPriority, timeChange, s.PriorityHalfTime)

err = s.UsageRepository.UpdateCluster(report, newPriority)
if err != nil {
return nil, err
}
return nil, nil
}

func calculatePriority(usage map[string]float64, previousPriority map[string]float64, timeChange time.Duration, halfTime time.Duration) map[string]float64 {

newPriority := map[string]float64{}
timeChangeFactor := math.Pow(0.5, timeChange.Seconds() / halfTime.Seconds())

for queue, oldPriority := range previousPriority {
newPriority[queue] = timeChangeFactor * getOrDefault(usage, queue, 0) +
(1 - timeChangeFactor) * oldPriority
}
for queue, usage := range usage {
_, exists := newPriority[queue]
if !exists {
newPriority[queue] = timeChangeFactor * usage
}
}
return newPriority
}

func calculateUsage(resourceScarcity map[string]float64, queues []*api.QueueReport) map[string]float64 {
usages := map[string]float64{}
for _, queue := range queues {
usage := 0.0
for resourceName, quantity := range queue.Resources {
scarcity := getOrDefault(resourceScarcity, resourceName, 1)
usage += asFloat64(quantity) * scarcity
}
usages[queue.Name] = usage
}
return usages
}

// Calculates inverse of resources per cpu unit
// { cpu: 4, memory: 20GB, gpu: 2 } -> { cpu: 1.0, memory: 0.2, gpu: 2 }
func calculateResourceScarcity(res common.ComputeResources) map[string]float64 {
importance := map[string]float64{
"cpu": 1,
}
cpu := asFloat64(res["cpu"])

for k, v := range res {
if k == "cpu"{
continue
}
q := asFloat64(v)
if q >= 0.00001 {
importance[k] = cpu / q
}
}
return importance
}

func sumResources(reports map[string]*api.ClusterUsageReport) common.ComputeResources {
result := common.ComputeResources{}
for _, report := range reports {
result.Add(report.ClusterCapacity)
}
return result
}

func getOrDefault(m map[string]float64, key string, def float64) float64 {
v, ok := m[key]
if ok {
return v
}
return def
}

func asFloat64(q resource.Quantity) float64 {
dec := q.AsDec()
unscaled := dec.UnscaledBig()
scale := dec.Scale()
unscaledFloat, _ := new(big.Float).SetInt(unscaled).Float64()
return unscaledFloat * math.Pow10(-int(scale))
}
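
For a concrete sense of the decay: each report blends the current scarcity-weighted usage into the previous priority as newPriority = f·usage + (1 − f)·oldPriority, where f = 0.5^(Δt / halfTime) and Δt is the time since the previous report (defaulting to one minute when there is no previous report). A short standalone check of that arithmetic, using illustrative numbers and the 20m half-time from the sample config:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	halfTime := 20 * time.Minute // value from the sample config
	timeChange := time.Minute    // interval since the previous report

	// The same factor calculatePriority computes.
	f := math.Pow(0.5, timeChange.Seconds()/halfTime.Seconds())

	usage := 10.0      // current scarcity-weighted usage for some queue
	oldPriority := 4.0 // priority carried over from the last report

	newPriority := f*usage + (1-f)*oldPriority
	fmt.Printf("f=%.4f newPriority=%.2f\n", f, newPriority) // f=0.9659 newPriority=9.80
}
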