From 95091619ca08a76dcd8f867f0ec94c7c9b3f0c1e Mon Sep 17 00:00:00 2001
From: Alan Malta Rodrigues
Date: Wed, 24 Jun 2020 10:09:42 +0200
Subject: [PATCH] Implement whole logic of input data placement with Rucio

fix import from PycurlRucio
---
 .../WMCore/MicroService/Tools/PycurlRucio.py  | 269 ++++++++++++++++++
 .../WMCore/MicroService/Tools/__init__.py     |  14 +
 .../WMCore/MicroService/Unified/Common.py     |  22 +-
 .../WMCore/MicroService/Unified/MSCore.py     |   3 +-
 .../MicroService/Unified/MSTransferor.py      |   6 +-
 .../MicroService/Unified/RequestInfo.py       |  69 +++--
 src/python/WMCore/Services/pycurl_manager.py  |  27 +-
 7 files changed, 382 insertions(+), 28 deletions(-)
 create mode 100644 src/python/WMCore/MicroService/Tools/PycurlRucio.py
 create mode 100644 src/python/WMCore/MicroService/Tools/__init__.py

diff --git a/src/python/WMCore/MicroService/Tools/PycurlRucio.py b/src/python/WMCore/MicroService/Tools/PycurlRucio.py
new file mode 100644
index 00000000000..f0758bd76e7
--- /dev/null
+++ b/src/python/WMCore/MicroService/Tools/PycurlRucio.py
@@ -0,0 +1,269 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+Module based on the pycurl_manager module, implementing a few
+functions to call the Rucio RESTful APIs, leveraging the
+pycurl concurrency.
+"""
+from __future__ import print_function, division
+
+import datetime
+import json
+import logging
+import re
+
+try:
+    from urllib import quote, unquote
+except ImportError:
+    # PY3
+    from urllib.parse import quote, unquote
+from WMCore.MicroService.Unified.Common import cert, ckey
+from WMCore.Services.pycurl_manager import RequestHandler
+from WMCore.Services.pycurl_manager import getdata as multi_getdata
+
+
+def parseNewLineJson(stream):
+    """
+    Parse newline-delimited JSON streaming data
+    """
+    for line in stream.split("\n"):
+        if line:
+            yield json.loads(line)
+
+
+def getRucioToken(rucioAuthUrl, rucioAcct):
+    """
+    Provided a Rucio account, fetch a token from the authentication server
+    :param rucioAuthUrl: url to the rucio authentication server
+    :param rucioAcct: rucio account to be used
+    :return: a string token and its expiration time as a datetime.datetime object
+    """
+    params = {}
+    headers = {"X-Rucio-Account": rucioAcct}
+
+    url = '%s/auth/x509' % rucioAuthUrl
+    logging.info("Requesting a token to Rucio for account: %s, against url: %s", rucioAcct, rucioAuthUrl)
+    mgr = RequestHandler()
+    res = mgr.getheader(url, params=params, headers=headers, ckey=ckey(), cert=cert())
+    if res.getReason() == "OK":
+        userToken = res.getHeaderKey('X-Rucio-Auth-Token')
+        tokenExpiration = res.getHeaderKey('X-Rucio-Auth-Token-Expires')
+        logging.info("Retrieved a token valid until: %s", tokenExpiration)
+        # convert the human readable expiration time into a datetime object
+        tokenExpiration = datetime.datetime.strptime(tokenExpiration, "%a, %d %b %Y %H:%M:%S %Z")
+        # tokenExpiration = int(tokenExpiration.strftime('%s'))  # convert it to EPOCH time
+        return userToken, tokenExpiration
+
+    raise RuntimeError("Failed to acquire a Rucio token. Error: {}".format(res.getReason()))
+
+
+def renewRucioToken(rucioAuthUrl, userToken):
+    """
+    Provided a user Rucio token, check its lifetime and extend it by another hour
+    :param rucioAuthUrl: url to the rucio authentication server
+    :param userToken: the user Rucio token to be renewed
+    :return: a datetime.datetime object with the new token lifetime
+    """
+    params = {}
+    headers = {"X-Rucio-Auth-Token": userToken}
+
+    url = '%s/auth/validate' % rucioAuthUrl
+    logging.info("Renewing the Rucio token...")
+    mgr = RequestHandler()
+    res = mgr.getdata(url, params=params, headers=headers, ckey=ckey(), cert=cert())
+    try:
+        newExpiration = eval(res)['lifetime']
+    except Exception as exc:
+        raise RuntimeError("Failed to renew Rucio token. Response: {} Error: {}".format(res, str(exc)))
+    return newExpiration
+
+
+def getPileupDatasetSizesRucio(datasets, rucioUrl, rucioToken, scope="cms"):
+    """
+    Given a list of datasets, find their total size in Rucio
+    :param datasets: list of dataset names
+    :param rucioUrl: a string with the Rucio URL
+    :param rucioToken: a string with the user rucio token
+    :param scope: a string with the Rucio scope of our data
+    :return: a flat dictionary of datasets and their respective sizes
+    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
+    NOTE: Rucio version of getPileupDatasetSizes()
+    """
+    sizeByDset = {}
+    if not datasets:
+        return sizeByDset
+
+    headers = {"X-Rucio-Auth-Token": rucioToken}
+    # FIXME: the query below was supposed to work...
+    # wait for this issue to get fixed: https://github.com/rucio/rucio/issues/3762
+    # type=collection maps to both dataset and container
+    # urls = ['%s/dids/%s/dids/search?type=collection&long=True&name=%s' % (rucioUrl, dset) for dset in datasets]
+    urls = ['{}/dids/{}/{}?dynamic=anything'.format(rucioUrl, scope, dset) for dset in datasets]
+    logging.info("Executing %d requests against Rucio for the container size", len(urls))
+    data = multi_getdata(urls, ckey(), cert(), headers=headers, decode=True)
+
+    for row in data:
+        dataset = row['url'].split('/dids/{}/'.format(scope))[1]
+        dataset = dataset.replace("?dynamic=anything", "")
+        if row['data'] is None:
+            logging.error("Failure in getPileupDatasetSizesRucio for dataset %s. Error: %s %s",
+                          dataset, row.get('code'), row.get('error'))
+            sizeByDset.setdefault(dataset, None)
+            continue
+        sizeByDset.setdefault(dataset, row['data']['bytes'])
+    return sizeByDset
+
+
+def getPileupSubscriptionsRucio(datasets, rucioUrl, rucioToken, scope="cms"):
+    """
+    Provided a list of datasets, find dataset level subscriptions where
+    all of the blocks are available.
+    :param datasets: list of dataset names
+    :param rucioUrl: a string with the Rucio URL
+    :param rucioToken: a string with the user rucio token
+    :param scope: a string with the Rucio scope of our data
+    :return: a dictionary of datasets and a list of their location.
+    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
+    """
+    # FIXME: we should definitely make a feature request to Rucio...
+    # so much, just to get the final RSEs for a dataset!!!
+    locationByDset = {}
+    if not datasets:
+        return locationByDset
+
+    headers = {"X-Rucio-Auth-Token": rucioToken}
+    # first, resolve the dataset into blocks
+    blocksByDset = getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope)
+    urls = []
+    for dset, blocks in blocksByDset.items():
+        if blocks:
+            for block in blocks:
+                urls.append('{}/replicas/{}/{}/datasets'.format(rucioUrl, scope, quote(block)))
+
+    # this is going to be bloody expensive in terms of HTTP requests
+    logging.info("Executing %d requests against Rucio replicas API for blocks", len(urls))
+    data = multi_getdata(urls, ckey(), cert(), headers=headers)
+    for row in data:
+        block = row['url'].split("/{}/".format(scope))[1]
+        block = unquote(re.sub("/datasets$", "", block, 1))
+        dataset = block.split("#")[0]
+        if row['data'] is None:
+            msg = "Failure in getPileupSubscriptionsRucio for dataset {} and block {}".format(dataset, block)
+            msg += " Error: {} {}".format(row.get('code'), row.get('error'))
+            logging.error(msg)
+            locationByDset[dataset] = None
+            continue
+        if locationByDset.get(dataset, "non-None") is None:
+            # then one of the block requests failed, skip the whole dataset
+            continue
+        thisBlockRSEs = set()
+        for item in parseNewLineJson(row['data']):
+            if item['state'] == "AVAILABLE":
+                thisBlockRSEs.add(item["rse"])
+        # now we have the final block location
+        if dataset not in locationByDset:
+            # then this is the first block of this dataset
+            locationByDset[dataset] = thisBlockRSEs
+        else:
+            # otherwise, make an intersection of them
+            locationByDset[dataset] = locationByDset[dataset] & thisBlockRSEs
+    return locationByDset
+
+
+def getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope="cms"):
+    """
+    Provided a list of datasets, find all their blocks.
+    :param datasets: list of dataset names
+    :param rucioUrl: a string with the Rucio URL
+    :param rucioToken: a string with the user rucio token
+    :param scope: a string with the Rucio scope of our data
+    :return: a dictionary key'ed by the datasets with a list of blocks.
+    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
+    """
+    blocksByDset = {}
+    if not datasets:
+        return blocksByDset
+
+    headers = {"X-Rucio-Auth-Token": rucioToken}
+    urls = ['{}/dids/{}/{}/dids'.format(rucioUrl, scope, dset) for dset in datasets]
+    logging.info("Executing %d requests against Rucio for the blocks by dataset", len(urls))
+    data = multi_getdata(urls, ckey(), cert(), headers=headers)
+    for row in data:
+        dataset = row['url'].split("/{}/".format(scope))[1]
+        dataset = re.sub("/dids$", "", dataset, 1)
+        if row['data'] is None:
+            logging.error("Failure in getDatasetBlocksRucio for dataset %s. Error: %s %s",
+                          dataset, row.get('code'), row.get('error'))
+            blocksByDset.setdefault(dataset, None)
+            continue
+        blocksByDset.setdefault(dataset, [])
+        for item in parseNewLineJson(row['data']):
+            blocksByDset[dataset].append(item["name"])
+    return blocksByDset
+
+
+def getBlockReplicasAndSizeRucio(datasets, rucioUrl, rucioToken, scope="cms"):
+    """
+    Given a list of datasets, find all their blocks with replicas
+    available.
+    :param datasets: list of dataset names
+    :param rucioUrl: a string with the Rucio URL
+    :param rucioToken: a string with the user rucio token
+    :param scope: a string with the Rucio scope of our data
+    :return: a dictionary in the form of:
+    {"dataset":
+        {"block":
+            {"blockSize": 111, "locations": ["x", "y"]}
+        }
+    }
+    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
+    """
+    dsetBlockSize = {}
+    if not datasets:
+        return dsetBlockSize
+
+    headers = {"X-Rucio-Auth-Token": rucioToken}
+    # first, figure out their block names
+    blocksByDset = getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope=scope)
+    urls = []
+    for dset, blocks in blocksByDset.items():
+        if blocks:
+            for block in blocks:
+                urls.append('{}/replicas/{}/{}/datasets'.format(rucioUrl, scope, quote(block)))
+
+    # next, query the replicas API for the block location
+    # this is going to be bloody expensive in terms of HTTP requests
+    logging.info("Executing %d requests against Rucio replicas API for blocks", len(urls))
+    data = multi_getdata(urls, ckey(), cert(), headers=headers)
+    for row in data:
+        block = row['url'].split("/{}/".format(scope))[1]
+        block = unquote(re.sub("/datasets$", "", block, 1))
+        dataset = block.split("#")[0]
+        dsetBlockSize.setdefault(dataset, dict())
+        if row['data'] is None:
+            msg = "Failure in getBlockReplicasAndSizeRucio for dataset {} and block {}".format(dataset, block)
+            msg += " Error: {} {}".format(row.get('code'), row.get('error'))
+            logging.error(msg)
+            dsetBlockSize[dataset] = None
+            continue
+        if dsetBlockSize.get(dataset, None) is None:
+            # then one of the block requests failed, skip the whole dataset
+            continue
+
+        thisBlockRSEs = []
+        blockBytes = 0
+        for item in parseNewLineJson(row['data']):
+            blockBytes = item['bytes']
+            if item['state'] == "AVAILABLE":
+                thisBlockRSEs.append(item["rse"])
+        # now we have the final block location
+        if not blockBytes and not thisBlockRSEs:
+            logging.warning("Block: %s has no replicas and no size", block)
+        else:
+            dsetBlockSize[dataset][block] = {"locations": thisBlockRSEs, "blockSize": blockBytes}
+    return dsetBlockSize
diff --git a/src/python/WMCore/MicroService/Tools/__init__.py b/src/python/WMCore/MicroService/Tools/__init__.py
new file mode 100644
index 00000000000..a472dc03eaf
--- /dev/null
+++ b/src/python/WMCore/MicroService/Tools/__init__.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+"""
+DESCRIPTION
+"""
+from __future__ import print_function, division
+import sys
+
+
+def main():
+    sys.exit(0)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/src/python/WMCore/MicroService/Unified/Common.py b/src/python/WMCore/MicroService/Unified/Common.py
index af66ea2cc42..c461a255678 100644
--- a/src/python/WMCore/MicroService/Unified/Common.py
+++ b/src/python/WMCore/MicroService/Unified/Common.py
@@ -14,8 +14,9 @@
 # system modules
 import re
 import time
-
 from Utils.IteratorTools import grouper
+from WMCore.MicroService.Tools.PycurlRucio import getPileupDatasetSizesRucio, getPileupSubscriptionsRucio, \
+    getBlockReplicasAndSizeRucio
 
 try:
     from urllib import quote, unquote
@@ -86,19 +87,23 @@ def dbsInfo(datasets, dbsUrl):
     return datasetBlocks, datasetSizes, datasetTransfers
 
 
-def getPileupDatasetSizes(datasets, phedexUrl):
+def getPileupDatasetSizes(datasets, phedexUrl, rucioToken=None):
     """
     Given a list of datasets, find all their blocks with replicas
     available, i.e., blocks that have valid files to be processed,
     and calculate the total dataset size
     :param datasets: list of dataset names
     :param phedexUrl: a string with the PhEDEx URL
+    :param rucioToken: optional string with the Rucio token; when provided,
+    the calls are made against Rucio instead of PhEDEx
     :return: a dictionary of datasets and their respective sizes
     NOTE: Value `None` is returned in case the data-service failed to serve a given request.
     """
     sizeByDset = {}
     if not datasets:
         return sizeByDset
+    if rucioToken:
+        return getPileupDatasetSizesRucio(datasets, phedexUrl, rucioToken)
 
     urls = ['%s/blockreplicas?dataset=%s' % (phedexUrl, dset) for dset in datasets]
     logging.info("Executing %d requests against PhEDEx 'blockreplicas' API", len(urls))
@@ -113,7 +118,7 @@ def getPileupDatasetSizes(datasets, phedexUrl):
             sizeByDset.setdefault(dataset, None)
             continue
         rows = json.loads(row['data'])
-        sizeByDset.setdefault(dataset, 0)  # flat dict in the format of blockName: blockSize
+        sizeByDset.setdefault(dataset, 0)
         try:
             for item in rows['phedex']['block']:
                 sizeByDset[dataset] += item['bytes']
@@ -123,7 +128,7 @@
     return sizeByDset
 
 
-def getBlockReplicasAndSize(datasets, phedexUrl, group=None):
+def getBlockReplicasAndSize(datasets, phedexUrl, group=None, rucioToken=None):
     """
     Given a list of datasets, find all their blocks with replicas
     available (thus blocks with at least 1 valid file), completed
@@ -144,6 +149,8 @@
     dsetBlockSize = {}
     if not datasets:
         return dsetBlockSize
+    if rucioToken:
+        return getBlockReplicasAndSizeRucio(datasets, phedexUrl, rucioToken)
 
     urls = ['%s/blockreplicas?dataset=%s' % (phedexUrl, dset) for dset in datasets]
     logging.info("Executing %d requests against PhEDEx 'blockreplicas' API", len(urls))
@@ -175,8 +182,7 @@
     return dsetBlockSize
 
 
-# FIXME: implement the same logic for Rucio
-def getPileupSubscriptions(datasets, phedexUrl, group=None, percentMin=99):
+def getPileupSubscriptions(datasets, phedexUrl, group=None, percentMin=99, rucioToken=None):
     """
     Provided a list of datasets, find dataset level subscriptions where it's
     as complete as `percent_min`.
@@ -184,12 +190,16 @@
     :param phedexUrl: a string with the PhEDEx URL
     :param group: optional string with the PhEDEx group
     :param percent_min: only return subscriptions that are this complete
+    :param rucioToken: optional string with the Rucio token; when provided,
+    the calls are made against Rucio instead of PhEDEx
     :return: a dictionary of datasets and a list of their location.
     NOTE: Value `None` is returned in case the data-service failed to serve a given request.
""" locationByDset = {} if not datasets: return locationByDset + if rucioToken: + return getPileupSubscriptionsRucio(datasets, phedexUrl, rucioToken) if group: url = "%s/subscriptions?group=%s" % (phedexUrl, group) diff --git a/src/python/WMCore/MicroService/Unified/MSCore.py b/src/python/WMCore/MicroService/Unified/MSCore.py index d592741c010..1c7d7045e92 100644 --- a/src/python/WMCore/MicroService/Unified/MSCore.py +++ b/src/python/WMCore/MicroService/Unified/MSCore.py @@ -36,7 +36,8 @@ def __init__(self, msConfig, logger=None): logger=self.logger) if self.msConfig['useRucio']: - self.rucio = Rucio(self.msConfig['rucioAccount'], configDict={"logger": self.logger}) + self.rucio = Rucio(self.msConfig['rucioAccount'], hostUrl=self.msConfig['rucioUrl'], + authUrl=self.msConfig['rucioAuthUrl'], configDict={"logger": self.logger}) else: # hard code it to production DBS otherwise PhEDEx subscribe API fails to match TMDB data dbsUrl = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader" diff --git a/src/python/WMCore/MicroService/Unified/MSTransferor.py b/src/python/WMCore/MicroService/Unified/MSTransferor.py index ff1f54667a3..864da35861e 100644 --- a/src/python/WMCore/MicroService/Unified/MSTransferor.py +++ b/src/python/WMCore/MicroService/Unified/MSTransferor.py @@ -96,7 +96,6 @@ def __init__(self, msConfig, logger=None): self.msConfig["quotaUsage"], useRucio=self.msConfig["useRucio"], minimumThreshold=self.msConfig["minimumThreshold"], verbose=self.msConfig['verbose'], logger=logger) - self.reqInfo = RequestInfo(msConfig, logger) self.cric = CRIC(logger=self.logger) self.inputMap = {"InputDataset": "primary", @@ -170,6 +169,7 @@ def execute(self, reqStatus): self.updateCaches() self.updateReportDict(summary, "total_num_campaigns", len(self.campaigns)) self.updateReportDict(summary, "nodes_out_of_space", list(self.rseQuotas.getOutOfSpaceRSEs())) + self.reqInfo = RequestInfo(self.msConfig, self.logger) except RuntimeWarning as ex: msg = "All retries exhausted! Last error was: '%s'" % str(ex) msg += "\nRetrying to update caches again in the next cycle." 
@@ -615,7 +615,9 @@ def _getFinalPNNs(self, psns):
         pnns = set()
         for psn in psns:
             for pnn in self.psn2pnnMap.get(psn, []):
-                if pnn == "T2_CH_CERNBOX" or pnn.startswith("T3_") or pnn.endswith("_MSS") or pnn.endswith("_Export"):
+                if pnn == "T2_CH_CERNBOX" or pnn.startswith("T3_"):
+                    pass
+                elif pnn.endswith("_Tape") or pnn.endswith("_MSS") or pnn.endswith("_Export"):
                     pass
                 else:
                     pnns.add(pnn)
diff --git a/src/python/WMCore/MicroService/Unified/RequestInfo.py b/src/python/WMCore/MicroService/Unified/RequestInfo.py
index e0a5b42b697..f77670f9fb5 100644
--- a/src/python/WMCore/MicroService/Unified/RequestInfo.py
+++ b/src/python/WMCore/MicroService/Unified/RequestInfo.py
@@ -17,18 +17,20 @@
 from Utils.IteratorTools import grouper
 from WMCore.DataStructs.LumiList import LumiList
 from WMCore.MicroService.DataStructs.Workflow import Workflow
+from WMCore.MicroService.Tools.PycurlRucio import getRucioToken
 from WMCore.MicroService.Unified.Common import \
     elapsedTime, cert, ckey, workflowsInfo, eventsLumisInfo, getIO, \
     dbsInfo, phedexInfo, getComputingTime, getNCopies, teraBytes, \
     findBlockParents, findParent, getBlocksByDsetAndRun, getFileLumisInBlock, \
     getBlockReplicasAndSize, getPileupDatasetSizes, getPileupSubscriptions, getRunsInBlock
-from WMCore.MicroService.Unified.MSCore import MSCore
 from WMCore.MicroService.Unified.SiteInfo import SiteInfo
+from WMCore.Services.PhEDEx.PhEDEx import PhEDEx
+from WMCore.Services.ReqMgrAux.ReqMgrAux import ReqMgrAux
 from WMCore.Services.pycurl_manager import getdata \
     as multi_getdata, RequestHandler
 
 
-class RequestInfo(MSCore):
+class RequestInfo(object):
     """
     RequestInfo class provides functionality to access
     and manipulate requests.
@@ -38,7 +40,21 @@ def __init__(self, msConfig, logger):
         """
         Basic setup for this RequestInfo module
         """
-        super(RequestInfo, self).__init__(msConfig, logger)
+        self.logger = logger
+        self.msConfig = msConfig
+
+        self.reqmgrAux = ReqMgrAux(self.msConfig['reqmgr2Url'],
+                                   httpDict={'cacheduration': 1.0},
+                                   logger=self.logger)
+
+        if self.msConfig['useRucio']:
+            self.rucioToken, self.tokenValidity = getRucioToken(self.msConfig['rucioAuthUrl'],
+                                                                self.msConfig['rucioAccount'])
+        else:
+            # hard code it to production DBS otherwise PhEDEx subscribe API fails to match TMDB data
+            dbsUrl = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
+            self.phedex = PhEDEx(httpDict={'cacheduration': 0.5}, dbsUrl=dbsUrl, logger=self.logger)
+            self.rucioToken, self.tokenValidity = None, None
 
     def __call__(self, reqRecords):
         """
@@ -49,8 +65,7 @@ def __call__(self, reqRecords):
         # obtain new unified Configuration
         uConfig = self.unifiedConfig()
         if not uConfig:
-            self.logger.warning(
-                "Failed to fetch the latest unified config. Skipping this cycle")
+            self.logger.warning("Failed to fetch the latest unified config. Skipping this cycle")
             return []
 
         self.logger.info("Going to process %d requests.", len(reqRecords))
@@ -287,16 +302,28 @@ def getSecondaryDatasets(self, workflows):
         for wflow in workflows:
             datasets = datasets | wflow.getPileupDatasets()
 
-        # now fetch valid blocks from PhEDEx and calculate the total dataset size
-        self.logger.info("Fetching pileup dataset sizes for %d datasets against PhEDEx: %s",
-                         len(datasets), self.msConfig['phedexUrl'])
-        sizesByDset = getPileupDatasetSizes(datasets, self.msConfig['phedexUrl'])
+        if self.rucioToken:
+            # now fetch valid blocks from Rucio and calculate the total dataset size
+            self.logger.info("Fetching pileup dataset sizes for %d datasets against Rucio: %s",
+                             len(datasets), self.msConfig['rucioUrl'])
+            sizesByDset = getPileupDatasetSizes(datasets, self.msConfig['rucioUrl'], self.rucioToken)
+
+            # then fetch data location for subscribed data, under the group provided in the config
+            self.logger.info("Fetching pileup dataset location for %d datasets against Rucio: %s",
+                             len(datasets), self.msConfig['rucioUrl'])
+            locationsByDset = getPileupSubscriptions(datasets, self.msConfig['rucioUrl'], rucioToken=self.rucioToken,
+                                                     percentMin=self.msConfig['minPercentCompletion'])
+        else:
+            # now fetch valid blocks from PhEDEx and calculate the total dataset size
+            self.logger.info("Fetching pileup dataset sizes for %d datasets against PhEDEx: %s",
+                             len(datasets), self.msConfig['phedexUrl'])
+            sizesByDset = getPileupDatasetSizes(datasets, self.msConfig['phedexUrl'])
 
-        # then fetch data location for subscribed data, under the group provided in the config
-        self.logger.info("Fetching pileup dataset location for %d datasets against PhEDEx: %s",
-                         len(datasets), self.msConfig['phedexUrl'])
-        locationsByDset = getPileupSubscriptions(datasets, self.msConfig['phedexUrl'],
-                                                 percentMin=self.msConfig['minPercentCompletion'])
+            # then fetch data location for subscribed data, under the group provided in the config
+            self.logger.info("Fetching pileup dataset location for %d datasets against PhEDEx: %s",
+                             len(datasets), self.msConfig['phedexUrl'])
+            locationsByDset = getPileupSubscriptions(datasets, self.msConfig['phedexUrl'],
+                                                     percentMin=self.msConfig['minPercentCompletion'])
 
         # now check if any of our calls failed; if so, workflow needs to be skipped from this cycle
         # FIXME: isn't there a better way to do this?!?
@@ -340,10 +367,16 @@ def getInputDataBlocks(self, workflows):
                 if dataIn['type'] in ["primary", "parent"]:
                     datasets.add(dataIn['name'])
 
-        # now fetch block names from PhEDEx
-        self.logger.info("Fetching block info for %d datasets against PhEDEx: %s",
-                         len(datasets), self.msConfig['phedexUrl'])
-        blocksByDset = getBlockReplicasAndSize(datasets, self.msConfig['phedexUrl'])
+        if self.rucioToken:
+            # now fetch valid blocks from Rucio, with their sizes and locations
+            self.logger.info("Fetching parent/primary block info for %d datasets against Rucio: %s",
+                             len(datasets), self.msConfig['rucioUrl'])
+            blocksByDset = getBlockReplicasAndSize(datasets, self.msConfig['rucioUrl'], rucioToken=self.rucioToken)
+        else:
+            # now fetch block names from PhEDEx
+            self.logger.info("Fetching parent/primary block info for %d datasets against PhEDEx: %s",
+                             len(datasets), self.msConfig['phedexUrl'])
+            blocksByDset = getBlockReplicasAndSize(datasets, self.msConfig['phedexUrl'])
 
         # now check if any of our calls failed; if so, workflow needs to be skipped from this cycle
         # FIXME: isn't there a better way to do this?!?
diff --git a/src/python/WMCore/Services/pycurl_manager.py b/src/python/WMCore/Services/pycurl_manager.py
index 25ded6f1391..e096deab646 100644
--- a/src/python/WMCore/Services/pycurl_manager.py
+++ b/src/python/WMCore/Services/pycurl_manager.py
@@ -90,6 +90,30 @@ def parse(self, response):
         except:
             pass
 
+    def getReason(self):
+        """
+        Return the HTTP request reason
+        """
+        return self.reason
+
+    def getHeader(self):
+        """
+        Return the header dictionary object
+        """
+        return self.header
+
+    def getHeaderKey(self, keyName):
+        """
+        Provided a key name, return its value from the HTTP header.
+        Note that - by design - header keys are meant to be
+        case insensitive
+        :param keyName: a header key name to be looked up
+        :return: the value for that header key, or None if not found
+        """
+        for keyHea, valHea in self.header.items():
+            if keyHea.lower() == keyName.lower():
+                return valHea
+
 
 class RequestHandler(object):
     """
@@ -291,7 +315,8 @@ def getheader(self, url, params, headers=None, verb='GET',
                   verbose=0, ckey=None, cert=None, doseq=True):
         """Fetch HTTP header"""
         header, _ = self.request(url, params, headers, verb,
-                                 verbose, ckey, cert, doseq)
+                                 verbose, ckey, cert, doseq=doseq)
+
         return header
 
     def multirequest(self, url, parray, headers=None,
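
Review notes with usage sketches follow; they are not part of the patch itself.

The new PycurlRucio helpers are meant to be chained: fetch a token once, then
pass it to the bulk data queries. A minimal sketch, assuming illustrative host
names and a placeholder account (none of these values come from the patch):

    import logging
    from WMCore.MicroService.Tools.PycurlRucio import (getRucioToken,
                                                       getPileupDatasetSizesRucio)

    rucioAuthUrl = "https://cms-rucio-auth.cern.ch"  # assumed auth server host
    rucioUrl = "https://cms-rucio.cern.ch"           # assumed server host

    token, tokenExpiration = getRucioToken(rucioAuthUrl, "wma_test")  # placeholder account
    sizeByDset = getPileupDatasetSizesRucio(["/Prim/Proc-v1/GEN-SIM"], rucioUrl, token)
    for dset, size in sizeByDset.items():
        # a None size flags a failed lookup, mirroring the PhEDEx-based helpers
        logging.info("%s: %s bytes", dset, size)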
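
The dids and replicas APIs used above return newline-delimited JSON rather
than a single JSON document, which is why the module ships its own
parseNewLineJson generator. A self-contained demonstration with made-up
records:

    import json

    def parseNewLineJson(stream):
        """Yield one decoded record per non-empty line of an NDJSON payload"""
        for line in stream.split("\n"):
            if line:
                yield json.loads(line)

    payload = '{"rse": "T1_US_FNAL_Disk", "state": "AVAILABLE"}\n\n' \
              '{"rse": "T2_CH_CERN", "state": "COPYING"}\n'
    available = [rec["rse"] for rec in parseNewLineJson(payload)
                 if rec["state"] == "AVAILABLE"]
    assert available == ["T1_US_FNAL_Disk"]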
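
getPileupSubscriptionsRucio derives a dataset location by intersecting the
AVAILABLE RSEs of every one of its blocks: an RSE only counts when it hosts
the complete dataset. The same rule in isolation, with made-up replicas:

    blockRSEs = {
        "/PU/Sample-v1/GEN-SIM#block1": {"T1_US_FNAL_Disk", "T2_DE_DESY"},
        "/PU/Sample-v1/GEN-SIM#block2": {"T1_US_FNAL_Disk", "T2_CH_CERN"},
    }
    datasetRSEs = None
    for rses in blockRSEs.values():
        # seed with the first block, then keep only RSEs common to all blocks
        datasetRSEs = set(rses) if datasetRSEs is None else datasetRSEs & rses
    assert datasetRSEs == {"T1_US_FNAL_Disk"}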
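
The new ResponseHeader.getHeaderKey exists because HTTP header field names
are case-insensitive, so a plain dictionary lookup on, say,
'X-Rucio-Auth-Token' could miss a lower-cased key. A stub class (for
illustration only, not the real pycurl_manager object) showing the rule:

    class ResponseHeaderStub(object):
        """Minimal stand-in for pycurl_manager.ResponseHeader"""
        def __init__(self, header):
            self.header = header

        def getHeaderKey(self, keyName):
            # compare lower-cased names: HTTP headers are case-insensitive
            for keyHea, valHea in self.header.items():
                if keyHea.lower() == keyName.lower():
                    return valHea

    resp = ResponseHeaderStub({"x-rucio-auth-token": "abc123"})
    assert resp.getHeaderKey("X-Rucio-Auth-Token") == "abc123"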
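
getRucioToken parses the X-Rucio-Auth-Token-Expires header with the format
"%a, %d %b %Y %H:%M:%S %Z", and RequestInfo stores the result as
self.tokenValidity. A sketch of how a caller might decide that
renewRucioToken is due; the 10-minute threshold is an assumption, not
something this patch defines:

    import datetime

    expiresHeader = "Wed, 24 Jun 2020 11:09:42 UTC"  # sample header value
    tokenValidity = datetime.datetime.strptime(expiresHeader, "%a, %d %b %Y %H:%M:%S %Z")
    # renew when fewer than 10 minutes of lifetime remain (arbitrary threshold)
    if tokenValidity - datetime.datetime.utcnow() < datetime.timedelta(minutes=10):
        print("Token about to expire, calling renewRucioToken()")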