diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 69d7e237f..bb5db968d 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -91,6 +91,7 @@ async fn handle_block( mut block: near_lake_primitives::block::Block, ctx: &Context, ) -> anyhow::Result<()> { + let mut pending_requests = Vec::new(); for action in block.actions().cloned().collect::>() { if action.receiver_id() == ctx.mpc_contract_id { let receipt = @@ -132,8 +133,7 @@ async fn handle_block( entropy = hex::encode(entropy), "indexed new `sign` function call" ); - let mut queue = ctx.queue.write().await; - queue.add(SignRequest { + pending_requests.push(SignRequest { receipt_id, request: arguments.request, epsilon, @@ -141,10 +141,6 @@ async fn handle_block( entropy, time_added: Instant::now(), }); - crate::metrics::NUM_SIGN_REQUESTS - .with_label_values(&[ctx.gcp_service.account_id.as_str()]) - .inc(); - drop(queue); } } } @@ -162,6 +158,17 @@ async fn handle_block( .with_label_values(&[ctx.gcp_service.account_id.as_str()]) .set(block.block_height() as i64); + // Add the requests after going through the whole block to avoid partial processing if indexer fails somewhere. + // This way we can revisit the same block if we failed while not having added the requests partially. + let mut queue = ctx.queue.write().await; + for request in pending_requests { + queue.add(request); + crate::metrics::NUM_SIGN_REQUESTS + .with_label_values(&[ctx.gcp_service.account_id.as_str()]) + .inc(); + } + drop(queue); + if block.block_height() % 1000 == 0 { tracing::info!(block_height = block.block_height(), "indexed block"); }