Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Alipan Storage backend #651

Merged
merged 8 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/source/how_to_guides/configure_cloud_storage_credentials.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,50 @@ export DATABRICKS_HOST='hostname'
export DATABRICKS_TOKEN='token key'
```
````


## Alipan

To authenticate Alipan access, users must set their Alipan refresh token (`ALIPAN_WEB_REFRESH_TOKEN`) in the run environment.

To get the refresh token from the Alipan website, the user needs to login to the [Alipan website](https://www.alipan.com/drive), go to the console of the browser's devTools, and pass the code below into the console to get the refresh token.

```javascript
JSON.parse(localStorage.token).refresh_token;
```

Then set the refresh token in the run environment.

````{tabs}
```{code-tab} py
import os
os.environ['ALIPAN_WEB_REFRESH_TOKEN'] = 'refresh_token'
```

```{code-tab} sh
export ALIPAN_WEB_REFRESH_TOKEN='refresh_token'
```
````

### Encryption Data

If you want to encrypt data in cloud storage, you can set the environment variable below to encrypt and decrypt the data.

````{tabs}
```{code-tab} py
import os
os.environ['ALIPAN_ENCRYPT_PASSWORD'] = 'encryption_key'

# For uploading, the encryption type must be set as one of `Simple`, `ChaCha20`, `AES256CBC`
# When downloading, this environment variable is not required
os.environ['ALIPAN_ENCRYPT_TYPE'] = 'AES256CBC'
```

```{code-tab} sh
export ALIPAN_ENCRYPT_PASSWORD='encryption_key'

