Skip to content

Commit

Permalink
Merge pull request #2964 from jbesraa/prune-stale-chanmonitor
Browse files Browse the repository at this point in the history
Add `archive_fully_resolved_monitors` to `ChainMonitor`
  • Loading branch information
TheBlueMatt authored Apr 18, 2024
2 parents ae4c35c + 4b55043 commit 195e666
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 0 deletions.
3 changes: 3 additions & 0 deletions fuzz/src/utils/test_persister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
self.update_ret.lock().unwrap().clone()
}

fn archive_persisted_channel(&self, _: OutPoint) {
}
}
40 changes: 40 additions & 0 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
/// Prevents the channel monitor from being loaded on startup.
///
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
/// hedging against data loss in case of unexpected failure.
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
}

struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
Expand Down Expand Up @@ -656,6 +661,41 @@ where C::Target: chain::Filter,
}
}
}

/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
///
/// This is useful for pruning fully resolved monitors from the monitor set and primary
/// storage so they are not kept in memory and reloaded on restart.
///
/// Should be called occasionally (once every handful of blocks or on startup).
///
/// Depending on the implementation of [`Persist::archive_persisted_channel`] the monitor
/// data could be moved to an archive location or removed entirely.
pub fn archive_fully_resolved_channel_monitors(&self) {
let mut have_monitors_to_prune = false;
for (_, monitor_holder) in self.monitors.read().unwrap().iter() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
if monitor_holder.monitor.is_fully_resolved(&logger) {
have_monitors_to_prune = true;
}
}
if have_monitors_to_prune {
let mut monitors = self.monitors.write().unwrap();
monitors.retain(|funding_txo, monitor_holder| {
let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
if monitor_holder.monitor.is_fully_resolved(&logger) {
log_info!(logger,
"Archiving fully resolved ChannelMonitor for funding txo {}",
funding_txo
);
self.persister.archive_persisted_channel(*funding_txo);
false
} else {
true
}
});
}
}
}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
Expand Down
54 changes: 54 additions & 0 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,9 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
/// Ordering of tuple data: (their_per_commitment_point, feerate_per_kw, to_broadcaster_sats,
/// to_countersignatory_sats)
initial_counterparty_commitment_info: Option<(PublicKey, u32, u64, u64)>,

/// The first block height at which we had no remaining claimable balances.
balances_empty_height: Option<u32>,
}

/// Transaction outputs to watch for on-chain spends.
Expand Down Expand Up @@ -1145,6 +1148,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
(15, self.counterparty_fulfilled_htlcs, required),
(17, self.initial_counterparty_commitment_info, option),
(19, self.channel_id, required),
(21, self.balances_empty_height, option),
});

Ok(())
Expand Down Expand Up @@ -1328,6 +1332,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
best_block,
counterparty_node_id: Some(counterparty_node_id),
initial_counterparty_commitment_info: None,
balances_empty_height: None,
})
}

Expand Down Expand Up @@ -1856,6 +1861,52 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
spendable_outputs
}

/// Checks if the monitor is fully resolved. Resolved monitor is one that has claimed all of
/// its outputs and balances (i.e. [`Self::get_claimable_balances`] returns an empty set).
///
/// This function returns true only if [`Self::get_claimable_balances`] has been empty for at least
/// 2016 blocks as an additional protection against any bugs resulting in spuriously empty balance sets.
pub fn is_fully_resolved<L: Logger>(&self, logger: &L) -> bool {
let mut is_all_funds_claimed = self.get_claimable_balances().is_empty();
let current_height = self.current_best_block().height;
let mut inner = self.inner.lock().unwrap();

if is_all_funds_claimed {
if !inner.funding_spend_seen {
debug_assert!(false, "We should see funding spend by the time a monitor clears out");
is_all_funds_claimed = false;
}
}

match (inner.balances_empty_height, is_all_funds_claimed) {
(Some(balances_empty_height), true) => {
// Claimed all funds, check if reached the blocks threshold.
const BLOCKS_THRESHOLD: u32 = 4032; // ~four weeks
return current_height >= balances_empty_height + BLOCKS_THRESHOLD;
},
(Some(_), false) => {
// previously assumed we claimed all funds, but we have new funds to claim.
// Should not happen in practice.
debug_assert!(false, "Thought we were done claiming funds, but claimable_balances now has entries");
log_error!(logger,
"WARNING: LDK thought it was done claiming all the available funds in the ChannelMonitor for channel {}, but later decided it had more to claim. This is potentially an important bug in LDK, please report it at https://github.com/lightningdevkit/rust-lightning/issues/new",
inner.get_funding_txo().0);
inner.balances_empty_height = None;
false
},
(None, true) => {
// Claimed all funds but `balances_empty_height` is None. It is set to the
// current block height.
inner.balances_empty_height = Some(current_height);
false
},
(None, false) => {
// Have funds to claim.
false
},
}
}

