This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
RunJobHpcarcEvent.py
139 lines (110 loc) · 4.54 KB
/
RunJobHpcarcEvent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Class definition:
# RunJobHpcarcEvent
# This class is a subclass of RunJobHpcEvent, the base class for the HPC Event Server classes.
# Instances are generated with RunJobFactory via pUtil::getRunJob()
# Implemented as a singleton class
# http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern
import commands
import json
import os
import re
import shutil
import subprocess
import sys
import time
import traceback
from RunJobHpcEvent import RunJobHpcEvent
from pUtil import tolog
class RunJobHpcarcEvent(RunJobHpcEvent):
    """ RunJobHpcEvent subclass that identifies itself as "RunJobHpcarcEvent".

    The class is a singleton: __new__ stores the first instance on the class
    and returns that same object for every later construction.
    """

    # private data members
    __runjob = "RunJobHpcarcEvent"  # Name string handed out by getRunJob()
    __instance = None               # Cached singleton instance (None until the first construction)
    #__error = PilotErrors()        # PilotErrors object

    # Required methods

    def __new__(cls, *args, **kwargs):
        """ Build the instance on first use, afterwards return the cached one """

        cached = cls.__instance
        if cached:
            return cached

        cls.__instance = super(RunJobHpcarcEvent, cls).__new__(cls, *args, **kwargs)
        return cls.__instance

    def __init__(self):
        """ Delegate initialization to the parent class """

        super(RunJobHpcarcEvent, self).__init__()

    def getRunJob(self):
        """ Return the string naming this RunJob sub class """

        return self.__runjob

    def getRunJobFileName(self):
        """ Return the module filename as reported further up the class hierarchy """

        # NOTE(review): super() is anchored at RunJobHpcEvent rather than at this class,
        # so the lookup starts after RunJobHpcEvent in the MRO - confirm this is intended
        ancestor = super(RunJobHpcEvent, self)
        return ancestor.getRunJobFileName()

    # def argumentParser(self): <-- see example in RunJob.py

    def allowLoopingJobKiller(self):
        """ Tell the pilot whether it should search for looping jobs (never, for this class) """

        # The pilot can watch the payload work directory and kill a job it considers stuck
        # (looping) when no files have been updated within a time limit. The limits are set
        # in environment.py (see e.g. loopingLimitDefaultProd). This class opts out.
        return False

    def setupYoda(self):
        """ Prepare the HPC jobs and register those that have event ranges with the HPC manager """

        # NOTE(review): every self.__name below is name-mangled to _RunJobHpcarcEvent__name;
        # verify these attributes are set for this class and not only under the parent's prefix
        tolog("setupYoda")
        self.__hpcStatue = 'starting'  # NOTE(review): spelled "Statue" here but "__hpcStatus" below - confirm
        self.updateAllJobsState('starting', self.__hpcStatue)

        status, output = self.prepareHPCJobs()
        if status != 0:
            # Preparation failed: log it, fail every job and stop here
            tolog("Failed to prepare HPC jobs: status %s, output %s" % (status, output))
            # NOTE(review): PilotErrors is not imported in the visible part of this file - confirm it is in scope
            self.failAllJobs(0, PilotErrors.ERR_UNKNOWN, self.__jobs, pilotErrorDiag=output)
            return

        self.__hpcStatus = None
        self.__hpcLog = None

        manager = self.__hpcManager

        # Keep only the jobs that have at least one event range
        jobs_with_events = dict(
            (job_id, self.__jobs[job_id]['hpcJob'])
            for job_id in self.__jobs
            if len(self.__eventRanges[job_id]) > 0
        )

        manager.initJobs(jobs_with_events, self.__jobEventRanges)
        manager.setPandaJobStateFile(self.__jobStateFile)
        manager.setStageoutThreads(self.__stageout_threads)
        manager.saveState()
        self.__hpcManager = manager
def getRankNum():
    """ Return the rank number found in the environment, or None.

    RANK_NUM is consulted first and SLURM_NODEID second. The first variable
    that is set is logged with tolog() and returned converted to int.

    Returns:
        The rank as an int, or None when neither variable is set.

    Raises:
        ValueError: if the selected variable does not hold an integer string.
    """

    # Use the membership operator instead of dict.has_key(): has_key() is
    # deprecated in Python 2 and no longer exists in Python 3, whereas "in"
    # behaves identically on both.
    for variable in ('RANK_NUM', 'SLURM_NODEID'):
        if variable in os.environ:
            value = os.environ[variable]
            tolog("RANK %s" % value)
            return int(value)
    return None
if __name__ == "__main__":
    # Script entry point: work out this process's rank, run either the rank-0
    # (or rank-less) sequence or the slave sequence, then always attempt the
    # stage-out / finish steps before exiting with status 0.

    tolog("Starting RunJobHpcarcEvent")

    # Rank lookup: RANK_NUM takes precedence over SLURM_NODEID.
    # "in" replaces dict.has_key(), which is deprecated in Python 2 and
    # removed in Python 3; the result is the same.
    rank_num = None
    if 'RANK_NUM' in os.environ:
        tolog("RANK_NUM(PBS) is %s" % os.environ['RANK_NUM'])
        rank_num = int(os.environ['RANK_NUM'])
    elif 'SLURM_NODEID' in os.environ:
        tolog("RANK_NUM(SLURM) %s" % os.environ['SLURM_NODEID'])
        rank_num = int(os.environ['SLURM_NODEID'])

    # Default PilotHomeDir to the current working directory when it is unset
    if 'PilotHomeDir' not in os.environ:
        os.environ['PilotHomeDir'] = os.getcwd()

    runJob = RunJobHpcarcEvent()
    try:
        runJob.setupHPCEvent(rank_num)
        tolog("RANK_NUM %s" % rank_num)
        if rank_num is None or rank_num == 0:
            # No rank information, or rank 0: run the full manager sequence
            runJob.setupHPCManager()
            runJob.getHPCEventJobs()
            runJob.stageInHPCJobs()
            runJob.startHPCJobs()
        else:
            # Any other rank only starts the slave jobs
            runJob.startHPCSlaveJobs()
    except:
        # NOTE(review): a bare except also traps SystemExit and KeyboardInterrupt;
        # kept unchanged so that every failure is logged and the exit status stays 0
        tolog("RunJobHpcEventException")
        tolog(traceback.format_exc())
        tolog(sys.exc_info()[1])
        tolog(sys.exc_info()[2])
    finally:
        # Stage-out problems are logged but must not prevent finishJobs()
        try:
            runJob.checkJobMetrics()
            runJob.stageOutZipFiles()
        except:
            tolog("Failed to stageout zip files")
            tolog(traceback.format_exc())
        runJob.finishJobs()

    sys.exit(0)