diff --git a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/slurmsync.py b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/slurmsync.py index e648d6b80c..7547e3d127 100755 --- a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/slurmsync.py +++ b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/slurmsync.py @@ -26,6 +26,8 @@ from itertools import chain from pathlib import Path import yaml +import subprocess +from typing import List, Dict import util from util import ( @@ -70,6 +72,16 @@ ), ) +UPC_MAINT_CMD = "gcloud alpha compute instances list --project={}" \ + " --filter='upcomingMaintenance:*' --format='value(name," \ + "upcomingMaintenance.startTimeWindow.earliest," \ + "upcomingMaintenance.startTimeWindow.latest," \ + "upcomingMaintenance.canReschedule,upcomingMaintenance.type)'" + +SLURM_CREATE_RESERVATION = "scontrol create reservation user=root starttime={} duration=180 nodes={} reservationname={}" +SLURM_DELETE_RESERVATION = "scontrol delete reservation {}" +SLURM_SHOW_RESERVATION = "scontrol show reservation" + def start_instance_op(inst): return lkp.compute.instances().start( @@ -472,6 +484,129 @@ def update_topology(lkp: util.Lookup) -> None: log.debug("Topology configuration updated. Reconfiguring Slurm.") util.scontrol_reconfigure(lkp) + +def run_command(cmd: str, err_msg: str = None) -> subprocess.CompletedProcess: + res = subprocess.run(cmd, shell=True, universal_newlines=True, check=False, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if res.returncode != 0: + raise subprocess.SubprocessError(f"{err_msg}:\n{res.stderr}") + + return res + +def delete_reservations(reservation_map: Dict[str, str]) -> None: + err = "error deleting reservation" + for reservation_name in list(reservation_map.keys()): + res = run_command(SLURM_DELETE_RESERVATION.format(reservation_name), err) + del reservation_map[reservation_name] + + +def update_slurm_reservation_maintenance(reservation_name: str, node: str, + starttime: str, reservation_map: Dict[str, str]) -> None: + ## If reservation exists for maintenance, update reservation if time window + ## for scheduled maintenance changes. + if reservation_map is not None and reservation_name in reservation_map: + details = reservation_map[reservation_name] + + nodes = None + start_time = None + + values = details.split(",") + nodes = values[0].strip() + maintenance_start_time = values[1].strip() + + if nodes and start_time: + if maintenance_start_time == starttime: + return + + err = "error deleting reservation" + res = run_command(SLURM_DELETE_RESERVATION.format(reservation_name), err) + + err = "error updating reservation" + res = run_command(SLURM_CREATE_RESERVATION.format(starttime, nodes, reservation_name), err) + + del reservation_map[reservation_name] # remove the reservation from the map. + + else: # Reservation doesn't exist, make reservation during maintenance window. + err = "error creating reservation." + res = run_command(SLURM_CREATE_RESERVATION.format(starttime, node, reservation_name), err) + + +def get_slurm_reservation_maintenance() -> Dict[str, str]: + err = "error getting reservation details" + res = run_command(SLURM_SHOW_RESERVATION, err) + all_reservations = [x.split() for x in res.stdout.split("\n\n")[:-1]] + reservation_map = {} + + for reservation in all_reservations: + reservation_name = None + nodes = None + start_time = None + for item in reservation: + key, value = item.split('=', 1) # Split at the first '=' + + if key == 'ReservationName': + reservation_name = value + elif key == 'Nodes': + nodes = value + elif key == 'StartTime': + start_time = value + + if reservation_name is None or nodes is None or start_time is None: + continue + + # Check if the reservation is for scheduled maintenance. + maintenance_reservation = nodes + "_maintenance" + if reservation_name != maintenance_reservation: + continue + + reservation_map[reservation_name] = nodes + ', ' + start_time + + return reservation_map + + +def update_upcoming_maintenance(lkp: util.Lookup) -> List[str]: + err = "error getting upcoming maintenance list" + res = run_command(UPC_MAINT_CMD.format(lkp.project), err) + + upc_maint = [x.split() for x in res.stdout.split("\n")[:-1]] + slurm_nodes = lkp.slurm_nodes().keys() + upc_maint_vms = [vm for sublist in upc_maint for vm in sublist if vm in slurm_nodes] + + return upc_maint_vms + + +# Sync maintenance reservation gets upcoming maintenance notification and +# updates slurm reservation for the node during scheduled maintenance window. +def sync_maintenance_reservation(lkp: util.Lookup): + # Get upcoming maintenance details from the slurm cluster. + upc_maint_vms = update_upcoming_maintenance(lkp) + log.debug(f"upcoming-maintenance-vms: {upc_maint_vms}") + + # Get reservation for maintenance. + # [reservation_name --> nodes + ', ' + start_time] + # reservation_name should be of format nodes_maintenance. + reservation_map = get_slurm_reservation_maintenance() + log.debug(f"reservation-map: {reservation_map}") + + # Iterate upcoming maintenance and update reservation. + # Also remove reservation from the queue after update. + for row in upc_maint_vms: + vm_name = row[0] + reservation_name = vm_name + "_maintenance" + # Update start time format to be compatible with slurm reservation format. + starttime = datetime.strptime(row[1], "%Y-%m-%dT%H:%M:%S%z") + formatted_starttime = starttime.strftime("%Y-%m-%dT%H:%M:%S") + + update_slurm_reservation_maintenance(reservation_name, vm_name, formatted_starttime, reservation_map) + + # If reservation map is not empty, means we have reserved vms for maintenance + # and maintenance is already finished. Remove all these maintenance reservations. + if not reservation_map or len(reservation_map) == 0: + return + + delete_reservations(reservation_map) + + def main(): try: reconfigure_slurm() @@ -483,15 +618,22 @@ def main(): sync_slurm() except Exception: log.exception("failed to sync instances") + try: sync_placement_groups() except Exception: log.exception("failed to sync placement groups") + try: update_topology(lkp) except Exception: log.exception("failed to update topology") + try: + sync_maintenance_reservation(lkp) + except Exception: + log.exception("failed to sync slurm reservation for scheduled maintenance") + try: install_custom_scripts(check_hash=True) except Exception: