This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
RunJob.py
2150 lines (1743 loc) · 96.1 KB
/
RunJob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Class definition:
# RunJob
# This is the main RunJob class; RunJobEvent etc will inherit from this class
# Note: at the moment, this class is essentially the old runJob module turned object oriented.
# The class will later become RunJobNormal, ie responsible for running normal PanDA jobs.
# At that point a new RunJob top class will be created containing methods that have been
# identified as common between the various sub classes.
# Instances are generated with RunJobFactory
# Subclasses should implement all needed methods prototyped in this class
# Note: not compatible with Singleton Design Pattern due to the subclassing
# Standard python modules
import os, sys, commands, time
import traceback
import atexit, signal
from optparse import OptionParser
from json import loads
# Pilot modules
import Site, pUtil, Job, Node, RunJobUtilities
import Mover as mover
from pUtil import tolog, readpar, createLockFile, getDatasetDict, getSiteInformation,\
tailPilotErrorDiag, getCmtconfig, getExperiment, getGUID, getWriteToInputFilenames
from JobRecovery import JobRecovery
from FileStateClient import updateFileStates, dumpFileStates
from ErrorDiagnosis import ErrorDiagnosis # import here to avoid issues seen at BU with missing module
from PilotErrors import PilotErrors
from shutil import copy2
from FileHandling import tail, getExtension, extractOutputFiles, getDestinationDBlockItems, getDirectAccess, writeFile, readFile
from EventRanges import downloadEventRanges
from processes import get_cpu_consumption_time
# remove logguid, debuglevel - not needed
# relabelled -h, queuename to -b (debuglevel not used)
class RunJob(object):
# private data members
__runjob = "RunJob" # String defining the RunJob class
__instance = None # Boolean used by subclasses to become a Singleton
__error = PilotErrors() # PilotErrors object
# __appdir = "/usatlas/projects/OSG" # Default software installation directory
# __debugLevel = 0 # 0: debug info off, 1: display function name when called, 2: full debug info
__failureCode = None # set by signal handler when user/batch system kills the job
__globalPilotErrorDiag = "" # global pilotErrorDiag used with signal handler (only)
__globalErrorCode = 0 # global error code used with signal handler (only)
__inputDir = "" # location of input files (source for mv site mover)
__logguid = None # guid for the log file
__outputDir = "" # location of output files (destination for mv site mover)
__pilot_initdir = "" # location of where the pilot is untarred and started
__pilotlogfilename = "pilotlog.txt" # default pilotlog filename
__pilotserver = "localhost" # default server
__pilotport = 88888 # default port
__proxycheckFlag = True # True (default): perform proxy validity checks, False: no check
__pworkdir = "/tmp" # site work dir used by the parent
# __queuename = "" # PanDA queue NOT NEEDED
# __sitename = "testsite" # PanDA site NOT NEEDED
__stageinretry = 1 # number of stage-in tries
__stageoutretry = 1 # number of stage-out tries
# __testLevel = 0 # test suite control variable (0: no test, 1: put error, 2: ...) NOT USED
# __workdir = "/tmp" # NOT USED
__cache = "" # Cache URL, e.g. used by LSST
__pandaserver = "" # Full PanDA server url incl. port and sub dirs
__recovery = False
__jobStateFile = None
__yodaNodes = None
__yodaQueue = None
corruptedFiles = []
# Getter and setter methods
def getExperiment(self):
    """ Return the experiment name stored in __experiment """
    # NOTE(review): in the code visible here __experiment is only assigned by
    # argumentParser() (option -F); confirm it is always set before this getter is used
    experiment = self.__experiment
    return experiment
def getFailureCode(self):
    """ Return the failure code set by the signal handler (None by default) """
    failure_code = self.__failureCode
    return failure_code
def setFailureCode(self, code):
    """ Store 'code' as the failure code (__failureCode) """
    self.__failureCode = code
def getGlobalPilotErrorDiag(self):
    """ Return the global pilotErrorDiag string used with the signal handler """
    diag = self.__globalPilotErrorDiag
    return diag
def setGlobalPilotErrorDiag(self, pilotErrorDiag):
    """ Store the global pilotErrorDiag string (__globalPilotErrorDiag) """
    self.__globalPilotErrorDiag = pilotErrorDiag
def getGlobalErrorCode(self):
    """ Return the global error code used with the signal handler """
    error_code = self.__globalErrorCode
    return error_code
def setGlobalErrorCode(self, code):
    """ Store 'code' as the global error code (__globalErrorCode) """
    self.__globalErrorCode = code
def setCache(self, cache):
    """ Store the cache URL (__cache) """
    self.__cache = cache
def getInputDir(self):
    """ Return the location of the input files (source for the mv site mover) """
    input_dir = self.__inputDir
    return input_dir
def setInputDir(self, inputDir):
    """ Store the location of the input files (__inputDir) """
    self.__inputDir = inputDir
def getLogGUID(self):
    """ Return the guid for the log file (None by default) """
    guid = self.__logguid
    return guid
def getOutputDir(self):
    """ Return the location of the output files (destination for the mv site mover) """
    output_dir = self.__outputDir
    return output_dir
def getPilotInitDir(self):
    """ Return the directory where the pilot is untarred and started """
    init_dir = self.__pilot_initdir
    return init_dir
def setPilotInitDir(self, pilot_initdir):
    """ Store the directory where the pilot is untarred and started (__pilot_initdir) """
    self.__pilot_initdir = pilot_initdir
