-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3_transfer.py
105 lines (82 loc) · 2.56 KB
/
s3_transfer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""Non-production script to copy large files to S3."""
import os
from pathlib import Path
import boto3
import typer
import tqdm
from typing import Union, List
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
# Typer application object; the @app.command() functions in this module
# register themselves on it.
app = typer.Typer()
# boto3 S3 service *resource* (not a low-level client, despite the name).
# It is created once when the module is imported and used by every command.
S3_CLIENT = boto3.resource("s3")
@app.command()
def download_file(
    bucket_name: str = typer.Argument(
        ...,
        help='The name of the S3 bucket'
    ),
    src: str = typer.Argument(
        ...,
        help='The files to download'
    ),
    dest: str = typer.Argument(
        ...,
        help='The destination directory'
    )
):
    """Download a file"""
    # NOTE: Typer uses the docstring above as the command's --help text, so
    # it is kept short and the details are documented here instead.
    #
    # Downloads the object with key ``src`` from ``bucket_name``.
    # ``dest`` may be an existing directory (the object's base name is
    # appended) or a full file path.
    # Exits with status 1 if S3 reports a ClientError; any file created for
    # the failed download is removed.

    # Use the key exactly as given. Passing it through Path() would
    # normalise it (e.g. drop "./" or collapse "//") and, on Windows, turn
    # "/" into "\", none of which is a valid transformation of an S3 key.
    key = str(src)
    dest = Path(dest).expanduser().resolve()
    if dest.is_dir():
        # Path is only used here to extract the final component of the key.
        dest = dest / Path(key).name

    bucket = S3_CLIENT.Bucket(bucket_name)
    typer.secho(f'Downloading {key} to {dest}', fg='green')
    try:
        # The ``with`` sits inside the ``try`` so the file is closed before
        # the cleanup below runs.
        with open(dest, 'wb') as f:
            bucket.download_fileobj(key, f)
    except ClientError as err:
        # open(..., 'wb') has already created/truncated the destination, so
        # whatever is there now is an empty or partial download: remove it.
        # Best effort only - a cleanup failure must not mask the S3 error.
        try:
            dest.unlink()
        except OSError:
            pass
        typer.secho(
            f'Error occurred while downloading file - {err}', err=True
        )
        raise typer.Exit(1) from err
@app.command()
def transfer(
    file_path: str = typer.Argument(..., help="The file to upload"),
    bucket_name: str = typer.Argument(..., help="The name of the S3 bucket"),  # NOQA
    prefix: str = typer.Option(
        None, help='The path in S3 to upload the file to'),
    object_name: str = typer.Option(
        None, help='Optional - rename the uploaded file')
):
    """Performs multi-part uploads of files to S3."""
    # NOTE: Typer uses the docstring above as the command's --help text, so
    # it is kept short and the details are documented here instead.
    #
    # Uploads the local file ``file_path`` to ``bucket_name`` under the key
    # ``[<prefix>/]<object_name>``; ``object_name`` defaults to the file's
    # base name. A tqdm progress bar tracks the bytes sent.
    # Exits with status 1 if the local file does not exist or the upload
    # fails.

    # Path() accepts both str and Path, so programmatic callers may pass
    # either.
    file_path = Path(file_path).expanduser()
    if not file_path.is_file():
        # Fail with a readable message instead of a FileNotFoundError
        # traceback from stat().
        typer.secho(f'File not found - {file_path}', err=True)
        raise typer.Exit(1)

    if object_name is None:
        object_name = file_path.name
    if prefix:
        # Drop trailing slashes so "some/prefix/" does not produce a key
        # containing "//".
        prefix = prefix.rstrip('/')
    if prefix:
        object_name = f'{prefix}/{object_name}'

    # NOTE(review): 1024 * 25 is 25 KiB, not 25 MiB. S3 documents a 5 MiB
    # minimum part size for multipart uploads, so confirm whether
    # 25 * 1024 * 1024 was intended for the threshold and chunk size.
    config = TransferConfig(
        multipart_threshold=1024 * 25,
        max_concurrency=20,
        multipart_chunksize=1024 * 25,
        use_threads=True
    )

    s3_object = S3_CLIENT.Object(bucket_name, object_name)
    file_size = file_path.stat().st_size
    try:
        with tqdm.tqdm(
            total=file_size,
            unit='B',
            unit_scale=True,
            desc=file_path.name
        ) as pbar:
            s3_object.upload_file(
                Filename=str(file_path),
                Config=config,
                # boto3 calls this with the number of bytes sent since the
                # previous call, which is exactly what pbar.update expects.
                Callback=pbar.update
            )
    except (ClientError, boto3.exceptions.S3UploadFailedError) as err:
        # upload_file wraps ClientError in S3UploadFailedError, so catching
        # ClientError alone would let upload failures escape as tracebacks.
        typer.secho(f'Error occurred while uploading file - {err}', err=True)
        raise typer.Exit(1) from err
# Run the Typer CLI only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    app()