Merge pull request #288 from DataBiosphere/dev
PR for 0.4.11 release
wnojopra authored May 6, 2024
2 parents 254f3b0 + 088c444 commit 991c485
Showing 17 changed files with 663 additions and 207 deletions.
README.md: 2 changes (1 addition, 1 deletion)
@@ -724,7 +724,7 @@ The image below illustrates this:

By default, `dsub` will use the [default Compute Engine service account](https://cloud.google.com/compute/docs/access/service-accounts#default_service_account)
as the authorized service account on the VM instance. You can choose to specify
the email address of another service acount using `--service-account`.
the email address of another service account using `--service-account`.

By default, `dsub` will grant the following access scopes to the service account:

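For context: in this release the `--service-account` value described above also flows into the new `google-batch` provider (see the `google_batch.py` changes below). A minimal editorial sketch, not part of this commit, of the Batch API message it maps onto, assuming the `batch_v1.ServiceAccount` type with `email` and `scopes` fields:

```python
# Editorial sketch, not part of this commit; the email below is hypothetical.
from google.cloud import batch_v1

# The provider attaches the chosen service account (or the Compute Engine
# default, if none is given) to the job's allocation policy.
service_account = batch_v1.ServiceAccount(
    email="my-runner@my-project.iam.gserviceaccount.com",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
```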
docs/compute_resources.md: 4 changes (2 additions, 2 deletions)
@@ -44,8 +44,8 @@ or a

## Disks

By default, `dsub` launches a Compute Engine VM with a boot disk of 10 GB and an
attached data disk of size 200 GB.
By default, `dsub` launches a Compute Engine VM with a boot disk of 10 GB (30 GB
for `google-batch`) and an attached data disk of size 200 GB.

To change the boot disk size, use the `--boot-disk-size` flag.<sup>(\*)</sup>

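A brief editorial sketch (not part of this commit) of the boot-disk sizing rule this doc change describes; it mirrors the `max()` logic added to `google_batch.py` below, with the constants taken from `dsub/lib/job_model.py`:

```python
# Editorial sketch, not part of this commit.
DEFAULT_BOOT_DISK_SIZE = 10  # GB, from dsub/lib/job_model.py
LARGE_BOOT_DISK_SIZE = 30    # GB, added in this release

def effective_boot_disk_size_gb(requested_gb: int) -> int:
    """google-batch never provisions a boot disk smaller than 30 GB."""
    return max(requested_gb, LARGE_BOOT_DISK_SIZE)

assert effective_boot_disk_size_gb(DEFAULT_BOOT_DISK_SIZE) == 30  # default is bumped up
assert effective_boot_disk_size_gb(50) == 50                      # larger requests pass through
```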
dsub/_dsub_version.py: 2 changes (1 addition, 1 deletion)
@@ -26,4 +26,4 @@
0.1.3.dev0 -> 0.1.3 -> 0.1.4.dev0 -> ...
"""

DSUB_VERSION = '0.4.10'
DSUB_VERSION = '0.4.11'
dsub/commands/dsub.py: 9 changes (7 additions, 2 deletions)
@@ -455,8 +455,13 @@ def _parse_arguments(prog, argv):
'--boot-disk-size',
default=job_model.DEFAULT_BOOT_DISK_SIZE,
type=int,
help='Size (in GB) of the boot disk (default: {})'.format(
job_model.DEFAULT_BOOT_DISK_SIZE))
help=(
'Size (in GB) of the boot disk (default: {}, {} for google-batch)'
.format(
job_model.DEFAULT_BOOT_DISK_SIZE, job_model.LARGE_BOOT_DISK_SIZE
)
),
)
google_common.add_argument(
'--preemptible',
const=param_util.preemptile_param_type(True),
dsub/lib/job_model.py: 1 change (1 addition, 0 deletions)
@@ -63,6 +63,7 @@
DEFAULT_MACHINE_TYPE = 'n1-standard-1'
DEFAULT_DISK_SIZE = 200
DEFAULT_BOOT_DISK_SIZE = 10
LARGE_BOOT_DISK_SIZE = 30
DEFAULT_MOUNTED_DISK_SIZE = 10
DEFAULT_PREEMPTIBLE = False
DEFAULT_DISK_TYPE = 'pd-standard'
dsub/providers/batch_dummy.py: 3 changes (3 additions, 0 deletions)
@@ -25,6 +25,7 @@ class types(object):
Volume = None
TaskSpec = None
Environment = None
ServiceAccount = None

class task(object):

@@ -36,6 +37,8 @@ class AllocationPolicy(object):
InstancePolicy = None
AttachedDisk = None
Disk = None
NetworkPolicy = None
Accelerator = None

class LogsPolicy(object):
Destination = None
dsub/providers/google_batch.py: 214 changes (115 additions, 99 deletions)
@@ -18,7 +18,6 @@
"""

import ast
import json
import os
import sys
import textwrap
@@ -27,6 +26,7 @@
from . import base
from . import google_base
from . import google_batch_operations
from . import google_custom_machine
from . import google_utils
from ..lib import job_model
from ..lib import param_util
@@ -58,10 +58,9 @@
_TMP_DIR = f'{_DATA_MOUNT_POINT}/tmp'
_WORKING_DIR = f'{_DATA_MOUNT_POINT}/workingdir'

# These are visible to the user; not yet documented, as we'd *like* to
# These are visible to the user task; not yet documented, as we'd *like* to
# find a way to have them visible only to the logging tasks.
_BATCH_LOG_FILE_PATH = f'{_VOLUME_MOUNT_POINT}/.log.txt'
_LOG_FILE_PATH = f'{_DATA_MOUNT_POINT}/.log.txt'
_BATCH_LOG_DIR = f'{_VOLUME_MOUNT_POINT}/.logging'
_LOGGING_DIR = f'{_DATA_MOUNT_POINT}/.logging'

_LOG_FILTER_VAR = '_LOG_FILTER_REPR'
@@ -70,105 +69,76 @@
# _LOG_FILTER_PYTHON is a block of Python code to execute in both the
# "continuous_logging" and "final_logging" tasks.
#
# Batch API gives us one log file that is an interleave of all of the task
# outputs to both STDOUT and STDERR. A typical line looks something like:
# Batch API will eventually create three log files in the _BATCH_LOG_FILE_PATH
# directory. They are named:
#
# [batch_task_logs]<date> logging.go:227: INFO: [task_id:<task>]
# Task task/<task>-0/0/0/1, STDOUT: <message emitted from the task>
# - output-*.log
# - stdout-*.log
# - stderr-*.log
#
# Parsing text inputs can be fragile, so we try to be loose but accurate with
# our assumptions on log file format.
# We will be creating a "staging" location for each of these files.
#
# The way the following Python executes is to
# - Open the log file
# - For each line
# - If the line is *not* associated with the "user command", only emit it
# to the aggregate log (and emit it as-is)
# - AND if we find "<stuff>/<user_task_number>, STDOUT: " (or "...STDERR: ")
# - emit the task-generated message to either the stdout or stderr log file
# If any of the batch log files don't exist, touch the associated staging file.
#
# Thus the stdout and stderr log files only contain output from the user task,
# while the aggregate log contains everything.
# If the output batch log file exists, copy it directly to the staging
# location.
#
# There's tricky behavior to handle blank lines in the GCP Batch log file.
# What we observe is that *every* valid message emitted to the log is followed
# by a blank line.
# So we'd like to filter out those blank lines, but we need to be careful
# because a user command can emit blank lines, *which we want to preserve.*
# Thus when we see a blank line, we "hold" it as we don't know if we are going
# to print it until we see the next line.
#
# Note that to find the user task messages, we set up search strings
# (STDOUT_STR, STDERR_STR) which look something like: "/3, STDOUT: "
# If the stdout/stderr batch log files exist, copy them to their staging
# location. Then filter it so that only user-action logs exist and the prefixes
# are removed. The prefixes look something like:
# [batch_task_logs]<datetime> ERROR:
# [task_id:task/<job_uid>,runnable_index:<action_number>]

# pylint: disable=anomalous-backslash-in-string
_LOG_FILTER_PYTHON = textwrap.dedent("""
import fileinput
import glob
import re
import shutil
import sys
from pathlib import Path
INFILE_PATH = sys.argv[1]
LOGGING_DIR = sys.argv[1]
LOG_FILE_PATH = sys.argv[2]
STDOUT_FILE_PATH = sys.argv[3]
STDERR_FILE_PATH = sys.argv[4]
EMIT_UNEXPECTED_PATH = sys.argv[5]
USER_TASK = sys.argv[6]
EMIT_UNEXPECTED_ONCE = Path(EMIT_UNEXPECTED_PATH)
STDOUT_STR = f"/{USER_TASK}, STDOUT: "
STDERR_STR = f"/{USER_TASK}, STDERR: "
with open(INFILE_PATH) as IN_FILE:
LOG_FILE = open(LOG_FILE_PATH, "w")
STDOUT_FILE = open(STDOUT_FILE_PATH, "w")
STDERR_FILE = open(STDERR_FILE_PATH, "w")
BLANK_LINE_HOLD = False
WHICH_STD_FILE = None
line_no = 0
for line in IN_FILE:
line_no += 1
line = line.splitlines()[0]
if not line:
BLANK_LINE_HOLD = True
continue
elif line.startswith("[batch_task_logs]"):
BLANK_LINE_HOLD = False
WHICH_STD_FILE = None
else:
if BLANK_LINE_HOLD and WHICH_STD_FILE:
print(file=LOG_FILE)
print(file=WHICH_STD_FILE)
print(line, file=LOG_FILE)
stdout = line.find(STDOUT_STR)
stderr = line.find(STDERR_STR)
stdout = sys.maxsize if stdout == -1 else stdout
stderr = sys.maxsize if stderr == -1 else stderr
if stdout < stderr:
print(line[stdout + len(STDOUT_STR):], file=STDOUT_FILE)
WHICH_STD_FILE = STDOUT_FILE
elif stdout > stderr:
print(line[stderr + len(STDERR_STR):], file=STDERR_FILE)
WHICH_STD_FILE = STDERR_FILE
elif WHICH_STD_FILE:
print(line, file=WHICH_STD_FILE)
elif not EMIT_UNEXPECTED_ONCE.is_file():
print(f"Unexpected log format at line {line_no}: {line}")
EMIT_UNEXPECTED_ONCE.touch()
USER_TASK = sys.argv[5]
def filter_log_file(staging_path: str, stream_string: str):
# Replaces lines in file inplace
for line in fileinput.input(staging_path, inplace=True):
re_search_string = fr"^\[batch_task_logs\].*{stream_string}: \[task_id:task\/.*runnable_index:{USER_TASK}] (.*)"
match = re.search(re_search_string, line)
if match:
modified_line = match.group(1)
print(modified_line)
def copy_log_to_staging(glob_str: str, staging_path: str, filter_str: str = None):
# Check if log files exist, and copy to their staging location
matching_files = list(Path(LOGGING_DIR).glob(glob_str))
if matching_files:
assert(len(matching_files) == 1)
shutil.copy(matching_files[0], staging_path)
if filter_str:
filter_log_file(staging_path, filter_str)
else:
Path(staging_path).touch()
# We know the log file is named output-<job-uid>.log
# and the stdout/stderr files are named stdout-<job-uid>.log and
# stderr-<job-uid>.log
copy_log_to_staging("output-*.log", LOG_FILE_PATH)
copy_log_to_staging("stdout-*.log", STDOUT_FILE_PATH, filter_str="INFO")
copy_log_to_staging("stderr-*.log", STDERR_FILE_PATH, filter_str="ERROR")
""")
# pylint: enable=anomalous-backslash-in-string
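For readers tracing the new logging flow, a small editorial sketch (not part of this commit) of what the `filter_log_file` regex above does to one stderr line; the job uid and runnable index in the sample are hypothetical:

```python
# Editorial sketch, not part of this commit: strip the Batch prefix from a
# single stderr line, using the same pattern as filter_log_file above.
import re

USER_TASK = "4"  # hypothetical runnable index of the user action
line = ("[batch_task_logs]2024/05/06 17:00:00 ERROR: "
        "[task_id:task/job--user--240506-170000-00,runnable_index:4] oops")
pattern = fr"^\[batch_task_logs\].*ERROR: \[task_id:task\/.*runnable_index:{USER_TASK}] (.*)"
match = re.search(pattern, line)
print(match.group(1))  # -> oops
```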

_LOG_CP = textwrap.dedent("""
cp "{log_file_path}" .
python3 "{log_filter_script_path}" \
"{log_file_path}" \
"${{LOGGING_DIR}}" \
"${{LOGGING_DIR}}/log.txt" \
"${{LOGGING_DIR}}/stdout.txt" \
"${{LOGGING_DIR}}/stderr.txt" \
"${{LOGGING_DIR}}/emit_unexpected_sentinel.txt" \
"{user_action}"
gsutil_cp "${{LOGGING_DIR}}/stdout.txt" "${{STDOUT_PATH}}" "text/plain" "${{USER_PROJECT}}" &
@@ -219,9 +189,6 @@
> "{log_filter_script_path}"
chmod a+x "{log_filter_script_path}"
# Make sure the log file exists
touch "{log_file_path}"
while [[ ! -e "${{LOGGING_DIR}}/.stop_logging" ]]; do
{log_cp}
@@ -501,10 +468,8 @@ def _create_batch_request(
log_filter_script_path=_LOG_FILTER_SCRIPT_PATH,
python_decode_script=google_utils.PYTHON_DECODE_SCRIPT,
logging_dir=_LOGGING_DIR,
log_file_path=_LOG_FILE_PATH,
log_cp=_LOG_CP.format(
log_filter_script_path=_LOG_FILTER_SCRIPT_PATH,
log_file_path=_LOG_FILE_PATH,
user_action=user_action,
),
log_interval=job_resources.log_interval or '60s',
@@ -517,10 +482,8 @@ def _create_batch_request(
log_filter_script_path=_LOG_FILTER_SCRIPT_PATH,
python_decode_script=google_utils.PYTHON_DECODE_SCRIPT,
logging_dir=_LOGGING_DIR,
log_file_path=_LOG_FILE_PATH,
log_cp=_LOG_CP.format(
log_filter_script_path=_LOG_FILTER_SCRIPT_PATH,
log_file_path=_LOG_FILE_PATH,
user_action=user_action,
),
)
@@ -662,22 +625,74 @@ def _create_batch_request(
# instance type and resources attached to each VM. The AllocationPolicy
# describes when, where, and how compute resources should be allocated
# for the Job.
boot_disk = google_batch_operations.build_persistent_disk(
size_gb=max(
job_resources.boot_disk_size, job_model.LARGE_BOOT_DISK_SIZE
),
disk_type=job_model.DEFAULT_DISK_TYPE,
)
disk = google_batch_operations.build_persistent_disk(
size_gb=job_resources.disk_size,
disk_type=job_resources.disk_type or job_model.DEFAULT_DISK_TYPE,
)
attached_disk = google_batch_operations.build_attached_disk(
disk=disk, device_name=google_utils.DATA_DISK_NAME
)

if job_resources.machine_type:
machine_type = job_resources.machine_type
elif job_resources.min_cores or job_resources.min_ram:
machine_type = (
google_custom_machine.GoogleCustomMachine.build_machine_type(
job_resources.min_cores, job_resources.min_ram
)
)
else:
machine_type = job_model.DEFAULT_MACHINE_TYPE

instance_policy = google_batch_operations.build_instance_policy(
attached_disk
boot_disk=boot_disk,
disks=attached_disk,
machine_type=machine_type,
accelerators=google_batch_operations.build_accelerators(
accelerator_type=job_resources.accelerator_type,
accelerator_count=job_resources.accelerator_count,
),
)

ipt = google_batch_operations.build_instance_policy_or_template(
instance_policy
instance_policy=instance_policy,
install_gpu_drivers=True
if job_resources.accelerator_type is not None
else False,
)
allocation_policy = google_batch_operations.build_allocation_policy([ipt])

if job_resources.service_account:
scopes = job_resources.scopes or google_base.DEFAULT_SCOPES
service_account = google_batch_operations.build_service_account(
service_account_email=job_resources.service_account, scopes=scopes
)
else:
service_account = None

network_policy = google_batch_operations.build_network_policy(
network=job_resources.network,
subnetwork=job_resources.subnetwork,
no_external_ip_address=job_resources.use_private_address,
)

allocation_policy = google_batch_operations.build_allocation_policy(
ipts=[ipt],
service_account=service_account,
network_policy=network_policy,
)

logs_policy = google_batch_operations.build_logs_policy(
batch_v1.LogsPolicy.Destination.PATH, _BATCH_LOG_FILE_PATH
# Explicitly end the logging path with a slash.
# This will prompt Batch API to create the log, stdout, and stderr
# files in the specified directory.
batch_v1.LogsPolicy.Destination.PATH,
_BATCH_LOG_DIR + '/',
)

# Bring together the task definition(s) and build the Job request.
@@ -775,14 +790,15 @@ def submit_job(
if self._dry_run:
requests.append(request)
else:
# task_id = client.create_job(request=request)
task_id = self._submit_batch_job(request)
launched_tasks.append(task_id)
# If this is a dry-run, emit all the pipeline request objects
# If this is a dry-run, emit all the batch request objects
if self._dry_run:
print(
json.dumps(requests, indent=2, sort_keys=True, separators=(',', ': '))
)
# Each request is a google.cloud.batch_v1.types.batch.CreateJobRequest
# object. The __repr__ method for this object outputs something that
# closely resembles yaml, but can't actually be serialized into yaml.
# Ideally, we could serialize these request objects to yaml or json.
print(requests)
return {
'job-id': job_id,
'user-id': job_descriptor.job_metadata.get('user-id'),
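On the dry-run note above: one possible way to emit structured output would be the proto-plus `to_json` helper; an editorial sketch, not part of this commit, assuming that classmethod is available on the generated `CreateJobRequest` type:

```python
# Editorial sketch, not part of this commit. Assumes the proto-plus to_json
# classmethod exposed by generated messages such as batch_v1.CreateJobRequest.
from google.cloud import batch_v1

def requests_to_json(requests):
    """Render dry-run CreateJobRequest objects as JSON strings."""
    return [batch_v1.CreateJobRequest.to_json(request) for request in requests]
```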