def getPilotLogFilename(self):
    """ Return the pilot log file name (__pilotlogfilename) """
    log_name = self.__pilotlogfilename
    return log_name
def getPilotServer(self):
    """ Return the pilot TCP server host (__pilotserver) """
    server = self.__pilotserver
    return server
def getPilotPort(self):
    """ Return the pilot TCP server port (__pilotport) """
    port = self.__pilotport
    return port
def getProxyCheckFlag(self):
    """ Return the proxy check flag (True: perform proxy validity checks, False: no check) """
    flag = self.__proxycheckFlag
    return flag
def getParentWorkDir(self):
    """ Return the site work dir used by the parent (__pworkdir) """
    parent_workdir = self.__pworkdir
    return parent_workdir
def setParentWorkDir(self, pworkdir):
    """ Store the site work dir used by the parent (__pworkdir) """
    self.__pworkdir = pworkdir
def getStageInRetry(self):
    """ Return the number of stage-in tries (__stageinretry) """
    tries = self.__stageinretry
    return tries
def getStageOutRetry(self):
    """ Return the number of stage-out tries (__stageoutretry) """
    tries = self.__stageoutretry
    return tries
def setStageInRetry(self, stageinretry):
    """ Store the number of stage-in tries (__stageinretry) """
    self.__stageinretry = stageinretry
def getCache(self):
    """ Return the cache URL (__cache) """
    cache_url = self.__cache
    return cache_url
def getRecovery(self):
    """ Return the recovery setting (__recovery; False by default, else the value of option -R) """
    recovery = self.__recovery
    return recovery
def getJobStateFile(self):
    """ Return the job state file setting (__jobStateFile; None by default, else the value of option -S) """
    state_file = self.__jobStateFile
    return state_file
def setLogGUID(self, logguid):
    """ Store the guid for the log file (__logguid) """
    self.__logguid = logguid
def getYodaNodes(self):
    """
    Return the Yoda node count (__yodaNodes, option -N) as an integer.

    Returns None if the value has not been set, or if it cannot be converted
    to an integer (in which case the traceback is logged).
    """
    try:
        if self.__yodaNodes is None:
            return None
        return int(self.__yodaNodes)
    except Exception:
        # 'except Exception' instead of a bare 'except' so that SystemExit and
        # KeyboardInterrupt are not silently swallowed here
        tolog(traceback.format_exc())
        return None
def getYodaQueue(self):
    """
    Return the Yoda queue (__yodaQueue, option -Q), or None if it has not been set.

    Any unexpected error while reading the value is logged and None is returned.
    """
    try:
        # returning the attribute directly also covers the unset (None) case
        return self.__yodaQueue
    except Exception:
        # 'except Exception' instead of a bare 'except' so that SystemExit and
        # KeyboardInterrupt are not silently swallowed here
        tolog(traceback.format_exc())
        return None
def getPanDAServer(self):
    """ Return the full PanDA server url incl. port and sub dirs (__pandaserver) """
    server_url = self.__pandaserver
    return server_url
def setPanDAServer(self, pandaserver):
    """ Store the full PanDA server url (__pandaserver) """
    self.__pandaserver = pandaserver
# Required methods
def __init__(self):
""" Default initialization """
# e.g. self.__errorLabel = errorLabel
pass
def getRunJob(self):
    """ Return the string defining the RunJob class (__runjob) """
    name = self.__runjob
    return name
