Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: GCS folder support #4652

Merged
merged 3 commits on Oct 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 109 additions & 45 deletions rust/main/hyperlane-base/src/types/gcs_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use derive_new::new;
use eyre::{bail, Result};
use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId};
use std::fmt;
use tracing::{error, info, instrument};
use ya_gcp::{
storage::{
api::{error::HttpStatusError, http::StatusCode, Error},
Expand All @@ -16,6 +17,7 @@ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey";
const METADATA_KEY: &str = "gcsMetadataKey";
const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey";
const REORG_FLAG_KEY: &str = "gcsReorgFlagKey";

/// Path to GCS users_secret file
pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET";
/// Path to GCS Service account key
Expand Down Expand Up @@ -80,12 +82,14 @@ pub struct GcsStorageClient {
inner: StorageClient,
// bucket name of this client's storage
bucket: String,
// folder name of this client's storage
folder: Option<String>,
}

impl GcsStorageClientBuilder {
/// Instantiates `ya_gcp:StorageClient` based on provided auth method
/// # Param
/// * `baucket_name` - String name of target bucket to work with, will be used by all store and get ops
/// * `bucket_name` - String name of target bucket to work with, will be used by all store and get ops
pub async fn build(
self,
bucket_name: impl Into<String>,
Expand All @@ -94,21 +98,71 @@ impl GcsStorageClientBuilder {
let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth))
.await?
.build_storage_client();
let bucket = if let Some(folder) = folder {
format! {"{}/{}", bucket_name.into(), folder}
} else {
bucket_name.into()
};

Ok(GcsStorageClient { inner, bucket })
let bucket = bucket_name.into();
let mut processed_folder = folder;

if let Some(ref mut folder_str) = processed_folder {
if folder_str.ends_with('/') {
folder_str.truncate(folder_str.trim_end_matches('/').len());
info!(
"Trimmed trailing '/' from folder name. New folder: '{}'",
folder_str
);
}
}

GcsStorageClient::validate_bucket_name(&bucket)?;
Ok(GcsStorageClient {
inner,
bucket,
folder: processed_folder,
})
}
}

impl GcsStorageClient {
// convenience formatter
// Convenience formatter
fn get_checkpoint_key(index: u32) -> String {
format!("checkpoint_{index}_with_id.json")
}

fn object_path(&self, object_name: &str) -> String {
if let Some(folder) = &self.folder {
format!("{}/{}", folder, object_name)
} else {
object_name.to_string()
}
}

fn validate_bucket_name(bucket: &str) -> Result<()> {
if bucket.contains('/') {
error!("Bucket name '{}' has an invalid symbol '/'", bucket);
bail!("Bucket name '{}' has an invalid symbol '/'", bucket)
} else {
Ok(())
}
}

    /// Uploads data to GCS and logs the result.
    ///
    /// `object_name` is used verbatim as the object key within `self.bucket`;
    /// this helper does NOT apply `object_path`, so any folder prefix must be
    /// added by the caller.
    /// NOTE(review): callers are inconsistent about this — `write_metadata`
    /// prefixes the key via `object_path` before calling here, while
    /// `write_latest_index` passes the bare key; confirm whether the folder
    /// should apply to every object.
    #[instrument(skip(self, data))]
    async fn upload_and_log(&self, object_name: &str, data: Vec<u8>) -> Result<()> {
        match self
            .inner
            .insert_object(&self.bucket, object_name, data)
            .await
        {
            Ok(_) => {
                // Success path: record the destination for observability.
                info!("Successfully uploaded to '{}'", object_name);
                Ok(())
            }
            Err(e) => {
                // Log before converting the storage error into `eyre::Report`.
                error!("Failed to upload to '{}': {:?}", object_name, e);
                Err(e.into())
            }
        }
    }

// #test only method[s]
#[cfg(test)]
pub(crate) async fn get_by_path(&self, path: impl AsRef<str>) -> Result<()> {
Expand All @@ -117,18 +171,20 @@ impl GcsStorageClient {
}
}

// required by `CheckpointSyncer`
// Required by `CheckpointSyncer`
impl fmt::Debug for GcsStorageClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("S3Storage")
f.debug_struct("GcsStorageClient")
daniel-savu marked this conversation as resolved.
Show resolved Hide resolved
.field("bucket", &self.bucket)
.field("folder", &self.folder)
.finish()
}
}

