From b5944e50f25feee8fc1b5942f3851fa583fc71e1 Mon Sep 17 00:00:00 2001 From: taco-paco Date: Thu, 3 Oct 2024 17:18:16 +0900 Subject: [PATCH 1/2] feat: polling result of the task --- crates/lambdas/Cargo.toml | 7 +- crates/lambdas/src/common/utils.rs | 7 +- crates/lambdas/src/compile.rs | 2 +- crates/lambdas/src/generate_presigned_urls.rs | 2 +- crates/lambdas/src/poll.rs | 130 ++++++++++++++++++ crates/lambdas/src/verify.rs | 2 +- crates/types/Cargo.toml | 2 + crates/types/src/item/errors.rs | 16 ++- crates/types/src/item/task_result.rs | 4 +- 9 files changed, 163 insertions(+), 9 deletions(-) create mode 100644 crates/lambdas/src/poll.rs diff --git a/crates/lambdas/Cargo.toml b/crates/lambdas/Cargo.toml index 6a3f9e52..147e9841 100644 --- a/crates/lambdas/Cargo.toml +++ b/crates/lambdas/Cargo.toml @@ -35,4 +35,9 @@ path = "src/compile.rs" [[bin]] name = "verify" version = "0.0.1" -path = "src/verify.rs" \ No newline at end of file +path = "src/verify.rs" + +[[bin]] +name = "poll" +version = "0.0.1" +path = "src/poll.rs" \ No newline at end of file diff --git a/crates/lambdas/src/common/utils.rs b/crates/lambdas/src/common/utils.rs index 79be8f8d..2657815c 100644 --- a/crates/lambdas/src/common/utils.rs +++ b/crates/lambdas/src/common/utils.rs @@ -1,11 +1,14 @@ use lambda_http::{Request, RequestPayloadExt, Response}; -use serde::de::DeserializeOwned; +use serde::Deserialize; use crate::common::errors::{Error, Error::HttpError}; const EMPTY_PAYLOAD_ERROR: &str = "Request payload is empty"; -pub fn extract_request(request: Request) -> Result { +pub fn extract_request(request: &Request) -> Result +where + T: for<'de> Deserialize<'de>, +{ return match request.payload::() { Ok(Some(val)) => Ok(val), Ok(None) => { diff --git a/crates/lambdas/src/compile.rs b/crates/lambdas/src/compile.rs index 6ce545c0..25f841ac 100644 --- a/crates/lambdas/src/compile.rs +++ b/crates/lambdas/src/compile.rs @@ -113,7 +113,7 @@ async fn process_request( s3_client: &aws_sdk_s3::Client, bucket_name: &str, ) -> Result, Error> { - let request = extract_request::(request)?; + let request = extract_request::(&request)?; let objects = s3_client .list_objects_v2() diff --git a/crates/lambdas/src/generate_presigned_urls.rs b/crates/lambdas/src/generate_presigned_urls.rs index 0041809a..89b5f971 100644 --- a/crates/lambdas/src/generate_presigned_urls.rs +++ b/crates/lambdas/src/generate_presigned_urls.rs @@ -66,7 +66,7 @@ async fn process_request( bucket_name: &str, s3_client: &aws_sdk_s3::Client, ) -> Result, Error> { - let request = extract_request::(request)?; + let request = extract_request::(&request)?; if request.files.len() > MAX_FILES { warn!("MAX_FILES limit exceeded"); let response = LambdaResponse::builder() diff --git a/crates/lambdas/src/poll.rs b/crates/lambdas/src/poll.rs new file mode 100644 index 00000000..074a9c0f --- /dev/null +++ b/crates/lambdas/src/poll.rs @@ -0,0 +1,130 @@ +use aws_config::BehaviorVersion; +use aws_sdk_dynamodb::types::AttributeValue; +use lambda_http::http::StatusCode; +use lambda_http::{ + run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse, +}; +use serde::Deserialize; +use tracing::{error}; +use types::item::errors::ItemError; +use types::item::task_result::TaskResult; +use types::{ + item::{Item, Status}, +}; +use uuid::Uuid; + +const TABLE_NAME_DEFAULT: &str = "zksync-table"; +const NO_SUCH_ITEM: &str = "No such item"; + +mod common; +use crate::common::{errors::Error, utils::extract_request}; + +#[derive(Deserialize)] +struct PollRequest { + pub id: Uuid, +} + +#[tracing::instrument(skip(dynamo_client, table_name))] +async fn process_request( + request: LambdaRequest, + dynamo_client: &aws_sdk_dynamodb::Client, + table_name: &str, +) -> Result, Error> { + let request = extract_request::(&request)?; + let output = dynamo_client + .get_item() + .table_name(table_name) + .key( + Item::primary_key_name(), + AttributeValue::S(request.id.to_string()), + ) + .send() + .await + .map_err(Box::new)?; + + let raw_item = output.item.ok_or_else(|| { + error!("No objects in folder: {}", request.id); + let response = LambdaResponse::builder() + .status(StatusCode::NOT_FOUND) + .header("content-type", "text/html") + .body(NO_SUCH_ITEM.to_string()) + .map_err(Error::from); + + match response { + Ok(response) => Error::HttpError(response), + Err(err) => err, + } + })?; + + let item: Item = raw_item.try_into().map_err(|err: ItemError| { + let response = LambdaResponse::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header("content-type", "text/html") + .body(err.to_string()) + .map_err(Error::from); + + match response { + Ok(response) => Error::HttpError(response), + Err(err) => err, + } + })?; + + let task_result = if let Status::Done(task_result) = item.status { + Ok(task_result) + } else { + let response = LambdaResponse::builder() + .status(StatusCode::BAD_REQUEST) + .header("content-type", "text/html") + .body("Task isn't ready".to_owned()) + .map_err(Error::from)?; + + Err(Error::HttpError(response)) + }?; + + match task_result { + TaskResult::Success(value) => { + let response = LambdaResponse::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&value)?)?; + + Ok(response) + } + TaskResult::Failure(value) => { + let status_code: StatusCode = value.error_type.into(); + let response = LambdaResponse::builder() + .status(status_code) + .header("content-type", "text/html") + .body(value.message) + .map_err(Box::new)?; + + Err(Error::HttpError(response)) + } + } +} + +#[tokio::main] +async fn main() -> Result<(), LambdaError> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_ansi(false) + .without_time() // CloudWatch will add the ingestion time + .with_target(false) + .init(); + + let table_name = std::env::var("TABLE_NAME").unwrap_or(TABLE_NAME_DEFAULT.into()); + + let config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let dynamo_client = aws_sdk_dynamodb::Client::new(&config); + + run(service_fn(|request: LambdaRequest| async { + let result = process_request(request, &dynamo_client, &table_name).await; + + match result { + Ok(val) => Ok(val), + Err(Error::HttpError(val)) => Ok(val), + Err(Error::LambdaError(err)) => Err(err), + } + })) + .await +} diff --git a/crates/lambdas/src/verify.rs b/crates/lambdas/src/verify.rs index 5d60e63d..f8b7775f 100644 --- a/crates/lambdas/src/verify.rs +++ b/crates/lambdas/src/verify.rs @@ -102,7 +102,7 @@ async fn process_request( s3_client: &aws_sdk_s3::Client, bucket_name: &str, ) -> Result, Error> { - let request = extract_request::(request)?; + let request = extract_request::(&request)?; let objects = s3_client .list_objects_v2() diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 564f7c51..a026b6ff 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -12,5 +12,7 @@ serde_json.workspace = true thiserror.workspace = true uuid.workspace = true +http = "1.1.0" + [dev-dependencies] serde_json.workspace = true diff --git a/crates/types/src/item/errors.rs b/crates/types/src/item/errors.rs index b168ad5c..d4d2a9bf 100644 --- a/crates/types/src/item/errors.rs +++ b/crates/types/src/item/errors.rs @@ -20,7 +20,7 @@ impl ItemError { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] #[cfg_attr(test, derive(PartialEq))] pub enum ServerError { UnsupportedCompilerVersion, @@ -37,7 +37,7 @@ impl Into<&'static str> for ServerError { ServerError::CompilationError => "CompilationError", ServerError::InternalError => "InternalError", ServerError::UnknownNetworkError => "UnknownNetworkError", - ServerError::VerificationError => "VerificationError" + ServerError::VerificationError => "VerificationError", } } } @@ -55,3 +55,15 @@ impl TryFrom<&str> for ServerError { } } } + +impl Into for ServerError { + fn into(self) -> http::StatusCode { + match self { + Self::UnsupportedCompilerVersion + | Self::CompilationError + | Self::UnknownNetworkError + | Self::VerificationError => http::StatusCode::BAD_REQUEST, + Self::InternalError => http::StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/crates/types/src/item/task_result.rs b/crates/types/src/item/task_result.rs index 90098661..74223405 100644 --- a/crates/types/src/item/task_result.rs +++ b/crates/types/src/item/task_result.rs @@ -1,4 +1,5 @@ use aws_sdk_dynamodb::types::AttributeValue; +use serde::Serialize; use std::collections::HashMap; use std::fmt::Formatter; @@ -77,8 +78,9 @@ impl TryFrom<&AttributeMap> for TaskResult { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] #[cfg_attr(test, derive(PartialEq))] +#[serde(untagged)] pub enum TaskSuccess { Compile { presigned_urls: Vec }, Verify { message: String }, From 95da4d41d6b5ae29afc2229ccd61bd2feac060cb Mon Sep 17 00:00:00 2001 From: taco-paco Date: Thu, 3 Oct 2024 17:22:40 +0900 Subject: [PATCH 2/2] fix: logging --- crates/lambdas/src/poll.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/lambdas/src/poll.rs b/crates/lambdas/src/poll.rs index 074a9c0f..5322b1d0 100644 --- a/crates/lambdas/src/poll.rs +++ b/crates/lambdas/src/poll.rs @@ -5,12 +5,10 @@ use lambda_http::{ run, service_fn, Error as LambdaError, Request as LambdaRequest, Response as LambdaResponse, }; use serde::Deserialize; -use tracing::{error}; +use tracing::error; use types::item::errors::ItemError; use types::item::task_result::TaskResult; -use types::{ - item::{Item, Status}, -}; +use types::item::{Item, Status}; use uuid::Uuid; const TABLE_NAME_DEFAULT: &str = "zksync-table"; @@ -43,7 +41,6 @@ async fn process_request( .map_err(Box::new)?; let raw_item = output.item.ok_or_else(|| { - error!("No objects in folder: {}", request.id); let response = LambdaResponse::builder() .status(StatusCode::NOT_FOUND) .header("content-type", "text/html") @@ -57,6 +54,7 @@ async fn process_request( })?; let item: Item = raw_item.try_into().map_err(|err: ItemError| { + error!("Failed to deserialize item. id: {}", request.id); let response = LambdaResponse::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header("content-type", "text/html")