From e1d0a55a346434ab7f4d627059090dbc356516be Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 22 Jan 2024 15:56:28 +0100 Subject: [PATCH 01/10] added unauthenticated version of gcs object store --- Cargo.lock | 1 + core/lib/object_store/Cargo.toml | 1 + .../object_store/src/gcs_unauthenticated.rs | 111 ++++++++++++++++++ core/lib/object_store/src/lib.rs | 1 + 4 files changed, 114 insertions(+) create mode 100644 core/lib/object_store/src/gcs_unauthenticated.rs diff --git a/Cargo.lock b/Cargo.lock index 6aae513b69a..c542a214a06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9259,6 +9259,7 @@ dependencies = [ "google-cloud-storage", "http", "prost", + "reqwest", "serde_json", "tempdir", "tokio", diff --git a/core/lib/object_store/Cargo.toml b/core/lib/object_store/Cargo.toml index ec42f47c6bf..85ed7b875f1 100644 --- a/core/lib/object_store/Cargo.toml +++ b/core/lib/object_store/Cargo.toml @@ -26,6 +26,7 @@ flate2 = "1.0.28" tokio = { version = "1.21.2", features = ["full"] } tracing = "0.1" prost = "0.12.1" +reqwest = { version = "0.11", features = ["blocking"] } [dev-dependencies] tempdir = "0.3.7" diff --git a/core/lib/object_store/src/gcs_unauthenticated.rs b/core/lib/object_store/src/gcs_unauthenticated.rs new file mode 100644 index 00000000000..b5bd158d009 --- /dev/null +++ b/core/lib/object_store/src/gcs_unauthenticated.rs @@ -0,0 +1,111 @@ +use std::time::Duration; + +use async_trait::async_trait; +use http::StatusCode; +use reqwest::Error; + +use crate::{raw::BoxedError, Bucket, ObjectStore, ObjectStoreError}; + +#[derive(Debug)] +pub struct UnauthenticatedGoogleCloudStorage { + bucket_prefix: String, + max_retries: u16, +} + +#[async_trait] +impl ObjectStore for UnauthenticatedGoogleCloudStorage { + async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { + const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(600); + let client = reqwest::Client::builder() + .timeout(DOWNLOAD_TIMEOUT) + .build() + .map_err(|error| ObjectStoreError::Other(error.into()))?; + + let mut last_error: Option = None; + let retry_count = self.max_retries; + for retry_number in 0..retry_count { + let url = format!("{}/{key}", self.storage_prefix_raw(bucket)); + let response = client.get(url.to_string()).send().await; + + match response { + Ok(response) => { + let response_code = response.status(); + match response_code { + StatusCode::NOT_FOUND => { + let error_message = format!( + "missing key: {key} in bucket {bucket} (got 404 from {url})" + ); + return Err(ObjectStoreError::KeyNotFound(error_message.into())); + } + StatusCode::OK => {} + _ => { + let error_message = format!("unexpected error when fetching {key} from bucket {bucket}, received code {response_code}"); + return Err(ObjectStoreError::Other(error_message.into())); + } + } + let bytes = response.bytes().await.map(|bytes| bytes.to_vec()); + match bytes { + Ok(bytes) => return Ok(bytes), + Err(error) => { + last_error = Some(error); + } + } + } + Err(error) => { + last_error = Some(error); + } + } + + tracing::warn!( + "Failed to download {url} (attempt {}/{retry_count}). Backing off for 5 second", + retry_number + 1 + ); + tokio::time::sleep(Duration::from_secs(5)).await; + } + Err(ObjectStoreError::Other(BoxedError::from( + last_error.unwrap(), + ))) + } + + async fn put_raw( + &self, + _bucket: Bucket, + _key: &str, + _value: Vec, + ) -> Result<(), ObjectStoreError> { + unimplemented!("This store is read-only!") + } + + async fn remove_raw(&self, _bucket: Bucket, _key: &str) -> Result<(), ObjectStoreError> { + unimplemented!("This store is read-only!") + } + + fn storage_prefix_raw(&self, bucket: Bucket) -> String { + format!( + "https://storage.googleapis.com/{}/{}", + self.bucket_prefix.clone(), + bucket.as_str() + ) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + async fn test_happy_path() { + let store = UnauthenticatedGoogleCloudStorage { + bucket_prefix: "zksync-era-stage-external-node-snapshots".to_string(), + max_retries: 5, + }; + let file = store + .get_raw( + Bucket::StorageSnapshot, + "snapshot_l1_batch_460109_storage_logs_part_01016.proto.gzip", + ) + .await + .unwrap(); + assert_eq!(0, file.len()) + } +} diff --git a/core/lib/object_store/src/lib.rs b/core/lib/object_store/src/lib.rs index bf6630ef060..b6ae85ec39f 100644 --- a/core/lib/object_store/src/lib.rs +++ b/core/lib/object_store/src/lib.rs @@ -25,6 +25,7 @@ mod file; mod gcs; +mod gcs_unauthenticated; mod metrics; mod mock; mod objects; From 26a426b35288b21a1b84bc72ad081689ce802de1 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 22 Jan 2024 15:58:14 +0100 Subject: [PATCH 02/10] remove non-unit test --- .../object_store/src/gcs_unauthenticated.rs | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/core/lib/object_store/src/gcs_unauthenticated.rs b/core/lib/object_store/src/gcs_unauthenticated.rs index b5bd158d009..15a7899bf07 100644 --- a/core/lib/object_store/src/gcs_unauthenticated.rs +++ b/core/lib/object_store/src/gcs_unauthenticated.rs @@ -88,24 +88,3 @@ impl ObjectStore for UnauthenticatedGoogleCloudStorage { ) } } - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test] - async fn test_happy_path() { - let store = UnauthenticatedGoogleCloudStorage { - bucket_prefix: "zksync-era-stage-external-node-snapshots".to_string(), - max_retries: 5, - }; - let file = store - .get_raw( - Bucket::StorageSnapshot, - "snapshot_l1_batch_460109_storage_logs_part_01016.proto.gzip", - ) - .await - .unwrap(); - assert_eq!(0, file.len()) - } -} From cf47f60952109eba7816b81875031e2e4b538612 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 22 Jan 2024 16:05:33 +0100 Subject: [PATCH 03/10] add constructor from config --- core/lib/config/src/configs/object_store.rs | 1 + core/lib/object_store/src/gcs_unauthenticated.rs | 9 +++++++++ core/lib/object_store/src/raw.rs | 9 +++++++++ 3 files changed, 19 insertions(+) diff --git a/core/lib/config/src/configs/object_store.rs b/core/lib/config/src/configs/object_store.rs index 5524b6ade25..368e7f32dc9 100644 --- a/core/lib/config/src/configs/object_store.rs +++ b/core/lib/config/src/configs/object_store.rs @@ -5,6 +5,7 @@ pub enum ObjectStoreMode { GCS, GCSWithCredentialFile, FileBacked, + GCSUnauthenticated, } /// Configuration for the object store diff --git a/core/lib/object_store/src/gcs_unauthenticated.rs b/core/lib/object_store/src/gcs_unauthenticated.rs index 15a7899bf07..2403f3b3c45 100644 --- a/core/lib/object_store/src/gcs_unauthenticated.rs +++ b/core/lib/object_store/src/gcs_unauthenticated.rs @@ -12,6 +12,15 @@ pub struct UnauthenticatedGoogleCloudStorage { max_retries: u16, } +impl UnauthenticatedGoogleCloudStorage { + pub fn new(bucket_prefix: String, max_retries: u16) -> UnauthenticatedGoogleCloudStorage { + Self { + bucket_prefix, + max_retries, + } + } +} + #[async_trait] impl ObjectStore for UnauthenticatedGoogleCloudStorage { async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { diff --git a/core/lib/object_store/src/raw.rs b/core/lib/object_store/src/raw.rs index 764809764da..8bb8139011f 100644 --- a/core/lib/object_store/src/raw.rs +++ b/core/lib/object_store/src/raw.rs @@ -3,6 +3,7 @@ use std::{error, fmt, sync::Arc}; use async_trait::async_trait; use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode}; +use crate::gcs_unauthenticated::UnauthenticatedGoogleCloudStorage; use crate::{file::FileBackedObjectStore, gcs::GoogleCloudStorage, mock::MockStore}; /// Bucket for [`ObjectStore`] in which objects can be placed. @@ -218,6 +219,14 @@ impl ObjectStoreFactory { let store = FileBackedObjectStore::new(config.file_backed_base_path.clone()).await; Arc::new(store) } + ObjectStoreMode::GCSUnauthenticated => { + tracing::trace!("Initialized GoogleCloudStorageUnauthenticated store"); + let store = UnauthenticatedGoogleCloudStorage::new( + config.bucket_base_url.clone(), + config.max_retries, + ); + Arc::new(store) + } } } } From 19a6708341ddf462b5d6bc58c9e673f05c6a093e Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 22 Jan 2024 16:06:32 +0100 Subject: [PATCH 04/10] zk fmt --- core/lib/object_store/src/raw.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/lib/object_store/src/raw.rs b/core/lib/object_store/src/raw.rs index 8bb8139011f..25f9171e215 100644 --- a/core/lib/object_store/src/raw.rs +++ b/core/lib/object_store/src/raw.rs @@ -3,8 +3,10 @@ use std::{error, fmt, sync::Arc}; use async_trait::async_trait; use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode}; -use crate::gcs_unauthenticated::UnauthenticatedGoogleCloudStorage; -use crate::{file::FileBackedObjectStore, gcs::GoogleCloudStorage, mock::MockStore}; +use crate::{ + file::FileBackedObjectStore, gcs::GoogleCloudStorage, + gcs_unauthenticated::UnauthenticatedGoogleCloudStorage, mock::MockStore, +}; /// Bucket for [`ObjectStore`] in which objects can be placed. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] From cec65b094f6c39d576d50c4b6f84e77ad715e9c6 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 22 Jan 2024 19:57:27 +0100 Subject: [PATCH 05/10] PR feedback --- Cargo.lock | 42 ++++- core/lib/config/src/configs/object_store.rs | 2 +- core/lib/object_store/Cargo.toml | 1 + core/lib/object_store/src/gcs_public.rs | 165 ++++++++++++++++++ .../object_store/src/gcs_unauthenticated.rs | 99 ----------- core/lib/object_store/src/lib.rs | 2 +- core/lib/object_store/src/raw.rs | 11 +- 7 files changed, 216 insertions(+), 106 deletions(-) create mode 100644 core/lib/object_store/src/gcs_public.rs delete mode 100644 core/lib/object_store/src/gcs_unauthenticated.rs diff --git a/Cargo.lock b/Cargo.lock index c542a214a06..0e9322b97a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,6 +487,16 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "assert_matches" version = "1.5.0" @@ -1460,6 +1470,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "colored" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" +dependencies = [ + "lazy_static", + "windows-sys 0.48.0", +] + [[package]] name = "compile-fmt" version = "0.1.0" @@ -4199,6 +4219,25 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockito" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8d3038e23466858569c2d30a537f691fa0d53b51626630ae08262943e3bbb8b" +dependencies = [ + "assert-json-diff", + "colored", + "futures 0.3.28", + "hyper", + "log", + "rand 0.8.5", + "regex", + "serde_json", + "serde_urlencoded", + "similar", + "tokio", +] + [[package]] name = "multimap" version = "0.8.3" @@ -8696,7 +8735,7 @@ dependencies = [ "crossbeam 0.8.2", "curl", "derivative", - "env_logger 0.9.3", + "env_logger 0.10.0", "hex", "lazy_static", "rand 0.4.6", @@ -9258,6 +9297,7 @@ dependencies = [ "google-cloud-auth", "google-cloud-storage", "http", + "mockito", "prost", "reqwest", "serde_json", diff --git a/core/lib/config/src/configs/object_store.rs b/core/lib/config/src/configs/object_store.rs index 368e7f32dc9..9b5f40aa89b 100644 --- a/core/lib/config/src/configs/object_store.rs +++ b/core/lib/config/src/configs/object_store.rs @@ -5,7 +5,7 @@ pub enum ObjectStoreMode { GCS, GCSWithCredentialFile, FileBacked, - GCSUnauthenticated, + GCSPublicReadOnly, } /// Configuration for the object store diff --git a/core/lib/object_store/Cargo.toml b/core/lib/object_store/Cargo.toml index 85ed7b875f1..b0e5aa1eebc 100644 --- a/core/lib/object_store/Cargo.toml +++ b/core/lib/object_store/Cargo.toml @@ -30,3 +30,4 @@ reqwest = { version = "0.11", features = ["blocking"] } [dev-dependencies] tempdir = "0.3.7" +mockito = "1.2.0" diff --git a/core/lib/object_store/src/gcs_public.rs b/core/lib/object_store/src/gcs_public.rs new file mode 100644 index 00000000000..0dc57f026ba --- /dev/null +++ b/core/lib/object_store/src/gcs_public.rs @@ -0,0 +1,165 @@ +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use http::StatusCode; +use reqwest::Client; + +use crate::{raw::BoxedError, Bucket, ObjectStore, ObjectStoreError}; + +#[derive(Debug)] +pub struct PublicReadOnlyGoogleCloudStorage { + gcs_base_url: String, + bucket_prefix: String, + max_retries: u16, + client: Client, + backoff_seconds: u64, +} + +impl PublicReadOnlyGoogleCloudStorage { + pub fn new( + gcs_base_url: String, + bucket_prefix: String, + max_retries: u16, + timeout: u64, + backoff_seconds: u64, + ) -> Self { + let client = Client::builder() + .timeout(Duration::from_secs(timeout)) + .build() + .expect("Unable to build PublicReadOnlyGoogleCloudStorage http client from config"); + Self { + gcs_base_url, + bucket_prefix, + max_retries, + client, + backoff_seconds, + } + } +} + +#[async_trait] +impl ObjectStore for PublicReadOnlyGoogleCloudStorage { + async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { + let mut last_error: Option = None; + let retry_count = self.max_retries; + for retry_number in 0..retry_count { + let url = format!("{}/{key}", self.storage_prefix_raw(bucket)); + let response = self.client.get(&url).send().await; + + match response { + Ok(response) => { + let response_code = response.status(); + match response_code { + StatusCode::NOT_FOUND => { + let error_message = format!( + "missing key: {key} in bucket {bucket} (got 404 from {url})" + ); + return Err(ObjectStoreError::KeyNotFound(error_message.into())); + } + StatusCode::OK => {} + _ => { + let error_message = format!("unexpected error when fetching {key} from bucket {bucket}, received code {response_code}, url was {url}"); + last_error = Some(anyhow!(error_message)); + continue; + } + } + let bytes = response.bytes().await.map(|bytes| bytes.into()); + match bytes { + Ok(bytes) => return Ok(bytes), + Err(error) => { + last_error = Some(error.into()); + continue; + } + } + } + Err(error) => { + last_error = Some(error.into()); + } + } + + tracing::warn!( + "Failed to download {url} (attempt {}/{retry_count}). Backing off for {} seconds", + retry_number + 1, + self.backoff_seconds, + ); + tokio::time::sleep(Duration::from_secs(self.backoff_seconds)).await; + } + Err(ObjectStoreError::Other(BoxedError::from( + last_error.unwrap(), + ))) + } + + async fn put_raw( + &self, + _bucket: Bucket, + _key: &str, + _value: Vec, + ) -> Result<(), ObjectStoreError> { + unimplemented!("This store is read-only!") + } + + async fn remove_raw(&self, _bucket: Bucket, _key: &str) -> Result<(), ObjectStoreError> { + unimplemented!("This store is read-only!") + } + + fn storage_prefix_raw(&self, bucket: Bucket) -> String { + format!( + "{}/{}/{}", + self.gcs_base_url, + self.bucket_prefix.clone(), + bucket.as_str() + ) + } +} +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::{ + gcs_public::PublicReadOnlyGoogleCloudStorage, Bucket, ObjectStore, ObjectStoreError, + }; + + #[tokio::test] + async fn start_server() { + let mut server = mockito::Server::new(); + + // Use one of these addresses to configure your client + let host = server.host_with_port(); + let url = server.url(); + + let some_bytes = [1, 5, 124, 51]; + server + .mock("GET", "/test-url/storage_logs_snapshots/some_key1") + .with_status(200) + .with_body(some_bytes) + .create(); + + server + .mock("GET", "/test-url/storage_logs_snapshots/some_key2") + .with_status(404) + .create(); + + server + .mock("GET", "/test-url/storage_logs_snapshots/some_key3") + .with_status(500) + .create(); + + let storage = PublicReadOnlyGoogleCloudStorage::new(url, "test-url".to_string(), 5, 1, 5); + + let bytes = storage + .get_raw(Bucket::StorageSnapshot, "some_key1") + .await + .unwrap(); + assert_eq!(bytes, some_bytes); + + let not_found = storage.get_raw(Bucket::StorageSnapshot, "some_key1").await; + assert!(matches!(not_found, ObjectStoreError::KeyNotFound { .. })); + + let server_error = storage + .get_raw(Bucket::StorageSnapshot, "some_key1") + .await + .unwrap(); + assert!(matches!(not_found, ObjectStoreError::Other { .. })); + } +} diff --git a/core/lib/object_store/src/gcs_unauthenticated.rs b/core/lib/object_store/src/gcs_unauthenticated.rs deleted file mode 100644 index 2403f3b3c45..00000000000 --- a/core/lib/object_store/src/gcs_unauthenticated.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::time::Duration; - -use async_trait::async_trait; -use http::StatusCode; -use reqwest::Error; - -use crate::{raw::BoxedError, Bucket, ObjectStore, ObjectStoreError}; - -#[derive(Debug)] -pub struct UnauthenticatedGoogleCloudStorage { - bucket_prefix: String, - max_retries: u16, -} - -impl UnauthenticatedGoogleCloudStorage { - pub fn new(bucket_prefix: String, max_retries: u16) -> UnauthenticatedGoogleCloudStorage { - Self { - bucket_prefix, - max_retries, - } - } -} - -#[async_trait] -impl ObjectStore for UnauthenticatedGoogleCloudStorage { - async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { - const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(600); - let client = reqwest::Client::builder() - .timeout(DOWNLOAD_TIMEOUT) - .build() - .map_err(|error| ObjectStoreError::Other(error.into()))?; - - let mut last_error: Option = None; - let retry_count = self.max_retries; - for retry_number in 0..retry_count { - let url = format!("{}/{key}", self.storage_prefix_raw(bucket)); - let response = client.get(url.to_string()).send().await; - - match response { - Ok(response) => { - let response_code = response.status(); - match response_code { - StatusCode::NOT_FOUND => { - let error_message = format!( - "missing key: {key} in bucket {bucket} (got 404 from {url})" - ); - return Err(ObjectStoreError::KeyNotFound(error_message.into())); - } - StatusCode::OK => {} - _ => { - let error_message = format!("unexpected error when fetching {key} from bucket {bucket}, received code {response_code}"); - return Err(ObjectStoreError::Other(error_message.into())); - } - } - let bytes = response.bytes().await.map(|bytes| bytes.to_vec()); - match bytes { - Ok(bytes) => return Ok(bytes), - Err(error) => { - last_error = Some(error); - } - } - } - Err(error) => { - last_error = Some(error); - } - } - - tracing::warn!( - "Failed to download {url} (attempt {}/{retry_count}). Backing off for 5 second", - retry_number + 1 - ); - tokio::time::sleep(Duration::from_secs(5)).await; - } - Err(ObjectStoreError::Other(BoxedError::from( - last_error.unwrap(), - ))) - } - - async fn put_raw( - &self, - _bucket: Bucket, - _key: &str, - _value: Vec, - ) -> Result<(), ObjectStoreError> { - unimplemented!("This store is read-only!") - } - - async fn remove_raw(&self, _bucket: Bucket, _key: &str) -> Result<(), ObjectStoreError> { - unimplemented!("This store is read-only!") - } - - fn storage_prefix_raw(&self, bucket: Bucket) -> String { - format!( - "https://storage.googleapis.com/{}/{}", - self.bucket_prefix.clone(), - bucket.as_str() - ) - } -} diff --git a/core/lib/object_store/src/lib.rs b/core/lib/object_store/src/lib.rs index b6ae85ec39f..cdb748bf3db 100644 --- a/core/lib/object_store/src/lib.rs +++ b/core/lib/object_store/src/lib.rs @@ -25,7 +25,7 @@ mod file; mod gcs; -mod gcs_unauthenticated; +mod gcs_public; mod metrics; mod mock; mod objects; diff --git a/core/lib/object_store/src/raw.rs b/core/lib/object_store/src/raw.rs index 25f9171e215..775af6bd50d 100644 --- a/core/lib/object_store/src/raw.rs +++ b/core/lib/object_store/src/raw.rs @@ -5,7 +5,7 @@ use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode}; use crate::{ file::FileBackedObjectStore, gcs::GoogleCloudStorage, - gcs_unauthenticated::UnauthenticatedGoogleCloudStorage, mock::MockStore, + gcs_public::PublicReadOnlyGoogleCloudStorage, mock::MockStore, }; /// Bucket for [`ObjectStore`] in which objects can be placed. @@ -221,11 +221,14 @@ impl ObjectStoreFactory { let store = FileBackedObjectStore::new(config.file_backed_base_path.clone()).await; Arc::new(store) } - ObjectStoreMode::GCSUnauthenticated => { - tracing::trace!("Initialized GoogleCloudStorageUnauthenticated store"); - let store = UnauthenticatedGoogleCloudStorage::new( + ObjectStoreMode::GCSPublicReadOnly => { + tracing::trace!("Initialized GoogleCloudStoragePublicReadOnly store"); + let store = PublicReadOnlyGoogleCloudStorage::new( + "https://storage.googleapis.com".to_string(), config.bucket_base_url.clone(), config.max_retries, + 600, + 5, ); Arc::new(store) } From fa8365f0337c410f69af122f9e6f78fcc545b849 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Mon, 22 Jan 2024 20:57:05 +0100 Subject: [PATCH 06/10] fix --- core/lib/object_store/src/gcs_public.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/lib/object_store/src/gcs_public.rs b/core/lib/object_store/src/gcs_public.rs index 0dc57f026ba..f13806181da 100644 --- a/core/lib/object_store/src/gcs_public.rs +++ b/core/lib/object_store/src/gcs_public.rs @@ -153,13 +153,16 @@ mod tests { .unwrap(); assert_eq!(bytes, some_bytes); - let not_found = storage.get_raw(Bucket::StorageSnapshot, "some_key1").await; + let not_found = storage + .get_raw(Bucket::StorageSnapshot, "some_key2") + .await + .unwrap_err(); assert!(matches!(not_found, ObjectStoreError::KeyNotFound { .. })); let server_error = storage - .get_raw(Bucket::StorageSnapshot, "some_key1") + .get_raw(Bucket::StorageSnapshot, "some_key3") .await - .unwrap(); - assert!(matches!(not_found, ObjectStoreError::Other { .. })); + .unwrap_err(); + assert!(matches!(server_error, ObjectStoreError::Other { .. })); } } From 56125419658589ce27d66b8f43078084bbfb5d89 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Tue, 23 Jan 2024 13:04:17 +0100 Subject: [PATCH 07/10] fix --- core/lib/object_store/src/gcs_public.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/lib/object_store/src/gcs_public.rs b/core/lib/object_store/src/gcs_public.rs index f13806181da..37a3dc50f79 100644 --- a/core/lib/object_store/src/gcs_public.rs +++ b/core/lib/object_store/src/gcs_public.rs @@ -64,7 +64,7 @@ impl ObjectStore for PublicReadOnlyGoogleCloudStorage { continue; } } - let bytes = response.bytes().await.map(|bytes| bytes.into()); + let bytes = response.bytes().await.map(std::convert::Into::into); match bytes { Ok(bytes) => return Ok(bytes), Err(error) => { @@ -114,8 +114,6 @@ impl ObjectStore for PublicReadOnlyGoogleCloudStorage { } #[cfg(test)] mod tests { - use std::time::Duration; - use crate::{ gcs_public::PublicReadOnlyGoogleCloudStorage, Bucket, ObjectStore, ObjectStoreError, }; @@ -123,12 +121,9 @@ mod tests { #[tokio::test] async fn start_server() { let mut server = mockito::Server::new(); - - // Use one of these addresses to configure your client - let host = server.host_with_port(); let url = server.url(); - let some_bytes = [1, 5, 124, 51]; + server .mock("GET", "/test-url/storage_logs_snapshots/some_key1") .with_status(200) From b0163e742dea61c72909413160d004d71d59a7e6 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Tue, 23 Jan 2024 17:32:10 +0100 Subject: [PATCH 08/10] refactor to use anonymous auth mode of gcs library --- core/lib/config/src/configs/object_store.rs | 2 +- core/lib/object_store/src/gcs.rs | 10 +- core/lib/object_store/src/gcs_public.rs | 163 -------------------- core/lib/object_store/src/lib.rs | 1 - core/lib/object_store/src/raw.rs | 19 ++- 5 files changed, 18 insertions(+), 177 deletions(-) delete mode 100644 core/lib/object_store/src/gcs_public.rs diff --git a/core/lib/config/src/configs/object_store.rs b/core/lib/config/src/configs/object_store.rs index 9b5f40aa89b..4cf5553d639 100644 --- a/core/lib/config/src/configs/object_store.rs +++ b/core/lib/config/src/configs/object_store.rs @@ -5,7 +5,7 @@ pub enum ObjectStoreMode { GCS, GCSWithCredentialFile, FileBacked, - GCSPublicReadOnly, + GCSAnonymousReadOnly, } /// Configuration for the object store diff --git a/core/lib/object_store/src/gcs.rs b/core/lib/object_store/src/gcs.rs index 93ee39fdef2..23239fedbc6 100644 --- a/core/lib/object_store/src/gcs.rs +++ b/core/lib/object_store/src/gcs.rs @@ -68,9 +68,10 @@ impl GoogleCloudStorage { credential_file_path: Option, bucket_prefix: String, max_retries: u16, + is_anonymous: bool, ) -> Self { let client_config = retry(max_retries, || { - Self::get_client_config(credential_file_path.clone()) + Self::get_client_config(credential_file_path.clone(), is_anonymous) }) .await .expect("failed fetching GCS client config after retries"); @@ -84,6 +85,7 @@ impl GoogleCloudStorage { async fn get_client_config( credential_file_path: Option, + is_anonymous: bool, ) -> Result { if let Some(path) = credential_file_path { let cred_file = CredentialsFile::new_from_file(path) @@ -91,7 +93,11 @@ impl GoogleCloudStorage { .expect("failed loading GCS credential file"); ClientConfig::default().with_credentials(cred_file).await } else { - ClientConfig::default().with_auth().await + if !is_anonymous { + ClientConfig::default().with_auth().await + } else { + Ok(ClientConfig::default().anonymous()) + } } } diff --git a/core/lib/object_store/src/gcs_public.rs b/core/lib/object_store/src/gcs_public.rs deleted file mode 100644 index 37a3dc50f79..00000000000 --- a/core/lib/object_store/src/gcs_public.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::time::Duration; - -use anyhow::anyhow; -use async_trait::async_trait; -use http::StatusCode; -use reqwest::Client; - -use crate::{raw::BoxedError, Bucket, ObjectStore, ObjectStoreError}; - -#[derive(Debug)] -pub struct PublicReadOnlyGoogleCloudStorage { - gcs_base_url: String, - bucket_prefix: String, - max_retries: u16, - client: Client, - backoff_seconds: u64, -} - -impl PublicReadOnlyGoogleCloudStorage { - pub fn new( - gcs_base_url: String, - bucket_prefix: String, - max_retries: u16, - timeout: u64, - backoff_seconds: u64, - ) -> Self { - let client = Client::builder() - .timeout(Duration::from_secs(timeout)) - .build() - .expect("Unable to build PublicReadOnlyGoogleCloudStorage http client from config"); - Self { - gcs_base_url, - bucket_prefix, - max_retries, - client, - backoff_seconds, - } - } -} - -#[async_trait] -impl ObjectStore for PublicReadOnlyGoogleCloudStorage { - async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { - let mut last_error: Option = None; - let retry_count = self.max_retries; - for retry_number in 0..retry_count { - let url = format!("{}/{key}", self.storage_prefix_raw(bucket)); - let response = self.client.get(&url).send().await; - - match response { - Ok(response) => { - let response_code = response.status(); - match response_code { - StatusCode::NOT_FOUND => { - let error_message = format!( - "missing key: {key} in bucket {bucket} (got 404 from {url})" - ); - return Err(ObjectStoreError::KeyNotFound(error_message.into())); - } - StatusCode::OK => {} - _ => { - let error_message = format!("unexpected error when fetching {key} from bucket {bucket}, received code {response_code}, url was {url}"); - last_error = Some(anyhow!(error_message)); - continue; - } - } - let bytes = response.bytes().await.map(std::convert::Into::into); - match bytes { - Ok(bytes) => return Ok(bytes), - Err(error) => { - last_error = Some(error.into()); - continue; - } - } - } - Err(error) => { - last_error = Some(error.into()); - } - } - - tracing::warn!( - "Failed to download {url} (attempt {}/{retry_count}). Backing off for {} seconds", - retry_number + 1, - self.backoff_seconds, - ); - tokio::time::sleep(Duration::from_secs(self.backoff_seconds)).await; - } - Err(ObjectStoreError::Other(BoxedError::from( - last_error.unwrap(), - ))) - } - - async fn put_raw( - &self, - _bucket: Bucket, - _key: &str, - _value: Vec, - ) -> Result<(), ObjectStoreError> { - unimplemented!("This store is read-only!") - } - - async fn remove_raw(&self, _bucket: Bucket, _key: &str) -> Result<(), ObjectStoreError> { - unimplemented!("This store is read-only!") - } - - fn storage_prefix_raw(&self, bucket: Bucket) -> String { - format!( - "{}/{}/{}", - self.gcs_base_url, - self.bucket_prefix.clone(), - bucket.as_str() - ) - } -} -#[cfg(test)] -mod tests { - use crate::{ - gcs_public::PublicReadOnlyGoogleCloudStorage, Bucket, ObjectStore, ObjectStoreError, - }; - - #[tokio::test] - async fn start_server() { - let mut server = mockito::Server::new(); - let url = server.url(); - let some_bytes = [1, 5, 124, 51]; - - server - .mock("GET", "/test-url/storage_logs_snapshots/some_key1") - .with_status(200) - .with_body(some_bytes) - .create(); - - server - .mock("GET", "/test-url/storage_logs_snapshots/some_key2") - .with_status(404) - .create(); - - server - .mock("GET", "/test-url/storage_logs_snapshots/some_key3") - .with_status(500) - .create(); - - let storage = PublicReadOnlyGoogleCloudStorage::new(url, "test-url".to_string(), 5, 1, 5); - - let bytes = storage - .get_raw(Bucket::StorageSnapshot, "some_key1") - .await - .unwrap(); - assert_eq!(bytes, some_bytes); - - let not_found = storage - .get_raw(Bucket::StorageSnapshot, "some_key2") - .await - .unwrap_err(); - assert!(matches!(not_found, ObjectStoreError::KeyNotFound { .. })); - - let server_error = storage - .get_raw(Bucket::StorageSnapshot, "some_key3") - .await - .unwrap_err(); - assert!(matches!(server_error, ObjectStoreError::Other { .. })); - } -} diff --git a/core/lib/object_store/src/lib.rs b/core/lib/object_store/src/lib.rs index cdb748bf3db..bf6630ef060 100644 --- a/core/lib/object_store/src/lib.rs +++ b/core/lib/object_store/src/lib.rs @@ -25,7 +25,6 @@ mod file; mod gcs; -mod gcs_public; mod metrics; mod mock; mod objects; diff --git a/core/lib/object_store/src/raw.rs b/core/lib/object_store/src/raw.rs index 775af6bd50d..0063b0fb951 100644 --- a/core/lib/object_store/src/raw.rs +++ b/core/lib/object_store/src/raw.rs @@ -3,10 +3,7 @@ use std::{error, fmt, sync::Arc}; use async_trait::async_trait; use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode}; -use crate::{ - file::FileBackedObjectStore, gcs::GoogleCloudStorage, - gcs_public::PublicReadOnlyGoogleCloudStorage, mock::MockStore, -}; +use crate::{file::FileBackedObjectStore, gcs::GoogleCloudStorage, mock::MockStore}; /// Bucket for [`ObjectStore`] in which objects can be placed. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -202,6 +199,7 @@ impl ObjectStoreFactory { gcs_credential_file_path, config.bucket_base_url.clone(), config.max_retries, + false, ) .await; Arc::new(store) @@ -212,6 +210,7 @@ impl ObjectStoreFactory { gcs_credential_file_path, config.bucket_base_url.clone(), config.max_retries, + false, ) .await; Arc::new(store) @@ -221,15 +220,15 @@ impl ObjectStoreFactory { let store = FileBackedObjectStore::new(config.file_backed_base_path.clone()).await; Arc::new(store) } - ObjectStoreMode::GCSPublicReadOnly => { + ObjectStoreMode::GCSAnonymousReadOnly => { tracing::trace!("Initialized GoogleCloudStoragePublicReadOnly store"); - let store = PublicReadOnlyGoogleCloudStorage::new( - "https://storage.googleapis.com".to_string(), + let store = GoogleCloudStorage::new( + None, config.bucket_base_url.clone(), config.max_retries, - 600, - 5, - ); + true, + ) + .await; Arc::new(store) } } From 58c237a1028bc5f23d41e412b9022c8971a618ff Mon Sep 17 00:00:00 2001 From: tomg10 Date: Tue, 23 Jan 2024 17:46:32 +0100 Subject: [PATCH 09/10] fix --- Cargo.lock | 41 -------------------------------- core/lib/object_store/Cargo.toml | 2 -- core/lib/object_store/src/gcs.rs | 39 +++++++++++++++--------------- core/lib/object_store/src/raw.rs | 18 ++++++++------ 4 files changed, 31 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e9322b97a5..6d76bd87ee4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -487,16 +487,6 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" -[[package]] -name = "assert-json-diff" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" -dependencies = [ - "serde", - "serde_json", -] - [[package]] name = "assert_matches" version = "1.5.0" @@ -1470,16 +1460,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "colored" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" -dependencies = [ - "lazy_static", - "windows-sys 0.48.0", -] - [[package]] name = "compile-fmt" version = "0.1.0" @@ -4219,25 +4199,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mockito" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8d3038e23466858569c2d30a537f691fa0d53b51626630ae08262943e3bbb8b" -dependencies = [ - "assert-json-diff", - "colored", - "futures 0.3.28", - "hyper", - "log", - "rand 0.8.5", - "regex", - "serde_json", - "serde_urlencoded", - "similar", - "tokio", -] - [[package]] name = "multimap" version = "0.8.3" @@ -9297,9 +9258,7 @@ dependencies = [ "google-cloud-auth", "google-cloud-storage", "http", - "mockito", "prost", - "reqwest", "serde_json", "tempdir", "tokio", diff --git a/core/lib/object_store/Cargo.toml b/core/lib/object_store/Cargo.toml index b0e5aa1eebc..ec42f47c6bf 100644 --- a/core/lib/object_store/Cargo.toml +++ b/core/lib/object_store/Cargo.toml @@ -26,8 +26,6 @@ flate2 = "1.0.28" tokio = { version = "1.21.2", features = ["full"] } tracing = "0.1" prost = "0.12.1" -reqwest = { version = "0.11", features = ["blocking"] } [dev-dependencies] tempdir = "0.3.7" -mockito = "1.2.0" diff --git a/core/lib/object_store/src/gcs.rs b/core/lib/object_store/src/gcs.rs index 23239fedbc6..d2650a48ea5 100644 --- a/core/lib/object_store/src/gcs.rs +++ b/core/lib/object_store/src/gcs.rs @@ -63,18 +63,22 @@ impl fmt::Debug for GoogleCloudStorage { } } +#[derive(Debug, Clone)] +pub enum GoogleCloudStorageAuthMode { + AuthenticatedWithCredentialFile(String), + Authenticated, + Anonymous, +} + impl GoogleCloudStorage { pub async fn new( - credential_file_path: Option, + auth_mode: GoogleCloudStorageAuthMode, bucket_prefix: String, max_retries: u16, - is_anonymous: bool, ) -> Self { - let client_config = retry(max_retries, || { - Self::get_client_config(credential_file_path.clone(), is_anonymous) - }) - .await - .expect("failed fetching GCS client config after retries"); + let client_config = retry(max_retries, || Self::get_client_config(auth_mode.clone())) + .await + .expect("failed fetching GCS client config after retries"); Self { client: Client::new(client_config), @@ -84,20 +88,17 @@ impl GoogleCloudStorage { } async fn get_client_config( - credential_file_path: Option, - is_anonymous: bool, + auth_mode: GoogleCloudStorageAuthMode, ) -> Result { - if let Some(path) = credential_file_path { - let cred_file = CredentialsFile::new_from_file(path) - .await - .expect("failed loading GCS credential file"); - ClientConfig::default().with_credentials(cred_file).await - } else { - if !is_anonymous { - ClientConfig::default().with_auth().await - } else { - Ok(ClientConfig::default().anonymous()) + match auth_mode { + GoogleCloudStorageAuthMode::AuthenticatedWithCredentialFile(path) => { + let cred_file = CredentialsFile::new_from_file(path) + .await + .expect("failed loading GCS credential file"); + ClientConfig::default().with_credentials(cred_file).await } + GoogleCloudStorageAuthMode::Authenticated => ClientConfig::default().with_auth().await, + GoogleCloudStorageAuthMode::Anonymous => Ok(ClientConfig::default().anonymous()), } } diff --git a/core/lib/object_store/src/raw.rs b/core/lib/object_store/src/raw.rs index 0063b0fb951..61340343c73 100644 --- a/core/lib/object_store/src/raw.rs +++ b/core/lib/object_store/src/raw.rs @@ -3,7 +3,11 @@ use std::{error, fmt, sync::Arc}; use async_trait::async_trait; use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode}; -use crate::{file::FileBackedObjectStore, gcs::GoogleCloudStorage, mock::MockStore}; +use crate::{ + file::FileBackedObjectStore, + gcs::{GoogleCloudStorage, GoogleCloudStorageAuthMode}, + mock::MockStore, +}; /// Bucket for [`ObjectStore`] in which objects can be placed. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -196,10 +200,9 @@ impl ObjectStoreFactory { "Initialized GoogleCloudStorage Object store without credential file" ); let store = GoogleCloudStorage::new( - gcs_credential_file_path, + GoogleCloudStorageAuthMode::Authenticated, config.bucket_base_url.clone(), config.max_retries, - false, ) .await; Arc::new(store) @@ -207,10 +210,12 @@ impl ObjectStoreFactory { ObjectStoreMode::GCSWithCredentialFile => { tracing::trace!("Initialized GoogleCloudStorage Object store with credential file"); let store = GoogleCloudStorage::new( - gcs_credential_file_path, + GoogleCloudStorageAuthMode::AuthenticatedWithCredentialFile( + gcs_credential_file_path + .expect("Credentials path must be provided for GCSWithCredentialFile"), + ), config.bucket_base_url.clone(), config.max_retries, - false, ) .await; Arc::new(store) @@ -223,10 +228,9 @@ impl ObjectStoreFactory { ObjectStoreMode::GCSAnonymousReadOnly => { tracing::trace!("Initialized GoogleCloudStoragePublicReadOnly store"); let store = GoogleCloudStorage::new( - None, + GoogleCloudStorageAuthMode::Anonymous, config.bucket_base_url.clone(), config.max_retries, - true, ) .await; Arc::new(store) From 00efa5c8c229f23322e49d1d3579e41fd5ec6d54 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Tue, 23 Jan 2024 17:48:29 +0100 Subject: [PATCH 10/10] fix --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 6d76bd87ee4..6aae513b69a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8696,7 +8696,7 @@ dependencies = [ "crossbeam 0.8.2", "curl", "derivative", - "env_logger 0.10.0", + "env_logger 0.9.3", "hex", "lazy_static", "rand 0.4.6",