def argumentParser(self):
    """
    Argument parser for the RunJob module.

    Parses the command line (sys.argv) with optparse. Options that describe the
    RunJob object itself (directories, pilot server/port, retry counts, cache,
    PanDA server, Yoda settings, ...) are stored in the private data members, but
    only when the option was actually given. Exceptions to that rule:
    - the proxy check flag is always set: False only if the option value is
      "false" (any case), True otherwise
    - the recovery flag and the job state file are always copied from the options
      (i.e. they become None when not given)

    Returns the tuple (sitename, appdir, workdir, queuename); an element is None
    when the corresponding option was not given or when the parsing failed.
    """

    # Return variables
    appdir = None
    queuename = None
    sitename = None
    workdir = None

    # (short option, long option, destination, help text, metavar)
    option_table = (
        ("-a", "--appdir", "appdir", "The local path to the applications directory", "APPDIR"),
        ("-b", "--queuename", "queuename", "Queue name", "QUEUENAME"),
        ("-d", "--workdir", "workdir", "The local path to the working directory of the payload", "WORKDIR"),
        ("-g", "--inputdir", "inputDir", "Location of input files to be transferred by the mv site mover", "INPUTDIR"),
        ("-i", "--logfileguid", "logguid", "Log file guid", "GUID"),
        ("-k", "--pilotlogfilename", "pilotlogfilename", "The name of the pilot log file", "PILOTLOGFILENAME"),
        ("-l", "--pilotinitdir", "pilot_initdir", "The local path to the directory where the pilot was launched", "PILOT_INITDIR"),
        ("-m", "--outputdir", "outputDir", "Destination of output files to be transferred by the mv site mover", "OUTPUTDIR"),
        ("-o", "--parentworkdir", "pworkdir", "Path to the work directory of the parent process (i.e. the pilot)", "PWORKDIR"),
        ("-s", "--sitename", "sitename", "The name of the site where the job is to be run", "SITENAME"),
        ("-w", "--pilotserver", "pilotserver", "The URL of the pilot TCP server (localhost) WILL BE RETIRED", "PILOTSERVER"),
        ("-p", "--pilotport", "pilotport", "Pilot TCP server port (default: 88888)", "PORT"),
        ("-t", "--proxycheckflag", "proxycheckFlag", "True (default): perform proxy validity checks, False: no check", "PROXYCHECKFLAG"),
        ("-x", "--stageinretries", "stageinretry", "The number of stage-in retries", "STAGEINRETRY"),
        ("-E", "--stageoutretries", "stageoutretry", "The number of stage-out retries", "STAGEOUTRETRY"),
        ("-F", "--experiment", "experiment", "Current experiment (default: ATLAS)", "EXPERIMENT"),
        ("-R", "--recovery", "recovery", "Run in recovery mode", "RECOVERY"),
        ("-S", "--jobStateFile", "jobStateFile", "Job State File", "JOBSTATEFILE"),
        ("-N", "--yodaNodes", "yodaNodes", "Maximum nodes Yoda starts with", "YODANODES"),
        ("-Q", "--yodaQueue", "yodaQueue", "The queue yoda will be send to", "YODAQUEUE"),
        ("-H", "--cache", "cache", "Cache URL", "CACHE"),
        ("-W", "--pandaserver", "pandaserver", "The full URL of the PanDA server (incl. port)", "PANDASERVER"),
    )

    parser = OptionParser()
    for short_opt, long_opt, dest, help_text, metavar in option_table:
        parser.add_option(short_opt, long_opt, dest=dest, help=help_text, metavar=metavar)

    try:
        # note: on a malformed command line optparse itself prints the usage and
        # raises SystemExit, which is deliberately not caught here
        (options, args) = parser.parse_args()
    except Exception as e:
        # 'options' is not bound when the parsing failed, so only the exception can be
        # reported (the message used to lack its %s placeholder, which raised a TypeError)
        tolog("!!WARNING!!3333!! Exception caught: %s" % (e))
    else:
        if options.appdir:
            appdir = options.appdir
        if options.experiment:
            self.__experiment = options.experiment
        if options.logguid:
            self.__logguid = options.logguid
        if options.inputDir:
            self.__inputDir = options.inputDir
        if options.pilot_initdir:
            self.__pilot_initdir = options.pilot_initdir
        if options.pilotlogfilename:
            self.__pilotlogfilename = options.pilotlogfilename
        if options.pilotserver:
            self.__pilotserver = options.pilotserver
        if options.pandaserver:
            self.__pandaserver = options.pandaserver

        # proxy checks are only switched off by an explicit "false"
        if options.proxycheckFlag and options.proxycheckFlag.lower() == "false":
            self.__proxycheckFlag = False
        else:
            self.__proxycheckFlag = True

        if options.pworkdir:
            self.__pworkdir = options.pworkdir
        if options.outputDir:
            self.__outputDir = options.outputDir
        if options.pilotport:
            try:
                self.__pilotport = int(options.pilotport)
            except (TypeError, ValueError) as e:
                tolog("!!WARNING!!3232!! Exception caught: %s" % (e))

        # queuename and sitename are only returned, not stored
        if options.queuename:
            queuename = options.queuename
        if options.sitename:
            sitename = options.sitename

        if options.stageinretry:
            try:
                self.__stageinretry = int(options.stageinretry)
            except (TypeError, ValueError) as e:
                tolog("!!WARNING!!3232!! Exception caught: %s" % (e))
        if options.stageoutretry:
            try:
                self.__stageoutretry = int(options.stageoutretry)
            except (TypeError, ValueError) as e:
                tolog("!!WARNING!!3232!! Exception caught: %s" % (e))
        if options.workdir:
            workdir = options.workdir
        if options.cache:
            self.__cache = options.cache

        # always copied, also when not given on the command line
        self.__recovery = options.recovery
        self.__jobStateFile = options.jobStateFile

        if options.yodaNodes:
            self.__yodaNodes = options.yodaNodes
        if options.yodaQueue:
            self.__yodaQueue = options.yodaQueue

    return sitename, appdir, workdir, queuename
def getRunJobFileName(self):
    """ Return the file name (without the directory part) of the module that defines this object's class """
    module = sys.modules[self.__module__]
    filename = os.path.basename(module.__file__)
    # a byte-compiled module reports a .pyc file: drop the trailing 'c' to get the source file name
    if filename.endswith(".pyc"):
        return filename[:-1]
    return filename
def allowLoopingJobKiller(self):
    """ Should the pilot search for looping jobs? (always True in this class) """
    # The pilot has the ability to monitor the payload work directory. If there are no updated files within a certain
    # time limit, the pilot will consider the job as stuck (looping) and will kill it. The looping time limits are set
    # in environment.py (see e.g. loopingLimitDefaultProd)
    allow = True
    return allow
