From bbd46adb48d7123407a89eadfcedc08a696b05ba Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 2 Feb 2024 22:00:43 -0500 Subject: [PATCH 1/3] Reduce the number of get requests in gcsio. --- sdks/python/apache_beam/io/gcp/gcsio.py | 49 +++++------- sdks/python/apache_beam/io/gcp/gcsio_test.py | 83 +++++++++++++++----- 2 files changed, 83 insertions(+), 49 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index a6ba82a6e07c..5a4bb91b0bad 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -175,17 +175,14 @@ def open( ValueError: Invalid open file mode. """ bucket_name, blob_name = parse_gcs_path(filename) - bucket = self.client.get_bucket(bucket_name) + bucket = self.client.bucket(bucket_name) if mode == 'r' or mode == 'rb': - blob = bucket.get_blob(blob_name) + blob = bucket.blob(blob_name) return BeamBlobReader(blob, chunk_size=read_buffer_size) elif mode == 'w' or mode == 'wb': - blob = bucket.get_blob(blob_name) - if not blob: - blob = storage.Blob(blob_name, bucket) + blob = bucket.blob(blob_name) return BeamBlobWriter(blob, mime_type) - else: raise ValueError('Invalid file open mode: %s.' % mode) @@ -199,7 +196,7 @@ def delete(self, path): """ bucket_name, blob_name = parse_gcs_path(path) try: - bucket = self.client.get_bucket(bucket_name) + bucket = self.client.bucket(bucket_name) bucket.delete_blob(blob_name) except NotFound: return @@ -228,16 +225,15 @@ def delete_batch(self, paths): with current_batch: for path in current_paths: bucket_name, blob_name = parse_gcs_path(path) - bucket = self.client.get_bucket(bucket_name) + bucket = self.client.bucket(bucket_name) bucket.delete_blob(blob_name) for i, path in enumerate(current_paths): error_code = None - for j in range(2): - resp = current_batch._responses[2 * i + j] - if resp.status_code >= 400 and resp.status_code != 404: - error_code = resp.status_code - break + resp = current_batch._responses[i] + if resp.status_code >= 400 and resp.status_code != 404: + error_code = resp.status_code + break final_results.append((path, error_code)) s += MAX_BATCH_OPERATION_SIZE @@ -258,11 +254,9 @@ def copy(self, src, dest): """ src_bucket_name, src_blob_name = parse_gcs_path(src) dest_bucket_name, dest_blob_name= parse_gcs_path(dest, object_optional=True) - src_bucket = self.get_bucket(src_bucket_name) - src_blob = src_bucket.get_blob(src_blob_name) - if not src_blob: - raise NotFound("Source %s not found", src) - dest_bucket = self.get_bucket(dest_bucket_name) + src_bucket = self.client.bucket(src_bucket_name) + src_blob = src_bucket.blob(src_blob_name) + dest_bucket = self.client.bucket(dest_bucket_name) if not dest_blob_name: dest_blob_name = None src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_name) @@ -291,19 +285,18 @@ def copy_batch(self, src_dest_pairs): for pair in current_pairs: src_bucket_name, src_blob_name = parse_gcs_path(pair[0]) dest_bucket_name, dest_blob_name = parse_gcs_path(pair[1]) - src_bucket = self.client.get_bucket(src_bucket_name) - src_blob = src_bucket.get_blob(src_blob_name) - dest_bucket = self.client.get_bucket(dest_bucket_name) + src_bucket = self.client.bucket(src_bucket_name) + src_blob = src_bucket.blob(src_blob_name) + dest_bucket = self.client.bucket(dest_bucket_name) src_bucket.copy_blob(src_blob, dest_bucket, dest_blob_name) for i, pair in enumerate(current_pairs): error_code = None - for j in range(4): - resp = current_batch._responses[4 * i + j] - if resp.status_code >= 400: - error_code = resp.status_code - break + resp = current_batch._responses[i] + if resp.status_code >= 400: + error_code = resp.status_code + break final_results.append((pair[0], pair[1], error_code)) s += MAX_BATCH_OPERATION_SIZE @@ -417,7 +410,7 @@ def _gcs_object(self, path): """Returns a gcs object for the given path This method does not perform glob expansion. Hence the given path must be - for a single GCS object. + for a single GCS object. The method will make HTTP requests. Returns: GCS object. """ @@ -470,7 +463,7 @@ def list_files(self, path, with_metadata=False): _LOGGER.debug("Starting the file information of the input") else: _LOGGER.debug("Starting the size estimation of the input") - bucket = self.client.get_bucket(bucket_name) + bucket = self.client.bucket(bucket_name) response = self.client.list_blobs(bucket, prefix=prefix) for item in response: file_name = 'gs://%s/%s' % (item.bucket.name, item.name) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index f8b580c91c95..5995a794f23f 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -43,9 +43,15 @@ class FakeGcsClient(object): def __init__(self): self.buckets = {} + def _add_bucket(self, bucket): + self.buckets[bucket.name] = bucket + return self.buckets[bucket.name] + + def bucket(self, name): + return FakeBucket(self, name) + def create_bucket(self, name): - self.buckets[name] = FakeBucket(self, name) - return self.buckets[name] + return self._add_bucket(self.bucket(name)) def get_bucket(self, name): if name in self.buckets: @@ -92,40 +98,51 @@ def __init__(self, client, name): self.name = name self.blobs = {} self.default_kms_key_name = None - self.client.buckets[name] = self - def add_blob(self, blob): - self.blobs[blob.name] = blob + def _get_canonical_bucket(self): + return self.client.get_bucket(self.name) - def create_blob(self, name): + def _create_blob(self, name): return FakeBlob(name, self) + def add_blob(self, blob): + bucket = self._get_canonical_bucket() + bucket.blobs[blob.name] = blob + return bucket.blobs[blob.name] + + def blob(self, name): + return self._create_blob(name) + def copy_blob(self, blob, dest, new_name=None): + if self.get_blob(blob.name) is None: + raise NotFound("source blob not found") if not new_name: new_name = blob.name - dest.blobs[new_name] = blob - dest.blobs[new_name].name = new_name - dest.blobs[new_name].bucket = dest - return dest.blobs[new_name] + new_blob = FakeBlob(new_name, dest) + dest.add_blob(new_blob) + return new_blob def get_blob(self, blob_name): - if blob_name in self.blobs: - return self.blobs[blob_name] + bucket = self._get_canonical_bucket() + if blob_name in bucket.blobs: + return bucket.blobs[blob_name] else: return None def lookup_blob(self, name): - if name in self.blobs: - return self.blobs[name] + bucket = self._get_canonical_bucket() + if name in bucket.blobs: + return bucket.blobs[name] else: - return self.create_blob(name) + return bucket.create_blob(name) def set_default_kms_key_name(self, name): self.default_kms_key_name = name def delete_blob(self, name): - if name in self.blobs: - del self.blobs[name] + bucket = self._get_canonical_bucket() + if name in bucket.blobs: + del bucket.blobs[name] class FakeBlob(object): @@ -151,12 +168,18 @@ def __init__( self.updated = updated self._fail_when_getting_metadata = fail_when_getting_metadata self._fail_when_reading = fail_when_reading - self.bucket.add_blob(self) def delete(self): - if self.name in self.bucket.blobs: - del self.bucket.blobs[self.name] + self.bucket.delete_blob(self.name) + + def download_as_bytes(self, **kwargs): + blob = self.bucket.get_blob(self.name) + if blob is None: + raise NotFound("blob not found") + return blob.contents + def __eq__(self, other): + return self.bucket.get_blob(self.name) is other.bucket.get_blob(other.name) @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') class TestGCSPathParser(unittest.TestCase): @@ -224,6 +247,7 @@ def _insert_random_file( updated=updated, fail_when_getting_metadata=fail_when_getting_metadata, fail_when_reading=fail_when_reading) + bucket.add_blob(blob) return blob def setUp(self): @@ -475,8 +499,25 @@ def test_list_prefix(self): def test_downloader_fail_non_existent_object(self): file_name = 'gs://gcsio-metrics-test/dummy_mode_file' with self.assertRaises(NotFound): - self.gcs.open(file_name, 'r') + with self.gcs.open(file_name, 'r') as f: + f.read(1) + + def test_blob_delete(self): + file_name = 'gs://gcsio-test/delete_me' + file_size = 1024 + bucket_name, blob_name = gcsio.parse_gcs_path(file_name) + # Test deletion of non-existent file. + bucket = self.client.get_bucket(bucket_name) + self.gcs.delete(file_name) + + self._insert_random_file(self.client, file_name, file_size) + self.assertTrue(blob_name in bucket.blobs) + blob = bucket.get_blob(blob_name) + self.assertIsNotNone(blob) + + blob.delete() + self.assertFalse(blob_name in bucket.blobs) if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From d7f7b43248b370188df6775b604b70fd63512713 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 2 Feb 2024 22:28:37 -0500 Subject: [PATCH 2/3] Apply formatter. --- sdks/python/apache_beam/io/gcp/gcsio_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 5995a794f23f..c9a7fb72f779 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -181,6 +181,7 @@ def download_as_bytes(self, **kwargs): def __eq__(self, other): return self.bucket.get_blob(self.name) is other.bucket.get_blob(other.name) + @unittest.skipIf(NotFound is None, 'GCP dependencies are not installed') class TestGCSPathParser(unittest.TestCase): @@ -519,6 +520,7 @@ def test_blob_delete(self): blob.delete() self.assertFalse(blob_name in bucket.blobs) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 8f51910b87d63b373da307068686ec53dbd7d79b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 5 Feb 2024 09:44:15 -0500 Subject: [PATCH 3/3] Replace get_bucket with bucket in _gcs_object --- sdks/python/apache_beam/io/gcp/gcsio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 5a4bb91b0bad..b5a291428767 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -415,7 +415,7 @@ def _gcs_object(self, path): Returns: GCS object. """ bucket_name, blob_name = parse_gcs_path(path) - bucket = self.client.get_bucket(bucket_name) + bucket = self.client.bucket(bucket_name) blob = bucket.get_blob(blob_name) if blob: return blob