#[cfg(test)]
pub fn get_counterparty_payment_script(&self) -> ScriptBuf {
self.inner.lock().unwrap().counterparty_payment_script.clone()
Expand Down Expand Up @@ -4632,6 +4683,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
let mut spendable_txids_confirmed = Some(Vec::new());
let mut counterparty_fulfilled_htlcs = Some(new_hash_map());
let mut initial_counterparty_commitment_info = None;
let mut balances_empty_height = None;
let mut channel_id = None;
read_tlv_fields!(reader, {
(1, funding_spend_confirmed, option),
Expand All @@ -4644,6 +4696,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
(15, counterparty_fulfilled_htlcs, option),
(17, initial_counterparty_commitment_info, option),
(19, channel_id, option),
(21, balances_empty_height, option),
});

// `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. If we have both
Expand Down Expand Up @@ -4722,6 +4775,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
best_block,
counterparty_node_id,
initial_counterparty_commitment_info,
balances_empty_height,
})))
}
}
Expand Down
54 changes: 54 additions & 0 deletions lightning/src/ln/monitor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,60 @@ fn revoked_output_htlc_resolution_timing() {
expect_payment_failed!(nodes[1], payment_hash_1, false);
}

#[test]
fn archive_fully_resolved_monitors() {
// Test we can archive fully resolved channel monitor.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let mut user_config = test_default_channel_config();
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]);
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);

let (_, _, chan_id, funding_tx) =
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000);

nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()).unwrap();
let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &node_0_shutdown);
let node_1_shutdown = get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &node_1_shutdown);

let node_0_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id());
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed);
let node_1_closing_signed = get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id());
nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &node_1_closing_signed);
let (_, node_0_2nd_closing_signed) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed.unwrap());
let (_, _) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());

let shutdown_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);

mine_transaction(&nodes[0], &shutdown_tx[0]);
mine_transaction(&nodes[1], &shutdown_tx[0]);

connect_blocks(&nodes[0], 6);
connect_blocks(&nodes[1], 6);

check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);

assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
// First archive should set balances_empty_height to current block height
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
connect_blocks(&nodes[0], 4032);
// Second call after 4032 blocks, should archive the monitor
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
// Should have no monitors left
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0);
// Remove the corresponding outputs and transactions the chain source is
// watching. This is to make sure the `Drop` function assertions pass.
nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs(
OutPoint { txid: funding_tx.txid(), index: 0 },
funding_tx.output[0].script_pubkey.clone()
);
}

fn do_chanmon_claim_value_coop_close(anchors: bool) {
// Tests `get_claimable_balances` returns the correct values across a simple cooperative claim.
// Specifically, this tests that the channel non-HTLC balances show up in
Expand Down
55 changes: 55 additions & 0 deletions lightning/src/util/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The primary namespace under which the [`NetworkGraph`] will be persisted.
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
Expand Down Expand Up @@ -226,6 +231,33 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
) {
Ok(monitor) => monitor,
Err(_) => return
};
match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor,
) {
Ok(()) => {}
Err(_e) => return
};
let _ = self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
true,
);
}
}

/// Read previously persisted [`ChannelMonitor`]s from the store.
Expand Down Expand Up @@ -732,6 +764,29 @@ where
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
}
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read_monitor(&monitor_name) {
Ok((_block_hash, monitor)) => monitor,
Err(_) => return
};
match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor.encode()
) {
Ok(()) => {},
Err(_e) => return,
};
let _ = self.kv_store.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
true,
);
}
}

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
Expand Down
20 changes: 20 additions & 0 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
res
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
<TestPersister as chainmonitor::Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
}
}

pub struct TestPersister {
Expand Down Expand Up @@ -552,6 +556,18 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
}
ret
}

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
// remove the channel from the offchain_monitor_updates map
match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
Some(_) => {},
None => {
// If the channel was not in the offchain_monitor_updates map, it should be in the
// chain_sync_monitor_persistences map.
assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
}
};
}
}

pub struct TestStore {
Expand Down Expand Up @@ -1366,6 +1382,10 @@ impl TestChainSource {
watched_outputs: Mutex::new(new_hash_set()),
}
}
pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, script_pubkey: ScriptBuf) {
self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey));
}
}

impl UtxoLookup for TestChainSource {
Expand Down

0 comments on commit 195e666

Please sign in to comment.