Skip to content

Commit

Permalink
Switch validation script to Rucio; rollback py3 changes
Browse files Browse the repository at this point in the history
pylint fixes

do not set Test RSEs to the PNN field

retrieve container level rules too

Remake of dataset file count script to work with Rucio; rollback py3 aliases
  • Loading branch information
amaltaro committed Nov 4, 2020
1 parent b0e11b6 commit 0c18625
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 209 deletions.
209 changes: 92 additions & 117 deletions bin/adhoc-scripts/checkDsetFileCount.py
Original file line number Diff line number Diff line change
@@ -1,139 +1,114 @@
#!/usr/bin/env python
"""
Script meant to be used to check the status of a dataset (or lfn) against DBS
and PhEDEx. It also prints any discrepancy between those 2 data management tools.
Script meant to fetch the number of blocks and files for a given dataset,
using both DBS and Rucio services. It prints any inconsistency between
the two.
"""
from __future__ import print_function, division
from future import standard_library
standard_library.install_aliases()

import http.client
import json
import os
import logging
import sys
import urllib.request, urllib.parse
from urllib.error import HTTPError, URLError
from argparse import ArgumentParser

main_url = "https://cmsweb.cern.ch"
phedex_url = main_url + "/phedex/datasvc/json/prod/"
dbs_url = main_url + "/dbs/prod/global/DBSReader/"


class HTTPSClientAuthHandler(urllib.request.HTTPSHandler):
    """
    HTTPS handler which authenticates the client with an X509 proxy.

    The path found in the X509_USER_PROXY environment variable is used
    both as the private key file and as the certificate file. Both are
    None when that variable is not set.
    """

    def __init__(self):
        urllib.request.HTTPSHandler.__init__(self)
        proxy = os.getenv("X509_USER_PROXY")
        self.key = proxy
        self.cert = proxy

    def https_open(self, req):
        """
        Open the request through a connection created by getConnection.
        """
        return self.do_open(self.getConnection, req)

    def getConnection(self, host, timeout=300):
        """
        Create the HTTPS connection for the given host.

        :param host: host (and optional port) to connect to
        :param timeout: connection timeout, in seconds
        :return: an http.client.HTTPSConnection object
        """
        # The timeout used to be accepted but never forwarded, so the
        # connection silently fell back to the global socket default.
        # NOTE(review): the key_file/cert_file arguments were removed from
        # HTTPSConnection in Python 3.12; an ssl.SSLContext is needed there.
        return http.client.HTTPSConnection(host, key_file=self.key, cert_file=self.cert,
                                           timeout=timeout)


def get_content(url, params=None):
    """
    Fetch the given URL through an X509 authenticated HTTPS opener and
    return the raw response body.

    :param url: full URL to be requested
    :param params: optional request data; when it evaluates to False
        (None, empty) the request is made without any data
    :return: the payload as returned by the response `read()` call

    If the server returns an HTTP error, or if it cannot be reached, a
    message is printed and the script exits with status code 1.
    """
    opener = urllib.request.build_opener(HTTPSClientAuthHandler())
    try:
        # an empty params object must not be sent as the request data
        response = opener.open(url, params or None)
        try:
            output = response.read()
        finally:
            # release the connection even when reading the body fails
            response.close()
    except HTTPError as e:
        print('The server couldn\'t fulfill the request. Error code: ', e.code)
        sys.exit(1)
    except URLError as e:
        print('Failed to reach server. Reason:', e.reason)
        sys.exit(1)
    return output
from collections import Counter

from future.utils import viewkeys, viewvalues

from WMCore.Services.DBS.DBS3Reader import DBS3Reader
from WMCore.Services.Rucio.Rucio import Rucio

RUCIO_ACCT = "wma_prod"
RUCIO_HOST = "http://cms-rucio.cern.ch"
RUCIO_AUTH = "https://cms-rucio-auth.cern.ch"
DBS_URL = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader/"

def phedex_info(dataset):

def loggerSetup(logLevel=logging.INFO):
"""
Query blockreplicas PhEDEx API to retrieve detailed information
for a specific dataset
Return a logger which writes everything to stdout.
"""
api_url = phedex_url + "blockreplicas" + "?" + urllib.parse.urlencode([('dataset', dataset)])
phedex_summary = json.loads(get_content(api_url))
return phedex_summary
logger = logging.getLogger(__name__)
outHandler = logging.StreamHandler(sys.stdout)
outHandler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(module)s: %(message)s"))
outHandler.setLevel(logLevel)
logger.addHandler(outHandler)
logger.setLevel(logLevel)
return logger