def cleanup(self, job, rf=None):
"""
Cleanup function, run before the payload wrapper exits.

job: job object; the code below uses job.result, job.workdir, job.datadir, job.jobId,
job.inFiles and job.outFiles
rf: optional return value from the Mover; rf[1] is parsed with
RunJobUtilities.getFileNamesFromString() to get the list of already moved files

Nothing is returned; problems are logged with tolog() rather than raised.
"""
# 'rf' is a list that will contain the names of the files that could be transferred
# In case of transfer problems, all remaining files will be found and moved
# to the data directory for later recovery.
# a positive trf exit code without a pilot exit code is reported as ERR_RUNJOBEXC
try:
if int(job.result[1]) > 0 and (job.result[2] is None or job.result[2] == '' or int(job.result[2]) == 0):
job.result[2] = PilotErrors.ERR_RUNJOBEXC
except:
tolog(traceback.format_exc())
tolog("********************************************************")
tolog(" This job ended with (trf,pilot) exit code of (%d,%d)" % (job.result[1], job.result[2]))
tolog("********************************************************")
# clean up the pilot wrapper modules
pUtil.removePyModules(job.workdir)
if os.path.isdir(job.workdir):
os.chdir(job.workdir)
# remove input files from the job workdir
remFiles = job.inFiles
for inf in remFiles:
if inf and inf != 'NULL' and os.path.isfile("%s/%s" % (job.workdir, inf)): # non-empty string and not NULL
try:
os.remove("%s/%s" % (job.workdir, inf))
except Exception,e:
tolog("!!WARNING!!3000!! Ignore this Exception when deleting file %s: %s" % (inf, str(e)))
pass
# only remove output files if status is not 'holding'
# in which case the files should be saved for the job recovery.
# the job itself must also have finished with a zero trf error code
# (data will be moved to another directory to keep it out of the log file)
# always copy the metadata-<jobId>.xml to the site work dir
# WARNING: this metadata file might contain info about files that were not successfully moved to the SE
# it will be regenerated by the job recovery for the cases where there are output files in the datadir
try:
tolog('job.workdir is %s pworkdir is %s ' % (job.workdir, self.__pworkdir))
copy2("%s/metadata-%s.xml" % (job.workdir, job.jobId), "%s/metadata-%s.xml" % (self.__pworkdir, job.jobId))
except Exception, e:
tolog("Warning: Could not copy metadata-%s.xml to site work dir - ddm Adder problems will occure in case of job recovery" % (job.jobId))
tolog('job.workdir is %s pworkdir is %s ' % (job.workdir, self.__pworkdir))
# a 'holding' job with a zero trf exit code keeps its output: it is moved to job.datadir
if job.result[0] == 'holding' and job.result[1] == 0:
try:
# create the data directory
os.makedirs(job.datadir)
except OSError, e:
tolog("!!WARNING!!3000!! Could not create data directory: %s, %s" % (job.datadir, str(e)))
else:
# find all remaining files in case 'rf' is not empty
remaining_files = []
moved_files_list = []
try:
if rf != None:
moved_files_list = RunJobUtilities.getFileNamesFromString(rf[1])
remaining_files = RunJobUtilities.getRemainingFiles(moved_files_list, job.outFiles)
except Exception, e:
tolog("!!WARNING!!3000!! Illegal return value from Mover: %s, %s" % (str(rf), str(e)))
remaining_files = job.outFiles
# move all remaining output files to the data directory
# NOTE(review): os.system() reports a failing command through its return value and does not
# raise OSError for it, so a failed 'mv' is still counted in nr_moved; the file names are
# also passed to the shell unquoted - confirm whether this is acceptable
nr_moved = 0
for _file in remaining_files:
try:
os.system("mv %s %s" % (_file, job.datadir))
except OSError, e:
tolog("!!WARNING!!3000!! Failed to move file %s (abort all)" % (_file))
break
else:
nr_moved += 1
tolog("Moved %d/%d output file(s) to: %s" % (nr_moved, len(remaining_files), job.datadir))
# remove all successfully copied files from the local directory
# NOTE(review): same remark as for 'mv' above - a failing 'rm' is not detected here
nr_removed = 0
for _file in moved_files_list:
try:
os.system("rm %s" % (_file))
except OSError, e:
tolog("!!WARNING!!3000!! Failed to remove output file: %s, %s" % (_file, e))
else:
nr_removed += 1
tolog("Removed %d output file(s) from local dir" % (nr_removed))
# copy the PoolFileCatalog.xml for non build jobs
if not pUtil.isBuildJob(remaining_files):
_fname = os.path.join(job.workdir, "PoolFileCatalog.xml")
tolog("Copying %s to %s" % (_fname, job.datadir))
try:
copy2(_fname, job.datadir)
except Exception, e:
tolog("!!WARNING!!3000!! Could not copy PoolFileCatalog.xml to data dir - expect ddm Adder problems during job recovery")
# remove all remaining output files from the work directory
# (a successfully copied file should already have been removed by the Mover)
# 'rem' records whether at least one lingering output file was deleted
rem = False
for inf in job.outFiles:
if inf and inf != 'NULL' and os.path.isfile("%s/%s" % (job.workdir, inf)): # non-empty string and not NULL
try:
os.remove("%s/%s" % (job.workdir, inf))
except Exception,e:
tolog("!!WARNING!!3000!! Ignore this Exception when deleting file %s: %s" % (inf, str(e)))
pass
else:
tolog("Lingering output file removed: %s" % (inf))
rem = True
if not rem:
tolog("All output files already removed from local dir")
tolog("Payload cleanup has finished")
def sysExit(self, job, rf=None):
    """
    Wrapper around os._exit: run the cleanup, close stderr and terminate the process.

    job: job object; job.result[2] is used as the process exit code
    rf: return value from Mover::put, passed on unchanged to cleanup()
    """
    self.cleanup(job, rf=rf)
    sys.stderr.close()
    tolog("RunJob (payload wrapper) has finished")

    # pilotExitCode, don't confuse this with the overall pilot exit code,
    # which doesn't get reported back to panda server anyway
    exit_code = job.result[2]
    # change to sys.exit?
    os._exit(exit_code)
