Skip to content

Commit

Permalink
fix: cancel upload when BlobWriter exits with exception (#1243)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddelange authored and andrewsg committed Dec 11, 2024
1 parent 22b8c30 commit df107d2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 7 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ setup.py file. Applications which do not import directly from
`google-resumable-media` can safely disregard this dependency. This backwards
compatibility feature will be removed in a future major version update.

Miscellaneous
~~~~~~~~~~~~~

- The BlobWriter class now attempts to terminate an ongoing resumable upload if
the writer exits with an exception.

Quick Start
-----------

Expand Down
13 changes: 13 additions & 0 deletions google/cloud/storage/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,19 @@ def close(self):
self._upload_chunks_from_buffer(1)
self._buffer.close()

def terminate(self):
    """Cancel the ResumableUpload and close the write buffer.

    When an upload has been initiated (``self._upload_and_transport`` is
    truthy), ``transport.delete`` is called on the upload's ``upload_url``
    to cancel it. The buffer is closed in every case: when no upload was
    started, when the cancellation succeeds, and when the cancellation
    request itself raises.

    Raises:
        Whatever ``transport.delete`` raises. The exception propagates to
        the caller only after the buffer has been closed.
    """
    try:
        if self._upload_and_transport:
            upload, transport = self._upload_and_transport
            transport.delete(upload.upload_url)
    finally:
        # Close the buffer even if the cancel request fails; otherwise the
        # writer would keep reporting itself as open after a failed
        # terminate and the buffer would never be released.
        self._buffer.close()

def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block, finishing or cancelling the upload.

    A clean exit (no exception type passed in) calls ``close()``. An exit
    caused by an exception calls ``terminate()`` instead. Nothing is
    returned, so the exception is not suppressed.
    """
    if exc_type is None:
        self.close()
        return
    # An exception is propagating out of the block: cancel rather than
    # finalize.
    self.terminate()

@property
def closed(self):
    """Report whether the underlying buffer has been closed."""
    buffer = self._buffer
    return buffer.closed
Expand Down
40 changes: 40 additions & 0 deletions tests/system/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# limitations under the License.


import pytest

from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE
from .test_blob import _check_blob_hash


Expand Down Expand Up @@ -76,3 +79,40 @@ def test_blobwriter_and_blobreader_text_mode(
assert text_data[:100] == reader.read(100)
assert 0 == reader.seek(0)
assert reader.read() == text_data


def test_blobwriter_exit(
    shared_bucket,
    blobs_to_delete,
    service_account,
):
    """Check how leaving a BlobWriter ``with`` block affects the upload.

    Three scenarios are exercised against the same blob name:
    an exception before any chunk was uploaded, an exception after a chunk
    was uploaded, and an exception that is handled inside the block.
    """
    target = shared_bucket.blob("NeverUploaded")

    # Scenario 1: exception raised while all data is still buffered.
    # There is no upload to cancel in __exit__, so this is a no-op.
    with pytest.raises(ValueError, match="SIGTERM received"):
        with target.open("wb") as fh:
            fh.write(b"first chunk")  # stays in the buffer
            raise ValueError("SIGTERM received")
    assert not target.exists()  # nothing was ever created

    # Scenario 2: exception raised after a chunk has been uploaded.
    # The unhandled exception should cancel the upload in __exit__.
    with pytest.raises(ValueError, match="SIGTERM received"):
        with target.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as fh:
            fh.write(b"first chunk")  # stays in the buffer
            fh.write(bytes(CHUNK_SIZE_MULTIPLE))  # triggers an upload
            raise ValueError("SIGTERM received")
    assert not target.exists()  # cancelled upload leaves no blob

    # Scenario 3: exception handled inside the block.
    # No exception context reaches __exit__, so the upload completes.
    with target.open("wb", chunk_size=CHUNK_SIZE_MULTIPLE) as fh:
        fh.write(b"first chunk")  # stays in the buffer
        fh.write(bytes(CHUNK_SIZE_MULTIPLE))  # triggers an upload
        try:
            raise ValueError("This is fine")
        except ValueError:
            pass
    blobs_to_delete.append(target)
    assert target.exists()  # blob was uploaded
61 changes: 54 additions & 7 deletions tests/unit/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import mock

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage.fileio import CHUNK_SIZE_MULTIPLE
from google.cloud.storage.retry import DEFAULT_RETRY

TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n"
Expand Down Expand Up @@ -377,7 +378,7 @@ def test_write(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -426,6 +427,52 @@ def test_close_errors(self):
with self.assertRaises(ValueError):
writer.write(TEST_BINARY_DATA)

def test_terminate_after_initiate(self):
    """An exception after the upload was initiated cancels the upload."""
    fake_upload = mock.Mock(upload_url="dummy")
    fake_transport = mock.Mock()
    fake_blob = mock.Mock()
    fake_blob._initiate_resumable_upload.return_value = (
        fake_upload,
        fake_transport,
    )

    with self.assertRaises(RuntimeError), self._make_blob_writer(
        fake_blob, chunk_size=CHUNK_SIZE_MULTIPLE
    ) as writer:
        # One byte over the chunk size forces the upload to be initiated.
        writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1))
        raise RuntimeError  # should make __exit__ terminate the upload

    # The upload was initiated exactly once ...
    fake_blob._initiate_resumable_upload.assert_called_once()
    # ... the writer ended up closed via terminate ...
    self.assertTrue(writer.closed)
    # ... and the resumable upload was cancelled at its upload URL.
    fake_transport.delete.assert_called_with("dummy")

def test_terminate_before_initiate(self):
    """An exception before any upload was initiated cancels nothing."""
    fake_upload = mock.Mock()
    fake_transport = mock.Mock()
    fake_blob = mock.Mock()
    fake_blob._initiate_resumable_upload.return_value = (
        fake_upload,
        fake_transport,
    )

    with self.assertRaises(RuntimeError), self._make_blob_writer(
        fake_blob, chunk_size=CHUNK_SIZE_MULTIPLE
    ) as writer:
        # One byte under the chunk size keeps everything buffered.
        writer.write(bytes(CHUNK_SIZE_MULTIPLE - 1))
        raise RuntimeError  # there is no resumable upload to terminate

    # The upload was never initiated ...
    fake_blob._initiate_resumable_upload.assert_not_called()
    # ... the writer is still closed by terminate ...
    self.assertTrue(writer.closed)
    # ... and no cancel request was issued.
    fake_transport.delete.assert_not_called()

def test_terminate_skipped(self):
    """A clean exit closes the writer without cancelling the upload."""
    fake_upload = mock.Mock()
    fake_transport = mock.Mock()
    fake_blob = mock.Mock()
    fake_blob._initiate_resumable_upload.return_value = (
        fake_upload,
        fake_transport,
    )

    writer_cm = self._make_blob_writer(fake_blob, chunk_size=CHUNK_SIZE_MULTIPLE)
    with writer_cm as writer:
        # One byte over the chunk size forces the upload to be initiated.
        writer.write(bytes(CHUNK_SIZE_MULTIPLE + 1))

    # The upload was initiated ...
    fake_blob._initiate_resumable_upload.assert_called()
    # ... the writer was closed normally ...
    self.assertTrue(writer.closed)
    # ... and terminate's cancel request was never sent.
    fake_transport.delete.assert_not_called()

def test_flush_fails(self):
blob = mock.Mock(chunk_size=None)
writer = self._make_blob_writer(blob)
Expand Down Expand Up @@ -468,7 +515,7 @@ def test_conditional_retry_failure(self):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -520,7 +567,7 @@ def test_conditional_retry_pass(self):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -573,7 +620,7 @@ def test_forced_default_retry(self):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -619,7 +666,7 @@ def test_num_retries_and_retry_conflict(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. The mock will raise a ValueError, simulating
# actual behavior when num_retries and retry are both specified.
Expand Down Expand Up @@ -673,7 +720,7 @@ def test_num_retries_only(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_BINARY_DATA[0:4])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write over chunk_size. This should result in upload initialization
# and multiple chunks uploaded.
Expand Down Expand Up @@ -965,7 +1012,7 @@ def test_write(self, mock_warn):
# Write under chunk_size. This should be buffered and the upload not
# initiated.
writer.write(TEST_MULTIBYTE_TEXT_DATA[0:2])
blob.initiate_resumable_upload.assert_not_called()
blob._initiate_resumable_upload.assert_not_called()

# Write all data and close.
writer.write(TEST_MULTIBYTE_TEXT_DATA[2:])
Expand Down

0 comments on commit df107d2

Please sign in to comment.