def dbs_info(dataset):
def getFromRucio(dataset, logger):
"""
Queries 2 DBS APIs to get both summary and detailed information
Uses the WMCore Rucio object to fetch all the blocks and files
for a given container.
Returns a dictionary key'ed by the block name, value is the amount of files.
"""
dbs_out = {}
api_url = dbs_url + "blocksummaries" + "?" + urllib.parse.urlencode({'dataset': dataset})
dbs_out['blocksummaries'] = json.loads(get_content(api_url))
api_url = dbs_url + "files" + "?" + urllib.parse.urlencode({'dataset': dataset}) + "&detail=1"
dbs_out['files'] = json.loads(get_content(api_url))
return dbs_out
rucio = Rucio(acct=RUCIO_ACCT,
hostUrl=RUCIO_HOST,
authUrl=RUCIO_AUTH,
configDict={'logger': logger, 'phedexCompatible': False})

result = dict()
for block in rucio.getBlocksInContainer(dataset):
data = rucio.getDID(block)
result.setdefault(block, data['length'])
return result


def main(argv=None):
def getFromDBS(dataset, logger):
    """
    Look up all the blocks of a dataset in DBS, through the WMCore
    DBS3Reader object, and count the files of each block by validity.

    :param dataset: name of the dataset to be looked up
    :param logger: logger object handed over to DBS3Reader
    :return: a tuple with two items:
        * a dictionary key'ed by the block name, where the value is a
          Counter with the number of 'valid' and 'invalid' files
        * a Counter with the same two keys for the whole dataset
    """
    reader = DBS3Reader(DBS_URL, logger)

    perBlock = dict()
    totalCounter = Counter({'valid': 0, 'invalid': 0})
    for blockName in reader.listFileBlocks(dataset):
        # presumably validFileOnly=0 makes DBS return the invalid files as
        # well - TODO confirm against the DBS client documentation
        files = reader.dbs.listFileArray(block_name=blockName, validFileOnly=0, detail=True)
        # setdefault keeps the counter of a block that was already seen
        blockCounter = perBlock.setdefault(blockName, Counter({'valid': 0, 'invalid': 0}))
        for fileInfo in files:
            status = 'valid' if fileInfo['is_file_valid'] == 1 else 'invalid'
            blockCounter[status] += 1
            totalCounter[status] += 1
    return perBlock, totalCounter


def main():
"""
Receive either a dataset name or a logical file name
and proxy location. Then it queries the following data
services:
- phedex : gets number of files
- dbs : gets the number of valid, invalid and total files
It returns the number of files for this dataset/lfn available
in PhEDEx and DBS
Expects a dataset name as input argument.
It then queries Rucio and DBS and compares their blocks and
number of files.
"""
usage = "usage: %prog -d dataset_name"
parser = ArgumentParser(usage=usage)
parser.add_argument('-d', '--dataset', help='Dataset name', dest='dataset')
parser.add_argument('-l', '--lfn', help='Logical file name', dest='lfn')
options = parser.parse_args()
if not (options.dataset or options.lfn):
parser.error("Please supply either dataset name or file name \
and certificate location")
if len(sys.argv) != 2:
print("A dataset name must be provided in the command line")
sys.exit(1)
if options.dataset:
dataset = options.dataset
if options.lfn:
lfn = options.lfn
lfnAux = lfn.split('/')
dataset = '/' + lfnAux[4] + '/' + lfnAux[3] + '-' + lfnAux[6] + '/' + lfnAux[5]

print("Dataset: %s" % dataset)

phedex_out = phedex_info(dataset)
dbs_out = dbs_info(dataset)
phedex_files = 0
phedex_blocks = {}
for item in phedex_out["phedex"]["block"]:
phedex_files += item['files']
phedex_blocks.setdefault(item['name'], item['files'])

dbs_files = dbs_out['blocksummaries'][0]['num_file']
dbs_blocks = {}
dbs_file_valid = 0
dbs_file_invalid = 0
for item in dbs_out['files']:
dbs_blocks.setdefault(item['block_name'], 0)
dbs_blocks[item['block_name']] += 1
if item['is_file_valid']:
dbs_file_valid += 1
else:
dbs_file_invalid += 1

print("Phedex file count : ", phedex_files)
print("DBS file count : ", dbs_files)
print(" - valid files : ", dbs_file_valid)
print(" - invalid files : ", dbs_file_invalid)
print(" - valid+invalid : ", (dbs_file_valid + dbs_file_invalid))
print("Blocks in PhEDEx but not in DBS: ", set(phedex_blocks.keys()) - set(dbs_blocks.keys()))
print("Blocks in DBS but not in PhEDEx: ", set(dbs_blocks.keys()) - set(phedex_blocks.keys()))

for blockname in phedex_blocks:
if phedex_blocks[blockname] != dbs_blocks.get(blockname):
print("Block with file mismatch: %s" % blockname)
print("\tPhEDEx: %s\t\tDBS: %s" % (phedex_blocks.get(blockname), dbs_blocks.get(blockname)))
datasetName = sys.argv[1]

