Merge pull request #3340 from mr0re1/job_array
SlurmGCP. Improve `resume.py` performance for job arrays.
mr0re1 authored Dec 5, 2024
2 parents 97e662d + 6a650d5 commit 5f1092e
Showing 3 changed files with 106 additions and 119 deletions.
@@ -157,15 +157,14 @@ def per_instance_properties(node):

return props

def create_instances_request(nodes, partition_name, placement_group, job_id=None):
def create_instances_request(nodes: List[str], placement_group: Optional[str], excl_job_id: Optional[int]):
"""Call regionInstances.bulkInsert to create instances"""
assert 0 < len(nodes) <= BULK_INSERT_LIMIT

# model here indicates any node that can be used to describe the rest
model = next(iter(nodes))
nodeset = lookup().node_nodeset(model)
template = lookup().node_template(model)
partition = lookup().cfg.partitions[partition_name]
log.debug(f"create_instances_request: {model} placement: {placement_group}")

body = NSDict()
@@ -181,14 +180,10 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
# source of instance properties
body.sourceInstanceTemplate = template

labels = (
dict(slurm_job_id=job_id)
if job_id is not None and partition.enable_job_exclusive
else None
)
labels = {"slurm_job_id": excl_job_id} if excl_job_id else None
# overwrites properties across all instances
body.instanceProperties = instance_properties(
nodeset, model, placement_group, labels, job_id
nodeset, model, placement_group, labels, excl_job_id
)

# key is instance name, value overwrites properties
@@ -225,96 +220,74 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
log_api_request(req)
return req
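
A note on the new labels expression above: replacing `job_id is not None and partition.enable_job_exclusive` with a plain truthiness test on `excl_job_id` also treats an id of 0 as "no exclusive job". A minimal standalone sketch of the new behavior (the job ids here are made up for illustration):

# Sketch: the new labels expression from create_instances_request,
# evaluated for a few hypothetical excl_job_id values.
for excl_job_id in (None, 0, 1234):
    labels = {"slurm_job_id": excl_job_id} if excl_job_id else None
    print(excl_job_id, "->", labels)
# None -> None
# 0    -> None   (0 is falsy, so no label is attached)
# 1234 -> {'slurm_job_id': 1234}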

@dataclass()
class PlacementAndNodes:
placement: Optional[str]
nodes: List[str]

@dataclass(frozen=True)
class BulkChunk:
nodes: List[str]
prefix: str
prefix: str # <cluster_name>-<nodeset_name>
chunk_idx: int
job_id: Optional[int]
partition: Optional[str]
excl_job_id: Optional[int]
placement_group: Optional[str] = None

@property
def name(self):
if self.placement_group is not None:
return f"{self.prefix}:job{self.excl_job_id}:{self.placement_group}:{self.chunk_idx}"
if self.excl_job_id is not None:
return f"{self.prefix}:job{self.excl_job_id}:{self.chunk_idx}"
return f"{self.prefix}:{self.chunk_idx}"
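
For reference, a sketch of the strings the new `name` property produces, assuming the `BulkChunk` definition above; the expected values are taken from the assertions in the updated test_group_nodes_bulk further down:

# Sketch: BulkChunk.name for the three naming cases.
BulkChunk(nodes=["c-n-0"], prefix="c-n", chunk_idx=0,
          excl_job_id=1, placement_group="g10").name  # "c-n:job1:g10:0"
BulkChunk(nodes=["c-t-0", "c-t-1"], prefix="c-t", chunk_idx=0,
          excl_job_id=2).name                         # "c-t:job2:0"
BulkChunk(nodes=["c-t-8", "c-t-9"], prefix="c-t", chunk_idx=0,
          excl_job_id=None).name                      # "c-t:0"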


def group_nodes_bulk(nodes: List[str], resume_data: Optional[ResumeData], lkp: util.Lookup):
"""group nodes by job_id, placement_group, node_group, and max bulkInsert size"""
"""group nodes by nodeset, placement_group, exclusive_job_id if any"""
if resume_data is None: # all nodes will be considered jobless
resume_data = ResumeData(jobs=[])

nodes = set(nodes) # turn into set to simplify intersection
non_excl = nodes.copy()
groups = {} # excl_job_id|none -> PlacementAndNodes

@dataclass(frozen=True)
class JobGroup: # aux struct
job_id: Optional[int]
partition: Optional[str]
placement_groups: Dict[str, List[str]]

job_groups = {}

# expand all job nodelists
# expand all exclusive job nodelists
for job in resume_data.jobs:
nodes_resume = nodes & set(job.nodes_alloc)
if lkp.partition_is_tpu(job.partition): # don't create placement groups for TPU
pgs = {None: sorted(nodes_resume)}
else:
# create placement groups if nodes for job need it
pgs = create_placement_groups(job.nodes_alloc, job.job_id)
if not lkp.cfg.partitions[job.partition].enable_job_exclusive:
continue

# placement group assignment is based on all allocated nodes, but we only want to
# handle nodes in nodes_resume in this run.
for pg, pg_nodes in pgs.items():
pgs[pg] = sorted(set(pg_nodes) & nodes_resume)

job_groups[job.job_id] = JobGroup(
job_id=job.job_id,
partition=job.partition,
placement_groups=pgs,
)
groups[job.job_id] = []
# placement group assignment is based on all allocated nodes, ...
for pn in create_placement_groups(job.nodes_alloc, job.job_id, lkp):
groups[job.job_id].append(
PlacementAndNodes(
placement=pn.placement,
# ... but we only want to handle nodes in nodes_resume in this run.
nodes=sorted(set(pn.nodes) & nodes)
))
non_excl.difference_update(job.nodes_alloc)

all_jobless_nodes = nodes.difference(
chain.from_iterable(j.nodes_alloc for j in resume_data.jobs))
jobless_nodes, jobless_nodes_tpu = util.separate(lkp.node_is_tpu, all_jobless_nodes)

job_groups["Normal_None"] = JobGroup(
job_id=None,
placement_groups=create_placement_groups(sorted(jobless_nodes), job_id=0),
partition=None,
)
job_groups["TPU_None"] = JobGroup(
job_id=None,
placement_groups={None: sorted(jobless_nodes_tpu)},
partition=None,
)
groups[None] = create_placement_groups(sorted(non_excl), job_id=0, lkp=lkp)

def chunk_nodes(nodes: List[str]):
chunk_size = BULK_INSERT_LIMIT
if nodes and lkp.node_is_tpu(nodes[0]):
chunk_size = util.TPU(lkp.node_nodeset(nodes[0])).vmcount
return chunked(nodes, n=chunk_size)
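
`chunk_nodes` keeps bulkInsert batches under BULK_INSERT_LIMIT, except for TPU nodesets, which are chunked by the nodeset's `vmcount`. A sketch, assuming `chunked` is `more_itertools.chunked` (it splits an iterable into lists of at most n items):

# Sketch: chunking six TPU nodes with a hypothetical vmcount of 2,
# matching the chunk sizes asserted in the test below.
from more_itertools import chunked

nodes = [f"c-t-{i}" for i in range(6)]
print(list(chunked(nodes, n=2)))
# [['c-t-0', 'c-t-1'], ['c-t-2', 'c-t-3'], ['c-t-4', 'c-t-5']]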

grouped_nodes = [
chunks = [
BulkChunk(
nodes=nodes_chunk,
prefix=prefix,
job_id = job.job_id,
partition = job.partition,
placement_group=placement_group,
prefix=lkp.node_prefix(nodes_chunk[0]), # <cluster_name>-<nodeset_name>
excl_job_id=job_id,
placement_group=pn.placement,
chunk_idx=i)

for job in job_groups.values()
for placement_group, pg_nodes in job.placement_groups.items()
for prefix, nodes in util.groupby_unsorted(pg_nodes, lkp.node_prefix)
for i, nodes_chunk in enumerate(chunk_nodes(list(nodes)))
for job_id, placements in groups.items()
for pn in placements if pn.nodes
for i, nodes_chunk in enumerate(chunk_nodes(pn.nodes))
]

def group_name(chunk: BulkChunk):
if chunk.placement_group is not None:
return f"{chunk.prefix}:job{chunk.job_id}:{chunk.placement_group}:{chunk.chunk_idx}"
if chunk.job_id is not None:
return f"{chunk.prefix}:job{chunk.job_id}:{chunk.chunk_idx}"
return f"{chunk.prefix}:{chunk.chunk_idx}"

return {group_name(chunk): chunk for chunk in grouped_nodes}
return {chunk.name: chunk for chunk in chunks}
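
The dict that `group_nodes_bulk` now returns is keyed by `BulkChunk.name`. A sketch of the result shape, with names and node chunks as asserted in the updated test below (`nodes`, `resume_data`, and `lkp` are stand-ins):

# Sketch: shape of group_nodes_bulk's return value.
grouped = group_nodes_bulk(nodes, resume_data, lkp)
# {
#     "c-n:jobNone:g0:0": BulkChunk(nodes=["c-n-1", "c-n-2"], ...),
#     "c-n:job1:g10:0":   BulkChunk(nodes=["c-n-0"], ...),
#     "c-t:0":            BulkChunk(nodes=["c-t-8", "c-t-9"], ...),
#     "c-t:job2:0":       BulkChunk(nodes=["c-t-0", "c-t-1"], ...),
# }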


def start_tpu(data):
@@ -367,15 +340,15 @@ def resume_nodes(nodes: List[str], resume_data: Optional[ResumeData]):
bi_inserts = {}

for group, chunk in grouped_nodes.items():
if chunk.partition and lookup().partition_is_tpu(chunk.partition):
model = chunk.nodes[0]
if lookup().node_is_tpu(model):
# do not create multiple tpu_objs if nodes with the same prefix are used
if chunk.prefix not in tpu_objs.keys():
model = chunk.nodes[0]
tpu_objs[chunk.prefix] = util.TPU(lookup().node_nodeset(model))
tpu_start_data.append({"tpu": tpu_objs[chunk.prefix], "node": chunk.nodes})
else:
bi_inserts[group] = create_instances_request(
chunk.nodes, chunk.partition, chunk.placement_group, chunk.job_id
chunk.nodes, chunk.placement_group, chunk.excl_job_id
)

# execute all bulkInsert requests with batch
@@ -499,41 +472,44 @@ def create_placement_request(pg_name, region):
return request


def create_placement_groups(node_list: List[str], job_id:int) -> Dict[str, List[str]]:
pgs = {}
node_map = lookup().nodeset_map(node_list)
for _, nodes in node_map.items():
pgs.update(create_nodeset_placement_groups(nodes, job_id))
return pgs
def create_placement_groups(nodes: List[str], job_id:int, lkp: util.Lookup) -> List[PlacementAndNodes]:
res = []
for _, ns_nodes in lkp.nodeset_map(nodes).items():
res.extend(create_nodeset_placement_groups(ns_nodes, job_id, lkp))
return res
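
`create_placement_groups` now returns a flat list of PlacementAndNodes covering every input node, split per nodeset; nodes that get no placement policy come back with placement=None. A sketch mirroring mock_create_placement_groups_se in the test below (the "g0" name is the mock's, not the real naming scheme):

# Sketch: expected shape of create_placement_groups' result for a mix of
# placement-enabled and TPU nodes.
create_placement_groups(["c-n-1", "c-n-2", "c-t-8", "c-t-9"], job_id=0, lkp=lkp)
# -> [PlacementAndNodes(placement="g0", nodes=["c-n-1", "c-n-2"]),
#     PlacementAndNodes(placement=None, nodes=["c-t-8", "c-t-9"])]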


def create_nodeset_placement_groups(node_list: List[str], job_id:int) -> Dict[str, List[str]]:
no_pg = {None: node_list} # canned result for no placement policies created
def create_nodeset_placement_groups(nodes: List[str], job_id:int, lkp: util.Lookup) -> List[PlacementAndNodes]:
# canned result for no placement policies created
no_pp = [PlacementAndNodes(placement=None, nodes=nodes)]

if len(node_list) < 2:
return no_pg # don't create placement_policy for just one node
if len(nodes) < 2:
return no_pp # don't create placement_policy for just one node

model = next(iter(node_list))
nodeset = lookup().node_nodeset(model)
if not (nodeset.enable_placement and valid_placement_nodes(node_list)):
return no_pg
model = nodes[0]
nodeset = lkp.node_nodeset(model)
if not (nodeset.enable_placement and valid_placement_node(model)):
return no_pp
if lkp.node_is_tpu(model):
return no_pp

region = lookup().node_region(model)
region = lkp.node_region(model)

groups = {
f"{lookup().cfg.slurm_cluster_name}-slurmgcp-managed-{nodeset.nodeset_name}-{job_id}-{i}": nodes
for i, nodes in enumerate(chunked(node_list, n=PLACEMENT_MAX_CNT))
}
groups = [
PlacementAndNodes(
placement=f"{lkp.cfg.slurm_cluster_name}-slurmgcp-managed-{nodeset.nodeset_name}-{job_id}-{i}",
nodes=chunk
)
for i, chunk in enumerate(chunked(nodes, n=PLACEMENT_MAX_CNT))
]

if log.isEnabledFor(logging.DEBUG):
debug_groups = {
group: to_hostlist_fast(nodes) for group, nodes in groups.items()
}
debug_groups = {g.placement: to_hostlist_fast(g.nodes) for g in groups}
log.debug(
f"creating {len(groups)} placement groups: \n{yaml.safe_dump(debug_groups).rstrip()}"
)
requests = {
group: create_placement_request(group, region) for group in groups.keys()
g.placement: create_placement_request(g.placement, region) for g in groups
}
ops = dict(
zip(requests.keys(), map_with_futures(ensure_execute, requests.values()))
@@ -575,16 +551,15 @@ def classify_result(item):
return groups


def valid_placement_nodes(nodelist):
def valid_placement_node(node: str) -> bool:
invalid_types = frozenset(["e2", "t2d", "n1", "t2a", "m1", "m2", "m3"])
for node in nodelist:
mt = lookup().node_template_info(node).machineType
if mt.split("-")[0] in invalid_types:
log.warn(f"Unsupported machine type for placement policy: {mt}.")
log.warn(
f"Please do not use any of the following machine types with placement policy: ({','.join(invalid_types)})"
)
return False
mt = lookup().node_template_info(node).machineType
if mt.split("-")[0] in invalid_types:
log.warn(f"Unsupported machine type for placement policy: {mt}.")
log.warn(
f"Please do not use any of the following machine types with placement policy: ({','.join(invalid_types)})"
)
return False
return True
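
The placement-policy check now validates a single model node and keys off the machine-type family prefix. A minimal illustration of the prefix test (the machine-type strings are hypothetical examples):

# Sketch: the machine-family test used in valid_placement_node.
invalid_types = frozenset(["e2", "t2d", "n1", "t2a", "m1", "m2", "m3"])
for mt in ("e2-standard-4", "c2-standard-60"):
    print(mt, "allowed:", mt.split("-")[0] not in invalid_types)
# e2-standard-4 allowed: False  (rejected for placement policies)
# c2-standard-60 allowed: True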


@@ -42,6 +42,7 @@ class TstPartition:
partition_name: str = "euler"
partition_nodeset: list[str] = field(default_factory=list)
partition_nodeset_tpu: list[str] = field(default_factory=list)
enable_job_exclusive: bool = False

@dataclass
class TstCfg:
@@ -19,7 +19,7 @@

from common import TstCfg, TstNodeset, TstPartition, TstTPU # needed to import util
import util
from resume import get_resume_file_data, ResumeData, ResumeJobData, group_nodes_bulk, BulkChunk
from resume import get_resume_file_data, ResumeData, ResumeJobData, group_nodes_bulk, BulkChunk, PlacementAndNodes

def test_get_resume_file_data_no_env():
with unittest.mock.patch.dict(os.environ, {"SLURM_RESUME_FILE": ""}):
@@ -70,24 +70,35 @@ def test_group_nodes_bulk(mock_create_placement_groups, mock_tpu):
"t": TstNodeset(nodeset_name="t"),
},
partitions={
"p1": TstPartition(partition_name="p1"),
"p1": TstPartition(
partition_name="p1",
enable_job_exclusive=True,
),
"p2": TstPartition(
partition_name="p2",
partition_nodeset_tpu=["t"],
enable_job_exclusive=True,
)
}
)
lkp = util.Lookup(cfg)

def mock_create_placement_groups_se(nodes, job_id):
def mock_create_placement_groups_se(nodes, job_id, lkp):
args = (set(nodes), job_id)
if ({"c-n-1", "c-n-2"}, 0) == args:
return { "g0": ["c-n-1", "c-n-2"] }
if ({'c-n-1', 'c-n-2', 'c-t-8', 'c-t-9'}, 0) == args:
return [
PlacementAndNodes("g0", ["c-n-1", "c-n-2"]),
PlacementAndNodes(None, ['c-t-8', 'c-t-9']),
]
if ({"c-n-0", "c-n-8"}, 1) == args:
return {
"g10": ["c-n-0"],
"g11": ["c-n-8"],
}
return [
PlacementAndNodes("g10", ["c-n-0"]),
PlacementAndNodes("g11", ["c-n-8"]),
]
if ({'c-t-0', 'c-t-1', 'c-t-2', 'c-t-3', 'c-t-4', 'c-t-5'}, 2) == args:
return [
PlacementAndNodes(None, ['c-t-0', 'c-t-1', 'c-t-2', 'c-t-3', 'c-t-4', 'c-t-5'])
]
raise AssertionError(f"unexpected invocation: '{args}'")
mock_create_placement_groups.side_effect = mock_create_placement_groups_se

@@ -106,13 +117,13 @@ def mock_tpu_se(ns: TstNodeset) -> TstTPU:
mock_create_placement_groups.assert_called()
assert got == {
"c-n:jobNone:g0:0": BulkChunk(
nodes=["c-n-1", "c-n-2"], prefix="c-n", chunk_idx=0, job_id=None, partition=None, placement_group="g0"),
nodes=["c-n-1", "c-n-2"], prefix="c-n", chunk_idx=0, excl_job_id=None, placement_group="g0"),
"c-n:job1:g10:0": BulkChunk(
nodes=["c-n-0"], prefix="c-n", chunk_idx=0, job_id=1, partition="p1", placement_group="g10"),
nodes=["c-n-0"], prefix="c-n", chunk_idx=0, excl_job_id=1, placement_group="g10"),
"c-t:0": BulkChunk(
nodes=["c-t-8", "c-t-9"], prefix="c-t", chunk_idx=0, job_id=None, partition=None, placement_group=None),
nodes=["c-t-8", "c-t-9"], prefix="c-t", chunk_idx=0, excl_job_id=None, placement_group=None),
"c-t:job2:0": BulkChunk(
nodes=["c-t-0", "c-t-1"], prefix="c-t", chunk_idx=0, job_id=2, partition="p2", placement_group=None),
nodes=["c-t-0", "c-t-1"], prefix="c-t", chunk_idx=0, excl_job_id=2, placement_group=None),
"c-t:job2:1": BulkChunk(
nodes=["c-t-2", "c-t-3"], prefix="c-t", chunk_idx=1, job_id=2, partition="p2", placement_group=None),
nodes=["c-t-2", "c-t-3"], prefix="c-t", chunk_idx=1, excl_job_id=2, placement_group=None),
}
