From 28543721232be64fe4aa163175a4648e68705b0d Mon Sep 17 00:00:00 2001 From: IvanPsurtcev Date: Mon, 7 Oct 2024 18:17:18 +0300 Subject: [PATCH 1/3] fix: fix folder problem --- .../hyperlane-base/src/types/gcs_storage.rs | 79 +++++++++++++------ 1 file changed, 57 insertions(+), 22 deletions(-) diff --git a/rust/main/hyperlane-base/src/types/gcs_storage.rs b/rust/main/hyperlane-base/src/types/gcs_storage.rs index ba40ec2ca9..33055b657b 100644 --- a/rust/main/hyperlane-base/src/types/gcs_storage.rs +++ b/rust/main/hyperlane-base/src/types/gcs_storage.rs @@ -11,6 +11,7 @@ use ya_gcp::{ }, AuthFlow, ClientBuilder, ClientBuilderConfig, }; +use tracing::{info, error}; const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey"; const METADATA_KEY: &str = "gcsMetadataKey"; @@ -80,6 +81,8 @@ pub struct GcsStorageClient { inner: StorageClient, // bucket name of this client's storage bucket: String, + // folder name of this client's storage + folder: Option, } impl GcsStorageClientBuilder { @@ -94,13 +97,12 @@ impl GcsStorageClientBuilder { let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth)) .await? .build_storage_client(); - let bucket = if let Some(folder) = folder { - format! {"{}/{}", bucket_name.into(), folder} - } else { - bucket_name.into() - }; - Ok(GcsStorageClient { inner, bucket }) + let bucket = bucket_name.into(); + let folder = folder; + + GcsStorageClient::validate_bucket_name(&bucket)?; + Ok(GcsStorageClient { inner, bucket, folder }) } } @@ -109,6 +111,24 @@ impl GcsStorageClient { fn get_checkpoint_key(index: u32) -> String { format!("checkpoint_{index}_with_id.json") } + + fn object_path(&self, object_name: &str) -> String { + if let Some(folder) = &self.folder { + format!("{}/{}", folder.trim_end_matches('/'), object_name) + } else { + object_name.to_string() + } + } + + fn validate_bucket_name(bucket: &str) -> Result<()> { + if bucket.contains('/') { + error!("Bucket name '{}' has an invalid symbol '/'", bucket); + bail!("Bucket name '{}' has an invalid symbol '/'", bucket); + } else { + Ok(()) + } + } + // #test only method[s] #[cfg(test)] pub(crate) async fn get_by_path(&self, path: impl AsRef) -> Result<()> { @@ -117,11 +137,12 @@ impl GcsStorageClient { } } -// required by `CheckpointSyncer` +// Required by `CheckpointSyncer` impl fmt::Debug for GcsStorageClient { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("S3Storage") + f.debug_struct("GcsStorageClient") .field("bucket", &self.bucket) + .field("folder", &self.folder) .finish() } } @@ -195,28 +216,42 @@ impl CheckpointSyncer for GcsStorageClient { /// Write the agent metadata to this syncer async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> { + let object_name = self.object_path(METADATA_KEY); let serialized_metadata = serde_json::to_string_pretty(metadata)?; - self.inner - .insert_object(&self.bucket, METADATA_KEY, serialized_metadata) - .await?; - Ok(()) + + match self.inner.insert_object(&self.bucket, &object_name, serialized_metadata.into_bytes()).await { + Ok(_) => { + info!("Successfully uploaded metadata to '{}'", object_name); + Ok(()) + } + Err(e) => { + error!("Failed to upload metadata to '{}': {:?}", object_name, e); + Err(e.into()) + } + } } /// Write the signed announcement to this syncer - async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> { - self.inner - .insert_object( - &self.bucket, - ANNOUNCEMENT_KEY, - serde_json::to_string(signed_announcement)?, - ) - .await?; - Ok(()) + async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> { + let object_name = self.object_path("announcement.json"); + let data = serde_json::to_vec(announcement)?; + + match self.inner.insert_object(&self.bucket, &object_name, data).await { + Ok(_) => { + info!("Successfully uploaded announcement to '{}'", object_name); + Ok(()) + } + Err(e) => { + error!("Failed to upload announcement to '{}': {:?}", object_name, e); + Err(e.into()) + } + } } /// Return the announcement storage location for this syncer fn announcement_location(&self) -> String { - format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY) + let location = format!("gs://{}/{}", &self.bucket, self.object_path(ANNOUNCEMENT_KEY)); + location } async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { From 007831d353f728fc9a117d1d55202a6a7f3ea145 Mon Sep 17 00:00:00 2001 From: IvanPsurtcev Date: Tue, 8 Oct 2024 10:11:20 +0300 Subject: [PATCH 2/3] fix: fix ANNOUNCEMENT_KEY --- rust/main/hyperlane-base/src/types/gcs_storage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/main/hyperlane-base/src/types/gcs_storage.rs b/rust/main/hyperlane-base/src/types/gcs_storage.rs index 33055b657b..48e3c8437c 100644 --- a/rust/main/hyperlane-base/src/types/gcs_storage.rs +++ b/rust/main/hyperlane-base/src/types/gcs_storage.rs @@ -233,8 +233,8 @@ impl CheckpointSyncer for GcsStorageClient { /// Write the signed announcement to this syncer async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> { - let object_name = self.object_path("announcement.json"); - let data = serde_json::to_vec(announcement)?; + let object_name = self.object_path(ANNOUNCEMENT_KEY); + let data = serde_json::to_string(announcement)?; match self.inner.insert_object(&self.bucket, &object_name, data).await { Ok(_) => { From e543e266e35f9a8de45bc6ffafbdd1efb251d1f0 Mon Sep 17 00:00:00 2001 From: IvanPsurtcev Date: Wed, 30 Oct 2024 17:08:29 +0300 Subject: [PATCH 3/3] fix: corrected defects mentioned in comments to PR, optimized code, added upload_and_log function --- .../hyperlane-base/src/types/gcs_storage.rs | 133 +++++++++++------- 1 file changed, 81 insertions(+), 52 deletions(-) diff --git a/rust/main/hyperlane-base/src/types/gcs_storage.rs b/rust/main/hyperlane-base/src/types/gcs_storage.rs index 48e3c8437c..a413edb323 100644 --- a/rust/main/hyperlane-base/src/types/gcs_storage.rs +++ b/rust/main/hyperlane-base/src/types/gcs_storage.rs @@ -4,6 +4,7 @@ use derive_new::new; use eyre::{bail, Result}; use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; use std::fmt; +use tracing::{error, info, instrument}; use ya_gcp::{ storage::{ api::{error::HttpStatusError, http::StatusCode, Error}, @@ -11,12 +12,12 @@ use ya_gcp::{ }, AuthFlow, ClientBuilder, ClientBuilderConfig, }; -use tracing::{info, error}; const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey"; const METADATA_KEY: &str = "gcsMetadataKey"; const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey"; const REORG_FLAG_KEY: &str = "gcsReorgFlagKey"; + /// Path to GCS users_secret file pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET"; /// Path to GCS Service account key @@ -88,7 +89,7 @@ pub struct GcsStorageClient { impl GcsStorageClientBuilder { /// Instantiates `ya_gcp:StorageClient` based on provided auth method /// # Param - /// * `baucket_name` - String name of target bucket to work with, will be used by all store and get ops + /// * `bucket_name` - String name of target bucket to work with, will be used by all store and get ops pub async fn build( self, bucket_name: impl Into, @@ -99,22 +100,36 @@ impl GcsStorageClientBuilder { .build_storage_client(); let bucket = bucket_name.into(); - let folder = folder; + let mut processed_folder = folder; + + if let Some(ref mut folder_str) = processed_folder { + if folder_str.ends_with('/') { + folder_str.truncate(folder_str.trim_end_matches('/').len()); + info!( + "Trimmed trailing '/' from folder name. New folder: '{}'", + folder_str + ); + } + } GcsStorageClient::validate_bucket_name(&bucket)?; - Ok(GcsStorageClient { inner, bucket, folder }) + Ok(GcsStorageClient { + inner, + bucket, + folder: processed_folder, + }) } } impl GcsStorageClient { - // convenience formatter + // Convenience formatter fn get_checkpoint_key(index: u32) -> String { format!("checkpoint_{index}_with_id.json") } fn object_path(&self, object_name: &str) -> String { if let Some(folder) = &self.folder { - format!("{}/{}", folder.trim_end_matches('/'), object_name) + format!("{}/{}", folder, object_name) } else { object_name.to_string() } @@ -123,12 +138,31 @@ impl GcsStorageClient { fn validate_bucket_name(bucket: &str) -> Result<()> { if bucket.contains('/') { error!("Bucket name '{}' has an invalid symbol '/'", bucket); - bail!("Bucket name '{}' has an invalid symbol '/'", bucket); + bail!("Bucket name '{}' has an invalid symbol '/'", bucket) } else { Ok(()) } } + /// Uploads data to GCS and logs the result. + #[instrument(skip(self, data))] + async fn upload_and_log(&self, object_name: &str, data: Vec) -> Result<()> { + match self + .inner + .insert_object(&self.bucket, object_name, data) + .await + { + Ok(_) => { + info!("Successfully uploaded to '{}'", object_name); + Ok(()) + } + Err(e) => { + error!("Failed to upload to '{}': {:?}", object_name, e); + Err(e.into()) + } + } + } + // #test only method[s] #[cfg(test)] pub(crate) async fn get_by_path(&self, path: impl AsRef) -> Result<()> { @@ -150,6 +184,7 @@ impl fmt::Debug for GcsStorageClient { #[async_trait] impl CheckpointSyncer for GcsStorageClient { /// Read the highest index of this Syncer + #[instrument(skip(self))] async fn latest_index(&self) -> Result> { match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await { Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)), @@ -165,15 +200,14 @@ impl CheckpointSyncer for GcsStorageClient { } /// Writes the highest index of this Syncer + #[instrument(skip(self, index))] async fn write_latest_index(&self, index: u32) -> Result<()> { - let d = serde_json::to_vec(&index)?; - self.inner - .insert_object(&self.bucket, LATEST_INDEX_KEY, d) - .await?; - Ok(()) + let data = serde_json::to_vec(&index)?; + self.upload_and_log(LATEST_INDEX_KEY, data).await } /// Update the latest index of this syncer if necessary + #[instrument(skip(self, index))] async fn update_latest_index(&self, index: u32) -> Result<()> { let curr = self.latest_index().await?.unwrap_or(0); if index > curr { @@ -183,6 +217,7 @@ impl CheckpointSyncer for GcsStorageClient { } /// Attempt to fetch the signed (checkpoint, messageId) tuple at this index + #[instrument(skip(self, index))] async fn fetch_checkpoint(&self, index: u32) -> Result> { match self .inner @@ -200,70 +235,64 @@ impl CheckpointSyncer for GcsStorageClient { } /// Write the signed (checkpoint, messageId) tuple to this syncer + #[instrument(skip(self, signed_checkpoint))] async fn write_checkpoint( &self, signed_checkpoint: &SignedCheckpointWithMessageId, ) -> Result<()> { - self.inner - .insert_object( - &self.bucket, - GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index), - serde_json::to_vec(signed_checkpoint)?, - ) - .await?; - Ok(()) + let object_name = Self::get_checkpoint_key(signed_checkpoint.value.index); + let data = serde_json::to_vec(signed_checkpoint)?; + self.upload_and_log(&object_name, data).await } /// Write the agent metadata to this syncer + #[instrument(skip(self, metadata))] async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> { let object_name = self.object_path(METADATA_KEY); - let serialized_metadata = serde_json::to_string_pretty(metadata)?; - - match self.inner.insert_object(&self.bucket, &object_name, serialized_metadata.into_bytes()).await { - Ok(_) => { - info!("Successfully uploaded metadata to '{}'", object_name); - Ok(()) - } - Err(e) => { - error!("Failed to upload metadata to '{}': {:?}", object_name, e); - Err(e.into()) - } - } + let data = serde_json::to_string_pretty(metadata)?.into_bytes(); + self.upload_and_log(&object_name, data).await } /// Write the signed announcement to this syncer + #[instrument(skip(self, announcement))] async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> { let object_name = self.object_path(ANNOUNCEMENT_KEY); - let data = serde_json::to_string(announcement)?; - - match self.inner.insert_object(&self.bucket, &object_name, data).await { - Ok(_) => { - info!("Successfully uploaded announcement to '{}'", object_name); - Ok(()) - } - Err(e) => { - error!("Failed to upload announcement to '{}': {:?}", object_name, e); - Err(e.into()) - } - } + let data = serde_json::to_string(announcement)?.into_bytes(); + self.upload_and_log(&object_name, data).await } /// Return the announcement storage location for this syncer + #[instrument(skip(self))] fn announcement_location(&self) -> String { - let location = format!("gs://{}/{}", &self.bucket, self.object_path(ANNOUNCEMENT_KEY)); + let location = format!( + "gs://{}/{}", + &self.bucket, + self.object_path(ANNOUNCEMENT_KEY) + ); + info!("Announcement storage location: '{}'", location); location } - async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { - let serialized_metadata = serde_json::to_string_pretty(reorged_event)?; - self.inner - .insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata) - .await?; - Ok(()) + /// Write the reorg status to this syncer + #[instrument(skip(self, reorg_event))] + async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()> { + let object_name = REORG_FLAG_KEY; + let data = serde_json::to_string_pretty(reorg_event)?.into_bytes(); + self.upload_and_log(object_name, data).await } + /// Read the reorg status from this syncer + #[instrument(skip(self))] async fn reorg_status(&self) -> Result> { - Ok(None) + match self.inner.get_object(&self.bucket, REORG_FLAG_KEY).await { + Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)), + Err(e) => match e { + ObjectError::Failure(Error::HttpStatus(HttpStatusError(StatusCode::NOT_FOUND))) => { + Ok(None) + } + _ => bail!(e), + }, + } } }