diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c38d6dfe080..e9e4e4ab487 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1152,6 +1152,11 @@ where let mut futures = Joiner::new(); + // Capture the number of pending monitor writes before persisting the channel manager. + // We'll only flush this many writes after the manager is persisted, to avoid flushing + // monitor updates that arrived after the manager state was captured. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); @@ -1349,6 +1354,15 @@ where res?; } + // Flush the monitor writes that were pending before we persisted the channel manager. + // Any writes that arrived after are left in the queue for the next iteration. + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes), + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1413,6 +1427,16 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush all pending monitor writes after final channel manager persistence. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes), + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + if let Some(ref scorer) = scorer { kv_store .write( @@ -1722,6 +1746,9 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + // Capture the number of pending monitor writes before persisting the channel manager. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1733,6 +1760,16 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush the monitor writes that were pending before we persisted the channel manager. + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => { + log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes) + }, + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1853,6 +1890,18 @@ impl BackgroundProcessor { CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush all pending monitor writes after final channel manager persistence. 
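The ordering in the background-processor hunks above is the point of the change: the number of pending monitor writes is snapshotted before the ChannelManager is serialized, and only that many queued writes are flushed afterwards, so a monitor update that races in after the manager snapshot can never reach disk ahead of a manager that does not yet know about it. A minimal sketch of that invariant, using a hypothetical `QueueingPersister` and a plain closure in place of the real LDK types:

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for a persister that queues monitor writes
/// (an assumption for illustration, not the real LDK type).
struct QueueingPersister {
    queue: Mutex<Vec<String>>,
}

impl QueueingPersister {
    fn pending_write_count(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Flush only the first `count` queued writes; anything queued later
    /// stays in the queue for the next round.
    fn flush(&self, count: usize) -> Result<Vec<String>, std::io::Error> {
        let mut queue = self.queue.lock().unwrap();
        let n = count.min(queue.len());
        Ok(queue.drain(..n).collect())
    }
}

fn persist_round(persister: &QueueingPersister, persist_manager: impl FnOnce()) {
    // 1) Snapshot the queue length *before* serializing the manager.
    let pending = persister.pending_write_count();
    // 2) Persist the manager state captured at this point.
    persist_manager();
    // 3) Flush only the writes that predate the manager snapshot.
    if pending > 0 {
        match persister.flush(pending) {
            Ok(done) => println!("flushed {} monitor writes", done.len()),
            Err(e) => eprintln!("failed to flush chain monitor: {}", e),
        }
    }
}

fn main() {
    let persister = QueueingPersister { queue: Mutex::new(vec!["mon-1/update-7".to_string()]) };
    persist_round(&persister, || println!("manager persisted"));
}
```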
+ let pending_monitor_writes = chain_monitor.pending_write_count(); + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => { + log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes) + }, + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9fd6383cf7e..c1d2265e0a9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{ use crate::chain::transaction::{OutPoint, TransactionData}; use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::events::{self, Event, EventHandler, ReplayEvent}; +use crate::io; use crate::ln::channel_state::ChannelDetails; #[cfg(peer_storage)] use crate::ln::msgs::PeerStorage; @@ -198,16 +199,23 @@ pub trait Persist { /// the monitor already exists in the archive. fn archive_persisted_channel(&self, monitor_name: MonitorName); - /// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with - /// [`Self::update_persisted_channel`], which have completed. + /// Returns the number of pending writes in the queue. /// - /// Returning an update here is equivalent to calling - /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and - /// hidden in the docs. - #[doc(hidden)] - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - Vec::new() + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + fn pending_write_count(&self) -> usize { + 0 } + + /// Flushes pending writes to the underlying storage. + /// + /// The `count` parameter specifies how many pending writes to flush. + /// + /// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] + /// from persist methods), this method should write queued data to storage. + /// + /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. 
+ fn flush(&self, count: usize) -> Result, io::Error>; } struct MonitorHolder { @@ -272,7 +280,6 @@ pub struct AsyncPersister< FE::Target: FeeEstimator, { persister: MonitorUpdatingPersisterAsync, - event_notifier: Arc, } impl< @@ -320,8 +327,7 @@ where &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - let notifier = Arc::clone(&self.event_notifier); - self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier); + self.persister.queue_new_channel(monitor_name, monitor); ChannelMonitorUpdateStatus::InProgress } @@ -329,8 +335,7 @@ where &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - let notifier = Arc::clone(&self.event_notifier); - self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier); + self.persister.queue_channel_update(monitor_name, monitor_update, monitor); ChannelMonitorUpdateStatus::InProgress } @@ -338,8 +343,12 @@ where self.persister.spawn_async_archive_persisted_channel(monitor_name); } - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - self.persister.get_and_clear_completed_updates() + fn pending_write_count(&self) -> usize { + self.persister.pending_write_count() + } + + fn flush(&self, count: usize) -> Result, io::Error> { + crate::util::persist::poll_sync_future(self.persister.flush(count)) } } @@ -440,7 +449,6 @@ impl< persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, ) -> Self { - let event_notifier = Arc::new(Notifier::new()); Self { monitors: RwLock::new(new_hash_map()), chain_source, @@ -450,8 +458,8 @@ impl< _entropy_source, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), - event_notifier: Arc::clone(&event_notifier), - persister: AsyncPersister { persister, event_notifier }, + event_notifier: Arc::new(Notifier::new()), + persister: AsyncPersister { persister }, pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, @@ -742,6 +750,30 @@ where .collect() } + /// Returns the number of pending writes in the persister queue. + /// + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + pub fn pending_write_count(&self) -> usize { + self.persister.pending_write_count() + } + + /// Flushes pending writes to the underlying storage. + /// + /// If `count` is `Some(n)`, only the first `n` pending writes are flushed. + /// If `count` is `None`, all pending writes are flushed. + /// + /// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] + /// from persist methods), this method writes queued data to storage and signals + /// completion to the channel manager via [`Self::channel_monitor_updated`]. 
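For a custom persister that defers writes (returning `InProgress` from the persist methods), the two new hooks behave like a simple work queue: `pending_write_count` reports the backlog, and `flush(count)` drains and writes that many entries, reporting each completed `(channel_id, update_id)` pair so the `ChainMonitor` can mark the update finished. A rough self-contained sketch of that shape, with a toy `Entry` type, a 32-byte array standing in for `ChannelId`, and an in-memory map standing in for the real KVStore (all assumptions, not LDK APIs):

```rust
use std::collections::HashMap;
use std::io;
use std::sync::Mutex;

/// Toy queued write: storage key, serialized bytes, and the (channel_id, update_id)
/// completion token to report once the write has hit storage.
struct Entry {
    key: String,
    bytes: Vec<u8>,
    completion: ([u8; 32], u64),
}

#[derive(Default)]
struct DeferredPersister {
    pending: Mutex<Vec<Entry>>,
    store: Mutex<HashMap<String, Vec<u8>>>,
}

impl DeferredPersister {
    /// Equivalent of returning `InProgress`: just enqueue, don't write yet.
    fn queue(&self, key: String, bytes: Vec<u8>, completion: ([u8; 32], u64)) {
        self.pending.lock().unwrap().push(Entry { key, bytes, completion });
    }

    fn pending_write_count(&self) -> usize {
        self.pending.lock().unwrap().len()
    }

    /// Write the first `count` queued entries and return their completion tokens.
    fn flush(&self, count: usize) -> Result<Vec<([u8; 32], u64)>, io::Error> {
        let drained: Vec<Entry> = {
            let mut pending = self.pending.lock().unwrap();
            let n = count.min(pending.len());
            pending.drain(..n).collect()
        };
        let mut completed = Vec::with_capacity(drained.len());
        for entry in drained {
            self.store.lock().unwrap().insert(entry.key, entry.bytes);
            completed.push(entry.completion);
        }
        Ok(completed)
    }
}

fn main() {
    let p = DeferredPersister::default();
    p.queue("mon/3".to_string(), vec![0u8; 4], ([0u8; 32], 3));
    assert_eq!(p.pending_write_count(), 1);
    let done = p.flush(usize::MAX).unwrap();
    println!("flushed {} queued monitor writes", done.len());
}
```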
+ pub fn flush(&self, count: usize) -> Result<(), io::Error> { + let completed = self.persister.flush(count)?; + for (channel_id, update_id) in completed { + let _ = self.channel_monitor_updated(channel_id, update_id); + } + Ok(()) + } + #[cfg(any(test, feature = "_test_utils"))] pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor { self.monitors.write().unwrap().remove(channel_id).unwrap().monitor @@ -1497,9 +1529,6 @@ where fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { - for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { - let _ = self.channel_monitor_updated(channel_id, update_id); - } let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 2e1e8805d0a..7bcf46534ce 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -18,9 +18,8 @@ use bitcoin::{BlockHash, Txid}; use core::convert::Infallible; use core::future::Future; -use core::mem; use core::ops::Deref; -use core::pin::{pin, Pin}; +use core::pin::pin; use core::str::FromStr; use core::task; @@ -41,7 +40,6 @@ use crate::util::async_poll::{ use crate::util::logger::Logger; use crate::util::native_async::FutureSpawner; use crate::util::ser::{Readable, ReadableArgs, Writeable}; -use crate::util::wakers::Notifier; /// The alphabet of characters allowed for namespaces and keys. pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = @@ -442,6 +440,11 @@ impl Persist Result, io::Error> { + // KVStoreSync implementations persist immediately, so there's nothing to flush. + Ok(Vec::new()) + } } /// Read previously persisted [`ChannelMonitor`]s from the store. @@ -501,7 +504,7 @@ impl FutureSpawner for PanicingSpawner { } } -fn poll_sync_future(future: F) -> F::Output { +pub(crate) fn poll_sync_future(future: F) -> F::Output { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match pin!(future).poll(&mut ctx) { @@ -513,6 +516,28 @@ fn poll_sync_future(future: F) -> F::Output { } } +/// Represents a pending write operation that will be executed when +/// [`MonitorUpdatingPersister::flush`] is called. +enum PendingWrite { + /// A full channel monitor write. + FullMonitor { + monitor_key: String, + monitor_bytes: Vec, + /// The (channel_id, update_id) pair to signal as complete after the write. + completion: (ChannelId, u64), + /// Range of stale update IDs to clean up after the write: `start..end` (exclusive end). + stale_update_cleanup: Option<(u64, u64)>, + }, + /// A channel monitor update write. + Update { + monitor_key: String, + update_key: String, + update_bytes: Vec, + /// The (channel_id, update_id) pair to signal as complete after the write. + completion: (ChannelId, u64), + }, +} + /// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and /// [`ChannelMonitorUpdate`]s. /// @@ -588,6 +613,13 @@ fn poll_sync_future(future: F) -> F::Output { /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. 
+/// +/// # Deferred Persistence +/// +/// When [`Persist::persist_new_channel`] or [`Persist::update_persisted_channel`] is called, the +/// serialized data is not immediately written to disk. Instead, it is stored in an internal queue +/// and [`ChannelMonitorUpdateStatus::InProgress`] is returned. To actually persist the data to disk, +/// call [`MonitorUpdatingPersister::flush`] which will write all pending data to the [`KVStoreSync`]. pub struct MonitorUpdatingPersister( MonitorUpdatingPersisterAsync, PanicingSpawner, L, ES, SP, BI, FE>, ) @@ -708,61 +740,49 @@ where BI::Target: BroadcasterInterface, FE::Target: FeeEstimator, { - /// Persists a new channel. This means writing the entire monitor to the - /// parametrized [`KVStoreSync`]. + /// Queues a new channel monitor to be persisted. The actual write happens when + /// [`MonitorUpdatingPersister::flush`] is called. + /// + /// Returns [`ChannelMonitorUpdateStatus::InProgress`] to indicate the write is pending. fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor)); - match res { - Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.0 .0.logger, - "Failed to write ChannelMonitor {}/{}/{} reason: {}", - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_name, - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + let completion = (monitor.channel_id(), monitor.get_latest_update_id()); + // New channel, no stale updates to clean up + self.0 .0.queue_monitor_write(monitor_name, monitor, completion, None); + chain::ChannelMonitorUpdateStatus::InProgress } - /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. + /// Queues a channel update to be persisted. The actual write happens when + /// [`MonitorUpdatingPersister::flush`] is called. /// - /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: + /// In some cases, this will write the full monitor instead of just the update: /// - /// - No full monitor is found in [`KVStoreSync`] /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] /// - LDK commands re-persisting the entire monitor through this function, specifically when - /// `update` is `None`. + /// `update` is `None`. /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. + /// + /// Returns [`ChannelMonitorUpdateStatus::InProgress`] to indicate the write is pending. 
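The choice between queueing a small update delta and queueing a full monitor rewrite comes down to a modulus check of the update ID against `maximum_pending_updates`, plus the special cases listed above (no update provided, or the legacy `u64::MAX` ID from pre-0.1 LDK). A hedged sketch of that decision rule, written as a free function over plain integers rather than the real monitor types:

```rust
/// Pre-0.1 LDK used u64::MAX as the update ID for closed channels; such
/// updates always trigger a full monitor write.
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;

/// What kind of write to queue for a given monitor update.
#[derive(Debug, PartialEq)]
enum WriteKind {
    /// Persist only the small ChannelMonitorUpdate delta.
    UpdateOnly,
    /// Re-persist the whole ChannelMonitor (and later clean up stale deltas).
    FullMonitor,
}

fn decide_write(update_id: Option<u64>, maximum_pending_updates: u64) -> WriteKind {
    match update_id {
        Some(id)
            if id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
                && maximum_pending_updates != 0
                && id % maximum_pending_updates != 0 =>
        {
            WriteKind::UpdateOnly
        },
        // `None` (re-persist requested), the legacy ID, or an ID landing on a
        // multiple of `maximum_pending_updates` all force a full monitor write.
        _ => WriteKind::FullMonitor,
    }
}

fn main() {
    assert_eq!(decide_write(Some(7), 5), WriteKind::UpdateOnly);
    assert_eq!(decide_write(Some(10), 5), WriteKind::FullMonitor);
    assert_eq!(decide_write(Some(u64::MAX), 5), WriteKind::FullMonitor);
    assert_eq!(decide_write(None, 5), WriteKind::FullMonitor);
    println!("decision rules check out");
}
```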
fn update_persisted_channel( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let inner = Arc::clone(&self.0 .0); - let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor)); - match res { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.0 .0.logger, - "Failed to write ChannelMonitorUpdate {} id {} reason: {}", - monitor_name, - update.as_ref().map(|upd| upd.update_id).unwrap_or(0), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + self.0 .0.queue_channel_update(monitor_name, update, monitor); + chain::ChannelMonitorUpdateStatus::InProgress } fn archive_persisted_channel(&self, monitor_name: MonitorName) { poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name)); } + + fn pending_write_count(&self) -> usize { + self.0.pending_write_count() + } + + fn flush(&self, count: usize) -> Result, io::Error> { + poll_sync_future(self.0.flush(count)) + } } /// A variant of the [`MonitorUpdatingPersister`] which utilizes the async [`KVStore`] and offers @@ -811,7 +831,7 @@ struct MonitorUpdatingPersisterAsyncInner< FE::Target: FeeEstimator, { kv_store: K, - async_completed_updates: Mutex>, + pending_writes: Mutex>, future_spawner: S, logger: L, maximum_pending_updates: u64, @@ -840,7 +860,7 @@ where ) -> Self { MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner { kv_store, - async_completed_updates: Mutex::new(Vec::new()), + pending_writes: Mutex::new(Vec::new()), future_spawner, logger, maximum_pending_updates, @@ -963,6 +983,86 @@ where pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { self.0.cleanup_stale_updates(lazy).await } + + /// Returns the number of pending writes in the queue. + /// + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + pub fn pending_write_count(&self) -> usize { + self.0.pending_writes.lock().unwrap().len() + } + + /// Flushes pending writes to the underlying [`KVStore`]. + /// + /// If `count` is `Some(n)`, only the first `n` pending writes are flushed. + /// If `count` is `None`, all pending writes are flushed. + /// + /// This method should be called after one or more calls that queue persist operations + /// to actually write the data to storage. + /// + /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. 
+ pub async fn flush(&self, count: usize) -> Result, io::Error> { + let pending = { + let mut queue = self.0.pending_writes.lock().unwrap(); + let n = count.min(queue.len()); + queue.drain(..n).collect::>() + }; + + let mut completed = Vec::new(); + for write in pending { + match write { + PendingWrite::FullMonitor { + monitor_key, + monitor_bytes, + completion, + stale_update_cleanup, + } => { + self.0 + .kv_store + .write( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + &monitor_key, + monitor_bytes, + ) + .await?; + completed.push(completion); + + // Clean up stale updates after successfully writing a full monitor + if let Some((start, end)) = stale_update_cleanup { + for update_id in start..end { + let update_name = UpdateName::from(update_id); + // Lazy delete - ignore errors as this is just cleanup + let _ = self + .0 + .kv_store + .remove( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_key, + update_name.as_str(), + true, + ) + .await; + } + } + }, + PendingWrite::Update { monitor_key, update_key, update_bytes, completion } => { + self.0 + .kv_store + .write( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_key, + &update_key, + update_bytes, + ) + .await?; + completed.push(completion); + }, + } + } + + Ok(completed) + } } impl< @@ -983,63 +1083,24 @@ where FE::Target: FeeEstimator, ::EcdsaSigner: MaybeSend + 'static, { - pub(crate) fn spawn_async_persist_new_channel( + /// Queues a new channel monitor to be persisted. The actual write happens when + /// [`Self::flush`] is called. + pub(crate) fn queue_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, - notifier: Arc, ) { - let inner = Arc::clone(&self.0); - // Note that `persist_new_channel` is a sync method which calls all the way through to the - // sync KVStore::write method (which returns a future) to ensure writes are well-ordered. - let future = inner.persist_new_channel(monitor_name, monitor); - let channel_id = monitor.channel_id(); let completion = (monitor.channel_id(), monitor.get_latest_update_id()); - let _runs_free = self.0.future_spawner.spawn(async move { - match future.await { - Ok(()) => { - inner.async_completed_updates.lock().unwrap().push(completion); - notifier.notify(); - }, - Err(e) => { - log_error!( - inner.logger, - "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", - ); - }, - } - }); + // New channel, no stale updates to clean up + self.0.queue_monitor_write(monitor_name, monitor, completion, None); } - pub(crate) fn spawn_async_update_channel( + /// Queues a channel update to be persisted. The actual write happens when + /// [`Self::flush`] is called. 
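Note the lock discipline in the flush path above: the pending-writes mutex is held only long enough to drain the first `count` entries, and the storage writes happen after the lock is dropped, so persist calls on other threads can keep appending to the queue while a flush is in flight. A small synchronous sketch of that pattern (the real method is async and writes through a KVStore; `slow_write` below is a stand-in introduced for illustration):

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn slow_write(item: &str) {
    // Stand-in for the real (async) KVStore write.
    thread::sleep(Duration::from_millis(10));
    println!("wrote {}", item);
}

fn flush_first_n(queue: &Mutex<Vec<String>>, count: usize) -> usize {
    // Hold the lock only to remove the entries this flush is responsible for.
    let drained: Vec<String> = {
        let mut q = queue.lock().unwrap();
        let n = count.min(q.len());
        q.drain(..n).collect()
    };
    // Perform the slow writes without holding the lock.
    for item in &drained {
        slow_write(item);
    }
    drained.len()
}

fn main() {
    let queue = Arc::new(Mutex::new(vec!["a".to_string(), "b".to_string()]));
    let producer = {
        let queue = Arc::clone(&queue);
        // Another thread can keep queueing new writes during the flush.
        thread::spawn(move || queue.lock().unwrap().push("c".to_string()))
    };
    let flushed = flush_first_n(&queue, 2);
    producer.join().unwrap();
    println!("flushed {} entries, {} still pending", flushed, queue.lock().unwrap().len());
}
```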
+ pub(crate) fn queue_channel_update( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, - notifier: Arc, ) { - let inner = Arc::clone(&self.0); - // Note that `update_persisted_channel` is a sync method which calls all the way through to - // the sync KVStore::write method (which returns a future) to ensure writes are well-ordered - let future = inner.update_persisted_channel(monitor_name, update, monitor); - let channel_id = monitor.channel_id(); - let completion = if let Some(update) = update { - Some((monitor.channel_id(), update.update_id)) - } else { - None - }; - let inner = Arc::clone(&self.0); - let _runs_free = self.0.future_spawner.spawn(async move { - match future.await { - Ok(()) => if let Some(completion) = completion { - inner.async_completed_updates.lock().unwrap().push(completion); - notifier.notify(); - }, - Err(e) => { - log_error!( - inner.logger, - "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", - ); - }, - } - }); + self.0.queue_channel_update(monitor_name, update, monitor); } pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) { @@ -1048,15 +1109,8 @@ where inner.archive_persisted_channel(monitor_name).await; }); } - - pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - mem::take(&mut *self.0.async_completed_updates.lock().unwrap()) - } } -trait MaybeSendableFuture: Future> + MaybeSend {} -impl> + MaybeSend> MaybeSendableFuture for F {} - impl MonitorUpdatingPersisterAsyncInner where @@ -1231,111 +1285,83 @@ where Ok(()) } - fn persist_new_channel<'a, ChannelSigner: EcdsaChannelSigner>( - &'a self, monitor_name: MonitorName, monitor: &'a ChannelMonitor, - ) -> Pin> + 'static>> { - // Determine the proper key for this monitor + /// Queues a full monitor write to the pending writes queue. + fn queue_monitor_write( + &self, monitor_name: MonitorName, monitor: &ChannelMonitor, + completion: (ChannelId, u64), stale_update_cleanup: Option<(u64, u64)>, + ) { let monitor_key = monitor_name.to_string(); - // Serialize and write the new monitor + let mut monitor_bytes = Vec::with_capacity( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), ); - // If `maximum_pending_updates` is zero, we aren't actually writing monitor updates at all. - // Thus, there's no need to add the sentinel prefix as the monitor can be read directly - // from disk without issue. if self.maximum_pending_updates != 0 { monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); } monitor.write(&mut monitor_bytes).unwrap(); - // Note that this is NOT an async function, but rather calls the *sync* KVStore write - // method, allowing it to do its queueing immediately, and then return a future for the - // completion of the write. This ensures monitor persistence ordering is preserved. - let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; - let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; - // There's no real reason why this needs to be boxed, but dropping it rams into the "hidden - // type for impl... captures lifetime that does not appear in bounds" issue. This can - // trivially be dropped once we upgrade to edition 2024/MSRV 1.85. 
- Box::pin(self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)) + + self.pending_writes.lock().unwrap().push(PendingWrite::FullMonitor { + monitor_key, + monitor_bytes, + completion, + stale_update_cleanup, + }); + } + + /// Queues an update write to the pending writes queue. + fn queue_update_write( + &self, monitor_name: MonitorName, update: &ChannelMonitorUpdate, + completion: (ChannelId, u64), + ) { + let monitor_key = monitor_name.to_string(); + let update_name = UpdateName::from(update.update_id); + let update_bytes = update.encode(); + + self.pending_writes.lock().unwrap().push(PendingWrite::Update { + monitor_key, + update_key: update_name.as_str().to_string(), + update_bytes, + completion, + }); } - fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>( - self: Arc, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + /// Queues writes for a channel update. Decides whether to write just the update or a full + /// monitor based on the update ID and maximum_pending_updates configuration. + fn queue_channel_update( + &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> impl Future> + 'a - where - Self: 'a, - { + ) { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; - let mut res_a = None; - let mut res_b = None; - let mut res_c = None; + + let completion = ( + monitor.channel_id(), + update.map(|u| u.update_id).unwrap_or_else(|| monitor.get_latest_update_id()), + ); + if let Some(update) = update { let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID && self.maximum_pending_updates != 0 && update.update_id % self.maximum_pending_updates != 0; + if persist_update { - let monitor_key = monitor_name.to_string(); - let update_name = UpdateName::from(update.update_id); - let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - // Note that this is NOT an async function, but rather calls the *sync* KVStore - // write method, allowing it to do its queueing immediately, and then return a - // future for the completion of the write. This ensures monitor persistence - // ordering is preserved. - let encoded = update.encode(); - res_a = Some(async move { - self.kv_store.write(primary, &monitor_key, update_name.as_str(), encoded).await - }); + // Write just the update + self.queue_update_write(monitor_name, update, completion); } else { - // We could write this update, but it meets criteria of our design that calls for a full monitor write. - // Note that this is NOT an async function, but rather calls the *sync* KVStore - // write method, allowing it to do its queueing immediately, and then return a - // future for the completion of the write. This ensures monitor persistence - // ordering is preserved. This, thus, must happen before any await we do below. 
- let write_fut = self.persist_new_channel(monitor_name, monitor); + // Write full monitor and clean up stale updates afterward let latest_update_id = monitor.get_latest_update_id(); - - res_b = Some(async move { - let write_status = write_fut.await; - if let Ok(()) = write_status { - if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID { - let monitor_key = monitor_name.to_string(); - self.cleanup_stale_updates_for_monitor_to( - &monitor_key, - latest_update_id, - true, - ) - .await?; - } else { - let end = latest_update_id; - let start = end.saturating_sub(self.maximum_pending_updates); - self.cleanup_in_range(monitor_name, start, end).await; - } - } - - write_status - }); + let stale_update_cleanup = if latest_update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID { + let end = latest_update_id; + let start = end.saturating_sub(self.maximum_pending_updates); + Some((start, end)) + } else { + None + }; + self.queue_monitor_write(monitor_name, monitor, completion, stale_update_cleanup); } } else { - // There is no update given, so we must persist a new monitor. - // Note that this is NOT an async function, but rather calls the *sync* KVStore write - // method, allowing it to do its queueing immediately, and then return a future for the - // completion of the write. This ensures monitor persistence ordering is preserved. - res_c = Some(self.persist_new_channel(monitor_name, monitor)); - } - async move { - // Complete any pending future(s). Note that to keep one return type we have to end - // with a single async move block that we return, rather than trying to return the - // individual futures themselves. - if let Some(a) = res_a { - a.await?; - } - if let Some(b) = res_b { - b.await?; - } - if let Some(c) = res_c { - c.await?; - } - Ok(()) + // No update given, persist full monitor (no cleanup needed as this is typically + // called for force-close or other special cases) + self.queue_monitor_write(monitor_name, monitor, completion, None); } } @@ -1355,24 +1381,6 @@ where let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await; } - - // Cleans up monitor updates for given monitor in range `start..=end`. - async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { - let monitor_key = monitor_name.to_string(); - for update_id in start..=end { - let update_name = UpdateName::from(update_id); - let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await; - if let Err(e) = res { - log_error!( - self.logger, - "Failed to clean up channel monitor updates for monitor {}, reason: {}", - monitor_key.as_str(), - e - ); - }; - } - } } /// A struct representing a name for a channel monitor. diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..5f7a8d5981a 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -824,6 +824,10 @@ impl Persist for WatchtowerPers monitor_name, ); } + + fn flush(&self, _count: usize) -> Result, io::Error> { + Ok(Vec::new()) + } } pub struct TestPersister { @@ -887,6 +891,10 @@ impl Persist for TestPersister self.offchain_monitor_updates.lock().unwrap().remove(&monitor_name); self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &monitor_name); } + + fn flush(&self, _count: usize) -> Result, io::Error> { + Ok(Vec::new()) + } } // A simple multi-producer-single-consumer one-shot channel
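For persisters that still write synchronously and return `Completed` (the `KVStoreSync`-backed blanket impl and the test persisters above), the new hooks degenerate to no-ops: nothing is ever queued, so the default `pending_write_count` of zero is accurate and `flush` has nothing to report. A sketch of that trivial shape, using a stand-in trait and `String` channel IDs because wiring up the real `Persist` trait here would pull in the full monitor types:

```rust
use std::io;

/// Stand-in for the relevant slice of the Persist interface (an assumption for
/// illustration; the real trait works with ChannelId and monitor types).
trait FlushHooks {
    fn pending_write_count(&self) -> usize {
        0
    }
    fn flush(&self, count: usize) -> Result<Vec<(String, u64)>, io::Error>;
}

/// A persister that writes immediately on every persist call.
struct ImmediatePersister;

impl FlushHooks for ImmediatePersister {
    // Writes already hit storage when they were requested, so there is
    // nothing pending and nothing to flush.
    fn flush(&self, _count: usize) -> Result<Vec<(String, u64)>, io::Error> {
        Ok(Vec::new())
    }
}

fn main() {
    let p = ImmediatePersister;
    assert_eq!(p.pending_write_count(), 0);
    assert!(p.flush(usize::MAX).unwrap().is_empty());
    println!("immediate persisters have nothing to flush");
}
```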