Skip to content

Commit

Permalink
Add extra context for disk usage warning. (#3495)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael R. Crusoe <michael.crusoe@gmail.com>
  • Loading branch information
DailyDreaming and mr-c authored Mar 18, 2021
1 parent 8179a03 commit 1f0930b
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 66 deletions.
1 change: 0 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ lint:
script:
- pwd
- virtualenv -p python3.6 venv && . venv/bin/activate && make prepare && make develop extras=[all] && pip install htcondor
- make mypy
- make docs


Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ mypy:
$(CURDIR)/contrib/admin/mypy-with-ignore.py

diff_mypy:
mypy --strict --cobertura-xml-report . src/toil || true
diff-cover --fail-under=100 cobertura.xml
mypy --cobertura-xml-report . src/toil || true
diff-cover --fail-under=98 cobertura.xml

flake8: $(PYSOURCES)
flake8 --ignore=E501,W293,W291,E265,E302,E722,E126,E303,E261,E201,E202,W503,W504,W391,E128,E301,E127,E502,E129,E262,E111,E117,E306,E203,E231,E226,E741,E122,E251,E305,E701,E222,E225,E241,E305,E123,E121,E703,E704,E125,E402 $^
Expand Down
15 changes: 15 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ addopts = --doctest-modules --tb=native --assert=plain
max-line-length = 88
select = C,E,F,W,B,B950
ignore = E203, E501, W503

[mypy]
warn_unused_configs = True
disallow_any_generics = True
disallow_subclassing_any = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
disallow_untyped_decorators = True
no_implicit_optional = True
warn_redundant_casts = True
warn_unused_ignores = True
warn_return_any = True
implicit_reexport = False
strict_equality = True
6 changes: 3 additions & 3 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import uuid

from argparse import _ArgumentGroup, ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import Optional, Callable, Any, List
from typing import Optional, Callable, Any, List, Tuple

from toil import logProcessContext, lookupEnvVar
from toil.batchSystems.options import (add_all_batchsystem_options,
Expand Down Expand Up @@ -1372,7 +1372,7 @@ def cacheDirName(workflowID):
return f'cache-{workflowID}'


def getDirSizeRecursively(dirPath):
def getDirSizeRecursively(dirPath: str) -> int:
"""
This method will return the cumulative number of bytes occupied by the files
on disk in the directory and its subdirectories.
Expand Down Expand Up @@ -1404,7 +1404,7 @@ def getDirSizeRecursively(dirPath):
return 0


def getFileSystemSize(dirPath):
def getFileSystemSize(dirPath: str) -> Tuple[int, int]:
"""
Return the free space, and total size of the file system hosting `dirPath`.
Expand Down
17 changes: 9 additions & 8 deletions src/toil/fileStores/abstractFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from threading import Event, Semaphore
from typing import Union
from typing import Callable, Generator, Union

import dill

from toil.common import cacheDirName
from toil.fileStores import FileID
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.io import WriteWatchingStream

from toil.job import Job, JobDescription
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -57,7 +58,7 @@ class AbstractFileStore(ABC):
_pendingFileWrites = set()
_terminateEvent = Event() # Used to signify crashes in threads

def __init__(self, jobStore, jobDesc, localTempDir, waitForPreviousCommit):
def __init__(self, jobStore: AbstractJobStore, jobDesc: JobDescription, localTempDir: str, waitForPreviousCommit: Callable[[],None]) -> None:
"""
Create a new file store object.
Expand All @@ -78,10 +79,10 @@ def __init__(self, jobStore, jobDesc, localTempDir, waitForPreviousCommit):
"""
self.jobStore = jobStore
self.jobDesc = jobDesc
self.localTempDir = os.path.abspath(localTempDir)
self.localTempDir: str = os.path.abspath(localTempDir)
self.workFlowDir = os.path.dirname(self.localTempDir)
self.workDir = os.path.dirname(self.localTempDir)
self.jobName = self.jobDesc.command.split()[1]
self.workDir: str = os.path.dirname(self.localTempDir)
self.jobName: str = self.jobDesc.command.split()[1]
self.waitForPreviousCommit = waitForPreviousCommit
self.loggingMessages = []
# Records file IDs of files deleted during the current job. Doesn't get
Expand Down Expand Up @@ -135,7 +136,7 @@ def shutdownFileStore(workflowDir, workflowID):
NonCachingFileStore.shutdown(workflowDir)

@contextmanager
def open(self, job):
def open(self, job: Job) -> Generator[None, None, None]:
"""
The context manager used to conduct tasks prior-to, and after a job has
been run. File operations are only permitted inside the context
Expand Down Expand Up @@ -476,7 +477,7 @@ def write(self, fileName):
os.rename(fileName + '.tmp', fileName)

# Functions related to logging
def logToMaster(self, text, level=logging.INFO):
def logToMaster(self, text: str, level: int =logging.INFO) -> None:
"""
Send a logging message to the leader. The message will also be \
logged by the worker at the same level.
Expand Down
37 changes: 18 additions & 19 deletions src/toil/fileStores/cachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import time
import uuid
from contextlib import contextmanager
from typing import Any, Callable, Generator, Optional

from toil.common import cacheDirName, getDirSizeRecursively, getFileSystemSize
from toil.fileStores import FileID, make_public_dir
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.humanize import bytes2human
from toil.lib.io import atomic_copy, atomic_copyobj, robust_rmtree
from toil.lib.retry import ErrorCondition, retry
from toil.lib.threading import get_process_name, process_name_exists
from toil.job import Job, JobDescription

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -169,7 +172,7 @@ class CachingFileStore(AbstractFileStore):
"""

def __init__(self, jobStore, jobDesc, localTempDir, waitForPreviousCommit):
def __init__(self, jobStore: AbstractJobStore, jobDesc: JobDescription, localTempDir: str, waitForPreviousCommit: Callable[[],None]) -> None:
super(CachingFileStore, self).__init__(jobStore, jobDesc, localTempDir, waitForPreviousCommit)

# For testing, we have the ability to force caching to be non-free, by never linking from the file store
Expand All @@ -196,12 +199,12 @@ def __init__(self, jobStore, jobDesc, localTempDir, waitForPreviousCommit):

# Since each worker has it's own unique CachingFileStore instance, and only one Job can run
# at a time on a worker, we can track some stuff about the running job in ourselves.
self.jobName = str(self.jobDesc)
self.jobName: str = str(self.jobDesc)
self.jobID = self.jobDesc.jobStoreID
logger.debug('Starting job (%s) with ID (%s).', self.jobName, self.jobID)

# When the job actually starts, we will fill this in with the job's disk requirement.
self.jobDiskBytes = None
self.jobDiskBytes: Optional[float] = None

# We need to track what attempt of the workflow we are, to prevent crosstalk between attempts' caches.
self.workflowAttemptNumber = self.jobStore.config.workflowAttemptNumber
Expand Down Expand Up @@ -967,7 +970,7 @@ def _freeUpSpace(self):
# Normal AbstractFileStore API

@contextmanager
def open(self, job):
def open(self, job: Job) -> Generator[None, None, None]:
"""
This context manager decorated method allows cache-specific operations to be conducted
before and after the execution of a job in worker.py
Expand Down Expand Up @@ -996,22 +999,18 @@ def open(self, job):
# See how much disk space is used at the end of the job.
# Not a real peak disk usage, but close enough to be useful for warning the user.
# TODO: Push this logic into the abstract file store
diskUsed = getDirSizeRecursively(self.localTempDir)
logString = ("Job {jobName} used {percent:.2f}% ({humanDisk}B [{disk}B] used, "
"{humanRequestedDisk}B [{requestedDisk}B] requested) at the end of "
"its run.".format(jobName=self.jobName,
percent=(float(diskUsed) / self.jobDiskBytes * 100 if
self.jobDiskBytes > 0 else 0.0),
humanDisk=bytes2human(diskUsed),
disk=diskUsed,
humanRequestedDisk=bytes2human(self.jobDiskBytes),
requestedDisk=self.jobDiskBytes))
self.logToMaster(logString, level=logging.DEBUG)
if diskUsed > self.jobDiskBytes:
self.logToMaster("Job used more disk than requested. Please reconsider modifying "
"the user script to avoid the chance of failure due to "
"incorrectly requested resources. " + logString,
disk: int = getDirSizeRecursively(self.localTempDir)
percent: float = 0.0
if self.jobDiskBytes and self.jobDiskBytes > 0:
percent = float(disk) / self.jobDiskBytes * 100
disk_usage: str = (f"Job {self.jobName} used {percent:.2f}% disk ({bytes2human(disk)}B [{disk}B] used, "
f"{bytes2human(self.jobDiskBytes)}B [{self.jobDiskBytes}B] requested).")
if disk > self.jobDiskBytes:
self.logToMaster("Job used more disk than requested. For CWL, consider increasing the outdirMin "
f"requirement, otherwise, consider increasing the disk requirement. {disk_usage}",
level=logging.WARNING)
else:
self.logToMaster(disk_usage, level=logging.DEBUG)

# Go back up to the per-worker local temp directory.
os.chdir(startingDir)
Expand Down
52 changes: 25 additions & 27 deletions src/toil/fileStores/nonCachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,57 +20,55 @@
import uuid
from collections import defaultdict
from contextlib import contextmanager
from typing import Callable, Dict, Optional, Generator

import dill

from toil.common import getDirSizeRecursively, getFileSystemSize
from toil.fileStores import FileID, make_public_dir
from toil.fileStores.abstractFileStore import AbstractFileStore
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.humanize import bytes2human
from toil.lib.io import robust_rmtree
from toil.lib.threading import get_process_name, process_name_exists
from toil.job import Job, JobDescription

logger = logging.getLogger(__name__)
logger: logging.Logger = logging.getLogger(__name__)


class NonCachingFileStore(AbstractFileStore):
def __init__(self, jobStore, jobDesc, localTempDir, waitForPreviousCommit):
super(NonCachingFileStore, self).__init__(jobStore, jobDesc, localTempDir, waitForPreviousCommit)
def __init__(self, jobStore: AbstractJobStore, jobDesc: JobDescription, localTempDir: str, waitForPreviousCommit: Callable[[], None]) -> None:
super().__init__(jobStore, jobDesc, localTempDir, waitForPreviousCommit)
# This will be defined in the `open` method.
self.jobStateFile = None
self.localFileMap = defaultdict(list)
self.jobStateFile: Optional[str] = None
self.localFileMap: Dict[str, str] = {}
self.localFileMap = defaultdict(list) # type: ignore

@contextmanager
def open(self, job):
def open(self, job: Job) -> Generator[None, None, None]:
jobReqs = job.disk
startingDir = os.getcwd()
self.localTempDir = make_public_dir(os.path.join(self.localTempDir, str(uuid.uuid4())))
self._removeDeadJobs(self.workDir)
self.jobStateFile = self._createJobStateFile()
freeSpace, diskSize = getFileSystemSize(self.localTempDir)
if freeSpace <= 0.1 * diskSize:
logger.warning('Starting job %s with less than 10%% of disk space remaining.',
self.jobName)
logger.warning(f'Starting job {self.jobName} with less than 10%% of disk space remaining.')
try:
os.chdir(self.localTempDir)
with super().open(job):
yield
finally:
diskUsed = getDirSizeRecursively(self.localTempDir)
logString = ("Job {jobName} used {percent:.2f}% ({humanDisk}B [{disk}B] used, "
"{humanRequestedDisk}B [{requestedDisk}B] requested) at the end of "
"its run.".format(jobName=self.jobName,
percent=(float(diskUsed) / jobReqs * 100 if
jobReqs > 0 else 0.0),
humanDisk=bytes2human(diskUsed),
disk=diskUsed,
humanRequestedDisk=bytes2human(jobReqs),
requestedDisk=jobReqs))
self.logToMaster(logString, level=logging.DEBUG)
if diskUsed > jobReqs:
self.logToMaster("Job used more disk than requested. Consider modifying the user "
"script to avoid the chance of failure due to incorrectly "
"requested resources. " + logString, level=logging.WARNING)
disk = getDirSizeRecursively(self.localTempDir)
percent = float(disk) / jobReqs * 100 if jobReqs > 0 else 0.0
disk_usage = (f"Job {self.jobName} used {percent:.2f}% disk ({bytes2human(disk)}B [{disk}B] used, "
f"{bytes2human(jobReqs)}B [{jobReqs}B] requested).")
if disk > jobReqs:
self.logToMaster("Job used more disk than requested. For CWL, consider increasing the outdirMin "
f"requirement, otherwise, consider increasing the disk requirement. {disk_usage}",
level=logging.WARNING)
else:
self.logToMaster(disk_usage, level=logging.DEBUG)
os.chdir(startingDir)
# Finally delete the job from the worker
os.remove(self.jobStateFile)
Expand Down Expand Up @@ -166,7 +164,7 @@ def __del__(self):
"""

@classmethod
def _removeDeadJobs(cls, nodeInfo, batchSystemShutdown=False):
def _removeDeadJobs(cls, nodeInfo: str, batchSystemShutdown: bool=False) -> None:
"""
Look at the state of all jobs registered in the individual job state files, and handle them
(clean up the disk)
Expand All @@ -190,7 +188,7 @@ def _removeDeadJobs(cls, nodeInfo, batchSystemShutdown=False):
try:
# Try and lock it
fcntl.lockf(dirFD, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError as e:
except OSError as e:
# We lost the race. Someone else is alive and has it locked.
os.close(dirFD)
else:
Expand Down Expand Up @@ -230,7 +228,7 @@ def _getAllJobStates(workflowDir):
for filename in jobStateFiles:
try:
yield NonCachingFileStore._readJobState(filename)
except IOError as e:
except OSError as e:
if e.errno == 2:
# job finished & deleted its jobState file since the jobState files were discovered
continue
Expand All @@ -243,7 +241,7 @@ def _readJobState(jobStateFileName):
state = dill.load(fH)
return state

def _createJobStateFile(self):
def _createJobStateFile(self) -> str:
"""
Create the job state file for the current job and fill in the required
values.
Expand Down
9 changes: 5 additions & 4 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from contextlib import contextmanager
from io import BytesIO
from typing import Dict, Optional, Union

import dill

Expand Down Expand Up @@ -310,7 +311,7 @@ def requirements(self):
return dict(self._requirementOverrides)

@property
def disk(self):
def disk(self) -> int:
"""
The maximum number of bytes of disk required.
Expand Down Expand Up @@ -372,7 +373,7 @@ class JobDescription(Requirer):
their specific parameters.
"""

def __init__(self, requirements, jobName, unitName='', displayName='', command=None):
def __init__(self, requirements: Dict[str, Union[int, str, bool]], jobName: str, unitName: str='', displayName: str='', command: Optional[str]=None) -> None:
"""
Create a new JobDescription.
Expand Down Expand Up @@ -415,7 +416,7 @@ def makeString(x):
# Gets replaced with/rewritten into the real, executable command when
# the leader passes the description off to the batch system to be
# executed.
self.command = command
self.command: Optional[str] = command

# Set scheduling properties that the leader read to think about scheduling.

Expand Down Expand Up @@ -1050,7 +1051,7 @@ def description(self):
# requirements through to the JobDescription.

@property
def disk(self):
def disk(self) -> int:
"""
The maximum number of bytes of disk the job will require to run.
Expand Down
11 changes: 9 additions & 2 deletions src/toil/lib/humanize.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# http://code.activestate.com/recipes/578019-bytes-to-human-human-to-bytes-converter/

from typing import Optional, SupportsInt

"""
Bytes-to-human / human-to-bytes converter.
Based on: http://goo.gl/kTQMs
Expand All @@ -16,7 +18,7 @@
'iec_ext' : ('byte', 'kibi', 'mebi', 'gibi', 'tebi', 'pebi', 'exbi', 'zebi', 'yobi'),
}

def bytes2human(n, fmt='%(value).1f %(symbol)s', symbols='customary'):
def bytes2human(n: SupportsInt, fmt: Optional[str] = None, symbols: Optional[str] = None) -> str:
"""
Convert n bytes into a human readable string based on format.
symbols can be either "customary", "customary_ext", "iec" or "iec_ext",
Expand All @@ -25,7 +27,12 @@ def bytes2human(n, fmt='%(value).1f %(symbol)s', symbols='customary'):
n = int(n)
if n < 0:
raise ValueError("n < 0")
symbols = SYMBOLS[symbols]
if not fmt:
fmt = '%(value).1f %(symbol)s'
if not symbols:
symbols = SYMBOLS['customary']
else:
symbols = SYMBOLS[symbols]
prefix = {}
for i, s in enumerate(symbols[1:]):
prefix[s] = 1 << (i+1)*10
Expand Down

0 comments on commit 1f0930b

Please sign in to comment.