feat: health check in worker #1006

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

### Changes

- Added health check endpoints to the prover service (#1006).
- Implemented serialization for `AccountHeader` (#996).
- Updated Pingora crates to 0.4 and added polling time to the configuration file (#997).
- Added support for `miden-tx-prover` proxy to update workers on a running proxy (#989).
14 changes: 14 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions bin/tx-prover/Cargo.toml
@@ -54,6 +54,7 @@
serde_qs = { version = "0.13" }
tokio = { version = "1.38", optional = true, features = ["full"] }
tokio-stream = { version = "0.1", optional = true, features = [ "net" ]}
toml = { version = "0.8" }
tonic-health = { version = "0.12" }
tonic-web = { version = "0.12", optional = true }
tracing = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3", features = ["fmt", "json", "env-filter"], optional = true }
8 changes: 8 additions & 0 deletions bin/tx-prover/README.md
@@ -59,6 +59,8 @@
max_queue_items = 10
max_retries_per_request = 1
# Maximum amount of requests that a given IP address can make per second
max_req_per_sec = 5
# Interval to check the health of the workers
health_check_interval_secs = 1

[[workers]]
host = "0.0.0.0"
@@ -102,6 +104,12 @@
These changes will be persisted to the configuration file.

Note that, in order to update the workers, the proxy must be running on the same machine where the command is executed, because it checks that the client address is localhost to avoid security issues.
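The localhost restriction described above can be sketched with the standard library's loopback check. This is an illustrative helper, not the proxy's actual implementation; the function name and exact logic are assumptions:

```rust
use std::net::{IpAddr, SocketAddr};

/// Returns true when the client address is a loopback address, i.e. the
/// request originates from the same machine as the proxy.
/// (Hypothetical helper; the real proxy may implement this differently.)
fn is_local_client(addr: SocketAddr) -> bool {
    match addr.ip() {
        IpAddr::V4(v4) => v4.is_loopback(),
        IpAddr::V6(v6) => v6.is_loopback(),
    }
}

fn main() {
    assert!(is_local_client("127.0.0.1:8080".parse().unwrap()));
    assert!(is_local_client("[::1]:8080".parse().unwrap()));
    assert!(!is_local_client("192.168.1.10:8080".parse().unwrap()));
    println!("ok");
}
```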

### Health check

The worker service implements the [gRPC Health Check](https://grpc.io/docs/guides/health-checking/) standard, and includes the methods described in this [official proto file](https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto).

The proxy service uses this health check to determine whether a worker is available to receive requests. If a worker is not available, it is removed from the set of workers that the proxy can use to send requests, and the change is persisted to the configuration file.
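The removal step can be pictured as a filter over the worker set. A minimal std-only sketch follows; the `Worker` type and `retain_healthy` name are illustrative (the real proxy tracks Pingora `Backend`s and also persists the change):

```rust
/// Illustrative worker entry: address plus the result of its last health probe.
#[derive(Debug, Clone, PartialEq)]
struct Worker {
    addr: String,
    healthy: bool,
}

/// Keep only workers whose last gRPC health check reported SERVING.
/// (Sketch; the actual proxy also writes the updated set to the config file.)
fn retain_healthy(workers: Vec<Worker>) -> Vec<Worker> {
    workers.into_iter().filter(|w| w.healthy).collect()
}

fn main() {
    let workers = vec![
        Worker { addr: "0.0.0.0:50051".into(), healthy: true },
        Worker { addr: "0.0.0.0:50052".into(), healthy: false },
    ];
    let live = retain_healthy(workers);
    assert_eq!(live.len(), 1);
    assert_eq!(live[0].addr, "0.0.0.0:50051");
    println!("ok");
}
```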

## Logging

Both the worker and the proxy will use the `info` log level by default, but it can be changed by setting the `RUST_LOG` environment variable.
31 changes: 20 additions & 11 deletions bin/tx-prover/src/commands/mod.rs
@@ -9,6 +9,7 @@
use init::Init;
use miden_tx_prover::PROVER_SERVICE_CONFIG_FILE_NAME;
use proxy::StartProxy;
use serde::{Deserialize, Serialize};
use tracing::debug;
use update_workers::{AddWorkers, RemoveWorkers, UpdateWorkers};
use worker::StartWorker;

@@ -41,6 +42,8 @@ pub struct ProxyConfig {
pub max_req_per_sec: isize,
/// Time in milliseconds to poll available workers.
pub available_workers_polling_time_ms: u64,
/// Health check interval in seconds.
pub health_check_interval_secs: u64,
}

impl Default for ProxyConfig {
@@ -55,6 +58,7 @@
max_retries_per_request: 1,
max_req_per_sec: 5,
available_workers_polling_time_ms: 20,
health_check_interval_secs: 1,
}
}
}
@@ -96,10 +100,19 @@
.write(config_as_toml_string.as_bytes())
.map_err(|err| format!("error writing to file: {err}"))?;

println!("Config updated successfully");
debug!("Config updated successfully");

Ok(())
}

/// Updates the workers in the configuration with the new list.
pub(crate) fn set_workers(workers: Vec<WorkerConfig>) -> Result<(), String> {
> **Collaborator:** Maybe this could be done as part of the incoming follow-up work, but we should probably check (unless it's being done somewhere already) that there are no duplicate workers at any point (both in the worker list and the persisted config file) to avoid problems if the user accidentally adds the same address/port twice.
>
> **Collaborator (author):** It is not being checked. Though the worker config is planned to be removed from the configuration file, I think we can dismiss this for now and use that issue to fix it.
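A duplicate check along the lines the reviewer suggests could be as simple as deduplicating on the `host:port` pair before persisting. This is a sketch; the `WorkerConfig` stand-in below only mirrors the fields visible in this PR:

```rust
use std::collections::HashSet;

/// Illustrative stand-in for the PR's WorkerConfig.
#[derive(Debug, Clone, PartialEq)]
struct WorkerConfig {
    host: String,
    port: u16,
}

/// Drop workers whose host:port was already seen, keeping first occurrences.
fn dedup_workers(workers: Vec<WorkerConfig>) -> Vec<WorkerConfig> {
    let mut seen = HashSet::new();
    workers
        .into_iter()
        .filter(|w| seen.insert((w.host.clone(), w.port)))
        .collect()
}

fn main() {
    let workers = vec![
        WorkerConfig { host: "0.0.0.0".into(), port: 50051 },
        WorkerConfig { host: "0.0.0.0".into(), port: 50051 },
        WorkerConfig { host: "0.0.0.0".into(), port: 50052 },
    ];
    let unique = dedup_workers(workers);
    assert_eq!(unique.len(), 2);
    println!("ok");
}
```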

let mut proxy_config = Self::load_config_from_file()?;

proxy_config.workers = workers;

proxy_config.save_to_config_file()
}
}

/// Configuration for a worker
@@ -137,7 +150,7 @@ pub enum Command {
/// values. The file will be named as defined in the
/// [miden_tx_prover::PROVER_SERVICE_CONFIG_FILE_NAME] constant.
Init(Init),
/// Starts the workers defined in the config file.
/// Starts the workers with the configuration defined in the command.
StartWorker(StartWorker),
/// Starts the proxy defined in the config file.
StartProxy(StartProxy),
@@ -155,26 +168,22 @@

/// CLI entry point
impl Cli {
pub fn execute(&self) -> Result<(), String> {
pub async fn execute(&self) -> Result<(), String> {
match &self.action {
// For the `StartWorker` command, we need to create a new runtime and run the worker
Command::StartWorker(worker_init) => {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {:?}", e))?;
rt.block_on(worker_init.execute())
},
Command::StartProxy(proxy_init) => proxy_init.execute(),
Command::StartWorker(worker_init) => worker_init.execute().await,
Command::StartProxy(proxy_init) => proxy_init.execute().await,
Command::Init(init) => {
// Init does not require async, so run directly
init.execute()
},
Command::AddWorkers(update_workers) => {
let update_workers: UpdateWorkers = update_workers.clone().into();
update_workers.execute()
update_workers.execute().await
},
Command::RemoveWorkers(update_workers) => {
let update_workers: UpdateWorkers = update_workers.clone().into();
update_workers.execute()
update_workers.execute().await
},
}
}
33 changes: 23 additions & 10 deletions bin/tx-prover/src/commands/proxy.rs
@@ -1,8 +1,13 @@
use clap::Parser;
use pingora::{apps::HttpServerOptions, lb::Backend, prelude::Opt, server::Server};
use pingora::{
apps::HttpServerOptions,
lb::Backend,
prelude::{background_service, Opt},
server::Server,
};
use pingora_proxy::http_proxy_service;

use crate::proxy::LoadBalancer;
use crate::proxy::{LoadBalancer, LoadBalancerState};

/// Starts the proxy defined in the config file.
#[derive(Debug, Parser)]
@@ -13,8 +18,8 @@ impl StartProxy {
///
/// This method will first read the config file to get the list of workers to start. It will
/// then start a proxy with each worker as a backend.
pub fn execute(&self) -> Result<(), String> {
let mut server = Server::new(Some(Opt::default())).expect("Failed to create server");
pub async fn execute(&self) -> Result<(), String> {
let mut server = Server::new(Some(Opt::default())).map_err(|err| err.to_string())?;
server.bootstrap();

let proxy_config = super::ProxyConfig::load_config_from_file()?;
@@ -23,25 +28,33 @@
.workers
.iter()
.map(|worker| format!("{}:{}", worker.host, worker.port))
.map(|worker| Backend::new(&worker).expect("Failed to create backend"))
.collect::<Vec<Backend>>();
.map(|worker| Backend::new(&worker).map_err(|err| err.to_string()))
.collect::<Result<Vec<Backend>, String>>()?;

let worker_lb = LoadBalancer::new(workers, &proxy_config);
let worker_lb = LoadBalancerState::new(workers, &proxy_config).await?;

let health_check_service = background_service("health_check", worker_lb);
let worker_lb = health_check_service.task();

// Set up the load balancer
let mut lb = http_proxy_service(&server.configuration, worker_lb);
let mut lb = http_proxy_service(&server.configuration, LoadBalancer(worker_lb));

let proxy_host = proxy_config.host;
let proxy_port = proxy_config.port.to_string();
lb.add_tcp(format!("{}:{}", proxy_host, proxy_port).as_str());
let logic = lb.app_logic_mut().expect("No app logic found");
let logic = lb.app_logic_mut().ok_or("Failed to get app logic")?;
let mut http_server_options = HttpServerOptions::default();

// Enable HTTP/2 for plaintext
http_server_options.h2c = true;
logic.server_options = Some(http_server_options);

server.add_service(health_check_service);
server.add_service(lb);
server.run_forever();
tokio::task::spawn_blocking(|| server.run_forever())
.await
.map_err(|err| err.to_string())?;
> **Comment on lines +54 to +56**
>
> **Collaborator:** Why do we need to run `run_forever` in a separate thread if we instantly wait for it to end?
>
> **Collaborator (author):** We do so because Pingora creates a new runtime in `.run_forever()` without the possibility of passing an existing one. The `tokio::task::spawn_blocking` was introduced to avoid a panic each time we instantiate a proxy.
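The pattern under discussion, i.e. pushing a call that blocks (and here even creates its own runtime) onto a separate thread so the surrounding async runtime is not starved, can be illustrated with plain std threads. The names below are illustrative stand-ins, not Pingora's API:

```rust
use std::thread;

/// Stand-in for a call like Pingora's `run_forever()` that never yields
/// control back to an async runtime (here it just does blocking work).
fn blocking_server_loop() -> u32 {
    let mut total = 0;
    for i in 1..=10 {
        total += i; // pretend this is serving requests
    }
    total
}

fn main() {
    // Run the blocking call on its own OS thread, mirroring what
    // `tokio::task::spawn_blocking(|| server.run_forever())` does in the PR.
    let handle = thread::spawn(blocking_server_loop);
    let result = handle.join().expect("server thread panicked");
    assert_eq!(result, 55);
    println!("ok");
}
```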


Ok(())
}
}
6 changes: 2 additions & 4 deletions bin/tx-prover/src/commands/update_workers.rs
@@ -57,10 +57,8 @@ impl UpdateWorkers {
/// - If the request fails.
/// - If the status code is not successful.
/// - If the X-Worker-Count header is missing.
pub fn execute(&self) -> Result<(), String> {
pub async fn execute(&self) -> Result<(), String> {
// Define a runtime
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {:?}", e))?;

let query_params = serde_qs::to_string(&self).map_err(|err| err.to_string())?;

@@ -79,7 +77,7 @@
.map_err(|err| err.to_string())?;

// Make the request
let response = rt.block_on(client.get(url).send()).map_err(|err| err.to_string())?;
let response = client.get(url).send().await.map_err(|err| err.to_string())?;

// Check status code
if !response.status().is_success() {
13 changes: 13 additions & 0 deletions bin/tx-prover/src/commands/worker.rs
@@ -1,6 +1,8 @@
use clap::Parser;
use miden_tx_prover::generated::api_server::ApiServer;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic_health::server::health_reporter;
use tracing::info;

use crate::api::RpcListener;
@@ -22,6 +24,10 @@ impl StartWorker {
/// This method receives the host and port from the CLI and starts a worker on that address.
/// In case that one of the parameters is not provided, it will default to `0.0.0.0` for the
/// host and `50051` for the port.
///
/// The worker includes a health reporter that will mark the service as serving, following the
/// [gRPC health checking protocol](
/// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto).
pub async fn execute(&self) -> Result<(), String> {
let worker_addr = format!("{}:{}", self.host, self.port);
let rpc =
@@ -32,9 +38,16 @@
rpc.listener.local_addr().map_err(|err| err.to_string())?
);

// Create a health reporter
let (mut health_reporter, health_service) = health_reporter();

// Mark the service as serving
health_reporter.set_serving::<ApiServer<RpcListener>>().await;

tonic::transport::Server::builder()
.accept_http1(true)
.add_service(tonic_web::enable(rpc.api_service))
.add_service(health_service)
.serve_with_incoming(TcpListenerStream::new(rpc.listener))
.await
.map_err(|err| err.to_string())?;
5 changes: 3 additions & 2 deletions bin/tx-prover/src/main.rs
@@ -5,7 +5,8 @@ mod utils;
use commands::Cli;
use utils::setup_tracing;

fn main() -> Result<(), String> {
#[tokio::main]
async fn main() -> Result<(), String> {
use clap::Parser;

setup_tracing();
@@ -14,7 +15,7 @@
let cli = Cli::parse();

// execute cli action
cli.execute()
cli.execute().await
}

// TESTS