Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tarball strax(en) and cutax for later user installation #175

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 69 additions & 30 deletions outsource/outsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from tqdm import tqdm
from utilix import DB, uconfig
from utilix.x509 import _validate_x509_proxy
from utilix.tarball import Tarball
from utilix.config import setup_logger, set_logging_level
import cutax

Expand Down Expand Up @@ -274,11 +275,9 @@ def _generate_sc(self):
condorpool.add_profiles(Namespace.PEGASUS, style="condor")
condorpool.add_profiles(Namespace.CONDOR, universe="vanilla")
# We need the x509 proxy for Rucio transfers
condorpool.add_profiles(Namespace.CONDOR, "x509userproxy", os.environ["X509_USER_PROXY"])
condorpool.add_profiles(
Namespace.CONDOR, key="x509userproxy", value=os.environ["X509_USER_PROXY"]
)
condorpool.add_profiles(
Namespace.CONDOR, key="+SingularityImage", value=f'"{self.singularity_image}"'
Namespace.CONDOR, "+SingularityImage", f'"{self.singularity_image}"'
)

# Ignore the site settings - the container will set all this up inside
Expand Down Expand Up @@ -312,6 +311,44 @@ def _generate_tc(self):
def _generate_rc(self):
return ReplicaCatalog()

def make_tarballs(self):
"""Make tarballs of Ax-based packages if they are in editable user-
installed mode."""
tarballs = []
tarball_paths = []
for package_name in ["strax", "straxen", "cutax"]:
FaroutYLq marked this conversation as resolved.
Show resolved Hide resolved
_tarball = Tarball(self.generated_dir, package_name)
FaroutYLq marked this conversation as resolved.
Show resolved Hide resolved
if not Tarball.get_installed_git_repo(package_name):
# Packages should not be non-editable user-installed
if Tarball.is_user_installed(package_name):
FaroutYLq marked this conversation as resolved.
Show resolved Hide resolved
raise RuntimeError(
f"You should install {package_name} in non-editable user-installed mode."
)
# cutax is special because it is not installed in site-pacakges of the environment
if package_name == "cutax":
if "CUTAX_LOCATION" not in os.environ:
raise RuntimeError(
"cutax should either be editable user-installed from a git repo "
"or patched by the software environment by CUTAX_LOCATION."
)
tarball = File(_tarball.tarball_name)
tarball_path = (
"/ospool/uc-shared/project/xenon/xenonnt/software"
f"/cutax/v{cutax.__version__}.tar.gz"
)
else:
continue
else:
_tarball.create_tarball()
tarball = File(_tarball.tarball_name)
tarball_path = _tarball.tarball_path
self.logger.warning(
f"Using tarball of user installed package {package_name} at {tarball_path}."
)
tarballs.append(tarball)
tarball_paths.append(tarball_path)
return tarballs, tarball_paths

def _generate_workflow(self):
"""Use the Pegasus API to build an abstract graph of the workflow."""

Expand All @@ -338,6 +375,10 @@ def _generate_workflow(self):
combinepy = File("combine.py")
rc.add_replica("local", "combine.py", f"file://{base_dir}/workflow/combine.py")

# script to install packages
combinepy = File("install.sh")
dachengx marked this conversation as resolved.
Show resolved Hide resolved
rc.add_replica("local", "install.sh", f"file://{base_dir}/workflow/install.sh")

# Add common data files to the replica catalog
xenon_config = File(".xenon_config")
rc.add_replica("local", ".xenon_config", f"file://{uconfig.config_path}")
Expand All @@ -348,17 +389,9 @@ def _generate_workflow(self):
"local", ".dbtoken", "file://" + os.path.join(os.environ["HOME"], ".dbtoken")
)

# cutax tarball
cutax_tarball = File("cutax.tar.gz")
if "CUTAX_LOCATION" not in os.environ:
self.logger.warning(
"No CUTAX_LOCATION env variable found. Using the latest by default!"
)
tarball_path = "/ospool/uc-shared/project/xenon/xenonnt/software/cutax/latest.tar.gz"
else:
tarball_path = os.environ["CUTAX_LOCATION"].replace(".", "-") + ".tar.gz"
self.logger.warning(f"Using cutax: {tarball_path}")
rc.add_replica("local", "cutax.tar.gz", tarball_path)
tarballs, tarball_paths = self.make_tarballs()
for tarball, tarball_path in zip(tarballs, tarball_paths):
rc.add_replica("local", tarball, tarball_path)

# runs
iterator = self._runlist if len(self._runlist) == 1 else tqdm(self._runlist)
Expand Down Expand Up @@ -388,7 +421,7 @@ def _generate_workflow(self):
if not all(
[dbcfg._raw_data_exists(raw_type=d) for d in dbcfg.depends_on(dtype)]
):
self.logger.warning(
self.logger.error(
f"Doesn't have raw data for {dtype} of run_id {run_id}, skipping"
)
continue
Expand Down Expand Up @@ -442,8 +475,8 @@ def _generate_workflow(self):
# combine jobs must happen in the US
combine_job.add_profiles(Namespace.CONDOR, "requirements", requirements_us)
# priority is given in the order they were submitted
combine_job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}")
combine_job.add_inputs(combinepy, xenon_config, cutax_tarball, token)
combine_job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority)
combine_job.add_inputs(combinepy, xenon_config, token, *tarballs)
combine_output_tar_name = f"{dbcfg.key_for(dtype)}-combined.tar.gz"
combine_output_tar = File(combine_output_tar_name)
combine_job.add_outputs(
Expand Down Expand Up @@ -488,9 +521,7 @@ def _generate_workflow(self):
download_job.add_profiles(
Namespace.CONDOR, "requirements", requirements
)
download_job.add_profiles(
Namespace.CONDOR, "priority", f"{dbcfg.priority}"
)
download_job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority)
download_job.add_args(
dbcfg.run_id,
self.context_name,
Expand All @@ -502,7 +533,7 @@ def _generate_workflow(self):
f"{self.update_db}".lower(),
chunk_str,
)
download_job.add_inputs(processpy, xenon_config, token, cutax_tarball)
download_job.add_inputs(processpy, xenon_config, token, *tarballs)
download_job.add_outputs(data_tar, stage_out=False)
wf.add_jobs(download_job)