# For uploading, the encryption type must be set as one of `Simple`, `ChaCha20`, `AES256CBC`
# When downloading, this environment variable is not required
export ALIPAN_ENCRYPT_TYPE='AES256CBC'
```
````
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
'databricks-sdk==0.23.0',
]

extra_deps['alipan'] = [
'AliPCS-Py>=0.8,<1',
]

extra_deps['testing'] = [
'mosaicml-cli>=0.5.25,<0.7',
]
Expand Down
5 changes: 3 additions & 2 deletions streaming/base/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

"""Base module for downloading/uploading files from/to cloud storage."""

from streaming.base.storage.download import (download_file, download_from_azure,
download_from_azure_datalake,
from streaming.base.storage.download import (download_file, download_from_alipan,
download_from_azure, download_from_azure_datalake,
download_from_databricks_unity_catalog,
download_from_dbfs, download_from_gcs,
download_from_local, download_from_oci,
Expand All @@ -29,5 +29,6 @@
'download_from_azure_datalake',
'download_from_databricks_unity_catalog',
'download_from_dbfs',
'download_from_alipan',
'download_from_local',
]
47 changes: 47 additions & 0 deletions streaming/base/storage/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
'download_from_azure_datalake',
'download_from_databricks_unity_catalog',
'download_from_dbfs',
'download_from_alipan',
'download_from_local',
]

Expand Down Expand Up @@ -422,6 +423,50 @@ def download_from_dbfs(remote: str, local: str) -> None:
os.rename(local_tmp, local)


def download_from_alipan(remote: str, local: str) -> None:
"""Download a file from remote Alipan to local.

Args:
remote (str): Remote path (Alipan).
local (str): Local path (local filesystem).
"""
from alipcs_py.alipcs import AliPCSApiMix
from alipcs_py.commands.download import download_file

web_refresh_token = os.environ['ALIPAN_WEB_REFRESH_TOKEN']
web_token_type = 'Bearer'
alipan_encrypt_password = os.environ.get('ALIPAN_ENCRYPT_PASSWORD', '').encode()

api = AliPCSApiMix(web_refresh_token, web_token_type=web_token_type)

obj = urllib.parse.urlparse(remote)
if obj.scheme != 'alipan':
raise ValueError(
f'Expected obj.scheme to be `alipan`, instead, got {obj.scheme} for remote={remote}')
if obj.netloc != '':
raise ValueError(
f'Expected remote to be alipan:///path/to/some, instead, got remote={remote}')

remote_path = obj.path
filename = pathlib.PosixPath(remote_path).name
localdir = pathlib.Path(local).parent

remote_pcs_file = api.get_file(remotepath=remote_path)
if remote_pcs_file is None:
raise FileNotFoundError(f'Object {remote} not found.')

download_file(
api,
remote_pcs_file,
localdir=localdir,
downloader='me',
concurrency=1,
show_progress=False,
encrypt_password=alipan_encrypt_password,
)
os.rename(localdir / filename, local)


def download_from_local(remote: str, local: str) -> None:
"""Download a file from remote to local.

Expand Down Expand Up @@ -474,6 +519,8 @@ def download_file(remote: Optional[str], local: str, timeout: float):
download_from_databricks_unity_catalog(remote, local)
elif remote.startswith('dbfs:/'):
download_from_dbfs(remote, local)
elif remote.startswith('alipan://'):
snarayan21 marked this conversation as resolved.
Show resolved Hide resolved
download_from_alipan(remote, local)
else:
download_from_local(remote, local)

Expand Down
133 changes: 133 additions & 0 deletions streaming/base/storage/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
'AzureUploader',
'DatabricksUnityCatalogUploader',
'DBFSUploader',
'AlipanUploader',
'LocalUploader',
]

Expand All @@ -40,6 +41,7 @@
'azure-dl': 'AzureDataLakeUploader',
'dbfs:/Volumes': 'DatabricksUnityCatalogUploader',
'dbfs': 'DBFSUploader',
'alipan': 'AlipanUploader',
'': 'LocalUploader',
}

Expand Down Expand Up @@ -945,6 +947,137 @@ def check_folder_exists(self):
raise e


class AlipanUploader(CloudUploader):
"""Upload file from local machine to Alipan.

Args:
out (str | Tuple[str, str]): Output dataset directory to save shard files.

1. If ``out`` is a local directory, shard files are saved locally.
2. If ``out`` is a remote directory, a local temporary directory is created to
cache the shard files and then the shard files are uploaded to a remote
location. At the end, the temp directory is deleted once shards are uploaded.
3. If ``out`` is a tuple of ``(local_dir, remote_dir)``, shard files are saved in
the `local_dir` and also uploaded to a remote location.
keep_local (bool): If the dataset is uploaded, whether to keep the local dataset
shard file or remove it after uploading. Defaults to ``False``.
progress_bar (bool): Display TQDM progress bars for uploading output dataset files to
a remote location. Default to ``False``.
retry (int): Number of times to retry uploading a file. Defaults to ``2``.
exist_ok (bool): When exist_ok = False, raise error if the local part of ``out`` already
exists and has contents. Defaults to ``False``.
"""

def __init__(self,
out: Union[str, Tuple[str, str]],
keep_local: bool = False,
progress_bar: bool = False,
retry: int = 2,
exist_ok: bool = False) -> None:
super().__init__(out, keep_local, progress_bar, retry, exist_ok)

obj = urllib.parse.urlparse(self.remote)
if obj.scheme != 'alipan':
raise ValueError(
f'Expected obj.scheme to be `alipan`, instead, got {obj.scheme} for remote={self.remote}'
)
if obj.netloc != '':
raise ValueError(
f'Expected remote to be alipan:///path/to/some, instead, got remote={self.remote}')

from alipcs_py.alipcs import AliPCSApiMix
from alipcs_py.commands.upload import EncryptType

web_refresh_token = os.environ['ALIPAN_WEB_REFRESH_TOKEN']
web_token_type = 'Bearer'
self.alipan_encrypt_password = os.environ.get('ALIPAN_ENCRYPT_PASSWORD', '').encode()
self.alipan_encrypt_type = EncryptType.No # No encryption by default
encrypt_type = os.environ.get('ALIPAN_ENCRYPT_TYPE', '')
if encrypt_type:
if getattr(EncryptType, encrypt_type, None) is None:
raise ValueError(
f'Invalid ALIPAN_ENCRYPT_TYPE: {encrypt_type}. ' +
'Encryption type must be one of `Simple`, `ChaCha20`, `AES256CBC`')
else:
self.alipan_encrypt_type = getattr(EncryptType, encrypt_type)

self.api = AliPCSApiMix(web_refresh_token, web_token_type=web_token_type)
self.check_token()

def check_token(self):
"""Raise an exception if the refresh_token is invalid.

Raises:
error: AliPCSError with code `AccessTokenInvalid`
"""
self.api.refresh()

def upload_file(self, filename: str):
"""Upload file from local instance to Alipan directory.

Args:
filename (str): File to upload.
"""
from alipcs_py.commands import upload as alipcs_upload
from alipcs_py.commands.upload import EncryptType, total_len

@retry(num_attempts=self.retry)
def _upload_file():
local_filename = os.path.join(self.local, filename)
remote_filename = os.path.join(self.remote, filename) # pyright: ignore
obj = urllib.parse.urlparse(remote_filename)
logger.debug(f'Uploading to {remote_filename}')
if self.alipan_encrypt_type == EncryptType.No:
file_size = os.stat(local_filename).st_size
else:
file_size = self.alipan_encrypt_type
encrypt_io = self.alipan_encrypt_type.encrypt_io(open(local_filename, 'rb'),
self.alipan_encrypt_password)
file_size = total_len(encrypt_io)
with tqdm.tqdm(total=file_size,
unit='B',
unit_scale=True,
desc=f'Uploading to {remote_filename}',
disable=(not self.progress_bar)) as pbar:
alipcs_upload.upload_file(
self.api, (local_filename, obj.path),
'overwrite',
encrypt_password=self.alipan_encrypt_password,
encrypt_type=self.alipan_encrypt_type,
callback_for_monitor=lambda offset: pbar.update(offset - pbar.n))
self.clear_local(local=local_filename)

_upload_file()

def list_objects(self, prefix: Optional[str] = None) -> List[str]:
"""List all objects in the remote path with the given prefix.

Args:
prefix (Optional[str], optional): The prefix to search for. Defaults to ``None``.

Returns:
List[str]: A list of object names that match the prefix.
"""
if prefix is None:
prefix = ''

if self.remote is None:
raise ValueError('Alipan remote path must be set.')

obj = urllib.parse.urlparse(self.remote)
remote_pcs_file = self.api.get_file(remotepath=obj.path)
if remote_pcs_file is None:
raise FileNotFoundError(f'Alipan remote path `{obj.path}` not found.')

file_paths = []
for pcs_file in self.api.list_iter(remote_pcs_file.file_id, recursive=True):
file_path = pathlib.PosixPath(obj.path, pcs_file.path).as_posix()
if file_path.startswith(prefix):
file_paths.append(file_path)

return sorted(file_paths)


class LocalUploader(CloudUploader):
"""Copy file from one local directory to another local directory.

Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,11 @@ def test_list_r2_buckets():
client = boto3.client('s3', region_name='us-east-1', endpoint_url=R2_URL)
buckets = client.list_buckets()
assert buckets['Buckets'][0]['Name'] == MY_BUCKET


@pytest.fixture()
def alipan_credentials():
"""Mocked alipan Credentials."""
os.environ['ALIPAN_WEB_REFRESH_TOKEN'] = 'testing'
yield
del os.environ['ALIPAN_WEB_REFRESH_TOKEN']
8 changes: 8 additions & 0 deletions tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ def test_download_from_dbfs_gets_called(self, mocked_requests: Mock, remote_loca
mocked_requests.assert_called_once()
mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath)

@patch('streaming.base.storage.download.download_from_alipan')
@pytest.mark.usefixtures('remote_local_file')
def test_download_from_alipan_gets_called(self, mocked_requests: Mock, remote_local_file: Any):
mock_remote_filepath, mock_local_filepath = remote_local_file(cloud_prefix='alipan://')
download_file(mock_remote_filepath, mock_local_filepath, 60)
mocked_requests.assert_called_once()
mocked_requests.assert_called_once_with(mock_remote_filepath, mock_local_filepath)

@patch('streaming.base.storage.download.download_from_local')
@pytest.mark.usefixtures('remote_local_file')
def test_download_from_local_gets_called(self, mocked_requests: Mock, remote_local_file: Any):
Expand Down
43 changes: 39 additions & 4 deletions tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import boto3
import pytest

from streaming.base.storage.upload import (AzureDataLakeUploader, AzureUploader, CloudUploader,
DatabricksUnityCatalogUploader, DBFSUploader,
GCSAuthentication, GCSUploader, LocalUploader,
S3Uploader)
from streaming.base.storage.upload import (AlipanUploader, AzureDataLakeUploader, AzureUploader,
CloudUploader, DatabricksUnityCatalogUploader,
DBFSUploader, GCSAuthentication, GCSUploader,
LocalUploader, S3Uploader)
from tests.conftest import MY_BUCKET, R2_URL

MY_PREFIX = 'train'
Expand Down Expand Up @@ -488,6 +488,41 @@ def test_local_directory_is_empty(self, mock_create_client: Mock,
_ = DBFSUploader(out=local)


class TestAlipanUploader:

@patch('streaming.base.storage.upload.AlipanUploader.check_token')
@pytest.mark.usefixtures('alipan_credentials')
@pytest.mark.parametrize('out',
['alipan:///container/dir', ('./dir1', 'alipan:///container/dir/')])
def test_instantiation(self, mock_create_client: Mock, out: Any):
mock_create_client.side_effect = None
_ = AlipanUploader(out=out)
if not isinstance(out, str):
shutil.rmtree(out[0], ignore_errors=True)

@patch('streaming.base.storage.upload.AlipanUploader.check_token')
@pytest.mark.usefixtures('alipan_credentials')
@pytest.mark.parametrize('out', ['alipann://bucket/dir', ('./dir1', 'gcs://bucket/dir/')])
def test_invalid_remote_list(self, mock_create_client: Mock, out: Any):
mock_create_client.side_effect = None
with pytest.raises(ValueError, match=f'Invalid Cloud provider prefix.*'):
_ = AlipanUploader(out=out)

@patch('streaming.base.storage.upload.AlipanUploader.check_token')
@pytest.mark.usefixtures('alipan_credentials')
def test_local_directory_is_empty(self, mock_create_client: Mock,
local_remote_dir: Tuple[str, str]):
with pytest.raises(FileExistsError, match=f'Directory is not empty.*'):
mock_create_client.side_effect = None
local, _ = local_remote_dir
os.makedirs(local, exist_ok=True)
local_file_path = os.path.join(local, 'file.txt')
# Creating an empty file at specified location
with open(local_file_path, 'w') as _:
pass
_ = AlipanUploader(out=local)


class TestLocalUploader:

def test_upload_file(self, local_remote_dir: Tuple[str, str]):
Expand Down
Loading