Skip to content

Commit

Permalink
Merge pull request #3005 from mr0re1/zonal_bi
Browse files Browse the repository at this point in the history
Default to zonal bulkInsert
  • Loading branch information
mr0re1 authored Sep 9, 2024
2 parents cb4ccd3 + bde1b24 commit f4b5be1
Showing 1 changed file with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
model = next(iter(nodes))
nodeset = lookup().node_nodeset(model)
template = lookup().node_template(model)
region = lookup().node_region(model)
partition = lookup().cfg.partitions[partition_name]
log.debug(f"create_instances_request: {model} placement: {placement_group}")

Expand All @@ -159,18 +158,21 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
# key is instance name, value overwrites properties
body.perInstanceProperties = {k: per_instance_properties(k) for k in nodes}

body.locationPolicy.locations = {
**{
f"zones/{zone}": {"preference": "ALLOW"}
for zone in nodeset.zone_policy_allow or []
},
**{
f"zones/{zone}": {"preference": "DENY"}
for zone in nodeset.zone_policy_deny or []
},
}
body.locationPolicy.targetShape = nodeset.zone_target_shape
zone_allow = nodeset.zone_policy_allow or []
zone_deny = nodeset.zone_policy_deny or []

if len(zone_allow) == 1: # if only one zone is used, use zonal BulkInsert API, as less prone to errors
api_method = lookup().compute.instances().bulkInsert
method_args = {"zone": zone_allow[0]}
else:
api_method = lookup().compute.regionInstances().bulkInsert
method_args = {"region": lookup().node_region(model)}

body.locationPolicy.locations = {
**{ f"zones/{z}": {"preference": "ALLOW"} for z in zone_allow },
**{ f"zones/{z}": {"preference": "DENY"} for z in zone_deny }}
body.locationPolicy.targetShape = nodeset.zone_target_shape

if lookup().cfg.enable_slurm_gcp_plugins:
slurm_gcp_plugins.pre_instance_bulk_insert(
lkp=lookup(),
Expand All @@ -179,16 +181,13 @@ def create_instances_request(nodes, partition_name, placement_group, job_id=None
request_body=body,
)

request = lookup().compute.regionInstances().bulkInsert(
project=lookup().project, region=region, body=body.to_dict()
)

if log.isEnabledFor(logging.DEBUG):
log.debug(
f"new request: endpoint={request.methodId} nodes={to_hostlist_fast(nodes)}"
)
log_api_request(request)
return request
req = api_method(
project=lookup().project,
body=body.to_dict(),
**method_args)
log.debug(f"new request: endpoint={req.methodId} nodes={to_hostlist_fast(nodes)}")
log_api_request(req)
return req


def group_nodes_bulk(nodes, resume_data=None):
Expand Down

0 comments on commit f4b5be1

Please sign in to comment.