Skip to content

Commit

Permalink
Add reservation support in slurm sync for scheduled maintenance
Browse files Browse the repository at this point in the history
  • Loading branch information
harshthakkar01 committed Aug 7, 2024
1 parent 2b62970 commit aaedcf8
Showing 1 changed file with 142 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from itertools import chain
from pathlib import Path
import yaml
import subprocess
from typing import List, Dict

import util
from util import (
Expand Down Expand Up @@ -70,6 +72,16 @@
),
)

# gcloud query listing every instance in a project that has an upcoming
# maintenance event. The single placeholder is the project id. Each output
# line carries, in order: instance name, earliest start, latest start,
# canReschedule and maintenance type.
UPC_MAINT_CMD = (
    "gcloud alpha compute instances list"
    " --project={}"
    " --filter='upcomingMaintenance:*'"
    " --format='value("
    "name,"
    "upcomingMaintenance.startTimeWindow.earliest,"
    "upcomingMaintenance.startTimeWindow.latest,"
    "upcomingMaintenance.canReschedule,"
    "upcomingMaintenance.type"
    ")'"
)

# scontrol command templates for maintenance reservations.
# Placeholders for creation, in order: start time, node(s), reservation name.
SLURM_CREATE_RESERVATION = (
    "scontrol create reservation user=root"
    " starttime={} duration=180 nodes={} reservationname={}"
)
# Placeholder: reservation name.
SLURM_DELETE_RESERVATION = "scontrol delete reservation {}"
SLURM_SHOW_RESERVATION = "scontrol show reservation"


