From f7855c84013dbe7c11a279ea9bce9e86a37be6e5 Mon Sep 17 00:00:00 2001 From: dachengx Date: Sun, 15 Sep 2024 15:30:41 -0500 Subject: [PATCH 1/4] Tarball strax(en) and cutax for later user installation --- outsource/outsource.py | 90 ++++++++++++++++++--------- outsource/workflow/combine-wrapper.sh | 8 +-- outsource/workflow/install.sh | 34 ++++++++++ outsource/workflow/process-wrapper.sh | 7 +-- 4 files changed, 97 insertions(+), 42 deletions(-) create mode 100644 outsource/workflow/install.sh diff --git a/outsource/outsource.py b/outsource/outsource.py index d8337d3..e034731 100644 --- a/outsource/outsource.py +++ b/outsource/outsource.py @@ -10,6 +10,7 @@ from tqdm import tqdm from utilix import DB, uconfig from utilix.x509 import _validate_x509_proxy +from utilix.tarball import Tarball from utilix.config import setup_logger, set_logging_level import cutax @@ -274,12 +275,8 @@ def _generate_sc(self): condorpool.add_profiles(Namespace.PEGASUS, style="condor") condorpool.add_profiles(Namespace.CONDOR, universe="vanilla") # We need the x509 proxy for Rucio transfers - condorpool.add_profiles( - Namespace.CONDOR, key="x509userproxy", value=os.environ["X509_USER_PROXY"] - ) - condorpool.add_profiles( - Namespace.CONDOR, key="+SingularityImage", value=f'"{self.singularity_image}"' - ) + condorpool.add_profiles(Namespace.CONDOR, "x509userproxy", os.environ["X509_USER_PROXY"]) + condorpool.add_profiles(Namespace.CONDOR, "+SingularityImage", self.singularity_image) # Ignore the site settings - the container will set all this up inside condorpool.add_profiles(Namespace.ENV, OSG_LOCATION="") @@ -312,6 +309,44 @@ def _generate_tc(self): def _generate_rc(self): return ReplicaCatalog() + def make_tarballs(self): + """Make tarballs of Ax-based packages if they are in editable user- + installed mode.""" + tarballs = [] + tarball_paths = [] + for package_name in ["strax", "straxen", "cutax"]: + _tarball = Tarball(self.generated_dir, package_name) + if not 
Tarball.get_installed_git_repo(package_name):
+                # Packages should not be non-editable user-installed
+                if Tarball.is_user_installed(package_name):
+                    raise RuntimeError(
+                        f"You should not install {package_name} in non-editable user-installed mode."
+                    )
+                # cutax is special because it is not installed in site-packages of the environment
+                if package_name == "cutax":
+                    if "CUTAX_LOCATION" not in os.environ:
+                        raise RuntimeError(
+                            "cutax should either be editable user-installed from a git repo "
+                            "or patched by the software environment by CUTAX_LOCATION."
+                        )
+                    tarball = File(_tarball.tarball_name)
+                    tarball_path = (
+                        "/ospool/uc-shared/project/xenon/xenonnt/software"
+                        f"/cutax/v{cutax.__version__}.tar.gz"
+                    )
+                else:
+                    continue
+            else:
+                _tarball.create_tarball()
+                tarball = File(_tarball.tarball_name)
+                tarball_path = _tarball.tarball_path
+                self.logger.warning(
+                    f"Using tarball of user installed package {package_name} at {tarball_path}."
+                )
+            tarballs.append(tarball)
+            tarball_paths.append(tarball_path)
+        return tarballs, tarball_paths
+
     def _generate_workflow(self):
         """Use the Pegasus API to build an abstract graph of the workflow."""
 
@@ -338,6 +373,10 @@ def _generate_workflow(self):
         combinepy = File("combine.py")
         rc.add_replica("local", "combine.py", f"file://{base_dir}/workflow/combine.py")
 
+        # script to install packages
+        combinepy = File("install.sh")
+        rc.add_replica("local", "install.sh", f"file://{base_dir}/workflow/install.sh")
+
         # Add common data files to the replica catalog
         xenon_config = File(".xenon_config")
         rc.add_replica("local", ".xenon_config", f"file://{uconfig.config_path}")
@@ -348,17 +387,9 @@ def _generate_workflow(self):
             "local", ".dbtoken", "file://" + os.path.join(os.environ["HOME"], ".dbtoken")
        )
 
