Skip to content

Commit

Permalink
Check for dead federated instances (fixes #2221) (#3427)
Browse files Browse the repository at this point in the history
* Check for dead federated instances (fixes #2221)

* move to apub crate, use timestamp

* make it compile

* clippy

* use moka to cache blocklists, dead instances, restore orig scheduled tasks

* remove leftover last_alive var

* error handling

* wip

* fix alive check for instances without nodeinfo, add coalesce

* clippy

* move federation blocklist cache to #3486

* unused deps
  • Loading branch information
Nutomic committed Jul 21, 2023
1 parent 3d3c927 commit 6d52c46
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 57 deletions.
31 changes: 28 additions & 3 deletions crates/apub/src/activities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ use activitypub_federation::{
};
use anyhow::anyhow;
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{newtypes::CommunityId, source::community::Community};
use lemmy_db_schema::{
newtypes::CommunityId,
source::{community::Community, instance::Instance},
};
use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
use lemmy_utils::error::LemmyError;
use moka::future::Cache;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::ops::Deref;
use std::{ops::Deref, sync::Arc, time::Duration};
use tracing::info;
use url::{ParseError, Url};
use uuid::Uuid;
Expand All @@ -30,6 +35,10 @@ pub mod following;
pub mod unfederated;
pub mod voting;

/// How long the list of dead instances is kept in the in-memory cache (30 minutes).
///
/// The liveness data behind that list is only refreshed once a day, so serving a cached copy
/// for this long does no harm.
pub static DEAD_INSTANCE_LIST_CACHE_DURATION: Duration = Duration::from_secs(30 * 60);

/// Checks that the specified Url actually identifies a Person (by fetching it), and that the person
/// doesn't have a site ban.
#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -150,14 +159,30 @@ async fn send_lemmy_activity<Activity, ActorT>(
data: &Data<LemmyContext>,
activity: Activity,
actor: &ActorT,
inbox: Vec<Url>,
mut inbox: Vec<Url>,
sensitive: bool,
) -> Result<(), LemmyError>
where
Activity: ActivityHandler + Serialize + Send + Sync + Clone,
ActorT: Actor,
Activity: ActivityHandler<Error = LemmyError>,
{
static CACHE: Lazy<Cache<(), Arc<Vec<String>>>> = Lazy::new(|| {
Cache::builder()
.max_capacity(1)
.time_to_live(DEAD_INSTANCE_LIST_CACHE_DURATION)
.build()
});
let dead_instances = CACHE
.try_get_with((), async {
Ok::<_, diesel::result::Error>(Arc::new(Instance::dead_instances(&mut data.pool()).await?))
})
.await?;

inbox.retain(|i| {
let domain = i.domain().expect("has domain").to_string();
!dead_instances.contains(&domain)
});
info!("Sending activity {}", activity.id().to_string());
let activity = WithContext::new(activity, CONTEXT.deref().clone());

Expand Down
29 changes: 28 additions & 1 deletion crates/db_schema/src/impls/instance.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use crate::{
diesel::dsl::IntervalDsl,
newtypes::InstanceId,
schema::{federation_allowlist, federation_blocklist, instance},
source::instance::{Instance, InstanceForm},
utils::{get_conn, naive_now, DbPool},
};
use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl};
use diesel::{
dsl::{insert_into, now},
result::Error,
sql_types::{Nullable, Timestamp},
ExpressionMethods,
QueryDsl,
};
use diesel_async::{AsyncPgConnection, RunQueryDsl};

impl Instance {
Expand Down Expand Up @@ -52,6 +59,24 @@ impl Instance {
.execute(conn)
.await
}

/// Loads every row of the `instance` table.
///
/// # Errors
/// Returns the underlying error if no connection can be obtained from `pool` or if the query
/// fails.
pub async fn read_all(pool: &DbPool) -> Result<Vec<Instance>, Error> {
  let mut connection = get_conn(pool).await?;
  let all_rows = instance::table.select(instance::all_columns);
  all_rows.get_results(&mut connection).await
}

/// Returns the domains of all instances that are considered dead.
///
/// An instance counts as dead when its `updated` timestamp (or its `published` timestamp, if
/// `updated` is null) lies more than three days in the past.
///
/// # Errors
/// Returns the underlying error if no connection can be obtained from `pool` or if the query
/// fails.
pub async fn dead_instances(pool: &DbPool) -> Result<Vec<String>, Error> {
  let mut connection = get_conn(pool).await?;
  // Instances that were never marked as updated fall back to their `published` timestamp.
  let last_seen = coalesce(instance::updated, instance::published);
  let cutoff = now - 3.days();
  instance::table
    .select(instance::domain)
    .filter(last_seen.lt(cutoff))
    .get_results(&mut connection)
    .await
}

#[cfg(test)]
pub async fn delete_all(pool: &DbPool) -> Result<usize, Error> {
let conn = &mut get_conn(pool).await?;
Expand Down Expand Up @@ -85,3 +110,5 @@ impl Instance {
.await
}
}

// Declares a Diesel binding for the SQL `COALESCE` function, specialised to timestamps: it takes
// a nullable timestamp `x` and a non-null timestamp `y` and is typed as returning a non-null
// timestamp. Used by `Instance::dead_instances` to fall back from `updated` to `published`.
sql_function! { fn coalesce(x: Nullable<Timestamp>, y: Timestamp) -> Timestamp; }
1 change: 0 additions & 1 deletion crates/db_schema/src/impls/site.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl Site {
)
}

// TODO this needs fixed
pub async fn read_remote_sites(pool: &DbPool) -> Result<Vec<Self>, Error> {
let conn = &mut get_conn(pool).await?;
site.order_by(id).offset(1).get_results::<Self>(conn).await
Expand Down
15 changes: 8 additions & 7 deletions crates/db_schema/src/source/site.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::newtypes::{DbUrl, InstanceId, SiteId};
#[cfg(feature = "full")]
use crate::schema::site;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
#[cfg(feature = "full")]
Expand All @@ -18,8 +19,8 @@ pub struct Site {
pub name: String,
/// A sidebar for the site in markdown.
pub sidebar: Option<String>,
pub published: chrono::NaiveDateTime,
pub updated: Option<chrono::NaiveDateTime>,
pub published: NaiveDateTime,
pub updated: Option<NaiveDateTime>,
/// An icon URL.
pub icon: Option<DbUrl>,
/// A banner url.
Expand All @@ -29,7 +30,7 @@ pub struct Site {
/// The federated actor_id.
pub actor_id: DbUrl,
/// The time the site was last refreshed.
pub last_refreshed_at: chrono::NaiveDateTime,
pub last_refreshed_at: NaiveDateTime,
/// The site inbox
pub inbox_url: DbUrl,
pub private_key: Option<String>,
Expand All @@ -45,12 +46,12 @@ pub struct SiteInsertForm {
#[builder(!default)]
pub name: String,
pub sidebar: Option<String>,
pub updated: Option<chrono::NaiveDateTime>,
pub updated: Option<NaiveDateTime>,
pub icon: Option<DbUrl>,
pub banner: Option<DbUrl>,
pub description: Option<String>,
pub actor_id: Option<DbUrl>,
pub last_refreshed_at: Option<chrono::NaiveDateTime>,
pub last_refreshed_at: Option<NaiveDateTime>,
pub inbox_url: Option<DbUrl>,
pub private_key: Option<String>,
pub public_key: Option<String>,
Expand All @@ -65,13 +66,13 @@ pub struct SiteInsertForm {
pub struct SiteUpdateForm {
pub name: Option<String>,
pub sidebar: Option<Option<String>>,
pub updated: Option<Option<chrono::NaiveDateTime>>,
pub updated: Option<Option<NaiveDateTime>>,
// when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column.
pub icon: Option<Option<DbUrl>>,
pub banner: Option<Option<DbUrl>>,
pub description: Option<Option<String>>,
pub actor_id: Option<DbUrl>,
pub last_refreshed_at: Option<chrono::NaiveDateTime>,
pub last_refreshed_at: Option<NaiveDateTime>,
pub inbox_url: Option<DbUrl>,
pub private_key: Option<Option<String>>,
pub public_key: Option<String>,
Expand Down
98 changes: 53 additions & 45 deletions src/scheduled_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ use lemmy_db_schema::{
utils::{naive_now, DELETED_REPLACEMENT_TEXT},
};
use lemmy_routes::nodeinfo::NodeInfo;
use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
use lemmy_utils::{
error::{LemmyError, LemmyResult},
REQWEST_TIMEOUT,
};
use reqwest::blocking::Client;
use std::{thread, time::Duration};
use tracing::{error, info};
use tracing::{error, info, warn};

/// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup(
Expand Down Expand Up @@ -79,7 +82,9 @@ pub fn setup(
// Update the Instance Software
scheduler.every(CTimeUnits::days(1)).run(move || {
let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
update_instance_software(&mut conn, &user_agent);
update_instance_software(&mut conn, &user_agent)
.map_err(|e| warn!("Failed to update instance software: {e}"))
.ok();
});

// Manually run the scheduler in an event loop
Expand Down Expand Up @@ -323,62 +328,65 @@ fn update_banned_when_expired(conn: &mut PgConnection) {
}

/// Updates the instance software and version
fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
///
/// TODO: this should be async
/// TODO: if instance has been dead for a long time, it should be checked less frequently
fn update_instance_software(conn: &mut PgConnection, user_agent: &str) -> LemmyResult<()> {
info!("Updating instances software and versions...");

let client = match Client::builder()
let client = Client::builder()
.user_agent(user_agent)
.timeout(REQWEST_TIMEOUT)
.build()
{
Ok(client) => client,
Err(e) => {
error!("Failed to build reqwest client: {}", e);
return;
}
};
.build()?;

let instances = match instance::table.get_results::<Instance>(conn) {
Ok(instances) => instances,
Err(e) => {
error!("Failed to get instances: {}", e);
return;
}
};
let instances = instance::table.get_results::<Instance>(conn)?;

for instance in instances {
let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);

// Skip it if it can't connect
let res = client
.get(&node_info_url)
.send()
.ok()
.and_then(|t| t.json::<NodeInfo>().ok());

if let Some(node_info) = res {
let software = node_info.software.as_ref();
let form = InstanceForm::builder()
.domain(instance.domain)
.software(software.and_then(|s| s.name.clone()))
.version(software.and_then(|s| s.version.clone()))
.updated(Some(naive_now()))
.build();

match diesel::update(instance::table.find(instance.id))
.set(form)
.execute(conn)
{
Ok(_) => {
info!("Done.");
// The `updated` column is used to check if instances are alive. If it is more than three days
// in the past, no outgoing activities will be sent to that instance. However not every
// Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
// why we always need to mark instances as updated if they are alive.
let default_form = InstanceForm::builder()
.domain(instance.domain.clone())
.updated(Some(naive_now()))
.build();
let form = match client.get(&node_info_url).send() {
Ok(res) if res.status().is_client_error() => {
// Instance doesnt have nodeinfo but sent a response, consider it alive
Some(default_form)
}
Ok(res) => match res.json::<NodeInfo>() {
Ok(node_info) => {
// Instance sent valid nodeinfo, write it to db
Some(
InstanceForm::builder()
.domain(instance.domain)
.updated(Some(naive_now()))
.software(node_info.software.and_then(|s| s.name))
.version(node_info.version.clone())
.build(),
)
}
Err(e) => {
error!("Failed to update site instance software: {}", e);
return;
Err(_) => {
// No valid nodeinfo but valid HTTP response, consider instance alive
Some(default_form)
}
},
Err(_) => {
// dead instance, do nothing
None
}
};
if let Some(form) = form {
diesel::update(instance::table.find(instance.id))
.set(form)
.execute(conn)?;
}
}
info!("Finished updating instances software and versions...");
Ok(())
}

#[cfg(test)]
Expand Down

0 comments on commit 6d52c46

Please sign in to comment.