def start_instance_op(inst):
return lkp.compute.instances().start(
Expand Down Expand Up @@ -472,6 +484,129 @@ def update_topology(lkp: util.Lookup) -> None:
log.debug("Topology configuration updated. Reconfiguring Slurm.")
util.scontrol_reconfigure(lkp)


def run_command(cmd: str, err_msg: "Optional[str]" = None) -> subprocess.CompletedProcess:
    """Run ``cmd`` through the shell and return the completed process.

    Args:
        cmd: Full command line; it is executed with ``shell=True``, so callers
            must only interpolate trusted values into it.
        err_msg: Prefix for the exception message on failure. When omitted, a
            description of the failed command is used instead.

    Returns:
        The ``CompletedProcess`` with ``stdout`` and ``stderr`` captured as text.

    Raises:
        subprocess.SubprocessError: If the command exits with a non-zero status.
    """
    res = subprocess.run(
        cmd,
        shell=True,
        universal_newlines=True,
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if res.returncode != 0:
        # Without this fallback a missing err_msg would render as "None:".
        prefix = err_msg
        if prefix is None:
            prefix = f"command {cmd!r} failed with exit code {res.returncode}"
        raise subprocess.SubprocessError(f"{prefix}:\n{res.stderr}")

    return res

def delete_reservations(reservation_map: Dict[str, str]) -> None:
    """Delete every Slurm reservation named in ``reservation_map``.

    Each key is passed to ``scontrol delete reservation`` and then removed
    from the map, so the map is empty once all deletions have succeeded.
    A failing deletion propagates the error raised by ``run_command`` and
    leaves the remaining entries in place.
    """
    failure_msg = "error deleting reservation"
    # Snapshot the names first: entries are removed while we loop.
    pending = tuple(reservation_map)
    for name in pending:
        run_command(SLURM_DELETE_RESERVATION.format(name), failure_msg)
        reservation_map.pop(name)


def update_slurm_reservation_maintenance(reservation_name: str, node: str,
                                         starttime: str, reservation_map: Dict[str, str]) -> None:
    """Create or refresh the Slurm maintenance reservation for one node.

    If ``reservation_name`` is not in ``reservation_map`` a new reservation is
    created for ``node`` starting at ``starttime``. Otherwise its entry is
    removed from the map and the reservation is deleted and re-created only
    when the recorded start time differs from ``starttime``.

    Args:
        reservation_name: Name of the maintenance reservation.
        node: Node to reserve when no reservation exists yet.
        starttime: Start of the maintenance window, already formatted for
            ``scontrol`` (``%Y-%m-%dT%H:%M:%S``).
        reservation_map: Existing maintenance reservations, mapping
            reservation name to ``"<nodes>, <start time>"``. Mutated in place:
            the entry for ``reservation_name`` is removed when present.

    Raises:
        subprocess.SubprocessError: Propagated from ``run_command`` when a
            ``scontrol`` invocation fails.
    """
    if reservation_map is None or reservation_name not in reservation_map:
        # Reservation doesn't exist, make reservation during maintenance window.
        run_command(
            SLURM_CREATE_RESERVATION.format(starttime, node, reservation_name),
            "error creating reservation.",
        )
        return

    # Consume the entry on every path. Anything left in the map is treated by
    # the caller as a stale reservation and deleted, so an unchanged
    # reservation must not stay behind.
    details = reservation_map.pop(reservation_name)

    # Split on the last comma so a comma inside the node list cannot corrupt
    # the start time.
    nodes, _, maintenance_start_time = details.rpartition(",")
    nodes = nodes.strip() or node
    maintenance_start_time = maintenance_start_time.strip()

    if maintenance_start_time == starttime:
        return  # Window unchanged; keep the existing reservation.

    # The maintenance window moved: replace the reservation.
    run_command(
        SLURM_DELETE_RESERVATION.format(reservation_name),
        "error deleting reservation",
    )
    run_command(
        SLURM_CREATE_RESERVATION.format(starttime, nodes, reservation_name),
        "error updating reservation",
    )


def get_slurm_reservation_maintenance() -> Dict[str, str]:
    """Return the existing Slurm reservations that were made for maintenance.

    Parses the output of ``scontrol show reservation``. A reservation counts as
    a maintenance reservation when its name equals ``<Nodes>_maintenance``.

    Returns:
        Mapping of reservation name to ``"<nodes>, <start time>"``.

    Raises:
        subprocess.SubprocessError: Propagated from ``run_command`` when
            ``scontrol`` fails.
    """
    err = "error getting reservation details"
    res = run_command(SLURM_SHOW_RESERVATION, err)

    wanted = ("ReservationName", "Nodes", "StartTime")
    reservation_map = {}

    # Records are separated by blank lines. Every chunk is examined rather
    # than dropping the last one, so a missing trailing blank line cannot
    # hide a reservation.
    for record in res.stdout.split("\n\n"):
        fields = {}
        for token in record.split():
            key, sep, value = token.partition("=")  # Split at the first '='
            # Tokens without '=' (free text, informational messages) are
            # ignored instead of aborting the whole sync.
            if sep and key in wanted:
                fields[key] = value

        if len(fields) != len(wanted):
            continue

        reservation_name = fields["ReservationName"]
        nodes = fields["Nodes"]

        # Check if the reservation is for scheduled maintenance.
        if reservation_name != nodes + "_maintenance":
            continue

        reservation_map[reservation_name] = nodes + ', ' + fields["StartTime"]

    return reservation_map


def _upcoming_maintenance_rows(lkp: util.Lookup) -> List[List[str]]:
    """Return the gcloud upcoming-maintenance listing as whitespace-split rows.

    Per the ``--format`` in ``UPC_MAINT_CMD`` each row starts with the instance
    name followed by the earliest start time of the maintenance window.
    """
    err = "error getting upcoming maintenance list"
    res = run_command(UPC_MAINT_CMD.format(lkp.project), err)
    return [x.split() for x in res.stdout.split("\n")[:-1]]


def update_upcoming_maintenance(lkp: util.Lookup) -> List[str]:
    """Return the names of Slurm nodes that have upcoming maintenance.

    Only names are returned; the maintenance window itself is not included.
    """
    upc_maint = _upcoming_maintenance_rows(lkp)
    slurm_nodes = lkp.slurm_nodes().keys()
    return [vm for sublist in upc_maint for vm in sublist if vm in slurm_nodes]


# Sync maintenance reservation gets upcoming maintenance notification and
# updates slurm reservation for the node during scheduled maintenance window.
def sync_maintenance_reservation(lkp: util.Lookup):
    """Reconcile Slurm maintenance reservations with upcoming maintenance.

    For every Slurm node that gcloud reports upcoming maintenance for, a
    reservation named ``<node>_maintenance`` is created or refreshed. Existing
    maintenance reservations with no matching upcoming maintenance are deleted.

    Raises:
        subprocess.SubprocessError: Propagated from ``run_command`` when a
            gcloud or scontrol invocation fails.
        ValueError: If a start time does not match ``%Y-%m-%dT%H:%M:%S%z``.
    """
    # Get upcoming maintenance details from gcloud. Full rows are needed here
    # (not just VM names) because the start time is in the second column.
    rows = _upcoming_maintenance_rows(lkp)
    slurm_nodes = lkp.slurm_nodes().keys()
    upc_maint = [row for row in rows if len(row) >= 2 and row[0] in slurm_nodes]
    log.debug(f"upcoming-maintenance-vms: {[row[0] for row in upc_maint]}")

    # Get reservation for maintenance.
    # [reservation_name --> nodes + ', ' + start_time]
    # reservation_name should be of format nodes_maintenance.
    reservation_map = get_slurm_reservation_maintenance()
    log.debug(f"reservation-map: {reservation_map}")

    # Iterate upcoming maintenance and update reservation.
    # Each handled reservation is also removed from the map.
    for vm_name, earliest, *_ in upc_maint:
        reservation_name = vm_name + "_maintenance"
        # Update start time format to be compatible with slurm reservation format.
        # NOTE(review): relies on `datetime` being the class (e.g. imported via
        # `from datetime import datetime`) at module level -- confirm.
        starttime = datetime.strptime(earliest, "%Y-%m-%dT%H:%M:%S%z")
        formatted_starttime = starttime.strftime("%Y-%m-%dT%H:%M:%S")

        update_slurm_reservation_maintenance(reservation_name, vm_name, formatted_starttime, reservation_map)

    # Whatever is left in the map is a maintenance reservation with no matching
    # upcoming maintenance, i.e. the maintenance is over. Remove those.
    if reservation_map:
        delete_reservations(reservation_map)


def main():
try:
reconfigure_slurm()
Expand All @@ -483,15 +618,22 @@ def main():
sync_slurm()
except Exception:
log.exception("failed to sync instances")

try:
sync_placement_groups()
except Exception:
log.exception("failed to sync placement groups")

try:
update_topology(lkp)
except Exception:
log.exception("failed to update topology")

try:
sync_maintenance_reservation(lkp)
except Exception:
log.exception("failed to sync slurm reservation for scheduled maintenance")

try:
install_custom_scripts(check_hash=True)
except Exception:
Expand Down

0 comments on commit aaedcf8

Please sign in to comment.