[S3 Storage] Obey throughput limit configuration (#647)
rzvoncek authored Sep 19, 2023
1 parent 5841926 commit c45c94a
Showing 3 changed files with 70 additions and 2 deletions.
20 changes: 20 additions & 0 deletions medusa/storage/abstract_storage.py
@@ -391,3 +391,23 @@ def _human_readable_size(size, decimal_places=3):
break
size /= 1024.0
return '{:.{}f}{}'.format(size, decimal_places, unit)

@staticmethod
def _human_size_to_bytes(size_str: str) -> int:
multipliers = [
('PB', 1024 ** 5),
('TB', 1024 ** 4),
('GB', 1024 ** 3),
('MB', 1024 ** 2),
('KB', 1024),
('B', 1),
]

cleaned_size_str = size_str.replace(' ', '').replace('/s', '')

for unit, multiplier in multipliers:
if cleaned_size_str.endswith(unit):
size = float(cleaned_size_str.rstrip(unit))
return int(size * multiplier)

raise ValueError(f"Invalid human-friendly size format: {size_str}")
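
For reference, a small usage sketch of the helper above, assuming medusa is importable; the expected values are consistent with the unit tests added below.

    from medusa.storage.abstract_storage import AbstractStorage

    print(AbstractStorage._human_size_to_bytes('50 MB/s'))  # 52428800 (50 * 1024 ** 2)
    print(AbstractStorage._human_size_to_bytes('2.5MB'))    # 2621440 (2.5 * 1024 ** 2)
    print(AbstractStorage._human_size_to_bytes('1GB'))      # 1073741824
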
12 changes: 10 additions & 2 deletions medusa/storage/s3_base_storage.py
@@ -136,6 +136,12 @@ def connect(self):
tcp_keepalive=True
)

self.transfer_config = TransferConfig(
    # max_concurrency is hard-coded because, for now, parallelism is applied to chunking the files
    max_concurrency=4,
    max_bandwidth=AbstractStorage._human_size_to_bytes(self.config.transfer_max_bandwidth),
)

self.s3_client = boto3.client(
's3',
config=boto_config,
@@ -211,6 +217,8 @@ async def _upload_object(self, data: io.BytesIO, object_key: str, headers: t.Dic
)

try:
# we do not pass the transfer config here because it is meant to cap throughput,
# and this is a small-ish file upload, so there is no need to cap it
self.s3_client.put_object(
Bucket=self.config.bucket_name,
Key=object_key,
@@ -249,6 +257,7 @@ async def _download_blob(self, src: str, dest: str):
Bucket=self.config.bucket_name,
Key=object_key,
Filename=file_path,
Config=self.transfer_config,
)
except Exception as e:
logging.error('Error downloading file from s3://{}/{}: {}'.format(self.config.bucket_name, object_key, e))
@@ -292,12 +301,11 @@ async def _upload_blob(self, src: str, dest: str) -> ManifestObject:
)
)

config = TransferConfig(max_concurrency=5)
self.s3_client.upload_file(
Filename=src,
Bucket=self.bucket_name,
Key=object_key,
Config=config,
Config=self.transfer_config,
ExtraArgs=kms_args,
)

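To illustrate the boto3 side of the change, here is a minimal, self-contained sketch of the pattern the diff adopts: a single TransferConfig carrying the bandwidth cap, passed to both upload_file and download_file. Bucket, key, and file names are hypothetical; only the TransferConfig parameters reflect the commit.

    import boto3
    from boto3.s3.transfer import TransferConfig

    transfer_config = TransferConfig(
        max_concurrency=4,             # hard-coded, matching the diff
        max_bandwidth=50 * 1024 ** 2,  # e.g. "50 MB/s" expressed in bytes per second
    )

    s3 = boto3.client('s3')
    s3.upload_file(
        Filename='/tmp/example.db',    # hypothetical local file
        Bucket='example-bucket',       # hypothetical bucket
        Key='backups/example.db',
        Config=transfer_config,
    )
    s3.download_file(
        Bucket='example-bucket',
        Key='backups/example.db',
        Filename='/tmp/example.db',
        Config=transfer_config,
    )
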
40 changes: 40 additions & 0 deletions tests/storage/abstract_storage_test.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# Copyright 2021 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from medusa.storage.abstract_storage import AbstractStorage


class S3StorageTest(unittest.TestCase):

def test_convert_human_friendly_size_to_bytes(self):
self.assertEqual(50, AbstractStorage._human_size_to_bytes('50B'))
self.assertEqual(50, AbstractStorage._human_size_to_bytes('50B/s'))
self.assertEqual(50, AbstractStorage._human_size_to_bytes('50 B'))
self.assertEqual(50, AbstractStorage._human_size_to_bytes('50 B '))
self.assertEqual(50, AbstractStorage._human_size_to_bytes(' 50 B '))
self.assertEqual(50, AbstractStorage._human_size_to_bytes(' 50 B / s'))
self.assertEqual(50, AbstractStorage._human_size_to_bytes(' 50 B/s'))
self.assertEqual(50, AbstractStorage._human_size_to_bytes(' 50 B /s'))
self.assertEqual(50, AbstractStorage._human_size_to_bytes(' 50 B/ s'))
self.assertEqual(2 * 1024, AbstractStorage._human_size_to_bytes('2KB'))
self.assertEqual(2 * 1024 ** 2, AbstractStorage._human_size_to_bytes('2MB'))
self.assertEqual(2.5 * 1024 ** 2, AbstractStorage._human_size_to_bytes('2.5MB'))
self.assertEqual(2.5 * 1024 ** 2, AbstractStorage._human_size_to_bytes('2.5MB/s'))
self.assertEqual(2.5 * 1024 ** 2, AbstractStorage._human_size_to_bytes('2.5 MB/s'))
self.assertEqual(2 * 1024 ** 3, AbstractStorage._human_size_to_bytes('2GB'))
self.assertEqual(2 * 1024 ** 4, AbstractStorage._human_size_to_bytes('2TB'))
self.assertEqual(2 * 1024 ** 5, AbstractStorage._human_size_to_bytes('2PB'))
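
Finally, a rough end-to-end sketch of how a human-friendly setting could flow into boto3's bandwidth cap. This is not Medusa's actual config loader; the [storage] section name is an assumption, and only the transfer_max_bandwidth key itself appears in the diff.

    import configparser

    from boto3.s3.transfer import TransferConfig
    from medusa.storage.abstract_storage import AbstractStorage

    # Hypothetical INI fragment; the section name is assumed, the key name comes from the diff.
    parser = configparser.ConfigParser()
    parser.read_string("[storage]\ntransfer_max_bandwidth = 50 MB/s\n")

    max_bandwidth = AbstractStorage._human_size_to_bytes(parser['storage']['transfer_max_bandwidth'])
    transfer_config = TransferConfig(max_concurrency=4, max_bandwidth=max_bandwidth)
    print(max_bandwidth)  # 52428800 bytes per second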