Expand All @@ -523,14 +554,22 @@ def _generate_workflow(self):
# Add job
job = self._job(**self.job_kwargs[dtype])
if desired_sites:
# Give a hint to glideinWMS for the sites
# we want(mostly useful for XENONVO in Europe)
# Give a hint to glideinWMS for the sites we want
# (mostly useful for XENON VO in Europe).
# Glideinwms is the provisioning system.
# It starts pilot jobs (glideins) at sites when you
# have idle jobs in the queue.
# Most of the jobs you run to the OSPool (Open Science Pool),
# but you do have a few sites where you have allocations at,
# and those are labeled XENON VO (Virtual Organization).
# The "+" has to be used by non-standard HTCondor attributes.
# The attribute has to have double quotes,
# otherwise HTCondor will try to evaluate it as an expression.
job.add_profiles(
Namespace.CONDOR, "+XENON_DESIRED_Sites", f'"{desired_sites}"'
)
job.add_profiles(Namespace.CONDOR, "requirements", requirements)
job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}")
# job.add_profiles(Namespace.CONDOR, 'periodic_remove', periodic_remove)
job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority)

job.add_args(
dbcfg.run_id,
Expand All @@ -544,7 +583,7 @@ def _generate_workflow(self):
chunk_str,
)

job.add_inputs(processpy, xenon_config, token, cutax_tarball)
job.add_inputs(processpy, xenon_config, token, *tarballs)
job.add_outputs(job_output_tar, stage_out=(not self.upload_to_rucio))
wf.add_jobs(job)

Expand Down Expand Up @@ -574,7 +613,7 @@ def _generate_workflow(self):
job = self._job(**self.job_kwargs[dtype], cores=2)
# https://support.opensciencegrid.org/support/solutions/articles/12000028940-working-with-tensorflow-gpus-and-containers
job.add_profiles(Namespace.CONDOR, "requirements", requirements)
job.add_profiles(Namespace.CONDOR, "priority", f"{dbcfg.priority}")
job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority)

# Note that any changes to this argument list,
# also means process-wrapper.sh has to be updated
Expand All @@ -589,7 +628,7 @@ def _generate_workflow(self):
f"{self.update_db}".lower(),
)

job.add_inputs(processpy, xenon_config, token, cutax_tarball)
job.add_inputs(processpy, xenon_config, token, *tarballs)
# As long as we are giving outputs
job.add_outputs(job_output_tar, stage_out=True)
wf.add_jobs(job)
Expand Down Expand Up @@ -636,7 +675,7 @@ def submit(self, force=False):
if os.path.exists(self.workflow_dir):
if force:
self.logger.warning(
f"Overwriting workflow at {self.workflow_dir}. CTRL+C now to stop."
f"Overwriting workflow at {self.workflow_dir}. Press ctrl+C now to stop."
)
time.sleep(10)
shutil.rmtree(self.workflow_dir)
Expand Down
8 changes: 2 additions & 6 deletions outsource/workflow/combine-wrapper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,8 @@ if [ "X$upload_to_rucio" = "Xtrue" ]; then
export RUCIO_ACCOUNT=production
fi

echo "Installing cutax:"
mkdir cutax
tar -xzf cutax.tar.gz -C cutax --strip-components=1
# Install in a very quiet mode by -qq
pip install ./cutax --user --no-deps --qq
python -c "import cutax; print(cutax.__file__)"
# Installing customized packages
. install.sh

# Combine the data
time python combine.py ${run_id} ${dtype} --context ${context} --xedocs_version ${xedocs_version} --input data ${combine_extra_args}
Expand Down
34 changes: 34 additions & 0 deletions outsource/workflow/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/bash

set -e

# List of packages
packages=("cutax" "straxen" "strax")

# Loop through each package
for package in "${packages[@]}"
do
# Check if the tarball exists
if [ ! -f "$package.tar.gz" ]; then
echo "Tarball $package.tar.gz not found. Skipping $package."
echo
continue
fi

echo "Installing $package:"

# Create a directory for the package
mkdir -p $package

# Extract the tarball to the package directory
tar -xzf $package.tar.gz -C $package --strip-components=1

# Install the package in very quiet mode by -qq
pip install ./$package --user --no-deps -qq

# Verify the installation by importing the package
python -c "import $package; print($package.__file__)"

echo "$package installation complete."
echo
done
7 changes: 2 additions & 5 deletions outsource/workflow/process-wrapper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,8 @@ if [ "X${standalone_download}" = "Xno-download" ]; then
fi


echo "Installing cutax:"
mkdir cutax
tar -xzf cutax.tar.gz -C cutax --strip-components=1
pip install ./cutax --user --no-deps -qq
python -c "import cutax; print(cutax.__file__)"
# Installing customized packages
. install.sh


# See if we have any input tarballs
Expand Down