Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prover): WitnessGenerator refactoring #2 #2899

Merged
merged 12 commits into from
Sep 19, 2024
24 changes: 6 additions & 18 deletions prover/crates/bin/witness_generator/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,33 @@ use zksync_prover_dal::{ConnectionPool, Prover};

#[derive(Debug)]
pub(crate) struct AggregationBlobUrls {
pub aggregations_urls: String,
pub aggregation_urls: String,
pub circuit_ids_and_urls: Vec<(u8, String)>,
}

#[derive(Debug)]
pub(crate) struct SchedulerBlobUrls {
pub circuit_ids_and_urls: Vec<(u8, String)>,
pub closed_form_inputs_and_urls: Vec<(u8, String, usize)>,
pub scheduler_witness_url: String,
}

pub(crate) enum BlobUrls {
Url(String),
Aggregation(AggregationBlobUrls),
Scheduler(SchedulerBlobUrls),
}

#[async_trait]
pub(crate) trait ArtifactsManager {
type InputMetadata;
type InputArtifacts;
type OutputArtifacts;
type BlobUrls;

async fn get_artifacts(
metadata: &Self::InputMetadata,
object_store: &dyn ObjectStore,
) -> anyhow::Result<Self::InputArtifacts>;

async fn save_artifacts(
async fn save_to_bucket(
job_id: u32,
artifacts: Self::OutputArtifacts,
object_store: &dyn ObjectStore,
) -> BlobUrls;
) -> Self::BlobUrls;

async fn update_database(
async fn save_to_database(
connection_pool: &ConnectionPool<Prover>,
job_id: u32,
started_at: Instant,
blob_urls: BlobUrls,
blob_urls: Self::BlobUrls,
artifacts: Self::OutputArtifacts,
) -> anyhow::Result<()>;
}
30 changes: 12 additions & 18 deletions prover/crates/bin/witness_generator/src/basic_circuits/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use zksync_prover_fri_utils::get_recursive_layer_circuit_id_for_base_layer;
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};

use crate::{
artifacts::{ArtifactsManager, BlobUrls},
artifacts::ArtifactsManager,
basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob},
utils::SchedulerPartialInputWrapper,
};
Expand All @@ -18,6 +18,7 @@ impl ArtifactsManager for BasicWitnessGenerator {
type InputMetadata = L1BatchNumber;
type InputArtifacts = BasicWitnessGeneratorJob;
type OutputArtifacts = BasicCircuitArtifacts;
type BlobUrls = String;
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved

async fn get_artifacts(
metadata: &Self::InputMetadata,
Expand All @@ -31,38 +32,31 @@ impl ArtifactsManager for BasicWitnessGenerator {
})
}

