diff --git a/Cargo.toml b/Cargo.toml index 09ae4b03a..e0ace123f 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "fdca6c62f2fe2c53427d3e51e322a49aa7323ee2" } +bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ce9ff5281ae9bb05526981f6f9df8f8d929c7c44" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } diff --git a/src/builder.rs b/src/builder.rs index 510d86bdd..2046956e0 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -33,7 +33,7 @@ use lightning::routing::scoring::{ }; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::persist::{ - KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; @@ -69,11 +69,12 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; -use crate::runtime::Runtime; +use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, + KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1051,10 +1052,20 @@ fn build_with_store_internal( } } + let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); + let fee_estimator = Arc::new(OnchainFeeEstimator::new()); + + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (payment_store_res, node_metris_res) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + ) + }); + // Initialize the status fields. - let node_metrics = match runtime - .block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await }) - { + let node_metrics = match node_metris_res { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1065,23 +1076,20 @@ fn build_with_store_internal( } }, }; - let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); - let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = - match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) { - Ok(payments) => Arc::new(PaymentStore::new( - payments, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), - Arc::clone(&kv_store), - Arc::clone(&logger), - )), - Err(e) => { - log_error!(logger, "Failed to read payment data from store: {}", e); - return Err(BuildError::ReadFailed); - }, - }; + let payment_store = match payment_store_res { + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { @@ -1261,8 +1269,9 @@ fn build_with_store_internal( )); let peer_storage_key = keys_manager.get_peer_storage_key(); - let persister = Arc::new(Persister::new( + let monitor_reader = Arc::new(AsyncPersister::new( Arc::clone(&kv_store), + RuntimeSpawner::new(Arc::clone(&runtime)), Arc::clone(&logger), PERSISTER_MAX_PENDING_UPDATES, Arc::clone(&keys_manager), @@ -1271,8 +1280,18 @@ fn build_with_store_internal( Arc::clone(&fee_estimator), )); + // Read ChannelMonitors and the NetworkGraph + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (monitor_read_res, network_graph_res) = runtime.block_on(async move { + tokio::join!( + monitor_reader.read_all_channel_monitors_with_updates_parallel(), + read_network_graph(&*kv_store_ref, logger_ref), + ) + }); + // Read ChannelMonitor state from store - let channel_monitors = match persister.read_all_channel_monitors_with_updates() { + let channel_monitors = match monitor_read_res { Ok(monitors) => monitors, Err(e) => { if e.kind() == lightning::io::ErrorKind::NotFound { @@ -1284,6 +1303,16 @@ fn build_with_store_internal( }, }; + let persister = Arc::new(Persister::new( + Arc::clone(&kv_store), + Arc::clone(&logger), + PERSISTER_MAX_PENDING_UPDATES, + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + )); + // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&chain_source)), @@ -1296,9 +1325,7 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = match runtime - .block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await }) - { + let network_graph = match network_graph_res { Ok(graph) => Arc::new(graph), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1310,9 +1337,42 @@ fn build_with_store_internal( }, }; - let local_scorer = match runtime.block_on(async { - read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await - }) { + // Read various smaller LDK and ldk-node objects from the store + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let network_graph_ref = Arc::clone(&network_graph); + let output_sweeper_future = read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store_ref), + Arc::clone(&logger_ref), + ); + let ( + scorer_res, + external_scores_res, + channel_manager_bytes_res, + sweeper_bytes_res, + event_queue_res, + peer_info_res, + ) = runtime.block_on(async move { + tokio::join!( + read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)), + read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)), + KVStore::read( + &*kv_store_ref, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ), + output_sweeper_future, + read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + ) + }); + + let local_scorer = match scorer_res { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1328,9 +1388,7 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. - match runtime.block_on(async { - read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await - }) { + match external_scores_res { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1383,12 +1441,7 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(reader) = KVStoreSync::read( - &*kv_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ) { + if let Ok(reader) = channel_manager_bytes_res { let channel_monitor_references = channel_monitors.iter().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -1613,17 +1666,7 @@ fn build_with_store_internal( let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match runtime.block_on(async { - read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .await - }) { + let output_sweeper = match sweeper_bytes_res { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1644,9 +1687,7 @@ fn build_with_store_internal( }, }; - let event_queue = match runtime - .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let event_queue = match event_queue_res { Ok(event_queue) => Arc::new(event_queue), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1658,9 +1699,7 @@ fn build_with_store_internal( }, }; - let peer_store = match runtime - .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let peer_store = match peer_info_res { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { diff --git a/src/gossip.rs b/src/gossip.rs index 2b524d9ae..f42b4602c 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -5,18 +5,16 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; -use lightning::util::native_async::FutureSpawner; use lightning_block_sync::gossip::GossipVerifier; use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_trace, LdkLogger, Logger}; -use crate::runtime::Runtime; +use crate::runtime::{Runtime, RuntimeSpawner}; use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; use crate::Error; @@ -114,19 +112,3 @@ impl GossipSource { } } } - -pub(crate) struct RuntimeSpawner { - runtime: Arc, -} - -impl RuntimeSpawner { - pub(crate) fn new(runtime: Arc) -> Self { - Self { runtime } - } -} - -impl FutureSpawner for RuntimeSpawner { - fn spawn + Send + 'static>(&self, future: T) { - self.runtime.spawn_cancellable_background_task(future); - } -} diff --git a/src/runtime.rs b/src/runtime.rs index 1e9883ae4..f43cbb9f0 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -9,6 +9,8 @@ use std::future::Future; use std::sync::{Arc, Mutex}; use std::time::Duration; +use lightning::util::native_async::FutureSpawner; + use tokio::task::{JoinHandle, JoinSet}; use crate::config::{ @@ -219,3 +221,29 @@ enum RuntimeMode { Owned(tokio::runtime::Runtime), Handle(tokio::runtime::Handle), } + +pub(crate) struct RuntimeSpawner { + runtime: Arc, +} + +impl RuntimeSpawner { + pub(crate) fn new(runtime: Arc) -> Self { + Self { runtime } + } +} + +impl FutureSpawner for RuntimeSpawner { + type E = tokio::sync::oneshot::error::RecvError; + type SpawnedFutureResult = tokio::sync::oneshot::Receiver; + fn spawn + Send + 'static>( + &self, future: F, + ) -> Self::SpawnedFutureResult { + let (result, output) = tokio::sync::oneshot::channel(); + self.runtime.spawn_cancellable_background_task(async move { + // We don't care if the send works or not, if the receiver is dropped its not our + // problem. + let _ = result.send(future.await); + }); + output + } +} diff --git a/src/types.rs b/src/types.rs index 2b7d3829a..614efd90e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,7 +23,9 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; +use lightning::util::persist::{ + KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync, +}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::GossipVerifier; @@ -35,10 +37,10 @@ use crate::chain::ChainSource; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; -use crate::gossip::RuntimeSpawner; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::PaymentDetails; +use crate::runtime::RuntimeSpawner; /// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the /// same time. @@ -185,6 +187,16 @@ impl DynStoreTrait for DynStoreWrapper } } +pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< + Arc, + RuntimeSpawner, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + pub type Persister = MonitorUpdatingPersister< Arc, Arc,