Skip to content

Commit

Permalink
Implement whole logic of input data placement with Rucio
Browse files Browse the repository at this point in the history
fix import from PycurlRucio
  • Loading branch information
amaltaro committed Jun 25, 2020
1 parent bfd844b commit 9509161
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 28 deletions.
269 changes: 269 additions & 0 deletions src/python/WMCore/MicroService/Tools/PycurlRucio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module based on the pycurl_manager module, implementing a few
functions to call to the Rucio RESTful APIs, leveraging the
pycurl concurrency.
"""
from __future__ import print_function, division

import datetime
import json
import logging
import re

try:
from urllib import quote, unquote
except ImportError:
# PY3
from urllib.parse import quote, unquote
from WMCore.MicroService.Unified.Common import cert, ckey
from WMCore.Services.pycurl_manager import RequestHandler
from WMCore.Services.pycurl_manager import getdata as multi_getdata


def parseNewLineJson(stream):
    """
    Parse a newline-delimited JSON payload.
    :param stream: string with one JSON document per line
    :return: an iterator over the decoded objects (blank lines are skipped)
    """
    entries = stream.split("\n")
    return (json.loads(entry) for entry in entries if entry)


def getRucioToken(rucioAuthUrl, rucioAcct):
    """
    Provided a Rucio account, fetch a token from the authentication server
    :param rucioAuthUrl: url to the rucio authentication server
    :param rucioAcct: rucio account to be used
    :return: a tuple with the token string and its expiration time as a
        datetime.datetime object
    :raises RuntimeError: if the authentication request is not successful
    """
    params = {}
    headers = {"X-Rucio-Account": rucioAcct}

    url = '%s/auth/x509' % rucioAuthUrl
    logging.info("Requesting a token to Rucio for account: %s, against url: %s", rucioAcct, rucioAuthUrl)
    mgr = RequestHandler()
    res = mgr.getheader(url, params=params, headers=headers, ckey=ckey(), cert=cert())
    # res is a response-header object, not a string: compare its reason phrase.
    # (The previous `res == "OK"` comparison was always False, so a token could
    # never be returned.)
    if res.getReason() == "OK":
        userToken = res.getHeaderKey('X-Rucio-Auth-Token')
        tokenExpiration = res.getHeaderKey('X-Rucio-Auth-Token-Expires')
        logging.info("Retrieved a token valid until: %s", tokenExpiration)
        # convert the human readable expiration time to a datetime object
        tokenExpiration = datetime.datetime.strptime(tokenExpiration, "%a, %d %b %Y %H:%M:%S %Z")
        # tokenExpiration = int(tokenExpiration.strftime('%s'))  # convert it to EPOCH time
        return userToken, tokenExpiration

    raise RuntimeError("Failed to acquire a Rucio token. Error: {}".format(res.getReason()))


def renewRucioToken(rucioAuthUrl, userToken):
    """
    Provided a user Rucio token, check its lifetime and extend it by another hour
    :param rucioAuthUrl: url to the rucio authentication server
    :param userToken: the current user token to be validated/renewed
    :return: the new token lifetime, as parsed from the service response
    :raises RuntimeError: if the response cannot be evaluated
    """
    params = {}
    headers = {"X-Rucio-Auth-Token": userToken}

    url = '%s/auth/validate' % rucioAuthUrl
    logging.info("Renewing the Rucio token...")
    mgr = RequestHandler()
    res = mgr.getdata(url, params=params, headers=headers, ckey=ckey(), cert=cert())
    try:
        # NOTE(review): `eval` on a service response is dangerous if the endpoint
        # is not fully trusted; presumably the payload is a python-repr dict
        # (hence not parseable with json.loads) -- confirm, and consider
        # ast.literal_eval or a JSON-returning endpoint if available.
        newExpiration = eval(res)['lifetime']
    except Exception as exc:
        raise RuntimeError("Failed to renew Rucio token. Response: {} Error: {}".format(res, str(exc)))
    return newExpiration


def getPileupDatasetSizesRucio(datasets, rucioUrl, rucioToken, scope="cms"):
    """
    Given a list of datasets, find their total size in Rucio
    :param datasets: list of dataset names
    :param rucioUrl: a string with the Rucio URL
    :param rucioToken: a string with the user rucio token
    :param scope: a string with the Rucio scope of our data
    :return: a flat dictionary of datasets and their respective sizes
    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
    NOTE: Rucio version of getPileupDatasetSizes()
    """
    sizeByDset = {}
    if not datasets:
        return sizeByDset

    headers = {"X-Rucio-Auth-Token": rucioToken}
    # FIXME: the query below was supposed to work..
    # wait for this issue to get fixed: https://github.com/rucio/rucio/issues/3762
    # type=collection maps to both dataset and container
    # urls = ['%s/dids/%s/dids/search?type=collection&long=True&name=%s' % (rucioUrl, dset) for dset in datasets]
    urls = ['{}/dids/{}/{}?dynamic=anything'.format(rucioUrl, scope, dset) for dset in datasets]
    logging.info("Executing %d requests against Rucio for the container size", len(urls))
    data = multi_getdata(urls, ckey(), cert(), headers=headers, decode=True)

    for row in data:
        # extract the dataset name back from the request URL
        # (fixed: the former '/dids/{}}/' pattern raised ValueError at format time
        # because of the stray closing brace)
        dataset = row['url'].split('/dids/{}/'.format(scope))[1]
        dataset = dataset.replace("?dynamic=anything", "")
        if row['data'] is None:
            logging.error("Failure in getPileupDatasetSizesRucio for dataset %s. Error: %s %s",
                          dataset, row.get('code'), row.get('error'))
            sizeByDset.setdefault(dataset, None)
            continue
        sizeByDset.setdefault(dataset, row['data']['bytes'])
    return sizeByDset


def getPileupSubscriptionsRucio(datasets, rucioUrl, rucioToken, scope="cms"):
    """
    Provided a list of datasets, find the RSEs hosting a complete and
    available copy of every block in the dataset.
    :param datasets: list of dataset names
    :param rucioUrl: a string with the Rucio URL
    :param rucioToken: a string with the user rucio token
    :param scope: a string with the Rucio scope of our data
    :return: a dictionary of datasets and a set of their locations.
    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
    """
    # FIXME: we should definitely make a feature request to Rucio...
    # so much, just to get the final RSEs for a dataset!!!
    locationByDset = {}
    if not datasets:
        return locationByDset

    headers = {"X-Rucio-Auth-Token": rucioToken}
    # first, resolve the dataset into blocks
    blocksByDset = getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope)
    urls = []
    for dset, blocks in blocksByDset.items():
        if blocks:
            for block in blocks:
                urls.append('{}/replicas/{}/{}/datasets'.format(rucioUrl, scope, quote(block)))

    # this is going to be bloody expensive in terms of HTTP requests
    logging.info("Executing %d requests against Rucio replicas API for blocks", len(urls))
    data = multi_getdata(urls, ckey(), cert(), headers=headers)
    for row in data:
        # extract the block name back from the request URL
        # (fixed: the former "/{}}/" pattern raised ValueError at format time)
        block = row['url'].split("/{}/".format(scope))[1]
        block = unquote(re.sub("/datasets$", "", block, 1))
        dataset = block.split("#")[0]
        if row['data'] is None:
            msg = "Failure in getPileupSubscriptionsRucio for dataset {} and block {}".format(dataset, block)
            msg += " Error: {} {}".format(row.get('code'), row.get('error'))
            logging.error(msg)
            # flag the whole dataset as unknown
            locationByDset[dataset] = None
            continue
        if locationByDset.get(dataset, "non-None") is None:
            # then one of the block requests failed, skip the whole dataset
            continue
        thisBlockRSEs = set()
        for item in parseNewLineJson(row['data']):
            if item['state'] == "AVAILABLE":
                thisBlockRSEs.add(item["rse"])
        # NOTE: do NOT pre-initialize the dataset with an empty set; the dataset
        # location is the intersection of all its block locations, and seeding
        # with set() would always intersect down to an empty result
        if dataset not in locationByDset:
            # then this is the first block of this dataset
            locationByDset[dataset] = thisBlockRSEs
        else:
            # otherwise, make an intersection of them
            locationByDset[dataset] = locationByDset[dataset] & thisBlockRSEs
    return locationByDset


def getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope="cms"):
    """
    Provided a list of datasets, find all their blocks.
    :param datasets: list of dataset names
    :param rucioUrl: a string with the Rucio URL
    :param rucioToken: a string with the user rucio token
    :param scope: a string with the Rucio scope of our data
    :return: a dictionary key'ed by the datasets with a list of blocks.
    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
    """
    blocksByDset = {}
    if not datasets:
        return blocksByDset

    headers = {"X-Rucio-Auth-Token": rucioToken}
    urls = ['{}/dids/{}/{}/dids'.format(rucioUrl, scope, dset) for dset in datasets]
    logging.info("Executing %d requests against Rucio for the blocks by dataset", len(urls))
    data = multi_getdata(urls, ckey(), cert(), headers=headers)
    for row in data:
        # extract the dataset name back from the request URL
        # (fixed: the former "/{}}/" pattern raised ValueError at format time)
        dataset = row['url'].split("/{}/".format(scope))[1]
        dataset = re.sub("/dids$", "", dataset, 1)
        if row['data'] is None:
            logging.error("Failure in getDatasetBlocksRucio for dataset %s. Error: %s %s",
                          dataset, row.get('code'), row.get('error'))
            blocksByDset.setdefault(dataset, None)
            continue
        blocksByDset.setdefault(dataset, [])
        for item in parseNewLineJson(row['data']):
            blocksByDset[dataset].append(item["name"])
    return blocksByDset


def getBlockReplicasAndSizeRucio(datasets, rucioUrl, rucioToken, scope="cms"):
    """
    Given a list of datasets, find all their blocks with replicas
    available.
    :param datasets: list of dataset names
    :param rucioUrl: a string with the Rucio URL
    :param rucioToken: a string with the user rucio token
    :param scope: a string with the Rucio scope of our data
    :return: a dictionary in the form of:
    {"dataset":
        {"block":
            {"blockSize": 111, "locations": ["x", "y"]}
        }
    }
    NOTE: Value `None` is returned in case the data-service failed to serve a given request.
    """
    dsetBlockSize = {}
    if not datasets:
        return dsetBlockSize

    headers = {"X-Rucio-Auth-Token": rucioToken}
    # first, figure out their block names
    blocksByDset = getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope=scope)
    urls = []
    for dset, blocks in blocksByDset.items():
        if blocks:
            for block in blocks:
                urls.append('{}/replicas/{}/{}/datasets'.format(rucioUrl, scope, quote(block)))

    # next, query the replicas API for the block location
    # this is going to be bloody expensive in terms of HTTP requests
    logging.info("Executing %d requests against Rucio replicas API for blocks", len(urls))
    data = multi_getdata(urls, ckey(), cert(), headers=headers)
    for row in data:
        # extract the block name back from the request URL
        # (fixed: the former "/{}}/" pattern raised ValueError at format time)
        block = row['url'].split("/{}/".format(scope))[1]
        block = unquote(re.sub("/datasets$", "", block, 1))
        dataset = block.split("#")[0]
        dsetBlockSize.setdefault(dataset, dict())
        if row['data'] is None:
            msg = "Failure in getBlockReplicasAndSizeRucio for dataset {} and block {}".format(dataset, block)
            msg += " Error: {} {}".format(row.get('code'), row.get('error'))
            logging.error(msg)
            # flag the whole dataset as unknown
            dsetBlockSize[dataset] = None
            continue
        if dsetBlockSize.get(dataset, None) is None:
            # then one of the block requests failed, skip the whole dataset
            continue

        thisBlockRSEs = []
        blockBytes = 0  # renamed from `bytes`, which shadowed the builtin
        for item in parseNewLineJson(row['data']):
            blockBytes = item['bytes']
            if item['state'] == "AVAILABLE":
                thisBlockRSEs.append(item["rse"])
        # now we have the final block location
        if not blockBytes and not thisBlockRSEs:
            logging.warning("Block: %s has no replicas and no size", block)
        else:
            # plain assignment: the block key does not exist yet, so the former
            # `dsetBlockSize[dataset][block].update(...)` raised KeyError
            dsetBlockSize[dataset][block] = {"locations": thisBlockRSEs, "blockSize": blockBytes}
    return dsetBlockSize
14 changes: 14 additions & 0 deletions src/python/WMCore/MicroService/Tools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env python
"""
DESCRIPTION
"""
from __future__ import print_function, division
import sys


def main():
    """Stub entry point for the Tools package; exits immediately with status 0."""
    status = 0
    sys.exit(status)


if __name__ == '__main__':
sys.exit(main())
22 changes: 16 additions & 6 deletions src/python/WMCore/MicroService/Unified/Common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# system modules
import re
import time

from Utils.IteratorTools import grouper
from WMCore.MicroService.Tools.PycurlRucio import getPileupDatasetSizesRucio, getPileupSubscriptionsRucio, \
getBlockReplicasAndSizeRucio

try:
from urllib import quote, unquote
Expand Down Expand Up @@ -86,19 +87,23 @@ def dbsInfo(datasets, dbsUrl):
return datasetBlocks, datasetSizes, datasetTransfers


def getPileupDatasetSizes(datasets, phedexUrl):
def getPileupDatasetSizes(datasets, phedexUrl, rucioToken=None):
"""
Given a list of datasets, find all their blocks with replicas
available, i.e., blocks that have valid files to be processed,
and calculate the total dataset size
:param datasets: list of dataset names
:param phedexUrl: a string with the PhEDEx URL
:param rucioToken: optional argument containing the Rucio token.
It also triggers calls to Rucio instead of PhEDEx
:return: a dictionary of datasets and their respective sizes
NOTE: Value `None` is returned in case the data-service failed to serve a given request.
"""
sizeByDset = {}
if not datasets:
return sizeByDset
if rucioToken:
return getPileupDatasetSizesRucio(datasets, phedexUrl, rucioToken)

urls = ['%s/blockreplicas?dataset=%s' % (phedexUrl, dset) for dset in datasets]
logging.info("Executing %d requests against PhEDEx 'blockreplicas' API", len(urls))
Expand All @@ -113,7 +118,7 @@ def getPileupDatasetSizes(datasets, phedexUrl):
sizeByDset.setdefault(dataset, None)
continue
rows = json.loads(row['data'])
sizeByDset.setdefault(dataset, 0) # flat dict in the format of blockName: blockSize
sizeByDset.setdefault(dataset, 0)
try:
for item in rows['phedex']['block']:
sizeByDset[dataset] += item['bytes']
Expand All @@ -123,7 +128,7 @@ def getPileupDatasetSizes(datasets, phedexUrl):
return sizeByDset


def getBlockReplicasAndSize(datasets, phedexUrl, group=None):
def getBlockReplicasAndSize(datasets, phedexUrl, group=None, rucioToken=None):
"""
Given a list of datasets, find all their blocks with replicas
available (thus blocks with at least 1 valid file), completed
Expand All @@ -144,6 +149,8 @@ def getBlockReplicasAndSize(datasets, phedexUrl, group=None):
dsetBlockSize = {}
if not datasets:
return dsetBlockSize
if rucioToken:
return getBlockReplicasAndSizeRucio(datasets, phedexUrl, rucioToken)