def failJob(self, transExitCode, pilotExitCode, job, ins=None, pilotErrorDiag=None, docleanup=True):
    """
    Set the job state to failed, report it to the local pilot TCP server and (optionally) exit.

    transExitCode, pilotExitCode: exit codes stored in the job state
    job: job object
    ins: optional list of input files to remove from job.workdir
    pilotErrorDiag: optional error diagnostics stored in job.pilotErrorDiag
    docleanup: if True, cleanup() is run first and sysExit() is called at the end
    """
    if docleanup:
        self.cleanup(job, rf=None)

    if job.eventServiceMerge:
        corrupted = self.corruptedFiles
        if not corrupted:
            pilotExitCode = PilotErrors.ERR_ESRECOVERABLE
        else:
            # report the LFNs of the files that failed stage-in
            lfns = [entry['lfn'] for entry in corrupted]
            job.corruptedFiles = ','.join(lfns)
            job.result[2] = corrupted[0]['status_code']

    final_state = ["failed", transExitCode, pilotExitCode]
    job.setState(final_state)
    if pilotErrorDiag:
        job.pilotErrorDiag = pilotErrorDiag

    tolog("Will now update local pilot TCP server")
    RunJobUtilities.updatePilotServer(job, self.__pilotserver, self.__pilotport, final=True)

    if ins:
        pUtil.removeFiles(job.workdir, ins)

    if docleanup:
        self.sysExit(job)
def isMultiTrf(self, parameterList):
    """ Will we execute multiple jobs? (True if the parameter list has more than one entry) """
    return len(parameterList) > 1
def setup(self, job, jobSite, thisExperiment):
"""
Prepare the setup and get the run command list.

job.jobPars, job.homePackage and job.trf are split on newlines (one entry per
transformation) and thisExperiment.getJobExecutionCommand() is asked for the run
command of each entry.

job: job object (updated in place, e.g. pilotErrorDiag, spsetup, JEM, cmtconfig, timeSetup)
jobSite: site object; the current directory is changed to jobSite.workdir
thisExperiment: experiment object providing the release handling and the run commands

Returns the tuple (ec, runCommandList, job, multi_trf), where ec is the error code
from the multi-trf verification or from the last getJobExecutionCommand() call.
"""
# start setup time counter
t0 = time.time()
ec = 0
runCommandList = []
# split up the job parameters to be able to loop over the tasks
jobParameterList = job.jobPars.split("\n")
jobHomePackageList = job.homePackage.split("\n")
jobTrfList = job.trf.split("\n")
job.release = thisExperiment.formatReleaseString(job.release)
releaseList = thisExperiment.getRelease(job.release)
tolog("Number of transformations to process: %s" % len(jobParameterList))
multi_trf = self.isMultiTrf(jobParameterList)
# verify that the multi-trf job is setup properly
ec, job.pilotErrorDiag, releaseList = RunJobUtilities.verifyMultiTrf(jobParameterList, jobHomePackageList, jobTrfList, releaseList)
if ec > 0:
return ec, runCommandList, job, multi_trf
os.chdir(jobSite.workdir)
tolog("Current job workdir is %s" % os.getcwd())
# setup the trf(s)
# _i counts the processed transformations; the original stdout/stderr names are kept
# so that they can be restored after the per-transformation renaming below
_i = 0
_stdout = job.stdout
_stderr = job.stderr
_first = True
# NOTE(review): map(None, ...) is the Python 2 idiom for zipping lists of unequal length
# (shorter lists are padded with None); it is not available in Python 3
for (_jobPars, _homepackage, _trf, _swRelease) in map(None, jobParameterList, jobHomePackageList, jobTrfList, releaseList):
tolog("Preparing setup %d/%d" % (_i + 1, len(jobParameterList)))
# reset variables
job.jobPars = _jobPars
job.homePackage = _homepackage
job.trf = _trf
job.release = _swRelease
# for multi-trf jobs each transformation gets its own numbered stdout/stderr file
if multi_trf:
job.stdout = _stdout.replace(".txt", "_%d.txt" % (_i + 1))
job.stderr = _stderr.replace(".txt", "_%d.txt" % (_i + 1))
# post process copysetup variable in case of directIn/useFileStager
_copysetup = readpar('copysetup')
_copysetupin = readpar('copysetupin')
if "--directIn" in job.jobPars or "--useFileStager" in job.jobPars or _copysetup.count('^') == 5 or _copysetupin.count('^') == 5:
# only need to update the queuedata file once
if _first:
RunJobUtilities.updateCopysetups(job.jobPars)
_first = False
# setup the trf
ec, job.pilotErrorDiag, cmd, job.spsetup, job.JEM, job.cmtconfig = thisExperiment.getJobExecutionCommand(job, jobSite, self.__pilot_initdir)
if ec > 0:
# setup failed
break
# add the setup command to the command list
runCommandList.append(cmd)
_i += 1
# restore the original stdout/stderr file names
job.stdout = _stdout
job.stderr = _stderr
job.timeSetup = int(time.time() - t0)
tolog("Total setup time: %d s" % (job.timeSetup))
return ec, runCommandList, job, multi_trf
def stageIn_new(self,
                job,
                jobSite,
                analysisJob=None,  # not used: job.isAnalysisJob() should be used instead
                files=None,
                pfc_name="PoolFileCatalog.xml"):
    """
    Perform the stage-in of the input files (new site movers based implementation workflow).

    The transfer itself is done by mover.get_data_new(); input files that end up with
    status 'error' are recorded in self.corruptedFiles.

    Returns the tuple (job, list of input LFNs, statusPFCTurl, usedFAXandDirectIO);
    statusPFCTurl is always None in this implementation.
    """

    tolog("Preparing for get command [stageIn_new]")

    infiles = [fspec.lfn for fspec in job.inData]
    tolog("Input file(s): (%s in total)" % len(infiles))
    position = 0
    for lfn in infiles:
        position += 1
        tolog("%s. %s" % (position, lfn))

    if not infiles:
        tolog("No input files for this job .. skip stage-in")
        return job, infiles, None, False

    times_before = os.times()
    job.result[2], job.pilotErrorDiag, _dummy, FAX_dictionary = mover.get_data_new(job, jobSite, stageinTries=self.__stageinretry, proxycheck=False, workDir=self.__pworkdir, pfc_name=pfc_name, files=files)
    times_after = os.times()

    # record failed stagein files
    failed_files = [{'lfn': fspec.lfn, 'status': fspec.status, 'status_code': fspec.status_code, 'status_message': fspec.status_message}
                    for fspec in job.inData if fspec.status == 'error']
    self.corruptedFiles.extend(failed_files)

    # element 4 of os.times() is the elapsed real time
    job.timeStageIn = int(round(times_after[4] - times_before[4]))

    usedFAXandDirectIO = FAX_dictionary.get('usedFAXandDirectIO', False)
    return job, infiles, None, usedFAXandDirectIO
