Make report_error async. Yield once before reporting. (#31853)
Yielding allows the task to be cancelled during shutdown, which avoids spuriously reporting useless abort errors to Sentry. This should reduce a lot of noise during pushes.

I considered making it the callsite's responsibility to yield, but since every background worker would want this yield and it doesn't hurt on main threads, this approach applies it everywhere. Open to better ideas, but it compiles.
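
To illustrate the idea, here is a minimal, self-contained sketch of the pattern (not the code in this commit): an async report_error that yields once before delegating to a synchronous report_error_sync. The printing reporter, the futures polling helpers (pin_mut, poll), and the main function are assumptions made up for the example; the real reporter strips PII and sends to Sentry.

use anyhow::anyhow;
use futures::{pin_mut, poll};

async fn report_error(err: &mut anyhow::Error) {
    // If the surrounding task is cancelled (e.g. aborted during shutdown), the
    // cancellation takes effect at this await point and nothing gets reported.
    tokio::task::yield_now().await;
    report_error_sync(err);
}

fn report_error_sync(err: &mut anyhow::Error) {
    // Stand-in for the real reporter.
    eprintln!("reporting: {err:#}");
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut err = anyhow!("worker failed during shutdown");
    {
        let report = report_error(&mut err);
        pin_mut!(report);
        // The first poll suspends at the yield; dropping the future here is
        // exactly what task cancellation does, so the error is never reported.
        assert!(poll!(report.as_mut()).is_pending());
    }
    println!("shut down without reporting");

    // A normal call site simply awaits the reporter.
    report_error(&mut anyhow!("a real error")).await;
}

Because tokio only cancels a task at an await point, the added yield guarantees there is such a point before any reporting happens; a worker aborted during shutdown is dropped there instead of sending a spurious abort error.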

GitOrigin-RevId: 168f9e54d7f66e942959d51c0d17749820b03596
nipunn1313 authored and Convex, Inc committed Dec 4, 2024
1 parent 2ab659b commit c6d98cf
Showing 24 changed files with 71 additions and 56 deletions.
6 changes: 3 additions & 3 deletions crates/application/src/cron_jobs/mod.rs
@@ -129,7 +129,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
while let Err(mut e) = executor.run(&mut backoff).await {
// Only report OCCs that happen repeatedly
if !e.is_occ() || (backoff.failures() as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES {
-report_error(&mut e);
+report_error(&mut e).await;
}
let delay = backoff.fail(&mut executor.rt.rng());
tracing::error!("Cron job executor failed, sleeping {delay:?}");
@@ -308,7 +308,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
Err(mut e) => {
let delay = function_backoff.fail(&mut self.rt.rng());
tracing::error!("System error executing job:, sleeping {delay:?}");
-report_error(&mut e);
+report_error(&mut e).await;
metrics::log_cron_job_failure(&e);
self.rt.wait(delay).await;
},
@@ -621,7 +621,7 @@ impl<RT: Runtime> CronJobExecutor<RT> {
{
let delay = backoff.fail(&mut self.rt.rng());
tracing::error!("Failed to update action state, sleeping {delay:?}");
-report_error(&mut err);
+report_error(&mut err).await;
self.rt.wait(delay).await;
}
self.function_log.log_action(completion, usage_tracker);
4 changes: 2 additions & 2 deletions crates/application/src/export_worker.rs
@@ -202,7 +202,7 @@ impl<RT: Runtime> ExportWorker<RT> {
async move {
loop {
if let Err(e) = worker.run().await {
-report_error(&mut e.context("ExportWorker died"));
+report_error(&mut e.context("ExportWorker died")).await;
let delay = worker.backoff.fail(&mut worker.runtime.rng());
worker.runtime.wait(delay).await;
} else {
@@ -287,7 +287,7 @@ impl<RT: Runtime> ExportWorker<RT> {
return Ok(());
},
Err(mut e) => {
-report_error(&mut e);
+report_error(&mut e).await;
let delay = self.backoff.fail(&mut self.runtime.rng());
tracing::error!("Export failed, retrying in {delay:?}");
self.runtime.wait(delay).await;
8 changes: 4 additions & 4 deletions crates/application/src/function_log.rs
@@ -19,7 +19,7 @@ use common::{
ComponentPath,
},
errors::{
-report_error,
+report_error_sync,
JsError,
},
execution_context::ExecutionContext,
@@ -1041,7 +1041,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
.lock()
.log_execution(execution, send_console_events)
{
-report_error(&mut e);
+report_error_sync(&mut e);
}
}

@@ -1056,7 +1056,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
.lock()
.log_execution_progress(log_lines, event_source, timestamp)
{
-report_error(&mut e);
+report_error_sync(&mut e);
}
}

@@ -1259,7 +1259,7 @@ impl<RT: Runtime> Inner<RT> {
// Don't let failing to construct the UDF execution record block sending
// the other log events
tracing::error!("failed to create UDF execution record: {}", e);
-report_error(&mut e);
+report_error_sync(&mut e);
},
}

3 changes: 2 additions & 1 deletion crates/application/src/lib.rs
@@ -1534,7 +1534,8 @@ impl<RT: Runtime> Application<RT> {
// project default env variables. Report the error but do not fail the request.
report_error(&mut anyhow::anyhow!(
"Error setting initial environment variables: {e}"
-));
+))
+.await;
Ok(())
} else {
Err(e)
10 changes: 5 additions & 5 deletions crates/application/src/scheduled_jobs/mod.rs
@@ -206,7 +206,7 @@ impl<RT: Runtime> ScheduledJobExecutor<RT> {
while let Err(mut e) = executor.run(&mut backoff).await {
let delay = backoff.fail(&mut executor.rt.rng());
tracing::error!("Scheduled job executor failed, sleeping {delay:?}");
-report_error(&mut e);
+report_error(&mut e).await;
executor.rt.wait(delay).await;
}
}
@@ -437,7 +437,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
// If scheduling a retry hits an error, nothing has
// changed so the job will remain at the head of the queue and
// will be picked up by the scheduler in the next cycle.
-report_error(&mut retry_err);
+report_error(&mut retry_err).await;
},
}
},
@@ -465,7 +465,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
// Only report OCCs that happen repeatedly
if !system_error.is_occ() || (attempts.occ_errors as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES
{
-report_error(&mut system_error);
+report_error(&mut system_error).await;
}
if system_error.is_occ() {
attempts.occ_errors += 1;
@@ -770,7 +770,7 @@ impl<RT: Runtime> ScheduledJobContext<RT> {
{
let delay = backoff.fail(&mut self.rt.rng());
tracing::error!("Failed to update action state, sleeping {delay:?}");
-report_error(&mut err);
+report_error(&mut err).await;
self.rt.wait(delay).await;
}
self.function_log.log_action(completion, usage_tracker);
@@ -882,7 +882,7 @@ impl<RT: Runtime> ScheduledJobGarbageCollector<RT> {
tracing::error!("Scheduled job garbage collector failed, sleeping {delay:?}");
// Only report OCCs that happen repeatedly
if !e.is_occ() || (backoff.failures() as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES {
-report_error(&mut e);
+report_error(&mut e).await;
}
garbage_collector.rt.wait(delay).await;
}
2 changes: 1 addition & 1 deletion crates/application/src/schema_worker/mod.rs
@@ -81,7 +81,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
loop {
if let Err(e) = worker.run().await {
let delay = backoff.fail(&mut worker.runtime.rng());
-report_error(&mut e.context("SchemaWorker died"));
+report_error(&mut e.context("SchemaWorker died")).await;
tracing::error!("Schema worker failed, sleeping {delay:?}");
worker.runtime.wait(delay).await;
} else {
4 changes: 2 additions & 2 deletions crates/application/src/snapshot_import/mod.rs
@@ -213,7 +213,7 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
Err(e) => {
let mut e = wrap_import_err(e);
if e.is_bad_request() {
-report_error(&mut e);
+report_error(&mut e).await;
self.database
.execute_with_overloaded_retries(
Identity::system(),
@@ -273,7 +273,7 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
Err(e) => {
let mut e = wrap_import_err(e);
if e.is_bad_request() {
-report_error(&mut e);
+report_error(&mut e).await;
self.database
.execute_with_overloaded_retries(
Identity::system(),
2 changes: 1 addition & 1 deletion crates/application/src/snapshot_import/worker.rs
@@ -54,7 +54,7 @@ impl SnapshotImportWorker {
async move {
loop {
if let Err(e) = Self::run_once(&mut worker).await {
-report_error(&mut e.context("SnapshotImportWorker died"));
+report_error(&mut e.context("SnapshotImportWorker died")).await;
let delay = worker.backoff.fail(&mut worker.runtime.rng());
worker.runtime.wait(delay).await;
} else {
2 changes: 1 addition & 1 deletion crates/application/src/system_table_cleanup/mod.rs
@@ -99,7 +99,7 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
}
loop {
if let Err(e) = worker.run().await {
-report_error(&mut e.context("SystemTableCleanupWorker died"));
+report_error(&mut e.context("SystemTableCleanupWorker died")).await;
}
}
}
2 changes: 1 addition & 1 deletion crates/application/src/table_summary_worker.rs
@@ -152,7 +152,7 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
}
}
if let Err(mut err) = result {
-report_error(&mut err);
+report_error(&mut err).await;
}
let wait_fut = self.runtime.wait(Duration::from_secs(10)).fuse();
pin_mut!(wait_fut);
18 changes: 14 additions & 4 deletions crates/common/src/errors.rs
@@ -60,7 +60,7 @@ pub struct MainError(anyhow::Error);
impl<T: Into<anyhow::Error>> From<T> for MainError {
fn from(e: T) -> Self {
let mut err: anyhow::Error = e.into();
-report_error(&mut err);
+report_error_sync(&mut err);
Self(err)
}
}
@@ -106,7 +106,17 @@ fn strip_pii(err: &mut anyhow::Error) {
/// This is the one point where we call into Sentry.
///
/// Other parts of codebase should not use the `sentry_anyhow` crate directly!
-pub fn report_error(err: &mut anyhow::Error) {
+pub async fn report_error(err: &mut anyhow::Error) {
+    // Yield in case this is during shutdown - at which point, errors being reported
+    // explicitly aren't useful. Yielding allows tokio to complete a cancellation.
+    tokio::task::yield_now().await;
+
+    report_error_sync(err);
+}
+
+/// Use the `pub async fn report_error` above if possible to log an error to
+/// sentry. This is a synchronous version for use in sync contexts.
+pub fn report_error_sync(err: &mut anyhow::Error) {
strip_pii(err);
if let Some(label) = err.metric_server_error_label() {
log_errors_reported_total(label);
@@ -165,7 +175,7 @@ pub fn report_error(err: &mut anyhow::Error) {
/// See https://docs.rs/anyhow/latest/anyhow/struct.Error.html#display-representations
pub fn recapture_stacktrace(mut err: anyhow::Error) -> anyhow::Error {
let new_error = recapture_stacktrace_noreport(&err);
-report_error(&mut err); // report original error, mutating it to strip pii
+report_error_sync(&mut err); // report original error, mutating it to strip pii
new_error
}

@@ -525,7 +535,7 @@ impl JsError {
Err(err) => {
// This is not expected so report an error.
let mut err = err.context("Failed to lookup source_map");
-report_error(&mut err);
+report_error_sync(&mut err);
continue;
},
};
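
The call sites in the rest of the diff split along the same line: async contexts such as the background worker retry loops await report_error, while synchronous code paths (the HTTP response conversion in crates/common/src/http/mod.rs below, the non-async function-log methods, the From impl for MainError) switch to report_error_sync. Here is a hedged sketch of the two call-site shapes; the stub reporters, worker_loop, and HttpResponseError are simplified stand-ins, not the real implementations.

use std::time::Duration;

// Stub reporters standing in for crates/common/src/errors.rs.
async fn report_error(err: &mut anyhow::Error) {
    tokio::task::yield_now().await; // cancellation point during shutdown
    report_error_sync(err);
}

fn report_error_sync(err: &mut anyhow::Error) {
    eprintln!("reporting: {err:#}");
}

// Async call site: a worker retry loop can simply await the reporter.
async fn worker_loop(mut run_once: impl FnMut() -> anyhow::Result<()>) {
    loop {
        match run_once() {
            Ok(()) => break,
            Err(mut e) => {
                report_error(&mut e).await;
                tokio::time::sleep(Duration::from_millis(10)).await; // backoff stand-in
            },
        }
    }
}

// Sync call site: a non-async method (like converting an error into an HTTP
// response) cannot await, so it keeps using the sync variant.
struct HttpResponseError(anyhow::Error);

impl HttpResponseError {
    fn into_response(mut self) -> String {
        report_error_sync(&mut self.0);
        format!("500: {}", self.0)
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut attempts = 0;
    worker_loop(move || {
        attempts += 1;
        if attempts < 3 {
            Err(anyhow::anyhow!("transient failure #{attempts}"))
        } else {
            Ok(())
        }
    })
    .await;

    println!("{}", HttpResponseError(anyhow::anyhow!("bad request")).into_response());
}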
4 changes: 2 additions & 2 deletions crates/common/src/http/mod.rs
@@ -102,7 +102,7 @@ use utoipa::ToSchema;

use self::metrics::log_http_request;
use crate::{
-errors::report_error,
+errors::report_error_sync,
knobs::HTTP_SERVER_TCP_BACKLOG,
metrics::log_client_version_unsupported,
runtime::TaskManager,
@@ -541,7 +541,7 @@ impl IntoResponse for HttpResponseError {
fn into_response(mut self) -> Response {
// This is the only place we capture errors to sentry because it is the exit
// point of the HTTP layer
-report_error(&mut self.trace);
+report_error_sync(&mut self.trace);
self.http_error.into_response()
}
}
2 changes: 1 addition & 1 deletion crates/config_loader/src/lib.rs
@@ -92,7 +92,7 @@ impl<D: ConfigDecoder + Send + 'static> ConfigLoader<D> {
});
},
Err(mut e) => {
-report_error(&mut e);
+report_error(&mut e).await;
continue;
},
}
2 changes: 1 addition & 1 deletion crates/database/src/index_worker.rs
@@ -230,7 +230,7 @@ impl<RT: Runtime> IndexWorker<RT> {
}
loop {
if let Err(e) = worker.run().await {
-report_error(&mut e.context("IndexWorker died"));
+report_error(&mut e.context("IndexWorker died")).await;
let delay = worker.backoff.fail(&mut worker.runtime.rng());
log::error!(
"IndexWorker died, num_failures: {}. Backing off for {}ms",
2 changes: 1 addition & 1 deletion crates/database/src/index_workers/retriable_worker.rs
@@ -84,7 +84,7 @@ async fn retry_failures<RT: Runtime>(

let expected_error = expected_occ || expected_overloaded;
if !expected_error {
-report_error(&mut e);
+report_error(&mut e).await;
}
let delay = backoff.fail(&mut runtime.rng());
tracing::error!(
21 changes: 12 additions & 9 deletions crates/database/src/retention.rs
@@ -463,11 +463,11 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
) {
match ts {
Err(mut err) => {
-report_error(&mut err);
+report_error(&mut err).await;
},
Ok(Some(ts)) => {
if let Err(err) = snapshot_sender.send(ts) {
-report_error(&mut err.into());
+report_error(&mut err.into()).await;
}
},
Ok(None) => {},
@@ -557,7 +557,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
report_error(&mut anyhow::anyhow!(
"Skipping deleting indexes for {id}@{prev_rev_ts}. It is a tombstone \
at {prev_rev_ts} but has a later revision at {ts}"
-));
+))
+.await;
log_retention_scanned_document(maybe_doc.is_none(), false);
continue;
};
@@ -968,7 +969,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
report_error(&mut anyhow::anyhow!(
"retention wanted to delete {index_entries_to_delete} entries but found \
{deleted_rows} to delete"
-));
+))
+.await;
}

tracing::trace!("delete: deleted {deleted_rows:?} rows");
@@ -1006,7 +1008,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
report_error(&mut anyhow::anyhow!(
"retention wanted to delete {documents_to_delete} documents but found \
{deleted_rows} to delete"
-));
+))
+.await;
}

tracing::trace!("delete_documents: deleted {deleted_rows:?} rows");
@@ -1043,7 +1046,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
if !is_working {
min_snapshot_ts = match min_snapshot_rx.changed().await {
Err(err) => {
-report_error(&mut err.into());
+report_error(&mut err.into()).await;
// Fall back to polling if the channel is closed or falls over. This should
// really never happen.
Self::wait_with_jitter(&rt, *MAX_RETENTION_DELAY_SECONDS).await;
@@ -1130,7 +1133,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
}
};
if let Err(mut err) = r {
-report_error(&mut err);
+report_error(&mut err).await;
let delay = error_backoff.fail(&mut rt.rng());
tracing::debug!("go_delete: error, {err:?}, delaying {delay:?}");
rt.wait(delay).await;
@@ -1168,7 +1171,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
if !is_working {
min_document_snapshot_ts = match min_document_snapshot_rx.changed().await {
Err(err) => {
-report_error(&mut err.into());
+report_error(&mut err.into()).await;
// Fall back to polling if the channel is closed or falls over. This should
// really never happen.
Self::wait_with_jitter(&rt, *DOCUMENT_RETENTION_BATCH_INTERVAL_SECONDS)
@@ -1229,7 +1232,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
}
};
if let Err(mut err) = r {
-report_error(&mut err);
+report_error(&mut err).await;
let delay = error_backoff.fail(&mut rt.rng());
tracing::debug!("go_delete_documents: error, {err:?}, delaying {delay:?}");
rt.wait(delay).await;
2 changes: 1 addition & 1 deletion crates/database/src/search_index_bootstrap.rs
@@ -476,7 +476,7 @@ impl<RT: Runtime> SearchIndexBootstrapWorker<RT> {
// Forgive OCC errors < N to match UDF mutation retry behavior.
if !e.is_occ() || (self.backoff.failures() as usize) > *UDF_EXECUTOR_OCC_MAX_RETRIES
{
-report_error(&mut e.context("SearchAndVectorBootstrapWorker died"));
+report_error(&mut e.context("SearchAndVectorBootstrapWorker died")).await;
tracing::error!(
"SearchIndexBoostrapWorker died, num_failures: {}. Backing off for {}ms",
self.backoff.failures(),
4 changes: 2 additions & 2 deletions crates/database/src/subscription.rs
@@ -151,7 +151,7 @@ impl SubscriptionsWorker {
Ok(s) => {
let _: Result<_, _> = result.send(s);
},
-Err(mut e) => report_error(&mut e),
+Err(mut e) => report_error(&mut e).await,
}
},
Some(SubscriptionRequest::Cancel(key)) => {
@@ -165,7 +165,7 @@
},
next_ts = self.subscriptions.wait_for_next_ts() => {
if let Err(mut e) = self.subscriptions.advance_log(next_ts) {
-report_error(&mut e);
+report_error(&mut e).await;
}
},
}