-        # cutax tarball
-        cutax_tarball = File("cutax.tar.gz")
-        if "CUTAX_LOCATION" not in os.environ:
-            self.logger.warning(
-                "No CUTAX_LOCATION env variable found. Using the latest by default!"
- ) - tarball_path = "/ospool/uc-shared/project/xenon/xenonnt/software/cutax/latest.tar.gz" - else: - tarball_path = os.environ["CUTAX_LOCATION"].replace(".", "-") + ".tar.gz" - self.logger.warning(f"Using cutax: {tarball_path}") - rc.add_replica("local", "cutax.tar.gz", tarball_path) + tarballs, tarball_paths = self.make_tarballs() + for tarball, tarball_path in zip(tarballs, tarball_paths): + rc.add_replica("local", tarball, tarball_path) # runs iterator = self._runlist if len(self._runlist) == 1 else tqdm(self._runlist) @@ -388,7 +419,7 @@ def _generate_workflow(self): if not all( [dbcfg._raw_data_exists(raw_type=d) for d in dbcfg.depends_on(dtype)] ): - self.logger.warning( + self.logger.error( f"Doesn't have raw data for {dtype} of run_id {run_id}, skipping" ) continue @@ -442,8 +473,8 @@ def _generate_workflow(self): # combine jobs must happen in the US combine_job.add_profiles(Namespace.CONDOR, "requirements", requirements_us) # priority is given in the order they were submitted - combine_job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}") - combine_job.add_inputs(combinepy, xenon_config, cutax_tarball, token) + combine_job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority) + combine_job.add_inputs(combinepy, xenon_config, token, *tarballs) combine_output_tar_name = f"{dbcfg.key_for(dtype)}-combined.tar.gz" combine_output_tar = File(combine_output_tar_name) combine_job.add_outputs( @@ -488,9 +519,7 @@ def _generate_workflow(self): download_job.add_profiles( Namespace.CONDOR, "requirements", requirements ) - download_job.add_profiles( - Namespace.CONDOR, "priority", f"{dbcfg.priority}" - ) + download_job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority) download_job.add_args( dbcfg.run_id, self.context_name, @@ -502,7 +531,7 @@ def _generate_workflow(self): f"{self.update_db}".lower(), chunk_str, ) - download_job.add_inputs(processpy, xenon_config, token, cutax_tarball) + download_job.add_inputs(processpy, xenon_config, token, 
*tarballs) download_job.add_outputs(data_tar, stage_out=False) wf.add_jobs(download_job) @@ -526,11 +555,10 @@ def _generate_workflow(self): # Give a hint to glideinWMS for the sites # we want(mostly useful for XENONVO in Europe) job.add_profiles( - Namespace.CONDOR, "+XENON_DESIRED_Sites", f'"{desired_sites}"' + Namespace.CONDOR, "+XENON_DESIRED_Sites", desired_sites ) job.add_profiles(Namespace.CONDOR, "requirements", requirements) - job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}") - # job.add_profiles(Namespace.CONDOR, 'periodic_remove', periodic_remove) + job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority) job.add_args( dbcfg.run_id, @@ -544,7 +572,7 @@ def _generate_workflow(self): chunk_str, ) - job.add_inputs(processpy, xenon_config, token, cutax_tarball) + job.add_inputs(processpy, xenon_config, token, *tarballs) job.add_outputs(job_output_tar, stage_out=(not self.upload_to_rucio)) wf.add_jobs(job) @@ -574,7 +602,7 @@ def _generate_workflow(self): job = self._job(**self.job_kwargs[dtype], cores=2) # https://support.opensciencegrid.org/support/solutions/articles/12000028940-working-with-tensorflow-gpus-and-containers job.add_profiles(Namespace.CONDOR, "requirements", requirements) - job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}") + job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority) # Note that any changes to this argument list, # also means process-wrapper.sh has to be updated @@ -589,7 +617,7 @@ def _generate_workflow(self): f"{self.update_db}".lower(), ) - job.add_inputs(processpy, xenon_config, token, cutax_tarball) + job.add_inputs(processpy, xenon_config, token, *tarballs) # As long as we are giving outputs job.add_outputs(job_output_tar, stage_out=True) wf.add_jobs(job) @@ -636,7 +664,7 @@ def submit(self, force=False): if os.path.exists(self.workflow_dir): if force: self.logger.warning( - f"Overwriting workflow at {self.workflow_dir}. CTRL+C now to stop." 
+ f"Overwriting workflow at {self.workflow_dir}. Press ctrl+C now to stop." ) time.sleep(10) shutil.rmtree(self.workflow_dir) diff --git a/outsource/workflow/combine-wrapper.sh b/outsource/workflow/combine-wrapper.sh index 0cdcd25..ec51204 100755 --- a/outsource/workflow/combine-wrapper.sh +++ b/outsource/workflow/combine-wrapper.sh @@ -49,12 +49,8 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then export RUCIO_ACCOUNT=production fi -echo "Installing cutax:" -mkdir cutax -tar -xzf cutax.tar.gz -C cutax --strip-components=1 -# Install in a very quiet mode by -qq -pip install ./cutax --user --no-deps --qq -python -c "import cutax; print(cutax.__file__)" +# Installing customized packages +. install.sh # Combine the data time python combine.py ${run_id} ${dtype} --context ${context} --xedocs_version ${xedocs_version} --input data ${combine_extra_args} diff --git a/outsource/workflow/install.sh b/outsource/workflow/install.sh new file mode 100644 index 0000000..55b313e --- /dev/null +++ b/outsource/workflow/install.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +set -e + +# List of packages +packages=("cutax" "straxen" "strax") + +# Loop through each package +for package in "${packages[@]}" +do + # Check if the tarball exists + if [ ! -f "$package.tar.gz" ]; then + echo "Tarball $package.tar.gz not found. Skipping $package." + echo + continue + fi + + echo "Installing $package:" + + # Create a directory for the package + mkdir -p $package + + # Extract the tarball to the package directory + tar -xzf $package.tar.gz -C $package --strip-components=1 + + # Install the package in very quiet mode by -qq + pip install ./$package --user --no-deps -qq + + # Verify the installation by importing the package + python -c "import $package; print($package.__file__)" + + echo "$package installation complete." 
+ echo +done diff --git a/outsource/workflow/process-wrapper.sh b/outsource/workflow/process-wrapper.sh index 1a51599..50da998 100755 --- a/outsource/workflow/process-wrapper.sh +++ b/outsource/workflow/process-wrapper.sh @@ -75,11 +75,8 @@ if [ "X${standalone_download}" = "Xno-download" ]; then fi -echo "Installing cutax:" -mkdir cutax -tar -xzf cutax.tar.gz -C cutax --strip-components=1 -pip install ./cutax --user --no-deps -qq -python -c "import cutax; print(cutax.__file__)" +# Installing customized packages +. install.sh # See if we have any input tarballs From a4b28ea6af56d1e91197d41faab86b4f10b5974e Mon Sep 17 00:00:00 2001 From: dachengx Date: Sun, 15 Sep 2024 19:24:02 -0500 Subject: [PATCH 2/4] Add more comments, `+SingularityImage` have to be string --- outsource/outsource.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/outsource/outsource.py b/outsource/outsource.py index e034731..68044f0 100644 --- a/outsource/outsource.py +++ b/outsource/outsource.py @@ -276,7 +276,9 @@ def _generate_sc(self): condorpool.add_profiles(Namespace.CONDOR, universe="vanilla") # We need the x509 proxy for Rucio transfers condorpool.add_profiles(Namespace.CONDOR, "x509userproxy", os.environ["X509_USER_PROXY"]) - condorpool.add_profiles(Namespace.CONDOR, "+SingularityImage", self.singularity_image) + condorpool.add_profiles( + Namespace.CONDOR, "+SingularityImage", f'"{self.singularity_image}"' + ) # Ignore the site settings - the container will set all this up inside condorpool.add_profiles(Namespace.ENV, OSG_LOCATION="") @@ -552,10 +554,19 @@ def _generate_workflow(self): # Add job job = self._job(**self.job_kwargs[dtype]) if desired_sites: - # Give a hint to glideinWMS for the sites - # we want(mostly useful for XENONVO in Europe) + # Give a hint to glideinWMS for the sites we want + # (mostly useful for XENON VO in Europe). + # Glideinwms is the provisioning system. 
+ # It starts pilot jobs (glideins) at sites when you + # have idle jobs in the queue. + # Most of the jobs you run to the OSPool (Open Science Pool), + # but you do have a few sites where you have allocations at, + # and those are labeled XENON VO (Virtual Organization). + # The "+" has to be used by non-standard HTCondor attributes. + # The attribute has to have double quotes, + # otherwise HTCondor will try to evaluate it as an expression. job.add_profiles( - Namespace.CONDOR, "+XENON_DESIRED_Sites", desired_sites + Namespace.CONDOR, "+XENON_DESIRED_Sites", f'"{desired_sites}"' ) job.add_profiles(Namespace.CONDOR, "requirements", requirements) job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority) From c063214d764d4aa7eb2de701ce6fc9e32d575dad Mon Sep 17 00:00:00 2001 From: dachengx Date: Mon, 16 Sep 2024 14:08:14 -0500 Subject: [PATCH 3/4] Add `install.sh` as jobs' inputs --- README.md | 1 - outsource/outsource.py | 12 +++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 3af2d1e..849fd4b 100644 --- a/README.md +++ b/README.md @@ -108,7 +108,6 @@ dtypes = peaklets, hitlets_nv, events_nv, events_mv, event_info_double, afterpul ; below are specific dtype options us_only = False hs06_test_run = False -this_site_only = False raw_records_rse = UC_OSG_USERDISK records_rse = UC_MIDWAY_USERDISK peaklets_rse = UC_OSG_USERDISK diff --git a/outsource/outsource.py b/outsource/outsource.py index 68044f0..756294b 100644 --- a/outsource/outsource.py +++ b/outsource/outsource.py @@ -376,7 +376,7 @@ def _generate_workflow(self): rc.add_replica("local", "combine.py", f"file://{base_dir}/workflow/combine.py") # script to install packages - combinepy = File("install.sh") + installsh = File("install.sh") rc.add_replica("local", "install.sh", f"file://{base_dir}/workflow/install.sh") # Add common data files to the replica catalog @@ -476,7 +476,7 @@ def _generate_workflow(self): combine_job.add_profiles(Namespace.CONDOR, 
"requirements", requirements_us) # priority is given in the order they were submitted combine_job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority) - combine_job.add_inputs(combinepy, xenon_config, token, *tarballs) + combine_job.add_inputs(installsh, combinepy, xenon_config, token, *tarballs) combine_output_tar_name = f"{dbcfg.key_for(dtype)}-combined.tar.gz" combine_output_tar = File(combine_output_tar_name) combine_job.add_outputs( @@ -533,7 +533,9 @@ def _generate_workflow(self): f"{self.update_db}".lower(), chunk_str, ) - download_job.add_inputs(processpy, xenon_config, token, *tarballs) + download_job.add_inputs( + installsh, processpy, xenon_config, token, *tarballs + ) download_job.add_outputs(data_tar, stage_out=False) wf.add_jobs(download_job) @@ -583,7 +585,7 @@ def _generate_workflow(self): chunk_str, ) - job.add_inputs(processpy, xenon_config, token, *tarballs) + job.add_inputs(installsh, processpy, xenon_config, token, *tarballs) job.add_outputs(job_output_tar, stage_out=(not self.upload_to_rucio)) wf.add_jobs(job) @@ -628,7 +630,7 @@ def _generate_workflow(self): f"{self.update_db}".lower(), ) - job.add_inputs(processpy, xenon_config, token, *tarballs) + job.add_inputs(installsh, processpy, xenon_config, token, *tarballs) # As long as we are giving outputs job.add_outputs(job_output_tar, stage_out=True) wf.add_jobs(job) From 4309d69313af8d4dab27a46af5b04be1f4535615 Mon Sep 17 00:00:00 2001 From: dachengx Date: Mon, 16 Sep 2024 15:27:48 -0500 Subject: [PATCH 4/4] Install strax straxen cutax in order in `--no-deps` mode --- outsource/workflow/install.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/outsource/workflow/install.sh b/outsource/workflow/install.sh index 55b313e..945eb2b 100644 --- a/outsource/workflow/install.sh +++ b/outsource/workflow/install.sh @@ -3,7 +3,7 @@ set -e # List of packages -packages=("cutax" "straxen" "strax") +packages=("strax" "straxen" "cutax") # Loop through each package for package in 
"${packages[@]}" @@ -32,3 +32,5 @@ do echo "$package installation complete." echo done + +straxen_print_versions