From c6d98cf2e95b155353e126aa0a165c99592924ec Mon Sep 17 00:00:00 2001
From: Nipunn Koorapati
Date: Wed, 4 Dec 2024 14:31:32 -0700
Subject: [PATCH] Make report_error async. Yield once before reporting. (#31853)

Yielding allows the task to die during shutdown - which can avoid
spuriously reporting useless abort errors to sentry. Should hopefully
reduce a lot of noise during pushes.

I considered making it the responsibility of the callsite to yield, but
given that every background worker would prefer this yield, and it
doesn't hurt to add it on main threads - I just achieved it everywhere
with this technique.

Open to better ideas, but this seems to compile.

GitOrigin-RevId: 168f9e54d7f66e942959d51c0d17749820b03596
---
 crates/application/src/cron_jobs/mod.rs       |  6 +++---
 crates/application/src/export_worker.rs       |  4 ++--
 crates/application/src/function_log.rs        |  8 +++----
 crates/application/src/lib.rs                 |  3 ++-
 crates/application/src/scheduled_jobs/mod.rs  | 10 ++++-----
 crates/application/src/schema_worker/mod.rs   |  2 +-
 crates/application/src/snapshot_import/mod.rs |  4 ++--
 .../application/src/snapshot_import/worker.rs |  2 +-
 .../src/system_table_cleanup/mod.rs           |  2 +-
 .../application/src/table_summary_worker.rs   |  2 +-
 crates/common/src/errors.rs                   | 18 ++++++++++++----
 crates/common/src/http/mod.rs                 |  4 ++--
 crates/config_loader/src/lib.rs               |  2 +-
 crates/database/src/index_worker.rs           |  2 +-
 .../src/index_workers/retriable_worker.rs     |  2 +-
 crates/database/src/retention.rs              | 21 +++++++++++--------
 crates/database/src/search_index_bootstrap.rs |  2 +-
 crates/database/src/subscription.rs           |  4 ++--
 .../src/environment/helpers/promise.rs        |  4 ++--
 crates/isolate/src/ops/errors.rs              |  4 ++--
 crates/isolate/src/termination.rs             |  4 ++--
 crates/local_backend/src/subs/mod.rs          | 10 ++++-----
 crates/search/src/fragmented_segment.rs       |  2 +-
 crates/storage/src/lib.rs                     |  5 +++--
 24 files changed, 71 insertions(+), 56 deletions(-)

diff --git a/crates/application/src/cron_jobs/mod.rs b/crates/application/src/cron_jobs/mod.rs
index bd96186c..578d1115 100644
--- a/crates/application/src/cron_jobs/mod.rs
+++ b/crates/application/src/cron_jobs/mod.rs
@@ -129,7 +129,7 @@ impl CronJobExecutor {
         while let Err(mut e) = executor.run(&mut backoff).await {
             // Only report OCCs that happen repeatedly
             if !e.is_occ() || (backoff.failures() as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES {
-                report_error(&mut e);
+                report_error(&mut e).await;
             }
             let delay = backoff.fail(&mut executor.rt.rng());
             tracing::error!("Cron job executor failed, sleeping {delay:?}");
@@ -308,7 +308,7 @@ impl CronJobExecutor {
                 Err(mut e) => {
                     let delay = function_backoff.fail(&mut self.rt.rng());
                     tracing::error!("System error executing job:, sleeping {delay:?}");
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                     metrics::log_cron_job_failure(&e);
                     self.rt.wait(delay).await;
                 },
@@ -621,7 +621,7 @@ impl CronJobExecutor {
         {
             let delay = backoff.fail(&mut self.rt.rng());
             tracing::error!("Failed to update action state, sleeping {delay:?}");
-            report_error(&mut err);
+            report_error(&mut err).await;
             self.rt.wait(delay).await;
         }
         self.function_log.log_action(completion, usage_tracker);
diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs
index b429ef5d..8e28b633 100644
--- a/crates/application/src/export_worker.rs
+++ b/crates/application/src/export_worker.rs
@@ -202,7 +202,7 @@ impl ExportWorker {
         async move {
             loop {
                 if let Err(e) = worker.run().await {
-                    report_error(&mut e.context("ExportWorker died"));
+                    report_error(&mut e.context("ExportWorker died")).await;
                     let delay = worker.backoff.fail(&mut worker.runtime.rng());
                     worker.runtime.wait(delay).await;
                 } else {
@@ -287,7 +287,7 @@ impl ExportWorker {
                     return Ok(());
                 },
                 Err(mut e) => {
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                     let delay = self.backoff.fail(&mut self.runtime.rng());
                     tracing::error!("Export failed, retrying in {delay:?}");
                     self.runtime.wait(delay).await;
diff --git a/crates/application/src/function_log.rs b/crates/application/src/function_log.rs
index daf1d6f8..213ac28d 100644
--- a/crates/application/src/function_log.rs
+++ b/crates/application/src/function_log.rs
@@ -19,7 +19,7 @@ use common::{
         ComponentPath,
     },
     errors::{
-        report_error,
+        report_error_sync,
         JsError,
     },
    execution_context::ExecutionContext,
@@ -1041,7 +1041,7 @@ impl FunctionExecutionLog {
             .lock()
             .log_execution(execution, send_console_events)
         {
-            report_error(&mut e);
+            report_error_sync(&mut e);
         }
     }
 
@@ -1056,7 +1056,7 @@ impl FunctionExecutionLog {
             .lock()
             .log_execution_progress(log_lines, event_source, timestamp)
         {
-            report_error(&mut e);
+            report_error_sync(&mut e);
         }
     }
 
@@ -1259,7 +1259,7 @@ impl Inner {
                 // Don't let failing to construct the UDF execution record block sending
                 // the other log events
                 tracing::error!("failed to create UDF execution record: {}", e);
-                report_error(&mut e);
+                report_error_sync(&mut e);
             },
         }
 
diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs
index cbf3dd3e..4a736432 100644
--- a/crates/application/src/lib.rs
+++ b/crates/application/src/lib.rs
@@ -1534,7 +1534,8 @@ impl Application {
                 // project default env variables. Report the error but do not fail the request.
                 report_error(&mut anyhow::anyhow!(
                     "Error setting initial environment variables: {e}"
-                ));
+                ))
+                .await;
                 Ok(())
             } else {
                 Err(e)
diff --git a/crates/application/src/scheduled_jobs/mod.rs b/crates/application/src/scheduled_jobs/mod.rs
index ebabe7a2..eba01728 100644
--- a/crates/application/src/scheduled_jobs/mod.rs
+++ b/crates/application/src/scheduled_jobs/mod.rs
@@ -206,7 +206,7 @@ impl ScheduledJobExecutor {
         while let Err(mut e) = executor.run(&mut backoff).await {
             let delay = backoff.fail(&mut executor.rt.rng());
             tracing::error!("Scheduled job executor failed, sleeping {delay:?}");
-            report_error(&mut e);
+            report_error(&mut e).await;
             executor.rt.wait(delay).await;
         }
     }
@@ -437,7 +437,7 @@ impl ScheduledJobContext {
                     // If scheduling a retry hits an error, nothing has
                     // changed so the job will remain at the head of the queue and
                     // will be picked up by the scheduler in the next cycle.
-                    report_error(&mut retry_err);
+                    report_error(&mut retry_err).await;
                 },
             }
         },
@@ -465,7 +465,7 @@ impl ScheduledJobContext {
                 // Only report OCCs that happen repeatedly
                 if !system_error.is_occ()
                     || (attempts.occ_errors as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES {
-                    report_error(&mut system_error);
+                    report_error(&mut system_error).await;
                 }
                 if system_error.is_occ() {
                     attempts.occ_errors += 1;
@@ -770,7 +770,7 @@ impl ScheduledJobContext {
         {
             let delay = backoff.fail(&mut self.rt.rng());
             tracing::error!("Failed to update action state, sleeping {delay:?}");
-            report_error(&mut err);
+            report_error(&mut err).await;
             self.rt.wait(delay).await;
         }
         self.function_log.log_action(completion, usage_tracker);
@@ -882,7 +882,7 @@ impl ScheduledJobGarbageCollector {
             tracing::error!("Scheduled job garbage collector failed, sleeping {delay:?}");
             // Only report OCCs that happen repeatedly
             if !e.is_occ() || (backoff.failures() as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES {
-                report_error(&mut e);
+                report_error(&mut e).await;
             }
             garbage_collector.rt.wait(delay).await;
         }
diff --git a/crates/application/src/schema_worker/mod.rs b/crates/application/src/schema_worker/mod.rs
index 19f4681e..0e95b3fc 100644
--- a/crates/application/src/schema_worker/mod.rs
+++ b/crates/application/src/schema_worker/mod.rs
@@ -81,7 +81,7 @@ impl SchemaWorker {
         loop {
             if let Err(e) = worker.run().await {
                 let delay = backoff.fail(&mut worker.runtime.rng());
-                report_error(&mut e.context("SchemaWorker died"));
+                report_error(&mut e.context("SchemaWorker died")).await;
                 tracing::error!("Schema worker failed, sleeping {delay:?}");
                 worker.runtime.wait(delay).await;
             } else {
diff --git a/crates/application/src/snapshot_import/mod.rs b/crates/application/src/snapshot_import/mod.rs
index fa438cc0..64e34d54 100644
--- a/crates/application/src/snapshot_import/mod.rs
+++ b/crates/application/src/snapshot_import/mod.rs
@@ -213,7 +213,7 @@ impl SnapshotImportExecutor {
             Err(e) => {
                 let mut e = wrap_import_err(e);
                 if e.is_bad_request() {
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                     self.database
                         .execute_with_overloaded_retries(
                             Identity::system(),
@@ -273,7 +273,7 @@ impl SnapshotImportExecutor {
             Err(e) => {
                 let mut e = wrap_import_err(e);
                 if e.is_bad_request() {
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                     self.database
                         .execute_with_overloaded_retries(
                             Identity::system(),
diff --git a/crates/application/src/snapshot_import/worker.rs b/crates/application/src/snapshot_import/worker.rs
index 5118cd7d..91f02788 100644
--- a/crates/application/src/snapshot_import/worker.rs
+++ b/crates/application/src/snapshot_import/worker.rs
@@ -54,7 +54,7 @@ impl SnapshotImportWorker {
         async move {
             loop {
                 if let Err(e) = Self::run_once(&mut worker).await {
-                    report_error(&mut e.context("SnapshotImportWorker died"));
+                    report_error(&mut e.context("SnapshotImportWorker died")).await;
                     let delay = worker.backoff.fail(&mut worker.runtime.rng());
                     worker.runtime.wait(delay).await;
                 } else {
diff --git a/crates/application/src/system_table_cleanup/mod.rs b/crates/application/src/system_table_cleanup/mod.rs
index 1d0c29cd..a8abd5c1 100644
--- a/crates/application/src/system_table_cleanup/mod.rs
+++ b/crates/application/src/system_table_cleanup/mod.rs
@@ -99,7 +99,7 @@ impl SystemTableCleanupWorker {
         }
         loop {
             if let Err(e) = worker.run().await {
-                report_error(&mut e.context("SystemTableCleanupWorker died"));
+                report_error(&mut e.context("SystemTableCleanupWorker died")).await;
             }
         }
     }
diff --git a/crates/application/src/table_summary_worker.rs b/crates/application/src/table_summary_worker.rs
index baa5a693..b1290141 100644
--- a/crates/application/src/table_summary_worker.rs
+++ b/crates/application/src/table_summary_worker.rs
@@ -152,7 +152,7 @@ impl TableSummaryWorker {
             }
         }
         if let Err(mut err) = result {
-            report_error(&mut err);
+            report_error(&mut err).await;
         }
         let wait_fut = self.runtime.wait(Duration::from_secs(10)).fuse();
         pin_mut!(wait_fut);
diff --git a/crates/common/src/errors.rs b/crates/common/src/errors.rs
index 5a468bf5..810720c3 100644
--- a/crates/common/src/errors.rs
+++ b/crates/common/src/errors.rs
@@ -60,7 +60,7 @@ pub struct MainError(anyhow::Error);
 impl<T: Into<anyhow::Error>> From<T> for MainError {
     fn from(e: T) -> Self {
         let mut err: anyhow::Error = e.into();
-        report_error(&mut err);
+        report_error_sync(&mut err);
         Self(err)
     }
 }
@@ -106,7 +106,17 @@ fn strip_pii(err: &mut anyhow::Error) {
 /// This is the one point where we call into Sentry.
 ///
 /// Other parts of codebase should not use the `sentry_anyhow` crate directly!
-pub fn report_error(err: &mut anyhow::Error) {
+pub async fn report_error(err: &mut anyhow::Error) {
+    // Yield in case this is during shutdown - at which point, errors being reported
+    // explicitly aren't useful. Yielding allows tokio to complete a cancellation.
+    tokio::task::yield_now().await;
+
+    report_error_sync(err);
+}
+
+/// Use the `pub async fn report_error` above if possible to log an error to
+/// sentry. This is a synchronous version for use in sync contexts.
+pub fn report_error_sync(err: &mut anyhow::Error) {
     strip_pii(err);
     if let Some(label) = err.metric_server_error_label() {
         log_errors_reported_total(label);
@@ -165,7 +175,7 @@
 /// See https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations
 pub fn recapture_stacktrace(mut err: anyhow::Error) -> anyhow::Error {
     let new_error = recapture_stacktrace_noreport(&err);
-    report_error(&mut err); // report original error, mutating it to strip pii
+    report_error_sync(&mut err); // report original error, mutating it to strip pii
     new_error
 }
 
@@ -525,7 +535,7 @@ impl JsError {
                 Err(err) => {
                     // This is not expected so report an error.
                     let mut err = err.context("Failed to lookup source_map");
-                    report_error(&mut err);
+                    report_error_sync(&mut err);
                     continue;
                 },
             };
diff --git a/crates/common/src/http/mod.rs b/crates/common/src/http/mod.rs
index b98c35a5..b99d79f4 100644
--- a/crates/common/src/http/mod.rs
+++ b/crates/common/src/http/mod.rs
@@ -102,7 +102,7 @@ use utoipa::ToSchema;
 
 use self::metrics::log_http_request;
 use crate::{
-    errors::report_error,
+    errors::report_error_sync,
     knobs::HTTP_SERVER_TCP_BACKLOG,
     metrics::log_client_version_unsupported,
     runtime::TaskManager,
@@ -541,7 +541,7 @@ impl IntoResponse for HttpResponseError {
     fn into_response(mut self) -> Response {
         // This is the only place we capture errors to sentry because it is the exit
        // point of the HTTP layer
-        report_error(&mut self.trace);
+        report_error_sync(&mut self.trace);
         self.http_error.into_response()
     }
 }
diff --git a/crates/config_loader/src/lib.rs b/crates/config_loader/src/lib.rs
index 2c7f3b2d..536a09c1 100644
--- a/crates/config_loader/src/lib.rs
+++ b/crates/config_loader/src/lib.rs
@@ -92,7 +92,7 @@ impl ConfigLoader {
                     });
                 },
                 Err(mut e) => {
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                     continue;
                 },
             }
diff --git a/crates/database/src/index_worker.rs b/crates/database/src/index_worker.rs
index e88ad10f..ac2f5f25 100644
--- a/crates/database/src/index_worker.rs
+++ b/crates/database/src/index_worker.rs
@@ -230,7 +230,7 @@ impl IndexWorker {
         }
         loop {
             if let Err(e) = worker.run().await {
-                report_error(&mut e.context("IndexWorker died"));
+                report_error(&mut e.context("IndexWorker died")).await;
                 let delay = worker.backoff.fail(&mut worker.runtime.rng());
                 log::error!(
                     "IndexWorker died, num_failures: {}. Backing off for {}ms",
diff --git a/crates/database/src/index_workers/retriable_worker.rs b/crates/database/src/index_workers/retriable_worker.rs
index 66081434..e75f8da4 100644
--- a/crates/database/src/index_workers/retriable_worker.rs
+++ b/crates/database/src/index_workers/retriable_worker.rs
@@ -84,7 +84,7 @@ async fn retry_failures(
         let expected_error = expected_occ || expected_overloaded;
 
         if !expected_error {
-            report_error(&mut e);
+            report_error(&mut e).await;
         }
         let delay = backoff.fail(&mut runtime.rng());
         tracing::error!(
diff --git a/crates/database/src/retention.rs b/crates/database/src/retention.rs
index 648ee8df..6b8d8f6d 100644
--- a/crates/database/src/retention.rs
+++ b/crates/database/src/retention.rs
@@ -463,11 +463,11 @@ impl LeaderRetentionManager {
     ) {
         match ts {
             Err(mut err) => {
-                report_error(&mut err);
+                report_error(&mut err).await;
             },
             Ok(Some(ts)) => {
                 if let Err(err) = snapshot_sender.send(ts) {
-                    report_error(&mut err.into());
+                    report_error(&mut err.into()).await;
                 }
             },
             Ok(None) => {},
@@ -557,7 +557,8 @@ impl LeaderRetentionManager {
                 report_error(&mut anyhow::anyhow!(
                     "Skipping deleting indexes for {id}@{prev_rev_ts}. It is a tombstone \
                      at {prev_rev_ts} but has a later revision at {ts}"
-                ));
+                ))
+                .await;
                 log_retention_scanned_document(maybe_doc.is_none(), false);
                 continue;
             };
@@ -968,7 +969,8 @@ impl LeaderRetentionManager {
             report_error(&mut anyhow::anyhow!(
                 "retention wanted to delete {index_entries_to_delete} entries but found \
                  {deleted_rows} to delete"
-            ));
+            ))
+            .await;
         }
         tracing::trace!("delete: deleted {deleted_rows:?} rows");
 
@@ -1006,7 +1008,8 @@ impl LeaderRetentionManager {
             report_error(&mut anyhow::anyhow!(
                 "retention wanted to delete {documents_to_delete} documents but found \
                  {deleted_rows} to delete"
-            ));
+            ))
+            .await;
         }
         tracing::trace!("delete_documents: deleted {deleted_rows:?} rows");
 
@@ -1043,7 +1046,7 @@ impl LeaderRetentionManager {
         if !is_working {
             min_snapshot_ts = match min_snapshot_rx.changed().await {
                 Err(err) => {
-                    report_error(&mut err.into());
+                    report_error(&mut err.into()).await;
                     // Fall back to polling if the channel is closed or falls over. This should
                     // really never happen.
                     Self::wait_with_jitter(&rt, *MAX_RETENTION_DELAY_SECONDS).await;
@@ -1130,7 +1133,7 @@ impl LeaderRetentionManager {
             }
         };
         if let Err(mut err) = r {
-            report_error(&mut err);
+            report_error(&mut err).await;
             let delay = error_backoff.fail(&mut rt.rng());
             tracing::debug!("go_delete: error, {err:?}, delaying {delay:?}");
             rt.wait(delay).await;
@@ -1168,7 +1171,7 @@ impl LeaderRetentionManager {
         if !is_working {
             min_document_snapshot_ts = match min_document_snapshot_rx.changed().await {
                 Err(err) => {
-                    report_error(&mut err.into());
+                    report_error(&mut err.into()).await;
                     // Fall back to polling if the channel is closed or falls over. This should
                     // really never happen.
                     Self::wait_with_jitter(&rt, *DOCUMENT_RETENTION_BATCH_INTERVAL_SECONDS)
                         .await;
@@ -1229,7 +1232,7 @@ impl LeaderRetentionManager {
             }
         };
         if let Err(mut err) = r {
-            report_error(&mut err);
+            report_error(&mut err).await;
             let delay = error_backoff.fail(&mut rt.rng());
             tracing::debug!("go_delete_documents: error, {err:?}, delaying {delay:?}");
             rt.wait(delay).await;
diff --git a/crates/database/src/search_index_bootstrap.rs b/crates/database/src/search_index_bootstrap.rs
index 21f33afe..14f8b276 100644
--- a/crates/database/src/search_index_bootstrap.rs
+++ b/crates/database/src/search_index_bootstrap.rs
@@ -476,7 +476,7 @@ impl SearchIndexBootstrapWorker {
             // Forgive OCC errors < N to match UDF mutation retry behavior.
             if !e.is_occ()
                 || (self.backoff.failures() as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES {
-                report_error(&mut e.context("SearchAndVectorBootstrapWorker died"));
+                report_error(&mut e.context("SearchAndVectorBootstrapWorker died")).await;
                 tracing::error!(
                     "SearchIndexBoostrapWorker died, num_failures: {}. Backing off for {}ms",
                     self.backoff.failures(),
diff --git a/crates/database/src/subscription.rs b/crates/database/src/subscription.rs
index e47d95fc..7c263a22 100644
--- a/crates/database/src/subscription.rs
+++ b/crates/database/src/subscription.rs
@@ -151,7 +151,7 @@ impl SubscriptionsWorker {
                     Ok(s) => {
                         let _: Result<_, _> = result.send(s);
                     },
-                    Err(mut e) => report_error(&mut e),
+                    Err(mut e) => report_error(&mut e).await,
                 }
             },
             Some(SubscriptionRequest::Cancel(key)) => {
@@ -165,7 +165,7 @@ impl SubscriptionsWorker {
             },
             next_ts = self.subscriptions.wait_for_next_ts() => {
                 if let Err(mut e) = self.subscriptions.advance_log(next_ts) {
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                 }
             },
         }
diff --git a/crates/isolate/src/environment/helpers/promise.rs b/crates/isolate/src/environment/helpers/promise.rs
index d74cdac1..b3ddba07 100644
--- a/crates/isolate/src/environment/helpers/promise.rs
+++ b/crates/isolate/src/environment/helpers/promise.rs
@@ -1,6 +1,6 @@
 use anyhow::anyhow;
 use common::errors::{
-    report_error,
+    report_error_sync,
     JsError,
 };
 use deno_core::v8;
@@ -47,7 +47,7 @@ fn resolve_promise_inner(
     // - log it now, and convert it to a JsError
     if !e.is_deterministic_user_error() {
         if allow_all_errors {
-            report_error(&mut e);
+            report_error_sync(&mut e);
         } else {
             return Err(e);
         };
diff --git a/crates/isolate/src/ops/errors.rs b/crates/isolate/src/ops/errors.rs
index 35340625..816199bf 100644
--- a/crates/isolate/src/ops/errors.rs
+++ b/crates/isolate/src/ops/errors.rs
@@ -1,6 +1,6 @@
 use anyhow::Context;
 use common::errors::{
-    report_error,
+    report_error_sync,
     FrameData,
     JsError,
 };
@@ -17,7 +17,7 @@ pub fn op_throw_uncatchable_developer_error<'b, P: OpProvider<'b>>(
     let js_error = JsError::from_frames(message.clone(), frame_data, None, |s| {
         provider.lookup_source_map(s)
     });
-    report_error(&mut anyhow::anyhow!(format!(
+    report_error_sync(&mut anyhow::anyhow!(format!(
         "UncatchableDeveloperError: {}",
         message
     )));
diff --git a/crates/isolate/src/termination.rs b/crates/isolate/src/termination.rs
index b39cee03..8e53ae2d 100644
--- a/crates/isolate/src/termination.rs
+++ b/crates/isolate/src/termination.rs
@@ -4,7 +4,7 @@ use std::{
 };
 
 use common::errors::{
-    report_error,
+    report_error_sync,
     JsError,
     TIMEOUT_ERROR_MESSAGE,
 };
@@ -83,7 +83,7 @@ impl IsolateHandle {
         if inner.reason.is_none() {
             inner.reason = Some(reason);
         } else {
-            report_error(&mut anyhow::anyhow!(
+            report_error_sync(&mut anyhow::anyhow!(
                 "termination after already terminated: {reason:?}"
             ));
         }
diff --git a/crates/local_backend/src/subs/mod.rs b/crates/local_backend/src/subs/mod.rs
index e07b5428..5aa2fac5 100644
--- a/crates/local_backend/src/subs/mod.rs
+++ b/crates/local_backend/src/subs/mod.rs
@@ -22,8 +22,8 @@ use axum::{
 };
 use common::{
     errors::{
-        self,
         report_error,
+        report_error_sync,
     },
     http::{
         ExtractClientVersion,
@@ -284,11 +284,11 @@ async fn run_sync_socket(
             if is_connection_closed_error(&*e) {
                 log_websocket_closed_error_not_reported()
             } else {
-                errors::report_error(&mut e);
+                report_error(&mut e).await;
             }
         }
     }
-    sentry::with_scope(|s| *s = sentry_scope, || report_error(&mut err));
+    sentry::with_scope(|s| *s = sentry_scope, || report_error_sync(&mut err));
     if let Some(label) = err.metric_server_error_label() {
         log_websocket_server_error(label);
     }
@@ -307,7 +307,7 @@ async fn run_sync_socket(
                 log_websocket_closed_error_not_reported()
             } else {
                 let msg = format!("Failed to gracefully close WebSocket: {e:?}");
-                errors::report_error(&mut anyhow::anyhow!(e).context(msg));
+                report_error(&mut anyhow::anyhow!(e).context(msg)).await;
             }
         }
     }
@@ -319,7 +319,7 @@ async fn run_sync_socket(
     if let Err(e) = socket.flush().await {
         if !is_connection_closed_error(&e) {
             let msg = format!("Failed to flush WebSocket: {e:?}");
-            errors::report_error(&mut anyhow::anyhow!(e).context(msg));
+            report_error(&mut anyhow::anyhow!(e).context(msg)).await;
         }
     }
     log_websocket_closed();
diff --git a/crates/search/src/fragmented_segment.rs b/crates/search/src/fragmented_segment.rs
index 7a9cd6c7..c0e306d2 100644
--- a/crates/search/src/fragmented_segment.rs
+++ b/crates/search/src/fragmented_segment.rs
@@ -401,7 +401,7 @@ impl FragmentedSegmentPrefetcher {
                 {
                     log_vector_prefetch_expiration();
                 } else {
-                    report_error(&mut e);
+                    report_error(&mut e).await;
                 }
             }
         })
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index ffc967f2..feaa7b54 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -675,10 +675,11 @@ async fn stream_object_with_retries(
                 return Err(e);
             },
             Err(e) => {
-                report_error(&mut anyhow::anyhow!(e).context(format!(
+                let mut toreport = anyhow::anyhow!(e).context(format!(
                     "failed while reading stream for {key:?}. {retries_remaining} attempts \
                      remaining"
-                )));
+                ));
+                report_error(&mut toreport).await;
                 let new_range =
                     (small_byte_range.start + bytes_yielded as u64)..small_byte_range.end;
                 let output = storage
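
Summary of the core change (crates/common/src/errors.rs), restated as a minimal
sketch drawn from the hunk above - nothing here beyond what the patch itself adds:

    // Async entry point: yield once so a task that is being cancelled during
    // shutdown dies at the yield point instead of reporting a spurious abort
    // error to Sentry.
    pub async fn report_error(err: &mut anyhow::Error) {
        tokio::task::yield_now().await;
        report_error_sync(err);
    }

    // Synchronous version for call sites that cannot await, e.g.
    // IntoResponse::into_response in crates/common/src/http/mod.rs.
    pub fn report_error_sync(err: &mut anyhow::Error) {
        strip_pii(err);
        // ... metrics and the actual Sentry reporting, unchanged ...
    }

Async call sites now write `report_error(&mut e).await;`, while sync-only call
sites (HTTP response conversion, V8 promise/termination paths, MainError::from)
switch to `report_error_sync(&mut e);`.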