Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to GPUs in TaskChain spec #10805

Merged
merged 2 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/python/WMCore/WMSpec/StdSpecs/ReReco.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ def validateSchema(self, schema):
instanceArguments[realArg] = skimArguments[argument]
try:
validateArgumentsCreate(skimSchema, instanceArguments)
# Validate GPU-related spec parameters
DataProcessing.validateGPUSettings(schema)
except Exception as ex:
self.raiseValidationException(str(ex))

Expand All @@ -276,10 +278,3 @@ def validateSchema(self, schema):
if diffSet:
self.raiseValidationException(
msg="A transient output module was specified but no skim was defined for it")

# Validate GPU-related spec parameters
if schema["RequiresGPU"] in ("optional", "required"):
if not json.loads(schema["GPUParams"]):
msg = "Request is set with RequiresGPU={}, ".format(schema["RequiresGPU"])
msg += "but GPUParams schema is not provided or correct."
self.raiseValidationException(msg)
23 changes: 23 additions & 0 deletions src/python/WMCore/WMSpec/StdSpecs/StdBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,9 @@ def getChainCreateArgs(firstTask=False, generator=False):
'Memory': {'default': None, 'null': True, 'type': float, 'validate': lambda x: x > 0},
'Multicore': {'default': 0, 'type': int, 'validate': lambda x: x > 0},
'PrepID': {'default': None, 'null': True, 'optional': True, 'type': str},
"RequiresGPU": {"default": "forbidden",
Copy link
Contributor

@todor-ivanov todor-ivanov Sep 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope these few lines do not overlap with the some of the previous PR. Otherwise this may cause some conflicts.

"validate": lambda x: x in ("forbidden", "optional", "required")},
"GPUParams": {"default": json.dumps(None), "validate": gpuParameters},
'PrimaryDataset': {'null': True, 'validate': primdataset, 'attr': 'inputPrimaryDataset'},
'ProcessingString': {'optional': True, 'validate': procstring},
'ProcessingVersion': {'type': int, 'validate': procversion},
Expand Down Expand Up @@ -1359,3 +1362,23 @@ def getAssignTestArguments(cls):
else:
schema[arg] = workloadDefinition[arg]['default']
return schema

@staticmethod
def validateGPUSettings(schemaData):
"""
Method to check whether GPU settings have been provided for
a workflow (or tasks/step) that requires GPUs (or has it set
to optional).
:param schemaData: workflow or task/step dictionary
:return: nothing if validation is successful, otherwise raises an exception
"""
if schemaData.get("RequiresGPU") in ("optional", "required"):
try:
msg = "Request is set with RequiresGPU={}, ".format(schemaData["RequiresGPU"])
if not json.loads(schemaData["GPUParams"]):
msg += "but GPUParams argument is empty and/or incorrect."
raise WMSpecFactoryException(msg)
except KeyError:
msg += "but GPUParams argument has not been provided."
raise WMSpecFactoryException(msg)
return True
18 changes: 15 additions & 3 deletions src/python/WMCore/WMSpec/StdSpecs/TaskChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@
},
"""
from __future__ import division

import json
from builtins import range, object
from future.utils import viewitems

from Utils.Utilities import makeList, strToBool
from WMCore.Lexicon import primdataset, taskStepName
from WMCore.Lexicon import primdataset, taskStepName, gpuParameters
from WMCore.WMSpec.StdSpecs.StdBase import StdBase
from WMCore.WMSpec.WMWorkloadTools import (validateArgumentsCreate, parsePileupConfig,
checkMemCore, checkEventStreams, checkTimePerEvent)
Expand Down Expand Up @@ -625,7 +627,11 @@ def getWorkloadCreateArgs():
"TimePerEvent": {"default": 12.0, "type": float, "validate": checkTimePerEvent},
"Memory": {"default": 2300.0, "type": float, "validate": checkMemCore},
"Multicore": {"default": 1, "type": int, "validate": checkMemCore},
"EventStreams": {"type": int, "null": True, "default": 0, "validate": checkEventStreams}
"EventStreams": {"type": int, "null": True, "default": 0, "validate": checkEventStreams},
# no need for workload-level defaults, if task-level default is provided
"RequiresGPU": {"default": None, "null": True,
"validate": lambda x: x in ("forbidden", "optional", "required")},
"GPUParams": {"default": json.dumps(None), "validate": gpuParameters},
}
baseArgs.update(specArgs)
StdBase.setDefaultArgumentsProperty(baseArgs)
Expand Down Expand Up @@ -727,6 +733,11 @@ def validateSchema(self, schema):
if task['InputFromOutputModule'] in inputTransientModules:
inputTransientModules.remove(task['InputFromOutputModule'])

try:
StdBase.validateGPUSettings(schema)
except Exception as ex:
self.raiseValidationException(str(ex))

for task in transientMapping:
if transientMapping[task]:
msg = "A transient module is not processed by a subsequent task.\n"
Expand All @@ -742,10 +753,11 @@ def validateTask(self, taskConf, taskArgumentDefinition):
"""
try:
validateArgumentsCreate(taskConf, taskArgumentDefinition, checkInputDset=False)
# Validate GPU-related spec parameters
StdBase.validateGPUSettings(taskConf)
except WMSpecFactoryException:
# just re-raise it to keep the error message clear
raise
except Exception as ex:
self.raiseValidationException(str(ex))

