Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added allocations to dispatchers #288

Merged
merged 7 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions globalscheduler/controllers/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,15 @@ func CreateAllocationCRD() *apiextv1beta1.CustomResourceDefinition {
"ssd": {Type: "integer"},
},
},
"need_eip": {Type: "boolean"},
"image": {Type: "string"},
"security_group_id": {Type: "string"},
"nic_name": {Type: "string"},
"need_eip": {Type: "boolean"},
"virtual_machine": {
Type: "object",
Properties: map[string]apiextv1beta1.JSONSchemaProps{
"image": {Type: "string"},
"security_group_id": {Type: "string"},
"nic_name": {Type: "string"},
},
},
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions globalscheduler/controllers/dispatcher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
"//globalscheduler/controllers:go_default_library",
"//globalscheduler/controllers/util:go_default_library",
"//globalscheduler/controllers/util/openstack:go_default_library",
"//globalscheduler/pkg/apis/allocation/client/clientset/versioned:go_default_library",
"//globalscheduler/pkg/apis/allocation/v1:go_default_library",
"//globalscheduler/pkg/apis/cluster/client/clientset/versioned:go_default_library",
"//globalscheduler/pkg/apis/cluster/client/informers/externalversions/cluster/v1:go_default_library",
"//globalscheduler/pkg/apis/cluster/v1:go_default_library",
Expand Down
236 changes: 204 additions & 32 deletions globalscheduler/controllers/dispatcher/dispatcher_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,47 @@ import (
"k8s.io/kubernetes/globalscheduler/cmd/conf"
"k8s.io/kubernetes/globalscheduler/controllers/util"
"k8s.io/kubernetes/globalscheduler/controllers/util/openstack"
allocclientset "k8s.io/kubernetes/globalscheduler/pkg/apis/allocation/client/clientset/versioned"
allocv1 "k8s.io/kubernetes/globalscheduler/pkg/apis/allocation/v1"
clusterclientset "k8s.io/kubernetes/globalscheduler/pkg/apis/cluster/client/clientset/versioned"
dispatcherclientset "k8s.io/kubernetes/globalscheduler/pkg/apis/dispatcher/client/clientset/versioned"
dispatcherv1 "k8s.io/kubernetes/globalscheduler/pkg/apis/dispatcher/v1"
"os"
"reflect"
"sync"
"syscall"
"time"
)

const dispatcherName = "dispatcher"

type Process struct {
namespace string
name string
dispatcherClientset *dispatcherclientset.Clientset
clusterClientset *clusterclientset.Clientset
clientset *kubernetes.Clientset
podQueue chan *v1.Pod
clusterIpMap map[string]string
tokenMap map[string]string
clusterRange dispatcherv1.DispatcherRange
totalCreateLatency int64
totalDeleteLatency int64
totalPodCreateNum int
totalPodDeleteNum int
namespace string
name string
allocClientset *allocclientset.Clientset
dispatcherClientset *dispatcherclientset.Clientset
clusterClientset *clusterclientset.Clientset
clientset *kubernetes.Clientset
podQueue chan *v1.Pod
clusterIpMap map[string]string
tokenMap map[string]string
clusterRange dispatcherv1.DispatcherRange
totalCreateLatency int64
totalDeleteLatency int64
totalPodCreateNum int
totalPodDeleteNum int
totalAllocationCreateNum int
totalAllocationDeleteNum int
}

func NewProcess(config *rest.Config, namespace string, name string, quit chan struct{}) Process {
podQueue := make(chan *v1.Pod, 300)

allocationClientset, err := allocclientset.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}

dispatcherClientset, err := dispatcherclientset.NewForConfig(config)
if err != nil {
klog.Fatal(err)
Expand All @@ -76,19 +87,22 @@ func NewProcess(config *rest.Config, namespace string, name string, quit chan st
}

return Process{
namespace: namespace,
name: name,
clientset: clientset,
clusterClientset: clusterClientset,
dispatcherClientset: dispatcherClientset,
podQueue: podQueue,
clusterIpMap: make(map[string]string),
tokenMap: make(map[string]string),
clusterRange: dispatcher.Spec.ClusterRange,
totalCreateLatency: 0,
totalDeleteLatency: 0,
totalPodCreateNum: 0,
totalPodDeleteNum: 0,
namespace: namespace,
name: name,
clientset: clientset,
allocClientset: allocationClientset,
clusterClientset: clusterClientset,
dispatcherClientset: dispatcherClientset,
podQueue: podQueue,
clusterIpMap: make(map[string]string),
tokenMap: make(map[string]string),
clusterRange: dispatcher.Spec.ClusterRange,
totalCreateLatency: 0,
totalDeleteLatency: 0,
totalPodCreateNum: 0,
totalPodDeleteNum: 0,
totalAllocationCreateNum: 0,
totalAllocationDeleteNum: 0,
}
}

