forked from dmwm/WMCore
-
Notifications
You must be signed in to change notification settings - Fork 3
/
setup_test.py
719 lines (623 loc) · 27.1 KB
/
setup_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
from distutils.core import Command
from unittest import TextTestRunner, TestLoader, TestSuite
from setup_build import get_path_to_wmcore_root
from glob import glob
from os.path import splitext, basename, join as pjoin, walk
from ConfigParser import ConfigParser, NoOptionError
import os, sys, os.path
import atexit, signal
import unittest
import time
import pickle
import threading
import hashlib
# pylint and coverage aren't standard, but aren't strictly necessary
# you should get them though
can_lint = False
can_coverage = False
can_nose = False
try:
from pylint.lint import Run
from pylint.lint import preprocess_options, cb_init_hook
from pylint import checkers
can_lint = True
except:
pass
try:
import coverage
can_coverage = True
except:
pass
can_coverage = False
try:
import nose
from nose.plugins import Plugin, PluginTester
from nose.plugins.attrib import AttributeSelector
import nose.failure
import nose.case
can_nose = True
except:
pass
def generate_filelist(basepath=None, recurse=True, ignore=False):
"""
Recursively get a list of files to test/lint
"""
if basepath:
walkpath = os.path.join(get_path_to_wmcore_root(), 'src/python', basepath)
else:
walkpath = os.path.join(get_path_to_wmcore_root(), 'src/python')
files = []
if walkpath.endswith('.py'):
if ignore and walkpath.endswith(ignore):
files.append(walkpath)
else:
for dirpath, dirnames, filenames in os.walk(walkpath):
# skipping CVS directories and their contents
pathelements = dirpath.split('/')
result = []
if not 'CVS' in pathelements:
# to build up a list of file names which contain tests
for file in filenames:
if file.endswith('.py'):
filepath = '/'.join([dirpath, file])
files.append(filepath)
if len(files) == 0 and recurse:
files = generate_filelist(basepath + '.py', not recurse)
return files
if can_nose:
def get_subpackages(dir, prefix = ""):
if prefix.startswith('.'):
prefix = prefix[1:]
result=[]
for subdir in os.listdir(dir):
if os.path.isdir(os.path.join(dir, subdir)):
result.extend( get_subpackages(os.path.join(dir,subdir), prefix + "." + subdir) )
else:
# should be a file
if subdir.endswith('.py'):
if subdir.startswith('__init__.py'):
continue
result.append( prefix + "."+ subdir[:-3])
return result
def trapExit( code ):
"""
Cherrypy likes to call os._exit() which causes the interpreter to
bomb without a chance of catching it. This sucks. This function
will replace os._exit() and throw an exception instead
"""
if hasattr( threading.local(), "isMain" ) and threading.local().isMain:
# The main thread should raise an exception
sys.stderr.write("*******EXIT WAS TRAPPED**********\n")
raise RuntimeError, "os._exit() was called in the main thread"
else:
# os._exit on child threads should just blow away the thread
raise SystemExit, "os._exit() was called in a child thread. " +\
"Protecting the interpreter and trapping it"
class DetailedOutputter(Plugin):
name = "detailed"
def __init__(self):
pass
def setOutputStream(self, stream):
"""Get handle on output stream so the plugin can print id #s
"""
self.stream = stream
def beforeTest(self, test):
if ( test.id().startswith('nose.failure') ):
pass
#Plugin.beforeTest(self, test)
else:
self.stream.write('%s -- ' % (test.id(), ))
def configure(self, options, conf):
"""
Configure plugin. Skip plugin is enabled by default.
"""
self.enabled = True
class TestCommand(Command):
"""Runs our test suite"""
# WARNING WARNING WARNING
# if you set this to true, I will really delete your database
# after every test
# WARNING WARNING WARNING
user_options = [('reallyDeleteMyDatabaseAfterEveryTest=',
None,
'If you set this I WILL DELETE YOUR DATABASE AFTER EVERY TEST. DO NOT RUN ON A PRODUCTION SYSTEM'),
('buildBotMode=',
None,
'Are we running inside buildbot?'),
('workerNodeTestsOnly=',
None,
"Are we testing WN functionality? (Only runs WN-specific tests)"),
('testCertainPath=',
None,
"Only runs tests below a certain path (i.e. test/python searches the whole test tree"),
('quickTestMode=',
None,
"Fails on the first error, doesn't compute coverage"),
('testTotalSlices=',
None,
"Number of ways to split up the test suite"),
('testCurrentSlice=',
None,
"Which slice to run (zero-based index)"),
('testingRoot=',
None,
"Primarily used by buildbot. Gives the path to the root of the test tree (i.e. the directory with WMCore_t)"),
('testMinimumIndex=',
None,
"The minimum ID to be executed (advanced use)"),
('testMaximumIndex=',
None,
"The maximum ID to be executed (advanced use)")
]
def initialize_options(self):
self.reallyDeleteMyDatabaseAfterEveryTest = False
self.buildBotMode = False
self.workerNodeTestsOnly = False
self.testCertainPath = False
self.quickTestMode = False
self.testingRoot = "test/python"
self.testTotalSlices = 1
self.testCurrentSlice = 0
self.testMinimumIndex = 0
self.testMaximumIndex = 9999999
pass
def finalize_options(self):
pass
def callNose( self, args, paths ):
# let people specify more than one path
pathList = paths.split(':')
# sometimes this doesn't get removed
if os.path.exists('.noseids'):
os.unlink('.noseids')
# run once to get splits
collectOnlyArgs = args[:]
collectOnlyArgs.extend([ '-q', '--collect-only', '--with-id' ])
retval = nose.run(argv=collectOnlyArgs, addplugins=[DetailedOutputter()])
if not retval:
print "Failed to collect TestCase IDs"
return retval
idhandle = open( ".noseids", "r" )
testIds = pickle.load(idhandle)['ids']
idhandle.close()
if os.path.exists("nosetests.xml"):
os.unlink("nosetests.xml")
print "path lists is %s" % pathList
# divide it up
totalCases = len(testIds)
myIds = []
for id in testIds.keys():
if int(id) >= int(self.testMinimumIndex) and int(id) <= int(self.testMaximumIndex):
# generate a stable ID for sorting
if len(testIds[id]) == 3:
testName = "%s%s" % (testIds[id][1], testIds[id][2])
testHash = hashlib.md5( testName ).hexdigest()
hashSnip = testHash[:7]
hashInt = int( hashSnip, 16 )
else:
hashInt = id
if ( hashInt % int(self.testTotalSlices) ) == int(self.testCurrentSlice):
for path in pathList:
if path in testIds[id][0]:
myIds.append( str(id) )
break
myIds = sorted( myIds )
print "Out of %s cases, we will run %s" % (totalCases, len(myIds))
if not myIds:
return True
args.extend(['-v', '--with-id'])
args.extend(myIds)
return nose.run( argv=args )
def run(self):
# trap os._exit
os.DMWM_REAL_EXIT = os._exit
os._exit = trapExit
threading.local().isMain = True
testPath = 'test/python'
if self.testCertainPath:
print "Using the tests below: %s" % self.testCertainPath
testPath = self.testCertainPath
else:
print "Nose is scanning all tests"
if self.quickTestMode:
quickTestArg = ['--stop']
else:
quickTestArg = []
if self.reallyDeleteMyDatabaseAfterEveryTest:
print "#### WE ARE DELETING YOUR DATABASE. 3 SECONDS TO CANCEL ####"
print "#### buildbotmode is %s" % self.buildBotMode
sys.stdout.flush()
import WMQuality.TestInit
WMQuality.TestInit.deleteDatabaseAfterEveryTest( "I'm Serious" )
time.sleep(4)
if self.workerNodeTestsOnly:
args = [__file__,'--with-xunit', '-m', '(_t.py$)|(_t$)|(^test)','-a','workerNodeTest',self.testingRoot]
args.extend( quickTestArg )
retval = self.callNose(args, paths = testPath)
elif not self.buildBotMode:
args = [__file__,'--with-xunit', '-m', '(_t.py$)|(_t$)|(^test)', '-a', '!workerNodeTest',self.testingRoot]
args.extend( quickTestArg )
retval = self.callNose(args, paths = testPath)
else:
print "### We are in buildbot mode ###"
srcRoot = os.path.join(os.path.normpath(os.path.dirname(__file__)), 'src', 'python')
modulesToCover = []
modulesToCover.extend(get_subpackages(os.path.join(srcRoot,'WMCore'), 'WMCore'))
modulesToCover.extend(get_subpackages(os.path.join(srcRoot,'WMComponent'), 'WMComponent'))
modulesToCover.extend(get_subpackages(os.path.join(srcRoot,'WMQuality'), 'WMQuality'))
moduleList = ",".join(modulesToCover)
sys.stdout.flush()
if not quickTestArg:
retval = self.callNose([__file__,'--with-xunit', '-m', '(_t.py$)|(_t$)|(^test)','-a',
'!workerNodeTest,!integration,!performance,!lifecycle,!__integration__,!__performance__,!__lifecycle__',
'--with-coverage','--cover-html','--cover-html-dir=coverageHtml','--cover-erase',
'--cover-package=' + moduleList, '--cover-inclusive',
self.testingRoot],
paths = testPath)
else:
retval = self.callNose([__file__,'--with-xunit', '-m', '(_t.py$)|(_t$)|(^test)','-a',
'!workerNodeTest,!integration,!performance,!lifecycle,!__integration__,!__performance__,!__lifecycle__',
'--stop', self.testingRoot],
paths = testPath)
threadCount = len(threading.enumerate())
# Set the signal handler and a 20-second alarm
def signal_handler( foo, bar ):
sys.stderr.write("Timeout reached trying to shut down. Force killing...\n")
sys.stderr.flush()
if retval:
os.DMWM_REAL_EXIT( 0 )
else:
os.DMWM_REAL_EXIT( 1 )
signal.signal(signal.SIGALRM, signal_handler )
signal.alarm(20)
marker = open("nose-marker.txt", "w")
marker.write("Ready to be slayed\n")
marker.flush()
marker.close()
if threadCount > 1:
import cherrypy
sys.stderr.write("There are %s threads running. Cherrypy may be acting up.\n" % len(threading.enumerate()))
sys.stderr.write("The threads are: \n%s\n" % threading.enumerate())
atexit.register(cherrypy.engine.stop)
cherrypy.engine.exit()
sys.stderr.write("Asked cherrypy politely to commit suicide\n")
sys.stderr.write("Now there are %s threads running\n" % len(threading.enumerate()))
sys.stderr.write("The threads are: \n%s\n" % threading.enumerate())
threadCount = len(threading.enumerate())
print "Testing complete, there are now %s threads" % len(threading.enumerate())
# try to exit
if retval:
sys.exit( 0 )
else:
sys.exit( 1 )
# if we got here, then sys.exit got cancelled by the alarm...
sys.stderr.write("Failed to exit after 30 secs...something hung\n")
sys.stderr.write("Forcing process to die")
os.DMWM_REAL_EXIT()
else:
class TestCommand(Command):
user_options = [ ]
def run(self):
print "Nose isn't installed. You must install the nose package to run tests (easy_install nose might do it)"
sys.exit(1)
pass
def initialize_options(self):
pass
def finalize_options(self):
pass
pass
if can_lint:
class LinterRun(Run):
def __init__(self, args, reporter=None):
self._rcfile = None
self._plugins = []
preprocess_options(args, {
# option: (callback, takearg)
'rcfile': (self.cb_set_rcfile, True),
'load-plugins': (self.cb_add_plugins, True),
})
self.linter = linter = self.LinterClass((
('rcfile',
{'action' : 'callback', 'callback' : lambda *args: 1,
'type': 'string', 'metavar': '<file>',
'help' : 'Specify a configuration file.'}),
('init-hook',
{'action' : 'callback', 'type' : 'string', 'metavar': '<code>',
'callback' : cb_init_hook,
'help' : 'Python code to execute, usually for sys.path \
manipulation such as pygtk.require().'}),
('help-msg',
{'action' : 'callback', 'type' : 'string', 'metavar': '<msg-id>',
'callback' : self.cb_help_message,
'group': 'Commands',
'help' : '''Display a help message for the given message id and \
exit. The value may be a comma separated list of message ids.'''}),
('list-msgs',
{'action' : 'callback', 'metavar': '<msg-id>',
'callback' : self.cb_list_messages,
'group': 'Commands',
'help' : "Generate pylint's full documentation."}),
('generate-rcfile',
{'action' : 'callback', 'callback' : self.cb_generate_config,
'group': 'Commands',
'help' : '''Generate a sample configuration file according to \
the current configuration. You can put other options before this one to get \
them in the generated configuration.'''}),
('generate-man',
{'action' : 'callback', 'callback' : self.cb_generate_manpage,
'group': 'Commands',
'help' : "Generate pylint's man page.",'hide': 'True'}),
('errors-only',
{'action' : 'callback', 'callback' : self.cb_error_mode,
'short': 'e',
'help' : '''In error mode, checkers without error messages are \
disabled and for others, only the ERROR messages are displayed, and no reports \
are done by default'''}),
('profile',
{'type' : 'yn', 'metavar' : '<y_or_n>',
'default': False,
'help' : 'Profiled execution.'}),
), option_groups=self.option_groups,
reporter=reporter, pylintrc=self._rcfile)
# register standard checkers
checkers.initialize(linter)
# load command line plugins
linter.load_plugin_modules(self._plugins)
# read configuration
linter.disable_message('W0704')
linter.read_config_file()
# is there some additional plugins in the file configuration, in
config_parser = linter._config_parser
if config_parser.has_option('MASTER', 'load-plugins'):
plugins = splitstrip(config_parser.get('MASTER', 'load-plugins'))
linter.load_plugin_modules(plugins)
# now we can load file config and command line, plugins (which can
# provide options) have been registered
linter.load_config_file()
if reporter:
# if a custom reporter is provided as argument, it may be overriden
# by file parameters, so re-set it here, but before command line
# parsing so it's still overrideable by command line option
linter.set_reporter(reporter)
args = linter.load_command_line_configuration(args)
# insert current working directory to the python path to have a correct
# behaviour
sys.path.insert(0, os.getcwd())
if self.linter.config.profile:
print >> sys.stderr, '** profiled run'
from hotshot import Profile, stats
prof = Profile('stones.prof')
prof.runcall(linter.check, args)
prof.close()
data = stats.load('stones.prof')
data.strip_dirs()
data.sort_stats('time', 'calls')
data.print_stats(30)
sys.path.pop(0)
def cb_set_rcfile(self, name, value):
"""callback for option preprocessing (ie before optik parsing)"""
self._rcfile = value
def cb_add_plugins(self, name, value):
"""callback for option preprocessing (ie before optik parsing)"""
self._plugins.extend(splitstrip(value))
def cb_error_mode(self, *args, **kwargs):
"""error mode:
* checkers without error messages are disabled
* for others, only the ERROR messages are displayed
* disable reports
* do not save execution information
"""
self.linter.disable_noerror_checkers()
self.linter.set_option('disable-msg-cat', 'WCRFI')
self.linter.set_option('reports', False)
self.linter.set_option('persistent', False)
def cb_generate_config(self, *args, **kwargs):
"""optik callback for sample config file generation"""
self.linter.generate_config(skipsections=('COMMANDS',))
def cb_generate_manpage(self, *args, **kwargs):
"""optik callback for sample config file generation"""
from pylint import __pkginfo__
self.linter.generate_manpage(__pkginfo__)
def cb_help_message(self, option, opt_name, value, parser):
"""optik callback for printing some help about a particular message"""
self.linter.help_message(splitstrip(value))
def cb_list_messages(self, option, opt_name, value, parser):
"""optik callback for printing available messages"""
self.linter.list_messages()
else:
class LinterRun:
def __init__(self):
pass
def lint_score(stats, evaluation):
return eval(evaluation, {}, stats)
def lint_files(files, reports=False):
"""
lint a (list of) file(s) and return the results as a dictionary containing
filename : result_dict
"""
rcfile=os.path.join(get_path_to_wmcore_root(),'standards/.pylintrc')
arguements = ['--rcfile=%s' % rcfile, '--ignore=DefaultConfig.py']
if not reports:
arguements.append('-rn')
arguements.extend(files)
lntr = LinterRun(arguements)
results = {}
for file in files:
lntr.linter.check(file)
results[file] = {'stats': lntr.linter.stats,
'score': lint_score(lntr.linter.stats,
lntr.linter.config.evaluation)
}
if reports:
print '----------------------------------'
print 'Your code has been rated at %.2f/10' % \
lint_score(lntr.linter.stats, lntr.linter.config.evaluation)
return results, lntr.linter.config.evaluation
class LintCommand(Command):
description = "Lint all files in the src tree"
"""
TODO: better format the test results, get some global result, make output
more buildbot friendly.
"""
user_options = [ ('package=', 'p', 'package to lint, default to None'),
('report', 'r', 'return a detailed lint report, default False')]
def initialize_options(self):
self._dir = get_path_to_wmcore_root()
self.package = None
self.report = False
def finalize_options(self):
if self.report:
self.report = True
def run(self):
'''
Find the code and run lint on it
'''
if can_lint:
srcpypath = os.path.join(self._dir, 'src/python/')
sys.path.append(srcpypath)
files_to_lint = []
if self.package:
if self.package.endswith('.py'):
cnt = self.package.count('.') - 1
files_to_lint = generate_filelist(self.package.replace('.', '/', cnt), 'DeafultConfig.py')
else:
files_to_lint = generate_filelist(self.package.replace('.', '/'), 'DeafultConfig.py')
else:
files_to_lint = generate_filelist(ignore='DeafultConfig.py')
results, evaluation = lint_files(files_to_lint, self.report)
ln = len(results)
scr = 0
print
for k, v in results.items():
print "%s: %.2f/10" % (k.replace('src/python/', ''), v['score'])
scr += v['score']
if ln > 1:
print '--------------------------------------------------------'
print 'Average pylint score for %s is: %.2f/10' % (self.package,
scr/ln)
else:
print 'You need to install pylint before using the lint command'
class ReportCommand(Command):
description = "Generate a simple html report for ease of viewing in buildbot"
"""
To contain:
average lint score
% code coverage
list of classes missing tests
etc.
"""
user_options = [ ]
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
"""
run all the tests needed to generate the report and make an
html table
"""
files = generate_filelist()
error = 0
warning = 0
refactor = 0
convention = 0
statement = 0
srcpypath = '/'.join([get_path_to_wmcore_root(), 'src/python/'])
sys.path.append(srcpypath)
cfg = ConfigParser()
cfg.read('standards/.pylintrc')
# Supress stdout/stderr
sys.stderr = open('/dev/null', 'w')
sys.stdout = open('/dev/null', 'w')
# wrap it in an exception handler, otherwise we can't see why it fails
try:
# lint the code
for stats in lint_files(files):
error += stats['error']
warning += stats['warning']
refactor += stats['refactor']
convention += stats['convention']
statement += stats['statement']
except Exception,e:
# and restore the stdout/stderr
sys.stderr = sys.__stderr__
sys.stdout = sys.__stderr__
raise e
# and restore the stdout/stderr
sys.stderr = sys.__stderr__
sys.stdout = sys.__stderr__
stats = {'error': error,
'warning': warning,
'refactor': refactor,
'convention': convention,
'statement': statement}
lint_score = eval(cfg.get('MASTER', 'evaluation'), {}, stats)
coverage = 0 # TODO: calculate this
testless_classes = [] # TODO: generate this
print "<table>"
print "<tr>"
print "<td colspan=2><h1>WMCore test report</h1></td>"
print "</tr>"
print "<tr>"
print "<td>Average lint score</td>"
print "<td>%.2f</td>" % lint_score
print "</tr>"
print "<tr>"
print "<td>% code coverage</td>"
print "<td>%s</td>" % coverage
print "</tr>"
print "<tr>"
print "<td>Classes missing tests</td>"
print "<td>"
if len(testless_classes) == 0:
print "None"
else:
print "<ul>"
for c in testless_classes:
print "<li>%c</li>" % c
print "</ul>"
print "</td>"
print "</tr>"
print "</table>"
class CoverageCommand(Command):
description = "Run code coverage tests"
"""
To do this, we need to run all the unittests within the coverage
framework to record all the lines(and branches) executed
unfortunately, we have multiple code paths per database schema, so
we need to find a way to merge them.
TODO: modify the test command to have a flag to record code coverage
the file thats used can then be used here, saving us from running
our tests twice
"""
user_options = [ ]
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
"""
Determine the code's test coverage and return that as a float
http://nedbatchelder.com/code/coverage/
"""
if can_coverage:
files = generate_filelist()
dataFile = None
cov = None
# attempt to load previously cached coverage information if it exists
try:
dataFile = open("wmcore-coverage.dat","r")
cov = coverage.coverage(branch = True, data_file='wmcore-coverage.dat')
cov.load()
except:
cov = coverage.coverage(branch = True, )
cov.start()
runUnitTests()
cov.stop()
cov.save()
# we have our coverage information, now let's do something with it
# get a list of modules
cov.report(morfs = files, file=sys.stdout)
return 0
else:
print 'You need the coverage module installed before running the' +\
' coverage command'