Skip to content

Commit

Permalink
prepare run on slurm
Browse files Browse the repository at this point in the history
  • Loading branch information
floriscalkoen committed Jul 31, 2024
1 parent 5869547 commit 989f57f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 14 deletions.
4 changes: 0 additions & 4 deletions scripts/python/make_gcts.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,6 @@ def sort_line_segments(segments, original_line):
instance_type = configure_instance()
client = DaskClientManager().create_client(
instance_type,
threads_per_worker=1,
processes=True,
n_workers=5,
local_directory="/tmp",
)
client.run(silence_shapely_warnings)
logging.info(f"Client dashboard link: {client.dashboard_link}")
Expand Down
70 changes: 60 additions & 10 deletions src/coastpy/utils/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,21 @@ def _create_local_client(self, *args: Any, **kwargs: Any):
Returns:
Client: The Dask local client.
"""
# Set default values
from distributed import Client

return Client(*args, **kwargs)
configs = {
"threads_per_worker": 1,
"processes": True,
"n_workers": 5,
"local_directory": "/tmp",
}

# Update defaults with any overrides provided in kwargs
configs.update(kwargs)

# Create and return the Dask Client using the updated parameters
return Client(*args, **configs)

def _create_slurm_client(self, *args: Any, **kwargs: Any):
"""Create a SLURM Dask client with potential overrides.
Expand All @@ -72,17 +84,55 @@ def _create_slurm_client(self, *args: Any, **kwargs: Any):
"""
from dask_jobqueue import SLURMCluster

min_jobs = kwargs.pop(
"minimum_jobs", dask.config.get("jobqueue.adaptive.minimum", 1)
)
max_jobs = kwargs.pop(
"maximum_jobs", dask.config.get("jobqueue.adaptive.maximum", 30)
)

cluster = SLURMCluster(*args, **kwargs)
cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs)
# Define default values specific to SLURM
slurm_configs = {
"cores": 1, # Cores per worker
"memory": "12GB", # Memory per worker
"processes": 1, # Processes per worker
"local_directory": "/scratch/frcalkoen/tmp",
"walltime": "4:00:00",
}
# Update default values with any overrides provided in kwargs
slurm_configs.update(kwargs)

# Create the SLURM cluster
cluster = SLURMCluster(*args, **slurm_configs)

cluster.scale(jobs=5)

# min_jobs = kwargs.pop(
# "minimum_jobs", dask.config.get("jobqueue.adaptive.minimum", 1)
# )
# max_jobs = kwargs.pop(
# "maximum_jobs", dask.config.get("jobqueue.adaptive.maximum", 30)
# )

# cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs)
return cluster.get_client()

# def _create_slurm_client(self, *args: Any, **kwargs: Any):
# """Create a SLURM Dask client with potential overrides.

# Args:
# *args: Additional positional arguments for client creation.
# **kwargs: Additional keyword arguments for client creation.

# Returns:
# Client: The Dask SLURM client.
# """
# from dask_jobqueue import SLURMCluster

# min_jobs = kwargs.pop(
# "minimum_jobs", dask.config.get("jobqueue.adaptive.minimum", 1)
# )
# max_jobs = kwargs.pop(
# "maximum_jobs", dask.config.get("jobqueue.adaptive.maximum", 30)
# )

# cluster = SLURMCluster(*args, **kwargs)
# cluster.adapt(minimum_jobs=min_jobs, maximum_jobs=max_jobs)
# return cluster.get_client()


def silence_shapely_warnings() -> None:
"""Suppress specific warnings commonly encountered in Shapely geometry operations."""
Expand Down

0 comments on commit 989f57f

Please sign in to comment.