Expand Down Expand Up @@ -121,7 +135,44 @@ func (p *Process) Run(quit chan struct{}) {
})

go dispatcherInformer.Run(quit)
boundPodnformer := p.initPodInformer(v1.PodBound, cache.ResourceEventHandlerFuncs{

boundAllocationInformer := p.initAllocationInformer(allocv1.AllocationBound, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
alloc, ok := obj.(*allocv1.Allocation)
if !ok {
klog.Warningf("Failed to convert object %+v to an allocation", obj)
return
}
klog.V(4).Infof("An allocation %s has been added", alloc.Name)

util.CheckTime(alloc.GetName(), "dispatcher", "CreateAllocation-Start", 1)
go func() {
p.SendAllocationToCluster(alloc)
}()
},
})
go boundAllocationInformer.Run(quit)

scheduledAllocationInformer := p.initAllocationInformer(allocv1.AllocationScheduled, cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
alloc, ok := obj.(*allocv1.Allocation)
if !ok {
klog.Warningf("Failed to convert an deleted object %+v to an allocation", obj)
return
}
klog.V(4).Infof("An allocation %s has been deleted", alloc.Name)
if alloc.ObjectMeta.DeletionTimestamp == nil {
alloc.ObjectMeta.SetDeletionTimestamp(&metav1.Time{})
}
util.CheckTime(alloc.Name, "dispatcher", "DeleteAllocation-Start", 1)
go func() {
p.SendAllocationToCluster(alloc)
}()
},
})
go scheduledAllocationInformer.Run(quit)

boundPodInformer := p.initPodInformer(v1.PodBound, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
Expand All @@ -135,8 +186,8 @@ func (p *Process) Run(quit chan struct{}) {
}()
},
})
go boundPodnformer.Run(quit)
scheduledPodnformer := p.initPodInformer(v1.ClusterScheduled, cache.ResourceEventHandlerFuncs{
go boundPodInformer.Run(quit)
scheduledPodInformer := p.initPodInformer(v1.ClusterScheduled, cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
Expand All @@ -150,11 +201,21 @@ func (p *Process) Run(quit chan struct{}) {
}()
},
})
go scheduledPodnformer.Run(quit)
go scheduledPodInformer.Run(quit)

go p.refreshToken()
<-quit
}