urls = ['%s/blockreplicas?dataset=%s' % (phedexUrl, dset) for dset in datasets]
logging.info("Executing %d requests against PhEDEx 'blockreplicas' API", len(urls))
Expand Down Expand Up @@ -175,21 +182,24 @@ def getBlockReplicasAndSize(datasets, phedexUrl, group=None):
return dsetBlockSize


# FIXME: implement the same logic for Rucio
def getPileupSubscriptions(datasets, phedexUrl, group=None, percentMin=99):
def getPileupSubscriptions(datasets, phedexUrl, group=None, percentMin=99, rucioToken=None):
"""
Provided a list of datasets, find dataset level subscriptions where it's
as complete as `percent_min`.
:param datasets: list of dataset names
:param phedexUrl: a string with the PhEDEx URL
:param group: optional string with the PhEDEx group
:param percent_min: only return subscriptions that are this complete
:param rucioToken: optional argument containing the Rucio token.
It also triggers calls to Rucio instead of PhEDEx
:return: a dictionary of datasets and a list of their location.
NOTE: Value `None` is returned in case the data-service failed to serve a given request.
"""
locationByDset = {}
if not datasets:
return locationByDset
if rucioToken:
return getPileupSubscriptionsRucio(datasets, phedexUrl, rucioToken)

if group:
url = "%s/subscriptions?group=%s" % (phedexUrl, group)
Expand Down
3 changes: 2 additions & 1 deletion src/python/WMCore/MicroService/Unified/MSCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def __init__(self, msConfig, logger=None):
logger=self.logger)

if self.msConfig['useRucio']:
self.rucio = Rucio(self.msConfig['rucioAccount'], configDict={"logger": self.logger})
self.rucio = Rucio(self.msConfig['rucioAccount'], hostUrl=self.msConfig['rucioUrl'],
authUrl=self.msConfig['rucioAuthUrl'], configDict={"logger": self.logger})
else:
# hard code it to production DBS otherwise PhEDEx subscribe API fails to match TMDB data
dbsUrl = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
Expand Down
Loading

0 comments on commit 9509161

Please sign in to comment.