Skip to content

Commit

Permalink
Apply Todor's suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
amaltaro committed Jul 2, 2020
1 parent b49cf5a commit ce54624
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
6 changes: 3 additions & 3 deletions src/python/WMCore/MicroService/Tools/PycurlRucio.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def getPileupDatasetSizesRucio(datasets, rucioUrl, rucioToken, scope="cms"):
dataset = dataset.replace("?dynamic=anything", "")
if row['data'] is None:
msg = "Failure in getPileupDatasetSizesRucio for dataset %s. Error: %s %s"
logging.info(msg, dataset, row.get('code'), row.get('error'))
logging.error(msg, dataset, row.get('code'), row.get('error'))
sizeByDset.setdefault(dataset, None)
continue
sizeByDset.setdefault(dataset, row['data']['bytes'])
Expand Down Expand Up @@ -163,7 +163,7 @@ def getPileupSubscriptionsRucio(datasets, rucioUrl, rucioToken, scope="cms"):
if row['data'] is None:
msg = "Failure in getPileupSubscriptionsRucio for dataset {} and block {}".format(dataset, block)
msg += " Error: {} {}".format(row.get('code'), row.get('error'))
logging.info(msg)
logging.error(msg)

locationByDset[dataset] = None
continue
Expand Down Expand Up @@ -207,7 +207,7 @@ def getDatasetBlocksRucio(datasets, rucioUrl, rucioToken, scope="cms"):
dataset = re.sub("/dids$", "", dataset, 1)
if row['data'] is None:
msg = "Failure in getDatasetBlocksRucio for dataset %s. Error: %s %s"
logging.info(msg, dataset, row.get('code'), row.get('error'))
logging.error(msg, dataset, row.get('code'), row.get('error'))
blocksByDset.setdefault(dataset, None)
continue
blocksByDset.setdefault(dataset, [])
Expand Down
4 changes: 2 additions & 2 deletions src/python/WMCore/MicroService/Unified/Common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import re
import time
from Utils.IteratorTools import grouper
from WMCore.MicroService.Tools.PycurlRucio import getPileupDatasetSizesRucio, getPileupSubscriptionsRucio, \
getBlockReplicasAndSizeRucio
from WMCore.MicroService.Tools.PycurlRucio import (getPileupDatasetSizesRucio, getPileupSubscriptionsRucio,
getBlockReplicasAndSizeRucio)

try:
from urllib import quote, unquote
Expand Down
50 changes: 37 additions & 13 deletions src/python/WMCore/MicroService/Unified/MSTransferor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ def __init__(self, msConfig, logger=None):
else:
self.msConfig["phedexRequestOnly"] = "n"

self.rseQuotas = RSEQuotas(self.msConfig['detoxUrl'], self.msConfig["quotaAccount"],
if self.msConfig.get('useRucio', False):
quotaAccount = self.msConfig["rucioAccount"]
else:
quotaAccount = self.msConfig["quotaAccount"]

self.rseQuotas = RSEQuotas(self.msConfig['detoxUrl'], quotaAccount,
self.msConfig["quotaUsage"], useRucio=self.msConfig["useRucio"],
minimumThreshold=self.msConfig["minimumThreshold"],
verbose=self.msConfig['verbose'], logger=logger)
Expand Down Expand Up @@ -632,9 +637,14 @@ def makeTransferRequest(self, wflow):
subLevel, dataSize, nodes, idx)
if not success:
# stop any other data placement for this workflow
msg = "There were failures transferring data for workflow: %s. Will retry again later."
self.logger.warning(msg, wflow.getName())
break
if transferId:
transRec['transferIDs'].add(transferId)
if isinstance(transferId, (set, list)):
transRec['transferIDs'].update(transferId)
else:
transRec['transferIDs'].add(transferId)
self.rseQuotas.updateNodeUsage(nodes[idx], dataSize)

# and update some instance caches
Expand All @@ -656,13 +666,17 @@ def makeTransferRucio(self, wflow, dataIn, subLevel, blocks, dataSize, nodes, no
:param wflow: the workflow object
:param dataIn: short summary of the data to be placed
:param subLevel: subscription level (dataset/container or block)
:param subLevel: subscription level (container or block)
:param blocks: list of blocks to be subscribed (or None if dataset level)
:param dataSize: amount of data being placed by this rule
:param nodes: list of nodes/RSE
:param nodeIdx: index of the node/RSE to be used in the replication rule
:return: a boolean flagging whether it succeeded or not, and the rule id
"""
# Here we need to map the PhEDEx subscription level to a Rucio grouping level
# where:
# "dataset" level in PhEDEx --> "ALL" in Rucio (whole dataset at same location)
# "block" level in PhEDEx --> "DATASET" in Rucio (whole block at same location)
subLevel = "ALL" if subLevel == "dataset" else "DATASET"
dids = blocks if blocks else dataIn['name']
rseExpr = nodes[nodeIdx]
Expand All @@ -679,7 +693,7 @@ def makeTransferRucio(self, wflow, dataIn, subLevel, blocks, dataSize, nodes, no
msg += ", where parent blocks have also been added for dataset: %s" % wflow.getParentDataset()
self.logger.info(msg)

success, transferId = True, 0
success, transferId = True, set()
if self.msConfig.get('enableDataTransfer', True):
# Force request-only subscription
# to any data transfer going above some threshold (do not auto-approve)
Expand All @@ -689,15 +703,25 @@ def makeTransferRucio(self, wflow, dataIn, subLevel, blocks, dataSize, nodes, no
rule['ask_approval'] = True

# Then make the data subscription, for real!!!
res = self.rucio.createReplicationRule(dids, rseExpr, **rule)
if res:
self.logger.info("Rule replication created under id: %s", res[0])
transferId = res[0]
# send an email notification, if needed
self.notifyLargeData(aboveWarningThreshold, transferId, wflow.getName(), dataSize, dataIn)
else:
self.logger.error("Failed to create rule for %s, will retry later", dids)
try:
res = self.rucio.createReplicationRule(dids, rseExpr, **rule)
except Exception:
msg = "Hit a bad exception while creating replication rules for DID: %s"
self.logger.error(msg, dids)
success = False
else:
if res:
# it could be that some of the DIDs already had such a rule in
# place, so we might be retrieving a bunch of rule ids instead of
# a single one
for ruleID in res:
self.logger.info("Rule replication created under id: %s", ruleID)
transferId.add(ruleID)
# send an email notification, if needed
self.notifyLargeData(aboveWarningThreshold, transferId, wflow.getName(), dataSize, dataIn)
else:
self.logger.error("Failed to create rule for %s, will retry later", dids)
success = False
else:
msg = "DRY-RUN: making Rucio rule for dids: %s, rse: %s, kwargs: %s"
self.logger.info(msg, dids, rseExpr, rule)
Expand All @@ -709,7 +733,7 @@ def makeTransferPhedex(self, wflow, dataBlocks, dataIn, subLevel, dataSize, node
:param wflow: the workflow object
:param dataIn: short summary of the data to be placed
:param subLevel: subscription level (dataset/container or block)
:param subLevel: subscription level (dataset or block)
:param dataSize: amount of data being placed by this rule
:param nodes: list of nodes/RSE
:param nodeIdx: index of the node/RSE to be used in the replication rule
Expand Down

0 comments on commit ce54624

Please sign in to comment.