Skip to content

Commit

Permalink
DRAFT
Browse files Browse the repository at this point in the history
  • Loading branch information
mr0re1 committed Dec 4, 2024
1 parent c14e964 commit 71712aa
Showing 1 changed file with 77 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,14 @@ def per_instance_properties(node):

return props

def create_instances_request(nodes, partition_name, placement_group, job_id=None):
def create_instances_request(nodes: List[str], placement_group: Optional[str], excl_job_id: Optional[int]):
"""Call regionInstances.bulkInsert to create instances"""
assert 0 < len(nodes) <= BULK_INSERT_LIMIT

# model here indicates any node that can be used to describe the rest
model = next(iter(nodes))
nodeset = lookup().node_nodeset(model)
template = lookup().node_template(model)
partition = lookup().cfg.partitions[partition_name]
log.debug(f"create_instances_request: {model} placement: {placement_group}")

body = NSDict()
Expand All @@ -181,14 +180,10 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
# source of instance properties
body.sourceInstanceTemplate = template

labels = (
dict(slurm_job_id=job_id)
if job_id is not None and partition.enable_job_exclusive
else None
)
labels = {"slurm_job_id": excl_job_id} if excl_job_id else None
# overwrites properties across all instances
body.instanceProperties = instance_properties(
nodeset, model, placement_group, labels, job_id
nodeset, model, placement_group, labels, excl_job_id
)

# key is instance name, value overwrites properties
Expand Down Expand Up @@ -225,96 +220,73 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
log_api_request(req)
return req

@dataclass()
class PlacementAndNodes:
placement: Optional[str]
nodes: List[str]

@dataclass(frozen=True)
class BulkChunk:
nodes: List[str]
prefix: str
prefix: str # <cluster_name>-<nodeset_name>
chunk_idx: int
job_id: Optional[int]
excl_job_id: Optional[int]
partition: Optional[str]
placement_group: Optional[str] = None

@property
def name(self):
if self.placement_group is not None:
return f"{self.prefix}:job{self.excl_job_id}:{self.placement_group}:{self.chunk_idx}"
if self.excl_job_id is not None:
return f"{self.prefix}:job{self.excl_job_id}:{self.chunk_idx}"
return f"{self.prefix}:{self.chunk_idx}"


def group_nodes_bulk(nodes: List[str], resume_data: Optional[ResumeData], lkp: util.Lookup):
"""group nodes by job_id, placement_group, node_group, and max bulkInsert size"""
"""group nodes by nodeset, placement_group, exclusive_job_id if any"""
if resume_data is None: # all nodes will be considered jobless
resume_data = ResumeData(jobs=[])

nodes = set(nodes) # turn into set to simplify intersection
non_excl = nodes.copy()
groups = {} # excl_job_id|none -> PlacementAndNodes

@dataclass(frozen=True)
class JobGroup: # aux struct
job_id: Optional[int]
partition: Optional[str]
placement_groups: Dict[str, List[str]]

job_groups = {}

# expand all job nodelists
# expand all exclusive job nodelists
for job in resume_data.jobs:
nodes_resume = nodes & set(job.nodes_alloc)
if lkp.partition_is_tpu(job.partition): # don't create placement groups for TPU
pgs = {None: sorted(nodes_resume)}
else:
# create placement groups if nodes for job need it
pgs = create_placement_groups(job.nodes_alloc, job.job_id)
if not lkp.cfg.partitions[job.partition].enable_job_exclusive:
continue
non_excl.difference_update(job.nodes_alloc)

# placement group assignment is based on all allocated nodes, but we only want to
# handle nodes in nodes_resume in this run.
for pg, pg_nodes in pgs.items():
pgs[pg] = sorted(set(pg_nodes) & nodes_resume)

job_groups[job.job_id] = JobGroup(
job_id=job.job_id,
partition=job.partition,
placement_groups=pgs,
)
pns = create_placement_groups(job.nodes_alloc, job.job_id)
# placement group assignment is based on all allocated nodes, but we only want to
# handle nodes in nodes_resume in this run.
for pn in pns:
pn.nodes = sorted(set(pn.nodes) & nodes)

all_jobless_nodes = nodes.difference(
chain.from_iterable(j.nodes_alloc for j in resume_data.jobs))
jobless_nodes, jobless_nodes_tpu = util.separate(lkp.node_is_tpu, all_jobless_nodes)

job_groups["Normal_None"] = JobGroup(
job_id=None,
placement_groups=create_placement_groups(sorted(jobless_nodes), job_id=0),
partition=None,
)
job_groups["TPU_None"] = JobGroup(
job_id=None,
placement_groups={None: sorted(jobless_nodes_tpu)},
partition=None,
)
groups[job.job_id] = pns

groups[None] = create_placement_groups(sorted(non_excl), job_id=0),

def chunk_nodes(nodes: List[str]):
chunk_size = BULK_INSERT_LIMIT
if nodes and lkp.node_is_tpu(nodes[0]):
chunk_size = util.TPU(lkp.node_nodeset(nodes[0])).vmcount
return chunked(nodes, n=chunk_size)

