Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot/Serve] Optimize the translation of filemounts #4016

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
19 changes: 19 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4574,6 +4574,25 @@ def _symlink_node(runner: command_runner.CommandRunner):
f'may already exist. Log: {log_path}')

subprocess_utils.run_in_parallel(_symlink_node, runners)
# (3) In case of compression, inflate all uploaded tar.gz archives.
# TODO(warrickhe): track archive filenames for the name-collision edge case.
# Can optimize further by extracting only into the specific file mounts
# and workdir instead of globbing the whole remote workdir.
def _decompress_workdir_zips(
        runner: command_runner.CommandRunner) -> None:
    """Extract any uploaded file-mount archives on a remote node.

    File mounts may be uploaded as gzipped tarballs named
    ``skypilot-filemounts-<uuid>.tar.gz`` placed in the remote workdir.
    This inflates each archive into the workdir and removes it. Nodes
    that received no archive just log a notice.
    """
    zip_pattern = (f'{constants.SKY_REMOTE_WORKDIR}'
                   '/skypilot-filemounts*.tar.gz')
    # Iterate over glob matches with a shell `for` loop: `[ -f <glob> ]`
    # fails with "too many arguments" when more than one archive matches,
    # and `tar` failures must not be masked by the not-found fallback
    # (the previous `(tar … || echo …)` form always returned 0).
    decompress_command = (
        'found=0; rc=0; '
        f'for f in {zip_pattern}; do '
        '[ -f "$f" ] || continue; '  # unmatched glob stays literal in sh
        'found=1; '
        f'tar -xzf "$f" -C {constants.SKY_REMOTE_WORKDIR} && rm "$f" '
        '|| rc=1; '
        'done; '
        '[ "$found" -eq 1 ] || echo "Zipfile not found on this node"; '
        'exit $rc')
    returncode = runner.run(decompress_command, log_path=log_path)
    subprocess_utils.handle_returncode(
        returncode, decompress_command,
        # Name the actual artifact (.tar.gz, not .zip).
        'Failed to inflate or remove the skypilot-filemounts tarball, '
        f'check permissions. Log: {log_path}')

subprocess_utils.run_in_parallel(_decompress_workdir_zips, runners)
end = time.time()
logger.debug(f'File mount sync took {end - start} seconds.')

Expand Down
75 changes: 73 additions & 2 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import os
import re
import shlex
import shutil
import subprocess
import tempfile
import time
import typing
from typing import Any, Dict, List, Optional, Tuple, Type, Union
import urllib.parse
import uuid

import colorama
import pathspec

from sky import check as sky_check
from sky import clouds
Expand Down Expand Up @@ -60,6 +64,8 @@
# Maximum number of concurrent rsync upload processes
_MAX_CONCURRENT_UPLOADS = 32

_MIN_FILES_TO_COMPRESS = 10

