Skip to content

Commit

Permalink
name spawned tasks in core
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon-Laux committed Oct 3, 2024
1 parent dc39cb7 commit 56f7394
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 57 deletions.
4 changes: 2 additions & 2 deletions deltachat-jsonrpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use deltachat::constants::DC_MSG_ID_DAYMARKER;
use deltachat::contact::{may_be_valid_addr, Contact, ContactId, Origin};
use deltachat::context::get_info;
use deltachat::ephemeral::Timer;
use deltachat::location;
use deltachat::message::get_msg_read_receipts;
use deltachat::message::{
self, delete_msgs, markseen_msgs, Message, MessageState, MsgId, Viewtype,
Expand All @@ -35,6 +34,7 @@ use deltachat::stock_str::StockMessage;
use deltachat::webxdc::StatusUpdateSerial;
use deltachat::EventEmitter;
use deltachat::{imex, info};
use deltachat::{location, spawn_named_task};
use sanitize_filename::is_sanitized;
use tokio::fs;
use tokio::sync::{watch, Mutex, RwLock};
Expand Down Expand Up @@ -1826,7 +1826,7 @@ impl CommandApi {
let ctx = self.get_context(account_id).await?;
let fut = send_webxdc_realtime_advertisement(&ctx, MsgId::new(instance_msg_id)).await?;
if let Some(fut) = fut {
tokio::spawn(async move {
spawn_named_task!("send_webxdc_realtime_advertisement", async move {
fut.await.ok();
info!(ctx, "send_webxdc_realtime_advertisement done")
});
Expand Down
53 changes: 28 additions & 25 deletions src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,36 +400,39 @@ impl Config {

#[cfg(not(target_os = "ios"))]
async fn create_lock_task(dir: PathBuf) -> Result<Option<JoinHandle<anyhow::Result<()>>>> {
use crate::spawn_named_task;

let lockfile = dir.join(LOCKFILE_NAME);
let mut lock = fd_lock::RwLock::new(fs::File::create(lockfile).await?);
let (locked_tx, locked_rx) = oneshot::channel();
let lock_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let mut timeout = Duration::from_millis(100);
let _guard = loop {
match lock.try_write() {
Ok(guard) => break Ok(guard),
Err(err) => {
if timeout.as_millis() > 1600 {
break Err(err);
}
// We need to wait for the previous lock_task to be aborted thus unlocking
// the lockfile. We don't open configs for writing often outside of the
// tests, so this adds delays to the tests, but otherwise ok.
sleep(timeout).await;
if err.kind() == std::io::ErrorKind::WouldBlock {
timeout *= 2;
let lock_task: JoinHandle<anyhow::Result<()>> =
spawn_named_task!("lock_task", async move {
let mut timeout = Duration::from_millis(100);
let _guard = loop {
match lock.try_write() {
Ok(guard) => break Ok(guard),
Err(err) => {
if timeout.as_millis() > 1600 {
break Err(err);
}
// We need to wait for the previous lock_task to be aborted thus unlocking
// the lockfile. We don't open configs for writing often outside of the
// tests, so this adds delays to the tests, but otherwise ok.
sleep(timeout).await;
if err.kind() == std::io::ErrorKind::WouldBlock {
timeout *= 2;
}
}
}
}
}?;
locked_tx
.send(())
.ok()
.context("Cannot notify about lockfile locking")?;
let (_tx, rx) = oneshot::channel();
rx.await?;
Ok(())
});
}?;
locked_tx
.send(())
.ok()
.context("Cannot notify about lockfile locking")?;
let (_tx, rx) = oneshot::channel();
rx.await?;
Ok(())
});
locked_rx.await?;
Ok(Some(lock_task))
}
Expand Down
6 changes: 3 additions & 3 deletions src/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use deltachat_contact_tools::{sanitize_bidi_characters, sanitize_single_line, Co
use deltachat_derive::{FromSql, ToSql};
use serde::{Deserialize, Serialize};
use strum_macros::EnumIter;
use tokio::task;


use crate::aheader::EncryptPreference;
use crate::blob::BlobObject;
use crate::chatlist::Chatlist;
use crate::chatlist_events;
use crate::color::str_to_color;
use crate::config::Config;
use crate::constants::{
Expand Down Expand Up @@ -50,6 +49,7 @@ use crate::tools::{
truncate_msg_text, IsNoneOrEmpty, SystemTime,
};
use crate::webxdc::StatusUpdateSerial;
use crate::{chatlist_events, spawn_named_task};

/// An chat item, such as a message or a marker.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -1438,7 +1438,7 @@ impl ChatId {
/// and otherwise notifying the user accordingly.
pub(crate) fn spawn_securejoin_wait(self, context: &Context, timeout: u64) {
let context = context.clone();
task::spawn(async move {
spawn_named_task!("securejoin_wait", async move {
tokio::time::sleep(Duration::from_secs(timeout)).await;
let chat = Chat::load_from_db(&context, self).await?;
chat.check_securejoin_wait(&context, 0).await?;
Expand Down
9 changes: 5 additions & 4 deletions src/configure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use futures::FutureExt;
use futures_lite::FutureExt as _;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use server_params::{expand_param_vector, ServerParams};
use tokio::task;

use crate::config::{self, Config};
use crate::context::Context;
Expand All @@ -35,10 +34,10 @@ use crate::message::{Message, Viewtype};
use crate::oauth2::get_oauth2_addr;
use crate::provider::{Protocol, Socket, UsernamePattern};
use crate::smtp::Smtp;
use crate::stock_str;
use crate::sync::Sync::*;
use crate::tools::time;
use crate::{chat, e2ee, provider};
use crate::{spawn_named_task, stock_str};
use deltachat_contact_tools::addr_cmp;

macro_rules! progress {
Expand Down Expand Up @@ -375,7 +374,9 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
progress!(ctx, 1);

let ctx2 = ctx.clone();
let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await });
let update_device_chats_handle = spawn_named_task!("update_device_chats", async move {
ctx2.update_device_chats().await
});

let configured_param = get_configured_param(ctx, param).await?;
let strict_tls = configured_param.strict_tls();
Expand All @@ -390,7 +391,7 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Configure
let smtp_addr = configured_param.addr.clone();
let proxy_config = configured_param.proxy_config.clone();

let smtp_config_task = task::spawn(async move {
let smtp_config_task = spawn_named_task!("smtp_config", async move {
let mut smtp = Smtp::new();
smtp.connect(
&context_smtp,
Expand Down
4 changes: 2 additions & 2 deletions src/contact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::peerstate::Peerstate;
use crate::sql::{self, params_iter};
use crate::sync::{self, Sync::*};
use crate::tools::{duration_to_str, get_abs_path, smeared_time, time, SystemTime};
use crate::{chat, chatlist_events, stock_str};
use crate::{chat, chatlist_events, spawn_named_task, stock_str};

/// Time during which a contact is considered as seen recently.
const SEEN_RECENTLY_SECONDS: i64 = 600;
Expand Down Expand Up @@ -1791,7 +1791,7 @@ impl RecentlySeenLoop {
pub(crate) fn new(context: Context) -> Self {
let (interrupt_send, interrupt_recv) = channel::bounded(1);

let handle = task::spawn(Self::run(context, interrupt_recv));
let handle = spawn_named_task!("recently_seen_loop", Self::run(context, interrupt_recv));
Self {
handle,
interrupt_send,
Expand Down
3 changes: 2 additions & 1 deletion src/debug_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::context::Context;
use crate::events::EventType;
use crate::message::{Message, MsgId, Viewtype};
use crate::param::Param;
use crate::spawn_named_task;
use crate::tools::time;
use crate::webxdc::StatusUpdateItem;
use async_channel::{self as channel, Receiver, Sender};
Expand Down Expand Up @@ -150,7 +151,7 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option<MsgId>) -> a
let (sender, debug_logging_recv) = channel::bounded(1000);
let loop_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("debug_logging_loop", async move {
debug_logging_loop(&ctx, debug_logging_recv).await
})
};
Expand Down
9 changes: 7 additions & 2 deletions src/imap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use rand::Rng;
use ratelimit::Ratelimit;
use url::Url;

use crate::chat::{self, ChatId, ChatIdBlocked};
use crate::chatlist_events;
use crate::config::Config;
use crate::constants::{self, Blocked, Chattype, ShowEmails};
Expand All @@ -47,6 +46,10 @@ use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str;
use crate::tools::{self, create_id, duration_to_str};
use crate::{
chat::{self, ChatId, ChatIdBlocked},
spawn_named_task,
};

pub(crate) mod capabilities;
mod client;
Expand Down Expand Up @@ -1539,7 +1542,9 @@ impl Session {
} else if !context.push_subscriber.heartbeat_subscribed().await {
let context = context.clone();
// Subscribe for heartbeat notifications.
tokio::spawn(async move { context.push_subscriber.subscribe(&context).await });
spawn_named_task!("subscribe_to_heartbeat_notifications", async move {
context.push_subscriber.subscribe(&context).await
});
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/imex/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::message::{Message, Viewtype};
use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body;
use crate::tools::{create_id, time, TempPathGuard};
use crate::EventType;
use crate::{spawn_named_task, EventType};

use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};

Expand Down Expand Up @@ -131,7 +131,7 @@ impl BackupProvider {
let drop_token = drop_token.clone();
let endpoint = endpoint.clone();
let auth_token = auth_token.clone();
tokio::spawn(async move {
spawn_named_task!("accept_loop", async move {
Self::accept_loop(
context.clone(),
endpoint,
Expand Down
3 changes: 2 additions & 1 deletion src/net/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::context::Context;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_rustls;
use crate::spawn_named_task;

/// HTTP(S) GET response.
#[derive(Debug)]
Expand Down Expand Up @@ -85,7 +86,7 @@ where

let io = TokioIo::new(stream);
let (sender, conn) = hyper::client::conn::http1::handshake(io).await?;
tokio::task::spawn(conn);
spawn_named_task!("http_connection", conn);

Ok(sender)
}
Expand Down
11 changes: 7 additions & 4 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::context::Context;
use crate::headerdef::HeaderDef;
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::EventType;
use crate::{spawn_named_task, EventType};

/// The length of an ed25519 `PublicKey`, in bytes.
const PUBLIC_KEY_LENGTH: usize = 32;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl Iroh {
.split();

let ctx = ctx.clone();
let subscribe_loop = tokio::spawn(async move {
let subscribe_loop = spawn_named_task!("subscribe_loop", async move {
if let Err(e) = subscribe_loop(&ctx, gossip_receiver, topic, msg_id, join_tx).await {
warn!(ctx, "subscribe_loop failed: {e}")
}
Expand Down Expand Up @@ -264,7 +264,10 @@ impl Context {
let context = self.clone();

// Shuts down on deltachat shutdown
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
spawn_named_task!(
"endpoint_loop",
endpoint_loop(context, endpoint.clone(), gossip.clone())
);

Ok(Iroh {
endpoint,
Expand Down Expand Up @@ -442,7 +445,7 @@ async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
info!(context, "IROH_REALTIME: accepting iroh connection");
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
spawn_named_task!("handle_connection", async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
}
Expand Down
20 changes: 13 additions & 7 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use crate::download::{download_msg, DownloadState};
use crate::ephemeral::{self, delete_expired_imap_messages};
use crate::events::EventType;
use crate::imap::{session::Session, FolderMeaning, Imap};
use crate::location;
use crate::log::LogExt;
use crate::message::MsgId;
use crate::smtp::{send_smtp_messages, Smtp};
use crate::sql;
use crate::tools::{self, duration_to_str, maybe_add_time_based_warnings, time, time_elapsed};
use crate::{location, spawn_named_task};

pub(crate) mod connectivity;

Expand Down Expand Up @@ -164,7 +164,7 @@ impl SchedulerState {
}

let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
spawn_named_task!("pause", async move {
rx.await.ok();
let mut inner = context.scheduler.inner.write().await;
match *inner {
Expand Down Expand Up @@ -855,7 +855,10 @@ impl Scheduler {
let (inbox_start_send, inbox_start_recv) = oneshot::channel();
let handle = {
let ctx = ctx.clone();
task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))
spawn_named_task!(
"inbox_loop",
inbox_loop(ctx, inbox_start_send, inbox_handlers)
)
};
let inbox = SchedBox {
meaning: FolderMeaning::Inbox,
Expand All @@ -872,7 +875,10 @@ impl Scheduler {
let (conn_state, handlers) = ImapConnectionState::new(ctx).await?;
let (start_send, start_recv) = oneshot::channel();
let ctx = ctx.clone();
let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning));
let handle = spawn_named_task!(
"simple_imap_loop",
simple_imap_loop(ctx, start_send, handlers, meaning)
);
oboxes.push(SchedBox {
meaning,
conn_state,
Expand All @@ -884,20 +890,20 @@ impl Scheduler {

let smtp_handle = {
let ctx = ctx.clone();
task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))
spawn_named_task!("smtp_loop", smtp_loop(ctx, smtp_start_send, smtp_handlers))
};
start_recvs.push(smtp_start_recv);

let ephemeral_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("ephemeral_loop", async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
})
};

let location_handle = {
let ctx = ctx.clone();
task::spawn(async move {
spawn_named_task!("location_loop", async move {
location::location_loop(&ctx, location_interrupt_recv).await;
})
};
Expand Down
6 changes: 3 additions & 3 deletions src/smtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod send;
use anyhow::{bail, format_err, Context as _, Error, Result};
use async_smtp::response::{Category, Code, Detail};
use async_smtp::{EmailAddress, SmtpTransport};
use tokio::task;


use crate::chat::{add_info_msg_with_cmd, ChatId};
use crate::config::Config;
Expand All @@ -21,9 +21,9 @@ use crate::mimefactory::MimeFactory;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionBufStream;
use crate::scheduler::connectivity::ConnectivityStore;
use crate::sql;
use crate::stock_str::unencrypted_email;
use crate::tools::{self, time_elapsed};
use crate::{spawn_named_task, sql};

#[derive(Default)]
pub(crate) struct Smtp {
Expand Down Expand Up @@ -56,7 +56,7 @@ impl Smtp {
// Closing connection with a QUIT command may take some time, especially if it's a
// stale connection and an attempt to send the command times out. Send a command in a
// separate task to avoid waiting for reply or timeout.
task::spawn(async move { transport.quit().await });
spawn_named_task!("disconnect SMTP", async move { transport.quit().await });
}
self.last_success = None;
}
Expand Down
Loading

0 comments on commit 56f7394

Please sign in to comment.