Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for dead federated instances (fixes #2221) #3427

Merged
merged 15 commits into from
Jul 13, 2023
Merged
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ url_serde = "0.2.0"
reqwest = { version = "0.11.18", features = ["json", "blocking"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.4"
clokwerk = "0.3.5"
clokwerk = "0.4.0"
doku = { version = "0.21.1", features = ["url-2"] }
bcrypt = "0.13.0"
chrono = { version = "0.4.26", features = ["serde"], default-features = false }
Expand Down
31 changes: 28 additions & 3 deletions crates/apub/src/activities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ use activitypub_federation::{
};
use anyhow::anyhow;
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{newtypes::CommunityId, source::community::Community};
use lemmy_db_schema::{
newtypes::CommunityId,
source::{community::Community, instance::Instance},
};
use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
use lemmy_utils::error::LemmyError;
use moka::future::Cache;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::ops::Deref;
use std::{ops::Deref, sync::Arc, time::Duration};
use tracing::info;
use url::{ParseError, Url};
use uuid::Uuid;
Expand All @@ -30,6 +35,10 @@ pub mod following;
pub mod unfederated;
pub mod voting;

/// Amount of time that the list of dead instances is cached. This is only updated once a day,
/// so there is no harm in caching it for a longer time.
pub static DEAD_INSTANCE_LIST_CACHE_DURATION: Duration = Duration::from_secs(30 * 60);

/// Checks that the specified Url actually identifies a Person (by fetching it), and that the person
/// doesn't have a site ban.
#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -150,14 +159,30 @@ async fn send_lemmy_activity<Activity, ActorT>(
data: &Data<LemmyContext>,
activity: Activity,
actor: &ActorT,
inbox: Vec<Url>,
mut inbox: Vec<Url>,
sensitive: bool,
) -> Result<(), LemmyError>
where
Activity: ActivityHandler + Serialize + Send + Sync + Clone,
ActorT: Actor,
Activity: ActivityHandler<Error = LemmyError>,
{
static CACHE: Lazy<Cache<(), Arc<Vec<String>>>> = Lazy::new(|| {
Cache::builder()
.max_capacity(1)
.time_to_live(DEAD_INSTANCE_LIST_CACHE_DURATION)
.build()
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit weird to use caches with capacity one and no key, but seems like the easiest way to implement this.

Copy link
Collaborator

@phiresky phiresky Jul 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to mention that when writing my other PR, I accidentally made it construct a new whole moka cache for every single incoming event (so 1000s per second) and insert a single value, and it didn't negatively affect performance at all. Just as a reference that constructing tiny moka caches is probably fine regarding performance (if maybe not code beauty).

let dead_instances = CACHE
.try_get_with((), async {
Ok::<_, diesel::result::Error>(Arc::new(Instance::dead_instances(data.pool()).await?))
})
.await?;

inbox.retain(|i| {
let domain = i.domain().expect("has domain").to_string();
!dead_instances.contains(&domain)
});
info!("Sending activity {}", activity.id().to_string());
let activity = WithContext::new(activity, CONTEXT.deref().clone());

Expand Down
30 changes: 29 additions & 1 deletion crates/db_schema/src/impls/instance.rs
dessalines marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use crate::{
diesel::dsl::IntervalDsl,
newtypes::InstanceId,
schema::{federation_allowlist, federation_blocklist, instance},
source::instance::{Instance, InstanceForm},
utils::{get_conn, naive_now, DbPool},
};
use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl};
use diesel::{
dsl::{insert_into, now},
result::Error,
sql_types::{Nullable, Timestamp},
ExpressionMethods,
QueryDsl,
};
use diesel_async::{AsyncPgConnection, RunQueryDsl};

impl Instance {
Expand Down Expand Up @@ -46,12 +53,31 @@ impl Instance {
let conn = &mut get_conn(pool).await?;
Self::read_or_create_with_conn(conn, domain).await
}

pub async fn delete(pool: &DbPool, instance_id: InstanceId) -> Result<usize, Error> {
let conn = &mut get_conn(pool).await?;
diesel::delete(instance::table.find(instance_id))
.execute(conn)
.await
}

pub async fn read_all(pool: &DbPool) -> Result<Vec<Instance>, Error> {
let conn = &mut get_conn(pool).await?;
instance::table
.select(instance::all_columns)
.get_results(conn)
.await
}

pub async fn dead_instances(pool: &DbPool) -> Result<Vec<String>, Error> {
let conn = &mut get_conn(pool).await?;
instance::table
.select(instance::domain)
.filter(coalesce(instance::updated, instance::published).lt(now - 3.days()))
.get_results(conn)
.await
}

#[cfg(test)]
pub async fn delete_all(pool: &DbPool) -> Result<usize, Error> {
let conn = &mut get_conn(pool).await?;
Expand Down Expand Up @@ -85,3 +111,5 @@ impl Instance {
.await
}
}

sql_function! { fn coalesce(x: Nullable<Timestamp>, y: Timestamp) -> Timestamp; }
1 change: 0 additions & 1 deletion crates/db_schema/src/impls/site.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl Site {
)
}

// TODO this needs fixed
pub async fn read_remote_sites(pool: &DbPool) -> Result<Vec<Self>, Error> {
let conn = &mut get_conn(pool).await?;
site.order_by(id).offset(1).get_results::<Self>(conn).await
Expand Down
15 changes: 8 additions & 7 deletions crates/db_schema/src/source/site.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::newtypes::{DbUrl, InstanceId, SiteId};
#[cfg(feature = "full")]
use crate::schema::site;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
#[cfg(feature = "full")]
Expand All @@ -18,8 +19,8 @@ pub struct Site {
pub name: String,
/// A sidebar for the site in markdown.
pub sidebar: Option<String>,
pub published: chrono::NaiveDateTime,
pub updated: Option<chrono::NaiveDateTime>,
pub published: NaiveDateTime,
pub updated: Option<NaiveDateTime>,
/// An icon URL.
pub icon: Option<DbUrl>,
/// A banner url.
Expand All @@ -29,7 +30,7 @@ pub struct Site {
/// The federated actor_id.
pub actor_id: DbUrl,
/// The time the site was last refreshed.
pub last_refreshed_at: chrono::NaiveDateTime,
pub last_refreshed_at: NaiveDateTime,
/// The site inbox
pub inbox_url: DbUrl,
pub private_key: Option<String>,
Expand All @@ -45,12 +46,12 @@ pub struct SiteInsertForm {
#[builder(!default)]
pub name: String,
pub sidebar: Option<String>,
pub updated: Option<chrono::NaiveDateTime>,
pub updated: Option<NaiveDateTime>,
pub icon: Option<DbUrl>,
pub banner: Option<DbUrl>,
pub description: Option<String>,
pub actor_id: Option<DbUrl>,
pub last_refreshed_at: Option<chrono::NaiveDateTime>,
pub last_refreshed_at: Option<NaiveDateTime>,
pub inbox_url: Option<DbUrl>,
pub private_key: Option<String>,
pub public_key: Option<String>,
Expand All @@ -65,13 +66,13 @@ pub struct SiteInsertForm {
pub struct SiteUpdateForm {
pub name: Option<String>,
pub sidebar: Option<Option<String>>,
pub updated: Option<Option<chrono::NaiveDateTime>>,
pub updated: Option<Option<NaiveDateTime>>,
// when you want to null out a column, you have to send Some(None)), since sending None means you just don't want to update that column.
pub icon: Option<Option<DbUrl>>,
pub banner: Option<Option<DbUrl>>,
pub description: Option<Option<String>>,
pub actor_id: Option<DbUrl>,
pub last_refreshed_at: Option<chrono::NaiveDateTime>,
pub last_refreshed_at: Option<NaiveDateTime>,
pub inbox_url: Option<DbUrl>,
pub private_key: Option<Option<String>>,
pub public_key: Option<String>,
Expand Down
98 changes: 53 additions & 45 deletions src/scheduled_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ use lemmy_db_schema::{
utils::{naive_now, DELETED_REPLACEMENT_TEXT},
};
use lemmy_routes::nodeinfo::NodeInfo;
use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
use lemmy_utils::{
error::{LemmyError, LemmyResult},
REQWEST_TIMEOUT,
};
use reqwest::blocking::Client;
use std::{thread, time::Duration};
use tracing::{error, info};
use tracing::{error, info, warn};

/// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup(
Expand Down Expand Up @@ -79,7 +82,9 @@ pub fn setup(
// Update the Instance Software
scheduler.every(CTimeUnits::days(1)).run(move || {
let mut conn = PgConnection::establish(&db_url).expect("could not establish connection");
update_instance_software(&mut conn, &user_agent);
update_instance_software(&mut conn, &user_agent)
.map_err(|e| warn!("Failed to update instance software: {e}"))
.ok();
});

// Manually run the scheduler in an event loop
Expand Down Expand Up @@ -323,62 +328,65 @@ fn update_banned_when_expired(conn: &mut PgConnection) {
}

/// Updates the instance software and version
fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
///
/// TODO: this should be async
/// TODO: if instance has been dead for a long time, it should be checked less frequently
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one, in the select below, you could do let instances = instance::table.filter(coalesce(updated, published).gt(now - 1.months())

Even better, would be to add this as alive_instances in impls/instance.rs

Another possibility, would be to recheck the alive_instances every day, but only re-check all of them (even previously dead ones) every month. Up to you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or check old instances using random probability, eg in 1% of all checks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway this can be improved later, no need to include it in this PR.

fn update_instance_software(conn: &mut PgConnection, user_agent: &str) -> LemmyResult<()> {
info!("Updating instances software and versions...");

let client = match Client::builder()
let client = Client::builder()
.user_agent(user_agent)
.timeout(REQWEST_TIMEOUT)
.build()
{
Ok(client) => client,
Err(e) => {
error!("Failed to build reqwest client: {}", e);
return;
}
};
.build()?;

let instances = match instance::table.get_results::<Instance>(conn) {
Ok(instances) => instances,
Err(e) => {
error!("Failed to get instances: {}", e);
return;
}
};
let instances = instance::table.get_results::<Instance>(conn)?;

for instance in instances {
let node_info_url = format!("https://{}/nodeinfo/2.0.json", instance.domain);

// Skip it if it can't connect
let res = client
.get(&node_info_url)
.send()
.ok()
.and_then(|t| t.json::<NodeInfo>().ok());

if let Some(node_info) = res {
let software = node_info.software.as_ref();
let form = InstanceForm::builder()
.domain(instance.domain)
.software(software.and_then(|s| s.name.clone()))
.version(software.and_then(|s| s.version.clone()))
.updated(Some(naive_now()))
.build();

match diesel::update(instance::table.find(instance.id))
.set(form)
.execute(conn)
{
Ok(_) => {
info!("Done.");
// The `updated` column is used to check if instances are alive. If it is more than three days
// in the past, no outgoing activities will be sent to that instance. However not every
// Fediverse instance has a valid Nodeinfo endpoint (its not required for Activitypub). That's
// why we always need to mark instances as updated if they are alive.
let default_form = InstanceForm::builder()
.domain(instance.domain.clone())
.updated(Some(naive_now()))
.build();
let form = match client.get(&node_info_url).send() {
Ok(res) if res.status().is_client_error() => {
// Instance doesnt have nodeinfo but sent a response, consider it alive
Some(default_form)
}
Ok(res) => match res.json::<NodeInfo>() {
Ok(node_info) => {
// Instance sent valid nodeinfo, write it to db
Some(
InstanceForm::builder()
.domain(instance.domain)
.updated(Some(naive_now()))
.software(node_info.software.and_then(|s| s.name))
.version(node_info.version.clone())
.build(),
)
}
Err(e) => {
error!("Failed to update site instance software: {}", e);
return;
Err(_) => {
// No valid nodeinfo but valid HTTP response, consider instance alive
Some(default_form)
}
},
Err(_) => {
// dead instance, do nothing
None
}
};
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is quite confusing, open for suggestions how to simplify it.

Copy link

@ciscprocess ciscprocess Jul 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like this? I'm new to lemmy and rust so apologies if it is not appropriate for me to post this here. It explicitly sets default_view on HTTP 500 but I suspect that would happen anyway with the OG code on deserialization failure. Haven't tested this for correctness, but maybe it can be a template for something more readable?

    let form_result = client.get(&node_info_url).send()
      .map(|response| {
        response.error_for_status()
          .and_then(|response|  response.json::<NodeInfo>())
          .map_or(default_form, |node_info| {
            InstanceForm::builder()
              .domain(instance.domain)
              .updated(Some(naive_now()))
              .software(node_info.software.and_then(|s| s.name))
              .version(node_info.version.clone())
              .build()
          })
      });

      if let Ok(form) = form_result {
        diesel::update(instance::table.find(instance.id))
          .set(form)
          .execute(conn)?;
      }

Edit: Or perhaps even this.

    let form_result = client.get(&node_info_url).send()
      .and_then(|response| response.error_for_status())
      .and_then(|response| response.json::<NodeInfo>())
      .map(|node_info| InstanceForm::builder()
                .domain(instance.domain)
                .updated(Some(naive_now()))
                .software(node_info.software.and_then(|s| s.name))
                .version(node_info.version.clone())
                .build())
      .or_else(|err| if err.is_status() { Ok(default_form) } else { Err(err) });

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave this a try but feel like its getting even more confusing. So I will leave it as is.

if let Some(form) = form {
diesel::update(instance::table.find(instance.id))
.set(form)
.execute(conn)?;
}
}
info!("Finished updating instances software and versions...");
Ok(())
}

#[cfg(test)]
Expand Down