Skip to content

Commit

Permalink
Add dynamic exec deadline
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed Oct 2, 2024
1 parent c52675e commit c19bba4
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 47 deletions.
33 changes: 9 additions & 24 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ async fn validate_candidate_exhaustive(
pov: Arc<PoV>,
executor_params: ExecutorParams,
exec_kind: PvfExecPriority,
request_ttl: Option<Duration>,
request_ttl: Option<Instant>,
metrics: &Metrics,
) -> Result<ValidationResult, ValidationFailed> {
let _timer = metrics.time_validate_candidate_exhaustive();
Expand Down Expand Up @@ -863,7 +863,6 @@ async fn validate_candidate_exhaustive(
PvfExecPriority::Backing | PvfExecPriority::BackingSystemParas => {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into());
let exec_deadline = pvf_exec_deadline(exec_timeout, request_ttl);
let pvf = PvfPrepData::from_code(
validation_code.0,
executor_params,
Expand All @@ -875,7 +874,7 @@ async fn validate_candidate_exhaustive(
.validate_candidate(
pvf,
exec_timeout,
exec_deadline,
request_ttl,
persisted_validation_data.clone(),
pov,
exec_kind.into(),
Expand All @@ -885,12 +884,11 @@ async fn validate_candidate_exhaustive(
},
PvfExecPriority::Approval | PvfExecPriority::Dispute => {
let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into());
let exec_deadline = pvf_exec_deadline(exec_timeout, request_ttl);
validation_backend
.validate_candidate_with_retry(
validation_code.0,
exec_timeout,
exec_deadline,
request_ttl,
persisted_validation_data.clone(),
pov,
executor_params,
Expand Down Expand Up @@ -990,7 +988,7 @@ trait ValidationBackend {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
exec_deadline: Option<Instant>,
exec_ttl: Option<Instant>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
// The priority for the preparation job.
Expand All @@ -1011,7 +1009,7 @@ trait ValidationBackend {
&mut self,
code: Vec<u8>,
exec_timeout: Duration,
exec_deadline: Option<Instant>,
exec_ttl: Option<Instant>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
executor_params: ExecutorParams,
Expand All @@ -1038,7 +1036,7 @@ trait ValidationBackend {
.validate_candidate(
pvf.clone(),
exec_timeout,
exec_deadline,
exec_ttl,
pvd.clone(),
pov.clone(),
prepare_priority,
Expand Down Expand Up @@ -1125,7 +1123,7 @@ trait ValidationBackend {
.validate_candidate(
pvf.clone(),
new_timeout,
exec_deadline,
exec_ttl,
pvd.clone(),
pov.clone(),
prepare_priority,
Expand All @@ -1150,7 +1148,7 @@ impl ValidationBackend for ValidationHost {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
exec_deadline: Option<Instant>,
exec_ttl: Option<Instant>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
// The priority for the preparation job.
Expand All @@ -1163,7 +1161,7 @@ impl ValidationBackend for ValidationHost {
.execute_pvf(
pvf,
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
prepare_priority,
Expand Down Expand Up @@ -1271,16 +1269,3 @@ fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: PvfExecKind) -> Dura
PvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
}
}

fn pvf_exec_deadline(
exec_timeout: Duration,
validation_request_ttl: Option<Duration>,
) -> Option<Instant> {
validation_request_ttl.and_then(|ttl| {
if ttl <= exec_timeout {
None // Job dropping should be turned off
} else {
Some(Instant::now() + ttl - exec_timeout)
}
})
}
30 changes: 24 additions & 6 deletions polkadot/node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub enum FromQueue {
#[derive(Debug)]
pub struct PendingExecutionRequest {
pub exec_timeout: Duration,
pub exec_deadline: Option<Instant>,
pub exec_ttl: Option<Instant>,
pub pvd: Arc<PersistedValidationData>,
pub pov: Arc<PoV>,
pub executor_params: ExecutorParams,
Expand All @@ -83,7 +83,7 @@ pub struct PendingExecutionRequest {
struct ExecuteJob {
artifact: ArtifactPathId,
exec_timeout: Duration,
exec_deadline: Option<Instant>,
exec_ttl: Option<Instant>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
executor_params: ExecutorParams,
Expand Down Expand Up @@ -174,6 +174,8 @@ struct Queue {
unscheduled: Unscheduled,
workers: Workers,
mux: Mux,
/// Minimal observed execution time
min_exec_time: Option<Duration>,
}

impl Queue {
Expand Down Expand Up @@ -204,6 +206,7 @@ impl Queue {
spawn_inflight: 0,
capacity: worker_capacity,
},
min_exec_time: None,
}
}

Expand Down Expand Up @@ -280,8 +283,17 @@ impl Queue {
}

let job = queue.remove(job_index).expect("Job is just checked to be in queue; qed");
let exec_deadline = job.exec_ttl.and_then(|ttl| {
// Because we observe the execution of different jobs, their execution time can exceed
// the current job's execution timeout.
let min_exec_time = self.min_exec_time.unwrap_or_default().min(job.exec_timeout);
// There is a high possibility that the current execution time will be less than the
// execution timeout but more than the minimum observed execution time. Therefore, we
// subtract it from the TTL to avoid exceeding the deadline.
ttl.checked_sub(min_exec_time)
});

if let Some(deadline) = job.exec_deadline {
if let Some(deadline) = exec_deadline {
let now = Instant::now();
gum::debug!(target: LOG_TARGET, ?priority, ?deadline, ?now, "Job has a deadline");
if now > deadline {
Expand Down Expand Up @@ -325,7 +337,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue;
let PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand All @@ -342,7 +354,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
let job = ExecuteJob {
artifact,
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand Down Expand Up @@ -499,6 +511,12 @@ async fn handle_job_finish(
err
);
} else {
if let Some(dur) = duration {
queue.min_exec_time = queue
.min_exec_time
.map(|min_time| if dur < min_time { dur } else { min_time })
.or(Some(dur));
}
gum::trace!(
target: LOG_TARGET,
?artifact_id,
Expand Down Expand Up @@ -835,7 +853,7 @@ mod tests {
ExecuteJob {
artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() },
exec_timeout: Duration::from_secs(10),
exec_deadline: None,
exec_ttl: None,
pvd,
pov,
executor_params: ExecutorParams::default(),
Expand Down
22 changes: 11 additions & 11 deletions polkadot/node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl ValidationHost {
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
exec_deadline: Option<Instant>,
exec_ttl: Option<Instant>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
priority: Priority,
Expand All @@ -122,7 +122,7 @@ impl ValidationHost {
.send(ToHost::ExecutePvf(ExecutePvfInputs {
pvf,
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
priority,
Expand Down Expand Up @@ -156,7 +156,7 @@ enum ToHost {
struct ExecutePvfInputs {
pvf: PvfPrepData,
exec_timeout: Duration,
exec_deadline: Option<Instant>,
exec_ttl: Option<Instant>,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
priority: Priority,
Expand Down Expand Up @@ -554,7 +554,7 @@ async fn handle_execute_pvf(
let ExecutePvfInputs {
pvf,
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
priority,
Expand All @@ -579,7 +579,7 @@ async fn handle_execute_pvf(
artifact: ArtifactPathId::new(artifact_id, path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand Down Expand Up @@ -611,7 +611,7 @@ async fn handle_execute_pvf(
artifact_id,
PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand All @@ -627,7 +627,7 @@ async fn handle_execute_pvf(
artifact_id,
PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand Down Expand Up @@ -662,7 +662,7 @@ async fn handle_execute_pvf(
artifact_id,
PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand All @@ -688,7 +688,7 @@ async fn handle_execute_pvf(
artifact_id,
PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand Down Expand Up @@ -818,7 +818,7 @@ async fn handle_prepare_done(
let pending_requests = awaiting_prepare.take(&artifact_id);
for PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand Down Expand Up @@ -846,7 +846,7 @@ async fn handle_prepare_done(
artifact: ArtifactPathId::new(artifact_id.clone(), &path),
pending_execution_request: PendingExecutionRequest {
exec_timeout,
exec_deadline,
exec_ttl,
pvd,
pov,
executor_params,
Expand Down
8 changes: 4 additions & 4 deletions polkadot/node/core/pvf/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl TestHost {
pvd: PersistedValidationData,
pov: PoV,
executor_params: ExecutorParams,
exec_deadline: Option<std::time::Instant>,
exec_ttl: Option<std::time::Instant>,
) -> Result<ValidationResult, ValidationError> {
let (result_tx, result_rx) = futures::channel::oneshot::channel();

Expand All @@ -122,7 +122,7 @@ impl TestHost {
PrepareJobKind::Compilation,
),
TEST_EXECUTION_TIMEOUT,
exec_deadline,
exec_ttl,
Arc::new(pvd),
Arc::new(pov),
polkadot_node_core_pvf::Priority::Normal,
Expand Down Expand Up @@ -201,7 +201,7 @@ async fn execute_job_terminates_on_execution_deadline() {
max_pov_size: 4096 * 1024,
};
let pov = PoV { block_data: BlockData(Vec::new()) };
let exec_deadline = Some(std::time::Instant::now());
let exec_ttl = Some(std::time::Instant::now());

let start = std::time::Instant::now();
let result = host
Expand All @@ -210,7 +210,7 @@ async fn execute_job_terminates_on_execution_deadline() {
pvd,
pov,
Default::default(),
exec_deadline,
exec_ttl,
)
.await;

Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/subsystem-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use polkadot_statement_table::v2::Misbehavior;
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
sync::Arc,
time::Duration,
time::Instant,
};

/// Network events as transmitted to other subsystems, wrapped in their message types.
Expand Down Expand Up @@ -192,7 +192,7 @@ pub enum CandidateValidationMessage {
response_sender: oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
/// The time within which the validation should be completed.
/// Important for time-sensitive tasks such as backing.
ttl: Option<Duration>,
ttl: Option<Instant>,
},
/// Try to compile the given validation code and send back
/// the outcome.
Expand Down

0 comments on commit c19bba4

Please sign in to comment.