diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs
index dfe08afe1a70..e69478479efc 100644
--- a/node/core/pvf/src/artifacts.rs
+++ b/node/core/pvf/src/artifacts.rs
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
+use crate::error::PrepareError;
use always_assert::always;
use async_std::path::{Path, PathBuf};
use parity_scale_codec::{Decode, Encode};
@@ -23,30 +24,19 @@ use std::{
time::{Duration, SystemTime},
};
-/// A final product of preparation process. Contains either a ready to run compiled artifact or
-/// a description what went wrong.
+/// A wrapper for the compiled PVF code.
#[derive(Encode, Decode)]
-pub enum Artifact {
- /// During the prevalidation stage of preparation an issue was found with the PVF.
- PrevalidationErr(String),
- /// Compilation failed for the given PVF.
- PreparationErr(String),
- /// This state indicates that the process assigned to prepare the artifact wasn't responsible
- /// or were killed. This state is reported by the validation host (not by the worker).
- DidntMakeIt,
- /// The PVF passed all the checks and is ready for execution.
- Compiled { compiled_artifact: Vec },
-}
+pub struct CompiledArtifact(Vec);
-impl Artifact {
- /// Serializes this struct into a byte buffer.
- pub fn serialize(&self) -> Vec {
- self.encode()
+impl CompiledArtifact {
+ pub fn new(code: Vec) -> Self {
+ Self(code)
}
+}
- /// Deserialize the given byte buffer to an artifact.
- pub fn deserialize(mut bytes: &[u8]) -> Result {
- Artifact::decode(&mut bytes).map_err(|e| format!("{:?}", e))
+impl AsRef<[u8]> for CompiledArtifact {
+ fn as_ref(&self) -> &[u8] {
+ self.0.as_slice()
}
}
@@ -117,6 +107,9 @@ pub enum ArtifactState {
},
/// A task to prepare this artifact is scheduled.
Preparing,
+ /// The code couldn't be compiled due to an error. Such artifacts
+ /// never reach the executor and stay in the host's memory.
+ FailedToProcess(PrepareError),
}
/// A container of all known artifact ids and their states.
diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs
index f0ba95515054..8afd0ddddb4b 100644
--- a/node/core/pvf/src/error.rs
+++ b/node/core/pvf/src/error.rs
@@ -14,6 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see .
+use parity_scale_codec::{Decode, Encode};
+
+/// An error that occurred during the prepare part of the PVF pipeline.
+#[derive(Debug, Clone, Encode, Decode)]
+pub enum PrepareError {
+ /// During the prevalidation stage of preparation an issue was found with the PVF.
+ Prevalidation(String),
+ /// Compilation failed for the given PVF.
+ Preparation(String),
+ /// This state indicates that the process assigned to prepare the artifact wasn't responsible
+ /// or were killed. This state is reported by the validation host (not by the worker).
+ DidNotMakeIt,
+}
+
/// A error raised during validation of the candidate.
#[derive(Debug, Clone)]
pub enum ValidationError {
@@ -54,3 +68,14 @@ pub enum InvalidCandidate {
/// PVF execution (compilation is not included) took more time than was allotted.
HardTimeout,
}
+
+impl From for ValidationError {
+ fn from(error: PrepareError) -> Self {
+ let error_str = match error {
+ PrepareError::Prevalidation(err) => err,
+ PrepareError::Preparation(err) => err,
+ PrepareError::DidNotMakeIt => "preparation timeout".to_owned(),
+ };
+ ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str))
+ }
+}
diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs
index 86e892672d9e..a1c4e6d55788 100644
--- a/node/core/pvf/src/execute/worker.rs
+++ b/node/core/pvf/src/execute/worker.rs
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see .
use crate::{
- artifacts::{Artifact, ArtifactPathId},
+ artifacts::{ArtifactPathId, CompiledArtifact},
executor_intf::TaskExecutor,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
@@ -49,8 +49,8 @@ pub enum Outcome {
/// PVF execution completed successfully and the result is returned. The worker is ready for
/// another job.
Ok { result_descriptor: ValidationResult, duration_ms: u64, idle_worker: IdleWorker },
- /// The candidate validation failed. It may be for example because the preparation process
- /// produced an error or the wasm execution triggered a trap.
+ /// The candidate validation failed. It may be for example because the wasm execution triggered a trap.
+ /// Errors related to the preparation process are not expected to be encountered by the execution workers.
InvalidCandidate { err: String, idle_worker: IdleWorker },
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
@@ -216,18 +216,12 @@ async fn validate_using_artifact(
Ok(b) => b,
};
- let artifact = match Artifact::deserialize(&artifact_bytes) {
+ let artifact = match CompiledArtifact::decode(&mut artifact_bytes.as_slice()) {
Err(e) => return Response::InternalError(format!("artifact deserialization: {:?}", e)),
Ok(a) => a,
};
- let compiled_artifact = match &artifact {
- Artifact::PrevalidationErr(msg) => return Response::format_invalid("prevalidation", msg),
- Artifact::PreparationErr(msg) => return Response::format_invalid("preparation", msg),
- Artifact::DidntMakeIt => return Response::format_invalid("preparation timeout", ""),
-
- Artifact::Compiled { compiled_artifact } => compiled_artifact,
- };
+ let compiled_artifact = artifact.as_ref();
let validation_started_at = Instant::now();
let descriptor_bytes = match unsafe {
diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index 40c30ca65c21..292e37cdc30d 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -344,8 +344,7 @@ async fn run(
.await);
},
from_prepare_queue = from_prepare_queue_rx.next() => {
- let prepare::FromQueue::Prepared(artifact_id)
- = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
+ let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));
// Note that preparation always succeeds.
//
@@ -361,7 +360,7 @@ async fn run(
&mut artifacts,
&mut to_execute_queue_tx,
&mut awaiting_prepare,
- artifact_id,
+ from_queue,
).await);
},
}
@@ -439,6 +438,9 @@ async fn handle_execute_pvf(
awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
},
+ ArtifactState::FailedToProcess(error) => {
+ let _ = result_tx.send(Err(ValidationError::from(error.clone())));
+ },
}
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
@@ -470,6 +472,7 @@ async fn handle_heads_up(
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
},
+ ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
@@ -491,8 +494,10 @@ async fn handle_prepare_done(
artifacts: &mut Artifacts,
execute_queue: &mut mpsc::Sender,
awaiting_prepare: &mut AwaitingPrepare,
- artifact_id: ArtifactId,
+ from_queue: prepare::FromQueue,
) -> Result<(), Fatal> {
+ let prepare::FromQueue { artifact_id, result } = from_queue;
+
// Make some sanity checks and extract the current state.
let state = match artifacts.artifact_state_mut(&artifact_id) {
None => {
@@ -513,6 +518,12 @@ async fn handle_prepare_done(
never!("the artifact is already prepared: {:?}", artifact_id);
return Ok(())
},
+ Some(ArtifactState::FailedToProcess(_)) => {
+ // The reasoning is similar to the above, the artifact cannot be
+ // processed at this point.
+ never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
+ return Ok(())
+ },
Some(state @ ArtifactState::Preparing) => state,
};
@@ -526,6 +537,12 @@ async fn handle_prepare_done(
continue
}
+ // Don't send failed artifacts to the execution's queue.
+ if let Err(ref error) = result {
+ let _ = result_tx.send(Err(ValidationError::from(error.clone())));
+ continue
+ }
+
send_execute(
execute_queue,
execute::ToQueue::Enqueue {
@@ -538,8 +555,10 @@ async fn handle_prepare_done(
.await?;
}
- // Now consider the artifact prepared.
- *state = ArtifactState::Prepared { last_time_needed: SystemTime::now() };
+ *state = match result {
+ Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() },
+ Err(error) => ArtifactState::FailedToProcess(error.clone()),
+ };
Ok(())
}
@@ -937,7 +956,7 @@ mod tests {
);
test.from_prepare_queue_tx
- .send(prepare::FromQueue::Prepared(artifact_id(1)))
+ .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.await
.unwrap();
let result_tx_pvf_1_1 = assert_matches!(
@@ -950,7 +969,7 @@ mod tests {
);
test.from_prepare_queue_tx
- .send(prepare::FromQueue::Prepared(artifact_id(2)))
+ .send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.await
.unwrap();
let result_tx_pvf_2 = assert_matches!(
@@ -1005,7 +1024,7 @@ mod tests {
);
test.from_prepare_queue_tx
- .send(prepare::FromQueue::Prepared(artifact_id(1)))
+ .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.await
.unwrap();
diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs
index 035d799ac594..729f813432f9 100644
--- a/node/core/pvf/src/prepare/pool.rs
+++ b/node/core/pvf/src/prepare/pool.rs
@@ -16,6 +16,7 @@
use super::worker::{self, Outcome};
use crate::{
+ error::PrepareError,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
@@ -78,9 +79,16 @@ pub enum FromPool {
/// The given worker was just spawned and is ready to be used.
Spawned(Worker),
- /// The given worker either succeeded or failed the given job. Under any circumstances the
- /// artifact file has been written. The `bool` says whether the worker ripped.
- Concluded(Worker, bool),
+ /// The given worker either succeeded or failed the given job.
+ Concluded {
+ /// A key for retrieving the worker data from the pool.
+ worker: Worker,
+ /// Indicates whether the worker process was killed.
+ rip: bool,
+ /// [`Ok`] indicates that compiled artifact is successfully stored on disk.
+ /// Otherwise, an [error](PrepareError) is supplied.
+ result: Result<(), PrepareError>,
+ },
/// The given worker ceased to exist.
Rip(Worker),
@@ -295,7 +303,7 @@ fn handle_mux(
},
PoolEvent::StartWork(worker, outcome) => {
match outcome {
- Outcome::Concluded(idle) => {
+ Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
@@ -310,7 +318,7 @@ fn handle_mux(
let old = data.idle.replace(idle);
assert_matches!(old, None, "attempt to overwrite an idle worker");
- reply(from_pool, FromPool::Concluded(worker, false))?;
+ reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
Ok(())
},
@@ -321,9 +329,16 @@ fn handle_mux(
Ok(())
},
- Outcome::DidntMakeIt => {
+ Outcome::DidNotMakeIt => {
if attempt_retire(metrics, spawned, worker) {
- reply(from_pool, FromPool::Concluded(worker, true))?;
+ reply(
+ from_pool,
+ FromPool::Concluded {
+ worker,
+ rip: true,
+ result: Err(PrepareError::DidNotMakeIt),
+ },
+ )?;
}
Ok(())
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index 4ffa21de435b..d85e6b8a1422 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -17,7 +17,9 @@
//! A queue that handles requests for PVF preparation.
use super::pool::{self, Worker};
-use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET};
+use crate::{
+ artifacts::ArtifactId, error::PrepareError, metrics::Metrics, Priority, Pvf, LOG_TARGET,
+};
use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
@@ -29,7 +31,7 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
- /// [`FromQueue::Prepared`] response. In case there is a need to bump the priority, use
+ /// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
Enqueue { priority: Priority, pvf: Pvf },
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
@@ -37,9 +39,13 @@ pub enum ToQueue {
}
/// A response from queue.
-#[derive(Debug, PartialEq, Eq)]
-pub enum FromQueue {
- Prepared(ArtifactId),
+#[derive(Debug)]
+pub struct FromQueue {
+ /// Identifier of an artifact.
+ pub(crate) artifact_id: ArtifactId,
+ /// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact
+ /// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied.
+ pub(crate) result: Result<(), PrepareError>,
}
#[derive(Default)]
@@ -299,7 +305,8 @@ async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Resul
use pool::FromPool::*;
match from_pool {
Spawned(worker) => handle_worker_spawned(queue, worker).await?,
- Concluded(worker, rip) => handle_worker_concluded(queue, worker, rip).await?,
+ Concluded { worker, rip, result } =>
+ handle_worker_concluded(queue, worker, rip, result).await?,
Rip(worker) => handle_worker_rip(queue, worker).await?,
}
Ok(())
@@ -320,6 +327,7 @@ async fn handle_worker_concluded(
queue: &mut Queue,
worker: Worker,
rip: bool,
+ result: Result<(), PrepareError>,
) -> Result<(), Fatal> {
queue.metrics.prepare_concluded();
@@ -377,7 +385,7 @@ async fn handle_worker_concluded(
"prepare worker concluded",
);
- reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?;
+ reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?;
// Figure out what to do with the worker.
if rip {
@@ -641,12 +649,9 @@ mod tests {
let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
- test.send_from_pool(pool::FromPool::Concluded(w, false));
+ test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) });
- assert_eq!(
- test.poll_and_recv_from_queue().await,
- FromQueue::Prepared(pvf(1).as_artifact_id())
- );
+ assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}
#[async_std::test]
@@ -671,7 +676,7 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
- test.send_from_pool(pool::FromPool::Concluded(w1, false));
+ test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
@@ -704,7 +709,7 @@ mod tests {
// That's a bit silly in this context, but in production there will be an entire pool up
// to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
// we just check that edge case of an edge case works.
- test.send_from_pool(pool::FromPool::Concluded(w1, false));
+ test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}
@@ -749,15 +754,12 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
// Conclude worker 1 and rip it.
- test.send_from_pool(pool::FromPool::Concluded(w1, true));
+ test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });
// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
- assert_eq!(
- test.poll_and_recv_from_queue().await,
- FromQueue::Prepared(pvf(1).as_artifact_id())
- );
+ assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}
#[async_std::test]
@@ -773,7 +775,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
- test.send_from_pool(pool::FromPool::Concluded(w1, true));
+ test.send_from_pool(pool::FromPool::Concluded {
+ worker: w1,
+ rip: true,
+ result: Err(PrepareError::DidNotMakeIt),
+ });
test.poll_ensure_to_pool_is_empty().await;
}
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs
index 510d582f7e03..a8bb3516e296 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker.rs
@@ -15,7 +15,8 @@
// along with Polkadot. If not, see .
use crate::{
- artifacts::Artifact,
+ artifacts::CompiledArtifact,
+ error::PrepareError,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
@@ -29,6 +30,8 @@ use async_std::{
};
use futures::FutureExt as _;
use futures_timer::Delay;
+use parity_scale_codec::{Decode, Encode};
+use sp_core::hexdisplay::HexDisplay;
use std::{sync::Arc, time::Duration};
const NICENESS_BACKGROUND: i32 = 10;
@@ -48,7 +51,7 @@ pub async fn spawn(
pub enum Outcome {
/// The worker has finished the work assigned to it.
- Concluded(IdleWorker),
+ Concluded { worker: IdleWorker, result: Result<(), PrepareError> },
/// The host tried to reach the worker but failed. This is most likely because the worked was
/// killed by the system.
Unreachable,
@@ -59,7 +62,7 @@ pub enum Outcome {
/// the artifact).
///
/// This doesn't return an idle worker instance, thus this worker is no longer usable.
- DidntMakeIt,
+ DidNotMakeIt,
}
/// Given the idle token of a worker and parameters of work, communicates with the worker and
@@ -99,13 +102,11 @@ pub async fn start_work(
// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines.
//
- // In that case we should handle these gracefully by writing the artifact file by ourselves.
- // We may potentially overwrite the artifact in rare cases where the worker didn't make
- // it to report back the result.
+ // In that case we should propagate the error to the pool.
#[derive(Debug)]
enum Selected {
- Done,
+ Done(Result<(), PrepareError>),
IoErr,
Deadline,
}
@@ -113,41 +114,48 @@ pub async fn start_work(
let selected = futures::select! {
res = framed_recv(&mut stream).fuse() => {
match res {
- Ok(x) if x == &[1u8] => {
- tracing::debug!(
- target: LOG_TARGET,
- worker_pid = %pid,
- "promoting WIP artifact {} to {}",
- tmp_file.display(),
- artifact_path.display(),
- );
-
- async_std::fs::rename(&tmp_file, &artifact_path)
- .await
- .map(|_| Selected::Done)
- .unwrap_or_else(|err| {
- tracing::warn!(
+ Ok(response_bytes) => {
+ // By convention we expect encoded `Result<(), PrepareError>`.
+ if let Ok(result) =
+ >::decode(&mut response_bytes.clone().as_slice())
+ {
+ if result.is_ok() {
+ tracing::debug!(
target: LOG_TARGET,
worker_pid = %pid,
- "failed to rename the artifact from {} to {}: {:?}",
+ "promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
- err,
);
- Selected::IoErr
- })
- }
- Ok(response_bytes) => {
- use sp_core::hexdisplay::HexDisplay;
- let bound_bytes =
- &response_bytes[..response_bytes.len().min(4)];
- tracing::warn!(
- target: LOG_TARGET,
- worker_pid = %pid,
- "received unexpected response from the prepare worker: {}",
- HexDisplay::from(&bound_bytes),
- );
- Selected::IoErr
+
+ async_std::fs::rename(&tmp_file, &artifact_path)
+ .await
+ .map(|_| Selected::Done(result))
+ .unwrap_or_else(|err| {
+ tracing::warn!(
+ target: LOG_TARGET,
+ worker_pid = %pid,
+ "failed to rename the artifact from {} to {}: {:?}",
+ tmp_file.display(),
+ artifact_path.display(),
+ err,
+ );
+ Selected::IoErr
+ })
+ } else {
+ Selected::Done(result)
+ }
+ } else {
+ // We received invalid bytes from the worker.
+ let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
+ tracing::warn!(
+ target: LOG_TARGET,
+ worker_pid = %pid,
+ "received unexpected response from the prepare worker: {}",
+ HexDisplay::from(&bound_bytes),
+ );
+ Selected::IoErr
+ }
},
Err(err) => {
tracing::warn!(
@@ -164,24 +172,11 @@ pub async fn start_work(
};
match selected {
- Selected::Done => {
+ Selected::Done(result) => {
renice(pid, NICENESS_FOREGROUND);
- Outcome::Concluded(IdleWorker { stream, pid })
- },
- Selected::IoErr | Selected::Deadline => {
- let bytes = Artifact::DidntMakeIt.serialize();
- // best effort: there is nothing we can do here if the write fails.
- if let Err(err) = async_std::fs::write(&artifact_path, &bytes).await {
- tracing::warn!(
- target: LOG_TARGET,
- worker_pid = %pid,
- "preparation didn't make it, because of `{:?}`: {:?}",
- selected,
- err,
- );
- }
- Outcome::DidntMakeIt
+ Outcome::Concluded { worker: IdleWorker { stream, pid }, result }
},
+ Selected::IoErr | Selected::Deadline => Outcome::DidNotMakeIt,
}
})
.await
@@ -205,7 +200,7 @@ where
"failed to create a temp file for the artifact: {:?}",
err,
);
- return Outcome::DidntMakeIt
+ return Outcome::DidNotMakeIt
},
};
@@ -288,31 +283,47 @@ pub fn worker_entrypoint(socket_path: &str) {
worker_pid = %std::process::id(),
"worker: preparing artifact",
);
- let artifact_bytes = prepare_artifact(&code).serialize();
- // Write the serialized artifact into into a temp file.
- tracing::debug!(
- target: LOG_TARGET,
- worker_pid = %std::process::id(),
- "worker: writing artifact to {}",
- dest.display(),
- );
- async_std::fs::write(&dest, &artifact_bytes).await?;
+ let result = match prepare_artifact(&code) {
+ Err(err) => {
+ // Serialized error will be written into the socket.
+ Err(err)
+ },
+ Ok(compiled_artifact) => {
+ // Write the serialized artifact into a temp file.
+ // PVF host only keeps artifacts statuses in its memory,
+ // successfully compiled code gets stored on the disk (and
+ // consequently deserialized by execute-workers). The prepare
+ // worker is only required to send an empty `Ok` to the pool
+ // to indicate the success.
+
+ let artifact_bytes = compiled_artifact.encode();
+
+ tracing::debug!(
+ target: LOG_TARGET,
+ worker_pid = %std::process::id(),
+ "worker: writing artifact to {}",
+ dest.display(),
+ );
+ async_std::fs::write(&dest, &artifact_bytes).await?;
+
+ Ok(())
+ },
+ };
- // Return back a byte that signals finishing the work.
- framed_send(&mut stream, &[1u8]).await?;
+ framed_send(&mut stream, result.encode().as_slice()).await?;
}
});
}
-fn prepare_artifact(code: &[u8]) -> Artifact {
+fn prepare_artifact(code: &[u8]) -> Result {
let blob = match crate::executor_intf::prevalidate(code) {
- Err(err) => return Artifact::PrevalidationErr(format!("{:?}", err)),
+ Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};
match crate::executor_intf::prepare(blob) {
- Ok(compiled_artifact) => Artifact::Compiled { compiled_artifact },
- Err(err) => Artifact::PreparationErr(format!("{:?}", err)),
+ Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
+ Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
}