Hot rank update batching + deadlock avoidance #3175

Merged: 1 commit, Jun 27, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -147,4 +147,4 @@ rustls = { workspace = true }
futures-util = { workspace = true }
tokio-postgres = { workspace = true }
tokio-postgres-rustls = { workspace = true }

chrono = { workspace = true }
New migration file (down.sql):
@@ -0,0 +1,2 @@
drop index idx_comment_aggregates_published;
drop index idx_community_aggregates_published;
New migration file (up.sql):
@@ -0,0 +1,4 @@
-- Add indexes on published column (needed for hot_rank updates)

create index idx_community_aggregates_published on community_aggregates (published desc);
create index idx_comment_aggregates_published on comment_aggregates (published desc);
149 changes: 84 additions & 65 deletions src/scheduled_tasks.rs
@@ -1,28 +1,21 @@
use chrono::NaiveDateTime;
use clokwerk::{Scheduler, TimeUnits as CTimeUnits};
use diesel::{
dsl::{now, IntervalDsl},
sql_types::{Integer, Timestamp},
Connection,
ExpressionMethods,
NullableExpressionMethods,
QueryDsl,
QueryableByName,
};
// Import week days and WeekDay
use diesel::{sql_query, PgConnection, RunQueryDsl};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
schema::{
activity,
comment,
comment_aggregates,
community_aggregates,
community_person_ban,
instance,
person,
post,
post_aggregates,
},
schema::{activity, comment, community_person_ban, instance, person, post},
source::instance::{Instance, InstanceForm},
utils::{functions::hot_rank, naive_now, DELETED_REPLACEMENT_TEXT},
utils::{naive_now, DELETED_REPLACEMENT_TEXT},
};
use lemmy_routes::nodeinfo::NodeInfo;
use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
@@ -49,9 +42,9 @@ pub fn setup(
update_banned_when_expired(&mut conn);
});

// Update hot ranks every 5 minutes
// Update hot ranks every 15 minutes
let url = db_url.clone();
scheduler.every(CTimeUnits::minutes(5)).run(move || {
scheduler.every(CTimeUnits::minutes(15)).run(move || {
let mut conn = PgConnection::establish(&url).expect("could not establish connection");
update_hot_ranks(&mut conn, true);
});
@@ -100,66 +93,92 @@ fn startup_jobs(db_url: &str) {
}

/// Update the hot_rank columns for the aggregates tables
/// Runs in batches until all necessary rows are updated once
fn update_hot_ranks(conn: &mut PgConnection, last_week_only: bool) {
let mut post_update = diesel::update(post_aggregates::table).into_boxed();
let mut comment_update = diesel::update(comment_aggregates::table).into_boxed();
let mut community_update = diesel::update(community_aggregates::table).into_boxed();

// Only update for the last week of content
if last_week_only {
let process_start_time = if last_week_only {
info!("Updating hot ranks for last week...");
let last_week = now - diesel::dsl::IntervalDsl::weeks(1);

post_update = post_update.filter(post_aggregates::published.gt(last_week));
comment_update = comment_update.filter(comment_aggregates::published.gt(last_week));
community_update = community_update.filter(community_aggregates::published.gt(last_week));
naive_now() - chrono::Duration::days(7)
} else {
info!("Updating hot ranks for all history...");
}
NaiveDateTime::from_timestamp_opt(0, 0).expect("0 timestamp creation")
};

match post_update
.set((
post_aggregates::hot_rank.eq(hot_rank(post_aggregates::score, post_aggregates::published)),
post_aggregates::hot_rank_active.eq(hot_rank(
post_aggregates::score,
post_aggregates::newest_comment_time_necro,
)),
))
.execute(conn)
{
Ok(_) => {}
Err(e) => {
error!("Failed to update post_aggregates hot_ranks: {}", e)
}
}
process_hot_ranks_in_batches(
conn,
"post_aggregates",
"SET hot_rank = hot_rank(a.score, a.published),
hot_rank_active = hot_rank(a.score, a.newest_comment_time_necro)",
process_start_time,
);

process_hot_ranks_in_batches(
conn,
"comment_aggregates",
"SET hot_rank = hot_rank(a.score, a.published)",
process_start_time,
);

process_hot_ranks_in_batches(
conn,
"community_aggregates",
"SET hot_rank = hot_rank(a.subscribers, a.published)",
process_start_time,
);

info!("Finished hot ranks update!");
}

match comment_update
.set(comment_aggregates::hot_rank.eq(hot_rank(
comment_aggregates::score,
comment_aggregates::published,
)))
.execute(conn)
{
Ok(_) => {}
Err(e) => {
error!("Failed to update comment_aggregates hot_ranks: {}", e)
}
}
#[derive(QueryableByName)]
struct HotRanksUpdateResult {
#[diesel(sql_type = Timestamp)]
published: NaiveDateTime,
}

match community_update
.set(community_aggregates::hot_rank.eq(hot_rank(
community_aggregates::subscribers,
community_aggregates::published,
)))
.execute(conn)
{
Ok(_) => {
info!("Done.");
}
Err(e) => {
error!("Failed to update community_aggregates hot_ranks: {}", e)
/// Runs the hot rank update query in batches until all rows after `process_start_time` have been
/// processed.
/// In `set_clause`, "a" will refer to the current aggregates table.
/// Locked rows are skipped in order to prevent deadlocks (they will likely get updated on the next
/// run)
fn process_hot_ranks_in_batches(
conn: &mut PgConnection,
table_name: &str,
set_clause: &str,
process_start_time: NaiveDateTime,
) {
let update_batch_size = 1000; // Bigger batches than this tend to cause seq scans
let mut previous_batch_result = Some(process_start_time);
while let Some(previous_batch_last_published) = previous_batch_result {
// Raw `sql_query` is used as a performance optimization - Diesel does not support doing this
// in a single query (neither as a CTE, nor using a subquery)
let result = sql_query(format!(
r#"WITH batch AS (SELECT a.id
FROM {aggregates_table} a
WHERE a.published > $1
ORDER BY a.published
(dessalines marked a review conversation on this line as resolved.)
LIMIT $2
FOR UPDATE SKIP LOCKED)
Review comment (Member): Nice, had no idea about that one. I wonder if diesel has this available so we can use it on other scheduled jobs.

Review comment (sunaurus, Collaborator / Author, Jun 26, 2023): Diesel has .for_update() and .skip_locked(), but they have at least one significant limitation: they can't be used together with .into_boxed().

Review comment (Member): Gotcha. @Nutomic would these be potentially useful in any apub jobs?

Review comment (Member): This seems to be useful when using a table as a kind of job queue? Might be useful for #2142 once we implement that.

(An illustrative sketch of these Diesel locking clauses follows the diff below.)

UPDATE {aggregates_table} a {set_clause}
FROM batch WHERE a.id = batch.id RETURNING a.published;
"#,
aggregates_table = table_name,
set_clause = set_clause
))
.bind::<Timestamp, _>(previous_batch_last_published)
.bind::<Integer, _>(update_batch_size)
.get_results::<HotRanksUpdateResult>(conn);

match result {
Ok(updated_rows) => previous_batch_result = updated_rows.last().map(|row| row.published),
Err(e) => {
error!("Failed to update {} hot_ranks: {}", table_name, e);
break;
Review comment (Member): Should it really stop processing new batches if any one batch threw an error? Seems unnecessary.

}
}
}
info!(
"Finished process_hot_ranks_in_batches execution for {}",
table_name
);
}

/// Clear old activities (this table gets very large)
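
For readers following the review thread above about Diesel's locking clauses: below is a minimal, hypothetical sketch (not part of this PR) of how .for_update() and .skip_locked() can be chained on a plain, non-boxed Diesel query, using a made-up job_queue table. As sunaurus notes, these methods cannot be combined with .into_boxed(), and the update in this PR also needs a CTE that Diesel cannot express in a single query, which is why the change uses raw sql_query instead.

// Hypothetical illustration only: the table and function names are made up
// and do not exist in Lemmy. Assumes Diesel 2.x with the "postgres" feature.
use diesel::{prelude::*, PgConnection};

diesel::table! {
    job_queue (id) {
        id -> Int4,
        published -> Timestamp,
    }
}

// Claim up to `batch_size` rows, oldest first. Rows already locked by a
// concurrent worker are skipped rather than waited on, which is the same
// deadlock-avoidance idea as the raw "FOR UPDATE SKIP LOCKED" query above.
fn claim_batch(conn: &mut PgConnection, batch_size: i64) -> QueryResult<Vec<i32>> {
    job_queue::table
        .select(job_queue::id)
        .order(job_queue::published.asc())
        .limit(batch_size)
        .for_update()
        .skip_locked()
        .load::<i32>(conn)
}

If a table is ever used in the job-queue style mentioned for #2142, this is roughly the shape such a claim query would take, though the actual schema and error handling would differ.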