#[async_trait]
impl CheckpointSyncer for GcsStorageClient {
/// Read the highest index of this Syncer
#[instrument(skip(self))]
async fn latest_index(&self) -> Result<Option<u32>> {
match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await {
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)),
Expand All @@ -144,15 +200,14 @@ impl CheckpointSyncer for GcsStorageClient {
}

/// Writes the highest index of this Syncer
#[instrument(skip(self, index))]
async fn write_latest_index(&self, index: u32) -> Result<()> {
let d = serde_json::to_vec(&index)?;
self.inner
.insert_object(&self.bucket, LATEST_INDEX_KEY, d)
.await?;
Ok(())
let data = serde_json::to_vec(&index)?;
self.upload_and_log(LATEST_INDEX_KEY, data).await
}

/// Update the latest index of this syncer if necessary
#[instrument(skip(self, index))]
async fn update_latest_index(&self, index: u32) -> Result<()> {
let curr = self.latest_index().await?.unwrap_or(0);
if index > curr {
Expand All @@ -162,6 +217,7 @@ impl CheckpointSyncer for GcsStorageClient {
}

/// Attempt to fetch the signed (checkpoint, messageId) tuple at this index
#[instrument(skip(self, index))]
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
match self
.inner
Expand All @@ -179,56 +235,64 @@ impl CheckpointSyncer for GcsStorageClient {
}

/// Write the signed (checkpoint, messageId) tuple to this syncer
#[instrument(skip(self, signed_checkpoint))]
async fn write_checkpoint(
&self,
signed_checkpoint: &SignedCheckpointWithMessageId,
) -> Result<()> {
self.inner
.insert_object(
&self.bucket,
GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index),
serde_json::to_vec(signed_checkpoint)?,
)
.await?;
Ok(())
let object_name = Self::get_checkpoint_key(signed_checkpoint.value.index);
let data = serde_json::to_vec(signed_checkpoint)?;
self.upload_and_log(&object_name, data).await
}

/// Write the agent metadata to this syncer
#[instrument(skip(self, metadata))]
async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> {
let serialized_metadata = serde_json::to_string_pretty(metadata)?;
self.inner
.insert_object(&self.bucket, METADATA_KEY, serialized_metadata)
.await?;
Ok(())
let object_name = self.object_path(METADATA_KEY);
let data = serde_json::to_string_pretty(metadata)?.into_bytes();
self.upload_and_log(&object_name, data).await
}

/// Write the signed announcement to this syncer
async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> {
self.inner
.insert_object(
&self.bucket,
ANNOUNCEMENT_KEY,
serde_json::to_string(signed_announcement)?,
)
.await?;
Ok(())
#[instrument(skip(self, announcement))]
async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> {
let object_name = self.object_path(ANNOUNCEMENT_KEY);
let data = serde_json::to_string(announcement)?.into_bytes();
self.upload_and_log(&object_name, data).await
}

/// Return the announcement storage location for this syncer
#[instrument(skip(self))]
fn announcement_location(&self) -> String {
format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY)
let location = format!(
"gs://{}/{}",
&self.bucket,
self.object_path(ANNOUNCEMENT_KEY)
);
info!("Announcement storage location: '{}'", location);
location
}

async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> {
let serialized_metadata = serde_json::to_string_pretty(reorged_event)?;
self.inner
.insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata)
.await?;
Ok(())
/// Write the reorg status to this syncer
#[instrument(skip(self, reorg_event))]
async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()> {
let object_name = REORG_FLAG_KEY;
let data = serde_json::to_string_pretty(reorg_event)?.into_bytes();
self.upload_and_log(object_name, data).await
}

/// Read the reorg status from this syncer
#[instrument(skip(self))]
async fn reorg_status(&self) -> Result<Option<ReorgEvent>> {
Ok(None)
match self.inner.get_object(&self.bucket, REORG_FLAG_KEY).await {
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)),
Err(e) => match e {
ObjectError::Failure(Error::HttpStatus(HttpStatusError(StatusCode::NOT_FOUND))) => {
Ok(None)
}
_ => bail!(e),
},
}
}
}

Expand Down
Loading