@mover.use_newmover(stageIn_new)
def stageIn(self, job, jobSite, analysisJob, pfc_name="PoolFileCatalog.xml", prefetcher=False):
    """
    Perform the stage-in of the input files with mover.get_data().

    Nothing is transferred when there are no valid input files or when the prefetcher
    is used. On a transfer error the error code is stored in job.result[2].

    Returns the tuple (job, list of prepared input files, statusPFCTurl, usedFAXandDirectIO).
    """
    # NOTE(review): the decorator presumably swaps in stageIn_new() when the new site movers
    # are enabled - confirm in Mover.use_newmover

    statusPFCTurl = None
    usedFAXandDirectIO = False

    # Prepare the input files (remove non-valid names) if there are any
    ins, job.filesizeIn, job.checksumIn = RunJobUtilities.prepareInFiles(job.inFiles, job.filesizeIn, job.checksumIn)

    if prefetcher:
        tolog("No need to stage in files since prefetcher will be used")
    elif ins:
        tolog("Preparing for get command")

        # Get the file access info (only useCT is needed here)
        site_info = getSiteInformation(self.getExperiment())
        useCT, _old_prefix, _new_prefix = site_info.getFileAccessInfo(job.transferType)

        # Transfer input files
        times_before = os.times()
        ec, job.pilotErrorDiag, statusPFCTurl, FAX_dictionary = mover.get_data(
            job, jobSite, ins, self.__stageinretry, analysisJob=analysisJob, usect=useCT,
            pinitdir=self.__pilot_initdir, proxycheck=False, inputDir=self.__inputDir,
            workDir=self.__pworkdir, pfc_name=pfc_name)
        if ec != 0:
            job.result[2] = ec
        times_after = os.times()

        # element 4 of os.times() is the elapsed real time
        job.timeStageIn = int(round(times_after[4] - times_before[4]))

        # Extract any FAX info from the dictionary
        fax_get = FAX_dictionary.get
        job.filesWithoutFAX = fax_get('N_filesWithoutFAX', 0)
        job.filesWithFAX = fax_get('N_filesWithFAX', 0)
        job.bytesWithoutFAX = fax_get('bytesWithoutFAX', 0)
        job.bytesWithFAX = fax_get('bytesWithFAX', 0)
        usedFAXandDirectIO = fax_get('usedFAXandDirectIO', False)

    return job, ins, statusPFCTurl, usedFAXandDirectIO
def getTrfExitInfo(self, exitCode, workdir):
    """
    Get the trf exit code and info from the job report if possible.

    exitCode: the exit code the transformation returned to the pilot
    workdir: directory expected to contain the job report (jobReport.json, or
    jobReportExtract.<extension> for any other extension)

    If the report exists it is backed up to the parent directory of workdir and the
    exitCode, exitAcronym and exitMsg entries are read from it. A report that lists
    exit code 0 never overrides a non-zero original exit code.

    Returns the tuple (exitCode, exitAcronym, exitMsg); the last two are empty strings
    when they could not be extracted.
    """

    exitAcronym = ""
    exitMsg = ""

    # does the job report exist?
    extension = getExtension(alternative='pickle')
    is_json = extension.lower() == "json"
    if is_json:
        _filename = "jobReport.%s" % (extension)
    else:
        _filename = "jobReportExtract.%s" % (extension)
    filename = os.path.join(workdir, _filename)

    if not os.path.exists(filename):
        tolog("Job report not found: %s" % (filename))
        return exitCode, exitAcronym, exitMsg

    tolog("Found job report: %s" % (filename))

    # wait a few seconds to make sure the job report is finished
    tolog("Taking a 5s nap to make sure the job report is finished")
    time.sleep(5)

    # first backup the jobReport to the job workdir since it will be needed later
    # (the current location will disappear since it will be tarred up in the jobs' log file)
    d = os.path.join(workdir, '..')
    try:
        copy2(filename, os.path.join(d, _filename))
    except Exception as e:
        tolog("Warning: Could not backup %s to %s: %s" % (_filename, d, e))
    else:
        tolog("Backed up %s to %s" % (_filename, d))

    # search for the exit code
    try:
        f = open(filename, "r")
    except Exception as e:
        tolog("!!WARNING!!1112!! Failed to open job report: %s" % (e))
        return exitCode, exitAcronym, exitMsg

    if is_json:
        from json import load
    else:
        # NOTE(review): unpickling executes code embedded in the file; the report is
        # written by the payload, so confirm that it can be trusted
        from pickle import load

    # the context manager closes the report also when load() raises
    with f:
        data = load(f)

    # extract the exit code and info
    _exitCode = self.extractDictionaryObject("exitCode", data)
    if _exitCode == 0 and exitCode != 0:
        # this check used to sit under 'if _exitCode:' and was therefore never reached
        tolog("!!WARNING!!1111!! Detected inconsistency in %s: exitcode listed as 0 but original trf exit code was %d (using original error code)" %\
              (filename, exitCode))
    elif _exitCode:
        exitCode = _exitCode

    _exitAcronym = self.extractDictionaryObject("exitAcronym", data)
    if _exitAcronym:
        exitAcronym = _exitAcronym

    _exitMsg = self.extractDictionaryObject("exitMsg", data)
    if _exitMsg:
        exitMsg = _exitMsg

    tolog("Trf exited with:")
    tolog("...exitCode=%d" % (exitCode))
    tolog("...exitAcronym=%s" % (exitAcronym))
    tolog("...exitMsg=%s" % (exitMsg))

    return exitCode, exitAcronym, exitMsg
