Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prover): Add sending scale requests for Scaler targets #3194

Merged
merged 10 commits on Oct 30, 2024
5 changes: 4 additions & 1 deletion core/lib/config/src/configs/prover_autoscaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub struct ProverAutoscalerScalerConfig {
pub long_pending_duration: Duration,
/// List of simple autoscaler targets.
pub scaler_targets: Vec<ScalerTarget>,
/// If dry-run enabled don't send any scale requests.
#[serde(default)]
pub dry_run: bool,
}

#[derive(
Expand Down Expand Up @@ -122,7 +125,7 @@ pub enum QueueReportFields {
#[derive(Debug, Clone, PartialEq, Deserialize, Default)]
pub struct ScalerTarget {
pub queue_report_field: QueueReportFields,
pub pod_name_prefix: String,
pub deployment: String,
/// Max replicas per cluster.
pub max_replicas: HashMap<String, usize>,
/// The queue will be divided by the speed and rounded up to get number of replicas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ message MaxReplica {

// A simple autoscaler target: scale `deployment` based on one queue report field.
// Field 2 (`pod_name_prefix`) was renamed to `deployment` (field 5); the old
// number and name are reserved so stale serialized configs cannot silently
// rebind them.
message ScalerTarget {
  optional string queue_report_field = 1; // required
  optional string deployment = 5; // required
  repeated MaxReplica max_replicas = 3; // required at least one
  optional uint64 speed = 4; // optional
  reserved 2; reserved "pod_name_prefix";
}

message ProverAutoscalerScalerConfig {
Expand All @@ -69,4 +70,5 @@ message ProverAutoscalerScalerConfig {
repeated MaxProver max_provers = 9; // optional
repeated MinProver min_provers = 10; // optional
repeated ScalerTarget scaler_targets = 11; // optional
optional bool dry_run = 12; // optional
}
8 changes: 4 additions & 4 deletions core/lib/protobuf_config/src/prover_autoscaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
.enumerate()
.map(|(i, x)| x.read().context(i).unwrap())
.collect::<Vec<_>>(),
dry_run: self.dry_run.unwrap_or_default(),
})
}

