From 6a650d5fc8ba4a0da12d617bc8b2a8377f1b796d Mon Sep 17 00:00:00 2001 From: Ivan Orlov Date: Wed, 4 Dec 2024 08:45:34 +0000 Subject: [PATCH] SlurmGCP. Resume. Don't group nodes by non-exclusive jobs --- .../modules/slurm_files/scripts/resume.py | 185 ++++++++---------- .../slurm_files/scripts/tests/common.py | 1 + .../slurm_files/scripts/tests/test_resume.py | 39 ++-- 3 files changed, 106 insertions(+), 119 deletions(-) diff --git a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py index b6dc2ac14b..22be2fdab2 100755 --- a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py +++ b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py @@ -157,7 +157,7 @@ def per_instance_properties(node): return props -def create_instances_request(nodes, partition_name, placement_group, job_id=None): +def create_instances_request(nodes: List[str], placement_group: Optional[str], excl_job_id: Optional[int]): """Call regionInstances.bulkInsert to create instances""" assert 0 < len(nodes) <= BULK_INSERT_LIMIT @@ -165,7 +165,6 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None model = next(iter(nodes)) nodeset = lookup().node_nodeset(model) template = lookup().node_template(model) - partition = lookup().cfg.partitions[partition_name] log.debug(f"create_instances_request: {model} placement: {placement_group}") body = NSDict() @@ -181,14 +180,10 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None # source of instance properties body.sourceInstanceTemplate = template - labels = ( - dict(slurm_job_id=job_id) - if job_id is not None and partition.enable_job_exclusive - else None - ) + labels = {"slurm_job_id": excl_job_id} if excl_job_id else None # overwrites properties across all instances body.instanceProperties = instance_properties( - nodeset, model, placement_group, labels, job_id + nodeset, model, placement_group, labels, excl_job_id ) # key is instance name, value overwrites properties @@ -225,96 +220,74 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None log_api_request(req) return req +@dataclass() +class PlacementAndNodes: + placement: Optional[str] + nodes: List[str] @dataclass(frozen=True) class BulkChunk: nodes: List[str] - prefix: str + prefix: str # - chunk_idx: int - job_id: Optional[int] - partition: Optional[str] + excl_job_id: Optional[int] placement_group: Optional[str] = None + + @property + def name(self): + if self.placement_group is not None: + return f"{self.prefix}:job{self.excl_job_id}:{self.placement_group}:{self.chunk_idx}" + if self.excl_job_id is not None: + return f"{self.prefix}:job{self.excl_job_id}:{self.chunk_idx}" + return f"{self.prefix}:{self.chunk_idx}" def group_nodes_bulk(nodes: List[str], resume_data: Optional[ResumeData], lkp: util.Lookup): - """group nodes by job_id, placement_group, node_group, and max bulkInsert size""" + """group nodes by nodeset, placement_group, exclusive_job_id if any""" if resume_data is None: # all nodes will be considered jobless resume_data = ResumeData(jobs=[]) nodes = set(nodes) # turn into set to simplify intersection + non_excl = nodes.copy() + groups = {} # excl_job_id|none -> PlacementAndNodes - @dataclass(frozen=True) - class JobGroup: # aux struct - job_id: Optional[int] - partition: Optional[str] - 
placement_groups: Dict[str, List[str]] - - job_groups = {} - - # expand all job nodelists + # expand all exclusive job nodelists for job in resume_data.jobs: - nodes_resume = nodes & set(job.nodes_alloc) - if lkp.partition_is_tpu(job.partition): # don't create placement groups for TPU - pgs = {None: sorted(nodes_resume)} - else: - # create placement groups if nodes for job need it - pgs = create_placement_groups(job.nodes_alloc, job.job_id) + if not lkp.cfg.partitions[job.partition].enable_job_exclusive: + continue - # placement group assignment is based on all allocated nodes, but we only want to - # handle nodes in nodes_resume in this run. - for pg, pg_nodes in pgs.items(): - pgs[pg] = sorted(set(pg_nodes) & nodes_resume) - - job_groups[job.job_id] = JobGroup( - job_id=job.job_id, - partition=job.partition, - placement_groups=pgs, - ) + groups[job.job_id] = [] + # placement group assignment is based on all allocated nodes, ... + for pn in create_placement_groups(job.nodes_alloc, job.job_id, lkp): + groups[job.job_id].append( + PlacementAndNodes( + placement=pn.placement, + #... but we only want to handle nodes in nodes_resume in this run. + nodes = sorted(set(pn.nodes) & nodes) + )) + non_excl.difference_update(job.nodes_alloc) - all_jobless_nodes = nodes.difference( - chain.from_iterable(j.nodes_alloc for j in resume_data.jobs)) - jobless_nodes, jobless_nodes_tpu = util.separate(lkp.node_is_tpu, all_jobless_nodes) - - job_groups["Normal_None"] = JobGroup( - job_id=None, - placement_groups=create_placement_groups(sorted(jobless_nodes), job_id=0), - partition=None, - ) - job_groups["TPU_None"] = JobGroup( - job_id=None, - placement_groups={None: sorted(jobless_nodes_tpu)}, - partition=None, - ) + groups[None] = create_placement_groups(sorted(non_excl), job_id=0, lkp=lkp) def chunk_nodes(nodes: List[str]): chunk_size = BULK_INSERT_LIMIT if nodes and lkp.node_is_tpu(nodes[0]): chunk_size = util.TPU(lkp.node_nodeset(nodes[0])).vmcount return chunked(nodes, n=chunk_size) - - grouped_nodes = [ + + chunks = [ BulkChunk( nodes=nodes_chunk, - prefix=prefix, - job_id = job.job_id, - partition = job.partition, - placement_group=placement_group, + prefix=lkp.node_prefix(nodes_chunk[0]), # - + excl_job_id = job_id, + placement_group=pn.placement, chunk_idx=i) - for job in job_groups.values() - for placement_group, pg_nodes in job.placement_groups.items() - for prefix, nodes in util.groupby_unsorted(pg_nodes, lkp.node_prefix) - for i, nodes_chunk in enumerate(chunk_nodes(list(nodes))) + for job_id, placements in groups.items() + for pn in placements if pn.nodes + for i, nodes_chunk in enumerate(chunk_nodes(pn.nodes)) ] - - def group_name(chunk: BulkChunk): - if chunk.placement_group is not None: - return f"{chunk.prefix}:job{chunk.job_id}:{chunk.placement_group}:{chunk.chunk_idx}" - if chunk.job_id is not None: - return f"{chunk.prefix}:job{chunk.job_id}:{chunk.chunk_idx}" - return f"{chunk.prefix}:{chunk.chunk_idx}" - - return {group_name(chunk): chunk for chunk in grouped_nodes} + return {chunk.name: chunk for chunk in chunks} def start_tpu(data): @@ -367,15 +340,15 @@ def resume_nodes(nodes: List[str], resume_data: Optional[ResumeData]): bi_inserts = {} for group, chunk in grouped_nodes.items(): - if chunk.partition and lookup().partition_is_tpu(chunk.partition): + model = chunk.nodes[0] + if lookup().node_is_tpu(model): # do not create multiple tpu_objs if nodes with the same prefix are used if chunk.prefix not in tpu_objs.keys(): - model = chunk.nodes[0] tpu_objs[chunk.prefix] = 
util.TPU(lookup().node_nodeset(model)) tpu_start_data.append({"tpu": tpu_objs[chunk.prefix], "node": chunk.nodes}) else: bi_inserts[group] = create_instances_request( - chunk.nodes, chunk.partition, chunk.placement_group, chunk.job_id + chunk.nodes, chunk.placement_group, chunk.excl_job_id ) # execute all bulkInsert requests with batch @@ -499,41 +472,44 @@ def create_placement_request(pg_name, region): return request -def create_placement_groups(node_list: List[str], job_id:int) -> Dict[str, List[str]]: - pgs = {} - node_map = lookup().nodeset_map(node_list) - for _, nodes in node_map.items(): - pgs.update(create_nodeset_placement_groups(nodes, job_id)) - return pgs +def create_placement_groups(nodes: List[str], job_id:int, lkp: util.Lookup) -> List[PlacementAndNodes]: + res = [] + for _, ns_nodes in lkp.nodeset_map(nodes).items(): + res.extend(create_nodeset_placement_groups(ns_nodes, job_id, lkp)) + return res -def create_nodeset_placement_groups(node_list: List[str], job_id:int) -> Dict[str, List[str]]: - no_pg = {None: node_list} # canned result for no placement policies created +def create_nodeset_placement_groups(nodes: List[str], job_id:int, lkp: util.Lookup) -> List[PlacementAndNodes]: + # canned result for no placement policies created + no_pp = [PlacementAndNodes(placement=None, nodes=nodes)] - if len(node_list) < 2: - return no_pg # don't create placement_policy for just one node + if len(nodes) < 2: + return no_pp # don't create placement_policy for just one node - model = next(iter(node_list)) - nodeset = lookup().node_nodeset(model) - if not (nodeset.enable_placement and valid_placement_nodes(node_list)): - return no_pg + model = nodes[0] + nodeset = lkp.node_nodeset(model) + if not (nodeset.enable_placement and valid_placement_node(model)): + return no_pp + if lkp.node_is_tpu(model): + return no_pp - region = lookup().node_region(model) + region = lkp.node_region(model) - groups = { - f"{lookup().cfg.slurm_cluster_name}-slurmgcp-managed-{nodeset.nodeset_name}-{job_id}-{i}": nodes - for i, nodes in enumerate(chunked(node_list, n=PLACEMENT_MAX_CNT)) - } + groups = [ + PlacementAndNodes( + placement=f"{lkp.cfg.slurm_cluster_name}-slurmgcp-managed-{nodeset.nodeset_name}-{job_id}-{i}", + nodes=chunk + ) + for i, chunk in enumerate(chunked(nodes, n=PLACEMENT_MAX_CNT)) + ] if log.isEnabledFor(logging.DEBUG): - debug_groups = { - group: to_hostlist_fast(nodes) for group, nodes in groups.items() - } + debug_groups = {g.placement: to_hostlist_fast(g.nodes) for g in groups} log.debug( f"creating {len(groups)} placement groups: \n{yaml.safe_dump(debug_groups).rstrip()}" ) requests = { - group: create_placement_request(group, region) for group in groups.keys() + g.placement: create_placement_request(g.placement, region) for g in groups } ops = dict( zip(requests.keys(), map_with_futures(ensure_execute, requests.values())) @@ -575,16 +551,15 @@ def classify_result(item): return groups -def valid_placement_nodes(nodelist): +def valid_placement_node(node: str) -> bool: invalid_types = frozenset(["e2", "t2d", "n1", "t2a", "m1", "m2", "m3"]) - for node in nodelist: - mt = lookup().node_template_info(node).machineType - if mt.split("-")[0] in invalid_types: - log.warn(f"Unsupported machine type for placement policy: {mt}.") - log.warn( - f"Please do not use any the following machine types with placement policy: ({','.join(invalid_types)})" - ) - return False + mt = lookup().node_template_info(node).machineType + if mt.split("-")[0] in invalid_types: + log.warn(f"Unsupported machine type for 
placement policy: {mt}.") + log.warn( + f"Please do not use any the following machine types with placement policy: ({','.join(invalid_types)})" + ) + return False return True diff --git a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/common.py b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/common.py index bfe7f5cc9c..2b4e34187a 100644 --- a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/common.py +++ b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/common.py @@ -42,6 +42,7 @@ class TstPartition: partition_name: str = "euler" partition_nodeset: list[str] = field(default_factory=list) partition_nodeset_tpu: list[str] = field(default_factory=list) + enable_job_exclusive: bool = False @dataclass class TstCfg: diff --git a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/test_resume.py b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/test_resume.py index 7d9dfe4ac1..08e755c7c6 100644 --- a/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/test_resume.py +++ b/community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/tests/test_resume.py @@ -19,7 +19,7 @@ from common import TstCfg, TstNodeset, TstPartition, TstTPU # needed to import util import util -from resume import get_resume_file_data, ResumeData, ResumeJobData, group_nodes_bulk, BulkChunk +from resume import get_resume_file_data, ResumeData, ResumeJobData, group_nodes_bulk, BulkChunk, PlacementAndNodes def test_get_resume_file_data_no_env(): with unittest.mock.patch.dict(os.environ, {"SLURM_RESUME_FILE": ""}): @@ -70,24 +70,35 @@ def test_group_nodes_bulk(mock_create_placement_groups, mock_tpu): "t": TstNodeset(nodeset_name="t"), }, partitions={ - "p1": TstPartition(partition_name="p1"), + "p1": TstPartition( + partition_name="p1", + enable_job_exclusive=True, + ), "p2": TstPartition( partition_name="p2", partition_nodeset_tpu=["t"], + enable_job_exclusive=True, ) } ) lkp = util.Lookup(cfg) - def mock_create_placement_groups_se(nodes, job_id): + def mock_create_placement_groups_se(nodes, job_id, lkp): args = (set(nodes), job_id) - if ({"c-n-1", "c-n-2"}, 0) == args: - return { "g0": ["c-n-1", "c-n-2"] } + if ({'c-n-1', 'c-n-2', 'c-t-8', 'c-t-9'}, 0) == args: + return [ + PlacementAndNodes("g0", ["c-n-1", "c-n-2"]), + PlacementAndNodes(None, ['c-t-8', 'c-t-9']), + ] if ({"c-n-0", "c-n-8"}, 1) == args: - return { - "g10": ["c-n-0"], - "g11": ["c-n-8"], - } + return [ + PlacementAndNodes("g10", ["c-n-0"]), + PlacementAndNodes("g11", ["c-n-8"]), + ] + if ({'c-t-0', 'c-t-1', 'c-t-2', 'c-t-3', 'c-t-4', 'c-t-5'}, 2) == args: + return [ + PlacementAndNodes(None, ['c-t-0', 'c-t-1', 'c-t-2', 'c-t-3', 'c-t-4', 'c-t-5']) + ] raise AssertionError(f"unexpected invocation: '{args}'") mock_create_placement_groups.side_effect = mock_create_placement_groups_se @@ -106,13 +117,13 @@ def mock_tpu_se(ns: TstNodeset) -> TstTPU: mock_create_placement_groups.assert_called() assert got == { "c-n:jobNone:g0:0": BulkChunk( - nodes=["c-n-1", "c-n-2"], prefix="c-n", chunk_idx=0, job_id=None, partition=None, placement_group="g0"), + nodes=["c-n-1", "c-n-2"], prefix="c-n", chunk_idx=0, excl_job_id=None, placement_group="g0"), "c-n:job1:g10:0": BulkChunk( - nodes=["c-n-0"], prefix="c-n", chunk_idx=0, job_id=1, 
partition="p1", placement_group="g10"), + nodes=["c-n-0"], prefix="c-n", chunk_idx=0, excl_job_id=1, placement_group="g10"), "c-t:0": BulkChunk( - nodes=["c-t-8", "c-t-9"], prefix="c-t", chunk_idx=0, job_id=None, partition=None, placement_group=None), + nodes=["c-t-8", "c-t-9"], prefix="c-t", chunk_idx=0, excl_job_id=None, placement_group=None), "c-t:job2:0": BulkChunk( - nodes=["c-t-0", "c-t-1"], prefix="c-t", chunk_idx=0, job_id=2, partition="p2", placement_group=None), + nodes=["c-t-0", "c-t-1"], prefix="c-t", chunk_idx=0, excl_job_id=2, placement_group=None), "c-t:job2:1": BulkChunk( - nodes=["c-t-2", "c-t-3"], prefix="c-t", chunk_idx=1, job_id=2, partition="p2", placement_group=None), + nodes=["c-t-2", "c-t-3"], prefix="c-t", chunk_idx=1, excl_job_id=2, placement_group=None), }