Skip to content

Commit

Permalink
Send/receive TEE prover inputs via pub/sub.
Browse files Browse the repository at this point in the history
Need to run `zk server` and `zksync_tee_prover` with env variable
GOOGLE_APPLICATION_CREDENTIALS_JSON set to credentials file.
  • Loading branch information
thomasknauth committed May 15, 2024
1 parent cf80184 commit 4de7fee
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"core/bin/verified_sources_fetcher",
"core/bin/zksync_server",
"core/bin/genesis_generator",
"core/bin/tee_prover",
# Node services
"core/node/node_framework",
"core/node/proof_data_handler",
Expand Down Expand Up @@ -102,6 +103,7 @@ ethabi = "18.0.0"
flate2 = "1.0.28"
futures = "0.3"
google-cloud-auth = "0.13.0"
google-cloud-pubsub = "0.24"
google-cloud-storage = "0.15.0"
governor = "0.4.2"
hex = "0.4"
Expand Down
41 changes: 41 additions & 0 deletions core/bin/tee_prover/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
[package]
name = "zksync_tee_prover"
version = "0.1.0"
edition.workspace = true
authors.workspace = true
homepage.workspace = true
repository.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
publish = false

[dependencies]
zksync_config.workspace = true
zksync_env_config.workspace = true
zksync_protobuf_config.workspace = true
zksync_storage.workspace = true
zksync_utils.workspace = true
zksync_types.workspace = true
zksync_core.workspace = true
zksync_object_store.workspace = true
zksync_tee_verifier.workspace = true
google-cloud-pubsub.workspace = true
google-cloud-auth.workspace = true

# Consensus dependenices
zksync_consensus_crypto.workspace = true
zksync_consensus_roles.workspace = true
zksync_consensus_executor.workspace = true
zksync_concurrency.workspace = true
vlog.workspace = true

anyhow.workspace = true
clap = { workspace = true, features = ["derive"] }
serde_json.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
futures.workspace = true
futures-util = "0.3"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator.workspace = true
85 changes: 85 additions & 0 deletions core/bin/tee_prover/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use anyhow::Context as _;
use clap::Parser;
use futures_util::StreamExt;
use google_cloud_pubsub::client::{Client, ClientConfig};
use zksync_core::temp_config_store::decode_yaml_repr;
use zksync_object_store::StoredObject;
use zksync_tee_verifier::TeeVerifierInput;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[derive(Debug, Parser)]
#[command(author = "Matter Labs", version, about = "zkSync operator node", long_about = None)]
struct Cli {
/// Path to the yaml config. If set, it will be used instead of env vars.
#[arg(long)]
config_path: Option<std::path::PathBuf>,
/// Path to the yaml with secrets. If set, it will be used instead of env vars.
#[arg(long)]
secrets_path: Option<std::path::PathBuf>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Cli::parse();

let configs = match opt.config_path {
None => panic!(),
Some(path) => {
let yaml =
std::fs::read_to_string(&path).with_context(|| path.display().to_string())?;
decode_yaml_repr::<zksync_protobuf_config::proto::general::GeneralConfig>(&yaml)
.context("failed decoding general YAML config")?
}
};

let observability_config = configs
.observability
.clone()
.context("observability config")?;

let log_format: vlog::LogFormat = observability_config
.log_format
.parse()
.context("Invalid log format")?;

let mut builder = vlog::ObservabilityBuilder::new().with_log_format(log_format);
if let Some(log_directives) = observability_config.log_directives {
builder = builder.with_log_directives(log_directives);
}

let _guard = builder.build();

let pubsub_config = ClientConfig::default().with_auth().await?;
let client = Client::new(pubsub_config).await?;

let subscription_name = "sgx-prover-inputs";
let subscription = client.subscription(&subscription_name);
if !subscription.exists(None).await? {
return Err(anyhow::anyhow!("subscription missing"));
}

let mut iter: google_cloud_pubsub::subscription::MessageStream =
subscription.subscribe(None).await?;

while let Some(message) = iter.next().await {
let _ = message.ack().await?;

tracing::debug!("Received input. Generating proof ...");

if message.message.data.len() == 0 {
tracing::info!("empty data field");
continue;
}

let input =
TeeVerifierInput::deserialize(message.message.data).expect("Deserialization error");
let result = input.verify();
if result.is_ok() {
tracing::info!("Successfully proved a batch!");
}
}

Ok(())
}
2 changes: 2 additions & 0 deletions core/lib/zksync_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ zksync_node_genesis.workspace = true
zksync_eth_sender.workspace = true
zksync_node_fee_model.workspace = true
multivm.workspace = true
google-cloud-pubsub.workspace = true
google-cloud-googleapis = "0.12"

# Consensus dependenices
zksync_concurrency.workspace = true
Expand Down
36 changes: 36 additions & 0 deletions core/lib/zksync_core/src/tee_verifier_input_producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ use std::{sync::Arc, time::Instant};
use anyhow::Context;
use async_trait::async_trait;
use multivm::zk_evm_latest::ethereum_types::H256;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::{
client::{Client, ClientConfig},
publisher::Publisher,
};
use tokio::{runtime::Handle, task::JoinHandle};
use vm_utils::storage::L1BatchParamsProvider;
use zksync_dal::{tee_verifier_input_producer_dal::JOB_MAX_ATTEMPT, ConnectionPool, Core, CoreDal};
Expand All @@ -33,6 +38,8 @@ pub struct TeeVerifierInputProducer {
connection_pool: ConnectionPool<Core>,
l2_chain_id: L2ChainId,
object_store: Arc<dyn ObjectStore>,
pubsub_client: Client,
publisher: Publisher,
}

impl TeeVerifierInputProducer {
Expand All @@ -41,10 +48,26 @@ impl TeeVerifierInputProducer {
store_factory: &ObjectStoreFactory,
l2_chain_id: L2ChainId,
) -> anyhow::Result<Self> {
// TODO: Supply credentials via alternate method, e.g., Google/Azure credentials mgmt solution.
// Right now, need to invoke binary with `GOOGLE_APPLICATION_CREDENTIALS=key.json zk server`.
let pubsub_config = ClientConfig::default().with_auth().await?;

let pubsub_client = Client::new(pubsub_config).await?;
let topic_name = "tee-prover-inputs";
let topic = pubsub_client.topic(topic_name);

if !topic.exists(None).await? {
return Err(anyhow::anyhow!(format!("topic {} missing", topic_name)));
}

let publisher = topic.new_publisher(None);

Ok(TeeVerifierInputProducer {
connection_pool,
object_store: store_factory.create_store().await,
l2_chain_id,
pubsub_client,
publisher,
})
}

Expand Down Expand Up @@ -263,6 +286,19 @@ impl JobProcessor for TeeVerifierInputProducer {
.commit()
.await
.context("failed to commit DB transaction for TeeVerifierInputProducer")?;

let serialized =
<TeeVerifierInput as zksync_object_store::StoredObject>::serialize(&artifacts)
.expect("Failed to serialize TeeVerifierInput.");

let msg = PubsubMessage {
data: serialized,
..Default::default()
};

let awaiter = self.publisher.publish(msg).await;
awaiter.get().await?;

METRICS.block_number_processed.set(job_id.0 as i64);
Ok(())
}
Expand Down

0 comments on commit 4de7fee

Please sign in to comment.