Expand Down Expand Up @@ -158,6 +159,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
.map(|(k, v)| proto::MinProver::build(&(k.clone(), *v)))
.collect(),
scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(),
dry_run: Some(this.dry_run),
}
}
}
Expand Down Expand Up @@ -269,9 +271,7 @@ impl ProtoRepr for proto::ScalerTarget {
queue_report_field: required(&self.queue_report_field)
.and_then(|x| Ok((*x).parse()?))
.context("queue_report_field")?,
pod_name_prefix: required(&self.pod_name_prefix)
.context("pod_name_prefix")?
.clone(),
deployment: required(&self.deployment).context("deployment")?.clone(),
max_replicas: self
.max_replicas
.iter()
Expand All @@ -289,7 +289,7 @@ impl ProtoRepr for proto::ScalerTarget {
fn build(this: &Self::Type) -> Self {
Self {
queue_report_field: Some(this.queue_report_field.to_string()),
pod_name_prefix: Some(this.pod_name_prefix.clone()),
deployment: Some(this.deployment.clone()),
max_replicas: this
.max_replicas
.iter()
Expand Down
2 changes: 1 addition & 1 deletion prover/crates/bin/prover_autoscaler/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub struct ScaleRequest {
pub deployments: Vec<ScaleDeploymentRequest>,
}

/// Result of applying a `ScaleRequest` on an agent: one status string per
/// scaled deployment. `Default` yields an empty result set (used by the
/// watcher's dry-run path, which skips sending the request entirely).
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ScaleResponse {
    pub scale_result: Vec<String>,
}
Expand Down
92 changes: 66 additions & 26 deletions prover/crates/bin/prover_autoscaler/src/global/scaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct GpuScaler {

pub struct SimpleScaler {
queue_report_field: QueueReportFields,
pod_name_prefix: String,
deployment: String,
/// Which cluster to use first.
cluster_priorities: HashMap<String, u32>,
max_replicas: HashMap<String, usize>,
Expand Down Expand Up @@ -365,6 +365,47 @@ impl GpuScaler {

provers
}

fn diff(
namespace: &str,
provers: HashMap<GPUPoolKey, u32>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
provers
.into_iter()
.for_each(|(GPUPoolKey { cluster, gpu }, replicas)| {
let prover = gpu_to_prover(gpu);
clusters
.clusters
.get(&cluster)
.and_then(|c| c.namespaces.get(namespace))
.and_then(|ns| ns.deployments.get(&prover))
.map_or_else(
|| {
tracing::error!(
"Wasn't able to find deployment {} in cluster {}, namespace {}",
prover,
cluster,
namespace
)
},
|deployment| {
if deployment.desired != replicas as i32 {
requests
.entry(cluster.clone())
.or_default()
.deployments
.push(ScaleDeploymentRequest {
namespace: namespace.into(),
name: prover.clone(),
size: replicas as i32,
});
}
},
);
})
}
}

#[derive(Default, Debug, PartialEq, Eq)]
Expand All @@ -389,7 +430,7 @@ impl SimpleScaler {
) -> Self {
Self {
queue_report_field: config.queue_report_field.clone(),
pod_name_prefix: config.pod_name_prefix.clone(),
deployment: config.deployment.clone(),
cluster_priorities,
max_replicas: config.max_replicas.clone(),
speed: config.speed,
Expand Down Expand Up @@ -418,7 +459,7 @@ impl SimpleScaler {
// Initialize pool only if we have ready deployments.
pool.pods.insert(PodStatus::Running, 0);

let pod_re = Regex::new(&format!("^{}-", self.pod_name_prefix)).unwrap();
let pod_re = Regex::new(&format!("^{}-", self.deployment)).unwrap();
for (_, pod) in namespace_value
.pods
.iter()
Expand Down Expand Up @@ -551,47 +592,46 @@ impl SimpleScaler {

pods
}
}

fn diff(
namespace: &str,
provers: HashMap<GPUPoolKey, u32>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
provers
.into_iter()
.for_each(|(GPUPoolKey { cluster, gpu }, n)| {
let prover = gpu_to_prover(gpu);
fn diff(
&self,
namespace: &str,
replicas: HashMap<String, usize>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
let deployment_name = self.deployment.clone();
replicas.into_iter().for_each(|(cluster, replicas)| {
clusters
.clusters
.get(&cluster)
.and_then(|c| c.namespaces.get(namespace))
.and_then(|ns| ns.deployments.get(&prover))
.and_then(|ns| ns.deployments.get(&deployment_name))
.map_or_else(
|| {
tracing::error!(
"Wasn't able to find deployment {} in cluster {}, namespace {}",
prover,
deployment_name,
cluster,
namespace
)
},
|d| {
if d.desired != n as i32 {
|deployment| {
if deployment.desired != replicas as i32 {
requests
.entry(cluster.clone())
.or_default()
.deployments
.push(ScaleDeploymentRequest {
namespace: namespace.into(),
name: prover.clone(),
size: n as i32,
name: deployment_name.clone(),
size: replicas as i32,
});
}
},
);
})
}
}

/// is_namespace_running returns true if there are some pods running in it.
Expand Down Expand Up @@ -638,7 +678,7 @@ impl Task for Scaler {
AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
.set(*num as u64);
}
diff(ns, provers, &guard.clusters, &mut scale_requests);
GpuScaler::diff(ns, provers, &guard.clusters, &mut scale_requests);
}

// Simple Scalers.
Expand All @@ -647,15 +687,15 @@ impl Task for Scaler {
.get(&(ppv.to_string(), scaler.queue_report_field.clone()))
.cloned()
.unwrap_or(0);
tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.pod_name_prefix);
tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.deployment);
if q > 0 || is_namespace_running(ns, &guard.clusters) {
let pods = scaler.run(ns, q, &guard.clusters);
for (k, num) in &pods {
let replicas = scaler.run(ns, q, &guard.clusters);
for (k, num) in &replicas {
AUTOSCALER_METRICS.jobs
[&(scaler.pod_name_prefix.clone(), k.clone(), ns.clone())]
[&(scaler.deployment.clone(), k.clone(), ns.clone())]
.set(*num as u64);
}
// TODO: diff and add into scale_requests.
scaler.diff(ns, replicas, &guard.clusters, &mut scale_requests);
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion prover/crates/bin/prover_autoscaler/src/global/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
pub struct Watcher {
/// List of base URLs of all agents.
pub cluster_agents: Vec<Arc<Url>>,
pub dry_run: bool,
pub data: Arc<Mutex<WatchedData>>,
}

impl Watcher {
pub fn new(agent_urls: Vec<String>) -> Self {
pub fn new(agent_urls: Vec<String>, dry_run: bool) -> Self {
let size = agent_urls.len();
Self {
cluster_agents: agent_urls
Expand All @@ -54,6 +55,7 @@ impl Watcher {
)
})
.collect(),
dry_run,
data: Arc::new(Mutex::new(WatchedData {
clusters: Clusters::default(),
is_ready: vec![false; size],
Expand All @@ -80,6 +82,7 @@ impl Watcher {
.collect();
}

let dry_run = self.dry_run;
let handles: Vec<_> = id_requests
.into_iter()
.map(|(id, sr)| {
Expand All @@ -92,6 +95,10 @@ impl Watcher {
tokio::spawn(async move {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
if dry_run {
tracing::info!("Dry-run mode, not sending the request.");
return Ok((id, Ok(ScaleResponse::default())));
}
let response = send_request_with_retries(
&url,
MAX_RETRIES,
Expand Down
3 changes: 2 additions & 1 deletion prover/crates/bin/prover_autoscaler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ async fn main() -> anyhow::Result<()> {
let interval = scaler_config.scaler_run_interval.unsigned_abs();
let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
let watcher = global::watcher::Watcher::new(scaler_config.agents.clone());
let watcher =
global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run);
let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone());
let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config);
tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);
Expand Down
Loading