Skip to content

Commit

Permalink
fix: propagate timeout in BlobWriter (#1186)
Browse files Browse the repository at this point in the history

Fixes #1184
  • Loading branch information
cojenco authored Dec 4, 2023
1 parent 9e4d1d8 commit 22f36da
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
8 changes: 7 additions & 1 deletion google/cloud/storage/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,15 @@ def _upload_chunks_from_buffer(self, num_chunks):

upload, transport = self._upload_and_transport

# Attach timeout if specified in the keyword arguments.
# Otherwise, the default timeout will be used from the media library.
kwargs = {}
if "timeout" in self._upload_kwargs:
kwargs = {"timeout": self._upload_kwargs.get("timeout")}

# Upload chunks. The SlidingBuffer class will manage seek position.
for _ in range(num_chunks):
upload.transmit_next_chunk(transport)
upload.transmit_next_chunk(transport, **kwargs)

# Wipe the buffer of chunks uploaded, preserving any remaining data.
self._buffer.flush()
Expand Down
10 changes: 7 additions & 3 deletions tests/unit/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ def test_write(self, mock_warn):
blob = mock.Mock()
upload = mock.Mock()
transport = mock.Mock()
timeout = 600

blob._initiate_resumable_upload.return_value = (upload, transport)

Expand All @@ -354,7 +355,10 @@ def test_write(self, mock_warn):
# arguments are used.
# It would be normal to use a context manager here, but not doing so
# gives us more control over close() for test purposes.
upload_kwargs = {"if_metageneration_match": 1}
upload_kwargs = {
"if_metageneration_match": 1,
"timeout": timeout,
}
chunk_size = 8 # Note: Real upload requires a multiple of 256KiB.
writer = self._make_blob_writer(
blob,
Expand All @@ -366,7 +370,7 @@ def test_write(self, mock_warn):

# The transmit_next_chunk method must actually consume bytes from the
# sliding buffer for the flush() feature to work properly.
upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read(
upload.transmit_next_chunk.side_effect = lambda _, timeout: writer._buffer.read(
chunk_size
)

Expand All @@ -388,7 +392,7 @@ def test_write(self, mock_warn):
retry=None,
**upload_kwargs
)
upload.transmit_next_chunk.assert_called_with(transport)
upload.transmit_next_chunk.assert_called_with(transport, timeout=timeout)
self.assertEqual(upload.transmit_next_chunk.call_count, 4)

# Write another byte, finalize and close.
Expand Down

0 comments on commit 22f36da

Please sign in to comment.