Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transactions service being stuck before warp sync finishes #1110

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 53 additions & 19 deletions light-base/src/transactions_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ use core::{
time::Duration,
};
use futures_channel::mpsc;
use futures_lite::FutureExt as _;
use futures_util::stream::FuturesUnordered;
use futures_util::{future, FutureExt as _, SinkExt as _, StreamExt as _};
use itertools::Itertools as _;
Expand Down Expand Up @@ -352,19 +353,51 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
// service. This happens when there is a gap in the blocks, either intentionally (e.g.
// after a Grandpa warp sync) or because the transactions service was too busy to process
// the new blocks.
let mut subscribe_all = {
let sub_future = async {
Some(
// The buffer size should be large enough so that, if the CPU is busy, it
// doesn't become full before the execution of the transactions service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to
// avoid malicious behaviors. This code is by definition not considered
// malicious.
worker
.runtime_service
.subscribe_all(
"transactions-service",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await,
)
};

// Because `runtime_service.subscribe_all()` might take a long time (potentially
// forever), we need to process messages coming from the foreground in parallel.
let from_foreground = &mut config.from_foreground;
let messages_process = async move {
loop {
match from_foreground.next().await {
Some(ToBackground::SubmitTransaction {
updates_report: Some(mut updates_report),
..
}) => {
let _ = updates_report
.send(TransactionStatus::Dropped(DropReason::GapInChain))
.await;
}
Some(ToBackground::SubmitTransaction { .. }) => {}
None => break None,
}
}
};

match sub_future.or(messages_process).await {
Some(s) => s,
None => return,
}
};

// The buffer size should be large enough so that, if the CPU is busy, it doesn't
// become full before the execution of the transactions service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let mut subscribe_all = worker
.runtime_service
.subscribe_all(
"transactions-service",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;
let initial_finalized_block_hash = header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
);
Expand Down Expand Up @@ -491,7 +524,7 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
let (to_execute, result_rx) = validation_future.remote_handle();
worker
.validations_in_progress
.push(to_execute.map(move |()| to_start_validate).boxed());
.push(Box::pin(to_execute.map(move |()| to_start_validate)));
let tx = worker
.pending_transactions
.transaction_user_data_mut(to_start_validate)
Expand Down Expand Up @@ -582,8 +615,9 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
NonZeroU32::new(3).unwrap(),
);

async move { (block_hash, download_future.await.map(|b| b.body.unwrap())) }
.boxed()
Box::pin(
async move { (block_hash, download_future.await.map(|b| b.body.unwrap())) },
)
});

worker
Expand Down Expand Up @@ -756,10 +790,10 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
tx.when_reannounce = now + Duration::from_secs(5);
worker.next_reannounce.push({
let platform = worker.platform.clone();
async move {
Box::pin(async move {
platform.sleep(Duration::from_secs(5)).await;
maybe_reannounce_tx_id
}.boxed()
})
});

// Perform the announce.
Expand Down Expand Up @@ -844,9 +878,9 @@ async fn background_task<TPlat: PlatformRef>(mut config: BackgroundTaskConfig<TP
.update_status(TransactionStatus::Validated);

// Schedule this transaction for announcement.
worker.next_reannounce.push(async move {
worker.next_reannounce.push(Box::pin(async move {
maybe_validated_tx_id
}.boxed());
}));

Ok(result)
}
Expand Down
9 changes: 9 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## Unreleased

### Changed

- Transactions submitted through the JSON-RPC server before the warp syncing process is finished will now immediately be dropped. ([#1110](https://github.com/smol-dot/smoldot/pull/1110))

### Fixed

- Fix `Chain.remove()` not actually removing the chain until the warp syncing process is finished (which might never happen if for example bootnodes are misconfigured). ([#1110](https://github.com/smol-dot/smoldot/pull/1110))
- Fix JSON-RPC server not processing requests if many transactions are submitted before the warp syncing process is finished. ([#1110](https://github.com/smol-dot/smoldot/pull/1110))

## 1.0.17 - 2023-08-25

### Changed
Expand Down
Loading