def extractDictionaryObject(self, obj, dictionary):
    """ Look up the key 'obj' in the dictionary; return its value, or None if the lookup fails (the outcome is logged) """
    try:
        value = dictionary[obj]
    except Exception:
        tolog("Object %s not found in dictionary" % (obj))
        return None

    tolog('Extracted \"%s\"=%s from dictionary' % (obj, value))
    return value
def getUtilitySubprocess(self, thisExperiment, cmd, pid, job):
    """
    Return/execute the utility subprocess if required.

    The utility command is requested from thisExperiment.getUtilityCommand() and started
    with self.getSubprocess(). Returns the subprocess object, or None when the experiment
    does not require a utility, the command is empty, or an exception was caught.
    """

    if not thisExperiment.shouldExecuteUtility():
        tolog("Not required to run utility")
        return None

    utility_subprocess = None
    try:
        mem_cmd = thisExperiment.getUtilityCommand(job_command=cmd, pid=pid, release=job.release,
                                                   homePackage=job.homePackage, cmtconfig=job.cmtconfig,
                                                   trf=job.trf, workdir=job.workdir)
        if mem_cmd != "":
            utility_subprocess = self.getSubprocess(thisExperiment, mem_cmd)
            if utility_subprocess:
                try:
                    tolog("Process id of utility: %d" % (utility_subprocess.pid))
                except Exception as e:
                    tolog("!!WARNING!!3436!! Exception caught: %s" % (e))
        else:
            tolog("Could not launch utility since the command path does not exist")
    except Exception as e:
        tolog("!!WARNING!!5454!! Exception caught: %s" % (e))
        utility_subprocess = None

    return utility_subprocess
def getBenchmarkSubprocess(self, node, coreCount, workdir, sitename):
    """
    Return/execute the benchmark subprocess if required.

    Whether the benchmark should run, and with which command, is decided by the site
    information object of the current experiment. 'node' and 'sitename' are not used here.
    Returns the subprocess object from self.getSubprocess(), or None if no benchmark is required.
    """
    # Output json: /tmp/cern-benchmark_$USER/bmk_tmp/result_profile.json

    # run benchmark test if required by experiment site information object
    site_info = getSiteInformation(self.getExperiment())
    if not site_info.shouldExecuteBenchmark():
        tolog("Not required to run the benchmark suite")
        return None

    # note: the bare getExperiment() is the module level function, not the getter of this class
    thisExperiment = getExperiment(self.getExperiment())
    benchmark_cmd = site_info.getBenchmarkCommand(cloud=readpar('cloud'), cores=coreCount, workdir=workdir)
    benchmark_subprocess = self.getSubprocess(thisExperiment, benchmark_cmd)
    if benchmark_subprocess:
        try:
            tolog("Process id of benchmark suite: %d" % (benchmark_subprocess.pid))
        except Exception as e:
            tolog("!!WARNING!!3436!! Exception caught: %s" % (e))

    return benchmark_subprocess
def isDirectAccess(self, analysisJob, transferType=None):
    """
    Determine if direct access should be used.

    Starts from the directIn value returned by getDirectAccess(); for a non-analysis job
    with a transferType other than "direct", direct access is switched off.
    """
    direct_in, _direct_in_type = getDirectAccess()

    switch_off = (not analysisJob) and transferType and transferType != "direct"
    if switch_off:
        return False
    return direct_in