return
22 changes: 21 additions & 1 deletion test/python/WMCore_t/WMSpec_t/StdSpecs_t/StdBase_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
"""
from __future__ import print_function

import json
import unittest

from WMCore.WMSpec.StdSpecs.StdBase import StdBase
from WMCore.WMSpec.WMSpecErrors import WMSpecFactoryException


class StdBaseTest(unittest.TestCase):
Expand Down Expand Up @@ -75,7 +77,25 @@ def testCalcEvtsPerJobLumi(self):
self.assertEqual((15000, 100), StdBase.calcEvtsPerJobLumi(None, 100, 1, requestedEvents=15000))
self.assertEqual((15000, 15000), StdBase.calcEvtsPerJobLumi(None, None, 1, requestedEvents=15000))

return
def testValidateGPUSettings(self):
"""
Test the 'validateGPUSettings' StdBase method.
"""
with self.assertRaises(WMSpecFactoryException):
StdBase.validateGPUSettings({"RequiresGPU": "optional"})
with self.assertRaises(WMSpecFactoryException):
StdBase.validateGPUSettings({"RequiresGPU": "required"})
with self.assertRaises(WMSpecFactoryException):
StdBase.validateGPUSettings({"RequiresGPU": "optional", "GPUParams": json.dumps("")})
with self.assertRaises(WMSpecFactoryException):
StdBase.validateGPUSettings({"RequiresGPU": "required", "GPUParams": json.dumps(None)})

# now input that passes the validation
self.assertTrue(StdBase.validateGPUSettings({"RequiresGPU": "forbidden"}))
self.assertTrue(StdBase.validateGPUSettings({"RequiresGPU": "optional",
"GPUParams": json.dumps("blah")}))
self.assertTrue(StdBase.validateGPUSettings({"RequiresGPU": "required",
"GPUParams": json.dumps({"Key1": "value1"})}))


if __name__ == "__main__":
Expand Down
116 changes: 116 additions & 0 deletions test/python/WMCore_t/WMSpec_t/StdSpecs_t/TaskChain_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -2333,5 +2333,121 @@ def testResourcesOverride(self):
}
testWorkload.updateArguments(assignDict)

def testGPUTaskChains(self):
"""
Test GPU support in TaskChains, top level settings only
"""
processorDocs = makeProcessingConfigs(self.configDatabase)

arguments = TaskChainWorkloadFactory.getTestArguments()
arguments.update(deepcopy(REQUEST_INPUT))
arguments['Task1']['ConfigCacheID'] = processorDocs['DigiHLT']
arguments['Task2']['ConfigCacheID'] = processorDocs['Reco']
factory = TaskChainWorkloadFactory()
testWorkload = factory.factoryWorkloadConstruction("PullingTheChain", arguments)
self.assertIsNone(arguments['RequiresGPU'])
self.assertEqual(arguments['GPUParams'], json.dumps(None))
for taskKey in ("Task1", "Task2"):
self.assertTrue("RequiresGPU" not in arguments[taskKey])
self.assertTrue("GPUParams" not in arguments[taskKey])

for taskName in testWorkload.listAllTaskNames():
taskObj = testWorkload.getTaskByName(taskName)
for stepName in taskObj.listAllStepNames():
stepHelper = taskObj.getStepHelper(stepName)
if stepHelper.stepType() == "CMSSW":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

### Now assign this workflow and check those arguments once again
assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team",
"MergedLFNBase": "/store/data",
"UnmergedLFNBase": "/store/unmerged"
}
testWorkload.updateArguments(assignDict)

self.assertIsNone(arguments['RequiresGPU'])
self.assertEqual(arguments['GPUParams'], json.dumps(None))
for taskKey in ("Task1", "Task2"):
self.assertTrue("RequiresGPU" not in arguments[taskKey])
self.assertTrue("GPUParams" not in arguments[taskKey])

for taskName in testWorkload.listAllTaskNames():
taskObj = testWorkload.getTaskByName(taskName)
for stepName in taskObj.listAllStepNames():
stepHelper = taskObj.getStepHelper(stepName)
if stepHelper.stepType() == "CMSSW":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

# last but not least, test a failing case
arguments['RequiresGPU'] = "required"
arguments['GPUParams'] = json.dumps(None)
with self.assertRaises(WMSpecFactoryException):
factory.factoryWorkloadConstruction("PullingTheChain", arguments)


def testGPUTaskChainsTasks(self):
"""
Test GPU support in TaskChains, with task-level settings
"""
processorDocs = makeProcessingConfigs(self.configDatabase)

arguments = TaskChainWorkloadFactory.getTestArguments()
arguments.update(deepcopy(REQUEST_INPUT))
arguments['Task1']['ConfigCacheID'] = processorDocs['DigiHLT']
arguments['Task2']['ConfigCacheID'] = processorDocs['Reco']
gpuParams = {"GPUMemoryMB": 1234, "CUDARuntime": "11.2.3", "CUDACapabilities": ["7.5", "8.0"]}
arguments['Task1'].update({"RequiresGPU": "optional", "GPUParams": json.dumps(gpuParams)})
arguments['Task2'].update({"RequiresGPU": "required", "GPUParams": json.dumps(gpuParams)})
factory = TaskChainWorkloadFactory()
testWorkload = factory.factoryWorkloadConstruction("PullingTheChain", arguments)
self.assertIsNone(arguments['RequiresGPU'])
self.assertEqual(arguments["Task1"]['RequiresGPU'], "optional")
self.assertEqual(arguments["Task2"]['RequiresGPU'], "required")

self.assertEqual(arguments['GPUParams'], json.dumps(None))
self.assertEqual(arguments["Task1"]['GPUParams'], json.dumps(gpuParams))
self.assertEqual(arguments["Task2"]['GPUParams'], json.dumps(gpuParams))

for taskName in testWorkload.listAllTaskNames():
taskObj = testWorkload.getTaskByName(taskName)
for stepName in taskObj.listAllStepNames():
stepHelper = taskObj.getStepHelper(stepName)
if taskObj.taskType() in ["Merge", "Harvesting", "Cleanup", "LogCollect"]:
if stepHelper.stepType() == "CMSSW":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))
elif stepHelper.stepType() == "CMSSW" and taskName == "DIGI":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, arguments["Task1"]['RequiresGPU'])
self.assertItemsEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams)
elif stepHelper.stepType() == "CMSSW" and taskName == "RECO":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, arguments["Task2"]['RequiresGPU'])
self.assertEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))


### Now assign this workflow
assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team",
"MergedLFNBase": "/store/data",
"UnmergedLFNBase": "/store/unmerged"
}
testWorkload.updateArguments(assignDict)
self.assertIsNone(arguments['RequiresGPU'])
self.assertEqual(arguments["Task1"]['RequiresGPU'], "optional")
self.assertEqual(arguments["Task2"]['RequiresGPU'], "required")

self.assertEqual(arguments['GPUParams'], json.dumps(None))
self.assertEqual(arguments["Task1"]['GPUParams'], json.dumps(gpuParams))
self.assertEqual(arguments["Task2"]['GPUParams'], json.dumps(gpuParams))


if __name__ == '__main__':
unittest.main()