From 765224890e523dfae3c03667921cb50da07f898a Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:42:02 +0300 Subject: [PATCH 01/12] move save_node_aggregation directly to save_artifacts --- .../bin/witness_generator/src/artifacts.rs | 2 +- .../src/leaf_aggregation/artifacts.rs | 25 +++++++++++-------- .../src/node_aggregation/artifacts.rs | 21 ++++++++-------- .../crates/bin/witness_generator/src/utils.rs | 22 ---------------- 4 files changed, 26 insertions(+), 44 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/artifacts.rs b/prover/crates/bin/witness_generator/src/artifacts.rs index f509d3b2f64..331ef456a4b 100644 --- a/prover/crates/bin/witness_generator/src/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/artifacts.rs @@ -6,7 +6,7 @@ use zksync_prover_dal::{ConnectionPool, Prover}; #[derive(Debug)] pub(crate) struct AggregationBlobUrls { - pub aggregations_urls: String, + pub aggregation_urls: String, pub circuit_ids_and_urls: Vec<(u8, String)>, } diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs index a94587d00ec..6e09dbf3cb1 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -3,10 +3,11 @@ use std::time::Instant; use async_trait::async_trait; use zksync_object_store::ObjectStore; use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; -use zksync_prover_fri_types::keys::ClosedFormInputKey; +use zksync_prover_fri_types::keys::{AggregationsKey, ClosedFormInputKey}; use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; use zksync_types::{basic_fri_types::AggregationRound, prover_dal::LeafAggregationJobMetadata}; +use crate::utils::AggregationWrapper; use crate::{ artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, leaf_aggregation::{LeafAggregationArtifacts, LeafAggregationWitnessGenerator}, @@ -47,19 +48,21 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { object_store: &dyn ObjectStore, ) -> BlobUrls { let started_at = Instant::now(); - let aggregations_urls = save_node_aggregations_artifacts( - artifacts.block_number, - get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id), - 0, - artifacts.aggregations, - object_store, - ) - .await; + let key = AggregationsKey { + block_number, + circuit_id, + depth, + }; + let aggregation_urls = object_store + .put(key, &AggregationWrapper(artifacts.aggregations)) + .await + .unwrap(); + WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::LeafAggregation.into()] .observe(started_at.elapsed()); BlobUrls::Aggregation(AggregationBlobUrls { - aggregations_urls, + aggregation_urls, circuit_ids_and_urls: artifacts.circuit_ids_and_urls, }) } @@ -124,7 +127,7 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id), number_of_dependent_jobs, 0, - blob_urls.aggregations_urls, + blob_urls.aggregation_urls, ) .await; tracing::info!( diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs index 245027f0d67..1ce6dbfa9e8 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs @@ -52,20 +52,21 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { object_store: &dyn ObjectStore, ) -> BlobUrls { let started_at = Instant::now(); - let aggregations_urls = save_node_aggregations_artifacts( - artifacts.block_number, - artifacts.circuit_id, - artifacts.depth, - artifacts.next_aggregations, - object_store, - ) - .await; + let key = AggregationsKey { + block_number, + circuit_id, + depth, + }; + let aggregation_urls = object_store + .put(key, &AggregationWrapper(artifacts.next_aggregations)) + .await + .unwrap(); WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] .observe(started_at.elapsed()); BlobUrls::Aggregation(AggregationBlobUrls { - aggregations_urls, + aggregation_urls, circuit_ids_and_urls: artifacts.recursive_circuit_ids_and_urls, }) } @@ -111,7 +112,7 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { artifacts.circuit_id, Some(dependent_jobs as i32), artifacts.depth, - &blob_urls.aggregations_urls, + &blob_urls.aggregation_urls, protocol_version_id, ) .await; diff --git a/prover/crates/bin/witness_generator/src/utils.rs b/prover/crates/bin/witness_generator/src/utils.rs index 3ea2b539773..8524bdae9ff 100644 --- a/prover/crates/bin/witness_generator/src/utils.rs +++ b/prover/crates/bin/witness_generator/src/utils.rs @@ -204,28 +204,6 @@ pub async fn save_recursive_layer_prover_input_artifacts( ids_and_urls } -#[tracing::instrument( - skip_all, - fields(l1_batch = %block_number, circuit_id = %circuit_id) -)] -pub async fn save_node_aggregations_artifacts( - block_number: L1BatchNumber, - circuit_id: u8, - depth: u16, - aggregations: Vec<(u64, RecursionQueueSimulator)>, - object_store: &dyn ObjectStore, -) -> String { - let key = AggregationsKey { - block_number, - circuit_id, - depth, - }; - object_store - .put(key, &AggregationWrapper(aggregations)) - .await - .unwrap() -} - #[tracing::instrument(skip_all)] pub async fn load_proofs_for_job_ids( job_ids: &[u32], From c1a1d61f98ca8255511b5eff9cd5276e2808bd30 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:28:45 +0300 Subject: [PATCH 02/12] create witness generator trait --- .../bin/witness_generator/src/artifacts.rs | 2 +- .../src/basic_circuits/artifacts.rs | 2 +- .../src/basic_circuits/job_processor.rs | 2 +- .../src/basic_circuits/mod.rs | 21 +++++++++++++++++++ .../src/leaf_aggregation/artifacts.rs | 2 +- .../src/leaf_aggregation/job_processor.rs | 2 +- .../src/leaf_aggregation/mod.rs | 20 ++++++++++++++++++ .../crates/bin/witness_generator/src/lib.rs | 1 + .../src/node_aggregation/artifacts.rs | 2 +- .../src/node_aggregation/job_processor.rs | 2 +- .../src/node_aggregation/mod.rs | 20 ++++++++++++++++++ .../src/recursion_tip/artifacts.rs | 2 +- .../src/recursion_tip/job_processor.rs | 2 +- .../src/recursion_tip/mod.rs | 20 ++++++++++++++++++ .../src/scheduler/artifacts.rs | 2 +- .../src/scheduler/job_processor.rs | 2 +- .../witness_generator/src/scheduler/mod.rs | 19 +++++++++++++++++ .../src/witness_generator.rs | 17 +++++++++++++++ 18 files changed, 129 insertions(+), 11 deletions(-) create mode 100644 prover/crates/bin/witness_generator/src/witness_generator.rs diff --git a/prover/crates/bin/witness_generator/src/artifacts.rs b/prover/crates/bin/witness_generator/src/artifacts.rs index 331ef456a4b..953ee3d7fc0 100644 --- a/prover/crates/bin/witness_generator/src/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/artifacts.rs @@ -40,7 +40,7 @@ pub(crate) trait ArtifactsManager { object_store: &dyn ObjectStore, ) -> BlobUrls; - async fn update_database( + async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs index 3447659f829..fad32b082f5 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs @@ -51,7 +51,7 @@ impl ArtifactsManager for BasicWitnessGenerator { } #[tracing::instrument(skip_all, fields(l1_batch = %job_id))] - async fn update_database( + async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs index 08732689e3a..ab0e4022f51 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs @@ -116,7 +116,7 @@ impl JobProcessor for BasicWitnessGenerator { WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::BasicCircuits.into()] .observe(blob_started_at.elapsed()); - Self::update_database( + Self::save_to_database( &self.prover_connection_pool, job_id.0, started_at, diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs index c9755c333da..9a94499ced3 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs @@ -35,11 +35,14 @@ use zksync_object_store::ObjectStore; use zksync_prover_dal::{ConnectionPool, Prover}; use zksync_prover_fri_types::{keys::ClosedFormInputKey, CircuitAuxData}; use zksync_prover_interface::inputs::WitnessInputData; +use zksync_prover_keystore::keystore::Keystore; use zksync_system_constants::BOOTLOADER_ADDRESS; use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; +use crate::node_aggregation::NodeAggregationWitnessGenerator; +use crate::witness_generator::WitnessGenerator; use crate::{ metrics::WITNESS_GENERATOR_METRICS, precalculated_merkle_paths_provider::PrecalculatedMerklePathsProvider, @@ -464,3 +467,21 @@ async fn generate_witness( block_aux_witness, ) } + +impl WitnessGenerator for BasicWitnessGenerator { + type Job = (); + type Metadata = (); + type Artifacts = (); + + fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { + todo!() + } + + fn prepare_job( + metadata: Self::Metadata, + object_store: &dyn ObjectStore, + keystore: Option, + ) -> Self::Job { + todo!() + } +} diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs index 6e09dbf3cb1..a3821287dfe 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -71,7 +71,7 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { skip_all, fields(l1_batch = %job_id) )] - async fn update_database( + async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs index e032084151e..a68f77eb7c9 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs @@ -93,7 +93,7 @@ impl JobProcessor for LeafAggregationWitnessGenerator { block_number.0, circuit_id, ); - Self::update_database( + Self::save_to_database( &self.prover_connection_pool, job_id, started_at, diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs index d669a4cc97e..a5da4f043a6 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs @@ -29,6 +29,8 @@ use zksync_types::{ prover_dal::LeafAggregationJobMetadata, L1BatchNumber, }; +use crate::node_aggregation::NodeAggregationWitnessGenerator; +use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, @@ -261,3 +263,21 @@ pub async fn process_leaf_aggregation_job( closed_form_inputs: job.closed_form_inputs.0, } } + +impl WitnessGenerator for LeafAggregationWitnessGenerator { + type Job = (); + type Metadata = (); + type Artifacts = (); + + fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { + todo!() + } + + fn prepare_job( + metadata: Self::Metadata, + object_store: &dyn ObjectStore, + keystore: Option, + ) -> Self::Job { + todo!() + } +} diff --git a/prover/crates/bin/witness_generator/src/lib.rs b/prover/crates/bin/witness_generator/src/lib.rs index c0ac9718c6e..b24b548a49b 100644 --- a/prover/crates/bin/witness_generator/src/lib.rs +++ b/prover/crates/bin/witness_generator/src/lib.rs @@ -14,3 +14,4 @@ mod storage_oracle; mod tests; pub mod utils; mod witness; +pub mod witness_generator; diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs index 1ce6dbfa9e8..d70a2132771 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs @@ -75,7 +75,7 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { skip_all, fields(l1_batch = % job_id) )] - async fn update_database( + async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs index a015462cd6f..a0c22efc0ea 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs @@ -84,7 +84,7 @@ impl JobProcessor for NodeAggregationWitnessGenerator { WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] .observe(blob_save_started_at.elapsed()); - Self::update_database( + Self::save_to_database( &self.prover_connection_pool, job_id, started_at, diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs index 047caa363a8..1434157285a 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs @@ -26,6 +26,8 @@ use zksync_types::{ prover_dal::NodeAggregationJobMetadata, L1BatchNumber, }; +use crate::scheduler::SchedulerWitnessGenerator; +use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, @@ -254,3 +256,21 @@ pub async fn prepare_job( all_leafs_layer_params: get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?, }) } + +impl WitnessGenerator for NodeAggregationWitnessGenerator { + type Job = (); + type Metadata = (); + type Artifacts = (); + + fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { + todo!() + } + + fn prepare_job( + metadata: Self::Metadata, + object_store: &dyn ObjectStore, + keystore: Option, + ) -> Self::Job { + todo!() + } +} diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs index 8379fcf9f93..60707cec306 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs @@ -97,7 +97,7 @@ impl ArtifactsManager for RecursionTipWitnessGenerator { BlobUrls::Url(blob_url) } - async fn update_database( + async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs index f114724cfec..fe8d2931ce2 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs @@ -98,7 +98,7 @@ impl JobProcessor for RecursionTipWitnessGenerator { WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::RecursionTip.into()] .observe(blob_save_started_at.elapsed()); - Self::update_database( + Self::save_to_database( &self.prover_connection_pool, job_id.0, started_at, diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs index 4abb56a7d78..c6410820a60 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs @@ -43,6 +43,8 @@ use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; +use crate::scheduler::SchedulerWitnessGenerator; +use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, utils::ClosedFormInputWrapper, }; @@ -224,3 +226,21 @@ pub async fn prepare_job( node_vk, }) } + +impl WitnessGenerator for RecursionTipWitnessGenerator { + type Job = (); + type Metadata = (); + type Artifacts = (); + + fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { + todo!() + } + + fn prepare_job( + metadata: Self::Metadata, + object_store: &dyn ObjectStore, + keystore: Option, + ) -> Self::Job { + todo!() + } +} diff --git a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs index b20a9764188..16dcd65501e 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs @@ -51,7 +51,7 @@ impl ArtifactsManager for SchedulerWitnessGenerator { BlobUrls::Url(blob_url) } - async fn update_database( + async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, diff --git a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs index fe4f2db4090..01f58d3c697 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs @@ -97,7 +97,7 @@ impl JobProcessor for SchedulerWitnessGenerator { WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::Scheduler.into()] .observe(blob_save_started_at.elapsed()); - Self::update_database( + Self::save_to_database( &self.prover_connection_pool, job_id.0, started_at, diff --git a/prover/crates/bin/witness_generator/src/scheduler/mod.rs b/prover/crates/bin/witness_generator/src/scheduler/mod.rs index 10230b35c4f..dc51c25f4b3 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/mod.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/mod.rs @@ -27,6 +27,7 @@ use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; +use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, utils::SchedulerPartialInputWrapper, @@ -181,3 +182,21 @@ pub async fn prepare_job( recursion_tip_vk, }) } + +impl WitnessGenerator for SchedulerWitnessGenerator { + type Job = (); + type Metadata = (); + type Artifacts = (); + + fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { + todo!() + } + + fn prepare_job( + metadata: Self::Metadata, + object_store: &dyn ObjectStore, + keystore: Option, + ) -> Self::Job { + todo!() + } +} diff --git a/prover/crates/bin/witness_generator/src/witness_generator.rs b/prover/crates/bin/witness_generator/src/witness_generator.rs new file mode 100644 index 00000000000..4d170f33d51 --- /dev/null +++ b/prover/crates/bin/witness_generator/src/witness_generator.rs @@ -0,0 +1,17 @@ +use std::time::Instant; +use zksync_object_store::ObjectStore; +use zksync_prover_keystore::keystore::Keystore; +use zksync_queued_job_processor::JobProcessor; + +pub trait WitnessGenerator { + type Job: Send + 'static; + type Metadata; + type Artifacts; + + fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result; + fn prepare_job( + metadata: Self::Metadata, + object_store: &dyn ObjectStore, + keystore: Option, + ) -> Self::Job; +} From e61c3d91e79c6c941a1d866540fa5b54e42ea294 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:58:24 +0300 Subject: [PATCH 03/12] impl process_job --- .../src/basic_circuits/job_processor.rs | 16 ++- .../src/basic_circuits/mod.rs | 66 +++++----- .../src/leaf_aggregation/job_processor.rs | 9 +- .../src/leaf_aggregation/mod.rs | 35 +++++- .../src/node_aggregation/job_processor.rs | 9 +- .../src/node_aggregation/mod.rs | 116 +++++++++--------- .../src/recursion_tip/job_processor.rs | 6 +- .../src/recursion_tip/mod.rs | 98 +++++++-------- .../src/scheduler/job_processor.rs | 6 +- .../witness_generator/src/scheduler/mod.rs | 97 ++++++++------- .../src/witness_generator.rs | 13 +- 11 files changed, 260 insertions(+), 211 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs index ab0e4022f51..635776016ab 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs @@ -11,6 +11,7 @@ use crate::{ artifacts::{ArtifactsManager, BlobUrls, SchedulerBlobUrls}, basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob}, metrics::WITNESS_GENERATOR_METRICS, + witness_generator::WitnessGenerator, }; #[async_trait] @@ -18,7 +19,7 @@ impl JobProcessor for BasicWitnessGenerator { type Job = BasicWitnessGeneratorJob; type JobId = L1BatchNumber; // The artifact is optional to support skipping blocks when sampling is enabled. - type JobArtifacts = Option; + type JobArtifacts = BasicCircuitArtifacts; const SERVICE_NAME: &'static str = "fri_basic_circuit_witness_generator"; @@ -68,16 +69,19 @@ impl JobProcessor for BasicWitnessGenerator { _job_id: &Self::JobId, job: BasicWitnessGeneratorJob, started_at: Instant, - ) -> tokio::task::JoinHandle>> { + ) -> tokio::task::JoinHandle> { let object_store = Arc::clone(&self.object_store); let max_circuits_in_flight = self.config.max_circuits_in_flight; tokio::spawn(async move { let block_number = job.block_number; - Ok( - Self::process_job_impl(object_store, job, started_at, max_circuits_in_flight) - .instrument(tracing::info_span!("basic_circuit", %block_number)) - .await, + ::process_job( + job, + object_store, + Some(max_circuits_in_flight), + started_at, ) + .instrument(tracing::info_span!("basic_circuit", %block_number)) + .await }) } diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs index 9a94499ced3..017d157d56c 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs @@ -5,6 +5,7 @@ use std::{ time::Instant, }; +use async_trait::async_trait; use circuit_definitions::{ circuit_definitions::base_layer::{ZkSyncBaseLayerCircuit, ZkSyncBaseLayerStorage}, encodings::recursion_request::RecursionQueueSimulator, @@ -41,10 +42,9 @@ use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; -use crate::node_aggregation::NodeAggregationWitnessGenerator; -use crate::witness_generator::WitnessGenerator; use crate::{ metrics::WITNESS_GENERATOR_METRICS, + node_aggregation::NodeAggregationWitnessGenerator, precalculated_merkle_paths_provider::PrecalculatedMerklePathsProvider, storage_oracle::StorageOracle, utils::{ @@ -52,6 +52,7 @@ use crate::{ ClosedFormInputWrapper, KZG_TRUSTED_SETUP_FILE, }, witness::WitnessStorage, + witness_generator::WitnessGenerator, }; mod artifacts; @@ -111,35 +112,6 @@ impl BasicWitnessGenerator { protocol_version, } } - - async fn process_job_impl( - object_store: Arc, - basic_job: BasicWitnessGeneratorJob, - started_at: Instant, - max_circuits_in_flight: usize, - ) -> Option { - let BasicWitnessGeneratorJob { - block_number, - data: job, - } = basic_job; - - tracing::info!( - "Starting witness generation of type {:?} for block {}", - AggregationRound::BasicCircuits, - block_number.0 - ); - - Some( - process_basic_circuits_job( - object_store, - started_at, - block_number, - job, - max_circuits_in_flight, - ) - .await, - ) - } } #[tracing::instrument(skip_all, fields(l1_batch = %block_number))] @@ -468,13 +440,37 @@ async fn generate_witness( ) } +#[async_trait] impl WitnessGenerator for BasicWitnessGenerator { - type Job = (); + type Job = BasicWitnessGeneratorJob; type Metadata = (); - type Artifacts = (); + type Artifacts = BasicCircuitArtifacts; - fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { - todo!() + async fn process_job( + job: BasicWitnessGeneratorJob, + object_store: Arc<&dyn ObjectStore>, + max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result { + let BasicWitnessGeneratorJob { + block_number, + data: job, + } = job; + + tracing::info!( + "Starting witness generation of type {:?} for block {}", + AggregationRound::BasicCircuits, + block_number.0 + ); + + Ok(process_basic_circuits_job( + object_store, + started_at, + block_number, + job, + max_circuits_in_flight.unwrap(), + ) + .await) } fn prepare_job( diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs index a68f77eb7c9..1afb1d9131e 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs @@ -14,6 +14,7 @@ use crate::{ LeafAggregationWitnessGeneratorJob, }, metrics::WITNESS_GENERATOR_METRICS, + witness_generator::WitnessGenerator, }; #[async_trait] @@ -63,7 +64,13 @@ impl JobProcessor for LeafAggregationWitnessGenerator { let object_store = self.object_store.clone(); let max_circuits_in_flight = self.config.max_circuits_in_flight; tokio::spawn(async move { - Ok(Self::process_job_impl(job, started_at, object_store, max_circuits_in_flight).await) + ::process_job( + job, + object_store, + Some(max_circuits_in_flight), + started_at, + ) + .await }) } diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs index a5da4f043a6..49ec9aaea78 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; +use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::base_circuit_type_into_recursive_leaf_circuit_type; use tokio::sync::Semaphore; use zkevm_test_harness::{ @@ -29,15 +30,15 @@ use zksync_types::{ prover_dal::LeafAggregationJobMetadata, L1BatchNumber, }; -use crate::node_aggregation::NodeAggregationWitnessGenerator; -use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, + node_aggregation::NodeAggregationWitnessGenerator, utils::{ load_proofs_for_job_ids, save_recursive_layer_prover_input_artifacts, ClosedFormInputWrapper, }, + witness_generator::WitnessGenerator, }; mod artifacts; @@ -264,13 +265,35 @@ pub async fn process_leaf_aggregation_job( } } +#[async_trait] impl WitnessGenerator for LeafAggregationWitnessGenerator { - type Job = (); + type Job = LeafAggregationWitnessGeneratorJob; type Metadata = (); - type Artifacts = (); + type Artifacts = LeafAggregationArtifacts; - fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { - todo!() + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number, circuit_id = %job.circuit_id) + )] + async fn process_job( + job: LeafAggregationWitnessGeneratorJob, + object_store: Arc, + max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result { + tracing::info!( + "Starting witness generation of type {:?} for block {} with circuit {}", + AggregationRound::LeafAggregation, + job.block_number.0, + job.circuit_id, + ); + Ok(process_leaf_aggregation_job( + started_at, + job, + object_store, + max_circuits_in_flight.unwrap(), + ) + .await) } fn prepare_job( diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs index a0c22efc0ea..dbac8797f65 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs @@ -14,6 +14,7 @@ use crate::{ prepare_job, NodeAggregationArtifacts, NodeAggregationWitnessGenerator, NodeAggregationWitnessGeneratorJob, }, + witness_generator::WitnessGenerator, }; #[async_trait] @@ -63,7 +64,13 @@ impl JobProcessor for NodeAggregationWitnessGenerator { let object_store = self.object_store.clone(); let max_circuits_in_flight = self.config.max_circuits_in_flight; tokio::spawn(async move { - Ok(Self::process_job_impl(job, started_at, object_store, max_circuits_in_flight).await) + ::process_job( + job, + object_store, + Some(max_circuits_in_flight), + started_at, + ) + .await }) } diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs index 1434157285a..91a052fbec3 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context as _; +use async_trait::async_trait; use circuit_definitions::circuit_definitions::recursion_layer::RECURSION_ARITY; use tokio::sync::Semaphore; use zkevm_test_harness::witness::recursive_aggregation::{ @@ -26,12 +27,12 @@ use zksync_types::{ prover_dal::NodeAggregationJobMetadata, L1BatchNumber, }; -use crate::scheduler::SchedulerWitnessGenerator; -use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, + scheduler::SchedulerWitnessGenerator, utils::{load_proofs_for_job_ids, save_recursive_layer_prover_input_artifacts}, + witness_generator::WitnessGenerator, }; mod artifacts; @@ -83,17 +84,64 @@ impl NodeAggregationWitnessGenerator { keystore, } } +} + +#[tracing::instrument( + skip_all, + fields(l1_batch = % metadata.block_number, circuit_id = % metadata.circuit_id) +)] +pub async fn prepare_job( + metadata: NodeAggregationJobMetadata, + object_store: &dyn ObjectStore, + keystore: Keystore, +) -> anyhow::Result { + let started_at = Instant::now(); + let artifacts = NodeAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; + + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::NodeAggregation.into()] + .observe(started_at.elapsed()); + + let started_at = Instant::now(); + let leaf_vk = keystore + .load_recursive_layer_verification_key(metadata.circuit_id) + .context("get_recursive_layer_vk_for_circuit_type")?; + let node_vk = keystore + .load_recursive_layer_verification_key( + ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, + ) + .context("get_recursive_layer_vk_for_circuit_type()")?; + + WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::NodeAggregation.into()] + .observe(started_at.elapsed()); + + Ok(NodeAggregationWitnessGeneratorJob { + circuit_id: metadata.circuit_id, + block_number: metadata.block_number, + depth: metadata.depth, + aggregations: artifacts.0, + proofs_ids: metadata.prover_job_ids_for_proofs, + leaf_vk, + node_vk, + all_leafs_layer_params: get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?, + }) +} + +#[async_trait] +impl WitnessGenerator for NodeAggregationWitnessGenerator { + type Job = NodeAggregationWitnessGeneratorJob; + type Metadata = (); + type Artifacts = NodeAggregationArtifacts; #[tracing::instrument( skip_all, fields(l1_batch = % job.block_number, circuit_id = % job.circuit_id) )] - pub async fn process_job_impl( + async fn process_job( job: NodeAggregationWitnessGeneratorJob, - started_at: Instant, object_store: Arc, - max_circuits_in_flight: usize, - ) -> NodeAggregationArtifacts { + max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result { let node_vk_commitment = compute_node_vk_commitment(job.node_vk.clone()); tracing::info!( "Starting witness generation of type {:?} for block {} circuit id {} depth {}", @@ -119,7 +167,7 @@ impl NodeAggregationWitnessGenerator { proofs_ids.len() ); - let semaphore = Arc::new(Semaphore::new(max_circuits_in_flight)); + let semaphore = Arc::new(Semaphore::new(max_circuits_in_flight.unwrap())); let mut handles = vec![]; for (circuit_idx, (chunk, proofs_ids_for_chunk)) in job @@ -207,63 +255,13 @@ impl NodeAggregationWitnessGenerator { started_at.elapsed(), ); - NodeAggregationArtifacts { + Ok(NodeAggregationArtifacts { circuit_id: job.circuit_id, block_number: job.block_number, depth: job.depth + 1, next_aggregations, recursive_circuit_ids_and_urls, - } - } -} - -#[tracing::instrument( - skip_all, - fields(l1_batch = % metadata.block_number, circuit_id = % metadata.circuit_id) -)] -pub async fn prepare_job( - metadata: NodeAggregationJobMetadata, - object_store: &dyn ObjectStore, - keystore: Keystore, -) -> anyhow::Result { - let started_at = Instant::now(); - let artifacts = NodeAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; - - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::NodeAggregation.into()] - .observe(started_at.elapsed()); - - let started_at = Instant::now(); - let leaf_vk = keystore - .load_recursive_layer_verification_key(metadata.circuit_id) - .context("get_recursive_layer_vk_for_circuit_type")?; - let node_vk = keystore - .load_recursive_layer_verification_key( - ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, - ) - .context("get_recursive_layer_vk_for_circuit_type()")?; - - WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::NodeAggregation.into()] - .observe(started_at.elapsed()); - - Ok(NodeAggregationWitnessGeneratorJob { - circuit_id: metadata.circuit_id, - block_number: metadata.block_number, - depth: metadata.depth, - aggregations: artifacts.0, - proofs_ids: metadata.prover_job_ids_for_proofs, - leaf_vk, - node_vk, - all_leafs_layer_params: get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?, - }) -} - -impl WitnessGenerator for NodeAggregationWitnessGenerator { - type Job = (); - type Metadata = (); - type Artifacts = (); - - fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { - todo!() + }) } fn prepare_job( diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs index fe8d2931ce2..1aae42ac723 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs @@ -14,6 +14,7 @@ use crate::{ prepare_job, RecursionTipArtifacts, RecursionTipWitnessGenerator, RecursionTipWitnessGeneratorJob, }, + witness_generator::WitnessGenerator, }; #[async_trait] @@ -77,7 +78,10 @@ impl JobProcessor for RecursionTipWitnessGenerator { job: RecursionTipWitnessGeneratorJob, started_at: Instant, ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || Ok(Self::process_job_sync(job, started_at))) + let object_store = self.object_store.clone(); + tokio::task::spawn_blocking(async move || { + ::process_job(job, object_store, None, started_at).await + }) } #[tracing::instrument( diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs index c6410820a60..0ffca93f04f 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Instant}; use anyhow::Context; +use async_trait::async_trait; use circuit_definitions::{ circuit_definitions::recursion_layer::{ recursion_tip::RecursionTipCircuit, ZkSyncRecursionLayerStorageType, @@ -43,10 +44,10 @@ use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; -use crate::scheduler::SchedulerWitnessGenerator; -use crate::witness_generator::WitnessGenerator; use crate::{ - artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, utils::ClosedFormInputWrapper, + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, + scheduler::SchedulerWitnessGenerator, utils::ClosedFormInputWrapper, + witness_generator::WitnessGenerator, }; mod artifacts; @@ -93,48 +94,6 @@ impl RecursionTipWitnessGenerator { keystore, } } - - #[tracing::instrument( - skip_all, - fields(l1_batch = %job.block_number) - )] - pub fn process_job_sync( - job: RecursionTipWitnessGeneratorJob, - started_at: Instant, - ) -> RecursionTipArtifacts { - tracing::info!( - "Starting fri witness generation of type {:?} for block {}", - AggregationRound::RecursionTip, - job.block_number.0 - ); - let config = RecursionTipConfig { - proof_config: recursion_layer_proof_config(), - vk_fixed_parameters: job.node_vk.clone().into_inner().fixed_parameters, - _marker: std::marker::PhantomData, - }; - - let recursive_tip_circuit = RecursionTipCircuit { - witness: job.recursion_tip_witness, - config, - transcript_params: (), - _marker: std::marker::PhantomData, - }; - - WITNESS_GENERATOR_METRICS.witness_generation_time[&AggregationRound::RecursionTip.into()] - .observe(started_at.elapsed()); - - tracing::info!( - "Recursion tip generation for block {} is complete in {:?}", - job.block_number.0, - started_at.elapsed() - ); - - RecursionTipArtifacts { - recursion_tip_circuit: ZkSyncRecursiveLayerCircuit::RecursionTipCircuit( - recursive_tip_circuit, - ), - } - } } #[tracing::instrument( @@ -227,13 +186,54 @@ pub async fn prepare_job( }) } +#[async_trait] impl WitnessGenerator for RecursionTipWitnessGenerator { - type Job = (); + type Job = RecursionTipWitnessGeneratorJob; type Metadata = (); - type Artifacts = (); + type Artifacts = RecursionTipArtifacts; - fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { - todo!() + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number) + )] + async fn process_job( + job: Self::Job, + _object_store: Arc, + _max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result { + tracing::info!( + "Starting fri witness generation of type {:?} for block {}", + AggregationRound::RecursionTip, + job.block_number.0 + ); + let config = RecursionTipConfig { + proof_config: recursion_layer_proof_config(), + vk_fixed_parameters: job.node_vk.clone().into_inner().fixed_parameters, + _marker: std::marker::PhantomData, + }; + + let recursive_tip_circuit = RecursionTipCircuit { + witness: job.recursion_tip_witness, + config, + transcript_params: (), + _marker: std::marker::PhantomData, + }; + + WITNESS_GENERATOR_METRICS.witness_generation_time[&AggregationRound::RecursionTip.into()] + .observe(started_at.elapsed()); + + tracing::info!( + "Recursion tip generation for block {} is complete in {:?}", + job.block_number.0, + started_at.elapsed() + ); + + Ok(RecursionTipArtifacts { + recursion_tip_circuit: ZkSyncRecursiveLayerCircuit::RecursionTipCircuit( + recursive_tip_circuit, + ), + }) } fn prepare_job( diff --git a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs index 01f58d3c697..55b5a3dae52 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs @@ -13,6 +13,7 @@ use crate::{ scheduler::{ prepare_job, SchedulerArtifacts, SchedulerWitnessGenerator, SchedulerWitnessGeneratorJob, }, + witness_generator::WitnessGenerator, }; #[async_trait] @@ -72,10 +73,11 @@ impl JobProcessor for SchedulerWitnessGenerator { job: SchedulerWitnessGeneratorJob, started_at: Instant, ) -> tokio::task::JoinHandle> { - tokio::task::spawn_blocking(move || { + let object_store = self.object_store.clone(); + tokio::task::spawn_blocking(async move || { let block_number = job.block_number; let _span = tracing::info_span!("scheduler", %block_number).entered(); - Ok(Self::process_job_sync(job, started_at)) + ::process_job(job, object_store, None, started_at).await }) } diff --git a/prover/crates/bin/witness_generator/src/scheduler/mod.rs b/prover/crates/bin/witness_generator/src/scheduler/mod.rs index dc51c25f4b3..a3ea03c022b 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/mod.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/mod.rs @@ -1,6 +1,7 @@ use std::{convert::TryInto, sync::Arc, time::Instant}; use anyhow::Context as _; +use async_trait::async_trait; use zkevm_test_harness::zkevm_circuits::recursion::{ leaf_layer::input::RecursionLeafParametersWitness, NUM_BASE_LAYER_CIRCUITS, }; @@ -27,10 +28,9 @@ use zksync_types::{ basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, L1BatchNumber, }; -use crate::witness_generator::WitnessGenerator; use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, - utils::SchedulerPartialInputWrapper, + utils::SchedulerPartialInputWrapper, witness_generator::WitnessGenerator, }; mod artifacts; @@ -80,49 +80,6 @@ impl SchedulerWitnessGenerator { keystore, } } - - #[tracing::instrument( - skip_all, - fields(l1_batch = %job.block_number) - )] - pub fn process_job_sync( - job: SchedulerWitnessGeneratorJob, - started_at: Instant, - ) -> SchedulerArtifacts { - tracing::info!( - "Starting fri witness generation of type {:?} for block {}", - AggregationRound::Scheduler, - job.block_number.0 - ); - let config = SchedulerConfig { - proof_config: recursion_layer_proof_config(), - vk_fixed_parameters: job.recursion_tip_vk.clone().into_inner().fixed_parameters, - capacity: SCHEDULER_CAPACITY, - _marker: std::marker::PhantomData, - recursion_tip_vk: job.recursion_tip_vk.into_inner(), - node_layer_vk: job.node_vk.into_inner(), - leaf_layer_parameters: job.leaf_layer_parameters, - }; - - let scheduler_circuit = SchedulerCircuit { - witness: job.scheduler_witness, - config, - transcript_params: (), - _marker: std::marker::PhantomData, - }; - WITNESS_GENERATOR_METRICS.witness_generation_time[&AggregationRound::Scheduler.into()] - .observe(started_at.elapsed()); - - tracing::info!( - "Scheduler generation for block {} is complete in {:?}", - job.block_number.0, - started_at.elapsed() - ); - - SchedulerArtifacts { - scheduler_circuit: ZkSyncRecursiveLayerCircuit::SchedulerCircuit(scheduler_circuit), - } - } } #[tracing::instrument( @@ -183,13 +140,55 @@ pub async fn prepare_job( }) } +#[async_trait] impl WitnessGenerator for SchedulerWitnessGenerator { - type Job = (); + type Job = SchedulerWitnessGeneratorJob; type Metadata = (); - type Artifacts = (); + type Artifacts = SchedulerArtifacts; - fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result { - todo!() + #[tracing::instrument( + skip_all, + fields(l1_batch = %job.block_number) + )] + async fn process_job( + job: SchedulerWitnessGeneratorJob, + _object_store: Arc, + _max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result { + tracing::info!( + "Starting fri witness generation of type {:?} for block {}", + AggregationRound::Scheduler, + job.block_number.0 + ); + let config = SchedulerConfig { + proof_config: recursion_layer_proof_config(), + vk_fixed_parameters: job.recursion_tip_vk.clone().into_inner().fixed_parameters, + capacity: SCHEDULER_CAPACITY, + _marker: std::marker::PhantomData, + recursion_tip_vk: job.recursion_tip_vk.into_inner(), + node_layer_vk: job.node_vk.into_inner(), + leaf_layer_parameters: job.leaf_layer_parameters, + }; + + let scheduler_circuit = SchedulerCircuit { + witness: job.scheduler_witness, + config, + transcript_params: (), + _marker: std::marker::PhantomData, + }; + WITNESS_GENERATOR_METRICS.witness_generation_time[&AggregationRound::Scheduler.into()] + .observe(started_at.elapsed()); + + tracing::info!( + "Scheduler generation for block {} is complete in {:?}", + job.block_number.0, + started_at.elapsed() + ); + + Ok(SchedulerArtifacts { + scheduler_circuit: ZkSyncRecursiveLayerCircuit::SchedulerCircuit(scheduler_circuit), + }) } fn prepare_job( diff --git a/prover/crates/bin/witness_generator/src/witness_generator.rs b/prover/crates/bin/witness_generator/src/witness_generator.rs index 4d170f33d51..aa313eb08fe 100644 --- a/prover/crates/bin/witness_generator/src/witness_generator.rs +++ b/prover/crates/bin/witness_generator/src/witness_generator.rs @@ -1,14 +1,23 @@ -use std::time::Instant; +use std::{sync::Arc, time::Instant}; + +use async_trait::async_trait; use zksync_object_store::ObjectStore; use zksync_prover_keystore::keystore::Keystore; use zksync_queued_job_processor::JobProcessor; +#[async_trait] pub trait WitnessGenerator { type Job: Send + 'static; type Metadata; type Artifacts; - fn process_job(job: Self::Job, started_at: Instant) -> anyhow::Result; + async fn process_job( + job: Self::Job, + object_store: Arc<&dyn ObjectStore>, + max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result; + fn prepare_job( metadata: Self::Metadata, object_store: &dyn ObjectStore, From ccf36ba542f187c9cad8ae4ce084cca60852f620 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 17:00:34 +0300 Subject: [PATCH 04/12] fix some things --- .../bin/witness_generator/src/leaf_aggregation/artifacts.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs index a3821287dfe..cebda362c79 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -7,12 +7,11 @@ use zksync_prover_fri_types::keys::{AggregationsKey, ClosedFormInputKey}; use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; use zksync_types::{basic_fri_types::AggregationRound, prover_dal::LeafAggregationJobMetadata}; -use crate::utils::AggregationWrapper; use crate::{ artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, leaf_aggregation::{LeafAggregationArtifacts, LeafAggregationWitnessGenerator}, metrics::WITNESS_GENERATOR_METRICS, - utils::{save_node_aggregations_artifacts, ClosedFormInputWrapper}, + utils::{AggregationWrapper, ClosedFormInputWrapper}, }; #[async_trait] From 39bdd672c3fc120f43f56015379dbd044e08b97a Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:05:39 +0300 Subject: [PATCH 05/12] impl prepare job --- .../src/basic_circuits/job_processor.rs | 28 ++- .../src/basic_circuits/mod.rs | 23 ++- .../src/leaf_aggregation/artifacts.rs | 6 +- .../src/leaf_aggregation/job_processor.rs | 12 +- .../src/leaf_aggregation/mod.rs | 111 ++++------ .../src/node_aggregation/artifacts.rs | 8 +- .../src/node_aggregation/job_processor.rs | 12 +- .../src/node_aggregation/mod.rs | 85 ++++---- .../src/recursion_tip/job_processor.rs | 12 +- .../src/recursion_tip/mod.rs | 194 +++++++++--------- .../src/scheduler/job_processor.rs | 13 +- .../witness_generator/src/scheduler/mod.rs | 126 ++++++------ .../src/witness_generator.rs | 9 +- 13 files changed, 308 insertions(+), 331 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs index 635776016ab..3e856bb478b 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs @@ -4,6 +4,7 @@ use anyhow::Context as _; use tracing::Instrument; use zksync_prover_dal::ProverDal; use zksync_prover_fri_types::{get_current_pod_name, AuxOutputWitnessWrapper}; +use zksync_prover_keystore::keystore::Keystore; use zksync_queued_job_processor::{async_trait, JobProcessor}; use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; @@ -19,7 +20,7 @@ impl JobProcessor for BasicWitnessGenerator { type Job = BasicWitnessGeneratorJob; type JobId = L1BatchNumber; // The artifact is optional to support skipping blocks when sampling is enabled. - type JobArtifacts = BasicCircuitArtifacts; + type JobArtifacts = Option; const SERVICE_NAME: &'static str = "fri_basic_circuit_witness_generator"; @@ -36,19 +37,15 @@ impl JobProcessor for BasicWitnessGenerator { ) .await { - Some(block_number) => { - tracing::info!( - "Processing FRI basic witness-gen for block {}", - block_number - ); - let started_at = Instant::now(); - let job = Self::get_artifacts(&block_number, &*self.object_store).await?; - - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()] - .observe(started_at.elapsed()); - - Ok(Some((block_number, job))) - } + Some(block_number) => Ok(Some(( + block_number, + ::prepare_job( + block_number, + &*self.object_store, + Keystore::locate(), // todo: this should be removed + ) + .await?, + ))), None => Ok(None), } } @@ -69,7 +66,7 @@ impl JobProcessor for BasicWitnessGenerator { _job_id: &Self::JobId, job: BasicWitnessGeneratorJob, started_at: Instant, - ) -> tokio::task::JoinHandle> { + ) -> tokio::task::JoinHandle>> { let object_store = Arc::clone(&self.object_store); let max_circuits_in_flight = self.config.max_circuits_in_flight; tokio::spawn(async move { @@ -82,6 +79,7 @@ impl JobProcessor for BasicWitnessGenerator { ) .instrument(tracing::info_span!("basic_circuit", %block_number)) .await + .map(Some) }) } diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs index 017d157d56c..6b6dec7a920 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs @@ -43,8 +43,8 @@ use zksync_types::{ }; use crate::{ + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, - node_aggregation::NodeAggregationWitnessGenerator, precalculated_merkle_paths_provider::PrecalculatedMerklePathsProvider, storage_oracle::StorageOracle, utils::{ @@ -443,12 +443,12 @@ async fn generate_witness( #[async_trait] impl WitnessGenerator for BasicWitnessGenerator { type Job = BasicWitnessGeneratorJob; - type Metadata = (); + type Metadata = L1BatchNumber; type Artifacts = BasicCircuitArtifacts; async fn process_job( job: BasicWitnessGeneratorJob, - object_store: Arc<&dyn ObjectStore>, + object_store: Arc, max_circuits_in_flight: Option, started_at: Instant, ) -> anyhow::Result { @@ -473,11 +473,18 @@ impl WitnessGenerator for BasicWitnessGenerator { .await) } - fn prepare_job( - metadata: Self::Metadata, + async fn prepare_job( + metadata: L1BatchNumber, object_store: &dyn ObjectStore, - keystore: Option, - ) -> Self::Job { - todo!() + _keystore: Keystore, + ) -> anyhow::Result { + tracing::info!("Processing FRI basic witness-gen for block {}", metadata.0); + let started_at = Instant::now(); + let job = Self::get_artifacts(&metadata, object_store).await?; + + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()] + .observe(started_at.elapsed()); + + Ok(job) } } diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs index cebda362c79..a43b8bc2580 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -48,9 +48,9 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { ) -> BlobUrls { let started_at = Instant::now(); let key = AggregationsKey { - block_number, - circuit_id, - depth, + block_number: artifacts.block_number, + circuit_id: get_recursive_layer_circuit_id_for_base_layer(artifacts.circuit_id), + depth: 0, }; let aggregation_urls = object_store .put(key, &AggregationWrapper(artifacts.aggregations)) diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs index 1afb1d9131e..416d62c49a3 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs @@ -10,7 +10,7 @@ use zksync_types::basic_fri_types::AggregationRound; use crate::{ artifacts::ArtifactsManager, leaf_aggregation::{ - prepare_leaf_aggregation_job, LeafAggregationArtifacts, LeafAggregationWitnessGenerator, + LeafAggregationArtifacts, LeafAggregationWitnessGenerator, LeafAggregationWitnessGeneratorJob, }, metrics::WITNESS_GENERATOR_METRICS, @@ -38,9 +38,13 @@ impl JobProcessor for LeafAggregationWitnessGenerator { tracing::info!("Processing leaf aggregation job {:?}", metadata.id); Ok(Some(( metadata.id, - prepare_leaf_aggregation_job(metadata, &*self.object_store, self.keystore.clone()) - .await - .context("prepare_leaf_aggregation_job()")?, + ::prepare_job( + metadata, + &*self.object_store, + self.keystore.clone(), + ) + .await + .context("prepare_leaf_aggregation_job()")?, ))) } diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs index 49ec9aaea78..960843259c3 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/mod.rs @@ -33,7 +33,6 @@ use zksync_types::{ use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, - node_aggregation::NodeAggregationWitnessGenerator, utils::{ load_proofs_for_job_ids, save_recursive_layer_prover_input_artifacts, ClosedFormInputWrapper, @@ -88,69 +87,6 @@ impl LeafAggregationWitnessGenerator { keystore, } } - - #[tracing::instrument( - skip_all, - fields(l1_batch = %leaf_job.block_number, circuit_id = %leaf_job.circuit_id) - )] - pub async fn process_job_impl( - leaf_job: LeafAggregationWitnessGeneratorJob, - started_at: Instant, - object_store: Arc, - max_circuits_in_flight: usize, - ) -> LeafAggregationArtifacts { - tracing::info!( - "Starting witness generation of type {:?} for block {} with circuit {}", - AggregationRound::LeafAggregation, - leaf_job.block_number.0, - leaf_job.circuit_id, - ); - process_leaf_aggregation_job(started_at, leaf_job, object_store, max_circuits_in_flight) - .await - } -} - -#[tracing::instrument( - skip_all, - fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) -)] -pub async fn prepare_leaf_aggregation_job( - metadata: LeafAggregationJobMetadata, - object_store: &dyn ObjectStore, - keystore: Keystore, -) -> anyhow::Result { - let started_at = Instant::now(); - let closed_form_input = - LeafAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; - - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::LeafAggregation.into()] - .observe(started_at.elapsed()); - - let started_at = Instant::now(); - let base_vk = keystore - .load_base_layer_verification_key(metadata.circuit_id) - .context("get_base_layer_vk_for_circuit_type()")?; - - let leaf_circuit_id = base_circuit_type_into_recursive_leaf_circuit_type( - BaseLayerCircuitType::from_numeric_value(metadata.circuit_id), - ) as u8; - - let leaf_vk = keystore - .load_recursive_layer_verification_key(leaf_circuit_id) - .context("get_recursive_layer_vk_for_circuit_type()")?; - let leaf_params = compute_leaf_params(metadata.circuit_id, base_vk.clone(), leaf_vk); - - WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::LeafAggregation.into()] - .observe(started_at.elapsed()); - - Ok(LeafAggregationWitnessGeneratorJob { - circuit_id: metadata.circuit_id, - block_number: metadata.block_number, - closed_form_inputs: closed_form_input, - proofs_ids: metadata.prover_job_ids_for_proofs, - base_vk, - leaf_params, - }) } #[tracing::instrument( @@ -268,7 +204,7 @@ pub async fn process_leaf_aggregation_job( #[async_trait] impl WitnessGenerator for LeafAggregationWitnessGenerator { type Job = LeafAggregationWitnessGeneratorJob; - type Metadata = (); + type Metadata = LeafAggregationJobMetadata; type Artifacts = LeafAggregationArtifacts; #[tracing::instrument( @@ -296,11 +232,46 @@ impl WitnessGenerator for LeafAggregationWitnessGenerator { .await) } - fn prepare_job( - metadata: Self::Metadata, + #[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.block_number, circuit_id = %metadata.circuit_id) + )] + async fn prepare_job( + metadata: LeafAggregationJobMetadata, object_store: &dyn ObjectStore, - keystore: Option, - ) -> Self::Job { - todo!() + keystore: Keystore, + ) -> anyhow::Result { + let started_at = Instant::now(); + let closed_form_input = + LeafAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; + + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::LeafAggregation.into()] + .observe(started_at.elapsed()); + + let started_at = Instant::now(); + let base_vk = keystore + .load_base_layer_verification_key(metadata.circuit_id) + .context("get_base_layer_vk_for_circuit_type()")?; + + let leaf_circuit_id = base_circuit_type_into_recursive_leaf_circuit_type( + BaseLayerCircuitType::from_numeric_value(metadata.circuit_id), + ) as u8; + + let leaf_vk = keystore + .load_recursive_layer_verification_key(leaf_circuit_id) + .context("get_recursive_layer_vk_for_circuit_type()")?; + let leaf_params = compute_leaf_params(metadata.circuit_id, base_vk.clone(), leaf_vk); + + WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::LeafAggregation.into()] + .observe(started_at.elapsed()); + + Ok(LeafAggregationWitnessGeneratorJob { + circuit_id: metadata.circuit_id, + block_number: metadata.block_number, + closed_form_inputs: closed_form_input, + proofs_ids: metadata.prover_job_ids_for_proofs, + base_vk, + leaf_params, + }) } } diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs index d70a2132771..275e7cccb39 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs @@ -10,7 +10,7 @@ use crate::{ artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, metrics::WITNESS_GENERATOR_METRICS, node_aggregation::{NodeAggregationArtifacts, NodeAggregationWitnessGenerator}, - utils::{save_node_aggregations_artifacts, AggregationWrapper}, + utils::AggregationWrapper, }; #[async_trait] @@ -53,9 +53,9 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { ) -> BlobUrls { let started_at = Instant::now(); let key = AggregationsKey { - block_number, - circuit_id, - depth, + block_number: artifacts.block_number, + circuit_id: artifacts.circuit_id, + depth: artifacts.depth, }; let aggregation_urls = object_store .put(key, &AggregationWrapper(artifacts.next_aggregations)) diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs index dbac8797f65..05d0361f9c9 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs @@ -11,7 +11,7 @@ use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, node_aggregation::{ - prepare_job, NodeAggregationArtifacts, NodeAggregationWitnessGenerator, + NodeAggregationArtifacts, NodeAggregationWitnessGenerator, NodeAggregationWitnessGeneratorJob, }, witness_generator::WitnessGenerator, @@ -38,9 +38,13 @@ impl JobProcessor for NodeAggregationWitnessGenerator { tracing::info!("Processing node aggregation job {:?}", metadata.id); Ok(Some(( metadata.id, - prepare_job(metadata, &*self.object_store, self.keystore.clone()) - .await - .context("prepare_job()")?, + ::prepare_job( + metadata, + &*self.object_store, + self.keystore.clone(), + ) + .await + .context("prepare_job()")?, ))) } diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs index 91a052fbec3..f2c9a6fb891 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/mod.rs @@ -30,7 +30,6 @@ use zksync_types::{ use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, - scheduler::SchedulerWitnessGenerator, utils::{load_proofs_for_job_ids, save_recursive_layer_prover_input_artifacts}, witness_generator::WitnessGenerator, }; @@ -86,50 +85,10 @@ impl NodeAggregationWitnessGenerator { } } -#[tracing::instrument( - skip_all, - fields(l1_batch = % metadata.block_number, circuit_id = % metadata.circuit_id) -)] -pub async fn prepare_job( - metadata: NodeAggregationJobMetadata, - object_store: &dyn ObjectStore, - keystore: Keystore, -) -> anyhow::Result { - let started_at = Instant::now(); - let artifacts = NodeAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; - - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::NodeAggregation.into()] - .observe(started_at.elapsed()); - - let started_at = Instant::now(); - let leaf_vk = keystore - .load_recursive_layer_verification_key(metadata.circuit_id) - .context("get_recursive_layer_vk_for_circuit_type")?; - let node_vk = keystore - .load_recursive_layer_verification_key( - ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, - ) - .context("get_recursive_layer_vk_for_circuit_type()")?; - - WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::NodeAggregation.into()] - .observe(started_at.elapsed()); - - Ok(NodeAggregationWitnessGeneratorJob { - circuit_id: metadata.circuit_id, - block_number: metadata.block_number, - depth: metadata.depth, - aggregations: artifacts.0, - proofs_ids: metadata.prover_job_ids_for_proofs, - leaf_vk, - node_vk, - all_leafs_layer_params: get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?, - }) -} - #[async_trait] impl WitnessGenerator for NodeAggregationWitnessGenerator { type Job = NodeAggregationWitnessGeneratorJob; - type Metadata = (); + type Metadata = NodeAggregationJobMetadata; type Artifacts = NodeAggregationArtifacts; #[tracing::instrument( @@ -264,11 +223,45 @@ impl WitnessGenerator for NodeAggregationWitnessGenerator { }) } - fn prepare_job( + #[tracing::instrument( + skip_all, + fields(l1_batch = % metadata.block_number, circuit_id = % metadata.circuit_id) + )] + async fn prepare_job( metadata: Self::Metadata, object_store: &dyn ObjectStore, - keystore: Option, - ) -> Self::Job { - todo!() + keystore: Keystore, + ) -> anyhow::Result { + let started_at = Instant::now(); + let artifacts = + NodeAggregationWitnessGenerator::get_artifacts(&metadata, object_store).await?; + + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::NodeAggregation.into()] + .observe(started_at.elapsed()); + + let started_at = Instant::now(); + let leaf_vk = keystore + .load_recursive_layer_verification_key(metadata.circuit_id) + .context("get_recursive_layer_vk_for_circuit_type")?; + let node_vk = keystore + .load_recursive_layer_verification_key( + ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, + ) + .context("get_recursive_layer_vk_for_circuit_type()")?; + + WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::NodeAggregation.into()] + .observe(started_at.elapsed()); + + Ok(NodeAggregationWitnessGeneratorJob { + circuit_id: metadata.circuit_id, + block_number: metadata.block_number, + depth: metadata.depth, + aggregations: artifacts.0, + proofs_ids: metadata.prover_job_ids_for_proofs, + leaf_vk, + node_vk, + all_leafs_layer_params: get_leaf_vk_params(&keystore) + .context("get_leaf_vk_params()")?, + }) } } diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs index 1aae42ac723..accd12e3f31 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs @@ -11,7 +11,7 @@ use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, recursion_tip::{ - prepare_job, RecursionTipArtifacts, RecursionTipWitnessGenerator, + RecursionTipArtifacts, RecursionTipJobMetadata, RecursionTipWitnessGenerator, RecursionTipWitnessGeneratorJob, }, witness_generator::WitnessGenerator, @@ -50,9 +50,11 @@ impl JobProcessor for RecursionTipWitnessGenerator { Ok(Some(( l1_batch_number, - prepare_job( - l1_batch_number, - final_node_proof_job_ids, + ::prepare_job( + RecursionTipJobMetadata { + l1_batch_number, + final_node_proof_job_ids, + }, &*self.object_store, self.keystore.clone(), ) @@ -79,7 +81,7 @@ impl JobProcessor for RecursionTipWitnessGenerator { started_at: Instant, ) -> tokio::task::JoinHandle> { let object_store = self.object_store.clone(); - tokio::task::spawn_blocking(async move || { + tokio::spawn(async move || { ::process_job(job, object_store, None, started_at).await }) } diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs index 0ffca93f04f..40abb756c8a 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/mod.rs @@ -45,8 +45,7 @@ use zksync_types::{ }; use crate::{ - artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, - scheduler::SchedulerWitnessGenerator, utils::ClosedFormInputWrapper, + artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, utils::ClosedFormInputWrapper, witness_generator::WitnessGenerator, }; @@ -69,6 +68,11 @@ pub struct RecursionTipArtifacts { pub recursion_tip_circuit: ZkSyncRecursiveLayerCircuit, } +pub struct RecursionTipJobMetadata { + pub l1_batch_number: L1BatchNumber, + pub final_node_proof_job_ids: Vec<(u8, u32)>, +} + #[derive(Debug)] pub struct RecursionTipWitnessGenerator { config: FriWitnessGeneratorConfig, @@ -96,100 +100,10 @@ impl RecursionTipWitnessGenerator { } } -#[tracing::instrument( - skip_all, - fields(l1_batch = %l1_batch_number) -)] -pub async fn prepare_job( - l1_batch_number: L1BatchNumber, - final_node_proof_job_ids: Vec<(u8, u32)>, - object_store: &dyn ObjectStore, - keystore: Keystore, -) -> anyhow::Result { - let started_at = Instant::now(); - let recursion_tip_proofs = - RecursionTipWitnessGenerator::get_artifacts(&final_node_proof_job_ids, object_store) - .await?; - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::RecursionTip.into()] - .observe(started_at.elapsed()); - - let node_vk = keystore - .load_recursive_layer_verification_key( - ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, - ) - .context("get_recursive_layer_vk_for_circuit_type()")?; - - let node_layer_vk_commitment = compute_node_vk_commitment(node_vk.clone()); - - let mut recursion_queues = vec![]; - for circuit_id in BaseLayerCircuitType::as_iter_u8() { - let key = ClosedFormInputKey { - block_number: l1_batch_number, - circuit_id, - }; - let ClosedFormInputWrapper(_, recursion_queue) = object_store.get(key).await?; - recursion_queues.push((circuit_id, recursion_queue)); - } - - // RECURSION_TIP_ARITY is the maximum amount of proof that a single recursion tip can support. - // Given recursion_tip has at most 1 proof per circuit, it implies we can't add more circuit types without bumping arity up. - assert!( - RECURSION_TIP_ARITY >= recursion_queues.len(), - "recursion tip received more circuits ({}) than supported ({})", - recursion_queues.len(), - RECURSION_TIP_ARITY - ); - let mut branch_circuit_type_set = [GoldilocksField::ZERO; RECURSION_TIP_ARITY]; - let mut queue_set: [_; RECURSION_TIP_ARITY] = - std::array::from_fn(|_| QueueState::placeholder_witness()); - - for (index, (circuit_id, recursion_queue)) in recursion_queues.iter().enumerate() { - branch_circuit_type_set[index] = GoldilocksField::from_u64_unchecked(*circuit_id as u64); - queue_set[index] = take_sponge_like_queue_state_from_simulator(recursion_queue); - } - - let leaf_vk_commits = get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?; - assert_eq!( - leaf_vk_commits.len(), - 16, - "expected 16 leaf vk commits, which corresponds to the numebr of circuits, got {}", - leaf_vk_commits.len() - ); - let leaf_layer_parameters: [RecursionLeafParametersWitness; 16] = - leaf_vk_commits - .iter() - .map(|el| el.1.clone()) - .collect::>() - .try_into() - .unwrap(); - - let input = RecursionTipInputWitness { - leaf_layer_parameters, - node_layer_vk_commitment, - branch_circuit_type_set, - queue_set, - }; - - let recursion_tip_witness = RecursionTipInstanceWitness { - input, - vk_witness: node_vk.clone().into_inner(), - proof_witnesses: recursion_tip_proofs.into(), - }; - - WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::RecursionTip.into()] - .observe(started_at.elapsed()); - - Ok(RecursionTipWitnessGeneratorJob { - block_number: l1_batch_number, - recursion_tip_witness, - node_vk, - }) -} - #[async_trait] impl WitnessGenerator for RecursionTipWitnessGenerator { type Job = RecursionTipWitnessGeneratorJob; - type Metadata = (); + type Metadata = RecursionTipJobMetadata; type Artifacts = RecursionTipArtifacts; #[tracing::instrument( @@ -236,11 +150,95 @@ impl WitnessGenerator for RecursionTipWitnessGenerator { }) } - fn prepare_job( - metadata: Self::Metadata, + #[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.l1_batch_number) + )] + async fn prepare_job( + metadata: RecursionTipJobMetadata, object_store: &dyn ObjectStore, - keystore: Option, - ) -> Self::Job { - todo!() + keystore: Keystore, + ) -> anyhow::Result { + let started_at = Instant::now(); + let recursion_tip_proofs = RecursionTipWitnessGenerator::get_artifacts( + &metadata.final_node_proof_job_ids, + object_store, + ) + .await?; + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::RecursionTip.into()] + .observe(started_at.elapsed()); + + let node_vk = keystore + .load_recursive_layer_verification_key( + ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, + ) + .context("get_recursive_layer_vk_for_circuit_type()")?; + + let node_layer_vk_commitment = compute_node_vk_commitment(node_vk.clone()); + + let mut recursion_queues = vec![]; + for circuit_id in BaseLayerCircuitType::as_iter_u8() { + let key = ClosedFormInputKey { + block_number: metadata.l1_batch_number, + circuit_id, + }; + let ClosedFormInputWrapper(_, recursion_queue) = object_store.get(key).await?; + recursion_queues.push((circuit_id, recursion_queue)); + } + + // RECURSION_TIP_ARITY is the maximum amount of proof that a single recursion tip can support. + // Given recursion_tip has at most 1 proof per circuit, it implies we can't add more circuit types without bumping arity up. + assert!( + RECURSION_TIP_ARITY >= recursion_queues.len(), + "recursion tip received more circuits ({}) than supported ({})", + recursion_queues.len(), + RECURSION_TIP_ARITY + ); + let mut branch_circuit_type_set = [GoldilocksField::ZERO; RECURSION_TIP_ARITY]; + let mut queue_set: [_; RECURSION_TIP_ARITY] = + std::array::from_fn(|_| QueueState::placeholder_witness()); + + for (index, (circuit_id, recursion_queue)) in recursion_queues.iter().enumerate() { + branch_circuit_type_set[index] = + GoldilocksField::from_u64_unchecked(*circuit_id as u64); + queue_set[index] = take_sponge_like_queue_state_from_simulator(recursion_queue); + } + + let leaf_vk_commits = get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?; + assert_eq!( + leaf_vk_commits.len(), + 16, + "expected 16 leaf vk commits, which corresponds to the numebr of circuits, got {}", + leaf_vk_commits.len() + ); + let leaf_layer_parameters: [RecursionLeafParametersWitness; 16] = + leaf_vk_commits + .iter() + .map(|el| el.1.clone()) + .collect::>() + .try_into() + .unwrap(); + + let input = RecursionTipInputWitness { + leaf_layer_parameters, + node_layer_vk_commitment, + branch_circuit_type_set, + queue_set, + }; + + let recursion_tip_witness = RecursionTipInstanceWitness { + input, + vk_witness: node_vk.clone().into_inner(), + proof_witnesses: recursion_tip_proofs.into(), + }; + + WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::RecursionTip.into()] + .observe(started_at.elapsed()); + + Ok(RecursionTipWitnessGeneratorJob { + block_number: metadata.l1_batch_number, + recursion_tip_witness, + node_vk, + }) } } diff --git a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs index 55b5a3dae52..158a1d0e8e1 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs @@ -11,7 +11,8 @@ use crate::{ artifacts::ArtifactsManager, metrics::WITNESS_GENERATOR_METRICS, scheduler::{ - prepare_job, SchedulerArtifacts, SchedulerWitnessGenerator, SchedulerWitnessGeneratorJob, + SchedulerArtifacts, SchedulerWitnessGenerator, SchedulerWitnessGeneratorJob, + SchedulerWitnessJobMetadata, }, witness_generator::WitnessGenerator, }; @@ -45,9 +46,11 @@ impl JobProcessor for SchedulerWitnessGenerator { Ok(Some(( l1_batch_number, - prepare_job( - l1_batch_number, - recursion_tip_job_id, + ::prepare_job( + SchedulerWitnessJobMetadata { + l1_batch_number, + recursion_tip_job_id, + }, &*self.object_store, self.keystore.clone(), ) @@ -74,7 +77,7 @@ impl JobProcessor for SchedulerWitnessGenerator { started_at: Instant, ) -> tokio::task::JoinHandle> { let object_store = self.object_store.clone(); - tokio::task::spawn_blocking(async move || { + tokio::spawn(async move || { let block_number = job.block_number; let _span = tracing::info_span!("scheduler", %block_number).entered(); ::process_job(job, object_store, None, started_at).await diff --git a/prover/crates/bin/witness_generator/src/scheduler/mod.rs b/prover/crates/bin/witness_generator/src/scheduler/mod.rs index a3ea03c022b..7af3d68d5a7 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/mod.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/mod.rs @@ -55,6 +55,11 @@ pub struct SchedulerWitnessGeneratorJob { [RecursionLeafParametersWitness; NUM_BASE_LAYER_CIRCUITS], } +pub struct SchedulerWitnessJobMetadata { + pub l1_batch_number: L1BatchNumber, + pub recursion_tip_job_id: u32, +} + #[derive(Debug)] pub struct SchedulerWitnessGenerator { config: FriWitnessGeneratorConfig, @@ -82,68 +87,10 @@ impl SchedulerWitnessGenerator { } } -#[tracing::instrument( - skip_all, - fields(l1_batch = %l1_batch_number) -)] -pub async fn prepare_job( - l1_batch_number: L1BatchNumber, - recursion_tip_job_id: u32, - object_store: &dyn ObjectStore, - keystore: Keystore, -) -> anyhow::Result { - let started_at = Instant::now(); - let wrapper = - SchedulerWitnessGenerator::get_artifacts(&recursion_tip_job_id, object_store).await?; - let recursion_tip_proof = match wrapper { - FriProofWrapper::Base(_) => Err(anyhow::anyhow!( - "Expected only recursive proofs for scheduler l1 batch {l1_batch_number}, got Base" - )), - FriProofWrapper::Recursive(recursive_proof) => Ok(recursive_proof.into_inner()), - }?; - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::Scheduler.into()] - .observe(started_at.elapsed()); - - let started_at = Instant::now(); - let node_vk = keystore - .load_recursive_layer_verification_key( - ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, - ) - .context("get_recursive_layer_vk_for_circuit_type()")?; - let SchedulerPartialInputWrapper(mut scheduler_witness) = - object_store.get(l1_batch_number).await?; - - let recursion_tip_vk = keystore - .load_recursive_layer_verification_key( - ZkSyncRecursionLayerStorageType::RecursionTipCircuit as u8, - ) - .context("get_recursion_tip_vk()")?; - scheduler_witness.proof_witnesses = vec![recursion_tip_proof].into(); - - let leaf_vk_commits = get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?; - let leaf_layer_parameters = leaf_vk_commits - .iter() - .map(|el| el.1.clone()) - .collect::>() - .try_into() - .unwrap(); - - WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::Scheduler.into()] - .observe(started_at.elapsed()); - - Ok(SchedulerWitnessGeneratorJob { - block_number: l1_batch_number, - scheduler_witness, - node_vk, - leaf_layer_parameters, - recursion_tip_vk, - }) -} - #[async_trait] impl WitnessGenerator for SchedulerWitnessGenerator { type Job = SchedulerWitnessGeneratorJob; - type Metadata = (); + type Metadata = SchedulerWitnessJobMetadata; type Artifacts = SchedulerArtifacts; #[tracing::instrument( @@ -191,11 +138,62 @@ impl WitnessGenerator for SchedulerWitnessGenerator { }) } - fn prepare_job( - metadata: Self::Metadata, + #[tracing::instrument( + skip_all, + fields(l1_batch = %metadata.l1_batch_number) + )] + async fn prepare_job( + metadata: SchedulerWitnessJobMetadata, object_store: &dyn ObjectStore, - keystore: Option, - ) -> Self::Job { - todo!() + keystore: Keystore, + ) -> anyhow::Result { + let started_at = Instant::now(); + let wrapper = + SchedulerWitnessGenerator::get_artifacts(&metadata.recursion_tip_job_id, object_store) + .await?; + let recursion_tip_proof = match wrapper { + FriProofWrapper::Base(_) => Err(anyhow::anyhow!( + "Expected only recursive proofs for scheduler l1 batch {}, got Base", + metadata.l1_batch_number + )), + FriProofWrapper::Recursive(recursive_proof) => Ok(recursive_proof.into_inner()), + }?; + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::Scheduler.into()] + .observe(started_at.elapsed()); + + let started_at = Instant::now(); + let node_vk = keystore + .load_recursive_layer_verification_key( + ZkSyncRecursionLayerStorageType::NodeLayerCircuit as u8, + ) + .context("get_recursive_layer_vk_for_circuit_type()")?; + let SchedulerPartialInputWrapper(mut scheduler_witness) = + object_store.get(metadata.l1_batch_number).await?; + + let recursion_tip_vk = keystore + .load_recursive_layer_verification_key( + ZkSyncRecursionLayerStorageType::RecursionTipCircuit as u8, + ) + .context("get_recursion_tip_vk()")?; + scheduler_witness.proof_witnesses = vec![recursion_tip_proof].into(); + + let leaf_vk_commits = get_leaf_vk_params(&keystore).context("get_leaf_vk_params()")?; + let leaf_layer_parameters = leaf_vk_commits + .iter() + .map(|el| el.1.clone()) + .collect::>() + .try_into() + .unwrap(); + + WITNESS_GENERATOR_METRICS.prepare_job_time[&AggregationRound::Scheduler.into()] + .observe(started_at.elapsed()); + + Ok(SchedulerWitnessGeneratorJob { + block_number: metadata.l1_batch_number, + scheduler_witness, + node_vk, + leaf_layer_parameters, + recursion_tip_vk, + }) } } diff --git a/prover/crates/bin/witness_generator/src/witness_generator.rs b/prover/crates/bin/witness_generator/src/witness_generator.rs index aa313eb08fe..eb9200d7950 100644 --- a/prover/crates/bin/witness_generator/src/witness_generator.rs +++ b/prover/crates/bin/witness_generator/src/witness_generator.rs @@ -3,7 +3,6 @@ use std::{sync::Arc, time::Instant}; use async_trait::async_trait; use zksync_object_store::ObjectStore; use zksync_prover_keystore::keystore::Keystore; -use zksync_queued_job_processor::JobProcessor; #[async_trait] pub trait WitnessGenerator { @@ -13,14 +12,14 @@ pub trait WitnessGenerator { async fn process_job( job: Self::Job, - object_store: Arc<&dyn ObjectStore>, + object_store: Arc, max_circuits_in_flight: Option, started_at: Instant, ) -> anyhow::Result; - fn prepare_job( + async fn prepare_job( metadata: Self::Metadata, object_store: &dyn ObjectStore, - keystore: Option, - ) -> Self::Job; + keystore: Keystore, + ) -> anyhow::Result; } From c376db10eca7245eca7c22bdb84481dac01f11df Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:20:27 +0300 Subject: [PATCH 06/12] fix build --- .../bin/witness_generator/src/recursion_tip/job_processor.rs | 2 +- .../bin/witness_generator/src/scheduler/job_processor.rs | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs index accd12e3f31..4ace4567fab 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs @@ -81,7 +81,7 @@ impl JobProcessor for RecursionTipWitnessGenerator { started_at: Instant, ) -> tokio::task::JoinHandle> { let object_store = self.object_store.clone(); - tokio::spawn(async move || { + tokio::spawn(async move { ::process_job(job, object_store, None, started_at).await }) } diff --git a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs index 158a1d0e8e1..f49b8c2c4ed 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs @@ -77,9 +77,7 @@ impl JobProcessor for SchedulerWitnessGenerator { started_at: Instant, ) -> tokio::task::JoinHandle> { let object_store = self.object_store.clone(); - tokio::spawn(async move || { - let block_number = job.block_number; - let _span = tracing::info_span!("scheduler", %block_number).entered(); + tokio::spawn(async move { ::process_job(job, object_store, None, started_at).await }) } From b1cc39f95a3896793dd2532f8dc3fe14bbbf51fe Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:40:38 +0300 Subject: [PATCH 07/12] restructure basic round --- .../src/basic_circuits/mod.rs | 167 ++++++++---------- 1 file changed, 77 insertions(+), 90 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs index 6b6dec7a920..e76ef180c52 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/mod.rs @@ -114,52 +114,66 @@ impl BasicWitnessGenerator { } } -#[tracing::instrument(skip_all, fields(l1_batch = %block_number))] -pub(super) async fn process_basic_circuits_job( - object_store: Arc, - started_at: Instant, - block_number: L1BatchNumber, - job: WitnessInputData, - max_circuits_in_flight: usize, -) -> BasicCircuitArtifacts { - let (circuit_urls, queue_urls, scheduler_witness, aux_output_witness) = - generate_witness(block_number, object_store, job, max_circuits_in_flight).await; - WITNESS_GENERATOR_METRICS.witness_generation_time[&AggregationRound::BasicCircuits.into()] - .observe(started_at.elapsed()); - tracing::info!( - "Witness generation for block {} is complete in {:?}", - block_number.0, - started_at.elapsed() - ); +#[async_trait] +impl WitnessGenerator for BasicWitnessGenerator { + type Job = BasicWitnessGeneratorJob; + type Metadata = L1BatchNumber; + type Artifacts = BasicCircuitArtifacts; - BasicCircuitArtifacts { - circuit_urls, - queue_urls, - scheduler_witness, - aux_output_witness, + async fn process_job( + job: BasicWitnessGeneratorJob, + object_store: Arc, + max_circuits_in_flight: Option, + started_at: Instant, + ) -> anyhow::Result { + let BasicWitnessGeneratorJob { + block_number, + data: job, + } = job; + + tracing::info!( + "Starting witness generation of type {:?} for block {}", + AggregationRound::BasicCircuits, + block_number.0 + ); + + let (circuit_urls, queue_urls, scheduler_witness, aux_output_witness) = generate_witness( + block_number, + object_store, + job, + max_circuits_in_flight.unwrap(), + ) + .await; + WITNESS_GENERATOR_METRICS.witness_generation_time[&AggregationRound::BasicCircuits.into()] + .observe(started_at.elapsed()); + tracing::info!( + "Witness generation for block {} is complete in {:?}", + block_number.0, + started_at.elapsed() + ); + + Ok(BasicCircuitArtifacts { + circuit_urls, + queue_urls, + scheduler_witness, + aux_output_witness, + }) } -} -#[tracing::instrument(skip_all, fields(l1_batch = %block_number, circuit_id = %circuit_id))] -async fn save_recursion_queue( - block_number: L1BatchNumber, - circuit_id: u8, - recursion_queue_simulator: RecursionQueueSimulator, - closed_form_inputs: Vec>, - object_store: Arc, -) -> (u8, String, usize) { - let key = ClosedFormInputKey { - block_number, - circuit_id, - }; - let basic_circuit_count = closed_form_inputs.len(); - let closed_form_inputs = closed_form_inputs - .iter() - .map(|x| ZkSyncBaseLayerStorage::from_inner(circuit_id, x.clone())) - .collect(); - let wrapper = ClosedFormInputWrapper(closed_form_inputs, recursion_queue_simulator); - let blob_url = object_store.put(key, &wrapper).await.unwrap(); - (circuit_id, blob_url, basic_circuit_count) + async fn prepare_job( + metadata: L1BatchNumber, + object_store: &dyn ObjectStore, + _keystore: Keystore, + ) -> anyhow::Result { + tracing::info!("Processing FRI basic witness-gen for block {}", metadata.0); + let started_at = Instant::now(); + let job = Self::get_artifacts(&metadata, object_store).await?; + + WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()] + .observe(started_at.elapsed()); + + Ok(job) + } } #[tracing::instrument(skip_all, fields(l1_batch = %block_number))] @@ -440,51 +454,24 @@ async fn generate_witness( ) } -#[async_trait] -impl WitnessGenerator for BasicWitnessGenerator { - type Job = BasicWitnessGeneratorJob; - type Metadata = L1BatchNumber; - type Artifacts = BasicCircuitArtifacts; - - async fn process_job( - job: BasicWitnessGeneratorJob, - object_store: Arc, - max_circuits_in_flight: Option, - started_at: Instant, - ) -> anyhow::Result { - let BasicWitnessGeneratorJob { - block_number, - data: job, - } = job; - - tracing::info!( - "Starting witness generation of type {:?} for block {}", - AggregationRound::BasicCircuits, - block_number.0 - ); - - Ok(process_basic_circuits_job( - object_store, - started_at, - block_number, - job, - max_circuits_in_flight.unwrap(), - ) - .await) - } - - async fn prepare_job( - metadata: L1BatchNumber, - object_store: &dyn ObjectStore, - _keystore: Keystore, - ) -> anyhow::Result { - tracing::info!("Processing FRI basic witness-gen for block {}", metadata.0); - let started_at = Instant::now(); - let job = Self::get_artifacts(&metadata, object_store).await?; - - WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()] - .observe(started_at.elapsed()); - - Ok(job) - } +#[tracing::instrument(skip_all, fields(l1_batch = %block_number, circuit_id = %circuit_id))] +async fn save_recursion_queue( + block_number: L1BatchNumber, + circuit_id: u8, + recursion_queue_simulator: RecursionQueueSimulator, + closed_form_inputs: Vec>, + object_store: Arc, +) -> (u8, String, usize) { + let key = ClosedFormInputKey { + block_number, + circuit_id, + }; + let basic_circuit_count = closed_form_inputs.len(); + let closed_form_inputs = closed_form_inputs + .iter() + .map(|x| ZkSyncBaseLayerStorage::from_inner(circuit_id, x.clone())) + .collect(); + let wrapper = ClosedFormInputWrapper(closed_form_inputs, recursion_queue_simulator); + let blob_url = object_store.put(key, &wrapper).await.unwrap(); + (circuit_id, blob_url, basic_circuit_count) } From 57a619499ae5af6e7064ee3a92a236a1b641be94 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:42:24 +0300 Subject: [PATCH 08/12] save_artifacts -> save_to_bucket --- prover/crates/bin/witness_generator/src/artifacts.rs | 2 +- .../bin/witness_generator/src/basic_circuits/artifacts.rs | 2 +- .../bin/witness_generator/src/basic_circuits/job_processor.rs | 2 +- .../bin/witness_generator/src/leaf_aggregation/artifacts.rs | 2 +- .../bin/witness_generator/src/leaf_aggregation/job_processor.rs | 2 +- .../bin/witness_generator/src/node_aggregation/artifacts.rs | 2 +- .../bin/witness_generator/src/node_aggregation/job_processor.rs | 2 +- .../crates/bin/witness_generator/src/recursion_tip/artifacts.rs | 2 +- .../bin/witness_generator/src/recursion_tip/job_processor.rs | 2 +- prover/crates/bin/witness_generator/src/scheduler/artifacts.rs | 2 +- .../crates/bin/witness_generator/src/scheduler/job_processor.rs | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/artifacts.rs b/prover/crates/bin/witness_generator/src/artifacts.rs index 953ee3d7fc0..b7439773c58 100644 --- a/prover/crates/bin/witness_generator/src/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/artifacts.rs @@ -34,7 +34,7 @@ pub(crate) trait ArtifactsManager { object_store: &dyn ObjectStore, ) -> anyhow::Result; - async fn save_artifacts( + async fn save_to_bucket( job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs index fad32b082f5..f4ab62b4863 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs @@ -31,7 +31,7 @@ impl ArtifactsManager for BasicWitnessGenerator { }) } - async fn save_artifacts( + async fn save_to_bucket( job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs index 3e856bb478b..1f6d5edfade 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs @@ -108,7 +108,7 @@ impl JobProcessor for BasicWitnessGenerator { } let scheduler_witness_url = - match Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store) + match Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store) .await { BlobUrls::Url(url) => url, diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs index a43b8bc2580..44a0822a6a4 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -41,7 +41,7 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { skip_all, fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) )] - async fn save_artifacts( + async fn save_to_bucket( _job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs index 416d62c49a3..440636b85fa 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/job_processor.rs @@ -94,7 +94,7 @@ impl JobProcessor for LeafAggregationWitnessGenerator { let blob_save_started_at = Instant::now(); - let blob_urls = Self::save_artifacts(job_id, artifacts.clone(), &*self.object_store).await; + let blob_urls = Self::save_to_bucket(job_id, artifacts.clone(), &*self.object_store).await; WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::LeafAggregation.into()] .observe(blob_save_started_at.elapsed()); diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs index 275e7cccb39..15451f91b83 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs @@ -46,7 +46,7 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { skip_all, fields(l1_batch = %artifacts.block_number, circuit_id = %artifacts.circuit_id) )] - async fn save_artifacts( + async fn save_to_bucket( _job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs index 05d0361f9c9..0f66c988c10 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/job_processor.rs @@ -90,7 +90,7 @@ impl JobProcessor for NodeAggregationWitnessGenerator { ) -> anyhow::Result<()> { let blob_save_started_at = Instant::now(); - let blob_urls = Self::save_artifacts(job_id, artifacts.clone(), &*self.object_store).await; + let blob_urls = Self::save_to_bucket(job_id, artifacts.clone(), &*self.object_store).await; WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] .observe(blob_save_started_at.elapsed()); diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs index 60707cec306..78bbf683b33 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs @@ -73,7 +73,7 @@ impl ArtifactsManager for RecursionTipWitnessGenerator { Ok(proofs) } - async fn save_artifacts( + async fn save_to_bucket( job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs index 4ace4567fab..9ab7d934a3e 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/job_processor.rs @@ -99,7 +99,7 @@ impl JobProcessor for RecursionTipWitnessGenerator { let blob_save_started_at = Instant::now(); let blob_urls = - Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store).await; + Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store).await; WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::RecursionTip.into()] .observe(blob_save_started_at.elapsed()); diff --git a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs index 16dcd65501e..aa72f92ca6f 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs @@ -27,7 +27,7 @@ impl ArtifactsManager for SchedulerWitnessGenerator { Ok(artifacts) } - async fn save_artifacts( + async fn save_to_bucket( job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, diff --git a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs index f49b8c2c4ed..b5745f98091 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/job_processor.rs @@ -95,7 +95,7 @@ impl JobProcessor for SchedulerWitnessGenerator { let blob_save_started_at = Instant::now(); let blob_urls = - Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store).await; + Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store).await; WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::Scheduler.into()] .observe(blob_save_started_at.elapsed()); From 04d2675f35fba0496a79ca834232c43526fcb78e Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:47:28 +0300 Subject: [PATCH 09/12] fix basic test --- .../bin/witness_generator/tests/basic_test.rs | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/prover/crates/bin/witness_generator/tests/basic_test.rs b/prover/crates/bin/witness_generator/tests/basic_test.rs index 3323e3c681e..b6eb1ccb96e 100644 --- a/prover/crates/bin/witness_generator/tests/basic_test.rs +++ b/prover/crates/bin/witness_generator/tests/basic_test.rs @@ -15,9 +15,9 @@ use zksync_types::{ L1BatchNumber, }; use zksync_witness_generator::{ - leaf_aggregation::{prepare_leaf_aggregation_job, LeafAggregationWitnessGenerator}, - node_aggregation::{self, NodeAggregationWitnessGenerator}, - utils::AggregationWrapper, + leaf_aggregation::LeafAggregationWitnessGenerator, + node_aggregation::NodeAggregationWitnessGenerator, utils::AggregationWrapper, + witness_generator::WitnessGenerator, }; fn compare_serialized(expected: &T, actual: &T) { @@ -52,9 +52,13 @@ async fn test_leaf_witness_gen() { .unwrap(); let keystore = Keystore::locate(); - let job = prepare_leaf_aggregation_job(leaf_aggregation_job_metadata, &*object_store, keystore) - .await - .unwrap(); + let job = LeafAggregationWitnessGenerator::prepare_job( + leaf_aggregation_job_metadata, + &*object_store, + keystore, + ) + .await + .unwrap(); let artifacts = LeafAggregationWitnessGenerator::process_job_impl( job, @@ -142,10 +146,13 @@ async fn test_node_witness_gen() { }; let keystore = Keystore::locate(); - let job = - node_aggregation::prepare_job(node_aggregation_job_metadata, &*object_store, keystore) - .await - .unwrap(); + let job = NodeAggregationWitnessGenerator::prepare_job( + node_aggregation_job_metadata, + &*object_store, + keystore, + ) + .await + .unwrap(); let artifacts = NodeAggregationWitnessGenerator::process_job_impl( job, From d2c61b0f76fdf37ed94b31f922dfdb4856043775 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 18 Sep 2024 10:34:19 +0300 Subject: [PATCH 10/12] fix basic test --- .../bin/witness_generator/tests/basic_test.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/prover/crates/bin/witness_generator/tests/basic_test.rs b/prover/crates/bin/witness_generator/tests/basic_test.rs index b6eb1ccb96e..1b85d89de08 100644 --- a/prover/crates/bin/witness_generator/tests/basic_test.rs +++ b/prover/crates/bin/witness_generator/tests/basic_test.rs @@ -60,13 +60,13 @@ async fn test_leaf_witness_gen() { .await .unwrap(); - let artifacts = LeafAggregationWitnessGenerator::process_job_impl( + let artifacts = LeafAggregationWitnessGenerator::process_job( job, - Instant::now(), object_store.clone(), - 500, + Some(500), + Instant::now(), ) - .await; + .await?; let aggregations = AggregationWrapper(artifacts.aggregations); @@ -154,13 +154,14 @@ async fn test_node_witness_gen() { .await .unwrap(); - let artifacts = NodeAggregationWitnessGenerator::process_job_impl( + let artifacts = NodeAggregationWitnessGenerator::process_job( job, - Instant::now(), object_store.clone(), - 500, + Some(500), + Instant::now(), ) - .await; + .await?; + let aggregations = AggregationWrapper(artifacts.next_aggregations); let expected_results_object_store_config = ObjectStoreConfig { From c5a0567dbe1f3db51f1e920e3a3cabd06637099f Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:27:44 +0300 Subject: [PATCH 11/12] use better type for blob urls --- .../bin/witness_generator/src/artifacts.rs | 21 +++++---------- .../src/basic_circuits/artifacts.rs | 26 +++++++------------ .../src/basic_circuits/job_processor.rs | 19 +++----------- .../src/leaf_aggregation/artifacts.rs | 16 +++++------- .../src/node_aggregation/artifacts.rs | 15 +++++------ .../src/recursion_tip/artifacts.rs | 20 +++++--------- .../src/scheduler/artifacts.rs | 20 +++++--------- .../bin/witness_generator/tests/basic_test.rs | 6 +++-- 8 files changed, 51 insertions(+), 92 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/artifacts.rs b/prover/crates/bin/witness_generator/src/artifacts.rs index b7439773c58..6b21a4cdb93 100644 --- a/prover/crates/bin/witness_generator/src/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/artifacts.rs @@ -9,25 +9,18 @@ pub(crate) struct AggregationBlobUrls { pub aggregation_urls: String, pub circuit_ids_and_urls: Vec<(u8, String)>, } +/// Marker trait for managing different types of blob urls +pub trait BlobUrl {} -#[derive(Debug)] -pub(crate) struct SchedulerBlobUrls { - pub circuit_ids_and_urls: Vec<(u8, String)>, - pub closed_form_inputs_and_urls: Vec<(u8, String, usize)>, - pub scheduler_witness_url: String, -} - -pub(crate) enum BlobUrls { - Url(String), - Aggregation(AggregationBlobUrls), - Scheduler(SchedulerBlobUrls), -} +impl BlobUrl for AggregationBlobUrls {} +impl BlobUrl for String {} #[async_trait] pub(crate) trait ArtifactsManager { type InputMetadata; type InputArtifacts; type OutputArtifacts; + type BlobUrls: BlobUrl; async fn get_artifacts( metadata: &Self::InputMetadata, @@ -38,13 +31,13 @@ pub(crate) trait ArtifactsManager { job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, - ) -> BlobUrls; + ) -> Self::BlobUrls; async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, - blob_urls: BlobUrls, + blob_urls: Self::BlobUrls, artifacts: Self::OutputArtifacts, ) -> anyhow::Result<()>; } diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs index f4ab62b4863..aa85d185e66 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs @@ -8,7 +8,7 @@ use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; use crate::{ - artifacts::{ArtifactsManager, BlobUrls}, + artifacts::ArtifactsManager, basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob}, utils::SchedulerPartialInputWrapper, }; @@ -18,6 +18,7 @@ impl ArtifactsManager for BasicWitnessGenerator { type InputMetadata = L1BatchNumber; type InputArtifacts = BasicWitnessGeneratorJob; type OutputArtifacts = BasicCircuitArtifacts; + type BlobUrls = String; async fn get_artifacts( metadata: &Self::InputMetadata, @@ -35,19 +36,17 @@ impl ArtifactsManager for BasicWitnessGenerator { job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, - ) -> BlobUrls { + ) -> String { let aux_output_witness_wrapper = AuxOutputWitnessWrapper(artifacts.aux_output_witness); object_store .put(L1BatchNumber(job_id), &aux_output_witness_wrapper) .await .unwrap(); let wrapper = SchedulerPartialInputWrapper(artifacts.scheduler_witness); - let url = object_store + object_store .put(L1BatchNumber(job_id), &wrapper) .await - .unwrap(); - - BlobUrls::Url(url) + .unwrap() } #[tracing::instrument(skip_all, fields(l1_batch = %job_id))] @@ -55,14 +54,9 @@ impl ArtifactsManager for BasicWitnessGenerator { connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, - blob_urls: BlobUrls, - _artifacts: Self::OutputArtifacts, + blob_urls: String, + artifacts: Self::OutputArtifacts, ) -> anyhow::Result<()> { - let blob_urls = match blob_urls { - BlobUrls::Scheduler(blobs) => blobs, - _ => unreachable!(), - }; - let mut connection = connection_pool .connection() .await @@ -79,7 +73,7 @@ impl ArtifactsManager for BasicWitnessGenerator { .fri_prover_jobs_dal() .insert_prover_jobs( L1BatchNumber(job_id), - blob_urls.circuit_ids_and_urls, + artifacts.circuit_urls, AggregationRound::BasicCircuits, 0, protocol_version_id, @@ -89,8 +83,8 @@ impl ArtifactsManager for BasicWitnessGenerator { .fri_witness_generator_dal() .create_aggregation_jobs( L1BatchNumber(job_id), - &blob_urls.closed_form_inputs_and_urls, - &blob_urls.scheduler_witness_url, + &artifacts.queue_urls, + &blob_urls, get_recursive_layer_circuit_id_for_base_layer, protocol_version_id, ) diff --git a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs index 1f6d5edfade..50e747b1ce1 100644 --- a/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs +++ b/prover/crates/bin/witness_generator/src/basic_circuits/job_processor.rs @@ -9,7 +9,7 @@ use zksync_queued_job_processor::{async_trait, JobProcessor}; use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; use crate::{ - artifacts::{ArtifactsManager, BlobUrls, SchedulerBlobUrls}, + artifacts::ArtifactsManager, basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob}, metrics::WITNESS_GENERATOR_METRICS, witness_generator::WitnessGenerator, @@ -94,8 +94,6 @@ impl JobProcessor for BasicWitnessGenerator { None => Ok(()), Some(artifacts) => { let blob_started_at = Instant::now(); - let circuit_urls = artifacts.circuit_urls.clone(); - let queue_urls = artifacts.queue_urls.clone(); let aux_output_witness_wrapper = AuxOutputWitnessWrapper(artifacts.aux_output_witness.clone()); @@ -107,13 +105,8 @@ impl JobProcessor for BasicWitnessGenerator { .unwrap(); } - let scheduler_witness_url = - match Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store) - .await - { - BlobUrls::Url(url) => url, - _ => unreachable!(), - }; + let blob_urls = + Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store).await; WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::BasicCircuits.into()] .observe(blob_started_at.elapsed()); @@ -122,11 +115,7 @@ impl JobProcessor for BasicWitnessGenerator { &self.prover_connection_pool, job_id.0, started_at, - BlobUrls::Scheduler(SchedulerBlobUrls { - circuit_ids_and_urls: circuit_urls, - closed_form_inputs_and_urls: queue_urls, - scheduler_witness_url, - }), + blob_urls, artifacts, ) .await?; diff --git a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs index 44a0822a6a4..c83997e36b8 100644 --- a/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/leaf_aggregation/artifacts.rs @@ -8,7 +8,7 @@ use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer; use zksync_types::{basic_fri_types::AggregationRound, prover_dal::LeafAggregationJobMetadata}; use crate::{ - artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, + artifacts::{AggregationBlobUrls, ArtifactsManager}, leaf_aggregation::{LeafAggregationArtifacts, LeafAggregationWitnessGenerator}, metrics::WITNESS_GENERATOR_METRICS, utils::{AggregationWrapper, ClosedFormInputWrapper}, @@ -19,6 +19,7 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { type InputMetadata = LeafAggregationJobMetadata; type InputArtifacts = ClosedFormInputWrapper; type OutputArtifacts = LeafAggregationArtifacts; + type BlobUrls = AggregationBlobUrls; async fn get_artifacts( metadata: &Self::InputMetadata, @@ -45,7 +46,7 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { _job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, - ) -> BlobUrls { + ) -> AggregationBlobUrls { let started_at = Instant::now(); let key = AggregationsKey { block_number: artifacts.block_number, @@ -60,10 +61,10 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::LeafAggregation.into()] .observe(started_at.elapsed()); - BlobUrls::Aggregation(AggregationBlobUrls { + AggregationBlobUrls { aggregation_urls, circuit_ids_and_urls: artifacts.circuit_ids_and_urls, - }) + } } #[tracing::instrument( @@ -74,7 +75,7 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, - blob_urls: BlobUrls, + blob_urls: AggregationBlobUrls, artifacts: Self::OutputArtifacts, ) -> anyhow::Result<()> { tracing::info!( @@ -84,11 +85,6 @@ impl ArtifactsManager for LeafAggregationWitnessGenerator { artifacts.circuit_id, ); - let blob_urls = match blob_urls { - BlobUrls::Aggregation(blob_urls) => blob_urls, - _ => panic!("Unexpected blob urls type"), - }; - let mut prover_connection = connection_pool.connection().await.unwrap(); let mut transaction = prover_connection.start_transaction().await.unwrap(); let number_of_dependent_jobs = blob_urls.circuit_ids_and_urls.len(); diff --git a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs index 15451f91b83..09f01899bf3 100644 --- a/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/node_aggregation/artifacts.rs @@ -7,7 +7,7 @@ use zksync_prover_fri_types::keys::AggregationsKey; use zksync_types::{basic_fri_types::AggregationRound, prover_dal::NodeAggregationJobMetadata}; use crate::{ - artifacts::{AggregationBlobUrls, ArtifactsManager, BlobUrls}, + artifacts::{AggregationBlobUrls, ArtifactsManager}, metrics::WITNESS_GENERATOR_METRICS, node_aggregation::{NodeAggregationArtifacts, NodeAggregationWitnessGenerator}, utils::AggregationWrapper, @@ -18,6 +18,7 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { type InputMetadata = NodeAggregationJobMetadata; type InputArtifacts = AggregationWrapper; type OutputArtifacts = NodeAggregationArtifacts; + type BlobUrls = AggregationBlobUrls; #[tracing::instrument( skip_all, @@ -50,7 +51,7 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { _job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, - ) -> BlobUrls { + ) -> AggregationBlobUrls { let started_at = Instant::now(); let key = AggregationsKey { block_number: artifacts.block_number, @@ -65,10 +66,10 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::NodeAggregation.into()] .observe(started_at.elapsed()); - BlobUrls::Aggregation(AggregationBlobUrls { + AggregationBlobUrls { aggregation_urls, circuit_ids_and_urls: artifacts.recursive_circuit_ids_and_urls, - }) + } } #[tracing::instrument( @@ -79,14 +80,10 @@ impl ArtifactsManager for NodeAggregationWitnessGenerator { connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, - blob_urls: BlobUrls, + blob_urls: AggregationBlobUrls, artifacts: Self::OutputArtifacts, ) -> anyhow::Result<()> { let mut prover_connection = connection_pool.connection().await.unwrap(); - let blob_urls = match blob_urls { - BlobUrls::Aggregation(blobs) => blobs, - _ => unreachable!(), - }; let mut transaction = prover_connection.start_transaction().await.unwrap(); let dependent_jobs = blob_urls.circuit_ids_and_urls.len(); let protocol_version_id = transaction diff --git a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs index 78bbf683b33..b61aa948100 100644 --- a/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/recursion_tip/artifacts.rs @@ -12,7 +12,7 @@ use zksync_prover_fri_types::{keys::FriCircuitKey, CircuitWrapper, FriProofWrapp use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; use crate::{ - artifacts::{ArtifactsManager, BlobUrls}, + artifacts::ArtifactsManager, recursion_tip::{RecursionTipArtifacts, RecursionTipWitnessGenerator}, }; @@ -21,6 +21,7 @@ impl ArtifactsManager for RecursionTipWitnessGenerator { type InputMetadata = Vec<(u8, u32)>; type InputArtifacts = Vec; type OutputArtifacts = RecursionTipArtifacts; + type BlobUrls = String; /// Loads all proofs for a given recursion tip's job ids. /// Note that recursion tip may not have proofs for some specific circuits (because the batch didn't contain them). @@ -77,7 +78,7 @@ impl ArtifactsManager for RecursionTipWitnessGenerator { job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, - ) -> BlobUrls { + ) -> String { let key = FriCircuitKey { block_number: L1BatchNumber(job_id), circuit_id: 255, @@ -86,29 +87,22 @@ impl ArtifactsManager for RecursionTipWitnessGenerator { aggregation_round: AggregationRound::RecursionTip, }; - let blob_url = object_store + object_store .put( key, &CircuitWrapper::Recursive(artifacts.recursion_tip_circuit.clone()), ) .await - .unwrap(); - - BlobUrls::Url(blob_url) + .unwrap() } async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, - blob_urls: BlobUrls, + blob_urls: String, _artifacts: Self::OutputArtifacts, ) -> anyhow::Result<()> { - let blob_url = match blob_urls { - BlobUrls::Url(url) => url, - _ => panic!("Unexpected blob urls type"), - }; - let mut prover_connection = connection_pool.connection().await?; let mut transaction = prover_connection.start_transaction().await?; let protocol_version_id = transaction @@ -123,7 +117,7 @@ impl ArtifactsManager for RecursionTipWitnessGenerator { 0, 0, AggregationRound::RecursionTip, - &blob_url, + &blob_urls, false, protocol_version_id, ) diff --git a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs index aa72f92ca6f..77d1da685d0 100644 --- a/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/scheduler/artifacts.rs @@ -8,7 +8,7 @@ use zksync_prover_fri_types::{keys::FriCircuitKey, CircuitWrapper, FriProofWrapp use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; use crate::{ - artifacts::{ArtifactsManager, BlobUrls}, + artifacts::ArtifactsManager, scheduler::{SchedulerArtifacts, SchedulerWitnessGenerator}, }; @@ -17,6 +17,7 @@ impl ArtifactsManager for SchedulerWitnessGenerator { type InputMetadata = u32; type InputArtifacts = FriProofWrapper; type OutputArtifacts = SchedulerArtifacts; + type BlobUrls = String; async fn get_artifacts( metadata: &Self::InputMetadata, @@ -31,7 +32,7 @@ impl ArtifactsManager for SchedulerWitnessGenerator { job_id: u32, artifacts: Self::OutputArtifacts, object_store: &dyn ObjectStore, - ) -> BlobUrls { + ) -> String { let key = FriCircuitKey { block_number: L1BatchNumber(job_id), circuit_id: 1, @@ -40,29 +41,22 @@ impl ArtifactsManager for SchedulerWitnessGenerator { aggregation_round: AggregationRound::Scheduler, }; - let blob_url = object_store + object_store .put( key, &CircuitWrapper::Recursive(artifacts.scheduler_circuit.clone()), ) .await - .unwrap(); - - BlobUrls::Url(blob_url) + .unwrap() } async fn save_to_database( connection_pool: &ConnectionPool, job_id: u32, started_at: Instant, - blob_urls: BlobUrls, + blob_urls: String, _artifacts: Self::OutputArtifacts, ) -> anyhow::Result<()> { - let blob_url = match blob_urls { - BlobUrls::Url(url) => url, - _ => panic!("Unexpected blob urls type"), - }; - let mut prover_connection = connection_pool.connection().await?; let mut transaction = prover_connection.start_transaction().await?; let protocol_version_id = transaction @@ -77,7 +71,7 @@ impl ArtifactsManager for SchedulerWitnessGenerator { 0, 0, AggregationRound::Scheduler, - &blob_url, + &blob_urls, false, protocol_version_id, ) diff --git a/prover/crates/bin/witness_generator/tests/basic_test.rs b/prover/crates/bin/witness_generator/tests/basic_test.rs index 1b85d89de08..379ddc3a4eb 100644 --- a/prover/crates/bin/witness_generator/tests/basic_test.rs +++ b/prover/crates/bin/witness_generator/tests/basic_test.rs @@ -66,7 +66,8 @@ async fn test_leaf_witness_gen() { Some(500), Instant::now(), ) - .await?; + .await + .unwrap(); let aggregations = AggregationWrapper(artifacts.aggregations); @@ -160,7 +161,8 @@ async fn test_node_witness_gen() { Some(500), Instant::now(), ) - .await?; + .await + .unwrap(); let aggregations = AggregationWrapper(artifacts.next_aggregations); From 662e9d245b6fc86753bd921ee1d7478e1412497e Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 18 Sep 2024 11:28:45 +0300 Subject: [PATCH 12/12] remove BlobUrl trait --- prover/crates/bin/witness_generator/src/artifacts.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/prover/crates/bin/witness_generator/src/artifacts.rs b/prover/crates/bin/witness_generator/src/artifacts.rs index 6b21a4cdb93..7c444da047b 100644 --- a/prover/crates/bin/witness_generator/src/artifacts.rs +++ b/prover/crates/bin/witness_generator/src/artifacts.rs @@ -9,18 +9,13 @@ pub(crate) struct AggregationBlobUrls { pub aggregation_urls: String, pub circuit_ids_and_urls: Vec<(u8, String)>, } -/// Marker trait for managing different types of blob urls -pub trait BlobUrl {} - -impl BlobUrl for AggregationBlobUrls {} -impl BlobUrl for String {} #[async_trait] pub(crate) trait ArtifactsManager { type InputMetadata; type InputArtifacts; type OutputArtifacts; - type BlobUrls: BlobUrl; + type BlobUrls; async fn get_artifacts( metadata: &Self::InputMetadata,