_BUCKET_FAIL_TO_CONNECT_MESSAGE = (
'Failed to access existing bucket {name!r}. '
'This is likely because it is a private bucket you do not have access to.\n'
Expand Down Expand Up @@ -848,7 +854,8 @@ def from_metadata(cls, metadata: StorageMetadata,

def add_store(self,
store_type: Union[str, StoreType],
region: Optional[str] = None) -> AbstractStore:
region: Optional[str] = None,
compress_local: bool = False) -> AbstractStore:
cblmemo marked this conversation as resolved.
Show resolved Hide resolved
"""Initializes and adds a new store to the storage.

Invoked by the optimizer after it has selected a store to
Expand All @@ -858,6 +865,8 @@ def add_store(self,
store_type: StoreType; Type of the storage [S3, GCS, AZURE, R2, IBM]
region: str; Region to place the bucket in. Caller must ensure that
the region is valid for the chosen store_type.
compress_local: boolean; Decides whether we want to compress the
filemount before uploading to the bucket.
"""
if isinstance(store_type, str):
store_type = StoreType(store_type)
Expand Down Expand Up @@ -922,7 +931,10 @@ def add_store(self,
self._add_store(store)

# Upload source to store
self._sync_store(store)
if compress_local:
self._maybe_compress_sync_store(store)
else:
self._sync_store(store)

return store

Expand Down Expand Up @@ -986,6 +998,65 @@ def sync_all_stores(self):
for _, store in self.stores.items():
self._sync_store(store)

def _maybe_compress_sync_store(self, store: AbstractStore):
    """Like _sync_store, but may compress the source before uploading.

    Only local directory sources (e.g. a workdir) are compressed; any
    other source falls back to the plain upload path. Small sources
    (<= _MIN_FILES_TO_COMPRESS files after exclusions) are also uploaded
    uncompressed, since the tarball overhead outweighs the rsync savings.

    Args:
        store: AbstractStore; the store to upload the source to.

    Raises:
        exceptions.StorageUploadError: if uploading the compressed
            archive fails.
    """
    # Only supports workdir-style sources: self.source must be a local
    # path (str). Lists / bucket URIs are synced as-is.
    if self.source is None or not isinstance(self.source, str):
        self._sync_store(store)
        return

    def num_files(source: str, excluded_list: List[str]) -> int:
        """Counts files under `source` not excluded by gitignore rules."""
        file_count = 0
        spec = pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern, excluded_list)
        for root, _, files in os.walk(source):
            for file in files:
                # A pre-existing file matching our archive naming scheme
                # could collide with the tarball extracted on the remote
                # node; warn the user so they can rename it.
                if file.startswith('skypilot-filemounts-'
                                  ) and file.endswith('.tar.gz'):
                    logger.warning(f'Consider renaming the file: {file}')
                rel_path = os.path.relpath(os.path.join(root, file), source)
                if not spec.match_file(rel_path):
                    file_count += 1
        return file_count

    excluded_list = storage_utils.get_excluded_files_from_gitignore(
        self.source)
    excluded_list.append('.git')

    # Checks the total number of files before compressing.
    # Rsync's main latency is per-file overhead, so compression only
    # pays off past a threshold; testing shows 10+ files is worth it.
    if num_files(self.source, excluded_list) <= _MIN_FILES_TO_COMPRESS:
        self._sync_store(store)
        return

    try:
        # Two temp dirs: a staging copy of the source with exclusions
        # applied, and a separate dir holding only the tarball so the
        # upload does not also pick up the staging copy. Both are
        # removed on exit (a bare mkdtemp here would leak the staging
        # directory after copytree recreates it).
        with tempfile.TemporaryDirectory() as tmp_cmpdir, \
                tempfile.TemporaryDirectory() as tmp_staging:
            # uuid avoids collisions if compressing multiple sources.
            archive_base = os.path.join(
                tmp_cmpdir, f'skypilot-filemounts-{uuid.uuid1()}')
            staging_dir = os.path.join(tmp_staging, 'workdir')
            # Should only be used on workdir, which is a str.
            assert isinstance(store.source, str)
            shutil.copytree(store.source,
                            staging_dir,
                            ignore=shutil.ignore_patterns(*excluded_list))
            shutil.make_archive(archive_base,
                                'gztar',
                                root_dir=staging_dir,
                                base_dir='.')
            original_source = store.source
            store.source = tmp_cmpdir
            try:
                self._sync_store(store)
            finally:
                # Restore even on failure so the store never keeps
                # pointing at the (soon to be deleted) temp directory.
                store.source = original_source
    except exceptions.StorageUploadError:
        logger.error('Problem when trying to upload compressed file')
        raise

def _sync_store(self, store: AbstractStore):
"""Runs the upload routine for the store and handles failures"""

Expand Down
1 change: 1 addition & 0 deletions sky/setup_files/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def parse_readme(readme: str) -> str:
'jsonschema',
'networkx',
'pandas>=1.3.0',
'pathspec',
'pendulum',
# PrettyTable with version >=2.0.0 is required for the support of
# `add_rows` method.
Expand Down
6 changes: 3 additions & 3 deletions sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,11 +955,12 @@ def sync_storage_mounts(self) -> None:
file_mounts of the form ``{ /remote/path: {s3,gs,..}://<bucket path>
}``.
"""
for storage in self.storage_mounts.values():
for key, storage in self.storage_mounts.items():
if len(storage.stores) == 0:
store_type, store_region = self._get_preferred_store()
self.storage_plans[storage] = store_type
storage.add_store(store_type, store_region)
is_workdir = key == constants.SKY_REMOTE_WORKDIR
storage.add_store(store_type, store_region, is_workdir)
else:
# We will download the first store that is added to remote.
self.storage_plans[storage] = list(storage.stores.keys())[0]
Expand All @@ -970,7 +971,6 @@ def sync_storage_mounts(self) -> None:
if storage.mode == storage_lib.StorageMode.COPY:
store_type = storage_plans[storage]
if store_type is storage_lib.StoreType.S3:
# TODO: allow for Storage mounting of different clouds
if isinstance(storage.source,
str) and storage.source.startswith('s3://'):
blob_path = storage.source
Expand Down
1 change: 1 addition & 0 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
logger.info(f'{colorama.Fore.YELLOW}Uploading sources to cloud storage.'
f'{colorama.Style.RESET_ALL} See: sky storage ls')
try:
# Optimize filemount translation
task.sync_storage_mounts()
except ValueError as e:
if 'No enabled cloud for storage' in str(e):
Expand Down
Loading