Ray Webhook Support for Single-Host, Multi-Slice TPUs (#453)
* Fix incorrect replicaIndex for single-host, multi-replica

* Fix single-host, multi-slice deletion logic
ryanaoleary authored Mar 26, 2024
1 parent b0f3182 commit bc25097
Showing 1 changed file with 46 additions and 42 deletions.
applications/ray/kuberay-tpu-webhook/main.go: 88 changes (46 additions, 42 deletions)
@@ -168,30 +168,30 @@ func injectHostnames(hostNames string, envPath string, container corev1.Containe
*patches = append(*patches, subdomainPatch, hostNamesPatch)
}

- func injectMultiHostReplicaLabel(clusterName string, namespace string, replicaIndex int, workerGroupName string, patches *[]patch) {
+ func injectReplicaLabel(clusterName string, namespace string, replicaIndex int, workerGroupName string, patches *[]patch) {
labelPatch := patch{"op": "replace"}
labelPath := "/metadata/labels/multiHostReplica"
multiHostReplicaValue := workerGroupName + "-" + strconv.Itoa(replicaIndex)
labelPath := "/metadata/labels/replicaIndex"
replicaLabelValue := workerGroupName + "-" + strconv.Itoa(replicaIndex)

klog.V(1).InfoS("injectMultiHostReplicaLabel", "RayCluster", namespace+"/"+clusterName, "multiHostReplica", multiHostReplicaValue)
klog.V(1).InfoS("injectReplicaLabel", "RayCluster", namespace+"/"+clusterName, "replicaIndex", replicaLabelValue)

labelPatch["path"] = labelPath
labelPatch["value"] = multiHostReplicaValue
labelPatch["value"] = replicaLabelValue

*patches = append(*patches, labelPatch)
}

- // inject pod affinity and anti-affinity scheduling constraints using multiHostReplica label
+ // inject pod affinity and anti-affinity scheduling constraints using replicaIndex label
func injectPodAffinity(pod *corev1.Pod, replicaIndex int, workerGroupName string, patches *[]patch) {
key := "multiHostReplica"
key := "replicaIndex"
value := workerGroupName + "-" + strconv.Itoa(replicaIndex)
topologyKey := "cloud.google.com/gke-nodepool"
clusterName := pod.Labels["ray.io/cluster"]
namespace := pod.Namespace

klog.V(1).InfoS("injectPodAffinity", "RayCluster", namespace+"/"+clusterName, "podAffinity match label", value)

- // construct affinity value to inject - schedule pods with the same multiHostReplica together
+ // construct affinity value to inject - schedule pods with the same replicaIndex together
podAffinityPatch := patch{"op": "add"}

affinitySelectorRequirement := metav1.LabelSelectorRequirement{key, metav1.LabelSelectorOpIn, []string{value}}
@@ -266,14 +266,14 @@ func validateRayCluster(admissionReview *admissionv1.AdmissionReview) (*admissio
// create mapping for pod slices -> TPU_WORKER_HOSTNAMES in cluster
replicas := int(*workerGroupSpec.Replicas)
numOfHosts := workerGroupSpec.NumOfHosts
- if numOfHosts > 1 {
- for replicaIndex := 0; replicaIndex < replicas; replicaIndex++ {
- // reset past sliceToWorkers and sliceToHostnames entries for slice in ray cluster
- groupName := workerGroupSpec.GroupName
- podSlice := slice{clusterName, groupName, replicaIndex, numOfHosts}
- sliceToWorkers[podSlice] = nil
- sliceToHostnames[podSlice] = ""
- // generate TPU_WORKER_HOSTNAMES
+ for replicaIndex := 0; replicaIndex < replicas; replicaIndex++ {
+ // reset past sliceToWorkers and sliceToHostnames entries for slice in ray cluster
+ groupName := workerGroupSpec.GroupName
+ podSlice := slice{clusterName, groupName, replicaIndex, numOfHosts}
+ sliceToWorkers[podSlice] = nil
+ sliceToHostnames[podSlice] = ""
+ // generate TPU_WORKER_HOSTNAMES
+ if numOfHosts > 1 {
joinedHostNames, err := genDNSHostnames(workerGroupSpec, replicaIndex)
if err != nil {
klog.Error("Failed to generate DNS Hostnames")
@@ -324,7 +324,7 @@ func getReplicaIndex(clusterName string, namespace string) int {
if sliceToWorkers == nil {
return 0
}
- next_lowest_id := math.MaxInt32
+ nextLowestId := math.MaxInt32
for slice, workerList := range sliceToWorkers {
if slice.clusterName == clusterName {
createdPods := 0
@@ -334,14 +334,17 @@ func getReplicaIndex(clusterName string, namespace string) int {
}
}
if createdPods < int(slice.numOfHosts) {
- if slice.replicaIndex < next_lowest_id {
- next_lowest_id = slice.replicaIndex
+ if slice.replicaIndex < nextLowestId {
+ nextLowestId = slice.replicaIndex
}
}
}
}
klog.V(0).InfoS("getReplicaIndex", "RayCluster", namespace+"/"+clusterName, "Replica Index", next_lowest_id)
return next_lowest_id
if nextLowestId == math.MaxInt32 {
klog.ErrorS(errors.New("Replica Index never set"), "RayCluster", namespace+"/"+clusterName, "Replica Index", nextLowestId)
}
klog.V(0).InfoS("getReplicaIndex", "RayCluster", namespace+"/"+clusterName, "Replica Index", nextLowestId)
return nextLowestId
}

// returns next lowest TPU_WORKER_ID in pod slice and updates mappings
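The hunk above also adds a guard for the case where no slice has capacity left: nextLowestId keeps its math.MaxInt32 sentinel and an error is logged before returning. A minimal sketch of that selection logic, with simplified stand-in types rather than the webhook's real slice struct and sliceToWorkers map:

// Sketch only: stand-in types, not the webhook's actual data structures.
package main

import (
	"fmt"
	"math"
)

type sliceKey struct {
	replicaIndex int
	numOfHosts   int
}

// nextReplicaIndex returns the lowest replica index that still has fewer
// created pods than hosts; math.MaxInt32 means every slice is already full.
func nextReplicaIndex(createdPods map[sliceKey]int) int {
	nextLowestId := math.MaxInt32
	for s, created := range createdPods {
		if created < s.numOfHosts && s.replicaIndex < nextLowestId {
			nextLowestId = s.replicaIndex
		}
	}
	return nextLowestId
}

func main() {
	created := map[sliceKey]int{
		{replicaIndex: 0, numOfHosts: 4}: 4, // slice 0 is full
		{replicaIndex: 1, numOfHosts: 4}: 2, // slice 1 still has room
	}
	fmt.Println(nextReplicaIndex(created)) // prints 1
}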
@@ -362,10 +365,10 @@ func getNextWorkerID(podSlice slice, namespace string, replicaIndex int) int {
}
// reassign next lowest TPU_WORKER_ID if pod has been deleted
if replacePod == true {
- for _, worker := range sliceToWorkers[podSlice] {
+ for index, worker := range sliceToWorkers[podSlice] {
// set worker.isCreated to true now that pod is being re-created
if worker.workerIndex == nextLowestID {
- worker.isCreated = true
+ sliceToWorkers[podSlice][index].isCreated = true
}
}
} else {
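The switch from worker.isCreated to sliceToWorkers[podSlice][index].isCreated matters because in Go the loop variable of a range over a slice of structs is a copy, so writes to it never reach the slice. A small self-contained sketch of the pitfall and the fix, using a stand-in worker type:

// Sketch only: stand-in worker type illustrating the range-copy pitfall fixed above.
package main

import "fmt"

type worker struct {
	workerIndex int
	isCreated   bool
}

func main() {
	workers := []worker{{workerIndex: 0}, {workerIndex: 1}}

	// No effect: w is a copy of each element, so the slice is unchanged.
	for _, w := range workers {
		w.isCreated = true
	}
	fmt.Println(workers[0].isCreated) // false

	// The fix: index back into the slice to mutate the element in place.
	for i := range workers {
		workers[i].isCreated = true
	}
	fmt.Println(workers[0].isCreated) // true
}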
@@ -433,13 +436,13 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
numOfHosts, _ := getNumTPUHostsFromTopology(clusterName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
replicaIndex := getReplicaIndex(clusterName, namespace)
podSlice := slice{clusterName, groupName, replicaIndex, numOfHosts}
- tpuWorkerID := 0 // defaults to 0 for single-host
+ tpuWorkerID := getNextWorkerID(podSlice, namespace, replicaIndex) // defaults to 0 for single-host

+ // inject replica index label
+ injectReplicaLabel(clusterName, namespace, replicaIndex, groupName, &patches)

isMultiHost, _ := isTPUMultiHost(clusterName, namespace, topology, acceleratorType) // ignore error here because topology may not be set yet
if isMultiHost {
- // get next unique TPU_WORKER_ID for multi-host slice
- tpuWorkerID = getNextWorkerID(podSlice, namespace, replicaIndex)

// inject hostname into pod spec for DNS records
hostname := fmt.Sprintf(groupName+"-%d-%d", replicaIndex, tpuWorkerID)
klog.V(1).InfoS("mutatePod", "RayCluster", namespace+"/"+clusterName, "hostname", hostname)
@@ -448,9 +451,6 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
hostnamePatch["value"] = hostname
patches = append(patches, hostnamePatch)

- // inject multi-host replica label
- injectMultiHostReplicaLabel(clusterName, namespace, replicaIndex, groupName, &patches)

// inject pod affinity/anti-affinity for scheduling
injectPodAffinity(pod, replicaIndex, groupName, &patches)
}
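With this hunk, injectReplicaLabel runs for every TPU worker pod (single-host included) and getNextWorkerID is called unconditionally, so a TPU_WORKER_ID is tracked in the slice mapping even for single-host slices. For reference, a sketch of the JSON patch operation the label injection produces; the group name and index are made up, and patch is a stand-in map type matching how labelPatch is used in the diff above:

// Sketch only: hypothetical names, stand-in patch type.
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

type patch map[string]interface{}

func main() {
	workerGroupName := "workergroup" // hypothetical worker group name
	replicaIndex := 1

	labelPatch := patch{
		"op":    "replace",
		"path":  "/metadata/labels/replicaIndex",
		"value": workerGroupName + "-" + strconv.Itoa(replicaIndex),
	}
	out, _ := json.Marshal([]patch{labelPatch})
	fmt.Println(string(out))
	// [{"op":"replace","path":"/metadata/labels/replicaIndex","value":"workergroup-1"}]
}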
@@ -535,14 +535,17 @@ func deletePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
if clusterName == "" {
return nil, errors.New("Kuberay Pod missing RayCluster label")
}
+ groupName := pod.Labels["ray.io/group"]
+ if groupName == "" {
+ return nil, errors.New("Kuberay Pod missing Ray group label")
+ }
namespace := pod.Namespace
- multiHostReplicaLabel := pod.Labels["multiHostReplica"]
-
- if multiHostReplicaLabel != "" {
- multiHostReplicaLabelValues := strings.Split(multiHostReplicaLabel, "-")
- groupName := multiHostReplicaLabelValues[0]
- replicaIndex, _ := strconv.Atoi(multiHostReplicaLabelValues[1]) // ignore error here since must be set
+ replicaIndexLabel := pod.Labels["replicaIndex"]
+
+ if replicaIndexLabel != "" {
+ replicaIndexLabelValues := strings.Split(replicaIndexLabel, "-")
+ replicaIndex, _ := strconv.Atoi(replicaIndexLabelValues[1]) // ignore error here since must be set

containers := pod.Spec.Containers
if containers == nil {
return nil, errors.New("Pod spec missing containers")
@@ -557,22 +560,23 @@ func deletePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
break
}
}
- // pod belongs to a multi-host replica -> update the map
if tpuWorkerID == -1 {
return nil, errors.New("Kuberay Pod missing TPU_WORKER_ID")
}
+ // update sliceToWorkers map
for slice, _ := range sliceToWorkers {
- if slice.groupName == groupName && slice.replicaIndex == replicaIndex {
+ if slice.clusterName == clusterName && slice.groupName == groupName && slice.replicaIndex == replicaIndex {
// set the pod state to indicate it is not running
- for _, worker := range sliceToWorkers[slice] {
+ for index, worker := range sliceToWorkers[slice] {
if worker.workerIndex == tpuWorkerID {
- worker.isCreated = false
+ sliceToWorkers[slice][index].isCreated = false
klog.V(0).InfoS("deletePod", "RayCluster", namespace+"/"+clusterName, "TPU_WORKER_ID", tpuWorkerID, "Replica Index", replicaIndex)
break
}
}
break
}
}
} else {
klog.V(0).InfoS("deletePod", "RayCluster", namespace+"/"+clusterName, "Pod Name", pod.Name)
}

// Create AdmissionResponse - we never deny the deletion request
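Deletion now keys off the replicaIndex label, which is set for every TPU worker group, rather than the multi-host-only multiHostReplica label, and the group name comes from the ray.io/group label instead of being parsed out of the replica label. A compact sketch of the delete-side bookkeeping, with simplified types and a hypothetical, hyphen-free group name (the real webhook reads TPU_WORKER_ID from the container's env and checks the cluster, group, and replica before updating the map):

// Sketch only: simplified types and made-up names.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type worker struct {
	workerIndex int
	isCreated   bool
}

func main() {
	replicaIndexLabel := "workergroup-2" // e.g. pod.Labels["replicaIndex"]
	values := strings.Split(replicaIndexLabel, "-")
	replicaIndex, _ := strconv.Atoi(values[1]) // "2"

	workers := []worker{{0, true}, {1, true}}
	tpuWorkerID := 1 // in the webhook this comes from the TPU_WORKER_ID env var
	for i := range workers {
		if workers[i].workerIndex == tpuWorkerID {
			workers[i].isCreated = false // free the slot so the ID can be reused
			break
		}
	}
	fmt.Println("replica", replicaIndex, "worker", tpuWorkerID, "marked not created")
}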
