Skip to content

Commit

Permalink
Added allocations to dispatchers (#288)
Browse files Browse the repository at this point in the history
Added allocations to dispatchers
  • Loading branch information
jshaofuturewei authored May 28, 2021
1 parent f1c1335 commit 64b7175
Show file tree
Hide file tree
Showing 25 changed files with 42,732 additions and 2,162 deletions.
Binary file modified docs/misc/Scheduler_Performance.xlsx
Binary file not shown.
10 changes: 6 additions & 4 deletions globalscheduler/cmd/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
var configInstance *Config

type Config struct {
Scheduler QpsConfig
Distributor QpsConfig
Dispatcher QpsConfig
Scheduler ResourceConfig
Distributor ResourceConfig
Dispatcher ResourceConfig
}

func init() {
Expand Down Expand Up @@ -61,7 +61,9 @@ func GetInstance() *Config {
func AddQPSFlags(config *restclient.Config, qpsConfig QpsConfig) {
kubeConfigs := config.GetAllConfigs()
for _, kubeConfig := range kubeConfigs {
kubeConfig.ContentType = qpsConfig.ContentType
if qpsConfig.ContentType != "" {
kubeConfig.ContentType = qpsConfig.ContentType
}
kubeConfig.QPS = qpsConfig.Qps
kubeConfig.Burst = qpsConfig.Burst
}
Expand Down
5 changes: 5 additions & 0 deletions globalscheduler/cmd/conf/qps.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ type QpsConfig struct {
Burst int
ContentType string
}

type ResourceConfig struct {
Allocation QpsConfig
Pod QpsConfig
}
161 changes: 119 additions & 42 deletions globalscheduler/cmd/proxy-server/allocation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,29 @@ type AllocationHandler struct {
clientset *allocclientset.Clientset
}

type AllocationResp struct {
Id types.UID `json:"id"`
ResourceGroup ResourceGroupResp `json:"resource_group"`
ResourceVersion string `json:"resource_version"`
Replicas int `json:"replicas"`
}

type ResourceGroupResp struct {
Name string `json:"name"`
Resources []ResourceResp `json:"resources"`
Selected []SelectedResp `json:"selected"`
}

type ResourceResp struct {
Name string `json:"name"`
FlavorId string `json:"flavor_id"`
}

type SelectedResp struct {
Region string `json:"region"`
AvailabilityZone string `json:"availability_zone"`
}

func NewAllocationHandler() *AllocationHandler {
config := getConfig()
clientset, err := allocclientset.NewForConfig(config)
Expand Down Expand Up @@ -68,26 +91,29 @@ func (handler *AllocationHandler) getAllocation(w http.ResponseWriter, r *http.R
return "", err
}
allocstr, err = yaml.Marshal(allocation)
if err != nil {
return "", err
}
}
return string(allocstr), nil
}

func (handler *AllocationHandler) createAllocation(w http.ResponseWriter, r *http.Request) error {
func (handler *AllocationHandler) createAllocation(w http.ResponseWriter, r *http.Request) (string, error) {
namespace, _ := getNamespaceAndName(r)
reqBody, err := ioutil.ReadAll(r.Body)
byteValue, err := ioutil.ReadAll(r.Body)
if err != nil {
klog.Errorf("Failed to read allocations with the error %v", err)
return err
return "", err
}
alloc, err := yaml2Allocation(reqBody)
alloc, err := yaml2Allocation(byteValue)
if err != nil {
klog.Errorf("Failed to covert yaml to allocation with the error %v", err)
return err
return "", err
}
createdAlloc, err := handler.clientset.GlobalschedulerV1().Allocations(namespace).Create(&alloc)
if err != nil {
klog.Errorf("Failed to create the allocation %v with the error %v", alloc, err)
return err
return "", err
}
duration := int64(TimeOut * time.Second * 2)
options := metav1.ListOptions{
Expand All @@ -98,26 +124,32 @@ func (handler *AllocationHandler) createAllocation(w http.ResponseWriter, r *htt
}
watcher := handler.clientset.GlobalschedulerV1().Allocations(namespace).Watch(options)
timer := time.NewTimer(TimeOut * time.Second)
return handler.watchAllocationPhase(namespace, createdAlloc.Name, createdAlloc, r.Context(), watcher, timer)
err = handler.watchAllocationPhase(namespace, createdAlloc.Name, createdAlloc, r.Context(), watcher, timer)
if err != nil {
return "", err
}
resp := composeResponse(createdAlloc)
respBytes, err := yaml.Marshal(resp)
return string(respBytes), err
}

func (handler *AllocationHandler) putAllocation(w http.ResponseWriter, r *http.Request) error {
func (handler *AllocationHandler) putAllocation(w http.ResponseWriter, r *http.Request) (string, error) {
namespace, _ := getNamespaceAndName(r)
reqBody, err := ioutil.ReadAll(r.Body)
byteValue, err := ioutil.ReadAll(r.Body)
if err != nil {
klog.Errorf("Failed to read allocations with the error %v", err)
return err
return "", err
}
alloc, err := yaml2Allocation(reqBody)
alloc, err := yaml2Allocation(byteValue)
if err != nil {
klog.Errorf("Failed to covert yaml to allocation with the error %v", err)
return err
return "", err
}

updatedAlloc, err := handler.clientset.GlobalschedulerV1().Allocations(namespace).Update(&alloc)
if err != nil {
klog.Errorf("Failed to update the allocation %v with the error %v", alloc, err)
return err
return "", err
}
duration := int64(TimeOut * time.Second)
options := metav1.ListOptions{
Expand All @@ -128,25 +160,32 @@ func (handler *AllocationHandler) putAllocation(w http.ResponseWriter, r *http.R
}
watcher := handler.clientset.GlobalschedulerV1().Allocations(namespace).Watch(options)
timer := time.NewTimer(TimeOut * time.Second)
return handler.watchAllocationPhase(namespace, updatedAlloc.Name, updatedAlloc, r.Context(), watcher, timer)
err = handler.watchAllocationPhase(namespace, updatedAlloc.Name, updatedAlloc, r.Context(), watcher, timer)
if err != nil {
return "", err
}
resp := composeResponse(updatedAlloc)
respBytes, err := yaml.Marshal(resp)
return string(respBytes), err
}

func (handler *AllocationHandler) patchAllocation(w http.ResponseWriter, r *http.Request) error {
func (handler *AllocationHandler) patchAllocation(w http.ResponseWriter, r *http.Request) (string, error) {
namespace, name := getNamespaceAndName(r)
reqBody, err := ioutil.ReadAll(r.Body)
byteValue, err := ioutil.ReadAll(r.Body)
if err != nil {
klog.Errorf("Failed to read allocations with the error %v", err)
return err
return "", err
}
jsonstr, err := yaml2json(reqBody)
jsonstr, err := yaml2json(byteValue)
if err != nil {
klog.Errorf("Failed to convert yaml %s to allocation with the error %v", string(reqBody), err)
return err
klog.Errorf("Failed to convert yaml %s to allocation with the error %v", string(byteValue), err)
return "", err
}

patchedAlloc, err := handler.clientset.GlobalschedulerV1().Allocations(namespace).Patch(name, types.MergePatchType, []byte(jsonstr))
if err != nil {
klog.Errorf("There is an error [%v] in patch application", err)
return err
return "", err
}

duration := int64(TimeOut * time.Second)
Expand All @@ -158,7 +197,13 @@ func (handler *AllocationHandler) patchAllocation(w http.ResponseWriter, r *http
}
watcher := handler.clientset.GlobalschedulerV1().Allocations(namespace).Watch(options)
timer := time.NewTimer(TimeOut * time.Second)
return handler.watchAllocationPhase(namespace, name, patchedAlloc, r.Context(), watcher, timer)
err = handler.watchAllocationPhase(namespace, name, patchedAlloc, r.Context(), watcher, timer)
if err != nil {
return "", err
}
resp := composeResponse(patchedAlloc)
respBytes, err := yaml.Marshal(resp)
return string(respBytes), err
}

func (handler *AllocationHandler) deleteAllocation(w http.ResponseWriter, r *http.Request) error {
Expand All @@ -175,9 +220,9 @@ func (handler *AllocationHandler) watchAllocationPhase(namespace, name string, a
for {
select {
case event := <-watcher.ResultChan():
allocObj, ok := event.Object.(*v1.Allocation)
alloc, ok := event.Object.(*v1.Allocation)
if ok {
status = string(allocObj.Status.Phase)
status = string(alloc.Status.Phase)
if status == string(corev1.ClusterScheduled) {
return nil
}
Expand Down Expand Up @@ -210,44 +255,40 @@ func (handler *AllocationHandler) ServeHTTP(w http.ResponseWriter, r *http.Reque
http.NotFound(w, r)
return
}
result := ""
var result string
var statusCode int
var err error

switch r.Method {
case "GET":
result, err = handler.getAllocation(w, r)
statusCode = http.StatusAccepted
case "POST":
err = handler.createAllocation(w, r)
result = http.StatusText(http.StatusCreated)
result, err = handler.createAllocation(w, r)
statusCode = http.StatusCreated
case "PUT":
err = handler.putAllocation(w, r)
result = http.StatusText(http.StatusAccepted)
result, err = handler.putAllocation(w, r)
statusCode = http.StatusAccepted
case "PATCH":
err = handler.patchAllocation(w, r)
result = http.StatusText(http.StatusAccepted)
result, err = handler.patchAllocation(w, r)
statusCode = http.StatusAccepted
case "DELETE":
err = handler.deleteAllocation(w, r)
result = http.StatusText(http.StatusAccepted)
statusCode = http.StatusAccepted
default:
result = http.StatusText(http.StatusNotImplemented)
w.WriteHeader(http.StatusNotImplemented)
statusCode = http.StatusNotImplemented
}
if err != nil {
internalError := http.StatusInternalServerError
http.Error(w, err.Error(), internalError)
} else if result != "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write([]byte(result))
}
}

func yaml2Allocation(reqBody []byte) (alloc v1.Allocation, err error) {
if str, err := yaml2json(reqBody); err != nil {
klog.Errorf("Failed to convert to json with the error: %v", err)
} else {
err = json.Unmarshal([]byte(str), &alloc)
}
return alloc, err
}

func getNamespaceAndName(r *http.Request) (string, string) {
var name, namespace string
namespace = "default"
Expand All @@ -261,3 +302,39 @@ func getNamespaceAndName(r *http.Request) (string, string) {
}
return namespace, name
}

func yaml2Allocation(reqBody []byte) (alloc v1.Allocation, err error) {
if str, err := yaml2json(reqBody); err != nil {
klog.Errorf("Failed to convert to json with the error: %v", err)
} else {
err = json.Unmarshal([]byte(str), &alloc)
alloc.Status.ClusterNames = make([]string, 0)
}
return alloc, err
}

func composeResponse(alloc *v1.Allocation) AllocationResp {
resp := AllocationResp{}
resp.Id = alloc.ObjectMeta.GetUID()
resp.ResourceVersion = alloc.ObjectMeta.GetResourceVersion()
resp.ResourceGroup = ResourceGroupResp{}
resp.ResourceGroup.Name = alloc.Spec.ResourceGroup.Name
resp.Replicas = alloc.Spec.Replicas
resp.ResourceGroup.Resources = make([]ResourceResp, 0)
resp.ResourceGroup.Selected = make([]SelectedResp, 0)
for _, resource := range alloc.Spec.ResourceGroup.Resources {
resourceResp := ResourceResp{Name: resource.Name}
if len(resource.Flavors) > 0 {
resourceResp.FlavorId = resource.Flavors[0].FlavorId
}
resp.ResourceGroup.Resources = append(resp.ResourceGroup.Resources, resourceResp)
}
for _, region := range alloc.Spec.Selector.Regions {
selectedResp := SelectedResp{Region: region.Region}
if len(region.AvailabilityZone) > 0 {
selectedResp.AvailabilityZone = region.AvailabilityZone[0]
}
resp.ResourceGroup.Selected = append(resp.ResourceGroup.Selected, selectedResp)
}
return resp
}
13 changes: 9 additions & 4 deletions globalscheduler/controllers/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,15 @@ func CreateAllocationCRD() *apiextv1beta1.CustomResourceDefinition {
"ssd": {Type: "integer"},
},
},
"need_eip": {Type: "boolean"},
"image": {Type: "string"},
"security_group_id": {Type: "string"},
"nic_name": {Type: "string"},
"need_eip": {Type: "boolean"},
"virtual_machine": {
Type: "object",
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"image": {Type: "string"},
"security_group_id": {Type: "string"},
"nic_name": {Type: "string"},
},
},
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions globalscheduler/controllers/dispatcher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
"//globalscheduler/controllers:go_default_library",
"//globalscheduler/controllers/util:go_default_library",
"//globalscheduler/controllers/util/openstack:go_default_library",
"//globalscheduler/pkg/apis/allocation/client/clientset/versioned:go_default_library",
"//globalscheduler/pkg/apis/allocation/v1:go_default_library",
"//globalscheduler/pkg/apis/cluster/client/clientset/versioned:go_default_library",
"//globalscheduler/pkg/apis/cluster/client/informers/externalversions/cluster/v1:go_default_library",
"//globalscheduler/pkg/apis/cluster/v1:go_default_library",
Expand Down
Loading

0 comments on commit 64b7175

Please sign in to comment.