From 21a0cf964fe34000443a427c2db92caceb969d32 Mon Sep 17 00:00:00 2001 From: IvanPsurtcev <87025698+IvanPsurtcev@users.noreply.github.com> Date: Thu, 31 Oct 2024 15:26:39 +0300 Subject: [PATCH] fix: GCS folder support (#4652) ### Description This PR fixes the problem described in issue https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4449. The change was only in the gcs_storage.rs file. Slightly tweaked methods so that files can be written to a folder specified on the command line. When trying to run a validator specifying checkpointSyncer settings for GCS: `--checkpointSyncer.type gcs --checkpointSyncer.bucket an-s3-signatures-repository --checkpointSyncer.folder atletaolympia` ``` Building provider without signer thread 'main' panicked at 'Failed to report agent metadata: error validating bucket name an-s3-signatures-repository/atletaolympia Caused by: Character '/' @ 27 is not allowed Location: hyperlane-base/src/types/gcs_storage.rs:181:9', agents/validator/src/validator.rs:178:14 ``` To solve this problem I added the `folder: Option<String>` field to the GcsStorageClient structure, and I also added bucket and folder handling to the `CheckpointSyncer` and `GcsStorageClientBuilder` impl methods --- .../hyperlane-base/src/types/gcs_storage.rs | 154 +++++++++++++----- 1 file changed, 109 insertions(+), 45 deletions(-) diff --git a/rust/main/hyperlane-base/src/types/gcs_storage.rs b/rust/main/hyperlane-base/src/types/gcs_storage.rs index ba40ec2ca9..a413edb323 100644 --- a/rust/main/hyperlane-base/src/types/gcs_storage.rs +++ b/rust/main/hyperlane-base/src/types/gcs_storage.rs @@ -4,6 +4,7 @@ use derive_new::new; use eyre::{bail, Result}; use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; use std::fmt; +use tracing::{error, info, instrument}; use ya_gcp::{ storage::{ api::{error::HttpStatusError, http::StatusCode, Error}, @@ -16,6 +17,7 @@ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey"; const METADATA_KEY: &str = "gcsMetadataKey"; const 
ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey"; const REORG_FLAG_KEY: &str = "gcsReorgFlagKey"; + /// Path to GCS users_secret file pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET"; /// Path to GCS Service account key @@ -80,12 +82,14 @@ pub struct GcsStorageClient { inner: StorageClient, // bucket name of this client's storage bucket: String, + // folder name of this client's storage + folder: Option, } impl GcsStorageClientBuilder { /// Instantiates `ya_gcp:StorageClient` based on provided auth method /// # Param - /// * `baucket_name` - String name of target bucket to work with, will be used by all store and get ops + /// * `bucket_name` - String name of target bucket to work with, will be used by all store and get ops pub async fn build( self, bucket_name: impl Into, @@ -94,21 +98,71 @@ impl GcsStorageClientBuilder { let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth)) .await? .build_storage_client(); - let bucket = if let Some(folder) = folder { - format! {"{}/{}", bucket_name.into(), folder} - } else { - bucket_name.into() - }; - Ok(GcsStorageClient { inner, bucket }) + let bucket = bucket_name.into(); + let mut processed_folder = folder; + + if let Some(ref mut folder_str) = processed_folder { + if folder_str.ends_with('/') { + folder_str.truncate(folder_str.trim_end_matches('/').len()); + info!( + "Trimmed trailing '/' from folder name. 
New folder: '{}'", + folder_str + ); + } + } + + GcsStorageClient::validate_bucket_name(&bucket)?; + Ok(GcsStorageClient { + inner, + bucket, + folder: processed_folder, + }) } } impl GcsStorageClient { - // convenience formatter + // Convenience formatter fn get_checkpoint_key(index: u32) -> String { format!("checkpoint_{index}_with_id.json") } + + fn object_path(&self, object_name: &str) -> String { + if let Some(folder) = &self.folder { + format!("{}/{}", folder, object_name) + } else { + object_name.to_string() + } + } + + fn validate_bucket_name(bucket: &str) -> Result<()> { + if bucket.contains('/') { + error!("Bucket name '{}' has an invalid symbol '/'", bucket); + bail!("Bucket name '{}' has an invalid symbol '/'", bucket) + } else { + Ok(()) + } + } + + /// Uploads data to GCS and logs the result. + #[instrument(skip(self, data))] + async fn upload_and_log(&self, object_name: &str, data: Vec) -> Result<()> { + match self + .inner + .insert_object(&self.bucket, object_name, data) + .await + { + Ok(_) => { + info!("Successfully uploaded to '{}'", object_name); + Ok(()) + } + Err(e) => { + error!("Failed to upload to '{}': {:?}", object_name, e); + Err(e.into()) + } + } + } + // #test only method[s] #[cfg(test)] pub(crate) async fn get_by_path(&self, path: impl AsRef) -> Result<()> { @@ -117,11 +171,12 @@ impl GcsStorageClient { } } -// required by `CheckpointSyncer` +// Required by `CheckpointSyncer` impl fmt::Debug for GcsStorageClient { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("S3Storage") + f.debug_struct("GcsStorageClient") .field("bucket", &self.bucket) + .field("folder", &self.folder) .finish() } } @@ -129,6 +184,7 @@ impl fmt::Debug for GcsStorageClient { #[async_trait] impl CheckpointSyncer for GcsStorageClient { /// Read the highest index of this Syncer + #[instrument(skip(self))] async fn latest_index(&self) -> Result> { match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await { Ok(data) => 
Ok(Some(serde_json::from_slice(data.as_ref())?)), @@ -144,15 +200,14 @@ impl CheckpointSyncer for GcsStorageClient { } /// Writes the highest index of this Syncer + #[instrument(skip(self, index))] async fn write_latest_index(&self, index: u32) -> Result<()> { - let d = serde_json::to_vec(&index)?; - self.inner - .insert_object(&self.bucket, LATEST_INDEX_KEY, d) - .await?; - Ok(()) + let data = serde_json::to_vec(&index)?; + self.upload_and_log(LATEST_INDEX_KEY, data).await } /// Update the latest index of this syncer if necessary + #[instrument(skip(self, index))] async fn update_latest_index(&self, index: u32) -> Result<()> { let curr = self.latest_index().await?.unwrap_or(0); if index > curr { @@ -162,6 +217,7 @@ impl CheckpointSyncer for GcsStorageClient { } /// Attempt to fetch the signed (checkpoint, messageId) tuple at this index + #[instrument(skip(self, index))] async fn fetch_checkpoint(&self, index: u32) -> Result> { match self .inner @@ -179,56 +235,64 @@ impl CheckpointSyncer for GcsStorageClient { } /// Write the signed (checkpoint, messageId) tuple to this syncer + #[instrument(skip(self, signed_checkpoint))] async fn write_checkpoint( &self, signed_checkpoint: &SignedCheckpointWithMessageId, ) -> Result<()> { - self.inner - .insert_object( - &self.bucket, - GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index), - serde_json::to_vec(signed_checkpoint)?, - ) - .await?; - Ok(()) + let object_name = Self::get_checkpoint_key(signed_checkpoint.value.index); + let data = serde_json::to_vec(signed_checkpoint)?; + self.upload_and_log(&object_name, data).await } /// Write the agent metadata to this syncer + #[instrument(skip(self, metadata))] async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> { - let serialized_metadata = serde_json::to_string_pretty(metadata)?; - self.inner - .insert_object(&self.bucket, METADATA_KEY, serialized_metadata) - .await?; - Ok(()) + let object_name = self.object_path(METADATA_KEY); + let data = 
serde_json::to_string_pretty(metadata)?.into_bytes(); + self.upload_and_log(&object_name, data).await } /// Write the signed announcement to this syncer - async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> { - self.inner - .insert_object( - &self.bucket, - ANNOUNCEMENT_KEY, - serde_json::to_string(signed_announcement)?, - ) - .await?; - Ok(()) + #[instrument(skip(self, announcement))] + async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> { + let object_name = self.object_path(ANNOUNCEMENT_KEY); + let data = serde_json::to_string(announcement)?.into_bytes(); + self.upload_and_log(&object_name, data).await } /// Return the announcement storage location for this syncer + #[instrument(skip(self))] fn announcement_location(&self) -> String { - format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY) + let location = format!( + "gs://{}/{}", + &self.bucket, + self.object_path(ANNOUNCEMENT_KEY) + ); + info!("Announcement storage location: '{}'", location); + location } - async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { - let serialized_metadata = serde_json::to_string_pretty(reorged_event)?; - self.inner - .insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata) - .await?; - Ok(()) + /// Write the reorg status to this syncer + #[instrument(skip(self, reorg_event))] + async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()> { + let object_name = REORG_FLAG_KEY; + let data = serde_json::to_string_pretty(reorg_event)?.into_bytes(); + self.upload_and_log(object_name, data).await } + /// Read the reorg status from this syncer + #[instrument(skip(self))] async fn reorg_status(&self) -> Result> { - Ok(None) + match self.inner.get_object(&self.bucket, REORG_FLAG_KEY).await { + Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)), + Err(e) => match e { + ObjectError::Failure(Error::HttpStatus(HttpStatusError(StatusCode::NOT_FOUND))) => { + Ok(None) 
+ } + _ => bail!(e), + }, + } } }