func (p *Process) initAllocationInformer(phase allocv1.AllocationPhase, funcs cache.ResourceEventHandlerFuncs) cache.SharedIndexInformer {
allocSelector := fields.ParseSelectorOrDie(fmt.Sprintf("status.phase=%s,status.cluster_name=gte:%s,status.cluster_name=lte:%s", string(phase),
p.clusterRange.Start, p.clusterRange.End))
allocLW := cache.NewListWatchFromClient(p.allocClientset.GlobalschedulerV1(), "allocations", p.namespace, allocSelector)
allocInformer := cache.NewSharedIndexInformer(allocLW, &allocv1.Allocation{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
allocInformer.AddEventHandler(funcs)
return allocInformer
}

func (p *Process) initPodInformer(phase v1.PodPhase, funcs cache.ResourceEventHandlerFuncs) cache.SharedIndexInformer {
podSelector := fields.ParseSelectorOrDie(fmt.Sprintf("status.phase=%s,spec.clusterName=gte:%s,spec.clusterName=lte:%s", string(phase),
p.clusterRange.Start, p.clusterRange.End))
Expand Down Expand Up @@ -278,8 +339,7 @@ func (p *Process) refreshToken() {
}

func (p *Process) init() {
fieldSelector := fmt.Sprintf("metadata.name=gte:%s,metadata.name=lte:%s", p.clusterRange.Start, p.clusterRange.End)
clusters, err := p.clusterClientset.GlobalschedulerV1().Clusters(metav1.NamespaceDefault).List(metav1.ListOptions{FieldSelector: fieldSelector})
clusters, err := p.clusterClientset.GlobalschedulerV1().Clusters(metav1.NamespaceDefault).List(metav1.ListOptions{})
if err != nil {
klog.Warningf("Failed to get clusters with error %v", err)
}
Expand All @@ -290,3 +350,115 @@ func (p *Process) init() {
}
}
}

func (p *Process) SendAllocationToCluster(alloc *allocv1.Allocation) {
if alloc != nil {
klog.V(2).Infof("Processing the item %v", alloc)

alloc.Status.DispatcherName = p.name
if alloc.ObjectMeta.DeletionTimestamp != nil {
util.CheckTime(alloc.Name, "dispatcher", "DeleteAllocation-End", 2)
p.totalAllocationDeleteNum += 1
p.deleteAllocationResources(alloc)
// Calculate delete latency
allocDeleteTime := alloc.DeletionTimestamp
currentTime := time.Now().UTC()
duration := (currentTime.UnixNano() - allocDeleteTime.UnixNano()) / 1000000
p.totalDeleteLatency += duration
deleteLatency := int(duration)
klog.V(2).Infof("************************************ Allocation Name: %s, Delete Latency: %d Millisecond ************************************", alloc.Name, deleteLatency)
pdgetrf marked this conversation as resolved.
Show resolved Hide resolved

// Calculate average delete latency
averageDeleteLatency := int(p.totalDeleteLatency) / p.totalPodDeleteNum
klog.V(2).Infof("%%%%%%%%%%%%%%%%%%%%%%%%%% Total Number of Allocation Deleted: %d, Average Delete Latency: %d Millisecond %%%%%%%%%%%%%%%%%%%%%%%%%%", p.totalAllocationDeleteNum, averageDeleteLatency)
p.totalAllocationCreateNum += 1
} else {
util.CheckTime(alloc.Name, "dispatcher", "CreateAllocation-End", 2)
p.totalAllocationCreateNum += 1
// Calculate create latency
podCreateTime := alloc.CreationTimestamp
currentTime := time.Now().UTC()
duration := (currentTime.UnixNano() - podCreateTime.UnixNano()) / 1000000
p.totalCreateLatency += duration
createLatency := int(duration)
klog.V(2).Infof("************************************ Allocation Name: %s, Create Latency: %d Millisecond ************************************", alloc.Name, createLatency)

// Calculate average create latency
averageCreateLatency := int(p.totalCreateLatency) / p.totalAllocationCreateNum
klog.V(2).Infof("%%%%%%%%%%%%%%%%%%%%%%%%%% Total Number of Allocation Created: %d, Average Create Latency: %d Millisecond %%%%%%%%%%%%%%%%%%%%%%%%%%", p.totalAllocationCreateNum, averageCreateLatency)

if isAllResourcesCreated := p.isAllAllocationResourcesCreated(alloc); isAllResourcesCreated {
alloc.Status.Phase = allocv1.AllocationScheduled
} else {
alloc.Status.Phase = allocv1.AllocationFailed
p.deleteAllocationResources(alloc)
}
if _, err := p.allocClientset.GlobalschedulerV1().AllocationsWithMultiTenancy(alloc.Namespace, alloc.ObjectMeta.Tenant).Update(alloc); err != nil {
klog.Warningf("The allocation %v failed to update its status phase to %v with the error %v", alloc.Name, alloc.Status.Phase, err)
} else {
klog.Infof("The allocation %s updated its status phase to %v successfully", alloc.Name, alloc.Status.Phase)
}
}

}
}
func (p *Process) isAllAllocationResourcesCreated(alloc *allocv1.Allocation) bool {
for idx, resource := range alloc.Spec.ResourceGroup.Resources {
for ciIdx, clusterInstance := range resource.VirtualMachine.ClusterInstances {
host, err := p.getHostIP(clusterInstance.ClusterName)
if err != nil {
klog.Warningf("Failed to get host from the cluster %v", clusterInstance.ClusterName)
return false
}
token, err := p.getToken(host)
if err != nil {
klog.Warningf("Failed to get token from host %v", host)
return false
}
instanceId, err := openstack.ServerCreateResources(host, token, clusterInstance.ClusterName, &resource)
if err == nil {
alloc.Spec.ResourceGroup.Resources[idx].VirtualMachine.ClusterInstances[ciIdx].InstanceId = instanceId
klog.V(3).Infof("The openstack vm for the allocation %s resource %s has been created at the host %v as instance %s", alloc.Name, resource.Name, host, instanceId)
} else {
klog.Warningf("The openstack vm for the allocation %s resource %s failed to create with the error %v", alloc.Name, resource.Name, err)
return false
}
}
}
return true
}

func (p *Process) deleteAllocationResources(alloc *allocv1.Allocation) {
var wg sync.WaitGroup
for _, resource := range alloc.Spec.ResourceGroup.Resources {
for _, clusterInstance := range resource.VirtualMachine.ClusterInstances {
instanceid := clusterInstance.InstanceId
clustername := clusterInstance.ClusterName
if instanceid != "" {
wg.Add(1)
host, err := p.getHostIP(clustername)
if err != nil {
klog.Warningf("Failed to get host from the cluster %v", clustername)
return
}
token, err := p.getToken(host)
if err != nil {
klog.Warningf("Failed to get token from host %v", host)
return
}

go func() {
defer wg.Done()
err = openstack.DeleteInstance(host, token, instanceid)

if err == nil {
klog.V(3).Infof("The openstack vm for the allocation %s resource %s has been deleted at the host %v", alloc.Name, resource.Name, host)
} else {
klog.Warningf("The openstack vm for the allocation %s resource %s failed to delete with the error %v", alloc.Name, resource.Name, err)
}
}()
}
}
}
wg.Wait()
}
1 change: 1 addition & 0 deletions globalscheduler/controllers/util/openstack/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "k8s.io/kubernetes/globalscheduler/controllers/util/openstack",
visibility = ["//visibility:public"],
deps = [
"//globalscheduler/pkg/apis/allocation/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
Expand Down
Loading