From 368c50c6a9e22e3c0dc4d227cb4d52d5150d499e Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Tue, 14 Sep 2021 16:41:43 +0200 Subject: [PATCH] new unit tests TaskChain unit tests for GPU --- .../WMCore_t/WMSpec_t/StdSpecs_t/StdBase_t.py | 22 +++- .../WMSpec_t/StdSpecs_t/TaskChain_t.py | 116 ++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) diff --git a/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StdBase_t.py b/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StdBase_t.py index e71e4d9597..a1b9a94785 100644 --- a/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StdBase_t.py +++ b/test/python/WMCore_t/WMSpec_t/StdSpecs_t/StdBase_t.py @@ -7,9 +7,11 @@ """ from __future__ import print_function +import json import unittest from WMCore.WMSpec.StdSpecs.StdBase import StdBase +from WMCore.WMSpec.WMSpecErrors import WMSpecFactoryException class StdBaseTest(unittest.TestCase): @@ -75,7 +77,25 @@ def testCalcEvtsPerJobLumi(self): self.assertEqual((15000, 100), StdBase.calcEvtsPerJobLumi(None, 100, 1, requestedEvents=15000)) self.assertEqual((15000, 15000), StdBase.calcEvtsPerJobLumi(None, None, 1, requestedEvents=15000)) - return + def testValidateGPUSettings(self): + """ + Test the 'validateGPUSettings' StdBase method. + """ + with self.assertRaises(WMSpecFactoryException): + StdBase.validateGPUSettings({"RequiresGPU": "optional"}) + with self.assertRaises(WMSpecFactoryException): + StdBase.validateGPUSettings({"RequiresGPU": "required"}) + with self.assertRaises(WMSpecFactoryException): + StdBase.validateGPUSettings({"RequiresGPU": "optional", "GPUParams": json.dumps("")}) + with self.assertRaises(WMSpecFactoryException): + StdBase.validateGPUSettings({"RequiresGPU": "required", "GPUParams": json.dumps(None)}) + + # now input that passes the validation + self.assertTrue(StdBase.validateGPUSettings({"RequiresGPU": "forbidden"})) + self.assertTrue(StdBase.validateGPUSettings({"RequiresGPU": "optional", + "GPUParams": json.dumps("blah")})) + self.assertTrue(StdBase.validateGPUSettings({"RequiresGPU": "required", + "GPUParams": json.dumps({"Key1": "value1"})})) if __name__ == "__main__": diff --git a/test/python/WMCore_t/WMSpec_t/StdSpecs_t/TaskChain_t.py b/test/python/WMCore_t/WMSpec_t/StdSpecs_t/TaskChain_t.py index e8f5a071a0..86a64567a9 100644 --- a/test/python/WMCore_t/WMSpec_t/StdSpecs_t/TaskChain_t.py +++ b/test/python/WMCore_t/WMSpec_t/StdSpecs_t/TaskChain_t.py @@ -2333,5 +2333,121 @@ def testResourcesOverride(self): } testWorkload.updateArguments(assignDict) + def testGPUTaskChains(self): + """ + Test GPU support in TaskChains, top level settings only + """ + processorDocs = makeProcessingConfigs(self.configDatabase) + + arguments = TaskChainWorkloadFactory.getTestArguments() + arguments.update(deepcopy(REQUEST_INPUT)) + arguments['Task1']['ConfigCacheID'] = processorDocs['DigiHLT'] + arguments['Task2']['ConfigCacheID'] = processorDocs['Reco'] + factory = TaskChainWorkloadFactory() + testWorkload = factory.factoryWorkloadConstruction("PullingTheChain", arguments) + self.assertIsNone(arguments['RequiresGPU']) + self.assertEqual(arguments['GPUParams'], json.dumps(None)) + for taskKey in ("Task1", "Task2"): + self.assertTrue("RequiresGPU" not in arguments[taskKey]) + self.assertTrue("GPUParams" not in arguments[taskKey]) + + for taskName in testWorkload.listAllTaskNames(): + taskObj = testWorkload.getTaskByName(taskName) + for stepName in taskObj.listAllStepNames(): + stepHelper = taskObj.getStepHelper(stepName) + if stepHelper.stepType() == "CMSSW": + self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden") + self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements) + else: + self.assertFalse(hasattr(stepHelper.data.application, "gpu")) + + ### Now assign this workflow and check those arguments once again + assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team", + "MergedLFNBase": "/store/data", + "UnmergedLFNBase": "/store/unmerged" + } + testWorkload.updateArguments(assignDict) + + self.assertIsNone(arguments['RequiresGPU']) + self.assertEqual(arguments['GPUParams'], json.dumps(None)) + for taskKey in ("Task1", "Task2"): + self.assertTrue("RequiresGPU" not in arguments[taskKey]) + self.assertTrue("GPUParams" not in arguments[taskKey]) + + for taskName in testWorkload.listAllTaskNames(): + taskObj = testWorkload.getTaskByName(taskName) + for stepName in taskObj.listAllStepNames(): + stepHelper = taskObj.getStepHelper(stepName) + if stepHelper.stepType() == "CMSSW": + self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden") + self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements) + else: + self.assertFalse(hasattr(stepHelper.data.application, "gpu")) + + # last but not least, test a failing case + arguments['RequiresGPU'] = "required" + arguments['GPUParams'] = json.dumps(None) + with self.assertRaises(WMSpecFactoryException): + factory.factoryWorkloadConstruction("PullingTheChain", arguments) + + + def testGPUTaskChainsTasks(self): + """ + Test GPU support in TaskChains, with task-level settings + """ + processorDocs = makeProcessingConfigs(self.configDatabase) + + arguments = TaskChainWorkloadFactory.getTestArguments() + arguments.update(deepcopy(REQUEST_INPUT)) + arguments['Task1']['ConfigCacheID'] = processorDocs['DigiHLT'] + arguments['Task2']['ConfigCacheID'] = processorDocs['Reco'] + gpuParams = {"GPUMemoryMB": 1234, "CUDARuntime": "11.2.3", "CUDACapabilities": ["7.5", "8.0"]} + arguments['Task1'].update({"RequiresGPU": "optional", "GPUParams": json.dumps(gpuParams)}) + arguments['Task2'].update({"RequiresGPU": "required", "GPUParams": json.dumps(gpuParams)}) + factory = TaskChainWorkloadFactory() + testWorkload = factory.factoryWorkloadConstruction("PullingTheChain", arguments) + self.assertIsNone(arguments['RequiresGPU']) + self.assertEqual(arguments["Task1"]['RequiresGPU'], "optional") + self.assertEqual(arguments["Task2"]['RequiresGPU'], "required") + + self.assertEqual(arguments['GPUParams'], json.dumps(None)) + self.assertEqual(arguments["Task1"]['GPUParams'], json.dumps(gpuParams)) + self.assertEqual(arguments["Task2"]['GPUParams'], json.dumps(gpuParams)) + + for taskName in testWorkload.listAllTaskNames(): + taskObj = testWorkload.getTaskByName(taskName) + for stepName in taskObj.listAllStepNames(): + stepHelper = taskObj.getStepHelper(stepName) + if taskObj.taskType() in ["Merge", "Harvesting", "Cleanup", "LogCollect"]: + if stepHelper.stepType() == "CMSSW": + self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden") + self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements) + else: + self.assertFalse(hasattr(stepHelper.data.application, "gpu")) + elif stepHelper.stepType() == "CMSSW" and taskName == "DIGI": + self.assertEqual(stepHelper.data.application.gpu.gpuRequired, arguments["Task1"]['RequiresGPU']) + self.assertItemsEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams) + elif stepHelper.stepType() == "CMSSW" and taskName == "RECO": + self.assertEqual(stepHelper.data.application.gpu.gpuRequired, arguments["Task2"]['RequiresGPU']) + self.assertEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams) + else: + self.assertFalse(hasattr(stepHelper.data.application, "gpu")) + + + ### Now assign this workflow + assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team", + "MergedLFNBase": "/store/data", + "UnmergedLFNBase": "/store/unmerged" + } + testWorkload.updateArguments(assignDict) + self.assertIsNone(arguments['RequiresGPU']) + self.assertEqual(arguments["Task1"]['RequiresGPU'], "optional") + self.assertEqual(arguments["Task2"]['RequiresGPU'], "required") + + self.assertEqual(arguments['GPUParams'], json.dumps(None)) + self.assertEqual(arguments["Task1"]['GPUParams'], json.dumps(gpuParams)) + self.assertEqual(arguments["Task2"]['GPUParams'], json.dumps(gpuParams)) + + if __name__ == '__main__': unittest.main()