diff --git a/Cargo.toml b/Cargo.toml index 99aeed7..c352d0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,13 +5,16 @@ edition = "2021" [dependencies] bitcoin = "0.29" -lightning = { version = "0.0.116" } -lightning-block-sync = { version = "0.0.116", features=["rest-client"] } -lightning-net-tokio = { version = "0.0.116" } +lightning = { version = "0.0.117" } +lightning-block-sync = { version = "0.0.117", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.117" } tokio = { version = "1.25", features = ["full"] } -tokio-postgres = { version="=0.7.5" } +tokio-postgres = { version = "=0.7.5" } futures = "0.3" +[dev-dependencies] +lightning-rapid-gossip-sync = { version = "0.0.117" } + [profile.dev] panic = "abort" diff --git a/src/config.rs b/src/config.rs index 1de2482..0655aa7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -74,13 +74,19 @@ pub(crate) fn cache_path() -> String { pub(crate) fn db_connection_config() -> Config { let mut config = Config::new(); - let host = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_HOST").unwrap_or("localhost".to_string()); - let user = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_USER").unwrap_or("alice".to_string()); - let db = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_NAME").unwrap_or("ln_graph_sync".to_string()); + let env_name_prefix = if cfg!(test) { + "RAPID_GOSSIP_TEST_DB" + } else { + "RAPID_GOSSIP_SYNC_SERVER_DB" + }; + + let host = env::var(format!("{}{}", env_name_prefix, "_HOST")).unwrap_or("localhost".to_string()); + let user = env::var(format!("{}{}", env_name_prefix, "_USER")).unwrap_or("alice".to_string()); + let db = env::var(format!("{}{}", env_name_prefix, "_NAME")).unwrap_or("ln_graph_sync".to_string()); config.host(&host); config.user(&user); config.dbname(&db); - if let Ok(password) = env::var("RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD") { + if let Ok(password) = env::var(format!("{}{}", env_name_prefix, "_PASSWORD")) { config.password(&password); } config diff --git a/src/lib.rs b/src/lib.rs index f56aca2..1852911 
100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,6 +41,9 @@ mod verifier; pub mod types; +#[cfg(test)] +mod tests; + /// The purpose of this prefix is to identify the serialization format, should other rapid gossip /// sync formats arise in the future. /// @@ -125,6 +128,14 @@ pub(crate) async fn connect_to_db() -> Client { } }); + #[cfg(test)] + { + let schema_name = tests::db_test_schema(); + let schema_creation_command = format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name); + client.execute(&schema_creation_command, &[]).await.unwrap(); + client.execute(&format!("SET search_path TO {}", schema_name), &[]).await.unwrap(); + } + client.execute("set time zone UTC", &[]).await.unwrap(); client } diff --git a/src/tests/mod.rs b/src/tests/mod.rs new file mode 100644 index 0000000..b62406d --- /dev/null +++ b/src/tests/mod.rs @@ -0,0 +1,216 @@ +//! Multi-module tests that use database fixtures + +use std::cell::RefCell; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use bitcoin::{BlockHash, Network}; +use bitcoin::secp256k1::ecdsa::Signature; +use bitcoin::secp256k1::{Secp256k1, SecretKey}; +use bitcoin::hashes::Hash; +use bitcoin::hashes::hex::ToHex; +use bitcoin::hashes::sha256d::Hash as Sha256dHash; +use lightning::ln::features::ChannelFeatures; +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; +use lightning::routing::gossip::{NetworkGraph, NodeId}; +use lightning::util::ser::Writeable; +use lightning_rapid_gossip_sync::RapidGossipSync; +use crate::{config, serialize_delta}; +use crate::persistence::GossipPersister; +use crate::types::{GossipMessage, tests::TestLogger}; + +const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week + +thread_local! 
{ + static DB_TEST_SCHEMA: RefCell> = RefCell::new(None); + static IS_TEST_SCHEMA_CLEAN: RefCell> = RefCell::new(None); +} + +fn blank_signature() -> Signature { + Signature::from_compact(&[0u8; 64]).unwrap() +} + +fn genesis_hash() -> BlockHash { + bitcoin::blockdata::constants::genesis_block(Network::Bitcoin).block_hash() +} + +fn current_time() -> u32 { + SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32 +} + +pub(crate) fn db_test_schema() -> String { + DB_TEST_SCHEMA.with(|suffix_reference| { + let mut suffix_option = suffix_reference.borrow(); + suffix_option.as_ref().unwrap().clone() + }) +} + +fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement { + let secp_context = Secp256k1::new(); + + let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap(); + let random_public_key_1 = random_private_key_1.public_key(&secp_context); + let node_id_1 = NodeId::from_pubkey(&random_public_key_1); + + let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap(); + let random_public_key_2 = random_private_key_2.public_key(&secp_context); + let node_id_2 = NodeId::from_pubkey(&random_public_key_2); + + let announcement = UnsignedChannelAnnouncement { + features: ChannelFeatures::empty(), + chain_hash: genesis_hash(), + short_channel_id, + node_id_1, + node_id_2, + bitcoin_key_1: node_id_1, + bitcoin_key_2: node_id_2, + excess_data: vec![], + }; + + let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap(); + let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1); + let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2); + + ChannelAnnouncement { + node_signature_1, + node_signature_2, + bitcoin_signature_1: node_signature_1, + bitcoin_signature_2: node_signature_2, + contents: announcement, + } +} + +fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: 
u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate { + let flag_mask = if direction { 1 } else { 0 }; + ChannelUpdate { + signature: blank_signature(), + contents: UnsignedChannelUpdate { + chain_hash: genesis_hash(), + short_channel_id: scid, + timestamp, + flags: 0 | flag_mask, + cltv_expiry_delta: expiry_delta, + htlc_minimum_msat: min_msat, + htlc_maximum_msat: max_msat, + fee_base_msat: base_msat, + fee_proportional_millionths: fee_rate, + excess_data: vec![], + }, + } +} + +struct SchemaSanitizer {} + +impl SchemaSanitizer { + fn new() -> Self { + IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| { + let mut is_clean_option = cleanliness_reference.borrow_mut(); + *is_clean_option = Some(false); + }); + + DB_TEST_SCHEMA.with(|suffix_reference| { + let mut suffix_option = suffix_reference.borrow_mut(); + let current_time = SystemTime::now(); + let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards"); + let timestamp_seconds = unix_time.as_secs(); + let timestamp_nanos = unix_time.as_nanos(); + let preimage = format!("{}", timestamp_nanos); + let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex(); + // the schema must start with a letter + let schema = format!("test_{}_{}", timestamp_seconds, suffix); + *suffix_option = Some(schema); + }); + + return Self {}; + } +} + +impl Drop for SchemaSanitizer { + fn drop(&mut self) { + IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| { + let is_clean_option = cleanliness_reference.borrow(); + if let Some(is_clean) = *is_clean_option { + assert_eq!(is_clean, true); + } + }); + } +} + + +async fn clean_test_db() { + let client = crate::connect_to_db().await; + let schema = db_test_schema(); + client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap(); + IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| { + let mut is_clean_option = cleanliness_reference.borrow_mut(); + *is_clean_option = Some(true); + }); +} + +#[tokio::test] 
+async fn test_trivial_setup() { + let _sanitizer = SchemaSanitizer::new(); + let logger = Arc::new(TestLogger::new()); + let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let network_graph_arc = Arc::new(network_graph); + let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()); + + let short_channel_id = 1; + let timestamp = current_time() - 10; + println!("timestamp: {}", timestamp); + + { // seed the db + let announcement = generate_announcement(short_channel_id); + let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0); + let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0); + + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); + + receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap(); + drop(receiver); + persister.persist_gossip().await; + } + + let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await; + logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1); + clean_test_db().await; + + let channel_count = network_graph_arc.read_only().channels().len(); + + assert_eq!(channel_count, 1); + assert_eq!(serialization.message_count, 3); + assert_eq!(serialization.announcement_count, 1); + assert_eq!(serialization.update_count, 2); + + let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone()); + let client_graph_arc = Arc::new(client_graph); + let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone()); + let update_result = rgs.update_network_graph(&serialization.data).unwrap(); + println!("update result: 
{}", update_result); + // the update result must be a multiple of our snapshot granularity + assert_eq!(update_result % config::snapshot_generation_interval(), 0); + assert!(update_result < timestamp); + + let timestamp_delta = timestamp - update_result; + println!("timestamp delta: {}", timestamp_delta); + assert!(timestamp_delta < config::snapshot_generation_interval()); + + let readonly_graph = client_graph_arc.read_only(); + let channels = readonly_graph.channels(); + let client_channel_count = channels.len(); + assert_eq!(client_channel_count, 1); + + let first_channel = channels.get(&short_channel_id).unwrap(); + assert!(&first_channel.announcement_message.is_none()); + assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5); + assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10); + let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update; + let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update; + println!("last update a: {}", last_update_seen_a); + println!("last update b: {}", last_update_seen_b); + assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL); + assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL); +} diff --git a/src/types.rs b/src/types.rs index 0b03081..f530bd4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -36,3 +36,56 @@ impl Logger for RGSSLogger { println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); } } + +#[cfg(test)] +pub mod tests { + use std::collections::HashMap; + use std::sync::{Mutex}; + use lightning::util::logger::{Level, Logger, Record}; + + pub struct TestLogger { + level: Level, + pub(crate) id: String, + pub lines: Mutex>, + } + + impl TestLogger { + pub fn new() -> TestLogger { + let id = crate::tests::db_test_schema(); + Self::with_id(id) + } + pub fn with_id(id: String) -> TestLogger { + TestLogger { + level: Level::Gossip, + id, + 
lines: Mutex::new(HashMap::new()), + } + } + pub fn enable(&mut self, level: Level) { + self.level = level; + } + pub fn assert_log(&self, module: String, line: String, count: usize) { + let log_entries = self.lines.lock().unwrap(); + assert_eq!(log_entries.get(&(module, line)), Some(&count)); + } + + /// Search for the number of occurrences of logged lines which + /// 1. belong to the specified module and + /// 2. contain `line` in them, + /// and assert that the number of occurrences equals the given `count` + pub fn assert_log_contains(&self, module: &str, line: &str, count: usize) { + let log_entries = self.lines.lock().unwrap(); + let l: usize = log_entries.iter().filter(|&(&(ref m, ref l), _c)| { + m == module && l.contains(line) + }).map(|(_, c)| { c }).sum(); + assert_eq!(l, count) + } + } + + impl Logger for TestLogger { + fn log(&self, record: &Record) { + *self.lines.lock().unwrap().entry((record.module_path.to_string(), format!("{}", record.args))).or_insert(0) += 1; + println!("{:<5} {} [{} : {}, {}] {}", record.level.to_string(), self.id, record.module_path, record.file, record.line, record.args); + } + } +}