diff --git a/docs/source/how_to_guides/configure_cloud_storage_credentials.md b/docs/source/how_to_guides/configure_cloud_storage_credentials.md index 57a29a89d..8b38c428e 100644 --- a/docs/source/how_to_guides/configure_cloud_storage_credentials.md +++ b/docs/source/how_to_guides/configure_cloud_storage_credentials.md @@ -274,3 +274,50 @@ export DATABRICKS_HOST='hostname' export DATABRICKS_TOKEN='token key' ``` ```` + + +## Alipan + +To authenticate Alipan access, users must set their Alipan refresh token (`ALIPAN_WEB_REFRESH_TOKEN`) in the run environment. + +To get the refresh token from the Alipan website, the user needs to login to the [Alipan website](https://www.alipan.com/drive), go to the console of the browser's devTools, and pass the code below into the console to get the refresh token. + +```javascript +JSON.parse(localStorage.token).refresh_token; +``` + +Then set the refresh token in the run environment. + +````{tabs} +```{code-tab} py +import os +os.environ['ALIPAN_WEB_REFRESH_TOKEN'] = 'refresh_token' +``` + +```{code-tab} sh +export ALIPAN_WEB_REFRESH_TOKEN='refresh_token' +``` +```` + +### Encryption Data + +If you want to encrypt data in cloud storage, you can set the environment variable below to encrypt and decrypt the data. + +````{tabs} +```{code-tab} py +import os +os.environ['ALIPAN_ENCRYPT_PASSWORD'] = 'encryption_key' + +# For uploading, the encryption type must be set as one of `Simple`, `ChaCha20`, `AES256CBC` +# When downloading, this environment variable is not required +os.environ['ALIPAN_ENCRYPT_TYPE'] = 'AES256CBC' +``` + +```{code-tab} sh +export ALIPAN_ENCRYPT_PASSWORD='encryption_key' + +# For uploading, the encryption type must be set as one of `Simple`, `ChaCha20`, `AES256CBC` +# When downloading, this environment variable is not required +export ALIPAN_ENCRYPT_TYPE='AES256CBC' +``` +```` diff --git a/setup.py b/setup.py index 6948efb43..bdfefbfd8 100644 --- a/setup.py +++ b/setup.py @@ -119,6 +119,10 @@ 'databricks-sdk==0.23.0', ] +extra_deps['alipan'] = [ + 'AliPCS-Py>=0.8,<1', +] + extra_deps['testing'] = [ 'mosaicml-cli>=0.5.25,<0.7', ] diff --git a/streaming/base/storage/__init__.py b/streaming/base/storage/__init__.py index 030a47c4d..e9653db9d 100644 --- a/streaming/base/storage/__init__.py +++ b/streaming/base/storage/__init__.py @@ -3,8 +3,8 @@ """Base module for downloading/uploading files from/to cloud storage.""" -from streaming.base.storage.download import (download_file, download_from_azure, - download_from_azure_datalake, +from streaming.base.storage.download import (download_file, download_from_alipan, + download_from_azure, download_from_azure_datalake, download_from_databricks_unity_catalog, download_from_dbfs, download_from_gcs, download_from_local, download_from_oci, @@ -29,5 +29,6 @@ 'download_from_azure_datalake', 'download_from_databricks_unity_catalog', 'download_from_dbfs', + 'download_from_alipan', 'download_from_local', ] diff --git a/streaming/base/storage/download.py b/streaming/base/storage/download.py index 777fdde47..cdcf3d489 100644 --- a/streaming/base/storage/download.py +++ b/streaming/base/storage/download.py @@ -21,6 +21,7 @@ 'download_from_azure_datalake', 'download_from_databricks_unity_catalog', 'download_from_dbfs', + 'download_from_alipan', 'download_from_local', ] @@ -422,6 +423,50 @@ def download_from_dbfs(remote: str, local: str) -> None: os.rename(local_tmp, local) +def download_from_alipan(remote: str, local: str) -> None: + """Download a file from remote Alipan to local. + + Args: + remote (str): Remote path (Alipan). + local (str): Local path (local filesystem). + """ + from alipcs_py.alipcs import AliPCSApiMix + from alipcs_py.commands.download import download_file + + web_refresh_token = os.environ['ALIPAN_WEB_REFRESH_TOKEN'] + web_token_type = 'Bearer' + alipan_encrypt_password = os.environ.get('ALIPAN_ENCRYPT_PASSWORD', '').encode() + + api = AliPCSApiMix(web_refresh_token, web_token_type=web_token_type) + + obj = urllib.parse.urlparse(remote) + if obj.scheme != 'alipan': + raise ValueError( + f'Expected obj.scheme to be `alipan`, instead, got {obj.scheme} for remote={remote}') + if obj.netloc != '': + raise ValueError( + f'Expected remote to be alipan:///path/to/some, instead, got remote={remote}') + + remote_path = obj.path + filename = pathlib.PosixPath(remote_path).name + localdir = pathlib.Path(local).parent + + remote_pcs_file = api.get_file(remotepath=remote_path) + if remote_pcs_file is None: + raise FileNotFoundError(f'Object {remote} not found.') + + download_file( + api, + remote_pcs_file, + localdir=localdir, + downloader='me', + concurrency=1, + show_progress=False, + encrypt_password=alipan_encrypt_password, + ) + os.rename(localdir / filename, local) + + def download_from_local(remote: str, local: str) -> None: """Download a file from remote to local. @@ -474,6 +519,8 @@ def download_file(remote: Optional[str], local: str, timeout: float): download_from_databricks_unity_catalog(remote, local) elif remote.startswith('dbfs:/'): download_from_dbfs(remote, local) + elif remote.startswith('alipan://'): + download_from_alipan(remote, local) else: download_from_local(remote, local) diff --git a/streaming/base/storage/upload.py b/streaming/base/storage/upload.py index 75d8d44ff..644cf6913 100644 --- a/streaming/base/storage/upload.py +++ b/streaming/base/storage/upload.py @@ -27,6 +27,7 @@ 'AzureUploader', 'DatabricksUnityCatalogUploader', 'DBFSUploader', + 'AlipanUploader', 'LocalUploader', ] @@ -40,6 +41,7 @@ 'azure-dl': 'AzureDataLakeUploader', 'dbfs:/Volumes': 'DatabricksUnityCatalogUploader', 'dbfs': 'DBFSUploader', + 'alipan': 'AlipanUploader', '': 'LocalUploader', } @@ -945,6 +947,137 @@ def check_folder_exists(self): raise e +class AlipanUploader(CloudUploader): + """Upload file from local machine to Alipan. + + Args: + out (str | Tuple[str, str]): Output dataset directory to save shard files. + + 1. If ``out`` is a local directory, shard files are saved locally. + 2. If ``out`` is a remote directory, a local temporary directory is created to + cache the shard files and then the shard files are uploaded to a remote + location. At the end, the temp directory is deleted once shards are uploaded. + 3. If ``out`` is a tuple of ``(local_dir, remote_dir)``, shard files are saved in + the `local_dir` and also uploaded to a remote location. + keep_local (bool): If the dataset is uploaded, whether to keep the local dataset + shard file or remove it after uploading. Defaults to ``False``. + progress_bar (bool): Display TQDM progress bars for uploading output dataset files to + a remote location. Default to ``False``. + retry (int): Number of times to retry uploading a file. Defaults to ``2``. + exist_ok (bool): When exist_ok = False, raise error if the local part of ``out`` already + exists and has contents. Defaults to ``False``. + """ + + def __init__(self, + out: Union[str, Tuple[str, str]], + keep_local: bool = False, + progress_bar: bool = False, + retry: int = 2, + exist_ok: bool = False) -> None: + super().__init__(out, keep_local, progress_bar, retry, exist_ok) + + obj = urllib.parse.urlparse(self.remote) + if obj.scheme != 'alipan': + raise ValueError( + f'Expected obj.scheme to be `alipan`, instead, got {obj.scheme} for remote={self.remote}' + ) + if obj.netloc != '': + raise ValueError( + f'Expected remote to be alipan:///path/to/some, instead, got remote={self.remote}') + + from alipcs_py.alipcs import AliPCSApiMix + from alipcs_py.commands.upload import EncryptType + + web_refresh_token = os.environ['ALIPAN_WEB_REFRESH_TOKEN'] + web_token_type = 'Bearer' + self.alipan_encrypt_password = os.environ.get('ALIPAN_ENCRYPT_PASSWORD', '').encode() + self.alipan_encrypt_type = EncryptType.No # No encryption by default + encrypt_type = os.environ.get('ALIPAN_ENCRYPT_TYPE', '') + if encrypt_type: + if getattr(EncryptType, encrypt_type, None) is None: + raise ValueError( + f'Invalid ALIPAN_ENCRYPT_TYPE: {encrypt_type}. ' + + 'Encryption type must be one of `Simple`, `ChaCha20`, `AES256CBC`') + else: + self.alipan_encrypt_type = getattr(EncryptType, encrypt_type) + + self.api = AliPCSApiMix(web_refresh_token, web_token_type=web_token_type) + self.check_token() + + def check_token(self): + """Raise an exception if the refresh_token is invalid. + + Raises: + error: AliPCSError with code `AccessTokenInvalid` + """ + self.api.refresh() + + def upload_file(self, filename: str): + """Upload file from local instance to Alipan directory. + + Args: + filename (str): File to upload. + """ + from alipcs_py.commands import upload as alipcs_upload + from alipcs_py.commands.upload import EncryptType, total_len + + @retry(num_attempts=self.retry) + def _upload_file(): + local_filename = os.path.join(self.local, filename) + remote_filename = os.path.join(self.remote, filename) # pyright: ignore + obj = urllib.parse.urlparse(remote_filename) + logger.debug(f'Uploading to {remote_filename}') + if self.alipan_encrypt_type == EncryptType.No: + file_size = os.stat(local_filename).st_size + else: + file_size = self.alipan_encrypt_type + encrypt_io = self.alipan_encrypt_type.encrypt_io(open(local_filename, 'rb'), + self.alipan_encrypt_password) + file_size = total_len(encrypt_io) + with tqdm.tqdm(total=file_size, + unit='B', + unit_scale=True, + desc=f'Uploading to {remote_filename}', + disable=(not self.progress_bar)) as pbar: + alipcs_upload.upload_file( + self.api, (local_filename, obj.path), + 'overwrite', + encrypt_password=self.alipan_encrypt_password, + encrypt_type=self.alipan_encrypt_type, + callback_for_monitor=lambda offset: pbar.update(offset - pbar.n)) + self.clear_local(local=local_filename) + + _upload_file() + + def list_objects(self, prefix: Optional[str] = None) -> List[str]: + """List all objects in the remote path with the given prefix. + + Args: + prefix (Optional[str], optional): The prefix to search for. Defaults to ``None``. + + Returns: + List[str]: A list of object names that match the prefix. + """ + if prefix is None: + prefix = '' + + if self.remote is None: + raise ValueError('Alipan remote path must be set.') + + obj = urllib.parse.urlparse(self.remote) + remote_pcs_file = self.api.get_file(remotepath=obj.path) + if remote_pcs_file is None: + raise FileNotFoundError(f'Alipan remote path `{obj.path}` not found.') + + file_paths = [] + for pcs_file in self.api.list_iter(remote_pcs_file.file_id, recursive=True): + file_path = pathlib.PosixPath(obj.path, pcs_file.path).as_posix() + if file_path.startswith(prefix): + file_paths.append(file_path) + + return sorted(file_paths) + + class LocalUploader(CloudUploader): """Copy file from one local directory to another local directory. diff --git a/tests/conftest.py b/tests/conftest.py index 5cfd5ddfe..ac7844539 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -168,3 +168,11 @@ def test_list_r2_buckets(): client = boto3.client('s3', region_name='us-east-1', endpoint_url=R2_URL) buckets = client.list_buckets() assert buckets['Buckets'][0]['Name'] == MY_BUCKET + + +@pytest.fixture() +def alipan_credentials(): + """Mocked alipan Credentials.""" + os.environ['ALIPAN_WEB_REFRESH_TOKEN'] = 'testing' + yield + del os.environ['ALIPAN_WEB_REFRESH_TOKEN'] diff --git a/tests/test_download.py b/tests/test_download.py index 8c9d44fa0..8d9a0c1b2 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -225,6 +225,14 @@ def test_download_from_dbfs_gets_called(self, mocked_requests: Mock, remote_loca mocked_requests.assert_called_once() mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath) + @patch('streaming.base.storage.download.download_from_alipan') + @pytest.mark.usefixtures('remote_local_file') + def test_download_from_alipan_gets_called(self, mocked_requests: Mock, remote_local_file: Any): + mock_remote_filepath, mock_local_filepath = remote_local_file(cloud_prefix='alipan://') + download_file(mock_remote_filepath, mock_local_filepath, 60) + mocked_requests.assert_called_once() + mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath) + @patch('streaming.base.storage.download.download_from_local') @pytest.mark.usefixtures('remote_local_file') def test_download_from_local_gets_called(self, mocked_requests: Mock, remote_local_file: Any): diff --git a/tests/test_upload.py b/tests/test_upload.py index e98171522..455b6b8c4 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -11,10 +11,10 @@ import boto3 import pytest -from streaming.base.storage.upload import (AzureDataLakeUploader, AzureUploader, CloudUploader, - DatabricksUnityCatalogUploader, DBFSUploader, - GCSAuthentication, GCSUploader, LocalUploader, - S3Uploader) +from streaming.base.storage.upload import (AlipanUploader, AzureDataLakeUploader, AzureUploader, + CloudUploader, DatabricksUnityCatalogUploader, + DBFSUploader, GCSAuthentication, GCSUploader, + LocalUploader, S3Uploader) from tests.conftest import MY_BUCKET, R2_URL MY_PREFIX = 'train' @@ -488,6 +488,41 @@ def test_local_directory_is_empty(self, mock_create_client: Mock, _ = DBFSUploader(out=local) +class TestAlipanUploader: + + @patch('streaming.base.storage.upload.AlipanUploader.check_token') + @pytest.mark.usefixtures('alipan_credentials') + @pytest.mark.parametrize('out', + ['alipan:///container/dir', ('./dir1', 'alipan:///container/dir/')]) + def test_instantiation(self, mock_create_client: Mock, out: Any): + mock_create_client.side_effect = None + _ = AlipanUploader(out=out) + if not isinstance(out, str): + shutil.rmtree(out[0], ignore_errors=True) + + @patch('streaming.base.storage.upload.AlipanUploader.check_token') + @pytest.mark.usefixtures('alipan_credentials') + @pytest.mark.parametrize('out', ['alipann://bucket/dir', ('./dir1', 'gcs://bucket/dir/')]) + def test_invalid_remote_list(self, mock_create_client: Mock, out: Any): + mock_create_client.side_effect = None + with pytest.raises(ValueError, match=f'Invalid Cloud provider prefix.*'): + _ = AlipanUploader(out=out) + + @patch('streaming.base.storage.upload.AlipanUploader.check_token') + @pytest.mark.usefixtures('alipan_credentials') + def test_local_directory_is_empty(self, mock_create_client: Mock, + local_remote_dir: Tuple[str, str]): + with pytest.raises(FileExistsError, match=f'Directory is not empty.*'): + mock_create_client.side_effect = None + local, _ = local_remote_dir + os.makedirs(local, exist_ok=True) + local_file_path = os.path.join(local, 'file.txt') + # Creating an empty file at specified location + with open(local_file_path, 'w') as _: + pass + _ = AlipanUploader(out=local) + + class TestLocalUploader: def test_upload_file(self, local_remote_dir: Tuple[str, str]):