Skip to content

Commit

Permalink
fix: GCS folder support (#4652)
Browse files Browse the repository at this point in the history
### Description

This PR fixes the problem described in issue
[#4449](https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4449).
The change is confined to the gcs_storage.rs file: its methods were adjusted
so that files can be written to a folder specified on the command line.

Running a validator with the following checkpointSyncer settings for
GCS:
`--checkpointSyncer.type gcs --checkpointSyncer.bucket
an-s3-signatures-repository --checkpointSyncer.folder atletaolympia`
previously panicked with the following error:

```
Building provider without signer
thread 'main' panicked at 'Failed to report agent metadata: error validating bucket name an-s3-signatures-repository/atletaolympia

Caused by:
    Character '/' @ 27 is not allowed

Location:
    hyperlane-base/src/types/gcs_storage.rs:181:9', agents/validator/src/validator.rs:178:14
```

To solve this problem, I added a `folder: Option<String>` field to the
`GcsStorageClient` struct. I also added bucket and folder processing to
the `impl CheckpointSyncer` and `GcsStorageClientBuilder` methods.
  • Loading branch information
IvanPsurtcev authored Oct 31, 2024
1 parent f26453e commit 21a0cf9
Showing 1 changed file with 109 additions and 45 deletions.
154 changes: 109 additions & 45 deletions rust/main/hyperlane-base/src/types/gcs_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use derive_new::new;
use eyre::{bail, Result};
use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId};
use std::fmt;
use tracing::{error, info, instrument};
use ya_gcp::{
storage::{
api::{error::HttpStatusError, http::StatusCode, Error},
Expand All @@ -16,6 +17,7 @@ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey";
const METADATA_KEY: &str = "gcsMetadataKey";
const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey";
const REORG_FLAG_KEY: &str = "gcsReorgFlagKey";

/// Path to GCS users_secret file
pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET";
/// Path to GCS Service account key
Expand Down Expand Up @@ -80,12 +82,14 @@ pub struct GcsStorageClient {
inner: StorageClient,
// bucket name of this client's storage
bucket: String,
// folder name of this client's storage
folder: Option<String>,
}

impl GcsStorageClientBuilder {
/// Instantiates `ya_gcp:StorageClient` based on provided auth method
/// # Param
/// * `baucket_name` - String name of target bucket to work with, will be used by all store and get ops
/// * `bucket_name` - String name of target bucket to work with, will be used by all store and get ops
pub async fn build(
self,
bucket_name: impl Into<String>,
Expand All @@ -94,21 +98,71 @@ impl GcsStorageClientBuilder {
let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth))
.await?
.build_storage_client();
let bucket = if let Some(folder) = folder {
format! {"{}/{}", bucket_name.into(), folder}
} else {
bucket_name.into()
};

Ok(GcsStorageClient { inner, bucket })
let bucket = bucket_name.into();
let mut processed_folder = folder;

if let Some(ref mut folder_str) = processed_folder {
if folder_str.ends_with('/') {
folder_str.truncate(folder_str.trim_end_matches('/').len());
info!(
"Trimmed trailing '/' from folder name. New folder: '{}'",
folder_str
);
}
}

GcsStorageClient::validate_bucket_name(&bucket)?;
Ok(GcsStorageClient {
inner,
bucket,
folder: processed_folder,
})
}
}

impl GcsStorageClient {
// convenience formatter
// Convenience formatter
/// Builds the object name used for the signed checkpoint at `index`,
/// in the form `checkpoint_<index>_with_id.json`.
fn get_checkpoint_key(index: u32) -> String {
    let mut key = String::from("checkpoint_");
    key.push_str(&index.to_string());
    key.push_str("_with_id.json");
    key
}

/// Returns the object name for `object_name`, prefixed with the configured
/// folder when one is set.
///
/// * `Some(folder)` with a non-empty folder yields `"<folder>/<object_name>"`.
/// * `None` or an empty folder yields `object_name` unchanged.
///
/// An empty folder is treated as "no folder" so the resulting key never
/// starts with a stray `/` (which would also produce `gs://bucket//key`
/// style locations when the path is embedded in a URL).
fn object_path(&self, object_name: &str) -> String {
    match self.folder.as_deref().filter(|folder| !folder.is_empty()) {
        Some(folder) => format!("{folder}/{object_name}"),
        None => object_name.to_owned(),
    }
}

/// Checks that `bucket` can be used as a bucket name.
///
/// # Errors
/// Logs at error level and returns an error when `bucket` contains a `/`;
/// no other property of the name is checked here.
fn validate_bucket_name(bucket: &str) -> Result<()> {
    // Guard clause: names without a slash pass validation.
    if !bucket.contains('/') {
        return Ok(());
    }
    error!("Bucket name '{}' has an invalid symbol '/'", bucket);
    bail!("Bucket name '{}' has an invalid symbol '/'", bucket)
}

/// Inserts `data` into this client's bucket under `object_name` and logs
/// whether the upload succeeded.
///
/// `object_name` is handed to `insert_object` unchanged, so any folder
/// prefix must already be part of the name when this is called.
///
/// # Errors
/// Returns the converted `insert_object` error after logging it.
#[instrument(skip(self, data))]
async fn upload_and_log(&self, object_name: &str, data: Vec<u8>) -> Result<()> {
    let outcome = self
        .inner
        .insert_object(&self.bucket, object_name, data)
        .await;

    // Failure path first: log and propagate.
    if let Err(e) = outcome {
        error!("Failed to upload to '{}': {:?}", object_name, e);
        return Err(e.into());
    }

    info!("Successfully uploaded to '{}'", object_name);
    Ok(())
}

// #test only method[s]
#[cfg(test)]
pub(crate) async fn get_by_path(&self, path: impl AsRef<str>) -> Result<()> {
Expand All @@ -117,18 +171,20 @@ impl GcsStorageClient {
}
}

// required by `CheckpointSyncer`
// Required by `CheckpointSyncer`
impl fmt::Debug for GcsStorageClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("S3Storage")
f.debug_struct("GcsStorageClient")
.field("bucket", &self.bucket)
.field("folder", &self.folder)
.finish()
}
}

#[async_trait]
impl CheckpointSyncer for GcsStorageClient {
/// Read the highest index of this Syncer
#[instrument(skip(self))]
async fn latest_index(&self) -> Result<Option<u32>> {
match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await {
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)),
Expand All @@ -144,15 +200,14 @@ impl CheckpointSyncer for GcsStorageClient {
}

/// Writes the highest index of this Syncer
#[instrument(skip(self, index))]
async fn write_latest_index(&self, index: u32) -> Result<()> {
let d = serde_json::to_vec(&index)?;
self.inner
.insert_object(&self.bucket, LATEST_INDEX_KEY, d)
.await?;
Ok(())
let data = serde_json::to_vec(&index)?;
self.upload_and_log(LATEST_INDEX_KEY, data).await
}

/// Update the latest index of this syncer if necessary
#[instrument(skip(self, index))]
async fn update_latest_index(&self, index: u32) -> Result<()> {
let curr = self.latest_index().await?.unwrap_or(0);
if index > curr {
Expand All @@ -162,6 +217,7 @@ impl CheckpointSyncer for GcsStorageClient {
}

/// Attempt to fetch the signed (checkpoint, messageId) tuple at this index
#[instrument(skip(self, index))]
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
match self
.inner
Expand All @@ -179,56 +235,64 @@ impl CheckpointSyncer for GcsStorageClient {
}

/// Write the signed (checkpoint, messageId) tuple to this syncer
#[instrument(skip(self, signed_checkpoint))]
async fn write_checkpoint(
&self,
signed_checkpoint: &SignedCheckpointWithMessageId,
) -> Result<()> {
self.inner
.insert_object(
&self.bucket,
GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index),
serde_json::to_vec(signed_checkpoint)?,
)
.await?;
Ok(())
let object_name = Self::get_checkpoint_key(signed_checkpoint.value.index);
let data = serde_json::to_vec(signed_checkpoint)?;
self.upload_and_log(&object_name, data).await
}

/// Write the agent metadata to this syncer
#[instrument(skip(self, metadata))]
async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> {
let serialized_metadata = serde_json::to_string_pretty(metadata)?;
self.inner
.insert_object(&self.bucket, METADATA_KEY, serialized_metadata)
.await?;
Ok(())
let object_name = self.object_path(METADATA_KEY);
let data = serde_json::to_string_pretty(metadata)?.into_bytes();
self.upload_and_log(&object_name, data).await
}

/// Write the signed announcement to this syncer
async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> {
self.inner
.insert_object(
&self.bucket,
ANNOUNCEMENT_KEY,
serde_json::to_string(signed_announcement)?,
)
.await?;
Ok(())
#[instrument(skip(self, announcement))]
async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> {
let object_name = self.object_path(ANNOUNCEMENT_KEY);
let data = serde_json::to_string(announcement)?.into_bytes();
self.upload_and_log(&object_name, data).await
}

/// Return the announcement storage location for this syncer
#[instrument(skip(self))]
fn announcement_location(&self) -> String {
format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY)
let location = format!(
"gs://{}/{}",
&self.bucket,
self.object_path(ANNOUNCEMENT_KEY)
);
info!("Announcement storage location: '{}'", location);
location
}

async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> {
let serialized_metadata = serde_json::to_string_pretty(reorged_event)?;
self.inner
.insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata)
.await?;
Ok(())
/// Write the reorg status to this syncer
#[instrument(skip(self, reorg_event))]
async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()> {
let object_name = REORG_FLAG_KEY;
let data = serde_json::to_string_pretty(reorg_event)?.into_bytes();
self.upload_and_log(object_name, data).await
}

/// Read the reorg status from this syncer
#[instrument(skip(self))]
async fn reorg_status(&self) -> Result<Option<ReorgEvent>> {
Ok(None)
match self.inner.get_object(&self.bucket, REORG_FLAG_KEY).await {
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)),
Err(e) => match e {
ObjectError::Failure(Error::HttpStatus(HttpStatusError(StatusCode::NOT_FOUND))) => {
Ok(None)
}
_ => bail!(e),
},
}
}
}

Expand Down

0 comments on commit 21a0cf9

Please sign in to comment.