logger = loggerSetup(logging.INFO)

rucioOutput = getFromRucio(datasetName, logger)
dbsOutput, dbsFilesCounter = getFromDBS(datasetName, logger)

logger.info("*** Dataset: %s", datasetName)
logger.info("Rucio file count : %s", sum(viewvalues(rucioOutput)))
logger.info("DBS file count : %s", dbsFilesCounter['valid'] + dbsFilesCounter['invalid'])
logger.info(" - valid files : %s", dbsFilesCounter['valid'])
logger.info(" - invalid files : %s", dbsFilesCounter['invalid'])
logger.info("Blocks in Rucio but not in DBS: %s", set(viewkeys(rucioOutput)) - set(viewkeys(dbsOutput)))
logger.info("Blocks in DBS but not in Rucio: %s", set(viewkeys(dbsOutput)) - set(viewkeys(rucioOutput)))

for blockname in rucioOutput:
if blockname not in dbsOutput:
logger.error("This block does not exist in DBS: %s", blockname)
continue
if rucioOutput[blockname] != sum(viewvalues(dbsOutput[blockname])):
logger.warning("Block with file mismatch: %s", blockname)
logger.warning("\tRucio: %s\t\tDBS: %s", rucioOutput[blockname], sum(viewvalues(dbsOutput[blockname])))


if __name__ == "__main__":
Expand Down
35 changes: 20 additions & 15 deletions bin/adhoc-scripts/workflowCompletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,27 @@
"""
from __future__ import print_function, division

from future import standard_library
standard_library.install_aliases()
import argparse
import http.client
import httplib
import json
import os
import pwd
import sys
import urllib.request, urllib.parse
from urllib.error import HTTPError, URLError
import urllib
import urllib2
from urllib2 import HTTPError, URLError

# ID for the User-Agent
CLIENT_ID = 'workflowCompletion::python/%s.%s' % sys.version_info[:2]


class HTTPSClientAuthHandler(urllib.request.HTTPSHandler):
class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
"""
Basic HTTPS class
"""

def __init__(self, key, cert):
urllib.request.HTTPSHandler.__init__(self)
urllib2.HTTPSHandler.__init__(self)
self.key = key
self.cert = cert

Expand All @@ -37,32 +36,38 @@ def https_open(self, req):
return self.do_open(self.getConnection, req)

def getConnection(self, host, timeout=290):
return http.client.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)
return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)


def getX509():
    """
    Helper function to get the x509 credentials from the environment or
    from the proxy file under /tmp.

    The lookup order is:
      1. X509_USER_CERT and X509_USER_KEY, when both are set
      2. the file pointed to by X509_USER_PROXY
      3. /tmp/x509up_u<uid> for the current user

    :return: a (certificate file, key file) tuple. A proxy file is returned
        for both items; a tuple of empty strings is returned when the proxy
        file does not exist.
    """
    certFile = os.environ.get('X509_USER_CERT', '')
    keyFile = os.environ.get('X509_USER_KEY', '')
    if certFile and keyFile:
        return certFile, keyFile

    proxy = os.environ.get('X509_USER_PROXY', '')
    if not proxy:
        proxy = '/tmp/x509up_u%s' % pwd.getpwuid(os.getuid()).pw_uid
    # every path returns a 2-tuple, such that callers can always unpack it
    if not os.path.isfile(proxy):
        return '', ''
    return proxy, proxy


def getContent(url, params=None):
cert = getX509()
certFile, keyFile = getX509()
client = '%s (%s)' % (CLIENT_ID, os.environ.get('USER', ''))
handler = HTTPSClientAuthHandler(cert, cert)
opener = urllib.request.build_opener(handler)
handler = HTTPSClientAuthHandler(keyFile, certFile)
opener = urllib2.build_opener(handler)
opener.addheaders = [("User-Agent", client),
("Accept", "application/json")]

try:
response = opener.open(url, params)
output = response.read()
except HTTPError as e:
print("The server couldn't fulfill the request at %s" % url)
print("Error code: ", e.code)
print("Error: {}".format(e))
output = '{}'
# sys.exit(1)
except URLError as e:
Expand Down Expand Up @@ -108,7 +113,7 @@ def handleDBS(reqmgrOutDsets, cmswebUrl):

dbsOutput = {}
for dataset in reqmgrOutDsets:
fullUrl = dbsUrl + "filesummaries?" + urllib.parse.urlencode({'dataset': dataset})
fullUrl = dbsUrl + "filesummaries?" + urllib.urlencode({'dataset': dataset})
data = json.loads(getContent(fullUrl))
if data:
dbsOutput[dataset] = data[0]['num_lumi']
Expand Down
Loading

0 comments on commit 0c18625

Please sign in to comment.