Skip to content

Commit

Permalink
refactor(external-prover-api): Polish the API implementation (#2774)
Browse files Browse the repository at this point in the history
## What ❔

- Use `L1BatchProofForL1` instead of `VerifyProofRequest`.
- Rework errors: do not use "branching" variants that are handled
separately in `IntoResponse`; instead use one variant per possible
error.
- Use `thiserror` to improve ergonomics of errors.
- Do not use `Multipart` directly; instead, use a dedicated type that
implements `FromRequest`.
- Introduce `Api` structure to implement axum API (instead of procedural
approach) -- aligns better with the framework design.
- Better separate `Processor` and `Api` so that `Processor` is
backend-agnostic (i.e. it knows nothing about `axum`).
- Remove dependency on `zksync_config`.
- Improve framework integration.
- Other minor things.

## Why ❔

Ergonomics, maintainability, and readability.
  • Loading branch information
popzxc authored Aug 30, 2024
1 parent 05c940e commit 755fc4a
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 290 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion core/node/external_proof_integration_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ categories.workspace = true

[dependencies]
axum = { workspace = true, features = ["multipart"] }
async-trait.workspace = true
tracing.workspace = true
thiserror.workspace = true
zksync_prover_interface.workspace = true
zksync_basic_types.workspace = true
zksync_config.workspace = true
zksync_object_store.workspace = true
zksync_dal.workspace = true
tokio.workspace = true
Expand Down
117 changes: 55 additions & 62 deletions core/node/external_proof_integration_api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,81 +6,74 @@ use zksync_basic_types::L1BatchNumber;
use zksync_dal::DalError;
use zksync_object_store::ObjectStoreError;

#[derive(Debug, thiserror::Error)]
pub(crate) enum ProcessorError {
ObjectStore(ObjectStoreError),
Dal(DalError),
Serialization(bincode::Error),
#[error("Failed to deserialize proof data")]
Serialization(#[from] bincode::Error),
#[error("Invalid proof submitted")]
InvalidProof,
#[error("Batch {0} is not yet ready for proving. Most likely our proof for this batch is not generated yet, try again later")]
BatchNotReady(L1BatchNumber),
#[error("Invalid file: {0}")]
InvalidFile(#[from] FileError),
#[error("Internal error")]
Internal,
#[error("Proof verification not possible anymore, batch is too old")]
ProofIsGone,
}

impl From<ObjectStoreError> for ProcessorError {
fn from(err: ObjectStoreError) -> Self {
Self::ObjectStore(err)
impl ProcessorError {
    /// Returns the HTTP status code that is reported to the client for this error.
    fn status_code(&self) -> StatusCode {
        match self {
            // Everything caused by a malformed or invalid submission is a client error.
            Self::Serialization(_) | Self::InvalidProof | Self::InvalidFile(_) => {
                StatusCode::BAD_REQUEST
            }
            Self::BatchNotReady(_) => StatusCode::NOT_FOUND,
            Self::ProofIsGone => StatusCode::GONE,
            Self::Internal => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
}

impl From<DalError> for ProcessorError {
fn from(err: DalError) -> Self {
Self::Dal(err)
impl IntoResponse for ProcessorError {
    /// Renders the error as a response: the status comes from `status_code()`,
    /// the body is the error's `Display` text.
    fn into_response(self) -> Response {
        let status = self.status_code();
        let body = self.to_string();
        (status, body).into_response()
    }
}

impl From<bincode::Error> for ProcessorError {
fn from(err: bincode::Error) -> Self {
Self::Serialization(err)
impl From<ObjectStoreError> for ProcessorError {
    /// Converts an object store failure into an API error.
    ///
    /// A missing key is reported as `ProofIsGone`; every other failure is logged
    /// and collapsed into `Internal`.
    fn from(err: ObjectStoreError) -> Self {
        if matches!(err, ObjectStoreError::KeyNotFound(_)) {
            tracing::debug!("Too old proof was requested: {:?}", err);
            return Self::ProofIsGone;
        }

        tracing::warn!("GCS error: {:?}", err);
        Self::Internal
    }
}

impl IntoResponse for ProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
ProcessorError::ObjectStore(err) => {
tracing::error!("GCS error: {:?}", err);
match err {
ObjectStoreError::KeyNotFound(_) => (
StatusCode::NOT_FOUND,
"Proof verification not possible anymore, batch is too old.".to_owned(),
),
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed fetching from GCS".to_owned(),
),
}
}
ProcessorError::Dal(err) => {
tracing::error!("Sqlx error: {:?}", err);
match err.inner() {
zksync_dal::SqlxError::RowNotFound => {
(StatusCode::NOT_FOUND, "Non existing L1 batch".to_owned())
}
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed fetching/saving from db".to_owned(),
),
}
}
ProcessorError::Serialization(err) => {
tracing::error!("Serialization error: {:?}", err);
(
StatusCode::BAD_REQUEST,
"Failed to deserialize proof data".to_owned(),
)
}
ProcessorError::BatchNotReady(l1_batch_number) => {
tracing::error!(
"Batch {l1_batch_number:?} is not yet ready for proving. Most likely our proof for this batch is not generated yet"
);
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Batch {l1_batch_number:?} is not yet ready for proving. Most likely our proof for this batch is not generated yet, try again later"),
)
}
ProcessorError::InvalidProof => {
tracing::error!("Invalid proof data");
(StatusCode::BAD_REQUEST, "Invalid proof data".to_owned())
}
};
(status_code, message).into_response()
impl From<DalError> for ProcessorError {
fn from(_err: DalError) -> Self {
// We don't want to check if the error is `RowNotFound`: we check that batch exists before
// processing a request, so it's handled separately.
// Thus, any unhandled error from DAL is an internal error.
Self::Internal
}
}

/// Problems with the file part of a request.
///
/// Converted into `ProcessorError::InvalidFile`, which is reported with a `BAD_REQUEST` status.
#[derive(Debug, thiserror::Error)]
pub(crate) enum FileError {
/// Wraps axum's `MultipartRejection`.
#[error("Multipart error: {0}")]
MultipartRejection(#[from] axum::extract::multipart::MultipartRejection),
/// Wraps axum's `MultipartError`.
#[error("Multipart error: {0}")]
Multipart(#[from] axum::extract::multipart::MultipartError),
/// The request did not contain the expected file; the message names the field and
/// content type that were expected.
#[error("File not found in request. It was expected to be in the field {field_name} with the content type {content_type}")]
FileNotFound {
field_name: &'static str,
content_type: &'static str,
},
}
155 changes: 84 additions & 71 deletions core/node/external_proof_integration_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,81 @@ mod error;
mod metrics;
mod middleware;
mod processor;
mod types;

use std::{net::SocketAddr, sync::Arc};
pub use crate::processor::Processor;

use std::net::SocketAddr;

use anyhow::Context;
use axum::{
extract::{Multipart, Path, Request},
extract::{Path, Request, State},
middleware::Next,
routing::{get, post},
Router,
};
use error::ProcessorError;
use tokio::sync::watch;
use zksync_basic_types::commitment::L1BatchCommitmentMode;
use zksync_config::configs::external_proof_integration_api::ExternalProofIntegrationApiConfig;
use zksync_dal::{ConnectionPool, Core};
use zksync_object_store::ObjectStore;
use types::{ExternalProof, ProofGenerationDataResponse};
use zksync_basic_types::L1BatchNumber;

use crate::{
metrics::{CallOutcome, Method},
middleware::MetricsMiddleware,
processor::Processor,
};

pub async fn run_server(
config: ExternalProofIntegrationApiConfig,
blob_store: Arc<dyn ObjectStore>,
connection_pool: ConnectionPool<Core>,
commitment_mode: L1BatchCommitmentMode,
mut stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let bind_address = SocketAddr::from(([0, 0, 0, 0], config.http_port));
tracing::info!("Starting external prover API server on {bind_address}");
let app = create_router(blob_store, connection_pool, commitment_mode).await;
/// External API implementation.
///
/// Built by `Api::new` and consumed by `Api::run`, which serves `router` on `port`.
#[derive(Debug)]
pub struct Api {
/// Router with all routes, their metrics layers and the `Processor` state attached.
router: Router,
/// TCP port the server listens on; `run` binds it on `0.0.0.0`.
port: u16,
}

let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| format!("Failed binding external prover API server to {bind_address}"))?;
axum::serve(listener, app)
impl Api {
/// Creates the API: registers the three routes, wraps each of them in a metrics layer
/// and attaches `processor` as the shared router state.
pub fn new(processor: Processor, port: u16) -> Self {
    // Builds a per-route layer that reports the call outcome for the given method.
    let middleware_factory = |method: Method| {
        axum::middleware::from_fn(move |req: Request, next: Next| async move {
            // Must be created before the request is handed to the inner handler.
            let middleware = MetricsMiddleware::new(method);
            let response = next.run(req).await;
            let outcome = if response.status().is_success() {
                CallOutcome::Success
            } else {
                CallOutcome::Failure
            };
            middleware.observe(outcome);
            response
        })
    };

    let latest_data_route = get(Api::latest_generation_data)
        .layer(middleware_factory(Method::GetLatestProofGenerationData));
    let specific_data_route = get(Api::generation_data_for_existing_batch)
        .layer(middleware_factory(Method::GetSpecificProofGenerationData));
    let verify_route = post(Api::verify_proof).layer(middleware_factory(Method::VerifyProof));

    let router = Router::new()
        .route("/proof_generation_data", latest_data_route)
        .route("/proof_generation_data/:l1_batch_number", specific_data_route)
        .route("/verify_proof/:l1_batch_number", verify_route)
        .with_state(processor);

    Self { router, port }
}

pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let bind_address = SocketAddr::from(([0, 0, 0, 0], self.port));
tracing::info!("Starting external prover API server on {bind_address}");

let listener = tokio::net::TcpListener::bind(bind_address)
.await
.with_context(|| {
format!("Failed binding external prover API server to {bind_address}")
})?;
axum::serve(listener, self.router)
.with_graceful_shutdown(async move {
if stop_receiver.changed().await.is_err() {
tracing::warn!("Stop signal sender for external prover API server was dropped without sending a signal");
Expand All @@ -47,57 +85,32 @@ pub async fn run_server(
})
.await
.context("External prover API server failed")?;
tracing::info!("External prover API server shut down");
Ok(())
}
tracing::info!("External prover API server shut down");
Ok(())
}

async fn create_router(
blob_store: Arc<dyn ObjectStore>,
connection_pool: ConnectionPool<Core>,
commitment_mode: L1BatchCommitmentMode,
) -> Router {
let mut processor =
Processor::new(blob_store.clone(), connection_pool.clone(), commitment_mode);
let verify_proof_processor = processor.clone();
let specific_proof_processor = processor.clone();
/// Handler for `GET /proof_generation_data`.
///
/// Delegates to `Processor::get_proof_generation_data` and returns its result unchanged.
async fn latest_generation_data(
State(processor): State<Processor>,
) -> Result<ProofGenerationDataResponse, ProcessorError> {
processor.get_proof_generation_data().await
}

let middleware_factory = |method: Method| {
axum::middleware::from_fn(move |req: Request, next: Next| async move {
let middleware = MetricsMiddleware::new(method);
let response = next.run(req).await;
let outcome = match response.status().is_success() {
true => CallOutcome::Success,
false => CallOutcome::Failure,
};
middleware.observe(outcome);
response
})
};
/// Handler for `GET /proof_generation_data/:l1_batch_number`.
///
/// Wraps the numeric path segment into an `L1BatchNumber` and delegates to the processor.
async fn generation_data_for_existing_batch(
    State(processor): State<Processor>,
    Path(l1_batch_number): Path<u32>,
) -> Result<ProofGenerationDataResponse, ProcessorError> {
    let batch = L1BatchNumber(l1_batch_number);
    processor
        .proof_generation_data_for_existing_batch(batch)
        .await
}

Router::new()
.route(
"/proof_generation_data",
get(move || async move { processor.get_proof_generation_data().await })
.layer(middleware_factory(Method::GetLatestProofGenerationData)),
)
.route(
"/proof_generation_data/:l1_batch_number",
get(move |l1_batch_number: Path<u32>| async move {
specific_proof_processor
.proof_generation_data_for_existing_batch(l1_batch_number)
.await
})
.layer(middleware_factory(Method::GetSpecificProofGenerationData)),
)
.route(
"/verify_proof/:l1_batch_number",
post(
move |l1_batch_number: Path<u32>, multipart: Multipart| async move {
verify_proof_processor
.verify_proof(l1_batch_number, multipart)
.await
},
)
.layer(middleware_factory(Method::VerifyProof)),
)
/// Handler for `POST /verify_proof/:l1_batch_number`.
///
/// Passes the batch number from the path and the extracted `ExternalProof` to the processor.
async fn verify_proof(
    State(processor): State<Processor>,
    Path(l1_batch_number): Path<u32>,
    proof: ExternalProof,
) -> Result<(), ProcessorError> {
    let batch = L1BatchNumber(l1_batch_number);
    processor.verify_proof(batch, proof).await
}
}
Loading

0 comments on commit 755fc4a

Please sign in to comment.