Skip to content

Commit

Permalink
Add retries to node analyze (#31885)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 84706d622497416371b45020d623efe26c4daa23
  • Loading branch information
emmaling27 authored and Convex, Inc committed Dec 4, 2024
1 parent aabd795 commit e140f62
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 98 deletions.
4 changes: 2 additions & 2 deletions crates/application/src/application_function_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ pub struct ApplicationFunctionRunner<RT: Runtime> {

isolate_functions: FunctionRouter<RT>,
// Used for analyze, schema, etc.
node_actions: Actions,
node_actions: Actions<RT>,

pub(crate) module_cache: Arc<dyn ModuleLoader<RT>>,
modules_storage: Arc<dyn Storage>,
Expand All @@ -582,7 +582,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
database: Database<RT>,
key_broker: KeyBroker,
function_runner: Arc<dyn FunctionRunner<RT>>,
node_actions: Actions,
node_actions: Actions<RT>,
file_storage: TransactionalFileStorage<RT>,
modules_storage: Arc<dyn Storage>,
module_cache: Arc<dyn ModuleLoader<RT>>,
Expand Down
2 changes: 1 addition & 1 deletion crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl<RT: Runtime> Application<RT> {
searcher: Arc<dyn Searcher>,
segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
persistence: Arc<dyn Persistence>,
node_actions: Actions,
node_actions: Actions<RT>,
log_sender: Arc<dyn LogSender>,
log_visibility: Arc<dyn LogVisibility<RT>>,
snapshot_import_pause_client: PauseClient,
Expand Down
7 changes: 6 additions & 1 deletion crates/application/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {

let node_process_timeout = *ACTION_USER_TIMEOUT + Duration::from_secs(5);
let node_executor = Arc::new(LocalNodeExecutor::new(node_process_timeout)?);
let actions = Actions::new(node_executor, convex_origin.clone(), *ACTION_USER_TIMEOUT);
let actions = Actions::new(
node_executor,
convex_origin.clone(),
*ACTION_USER_TIMEOUT,
rt.clone(),
);

let application = Application::new(
rt.clone(),
Expand Down
4 changes: 4 additions & 0 deletions crates/common/src/knobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ pub static AWS_LAMBDA_DEPLOY_SPLAY_SECONDS: LazyLock<Duration> =
pub static AWS_LAMBDA_CLIENT_MAX_CONCURRENT_REQUESTS: LazyLock<usize> =
LazyLock::new(|| env_config("AWS_LAMBDA_MAX_CONCURRENT_STREAMS_PER_CONNECTION", 100));

/// The maximum number of times to retry analyze requests for node actions.
pub static NODE_ANALYZE_MAX_RETRIES: LazyLock<usize> =
LazyLock::new(|| env_config("NODE_ANALYZE_MAX_RETRIES", 3));

/// The number of seconds backend should wait for requests to drain before
/// shutting down after SIGINT.
pub static BACKEND_REQUEST_DRAIN_TIMEOUT: LazyLock<Duration> =
Expand Down
1 change: 1 addition & 0 deletions crates/local_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ pub async fn make_app(
node_executor,
config.convex_origin_url(),
*ACTION_USER_TIMEOUT,
runtime.clone(),
);

#[cfg(not(debug_assertions))]
Expand Down
43 changes: 37 additions & 6 deletions crates/node_executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ use std::{

use async_trait::async_trait;
use common::{
backoff::Backoff,
errors::{
report_error,
FrameData,
JsError,
},
execution_context::ExecutionContext,
knobs::NODE_ANALYZE_MAX_RETRIES,
log_lines::{
LogLine,
LogLineStructured,
},
runtime::Runtime,
sha256::Sha256Digest,
types::{
ActionCallbackToken,
Expand All @@ -28,6 +32,7 @@ use common::{
UdfType,
},
};
use errors::ErrorMetadataAnyhowExt;
use http::Uri;
use isolate::{
deserialize_udf_custom_error,
Expand Down Expand Up @@ -109,6 +114,9 @@ pub static EXECUTE_TIMEOUT_RESPONSE_JSON: LazyLock<JsonValue> = LazyLock::new(||
)
});

const NODE_ANALYZE_INITIAL_BACKOFF: Duration = Duration::from_millis(100);
const NODE_ANALYZE_MAX_BACKOFF: Duration = Duration::from_secs(5);

#[async_trait]
pub trait NodeExecutor: Sync + Send {
fn enable(&self) -> anyhow::Result<()>;
Expand All @@ -127,10 +135,11 @@ pub struct InvokeResponse {
}

#[derive(Clone)]
pub struct Actions {
pub struct Actions<RT: Runtime> {
executor: Arc<dyn NodeExecutor>,
convex_origin: ConvexOrigin,
user_timeout: Duration,
runtime: RT,
}

fn construct_js_error(
Expand Down Expand Up @@ -178,16 +187,18 @@ fn construct_js_error(
Ok(error)
}

impl Actions {
impl<RT: Runtime> Actions<RT> {
pub fn new(
executor: Arc<dyn NodeExecutor>,
convex_origin: ConvexOrigin,
user_timeout: Duration,
runtime: RT,
) -> Self {
Self {
executor,
convex_origin,
user_timeout,
runtime,
}
}

Expand Down Expand Up @@ -342,6 +353,28 @@ impl Actions {
result
}

async fn invoke_analyze(&self, request: AnalyzeRequest) -> anyhow::Result<InvokeResponse> {
let mut backoff = Backoff::new(NODE_ANALYZE_INITIAL_BACKOFF, NODE_ANALYZE_MAX_BACKOFF);
let mut retries = 0;
loop {
let (log_line_sender, _log_line_receiver) = mpsc::unbounded_channel();
let request = ExecutorRequest::Analyze(request.clone());
match self.executor.invoke(request, log_line_sender).await {
Ok(response) => return Ok(response),
Err(mut e) => {
if retries >= *NODE_ANALYZE_MAX_RETRIES || e.is_deterministic_user_error() {
return Err(e);
}
tracing::warn!("Failed to invoke analyze: {:?}", e);
retries += 1;
report_error(&mut e);
let duration = backoff.fail(&mut self.runtime.rng());
self.runtime.wait(duration).await;
},
}
}
}

#[minitrace::trace]
pub async fn analyze(
&self,
Expand All @@ -350,13 +383,11 @@ impl Actions {
) -> anyhow::Result<Result<BTreeMap<CanonicalizedModulePath, AnalyzedModule>, JsError>> {
let timer = node_executor("analyze");

let (log_line_sender, _log_line_receiver) = mpsc::unbounded_channel();
let request = ExecutorRequest::Analyze(request);
let InvokeResponse {
response,
memory_used_in_mb: _,
aws_request_id,
} = self.executor.invoke(request, log_line_sender).await?;
} = self.invoke_analyze(request).await?;
let response: AnalyzeResponse = serde_json::from_value(response.clone()).map_err(|e| {
anyhow::anyhow!(
"Failed to deserialize analyze response: {}. Response: {}",
Expand Down Expand Up @@ -779,7 +810,7 @@ impl TryFrom<JsonValue> for ExecuteResponse {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AnalyzeRequest {
pub source_package: SourcePackage,
pub environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
Expand Down
Loading

0 comments on commit e140f62

Please sign in to comment.