-
Notifications
You must be signed in to change notification settings - Fork 3
WMAgent developer tutorial
This page is a concise (hopefully) summary of what I know regarding the WMCore system and the WMComponents, which are the heart of the deployed agent. I'll try to include everything important that I know about how the agent works.
But you don't really care about that, do you? Chances are that if you're here, you're here for one of two reasons.
- Something is broken
- Someone wants to add new functionality to one of the components.
- Both
I'll try and write down what can possibly go wrong, and also how to add most of the functionality that you normally have to, but that's going to have to involve some familiarity with the code itself. To do that I'll have to start with the general flow of the components, and then go component by component.
We use two different databases in the WMAgent, the first is a SQL based database, it's either MySQL or Oracle This is the current production deployment. Second is CouchDB, a non-relational database used to store documents like the Framework Job Reports (FWJRs). All the software for this is normally handled below the level that you will have to deal with. Database transactions for couch are almost all carried out through the ChangeState? class in WMCore.JobStateMachine?.ChangeState?. The SQL stuff is a bit more complicated, but the database connections are generally initialized in the setup of whatever you're doing, and then attached to python's threading class. You'll often see code like this scattered throughout the WMCore software:
myThread = threading.currentThread()
myThread.transaction.begin()
...
myThread.transaction.commit()
The init code attached the SQLAlchemy engine and DB gear to the current thread. This can then be accessed by any part of the code through the threading module's currentThread() method, which returns a data object with anything attached to it that you put there.
The WMAgent runs on a number of object types which are stored in the SQL database with unique IDs: workflows, subscriptions, jobs, files, jobgroups, filesets, etc. These objects are defined as classes in WMCore.WMBS, which basically defines all their functionality. These classes are essentially interface layers, connecting an instance of that class with the underlying database through the WMBS DAO layer, which is located in the same directory (by convention, library objects or others with DAOs often have directories named MySQL or Oracle).
To see an example, let's look at a basic object, the Fileset object.
A Fileset is defined by an entry into the wmbs_fileset database table, defined in WMCore.WMBS.CreateWMBSBase:
CREATE TABLE wmbs_fileset (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(500) NOT NULL,
open INT(1) NOT NULL DEFAULT 0,
last_update INTEGER NOT NULL,
UNIQUE (name))
The code that accesses it is in the Fileset class in WMCore.WMBS.Fileset, which has a number of methods. For one example, we can look at the exists() method, a basic method which determines whether the Fileset you've defined actually exists:
def exists(self):
"""
Does a fileset exist with this name in the database
"""
if self.id != -1:
action = self.daofactory(classname = "Fileset.ExistsByID")
result = action.execute(id = self.id, conn = self.getDBConn(),
transaction = self.existingTransaction())
else:
action = self.daofactory(classname = "Fileset.Exists")
result = action.execute(self.name, conn = self.getDBConn(),
transaction = self.existingTransaction())
if result != False:
self.id = result
return result
The class, using a pre-defined factory object (a glorified import statement), loads the WMCore.WMBS.MySQL.Fileset.ExistsByID or Exists object and runs it using the current transation (WMBS objects have special getDBConn() and existingTransaction() methods). The code in Exists is the code that holds both the SQL statement and the execute instruction, building the binds and executing them on the SQL database:
class ExistsByID(DBFormatter):
sql = """select id from wmbs_fileset
where id = :id"""
...
...
...
def execute(self, id, conn = None, transaction = False):
result = self.dbi.processData(self.sql, self.getBinds(id),
conn = conn, transaction = transaction)
return self.format(result)
One thing you will notice as you go onwards is that the methods in the classes are rarely used. They work fine for testing, but for running over hundreds or thousands of objects doing things one at a time is simply too slow. Most of the components use bulk DAO calls of some sort.
Types of WMBS Objects
Workflow: A workflow represents a defined "task" a single piece of work that is done on the worker node. A typical user "request" will have multiple tasks, some automatic. For instance you could have a ReReco request with a ReReco task, a Merge task, and then a LogCollect? and Cleanup task. Workflows are created by the WorkQueue?. Subscription: A subscription is a single block of work - a task being run over a specific fileset. For instance if you were intending to run a request over several blocks in a dataset, the WorkQueue might inject your work one block at a time as it has space to run them. Each block would have one subscription, and there would be a different fileset for each subscription. Subscriptions are normally created by the WorkQueue. Fileset: This is a list of files. It is very simple and you won't have to mess around with them much. The only complication is that there are both input and output filesets, but these are generally fairly simply arranged. Filesets are created by the WorkQueue - as far as you're concerned. Files: A file uses the standard CMS definition, it's a unique file object stored in an SE somewhere. We define it by a unique LFN, and the other characteristics of a CMS file. Files come from two places. The initial files are put in by the WorkQueue from DBS. However, files are also created by the jobs when they run in the form of output, log files, etc. These files are created in WMBS by the JobAccountant once it processes the job FWJR. Files have a list of filesets in wmbs_fileset_files, a list of locations they are at in wmbs_file_location, and a series of subscriptions in wmbs_sub_* Job Group: A job group is sort of an archaic object representing a list of jobs. Current convention is that a job group is a list of jobs created by the JobCreator in one iteration that have the same list of possible locations. I would have gotten rid of them if they weren't so ingrained in everything. A job group belongs to a subscription. Job: A job is a single piece of work, the object that is sent to a worker node and takes up one batch slot. A job does work based on its task (inherited from the job group, and in turn from the subscription, and in turn from the workflow), and a list of files (assigned in wmbs_job_assoc). Jobs are created by the JobCreator. Question: Why do we have filesets and four separate tables that assign files to subscriptions. Why not just assign files directly to subscriptions?
Answer: We can have multiple workflows running over the same files, especially if they're popular. Rather than create a new file entry for every subscription running over the same file, we create a single entry for > the file, and then assign it to multiple filesets and through that to multiple subscriptions. This is easier on the database in terms of DB size, although it forces us to keep better track of what we're doing.
Work
Of course, work is not submitted by users already in WMBS format. Nor does WMBS hold everything that you need to run jobs. That would be too convenient. A user submits a request to the RequestManager, which uses the data they pass in to create a WMWorkload, using the WMSpec. Workloads contain tasks, tasks contain steps, and altogether they have all the information they need to run. See the WMSpec section under Libraries for more information on how.
The JobStateMachine
The JobStateMachine (JSM) is probably the most important piece of the WMBS system. It consists of two parts, connected through the code in WMCore.JobStateMachine?.ChangeState?, a column in the wmbs_job table called state which holds the current state of the job, and the corresponding state transition system in Couch. When a component tries to do work it looks for all jobs in a current state, loads them, does what it's supposed to do, and then migrates them to the next state. Most components contain, for that reason, a final call to ChangeState?.propagate(), where they send a list of jobs and the state that those jobs should be propagated to.
Underneath the hood this code updated the wmbs_job.state column and updates the couch record. The couch record then holds a complete list of what the job has done, and is used for error handling.
Allowed states are defined in WMBS.JobStateMachine?.Transitions, and in the wmbs_job_state table.
created Job has been created by the JobCreator and is awaiting submission. Jobs may stay here for quite some time if no slots are available executing Job has been submitted to the batch system and is running complete Job has finished on the batch system success Job has succeeded and been processed by the JobAccountant cleanout Job has been archived by the JobArchiver and is awaiting removal createfailed Job failed during creation (currently unused) submitfailed Job failed during submission jobfailed Job failed during execution createcooloff Job passed through createfailed and is awaiting resubmission submitcooloff Job passed through submitfailed and is awaiting resubmission jobcooloff Job passed through jobfailed and is awaiting resubmission exhausted Job has failed too many times (or too severely) to retry and is being failed out of the system Components
The Components are the ones that actually create, run, and manage the jobs. Let's step through them one at a time:
JobCreator
The JobCreator? is usually the most stable of the "complex" components - it does a lot of things but does things pretty smoothly since it has been extensively tested for several years. Fundamentally, what the JobCreator does is take the subscriptions and workflows injected by the WorkQueue and turn them into jobs. It does this in several steps.
It checks out a list of subscriptions with files available, and runs over each subscription in turn. For each subscription it pulls out the WMSpec from the WMBS Workflow, loading it from wherever it happens to be cached on disk. This is important as this is the only component that should be directly touching the spec (which is the master copy of all information about the job). It then attempts to use that information to split the subscription into jobs. It does this using the JobSplitting code (described below in the libraries section), using the splitting type defined in the subscription, and the arguments that it pulls out of the spec (LumiBased and FileBased are the most popular processing types, but there are separate splitting algos for merges, cleanups, and for use in MC production etc.) This procedure is complicated by the use of a SQLAlchemy cursor, which is basically an open DB transaction. When running through a large subscription we pull out a limited number (a few hundred) files, split those into jobs, and then repeat on the next batch of files, allowing us to pull huge workloads one chunk at a time and reducing our memory footprint. Once it has the jobs, in terms of JobGroups filled with WMBS Job objects, the JobCreator creates a cache directory for each job. The cache directory is where the per-job files are dumped. This is important for batch execution as most batch systems attempt to drop a series of log files, etc., somewhere on the submit machine. The job is then pickled and placed in the cache directory. This instance of the job object contains the job's in database arguments, but it also contains several other pieces of information (such as the task) pulled directly out of the spec and put in each job. This information is necessary for the JobSubmitter, but is not important for the rest of the agent, certainly not important enough to try and store it in the DB. The list of jobs is then propagated to the 'created' state in the JSM. Problems
As I said, the JobCreator is normally fairly well behaved, but it does have the problem that it does a tremendous amount of work. It creates and checks in the jobs, acquires files, writes to disk, and plays around extensively in the database. All of this can take a tremendous toll on the submit node. These days that load is not quite so bad, but JobCreator? work is a lot of optimizing.
The biggest headache is always the JobSplitting code, especially the pulling of files out of the DB and the checking in of new jobs. If you can figure a way to optimize that it would help a lot.
JobSubmitter
Ah, the JobSubmitter. Completely ridiculous, terribly error-prone, and liable to fall apart if someone sneezes it. The JobSubmitter stores all waiting jobs in a set of caches until slots open up in the locations it can submit to. This opens it up to memory problems, slow algorithm problems, and a massive set of failures if for some reason a huge amount of work is submitted to by the JobCreator (for instance, someone bases work on a huge block).
Upon init, the JobSubmitter builds a huge cache with every job currently in the 'created' state. This is actually several caches, one listing the job information, and one listing the sites that the job can run at. Unlike most other components, the primary method of building this cache is unpickling the job object created by the JobCreator, which holds not only the basic information for the job but other data pulled from the spec that is necessary for the submitters to know. This cache is then refreshed every iteration of the component. Whitelists and blacklists are taken care of, jobs are sorted, and a lot of other stuff happens. Jobs are sorted into groups with the same sandbox and turned from a list of jobs into a JobPackage object, which is subsequently written out to disk. ResourceControl is polled to check to see how many slots are available (more details on this in the library section) For each site that has extra slots, the JobSubmitter goes through the available slots one subscription type (or threshold) at a time. It takes jobs from workflows that match that type until it has built enough jobs to submit that the site is full, then it goes on to the next one. The submitted jobs, having been sorted by sandbox, are sent to BossAir submit() for further submission. JobSubmitter waits for BossAir to return the list of jobs successfully (and unsuccessfully) submitted. It then propagates those jobs to either "executing" or "submitfailed" Problems
Loads of them, mostly related to how the caching works. The caches consist of several dictionaries and lists, sometimes several layers deep. To get an idea of how much of a headache this is, look at the code to remove submitted jobs from the cache:
for siteName in self.cachedJobs.keys(): for taskType in self.cachedJobs[siteName].keys(): for workflow in self.cachedJobs[siteName][taskType].keys(): for cachedJobID in list(self.cachedJobs[siteName][taskType][workflow]): if cachedJobID in jobIDsToPurge: self.cachedJobs[siteName][taskType][workflow].remove(cachedJobID) try: del self.jobDataCache[workflow][cachedJobID] except KeyError: # Already gone pass That's just uncomfortable to watch, let alone run. And it's not the only problem. Here's the definitions of the cache objects:
self.cachedJobIDs = set()
self.cachedJobs = {}
self.jobDataCache = {}
self.jobsToPackage = {}
self.sandboxPackage = {}
self.siteKeys = {}
self.locationDict = {}
self.cmsNames = {}
self.drainSites = []
That's just a lot of caches to keep track of.
So here's the list of common complaints:
The JobSubmitter will use too much memory if someone finds a new way to blow up the cache. Submitting jobs with dozens, if not hundreds, of available sites is a good way to start sabotaging things. Because of python memory management problems, this problem will persist forever. If someone has managed to get a huge number of jobs through the WorkQueue, we'll have the same problem. In that case the cache will be full of hundreds of thousands of regular jobs, causing the memory usage to balloon uncontrollably. At a certain point manipulating the lists will cause the JobSubmitter to just slow down almost continually. Sometimes you will need to add a new variable for the submitters to use. Remember that to do this you have to add the variable both to the tuple put into the cache, and the tuple that is taken out. JobStatusLite (JSL)
Don't ask why JSL has such a stupid name. It's a holdover from the days of BossLite, and I would haste to say that it's not my fault. JSL is also unique in that the code it actually runs is not really in that directory - instead it's the StatusPoller in WMCore.BossAir?. It is also alarmingly simple - most of the work is done by BossAir. Really it only exists to force BossAir to examine the state of all currently running jobs.
Unique among all component, JSL doesn't touch the WMBS database. It runs the BossAir track() method (See that section for more details) and gets the jobs that BossAir lists as running. It checks the time those jobs have been in their current global states with the config-set timeouts for those states. If they have run too long, those jobs are killed. It then tells BossAir what it's decided. Problems
The biggest problem with JSL is that people forget that it exists. If you don't remember that, the fact that jobs get 'stuck' in BossAir can become a bit of a mystery. Any sort of strange error that crashes JSL will cause all transition of jobs in BossAir to grind to a halt. As a result jobs will never finish, and the other components will sit around doing nothing. So far though we haven't had any really major trouble with running JSL.
JobTracker
The JobTracker is the endpoint for the BossAir system. As such it depends almost entirely on the implementation of the BossAir complete() methods. Once it gets information from BossAir, there's not that much to do.
Load all the executing jobs. Load all the jobs that BossAir thinks are complete by running getComplete() (which runs the complete() methods under the hood) If a job is listed by BossAir but not by WMBS, ignore it. if the job ended because it timed out, fail it. Else, make sure that the FWJR is there, and set the job status to 'complete'. Problems
Fortunately, the JobTracker is one of those components that rarely has problems because it does some fairly simple stuff. If it does have problems they're usually related to some ridiculously large chunk of jobs. However, it may look like it's having problems - for instance, jobs can get stuck there. This is addressed in troubleshooting below.
JobAccountant
The JobAccountant is the biggest pain of the standard line components. This is because it does so much.
It loads a list of all the jobs in the 'complete' state. For every job it loads the FWJR, which is brought back from the worker node, and parses out all the data. This includes not only whether or not the job finished, but what it did, what files it produced, where those files are, the dataset those files belong to, etc. Virtually everything that is ever known about the job and the data it produced has to go through the JobAccountant. Having loaded the FWJR, it then has the unpleasant task of injecting the files into WMBS. In order to do this it takes a list of all the files that the job created and records them in cached lists. It holds those lists until it is done with the batch of files it has pulled. Having sorted all the information for injection into WMBS it also produces information in caches for DBSBuffer insertion. DBSBuffer exists because at the beginning the injection into DBS and PhEDEx was considered to take too long to wait for its completion. This means that almost all the file info is duplicated. This is further complicated by the necessity of finding the parentage for each file to be inserted into DBS. The parents of a file in WMBS are not necessarily the right parents - the parents of a file injected into DBS have to be in DBS, which means that unmerged files and intermediate files which were merged to create the final output are ignored. This forces you to trace up several generations for possibly the right parents. Once the cycle is done and all the information in caches, each file is inserted into WMBS. This is more complicated than you might think, as everything has to be done with DAOs written just to do this bulk insertion. Regular file insertion is just too slow. Everything is then repeated to insert files into DBSBuffer, using a different set of custom DAOs. Jobs that succeeded are pushed into the successful category, those that failed are propagated into jobfailed. Regardless, the FWJR is pushed to couch by the JSM. Problems
The JobAccountant is full of problems. Let's go over them.
Loading from the FWJR is too slow, especially going one file per job. There are a lot of caches, which is a great way to blow up memory usage, especially if someone does something stupid like create a job with ten thousand files. Caches should be strictly memory controlled - which is of course impossible because we have no idea how many files you get per job. Any of these could become a disaster. JobAccountant duplicates a lot of effort during this procedure because it injects almost identical information into DBSBuffer and WMBS. One could simplify the system by integrating DBSBuffer and WMBS. One could also cut one's own throat with a butter knife. It might be easier because there's a lot of DBSBuffer stuff that would need changing. Bulk file insert is slow, slow, slow. I don't know a better way to handle it but it's possible that someone with a better knowledge of SQL might be able to accelerate the process. The bulk insert commands are also very temperamental. I make mistakes in the order and the logic all the time. Every once in a while you will have a problem where something is being inserted for a file that should not have it. This usually becomes a partially indecipherable error, usually complaining about a NOT NULL violation or an invalid value. Those you'll have to fix on your own. ErrorHandler
The ErrorHandler is at least in theory very simple. It picks up jobs in the three error states, JobFailed, SubmitFailed, and CreateFailed (which is unused), determines whether they have to be exhausted, and then either exhausts them (basically fails the entire job) or puts them in the appropriate cooloff (it will retry after some period of time).
Load all the jobs in the three failed states. For each state check the jobs against the max retry_count. See if we've already retried too many times. If we have, exhaust the jobs. If we have to check the FWJR do that - load the FWJR and check for the appropriate exit codes and possible reasons. If they match the config list exhaust the jobs here. Send the failed jobs on to ACDC. Propagate jobs to the correct cooloff state. Problems
The big ugly part of the ErrorHandler is the part where it loads the FWJRs manually to check for critical error conditions. Really we shouldn't do this, but it's a hack fix for something that Ops decided it wanted.
RetryManager
The RetryManager does the other end of the error handling. Once the ErrorHandler puts jobs in the cooloff states, they don't return to the created state until the RetryManger says they do. The RetryManager does this by checking against the Agent's assigned retry algorithm.
Load the jobs in each cooloff state from the DB. For each job check it against the plugin-based Retry Algorithm, which is located in WMComponent.RetryManager?.PlugIns?, and are usually somehow based on the retry_count (for instance the linear algorithm just increases the timeout each time by a base factor multiplied by the number of previous retries. If the job passed, then put it into created. If it failed, ignore it. Problems
Not many here. The RetryManager is largely self-sufficient. You may have to tweak the plug-ins from time to time, but everything should be largely self-contained and simple enough to be easy to fix.
JobArchiver
JobArchiver is a bit of an odd duck, as it was originally one of the few components to run at a moderate sort of timescale. As a result it inherited several different tasks. Its main job is to take all finished jobs, those either in the 'success' or 'exhausted' state. It also searches for closable filesets and closes them, and marks workflows as injected if necessary.
Load all jobs in both the cleanout and exhausted state. For each job go through, package up the contents of its cache_dir into a tarball, and then delete the original cache_dir (originally this would be on a separate hard drive, but no longer). Move the jobs into the 'cleanout' state. Close any closable filesets (independent code). Mark as injected any workflows that require it (independent code). Problems
The only problem the JobArchiver has is that the code it runs comes from so many different places it is impossible to keep track of.
Question: Should the miscellaneous functions be handled by a separate Utility module of some sort?
Answer: Maybe, but so far there are only two of them, which means that we just stuck them in when they appeared. If more of them show up, maybe.
TaskArchiver
The TaskArchiver does a lot of stuff. As it is the last thing that runs, it handles all the cleanup - all of it. Not only does it mark all the subscriptions as finished, if a workflow is finished in total, everything in the workflow is removed from the database, all the information is assembled into a WorkloadSummary, and everything is sent off to couch and the WorkQueue for examination. Because of this the TaskArchiver is actually very long and convoluted.
Instead of loading jobs, the TaskArchiver loads a list of finished subscriptions - subscriptions with no child subscriptions who have only closeout jobs. Having loaded those subscriptions, it notifies the WorkQueue that they are done. It then goes through and deletes each subscription. It does this by using the WMBSSubscription.deleteEverything() method. This goes through and deletes all associated jobs, job groups, masks, and any files that are not in use by other pieces of the code. If the workflow assigned to the subscription is no longer in use, it deletes that too. Having done this, for each subscription it checks to see if the workflow is still there. If not, it then proceeds to archive the workflow information into a WorkflowSummary First it pulls all failed job info out of couch by using the "failedJobsByWorkflowName" view. Then it pulls the output by using the "outputByWorkflowName" view. Then it pulls the performance information using the "performanceByWorkflowName" view. It sorts the performance information, grouping it all together, and calculates various statistical values. It pulls out the "worst offenders" for each stat and attaches their output, logArchive and other information for a more detailed examination. It organizes the rest of the workflow information, including properly formatting a look at all failed jobs, putting error messages in the summary, and including outputs. The entire summary is then posted to couch. The TaskArchiver also handles some JSON stuff for CRAB and finishes. At the end of the TaskArchiver, all data has been erased from WMBS. Problems
Can you even list them?
People want new stuff in the WorkloadSummary all the time. This takes a lot of work if the data is not immediately available as it usually has to be pulled out of couch. Pulling information out of couch for the WorkloadSummary all the time creates a very slow process. Couch is not that speedy, and when you combine that with how much you try to pull out of couch, well, things can get ugly very quickly. It would be nice if we could optimize this, but we are dealing with two couch databases already, things only seem to get slower. If anything does go wrong in WMBS, TaskArchiver is very good at erasing all the evidence. That means that any work you do for debugging often has to be done with the TaskArchiver off, or you will find your data disappearing in midstream. Somewhere there's probably a bad condition that deletes database entires before we're ready. That has to be found, but that's a pain. It's slow, slow, slow. Because it does delete everything there is a constant debate about where to put various monitoring, etc. calls in relation to it in the chain of events. I really don't know what to say about this, but the TaskArchiver is sort of the final word, so it tends to get roped into attempts to stick other things in the 'line' of necessary procedures. DBSUpload
If you're here, chances are you're up the creek without a paddle. As it currently stands, the DBSUploader is the most complicated and confused piece of code that we have. I've rewritten it several times, none of which has done more than a few minor improvements in style and speed.
Requirements
Part of the problem with the DBSUploader is that it does too many things. It has two (possibly three depending on how you count) complete tasks that it attempts to execute. It reads DBSBuffer, creates blocks, checks those blocks into the database, checks blocks out of the database to upload, uploads them, migrates them, and then marks them in DBSBuffer. Here are the main concerns:
DBS2 (and to a certain extent, DBS3) can fail utterly and unexpectedly for a variety of reasons during transfer. DBS2 does not always produce machine-parsable errors. Most importantly, if DBS2 fails, it gives no indication of the final state of the system. Not everything is wrapped in a single transaction, so if an exception is thrown it is no guarantee that absolutely nothing was transfered. If you are attempting to upload, or worse, migrate, a block through DBS and it fails, you have no way of knowing whether the block and the files were properly inserted. Everything we do in DBS is done in terms of blocks. However, everything we produce we produce in terms of files. Therefore DBSUploader has to turn files into blocks. Because of the previous points, DBSUploader must save the blocks it produces before attempting to upload them. If you built new blocks after an initial failure with the same files but new block names, you could be duplicating the files. If you add files to the same block and attempt to reupload you will get a duplication error. If you change the files in any way and attempt to upload the same block you could get a duplication error. This does not necessarily mean that the files are correct, just that the block name is the same. Given the above, you absolutely have to upload the same version of the block in every iteration. This is done by sorting files into blocks and then saving the blocks to the database before moving to the uploading phase. Because DBSUploader is high priority, the component cannot crash just because one dataset is bad. Therefore any exception must result in that block being passed over, but must not result in everything crashing. DBSUploader
So here's more or less what the DBSUploader does do:
Creating blocks Loads any new files out of the DBSBuffer database (where the JobAccountant put them) Sort files into blocks based on the same location, dataset, and algorithm. Block the files and save them into dbsbuffer_block If any blocks are full, move them to status 'PENDING' Uploading blocks Load all blocks in status 'PENDING' Load all files from each of those blocks. Sort the blocks into like categories. Using DBSInterface.runDBSBuffer, upload the blocks. Mark blocks in DBSBuffer depending on whether they were inserted or migrated DBSInterface
Everything for DBS2 should be done through the DBS2 API. Unfortunately, I found that to be largely unhelpful. Correspondingly, I ended up writing a wrapper layer entirely around the DBS2 API, the DBSInterface. The Interface has a set of functions that act as wrappers around the API classes, transforming data into the correct format and properly logging errors, and then an interface class which handles all the work. The Interface class is called using the runDBSBuffer() method, and does the following:
Inserts the dataset and the algorithm, or at least tries to make sure that they're there. Creates the blocks For each block, assembles and uploads the files for that block. Checks if the blocks are closed. If closed, call migrateClosedBlocks() to migrate them. Migrate the blocks to DBS2 Problems
There are no end to problems here. The most common, unfortunately, is that DBSBuffer and DBS get out of synch. When that happens there will either be an error as you try and upload things that will repeat indefinitely, or, worse, things will not be complete. A DBS error is usually solved by the following steps:
Check the entries for the block in DBS2 and DBSBuffer. If the block exists in both places you could be looking for missing files, etc. Check the parents of the block. The number one problem for migration is that an earlier error caused parents not to be migrated to global, but was not caught. Check for an invalid dataset name, or algorithm, or one of the other pieces that makes up a block. This could prevent a block from being inserted in the first place. Make sure that if a block is closed in one place, it's closed in the others. If all else fails, purge the entry from DBSBuffer and rerun the workflow. This is not a good option. I really don't know what to tell you, other than good luck. You'll probably need it.
PhEDExInjector
Injects files that are in DBS into PhEDEx. I didn't write it so I don't know how it works. It depends on the DBSBuffer database though, not the WMBS objects, and follows DBSUpload in that particular chain of events.
WorkQueueManager
I have no idea what this does, it was written and maintained by the WorkQueue group, so they might have a better idea than I do.
Libraries
These are libraries in WMCore that I regularly have to deal with or at least know how they work.
BossAir
WMSpec
WMSpec contains literally all the information that we know about the workflow or the job, and contains the code that runs it on the worker node. For a better analysis of how things run on the worker node, look at that particular section of this guide. For other things, the basic code in WMSpec consists of:
The Objects: WMWorkload, WMTask, WMStep and their associated code. Workloads are made up of tasks, tasks are made up of steps. All of them have a large amount of information loaded into them and a large variety of methods for handling that information. You rarely work with these objects directly - of more use are the Helper classes in each file that contain all the user-friendly accessors. The StdSpecs: WMCore.WMSpec.StdSpecs? is one of the most important pieces of code in WMCore. This directory contains a class for each type of request that allows you to build a WMWorkload out of just a dictionary of arguments, setting up the Output Modules, the Monitoring, the Input Steps, and everything else in a concise manner. It also holds the validation that the ReqMgr runs to make sure that the job is alright. This code often has to be tweaked. Most often it is StdBase, from which all other spec types inherit, that has the most problems, and you should familiarize yourself with how that code works. The Step components: Each step type (CMSSW, StageOut?, LogArchive?) has the code that you actually run in WMCore.WMSpec.Steps.Executors. They also have a template (used to build arguments) in WMCore.WMSpec.Steps.Templates, a builder in WMCore.WMSpec.Steps.Builders (maybe deprecated), a diagnostic tool in WMCore.WMSpec.Steps.Diagnostics, and some other random code running around. Anything that you have to do to change how an individual step runs on the worker node probably has to be done here. See the section on what happens on the worker node for more details. ResourceControl
ResourceControl is a set of a database table and its associated interfaces that controls the sites and where they get their jobs. The sites are described in the wmbs_location table, but the actual amount of jobs submitted is done through ResourceControl. This is because even though you have a certain number of slots on a site, you want to control how many jobs start at a particular site based upon the job type. For instance, you don't want to send hundreds of Merge jobs to a single site for fear of overloading their SE. To handle these requirements there is a table called rc_thresholds. For a given site there is a threshold for each subscription type (Merge, Processing, Production, etc.) that can run at that particular site. ResourceControl keeps track of how many slots there are for each type of job, and can tell you how many slots there are available.
This seems like a remarkably easy task to handle, so you might wonder what requires all the code that's in there. The tricky part is actually accounting for all of the jobs. In order to make sure that you don't oversubmit jobs, ResourceControl has to report not only the number of jobs currently running at a site, but also the number of jobs in the system that are waiting for slots at that site to open up. This is a huge, massive list, and because you don't actually know what site those jobs are at (and they could potentially go to several), you end up with a giant mound of SQL code.
At this point ResourceControl is fairly well controlled, but the SQL spaghetti that makes up the UnassignedSQL call occasionally causes problems. Unfortunately, there's not a lot I can tell you about how to fix it.
FwkJobReport
The FWJR is the main method by which we get information from a job. The class itself is in WMCore.FwkJobReport?.Report, and is based on the same Configuration structure as all the config files, with a lot of auxiliary methods. Those methods are usually just to make sure that the Accountant gets a fairly easy time of things, but they also help tracking things later on.
Creation
A FWJR is created on the worker node once for each step (for more info on how the job actually runs, see the "How Jobs Run" section - once I write it). When a job starts, it creates a "default" FWJR to be used in case no real FWJR is created due to failure. After that each step (CMSSW, StageOut, etc.) creates a FWJR in their sub-directory. When the task finishes, the various FWJR objects are collated by the WMCore.WMSpec.WMTask:
def completeTask(self, jobLocation, logLocation):
"""
_completeTask_
Combine all the logs from all the steps in the task to a single log
If necessary, output to Dashboard
"""
import WMCore.FwkJobReport.Report as Report
finalReport = Report.Report()
# We left the master report somewhere way up at the top
testPath = os.path.join(jobLocation, '../../', logLocation)
if os.path.exists(testPath):
# If a report already exists, we load it and
# append our steps to it
finalReport.load(testPath)
taskSteps = self.listAllStepNames()
for taskStep in taskSteps:
reportPath = os.path.join(jobLocation, taskStep, "Report.pkl")
if os.path.isfile(reportPath):
stepReport = Report.Report(taskStep)
stepReport.unpersist(reportPath)
finalReport.setStep(taskStep, stepReport.retrieveStep(taskStep))
else:
# Then we have a missing report
# This should raise an alarm bell, as per Steve's request
# TODO: Change error code
finalReport.addStep(reportname = taskStep, status = 1)
finalReport.addError(stepName = taskStep, exitCode = 99999, errorType = "ReportManipulatingError",
errorDetails = "Could not find report file for step %s!" % taskStep)
finalReport.data.completed = True
finalReport.persist(logLocation)
Once this is done, the FWJRs are passed back to the agent through whatever system the batch code uses.
Reading
FWJRs are loaded and read by the JobAccountant component in the loadJobReport() method. Once this has been done, the FWJR is then processed by the subsequent steps in the Accountant chain, the files which were put in are extracted and placed into WMBS, and the necessary steps taken.
At the end of the Accountant cycle, the FWJR is put into couch. This is done by putting the FWJR in a 'fwjr' key attached to a job, and then propagating the job the same way that you would propagate any normal job from one state to the next:
if job.get("fwjr", None):
# If there are too many input files, strip them out
# of the FWJR, as they should already
# be in the database
# This is not critical
try:
if len(job['fwjr'].getAllInputFiles()) > self.maxUploadedInputFiles:
job['fwjr'].stripInputFiles()
except:
logging.error("Error while trying to strip input files from FWJR. Ignoring.")
pass
job["fwjr"].setTaskName(job["task"])
fwjrDocument = {"_id": "%s-%s" % (job["id"], job["retry_count"]),
"jobid": job["id"],
"retrycount": job["retry_count"],
"fwjr": job["fwjr"].__to_json__(None),
"type": "fwjr"}
self.fwjrdatabase.queue(fwjrDocument, timestamp = True)
This has the interesting effect of making any object that you put in the 'fwjr' key:value slot of a job transition to the fwjrdatabase, so be careful of that.
WMRuntime
All the code for unpacking the job on the worker node, setting it up, setting up the logging, and getting it started is in WMRuntime. So is the monitoring code that watches over the process when it is running. For more information see the section on how jobs run.
JobSplitting
The JobSplitting code is the code that takes a subscription with all its attached files and creates jobs out of them. This is done in a plugin-based manner, with all the relevant code in WMCore.JobSplitting?. The most important class is the JobFactory, from which all functionality is inherited. Just by looking at the first methods of JobFactory you can already see one crucial issue, JobSplitting works on both WMBS and non-database DataStruct? jobs. This means that there are two sets of functionality in some cases, and two sets of unittests (one in JobSplitting_t and one in WMBS_t.JobSplitting?_t). The second problem you will notice is the convoluted way in which it can load files:
def loadFiles(self, size = 10):
"""
_loadFiles_
Grab some files from the resultProxy
Should handle multiple proxies. Not really sure about that
"""
if len(self.proxies) < 1:
# Well, you don't have any proxies.
# This is what happens when you ran out of files last time
logging.info("No additional files found; Ending.")
return set()
resultProxy = self.proxies[0]
rawResults = []
if type(resultProxy.keys) == list:
keys = resultProxy.keys
else:
keys = resultProxy.keys()
if type(keys) == set:
# If it's a set, handle it
keys = list(keys)
files = set()
while len(rawResults) < size and len(self.proxies) > 0:
length = size - len(rawResults)
newResults = resultProxy.fetchmany(size = length)
if len(newResults) < length:
# Assume we're all out
# Eliminate this proxy
self.proxies.remove(resultProxy)
rawResults.extend(newResults)
... This code is responsible for a great many of my headaches, but it acts to open a cursor in the database, allowing us to pull the files out of the database one batch at a time. Instead of trying to pull the whole batch at once, this code, if called by the JobCreator, will pull only a small bunch.
The problem is that this doesn't work well for all things. While you might want to limit the number of files you pull for a Processing job, a Merge step has its own way of limiting how many files to pull. Therefore some splitting algorithms run this way, and some don't.
The actual code for each of the splitting algorithms is contained in a plugin in the same directory. Each plugin has a name that is used as the splitting type when those arguments are passed to the JobCreator, and has a method called algorithm() for use to actually split the jobs. Inside algorithm() the plugin uses the newGroup() and newJob() methods inherited from JobFactory to produce jobs. In the simplest case:
if eventsInFile >= eventsPerJob:
while currentEvent < eventsInFile:
self.newJob(name = self.getJobName(length=totalJobs))
self.currentJob.addBaggageParameter("eventsPerJob", eventsPerJob)
self.currentJob.addFile(f)
self.currentJob["mask"].setMaxAndSkipEvents(eventsPerJob, currentEvent)
currentEvent += eventsPerJob
totalJobs += 1
else:
self.newJob(name = self.getJobName(length=totalJobs))
self.currentJob.addBaggageParameter("eventsPerJob", eventsPerJob)
self.currentJob.addFile(f)
self.currentJob["mask"].setMaxAndSkipEvents(f["events"], 0)
totalJobs += 1
for EventBased splitting. It should be noted that newJob() not only creates a new job and attaches it to the currentJob attribute, but also handles the creation of the old job.
Here is a list of common splitting algorithms that I know of:
LumiBased The most common - used for processing data as lumis are natural units for Operations FileBased Much more convoluted than it seems - ask EWV for all the various permutations of options EventBased Primarily used for MC Production ParentlessMergeBySize The default merge algorithm - understood by sfoulkes SiblingProcessingBased Used for various output tasks - understood by sfoulkes EndOfRun For those tasks that run when the rest of the subscription is finished ACDC
Runtime On The Worker Node
On the worker node a lot of complicated stuff happens. Mostly I don't understand it, but I have a vague grasp of how things happen and in what order. Let's go through it.
The Job Basics
The job lands on the worker node with four objects. One is the tarball, which contains the WMSpec, the libraries, and any code needed to run. One is the JobPackage?, a pickled object that contains a list of jobs in a format that can be quickly unwrapped. The other two are the main executables, the submit.sh script that runs the code, and the Unpacker.py script, whih is responsible for unpacking and building the working directory.
The submit script is mostly concerned with grid variables, looking for the right directories in which to run code and to figure out what kind of grid it happens to be running on. The Unpacker is mostly a wrapper around the tarfile module, which is used to unpack the tarball, set up the directories, and get everything else running. The code is then handed off to Startup.py.
Startup
The actual code is run out of WMCore.WMRuntime.Startup. This is the master code that runs everything, and is remarkably simple:
print "Startup.py : loading job definition"
job = Bootstrap.loadJobDefinition()
print "Startup.py : loading task"
task = Bootstrap.loadTask(job)
print "Startup.py : setting up monitoring"
logLocation = "Report.%i.pkl" % job['retry_count']
Bootstrap.createInitialReport(job = job,
task = task,
logLocation = logLocation)
monitor = Bootstrap.setupMonitoring(logPath = logLocation)
print "Startup.py : setting up logging"
Bootstrap.setupLogging(os.getcwd())
print "Startup.py : building task"
task.build(os.getcwd())
print "Startup.py : executing task"
task.execute(job)
print "Startup.py : completing task"
task.completeTask(jobLocation = os.getcwd(),
logLocation = logLocation)
print "Startup.py : shutting down monitor"
os.fchmod(1, 0664)
os.fchmod(2, 0664)
if monitor.isAlive():
monitor.shutdown()
Startup basically does three things:
Uses WMCore.WMRuntime.Bootstrap to set up the logging, the environment, and the underlying necessities. Calls on WMCore.WMSpec.WMTask (which is in the WMSpec) to build the actual task, and execute it. Cleans up the monitoring at exit. Bootstrap
Bootstrap holds all the environment setup for the WMRuntime code. At the very least it:
Loads the job definition out of the JobPackage file that was brought along with the job. Loads the WMWorkload, which contains the whole workload definition, and then uses the task info from the job to load the WMTask being run by this job. Creates a dummy job report. If the job is terminated early by some unknown factor, this is what is returned to the submit node. The basic report has the following characteristics: report.data.WMAgentJobID = job.get('id', None) report.data.WMAgentJobName = job.get('name', None) report.data.seName = siteCfg.localStageOut.get('se-name', socket.gethostname()) report.data.siteName = getattr(siteCfg, 'siteName', 'Unknown') report.data.hostName = socket.gethostname() report.data.ceName = getSyncCE() report.data.completed = False report.setTaskName(taskName = job.get('task', 'TaskNotFound')) Sets up the monitoring. The monitoring code is in WMCore.WMRuntime.Watchdog, and is based on plugins in WMCore.WMRuntime.Monitors. Common monitoring includes the Dashboard and Performance monitors. Once Bootstrap loads the monitors that it finds in the WMWorkload it attaches them to a thread using the python threading module. This module runs in the background during most of the process, and is only directly signaled from the Execution stage. Sets up the global logging for the remaining processes. WMTask
Bootstrap sets things up so the task can run. This is where things get a bit too obscure for me to figure out entirely what happens, but the task calls upon the WMCore.WMSpec.Steps.BuildMaster? to build the local environment, pulling in the task-specific modules. Following this it sets up its own environment if further setup is called for, and then instantiates an ExecuteMaster.
ExecuteMaster
WMCore.WMSpec.Steps.ExecuteMaster? is probably the closest thing to a defined "execute" piece of code. All the actual difficult stuff is now run, one step at a time, in the call() function:
for step in task.steps().nodeIterator():
try:
helper = WMStepHelper(step)
stepType = helper.stepType()
stepName = helper.name()
if skipToStep and skipToStep != stepName:
# Then we continue until we get to the required step
continue
skipToStep = None # Reset this when we get to the right step
executor = StepFactory.getStepExecutor(stepType)
result = self.doExecution(executor, step, wmbsJob)
if not result == None:
skipToStep = result
A WMStepHelper is built out of the particular step, which then has all the data. An executor is built out of the step, using the step type. These are pulled out of WMCore.WMSpec.Steps.Executors, and there is one for each step type (CMSSW, StageOut?, etc) The executor is run The result is taken. If a result is returned it is interpreted as an instruction to skip to a particular step. No other steps will be run until that step is reached (for instance CMSSW can fail and tell you to go straight to LogArchive rather than attempting to stage out nonexistant files). The Exector - or Actually Running the Job
The actual code that runs whatever you have to do is in the Executor, the step-specific one in WMCore.WMSpec.Steps.Executors. Each executor consists of three steps.
A pre() step that is run before the rest of the code and handles setup. An execute() step that runs the main body of the code, and usually interfaces with other code (like CMSSW) to do the job. This is where most of the pain happens. A post() step that checks things after they've run. The post() step can return a value that is then used as the step to skip to (in the ExecuteMaster, above). Usually the execute() steps are very convoluted, and they themselves make use of multiple other libraries. However, there is one part I should point out - all information that is put into the FWJR by hand, and not by the CMSSW Framework, is handled in the CMSSW step:
self.report.setValidStatus(validStatus = validStatus) self.report.setGlobalTag(globalTag = globalTag) self.report.setInputDataset(inputPath = inputPath) self.report.setCustodialSite(custodialSite = custodialSite) So there are things that you occasionally have to do inside the step itself. The steps all contain a self.report, which is then collated into one report later on.
Finishing Up
Once all the steps have finished running, things go back up the chain. Most of this is unimportant, but I would like to touch on the WMTask for a moment again. The WMTask has a method called completeTask() which is of importance because it finishes up the job.
taskSteps = self.listAllStepNames()
for taskStep in taskSteps:
reportPath = os.path.join(jobLocation, taskStep, "Report.pkl")
if os.path.isfile(reportPath):
stepReport = Report.Report(taskStep)
stepReport.unpersist(reportPath)
finalReport.setStep(taskStep, stepReport.retrieveStep(taskStep))
else:
# Then we have a missing report
# This should raise an alarm bell, as per Steve's request
# TODO: Change error code
finalReport.addStep(reportname = taskStep, status = 1)
finalReport.addError(stepName = taskStep, exitCode = 99999, errorType = "ReportManipulatingError",
errorDetails = "Could not find report file for step %s!" % taskStep)
What this does is go through each step that was in the task and look in the step's running directory for a Report.pkl file. If it finds one, it collates this together into a single report object which is then reported to the WMAgent. If it misses one, it adds an error for that step into the FWJR.
This is where the actual FWJR you get back is created. Any failure that causes things to crash out before this step will cause all the FWJR data to get lost.
Common Feature Requests
I'm listing below a series of common feature requests that I've received. Hopefully this is enough to tell you how to fix things.
We Need to Add a New Argument When Submitting to the Batch System
This is done entirely in the JobSubmitter?, but it has to be done in several places.
First, you have to put the job information into the cache after you load it. jobInfo = (jobID, newJob["retry_count"], batchDir, loadedJob["sandbox"], ... loadedJob.get("swVersion", None))
self.jobDataCache[workflowName][jobID] = jobInfo\
Second, you have to attach that information to the job object before sending it to submit. jobDict = {'id': cachedJob[0], 'retry_count': cachedJob[1], 'custom': {'location': siteName}, 'cache_dir': cachedJob[4], 'packageDir': package, ... 'swVersion': cachedJob[11]}
# Add to jobsToSubmit
jobsToSubmit[package].append(jobDict)
Third, in BossAir, you need to tell the RunJob object to expect your new variable by defining it in the init() statement. self.setdefault('id', id) self.setdefault('jobid', jobid) self.setdefault('gridid', gridid) ... self.setdefault('siteName', siteName) Only with all of these steps done do we get to move it all the way through the chain.
We Need More Information in DBSBuffer from the Spec
This is a bit counter-intuitive, but it's been done several times before. Simply put, all information that goes into DBSBuffer comes from the FWJR in the JobAccountant. Rather then trying to add the information to the database and have the JobAccountant load a new field, the simplest matter just seems to be to put it in the FWJR in the first place while you're on the Worker Node, because there you have access to the spec.
First, you have to add a field to the FWJR to hold the data. Generally, this information has to be set for each file: def setGlobalTag(self, globalTag): """ setGlobalTag
Set the global Tag from the spec on the WN
ONLY run this after all the files have been attached
"""
fileRefs = self.getAllFileRefs()
# Should now have all the fileRefs
for f in fileRefs:
f.globalTag = globalTag
return
Next, make sure that information in the FWJR is read out properly when the files are created. newFile["globalTag"] = getattr(fileRef, 'globalTag', None) Then you have to make a column for the data in DBSBuffer. You should also write some DAOs or other methods to get the data into DBSBuffer. In the CMSSW step, you now have to make sure the information gets from the Spec to the FWJR. In WMCore.WMSpec.Steps.Executors.CMSSW: validStatus = self.workload.getValidStatus() inputPath = self.task.getInputDatasetPath() globalTag = typeHelper.getGlobalTag() custodialSite = self.workload.getCustodialSite() cacheUrl, cacheDB, configID = stepHelper.getConfigInfo()
self.report.setValidStatus(validStatus = validStatus)
self.report.setGlobalTag(globalTag = globalTag)
self.report.setInputDataset(inputPath = inputPath)
self.report.setCustodialSite(custodialSite = custodialSite)
Then in the JobAccountant, make sure that the Accountant transfers the data to DBSBuffer. dbsFile.setGlobalTag(globalTag = jobReportFile.get('globalTag', None)) Add Another Field to the WorkloadSummary
All of this is done through the TaskArchiver. The data sent to the WorkloadSummary is simply a dictionary, adding another field is simple.
self.workdatabase.commitOne(workflowData) The only problem is getting the data, which you'll have to load out of couch using one of the current load functions. You may need to talk to a couch expert to see about that.
Add/Change Data Going to Dashboard
Most of this is sent from the Worker Node, and can be found in WMCore.WMRuntime.DashboardInterface?. The code itself is run by WMCore.WMRuntime.Monitors.DashboardMonitor?, which is run by the watchdog monitoring system. The data is just sent as a dictionary, so you can add or remove anything you like.
Handle a Particular CMSSW Error on the Worker Node or Add Information From CMSSW Error
I haven't seen this one for a while because few errors require special handling, but sometimes you get requests to add more stuff to a diagnostic, like adding more lines from the CMSSW log file. This code is convoluted and I don't quite understand all the nuances, but for each step in WMCore.WMSpec.Steps.Executors, there is a corresponding diagnostic module that handles all the errors in WMCore.WMSpec.Steps.Diagnostics. This is where you can write handlers that handle errors or add more information to the FWJR error for each step.
Common Problems and Troubleshooting Techniques
All the jobs are finished on the batch system, but are stuck in the executing state
First, did you mess with the database? Second, did you mess with the database?
The problem here is almost always BossAir. Because of the way that BossAir is built, it requires information to match the job in BossAir with the job in WMBS. Because BossAir keeps an entry in its tables for every retry that the job undergoes (don't ask), it needs all the information to match. This means that if you change the retry_count, for instance, in the master wmbs_job table, BossAir will not be able to match the jobs it has with the jobs WMBS thinks it has, and it will never mark the WMBS jobs as finished.
Another common problem is the user field. Because of some idiosyncrasies that have never been fixed, BossAir only requires a proper User ID when ending the job. Therefore, if the job was submitted with a non-existent user, it will get to this state, and then fail. This is usually due to the User DN having to go to the system default. Check to make sure that the User field in the bl_runjob table has no NULL entries. If it does, assign those jobs to the appropriate users and that usually fixes the problem.
All the jobs are stuck in some other state
This could be any one of a nu