Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update recency issue with expanded test coverage #63

Merged
merged 7 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ tokio-postgres = { version = "=0.7.5" }
futures = "0.3"

[dev-dependencies]
lightning = { version = "0.0.117", features = ["_test_utils"] }
lightning-rapid-gossip-sync = { version = "0.0.117" }

[profile.dev]
Expand Down
10 changes: 8 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lightning::util::ser::Readable;
use lightning_block_sync::http::HttpEndpoint;
use tokio_postgres::Config;

pub(crate) const SCHEMA_VERSION: i32 = 12;
pub(crate) const SCHEMA_VERSION: i32 = 13;
pub(crate) const SYMLINK_GRANULARITY_INTERVAL: u32 = 3600 * 3; // three hours
pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
// generate symlinks based on a 3-hour-granularity
Expand Down Expand Up @@ -143,7 +143,7 @@ pub(crate) fn db_index_creation_query() -> &'static str {
CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_desc_with_id ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id);
CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp);
CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen);
CREATE INDEX IF NOT EXISTS channel_updates_timestamp_desc ON channel_updates(timestamp DESC);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check that this is now unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only tested the specific query that I now changed, but prepending short_channel_id ASC does make it no longer use that index.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, my question was if any other queries use that index.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other three channel_update queries don't seem to be using it with my local test database, but my postgres seems to occasionally select indices distinct from yours, so you may wanna double-check your instance, too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mine doesn't seem to either.

CREATE INDEX IF NOT EXISTS channel_updates_scid_asc_timestamp_desc ON channel_updates(short_channel_id ASC, timestamp DESC);
"
}

Expand Down Expand Up @@ -282,6 +282,12 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
tx.execute("UPDATE config SET db_schema = 12 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema >= 1 && schema <= 12 {
let tx = client.transaction().await.unwrap();
tx.execute("DROP INDEX IF EXISTS channel_updates_timestamp_desc", &[]).await.unwrap();
tx.execute("UPDATE config SET db_schema = 13 WHERE id = 1", &[]).await.unwrap();
tx.commit().await.unwrap();
}
if schema <= 1 || schema > SCHEMA_VERSION {
panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
}
Expand Down
4 changes: 2 additions & 2 deletions src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {
counter.channel_announcements += 1;
}

let gossip_message = GossipMessage::ChannelAnnouncement(msg);
let gossip_message = GossipMessage::ChannelAnnouncement(msg, None);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
Expand All @@ -73,7 +73,7 @@ impl<L: Deref + Clone + Send + Sync> GossipRouter<L> where L::Target: Logger {

fn new_channel_update(&self, msg: ChannelUpdate) {
self.counter.write().unwrap().channel_updates += 1;
let gossip_message = GossipMessage::ChannelUpdate(msg);
let gossip_message = GossipMessage::ChannelUpdate(msg, None);

if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
Expand Down
2 changes: 1 addition & 1 deletion src/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, cl
SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM channel_updates
WHERE seen >= TO_TIMESTAMP($1)
ORDER BY timestamp DESC
ORDER BY short_channel_id ASC, timestamp DESC
", [last_sync_timestamp_float]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
Expand Down
34 changes: 29 additions & 5 deletions src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
}

match &gossip_message {
GossipMessage::ChannelAnnouncement(announcement) => {
GossipMessage::ChannelAnnouncement(announcement, _) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add the override field if we can't use it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't added the unit test for an old announcement yet that was gonna use it. I can do that now to make sure this does end up finding some use immediately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's alright, I just feel weird adding it freestanding but if you already have code queued up to use it we'll survive.

let scid = announcement.contents.short_channel_id as i64;

// start with the type prefix, which is already known a priori
Expand All @@ -127,7 +127,7 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
&announcement_signed
])).await.unwrap().unwrap();
}
GossipMessage::ChannelUpdate(update) => {
GossipMessage::ChannelUpdate(update, seen_override) => {
let scid = update.contents.short_channel_id as i64;

let timestamp = update.contents.timestamp as i64;
Expand All @@ -146,10 +146,11 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
let mut update_signed = Vec::new();
update.write(&mut update_signed).unwrap();

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute("INSERT INTO channel_updates (\
let insertion_statement = if cfg!(test) {
"INSERT INTO channel_updates (\
short_channel_id, \
timestamp, \
seen, \
channel_flags, \
direction, \
disable, \
Expand All @@ -159,9 +160,32 @@ impl<L: Deref> GossipPersister<L> where L::Target: Logger {
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[
) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING"
} else {
"INSERT INTO channel_updates (\
short_channel_id, \
timestamp, \
channel_flags, \
direction, \
disable, \
cltv_expiry_delta, \
htlc_minimum_msat, \
fee_base_msat, \
fee_proportional_millionths, \
htlc_maximum_msat, \
blob_signed \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING"
};

// this may not be used outside test cfg
let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64;

tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client
.execute(insertion_statement, &[
&scid,
&timestamp,
#[cfg(test)]
&_seen_timestamp,
&(update.contents.flags as i16),
&direction,
&disable,
Expand Down
Loading
Loading