grouped_nodes = [
chunks = [
BulkChunk(
nodes=nodes_chunk,
prefix=prefix,
job_id = job.job_id,
partition = job.partition,
placement_group=placement_group,
prefix=lkp.node_prefix(nodes_chunk[0]), # <cluster_name>-<nodeset_name>
excl_job_id = job_id,
placement_group=pn.placement,
chunk_idx=i)

for job in job_groups.values()
for placement_group, pg_nodes in job.placement_groups.items()
for prefix, nodes in util.groupby_unsorted(pg_nodes, lkp.node_prefix)
for i, nodes_chunk in enumerate(chunk_nodes(list(nodes)))
for job_id, placements in groups.items()
for pn in placements
for i, nodes_chunk in enumerate(chunk_nodes(pn.nodes)) if nodes_chunk
]

def group_name(chunk: BulkChunk):
if chunk.placement_group is not None:
return f"{chunk.prefix}:job{chunk.job_id}:{chunk.placement_group}:{chunk.chunk_idx}"
if chunk.job_id is not None:
return f"{chunk.prefix}:job{chunk.job_id}:{chunk.chunk_idx}"
return f"{chunk.prefix}:{chunk.chunk_idx}"

return {group_name(chunk): chunk for chunk in grouped_nodes}
return {chunk.name: chunk for chunk in chunks}


def start_tpu(data):
Expand Down Expand Up @@ -367,15 +339,15 @@ def resume_nodes(nodes: List[str], resume_data: Optional[ResumeData]):
bi_inserts = {}

for group, chunk in grouped_nodes.items():
if chunk.partition and lookup().partition_is_tpu(chunk.partition):
model = chunk.nodes[0]
if lookup().node_is_tpu(model):
# do not create multiple tpu_objs if nodes with the same prefix are used
if chunk.prefix not in tpu_objs.keys():
model = chunk.nodes[0]
tpu_objs[chunk.prefix] = util.TPU(lookup().node_nodeset(model))
tpu_start_data.append({"tpu": tpu_objs[chunk.prefix], "node": chunk.nodes})
else:
bi_inserts[group] = create_instances_request(
chunk.nodes, chunk.partition, chunk.placement_group, chunk.job_id
chunk.nodes, chunk.placement_group, chunk.excl_job_id
)

# execute all bulkInsert requests with batch
Expand Down Expand Up @@ -499,41 +471,44 @@ def create_placement_request(pg_name, region):
return request


def create_placement_groups(node_list: List[str], job_id:int) -> Dict[str, List[str]]:
pgs = {}
node_map = lookup().nodeset_map(node_list)
for _, nodes in node_map.items():
pgs.update(create_nodeset_placement_groups(nodes, job_id))
return pgs
def create_placement_groups(nodes: List[str], job_id:int, lkp: util.Lookup) -> List[PlacementAndNodes]:
res = []
for _, ns_nodes in lkp.nodeset_map(nodes):
res.extend(create_nodeset_placement_groups(ns_nodes, job_id, lkp))
return res


def create_nodeset_placement_groups(node_list: List[str], job_id:int) -> Dict[str, List[str]]:
no_pg = {None: node_list} # canned result for no placement policies created
def create_nodeset_placement_groups(nodes: List[str], job_id:int, lkp: util.Lookup) -> List[PlacementAndNodes]:
# canned result for no placement policies created
no_pp = [PlacementAndNodes(placement=None, nodes=nodes)]

if len(node_list) < 2:
return no_pg # don't create placement_policy for just one node
if len(nodes) < 2:
return no_pp # don't create placement_policy for just one node

model = next(iter(node_list))
nodeset = lookup().node_nodeset(model)
if not (nodeset.enable_placement and valid_placement_nodes(node_list)):
return no_pg
model = nodes[0]
nodeset = lkp.node_nodeset(model)
if not (nodeset.enable_placement and valid_placement_node(model)):
return no_pp
if lkp.node_is_tpu(model):
return no_pp

region = lookup().node_region(model)
region = lkp.node_region(model)

groups = {
f"{lookup().cfg.slurm_cluster_name}-slurmgcp-managed-{nodeset.nodeset_name}-{job_id}-{i}": nodes
for i, nodes in enumerate(chunked(node_list, n=PLACEMENT_MAX_CNT))
}
groups = [
PlacementAndNodes(
placement=f"{lkp.cfg.slurm_cluster_name}-slurmgcp-managed-{nodeset.nodeset_name}-{job_id}-{i}",
nodes=chunk
)
for i, chunk in enumerate(chunked(nodes, n=PLACEMENT_MAX_CNT))
]

if log.isEnabledFor(logging.DEBUG):
debug_groups = {
group: to_hostlist_fast(nodes) for group, nodes in groups.items()
}
debug_groups = {g.placement: to_hostlist_fast(g.nodes) for g in groups}
log.debug(
f"creating {len(groups)} placement groups: \n{yaml.safe_dump(debug_groups).rstrip()}"
)
requests = {
group: create_placement_request(group, region) for group in groups.keys()
g.placement: create_placement_request(g.placement, region) for g in groups
}
ops = dict(
zip(requests.keys(), map_with_futures(ensure_execute, requests.values()))
Expand Down Expand Up @@ -575,16 +550,15 @@ def classify_result(item):
return groups


def valid_placement_nodes(nodelist):
def valid_placement_node(node: str) -> bool:
invalid_types = frozenset(["e2", "t2d", "n1", "t2a", "m1", "m2", "m3"])
for node in nodelist:
mt = lookup().node_template_info(node).machineType
if mt.split("-")[0] in invalid_types:
log.warn(f"Unsupported machine type for placement policy: {mt}.")
log.warn(
f"Please do not use any the following machine types with placement policy: ({','.join(invalid_types)})"
)
return False
mt = lookup().node_template_info(node).machineType
if mt.split("-")[0] in invalid_types:
log.warn(f"Unsupported machine type for placement policy: {mt}.")
log.warn(
f"Please do not use any the following machine types with placement policy: ({','.join(invalid_types)})"
)
return False
return True


Expand Down

0 comments on commit 71712aa

Please sign in to comment.