diff --git a/Cargo.lock b/Cargo.lock index 5a0a40d58a..b01dd41c64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2786,6 +2786,7 @@ dependencies = [ "activitypub_federation", "actix-cors", "actix-web", + "chrono", "clokwerk", "console-subscriber", "diesel", diff --git a/Cargo.toml b/Cargo.toml index 00b400dd12..49d176b423 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,4 +147,4 @@ rustls = { workspace = true } futures-util = { workspace = true } tokio-postgres = { workspace = true } tokio-postgres-rustls = { workspace = true } - +chrono = { workspace = true } \ No newline at end of file diff --git a/migrations/2023-06-24-185942_aggegates_published_indexes/down.sql b/migrations/2023-06-24-185942_aggegates_published_indexes/down.sql new file mode 100644 index 0000000000..fa7f7d48f0 --- /dev/null +++ b/migrations/2023-06-24-185942_aggegates_published_indexes/down.sql @@ -0,0 +1,2 @@ +drop index idx_comment_aggregates_published; +drop index idx_community_aggregates_published; diff --git a/migrations/2023-06-24-185942_aggegates_published_indexes/up.sql b/migrations/2023-06-24-185942_aggegates_published_indexes/up.sql new file mode 100644 index 0000000000..42230af109 --- /dev/null +++ b/migrations/2023-06-24-185942_aggegates_published_indexes/up.sql @@ -0,0 +1,4 @@ +-- Add indexes on published column (needed for hot_rank updates) + +create index idx_community_aggregates_published on community_aggregates (published desc); +create index idx_comment_aggregates_published on comment_aggregates (published desc); \ No newline at end of file diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs index 891dca365d..c67dac0a41 100644 --- a/src/scheduled_tasks.rs +++ b/src/scheduled_tasks.rs @@ -1,28 +1,21 @@ +use chrono::NaiveDateTime; use clokwerk::{Scheduler, TimeUnits as CTimeUnits}; use diesel::{ dsl::{now, IntervalDsl}, + sql_types::{Integer, Timestamp}, Connection, ExpressionMethods, NullableExpressionMethods, QueryDsl, + QueryableByName, }; // Import week days and WeekDay use diesel::{sql_query, PgConnection, RunQueryDsl}; use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ - schema::{ - activity, - comment, - comment_aggregates, - community_aggregates, - community_person_ban, - instance, - person, - post, - post_aggregates, - }, + schema::{activity, comment, community_person_ban, instance, person, post}, source::instance::{Instance, InstanceForm}, - utils::{functions::hot_rank, naive_now, DELETED_REPLACEMENT_TEXT}, + utils::{naive_now, DELETED_REPLACEMENT_TEXT}, }; use lemmy_routes::nodeinfo::NodeInfo; use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT}; @@ -49,9 +42,9 @@ pub fn setup( update_banned_when_expired(&mut conn); }); - // Update hot ranks every 5 minutes + // Update hot ranks every 15 minutes let url = db_url.clone(); - scheduler.every(CTimeUnits::minutes(5)).run(move || { + scheduler.every(CTimeUnits::minutes(15)).run(move || { let mut conn = PgConnection::establish(&url).expect("could not establish connection"); update_hot_ranks(&mut conn, true); }); @@ -100,66 +93,92 @@ fn startup_jobs(db_url: &str) { } /// Update the hot_rank columns for the aggregates tables +/// Runs in batches until all necessary rows are updated once fn update_hot_ranks(conn: &mut PgConnection, last_week_only: bool) { - let mut post_update = diesel::update(post_aggregates::table).into_boxed(); - let mut comment_update = diesel::update(comment_aggregates::table).into_boxed(); - let mut community_update = diesel::update(community_aggregates::table).into_boxed(); - - // Only update for the last week of content - if last_week_only { + let process_start_time = if last_week_only { info!("Updating hot ranks for last week..."); - let last_week = now - diesel::dsl::IntervalDsl::weeks(1); - - post_update = post_update.filter(post_aggregates::published.gt(last_week)); - comment_update = comment_update.filter(comment_aggregates::published.gt(last_week)); - community_update = community_update.filter(community_aggregates::published.gt(last_week)); + naive_now() - chrono::Duration::days(7) } else { info!("Updating hot ranks for all history..."); - } + NaiveDateTime::from_timestamp_opt(0, 0).expect("0 timestamp creation") + }; - match post_update - .set(( - post_aggregates::hot_rank.eq(hot_rank(post_aggregates::score, post_aggregates::published)), - post_aggregates::hot_rank_active.eq(hot_rank( - post_aggregates::score, - post_aggregates::newest_comment_time_necro, - )), - )) - .execute(conn) - { - Ok(_) => {} - Err(e) => { - error!("Failed to update post_aggregates hot_ranks: {}", e) - } - } + process_hot_ranks_in_batches( + conn, + "post_aggregates", + "SET hot_rank = hot_rank(a.score, a.published), + hot_rank_active = hot_rank(a.score, a.newest_comment_time_necro)", + process_start_time, + ); + + process_hot_ranks_in_batches( + conn, + "comment_aggregates", + "SET hot_rank = hot_rank(a.score, a.published)", + process_start_time, + ); + + process_hot_ranks_in_batches( + conn, + "community_aggregates", + "SET hot_rank = hot_rank(a.subscribers, a.published)", + process_start_time, + ); + + info!("Finished hot ranks update!"); +} - match comment_update - .set(comment_aggregates::hot_rank.eq(hot_rank( - comment_aggregates::score, - comment_aggregates::published, - ))) - .execute(conn) - { - Ok(_) => {} - Err(e) => { - error!("Failed to update comment_aggregates hot_ranks: {}", e) - } - } +#[derive(QueryableByName)] +struct HotRanksUpdateResult { + #[diesel(sql_type = Timestamp)] + published: NaiveDateTime, +} - match community_update - .set(community_aggregates::hot_rank.eq(hot_rank( - community_aggregates::subscribers, - community_aggregates::published, - ))) - .execute(conn) - { - Ok(_) => { - info!("Done."); - } - Err(e) => { - error!("Failed to update community_aggregates hot_ranks: {}", e) +/// Runs the hot rank update query in batches until all rows after `process_start_time` have been +/// processed. +/// In `set_clause`, "a" will refer to the current aggregates table. +/// Locked rows are skipped in order to prevent deadlocks (they will likely get updated on the next +/// run) +fn process_hot_ranks_in_batches( + conn: &mut PgConnection, + table_name: &str, + set_clause: &str, + process_start_time: NaiveDateTime, +) { + let update_batch_size = 1000; // Bigger batches than this tend to cause seq scans + let mut previous_batch_result = Some(process_start_time); + while let Some(previous_batch_last_published) = previous_batch_result { + // Raw `sql_query` is used as a performance optimization - Diesel does not support doing this + // in a single query (neither as a CTE, nor using a subquery) + let result = sql_query(format!( + r#"WITH batch AS (SELECT a.id + FROM {aggregates_table} a + WHERE a.published > $1 + ORDER BY a.published + LIMIT $2 + FOR UPDATE SKIP LOCKED) + UPDATE {aggregates_table} a {set_clause} + FROM batch WHERE a.id = batch.id RETURNING a.published; + "#, + aggregates_table = table_name, + set_clause = set_clause + )) + .bind::(previous_batch_last_published) + .bind::(update_batch_size) + .get_results::(conn); + + match result { + Ok(updated_rows) => previous_batch_result = updated_rows.last().map(|row| row.published), + Err(e) => { + error!("Failed to update {} hot_ranks: {}", table_name, e); + break; + } } } + info!( + "Finished process_hot_ranks_in_batches execution for {}", + table_name + ); } /// Clear old activities (this table gets very large)