def replaceLFNsWithTURLs(self, cmd, fname, inFiles, workdir, writetofile=""):
    """
    Swap the LFNs for their full TURLs.
    Needed for direct access, where Athena must be given a full TURL rather than an LFN.

    Arguments:
      cmd: run command in which the LFNs are substituted
      fname: path to the XML file the LFN -> TURL information is read from
      inFiles: list of input LFNs
      workdir: directory in which the writeToFile input file lists are looked up
      writetofile: writeToFile string, handed to getWriteToInputFilenames()

    Returns the (possibly updated) run command. The input file lists found via
    writetofile are rewritten in place.
    """

    tolog("inside replaceLFNsWithTURLs()")

    # Nothing can be looked up without the XML file
    if not os.path.exists(fname):
        tolog("!!WARNING!!4545!! Could not find file: %s (cannot locate TURLs for direct access)" % fname)
        return cmd

    lfn_to_turl = {}  # { LFN: TURL, ..}
    catalog = mover.getFileInfoDictionaryFromXML(fname)
    tolog("file_info_dictionary=%s" % catalog)

    for lfn in inFiles:
        if lfn not in catalog:
            tolog("!!WARNING!!3434!! inputFile=%s not in dictionary=%s" % (lfn, catalog))
            continue

        turl = catalog[lfn][0]
        lfn_to_turl[lfn] = turl

        # only root TURLs go into the run command, and only if not already there
        if lfn in cmd and turl.startswith('root://') and turl not in cmd:
            cmd = cmd.replace(lfn, turl)
            tolog("Replaced '%s' with '%s' in the run command" % (lfn, turl))

    tolog("writetofile=%s" % writetofile)
    tolog("turl_dictionary=%s" % lfn_to_turl)

    # the writeToFile input file lists only need attention if there is something to substitute
    if not (writetofile and lfn_to_turl):
        return cmd

    list_names = getWriteToInputFilenames(writetofile)
    tolog("filenames=%s" % list_names)

    for list_name in list_names:
        path = os.path.join(workdir, list_name)
        if not os.path.exists(path):
            tolog("!!WARNING!!4546!! File does not exist: %s" % path)
            continue

        content = readFile(path)
        tolog("readFile=%s" % content)

        # known LFNs become TURLs, other non-empty lines are kept, empty lines are dropped
        rewritten = []
        for line in content.split('\n'):
            base = os.path.basename(line)
            if base in lfn_to_turl:
                rewritten.append(lfn_to_turl[base])
            elif line:
                rewritten.append(line)

        new_content = '\n'.join(rewritten)
        if new_content:
            writeFile(path, new_content)
            tolog("lines=%s" % new_content)

    return cmd
def executePayload(self, thisExperiment, runCommandList, job):
""" execute the payload """
# do not hide the proxy for PandaMover (which needs it) or for sites that have sc.proxy = donothide
# if 'DDM' not in jobSite.sitename and readpar('proxy') != 'donothide':
# # create the proxy guard object (must be created here before the sig2exc())
# proxyguard = ProxyGuard()
#
# # hide the proxy
# hP_ret = proxyguard.hideProxy()
# if not hP_ret:
# tolog("Warning: Proxy exposed to payload")
# If clone job, make sure that the events should be processed
if job.cloneJob == "runonce":
try:
# If the event is still available, then go ahead and run the payload
message = downloadEventRanges(job.jobId, job.jobsetID, job.taskID, url=self.__pandaserver)
# Create a list of event ranges from the downloaded message
event_ranges = self.extractEventRanges(message)
# Are there any event ranges?
if event_ranges == []:
tolog("!!WARNING!!2424!! This clone job was already executed")
exitMsg = "Already executed clone job"
res_tuple = (1, exitMsg)
res = (res_tuple[0], res_tuple[1], exitMsg)
job.result[0] = exitMsg
job.result[1] = 0 # transExitCode
job.result[2] = self.__error.ERR_EXECUTEDCLONEJOB # Pilot error code
return res, job, False, 0
else:
tolog("Ok to execute clone job")
except Exception, e:
tolog("!1WARNING!!2323!! Exception caught: %s" % (e))
# Run the payload process, which could take days to finish
t0 = os.times()
path = os.path.join(job.workdir, 't0_times.txt')
if writeFile(path, str(t0)):
tolog("Wrote %s to file %s" % (str(t0), path))
else:
tolog("!!WARNING!!3344!! Failed to write t0 to file, will not be able to calculate CPU consumption time on the fly")
res_tuple = (0, 'Undefined')
multi_trf = self.isMultiTrf(runCommandList)
_stdout = job.stdout
_stderr = job.stderr
# Loop over all run commands (only >1 for multi-trfs)
current_job_number = 0
getstatusoutput_was_interrupted = False
number_of_jobs = len(runCommandList)
for cmd in runCommandList:
current_job_number += 1
# Create the stdout/err files
if multi_trf:
job.stdout = _stdout.replace(".txt", "_%d.txt" % (current_job_number))
job.stderr = _stderr.replace(".txt", "_%d.txt" % (current_job_number))
file_stdout, file_stderr = self.getStdoutStderrFileObjects(stdoutName=job.stdout, stderrName=job.stderr)
if not (file_stdout and file_stderr):
res_tuple = (1, "Could not open stdout/stderr files, piping not possible")
tolog("!!WARNING!!2222!! %s" % (res_tuple[1]))
break
try:
# Add the full job command to the job_setup.sh file
to_script = cmd.replace(";", ";\n")
thisExperiment.updateJobSetupScript(job.workdir, to_script=to_script)
# For direct access in prod jobs, we need to substitute the input file names with the corresponding TURLs
try:
analysisJob = job.isAnalysisJob()
directIn = self.isDirectAccess(analysisJob, transferType=job.transferType)
tolog("analysisJob=%s" % analysisJob)
tolog("directIn=%s" % directIn)
if not analysisJob and directIn:
# replace the LFNs with TURLs in the job command
# (and update the writeToFile input file list if it exists)
_fname = os.path.join(job.workdir, "PoolFileCatalog.xml")
cmd = self.replaceLFNsWithTURLs(cmd, _fname, job.inFiles, job.workdir, writetofile=job.writetofile)
except Exception, e:
tolog("Caught exception: %s" % e)
tolog("Executing job command %d/%d" % (current_job_number, number_of_jobs))
# Hack to replace Archive_tf
# if job.trf == 'Archive_tf.py' or job.trf == 'Dummy_tf.py':