async fn save_artifacts(
async fn save_to_bucket(
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
job_id: u32,
artifacts: Self::OutputArtifacts,
object_store: &dyn ObjectStore,
) -> BlobUrls {
) -> String {
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
let aux_output_witness_wrapper = AuxOutputWitnessWrapper(artifacts.aux_output_witness);
object_store
.put(L1BatchNumber(job_id), &aux_output_witness_wrapper)
.await
.unwrap();
let wrapper = SchedulerPartialInputWrapper(artifacts.scheduler_witness);
let url = object_store
object_store
.put(L1BatchNumber(job_id), &wrapper)
.await
.unwrap();

BlobUrls::Url(url)
.unwrap()
}

#[tracing::instrument(skip_all, fields(l1_batch = %job_id))]
async fn update_database(
async fn save_to_database(
connection_pool: &ConnectionPool<Prover>,
job_id: u32,
started_at: Instant,
blob_urls: BlobUrls,
_artifacts: Self::OutputArtifacts,
blob_urls: String,
artifacts: Self::OutputArtifacts,
) -> anyhow::Result<()> {
let blob_urls = match blob_urls {
BlobUrls::Scheduler(blobs) => blobs,
_ => unreachable!(),
};

let mut connection = connection_pool
.connection()
.await
Expand All @@ -79,7 +73,7 @@ impl ArtifactsManager for BasicWitnessGenerator {
.fri_prover_jobs_dal()
.insert_prover_jobs(
L1BatchNumber(job_id),
blob_urls.circuit_ids_and_urls,
artifacts.circuit_urls,
AggregationRound::BasicCircuits,
0,
protocol_version_id,
Expand All @@ -89,8 +83,8 @@ impl ArtifactsManager for BasicWitnessGenerator {
.fri_witness_generator_dal()
.create_aggregation_jobs(
L1BatchNumber(job_id),
&blob_urls.closed_form_inputs_and_urls,
&blob_urls.scheduler_witness_url,
&artifacts.queue_urls,
&blob_urls,
get_recursive_layer_circuit_id_for_base_layer,
protocol_version_id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use anyhow::Context as _;
use tracing::Instrument;
use zksync_prover_dal::ProverDal;
use zksync_prover_fri_types::{get_current_pod_name, AuxOutputWitnessWrapper};
use zksync_prover_keystore::keystore::Keystore;
use zksync_queued_job_processor::{async_trait, JobProcessor};
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};

use crate::{
artifacts::{ArtifactsManager, BlobUrls, SchedulerBlobUrls},
artifacts::ArtifactsManager,
basic_circuits::{BasicCircuitArtifacts, BasicWitnessGenerator, BasicWitnessGeneratorJob},
metrics::WITNESS_GENERATOR_METRICS,
witness_generator::WitnessGenerator,
};

#[async_trait]
Expand All @@ -35,19 +37,15 @@ impl JobProcessor for BasicWitnessGenerator {
)
.await
{
Some(block_number) => {
tracing::info!(
"Processing FRI basic witness-gen for block {}",
block_number
);
let started_at = Instant::now();
let job = Self::get_artifacts(&block_number, &*self.object_store).await?;

WITNESS_GENERATOR_METRICS.blob_fetch_time[&AggregationRound::BasicCircuits.into()]
.observe(started_at.elapsed());

Ok(Some((block_number, job)))
}
Some(block_number) => Ok(Some((
block_number,
<Self as WitnessGenerator>::prepare_job(
block_number,
&*self.object_store,
Keystore::locate(), // todo: this should be removed
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
)
.await?,
Artemka374 marked this conversation as resolved.
Show resolved Hide resolved
))),
None => Ok(None),
}
}
Expand All @@ -73,11 +71,15 @@ impl JobProcessor for BasicWitnessGenerator {
let max_circuits_in_flight = self.config.max_circuits_in_flight;
tokio::spawn(async move {
let block_number = job.block_number;
Ok(
Self::process_job_impl(object_store, job, started_at, max_circuits_in_flight)
.instrument(tracing::info_span!("basic_circuit", %block_number))
.await,
<Self as WitnessGenerator>::process_job(
job,
object_store,
Some(max_circuits_in_flight),
started_at,
)
.instrument(tracing::info_span!("basic_circuit", %block_number))
.await
.map(Some)
})
}

Expand All @@ -92,8 +94,6 @@ impl JobProcessor for BasicWitnessGenerator {
None => Ok(()),
Some(artifacts) => {
let blob_started_at = Instant::now();
let circuit_urls = artifacts.circuit_urls.clone();
let queue_urls = artifacts.queue_urls.clone();

let aux_output_witness_wrapper =
AuxOutputWitnessWrapper(artifacts.aux_output_witness.clone());
Expand All @@ -105,26 +105,17 @@ impl JobProcessor for BasicWitnessGenerator {
.unwrap();
}

let scheduler_witness_url =
match Self::save_artifacts(job_id.0, artifacts.clone(), &*self.object_store)
.await
{
BlobUrls::Url(url) => url,
_ => unreachable!(),
};
let blob_urls =
Self::save_to_bucket(job_id.0, artifacts.clone(), &*self.object_store).await;

WITNESS_GENERATOR_METRICS.blob_save_time[&AggregationRound::BasicCircuits.into()]
.observe(blob_started_at.elapsed());

Self::update_database(
Self::save_to_database(
&self.prover_connection_pool,
job_id.0,
started_at,
BlobUrls::Scheduler(SchedulerBlobUrls {
circuit_ids_and_urls: circuit_urls,
closed_form_inputs_and_urls: queue_urls,
scheduler_witness_url,
}),
blob_urls,
artifacts,
)
.await?;
Expand Down
Loading
Loading