Implement linked-list LocalChain and update chain-src crates/examples
This commit changes the `LocalChain` implementation to have blocks stored as a linked-list. This allows the data-src thread to hold a shared ref to a single checkpoint and have access to the whole history of checkpoints without cloning or keeping a lock on `LocalChain`. The APIs of `bdk::Wallet`, `esplora` and `electrum` are also updated to reflect these changes. Note that the `esplora` crate is rewritten to anchor txs in the confirmation block (using the esplora API's tx status block_hash). This guarantees 100% consistency between anchor blocks and their transactions (instead of anchoring txs to the latest tip). `ExploraExt` now has separate methods for updating the `TxGraph` and `LocalChain`. A new method `TxGraph::missing_blocks` is introduced for finding "floating anchors" of a `TxGraph` update (given a chain). Additional changes: * `test_local_chain.rs` is refactored to make test cases easier to write. Additional tests are also added. * Examples are updated. * Fix `tempfile` dev dependency of `bdk_file_store` to work with MSRV Co-authored-by: LLFourn <lloyd.fourn@gmail.com>
This commit is contained in:
@@ -1,41 +1,55 @@
|
||||
use async_trait::async_trait;
|
||||
use bdk_chain::collections::btree_map;
|
||||
use bdk_chain::{
|
||||
bitcoin::{BlockHash, OutPoint, Script, Txid},
|
||||
collections::BTreeMap,
|
||||
keychain::LocalUpdate,
|
||||
BlockId, ConfirmationTimeAnchor,
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
local_chain::{self, CheckPoint},
|
||||
BlockId, ConfirmationTimeAnchor, TxGraph,
|
||||
};
|
||||
use esplora_client::{Error, OutputStatus, TxStatus};
|
||||
use esplora_client::{Error, TxStatus};
|
||||
use futures::{stream::FuturesOrdered, TryStreamExt};
|
||||
|
||||
use crate::map_confirmation_time_anchor;
|
||||
use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
|
||||
|
||||
/// Trait to extend [`esplora_client::AsyncClient`] functionality.
|
||||
/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
|
||||
///
|
||||
/// This is the async version of [`EsploraExt`]. Refer to
|
||||
/// [crate-level documentation] for more.
|
||||
/// Refer to [crate-level documentation] for more.
|
||||
///
|
||||
/// [`EsploraExt`]: crate::EsploraExt
|
||||
/// [crate-level documentation]: crate
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait EsploraAsyncExt {
|
||||
/// Scan the blockchain (via esplora) for the data specified and returns a
|
||||
/// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
|
||||
/// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
|
||||
///
|
||||
/// - `local_chain`: the most recent block hashes present locally
|
||||
/// - `keychain_spks`: keychains that we want to scan transactions for
|
||||
/// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
|
||||
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||
/// want to included in the update
|
||||
/// * `prev_tip` is the previous tip of [`LocalChain::tip`].
|
||||
/// * `get_heights` is the block heights that we are interested in fetching from Esplora.
|
||||
///
|
||||
/// The result of this method can be applied to [`LocalChain::apply_update`].
|
||||
///
|
||||
/// [`LocalChain`]: bdk_chain::local_chain::LocalChain
|
||||
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
|
||||
/// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
|
||||
#[allow(clippy::result_large_err)]
|
||||
async fn update_local_chain(
|
||||
&self,
|
||||
local_tip: Option<CheckPoint>,
|
||||
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
|
||||
) -> Result<local_chain::Update, Error>;
|
||||
|
||||
/// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
|
||||
/// indices.
|
||||
///
|
||||
/// * `keychain_spks`: keychains that we want to scan transactions for
|
||||
/// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
|
||||
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||
/// want to include in the update
|
||||
///
|
||||
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
||||
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
||||
/// parallel.
|
||||
#[allow(clippy::result_large_err)] // FIXME
|
||||
async fn scan<K: Ord + Clone + Send>(
|
||||
#[allow(clippy::result_large_err)]
|
||||
async fn update_tx_graph<K: Ord + Clone + Send>(
|
||||
&self,
|
||||
local_chain: &BTreeMap<u32, BlockHash>,
|
||||
keychain_spks: BTreeMap<
|
||||
K,
|
||||
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
|
||||
@@ -44,22 +58,20 @@ pub trait EsploraAsyncExt {
|
||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
|
||||
) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
|
||||
|
||||
/// Convenience method to call [`scan`] without requiring a keychain.
|
||||
/// Convenience method to call [`update_tx_graph`] without requiring a keychain.
|
||||
///
|
||||
/// [`scan`]: EsploraAsyncExt::scan
|
||||
#[allow(clippy::result_large_err)] // FIXME
|
||||
async fn scan_without_keychain(
|
||||
/// [`update_tx_graph`]: EsploraAsyncExt::update_tx_graph
|
||||
#[allow(clippy::result_large_err)]
|
||||
async fn update_tx_graph_without_keychain(
|
||||
&self,
|
||||
local_chain: &BTreeMap<u32, BlockHash>,
|
||||
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = Script> + Send> + Send,
|
||||
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
|
||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
|
||||
parallel_requests: usize,
|
||||
) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
|
||||
self.scan(
|
||||
local_chain,
|
||||
) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
|
||||
self.update_tx_graph(
|
||||
[(
|
||||
(),
|
||||
misc_spks
|
||||
@@ -74,16 +86,123 @@ pub trait EsploraAsyncExt {
|
||||
parallel_requests,
|
||||
)
|
||||
.await
|
||||
.map(|(g, _)| g)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
impl EsploraAsyncExt for esplora_client::AsyncClient {
|
||||
#[allow(clippy::result_large_err)] // FIXME
|
||||
async fn scan<K: Ord + Clone + Send>(
|
||||
async fn update_local_chain(
|
||||
&self,
|
||||
local_tip: Option<CheckPoint>,
|
||||
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
|
||||
) -> Result<local_chain::Update, Error> {
|
||||
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
|
||||
let new_tip_height = self.get_height().await?;
|
||||
|
||||
// atomically fetch blocks from esplora
|
||||
let mut fetched_blocks = {
|
||||
let heights = (0..=new_tip_height).rev();
|
||||
let hashes = self
|
||||
.get_blocks(Some(new_tip_height))
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|b| b.id);
|
||||
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
|
||||
};
|
||||
|
||||
// fetch heights that the caller is interested in
|
||||
for height in request_heights {
|
||||
// do not fetch blocks higher than remote tip
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
// only fetch what is missing
|
||||
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
||||
let hash = self.get_block_hash(height).await?;
|
||||
entry.insert(hash);
|
||||
}
|
||||
}
|
||||
|
||||
// find the earliest point of agreement between local chain and fetched chain
|
||||
let earliest_agreement_cp = {
|
||||
let mut earliest_agreement_cp = Option::<CheckPoint>::None;
|
||||
|
||||
if let Some(local_tip) = local_tip {
|
||||
let local_tip_height = local_tip.height();
|
||||
for local_cp in local_tip.iter() {
|
||||
let local_block = local_cp.block_id();
|
||||
|
||||
// the updated hash (block hash at this height after the update), can either be:
|
||||
// 1. a block that already existed in `fetched_blocks`
|
||||
// 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
|
||||
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
|
||||
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
|
||||
// remote tip
|
||||
let updated_hash = match fetched_blocks.entry(local_block.height) {
|
||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||
btree_map::Entry::Vacant(entry) => *entry.insert(
|
||||
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
|
||||
local_block.hash
|
||||
} else {
|
||||
self.get_block_hash(local_block.height).await?
|
||||
},
|
||||
),
|
||||
};
|
||||
|
||||
// since we may introduce blocks below the point of agreement, we cannot break
|
||||
// here unconditionally - we only break if we guarantee there are no new heights
|
||||
// below our current local checkpoint
|
||||
if local_block.hash == updated_hash {
|
||||
earliest_agreement_cp = Some(local_cp);
|
||||
|
||||
let first_new_height = *fetched_blocks
|
||||
.keys()
|
||||
.next()
|
||||
.expect("must have atleast one new block");
|
||||
if first_new_height >= local_block.height {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
earliest_agreement_cp
|
||||
};
|
||||
|
||||
let tip = {
|
||||
// first checkpoint to use for the update chain
|
||||
let first_cp = match earliest_agreement_cp {
|
||||
Some(cp) => cp,
|
||||
None => {
|
||||
let (&height, &hash) = fetched_blocks
|
||||
.iter()
|
||||
.next()
|
||||
.expect("must have atleast one new block");
|
||||
CheckPoint::new(BlockId { height, hash })
|
||||
}
|
||||
};
|
||||
// transform fetched chain into the update chain
|
||||
fetched_blocks
|
||||
// we exclude anything at or below the first cp of the update chain otherwise
|
||||
// building the chain will fail
|
||||
.split_off(&(first_cp.height() + 1))
|
||||
.into_iter()
|
||||
.map(|(height, hash)| BlockId { height, hash })
|
||||
.fold(first_cp, |prev_cp, block| {
|
||||
prev_cp.push(block).expect("must extend checkpoint")
|
||||
})
|
||||
};
|
||||
|
||||
Ok(local_chain::Update {
|
||||
tip,
|
||||
introduce_older_blocks: true,
|
||||
})
|
||||
}
|
||||
|
||||
async fn update_tx_graph<K: Ord + Clone + Send>(
|
||||
&self,
|
||||
local_chain: &BTreeMap<u32, BlockHash>,
|
||||
keychain_spks: BTreeMap<
|
||||
K,
|
||||
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, Script)> + Send> + Send,
|
||||
@@ -92,178 +211,116 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
|
||||
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
|
||||
) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
|
||||
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
|
||||
let parallel_requests = Ord::max(parallel_requests, 1);
|
||||
|
||||
let (mut update, tip_at_start) = loop {
|
||||
let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
|
||||
|
||||
for (&height, &original_hash) in local_chain.iter().rev() {
|
||||
let update_block_id = BlockId {
|
||||
height,
|
||||
hash: self.get_block_hash(height).await?,
|
||||
};
|
||||
let _ = update
|
||||
.chain
|
||||
.insert_block(update_block_id)
|
||||
.expect("cannot repeat height here");
|
||||
if update_block_id.hash == original_hash {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let tip_at_start = BlockId {
|
||||
height: self.get_height().await?,
|
||||
hash: self.get_tip_hash().await?,
|
||||
};
|
||||
|
||||
if update.chain.insert_block(tip_at_start).is_ok() {
|
||||
break (update, tip_at_start);
|
||||
}
|
||||
};
|
||||
let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
|
||||
let mut last_active_indexes = BTreeMap::<K, u32>::new();
|
||||
|
||||
for (keychain, spks) in keychain_spks {
|
||||
let mut spks = spks.into_iter();
|
||||
let mut last_active_index = None;
|
||||
let mut empty_scripts = 0;
|
||||
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
|
||||
let mut last_index = Option::<u32>::None;
|
||||
let mut last_active_index = Option::<u32>::None;
|
||||
|
||||
loop {
|
||||
let futures = (0..parallel_requests)
|
||||
.filter_map(|_| {
|
||||
let (index, script) = spks.next()?;
|
||||
let handles = spks
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.map(|(spk_index, spk)| {
|
||||
let client = self.clone();
|
||||
Some(async move {
|
||||
let mut related_txs = client.scripthash_txs(&script, None).await?;
|
||||
|
||||
let n_confirmed =
|
||||
related_txs.iter().filter(|tx| tx.status.confirmed).count();
|
||||
// esplora pages on 25 confirmed transactions. If there are 25 or more we
|
||||
// keep requesting to see if there's more.
|
||||
if n_confirmed >= 25 {
|
||||
loop {
|
||||
let new_related_txs = client
|
||||
.scripthash_txs(
|
||||
&script,
|
||||
Some(related_txs.last().unwrap().txid),
|
||||
)
|
||||
.await?;
|
||||
let n = new_related_txs.len();
|
||||
related_txs.extend(new_related_txs);
|
||||
// we've reached the end
|
||||
if n < 25 {
|
||||
break;
|
||||
}
|
||||
async move {
|
||||
let mut last_seen = None;
|
||||
let mut spk_txs = Vec::new();
|
||||
loop {
|
||||
let txs = client.scripthash_txs(&spk, last_seen).await?;
|
||||
let tx_count = txs.len();
|
||||
last_seen = txs.last().map(|tx| tx.txid);
|
||||
spk_txs.extend(txs);
|
||||
if tx_count < 25 {
|
||||
break Result::<_, Error>::Ok((spk_index, spk_txs));
|
||||
}
|
||||
}
|
||||
|
||||
Result::<_, esplora_client::Error>::Ok((index, related_txs))
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<FuturesOrdered<_>>();
|
||||
|
||||
let n_futures = futures.len();
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for (index, related_txs) in futures.try_collect::<Vec<IndexWithTxs>>().await? {
|
||||
if related_txs.is_empty() {
|
||||
empty_scripts += 1;
|
||||
} else {
|
||||
for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
|
||||
last_index = Some(index);
|
||||
if !txs.is_empty() {
|
||||
last_active_index = Some(index);
|
||||
empty_scripts = 0;
|
||||
}
|
||||
for tx in related_txs {
|
||||
let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
|
||||
|
||||
let _ = update.graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor {
|
||||
let _ = update.graph.insert_anchor(tx.txid, anchor);
|
||||
for tx in txs {
|
||||
let _ = graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor_from_status(&tx.status) {
|
||||
let _ = graph.insert_anchor(tx.txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if n_futures == 0 || empty_scripts >= stop_gap {
|
||||
if last_index > last_active_index.map(|i| i + stop_gap as u32) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(last_active_index) = last_active_index {
|
||||
update.keychain.insert(keychain, last_active_index);
|
||||
last_active_indexes.insert(keychain, last_active_index);
|
||||
}
|
||||
}
|
||||
|
||||
for txid in txids.into_iter() {
|
||||
if update.graph.get_tx(txid).is_none() {
|
||||
match self.get_tx(&txid).await? {
|
||||
Some(tx) => {
|
||||
let _ = update.graph.insert_tx(tx);
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
let mut txids = txids.into_iter();
|
||||
loop {
|
||||
let handles = txids
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.filter(|&txid| graph.get_tx(txid).is_none())
|
||||
.map(|txid| {
|
||||
let client = self.clone();
|
||||
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
|
||||
})
|
||||
.collect::<FuturesOrdered<_>>();
|
||||
// .collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
match self.get_tx_status(&txid).await? {
|
||||
tx_status if tx_status.confirmed => {
|
||||
if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
|
||||
let _ = update.graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
|
||||
for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
for op in outpoints.into_iter() {
|
||||
let mut op_txs = Vec::with_capacity(2);
|
||||
if let (
|
||||
Some(tx),
|
||||
tx_status @ TxStatus {
|
||||
confirmed: true, ..
|
||||
},
|
||||
) = (
|
||||
self.get_tx(&op.txid).await?,
|
||||
self.get_tx_status(&op.txid).await?,
|
||||
) {
|
||||
op_txs.push((tx, tx_status));
|
||||
if let Some(OutputStatus {
|
||||
txid: Some(txid),
|
||||
status: Some(spend_status),
|
||||
..
|
||||
}) = self.get_output_status(&op.txid, op.vout as _).await?
|
||||
{
|
||||
if let Some(spend_tx) = self.get_tx(&txid).await? {
|
||||
op_txs.push((spend_tx, spend_status));
|
||||
if graph.get_tx(op.txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&op.txid).await? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&op.txid).await?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(op.txid, anchor);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
|
||||
if let Some(txid) = op_status.txid {
|
||||
if graph.get_tx(txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&txid).await? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&txid).await?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (tx, status) in op_txs {
|
||||
let txid = tx.txid();
|
||||
let anchor = map_confirmation_time_anchor(&status, tip_at_start);
|
||||
|
||||
let _ = update.graph.insert_tx(tx);
|
||||
if let Some(anchor) = anchor {
|
||||
let _ = update.graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tip_at_start.hash != self.get_block_hash(tip_at_start.height).await? {
|
||||
// A reorg occurred, so let's find out where all the txids we found are now in the chain
|
||||
let txids_found = update
|
||||
.graph
|
||||
.full_txs()
|
||||
.map(|tx_node| tx_node.txid)
|
||||
.collect::<Vec<_>>();
|
||||
update.chain = EsploraAsyncExt::scan_without_keychain(
|
||||
self,
|
||||
local_chain,
|
||||
[],
|
||||
txids_found,
|
||||
[],
|
||||
parallel_requests,
|
||||
)
|
||||
.await?
|
||||
.chain;
|
||||
}
|
||||
|
||||
Ok(update)
|
||||
Ok((graph, last_active_indexes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,54 +1,73 @@
|
||||
use bdk_chain::bitcoin::{BlockHash, OutPoint, Script, Txid};
|
||||
use bdk_chain::collections::BTreeMap;
|
||||
use bdk_chain::BlockId;
|
||||
use bdk_chain::{keychain::LocalUpdate, ConfirmationTimeAnchor};
|
||||
use esplora_client::{Error, OutputStatus, TxStatus};
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use crate::map_confirmation_time_anchor;
|
||||
use bdk_chain::bitcoin::{OutPoint, Txid};
|
||||
use bdk_chain::collections::btree_map;
|
||||
use bdk_chain::collections::{BTreeMap, BTreeSet};
|
||||
use bdk_chain::{
|
||||
bitcoin::{BlockHash, Script},
|
||||
local_chain::{self, CheckPoint},
|
||||
};
|
||||
use bdk_chain::{BlockId, ConfirmationTimeAnchor, TxGraph};
|
||||
use esplora_client::{Error, TxStatus};
|
||||
|
||||
/// Trait to extend [`esplora_client::BlockingClient`] functionality.
|
||||
use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
|
||||
|
||||
/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
|
||||
///
|
||||
/// Refer to [crate-level documentation] for more.
|
||||
///
|
||||
/// [crate-level documentation]: crate
|
||||
pub trait EsploraExt {
|
||||
/// Scan the blockchain (via esplora) for the data specified and returns a
|
||||
/// [`LocalUpdate<K, ConfirmationTimeAnchor>`].
|
||||
/// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
|
||||
///
|
||||
/// - `local_chain`: the most recent block hashes present locally
|
||||
/// - `keychain_spks`: keychains that we want to scan transactions for
|
||||
/// - `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
|
||||
/// - `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||
/// want to included in the update
|
||||
/// * `prev_tip` is the previous tip of [`LocalChain::tip`].
|
||||
/// * `get_heights` is the block heights that we are interested in fetching from Esplora.
|
||||
///
|
||||
/// The result of this method can be applied to [`LocalChain::apply_update`].
|
||||
///
|
||||
/// [`LocalChain`]: bdk_chain::local_chain::LocalChain
|
||||
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
|
||||
/// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn update_local_chain(
|
||||
&self,
|
||||
local_tip: Option<CheckPoint>,
|
||||
request_heights: impl IntoIterator<Item = u32>,
|
||||
) -> Result<local_chain::Update, Error>;
|
||||
|
||||
/// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
|
||||
/// indices.
|
||||
///
|
||||
/// * `keychain_spks`: keychains that we want to scan transactions for
|
||||
/// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
|
||||
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
|
||||
/// want to include in the update
|
||||
///
|
||||
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
|
||||
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
|
||||
/// parallel.
|
||||
#[allow(clippy::result_large_err)] // FIXME
|
||||
fn scan<K: Ord + Clone>(
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn update_tx_graph<K: Ord + Clone>(
|
||||
&self,
|
||||
local_chain: &BTreeMap<u32, BlockHash>,
|
||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||||
txids: impl IntoIterator<Item = Txid>,
|
||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error>;
|
||||
) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
|
||||
|
||||
/// Convenience method to call [`scan`] without requiring a keychain.
|
||||
/// Convenience method to call [`update_tx_graph`] without requiring a keychain.
|
||||
///
|
||||
/// [`scan`]: EsploraExt::scan
|
||||
#[allow(clippy::result_large_err)] // FIXME
|
||||
fn scan_without_keychain(
|
||||
/// [`update_tx_graph`]: EsploraExt::update_tx_graph
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn update_tx_graph_without_keychain(
|
||||
&self,
|
||||
local_chain: &BTreeMap<u32, BlockHash>,
|
||||
misc_spks: impl IntoIterator<Item = Script>,
|
||||
txids: impl IntoIterator<Item = Txid>,
|
||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||
parallel_requests: usize,
|
||||
) -> Result<LocalUpdate<(), ConfirmationTimeAnchor>, Error> {
|
||||
self.scan(
|
||||
local_chain,
|
||||
) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
|
||||
self.update_tx_graph(
|
||||
[(
|
||||
(),
|
||||
misc_spks
|
||||
@@ -62,190 +81,240 @@ pub trait EsploraExt {
|
||||
usize::MAX,
|
||||
parallel_requests,
|
||||
)
|
||||
.map(|(g, _)| g)
|
||||
}
|
||||
}
|
||||
|
||||
impl EsploraExt for esplora_client::BlockingClient {
|
||||
fn scan<K: Ord + Clone>(
|
||||
fn update_local_chain(
|
||||
&self,
|
||||
local_tip: Option<CheckPoint>,
|
||||
request_heights: impl IntoIterator<Item = u32>,
|
||||
) -> Result<local_chain::Update, Error> {
|
||||
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
|
||||
let new_tip_height = self.get_height()?;
|
||||
|
||||
// atomically fetch blocks from esplora
|
||||
let mut fetched_blocks = {
|
||||
let heights = (0..=new_tip_height).rev();
|
||||
let hashes = self
|
||||
.get_blocks(Some(new_tip_height))?
|
||||
.into_iter()
|
||||
.map(|b| b.id);
|
||||
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
|
||||
};
|
||||
|
||||
// fetch heights that the caller is interested in
|
||||
for height in request_heights {
|
||||
// do not fetch blocks higher than remote tip
|
||||
if height > new_tip_height {
|
||||
continue;
|
||||
}
|
||||
// only fetch what is missing
|
||||
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
|
||||
let hash = self.get_block_hash(height)?;
|
||||
entry.insert(hash);
|
||||
}
|
||||
}
|
||||
|
||||
// find the earliest point of agreement between local chain and fetched chain
|
||||
let earliest_agreement_cp = {
|
||||
let mut earliest_agreement_cp = Option::<CheckPoint>::None;
|
||||
|
||||
if let Some(local_tip) = local_tip {
|
||||
let local_tip_height = local_tip.height();
|
||||
for local_cp in local_tip.iter() {
|
||||
let local_block = local_cp.block_id();
|
||||
|
||||
// the updated hash (block hash at this height after the update), can either be:
|
||||
// 1. a block that already existed in `fetched_blocks`
|
||||
// 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
|
||||
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
|
||||
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
|
||||
// remote tip
|
||||
let updated_hash = match fetched_blocks.entry(local_block.height) {
|
||||
btree_map::Entry::Occupied(entry) => *entry.get(),
|
||||
btree_map::Entry::Vacant(entry) => *entry.insert(
|
||||
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
|
||||
local_block.hash
|
||||
} else {
|
||||
self.get_block_hash(local_block.height)?
|
||||
},
|
||||
),
|
||||
};
|
||||
|
||||
// since we may introduce blocks below the point of agreement, we cannot break
|
||||
// here unconditionally - we only break if we guarantee there are no new heights
|
||||
// below our current local checkpoint
|
||||
if local_block.hash == updated_hash {
|
||||
earliest_agreement_cp = Some(local_cp);
|
||||
|
||||
let first_new_height = *fetched_blocks
|
||||
.keys()
|
||||
.next()
|
||||
.expect("must have atleast one new block");
|
||||
if first_new_height >= local_block.height {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
earliest_agreement_cp
|
||||
};
|
||||
|
||||
let tip = {
|
||||
// first checkpoint to use for the update chain
|
||||
let first_cp = match earliest_agreement_cp {
|
||||
Some(cp) => cp,
|
||||
None => {
|
||||
let (&height, &hash) = fetched_blocks
|
||||
.iter()
|
||||
.next()
|
||||
.expect("must have atleast one new block");
|
||||
CheckPoint::new(BlockId { height, hash })
|
||||
}
|
||||
};
|
||||
// transform fetched chain into the update chain
|
||||
fetched_blocks
|
||||
// we exclude anything at or below the first cp of the update chain otherwise
|
||||
// building the chain will fail
|
||||
.split_off(&(first_cp.height() + 1))
|
||||
.into_iter()
|
||||
.map(|(height, hash)| BlockId { height, hash })
|
||||
.fold(first_cp, |prev_cp, block| {
|
||||
prev_cp.push(block).expect("must extend checkpoint")
|
||||
})
|
||||
};
|
||||
|
||||
Ok(local_chain::Update {
|
||||
tip,
|
||||
introduce_older_blocks: true,
|
||||
})
|
||||
}
|
||||
|
||||
fn update_tx_graph<K: Ord + Clone>(
|
||||
&self,
|
||||
local_chain: &BTreeMap<u32, BlockHash>,
|
||||
keychain_spks: BTreeMap<K, impl IntoIterator<Item = (u32, Script)>>,
|
||||
txids: impl IntoIterator<Item = Txid>,
|
||||
outpoints: impl IntoIterator<Item = OutPoint>,
|
||||
stop_gap: usize,
|
||||
parallel_requests: usize,
|
||||
) -> Result<LocalUpdate<K, ConfirmationTimeAnchor>, Error> {
|
||||
) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
|
||||
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
|
||||
let parallel_requests = Ord::max(parallel_requests, 1);
|
||||
|
||||
let (mut update, tip_at_start) = loop {
|
||||
let mut update = LocalUpdate::<K, ConfirmationTimeAnchor>::default();
|
||||
|
||||
for (&height, &original_hash) in local_chain.iter().rev() {
|
||||
let update_block_id = BlockId {
|
||||
height,
|
||||
hash: self.get_block_hash(height)?,
|
||||
};
|
||||
let _ = update
|
||||
.chain
|
||||
.insert_block(update_block_id)
|
||||
.expect("cannot repeat height here");
|
||||
if update_block_id.hash == original_hash {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let tip_at_start = BlockId {
|
||||
height: self.get_height()?,
|
||||
hash: self.get_tip_hash()?,
|
||||
};
|
||||
|
||||
if update.chain.insert_block(tip_at_start).is_ok() {
|
||||
break (update, tip_at_start);
|
||||
}
|
||||
};
|
||||
let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
|
||||
let mut last_active_indexes = BTreeMap::<K, u32>::new();
|
||||
|
||||
for (keychain, spks) in keychain_spks {
|
||||
let mut spks = spks.into_iter();
|
||||
let mut last_active_index = None;
|
||||
let mut empty_scripts = 0;
|
||||
type IndexWithTxs = (u32, Vec<esplora_client::Tx>);
|
||||
let mut last_index = Option::<u32>::None;
|
||||
let mut last_active_index = Option::<u32>::None;
|
||||
|
||||
loop {
|
||||
let handles = (0..parallel_requests)
|
||||
.filter_map(
|
||||
|_| -> Option<std::thread::JoinHandle<Result<IndexWithTxs, _>>> {
|
||||
let (index, script) = spks.next()?;
|
||||
let handles = spks
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.map(|(spk_index, spk)| {
|
||||
std::thread::spawn({
|
||||
let client = self.clone();
|
||||
Some(std::thread::spawn(move || {
|
||||
let mut related_txs = client.scripthash_txs(&script, None)?;
|
||||
|
||||
let n_confirmed =
|
||||
related_txs.iter().filter(|tx| tx.status.confirmed).count();
|
||||
// esplora pages on 25 confirmed transactions. If there are 25 or more we
|
||||
// keep requesting to see if there's more.
|
||||
if n_confirmed >= 25 {
|
||||
loop {
|
||||
let new_related_txs = client.scripthash_txs(
|
||||
&script,
|
||||
Some(related_txs.last().unwrap().txid),
|
||||
)?;
|
||||
let n = new_related_txs.len();
|
||||
related_txs.extend(new_related_txs);
|
||||
// we've reached the end
|
||||
if n < 25 {
|
||||
break;
|
||||
}
|
||||
move || -> Result<TxsOfSpkIndex, Error> {
|
||||
let mut last_seen = None;
|
||||
let mut spk_txs = Vec::new();
|
||||
loop {
|
||||
let txs = client.scripthash_txs(&spk, last_seen)?;
|
||||
let tx_count = txs.len();
|
||||
last_seen = txs.last().map(|tx| tx.txid);
|
||||
spk_txs.extend(txs);
|
||||
if tx_count < 25 {
|
||||
break Ok((spk_index, spk_txs));
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
|
||||
|
||||
Result::<_, esplora_client::Error>::Ok((index, related_txs))
|
||||
}))
|
||||
},
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let n_handles = handles.len();
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let (index, related_txs) = handle.join().unwrap()?; // TODO: don't unwrap
|
||||
if related_txs.is_empty() {
|
||||
empty_scripts += 1;
|
||||
} else {
|
||||
let (index, txs) = handle.join().expect("thread must not panic")?;
|
||||
last_index = Some(index);
|
||||
if !txs.is_empty() {
|
||||
last_active_index = Some(index);
|
||||
empty_scripts = 0;
|
||||
}
|
||||
for tx in related_txs {
|
||||
let anchor = map_confirmation_time_anchor(&tx.status, tip_at_start);
|
||||
|
||||
let _ = update.graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor {
|
||||
let _ = update.graph.insert_anchor(tx.txid, anchor);
|
||||
for tx in txs {
|
||||
let _ = graph.insert_tx(tx.to_tx());
|
||||
if let Some(anchor) = anchor_from_status(&tx.status) {
|
||||
let _ = graph.insert_anchor(tx.txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if n_handles == 0 || empty_scripts >= stop_gap {
|
||||
if last_index > last_active_index.map(|i| i + stop_gap as u32) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(last_active_index) = last_active_index {
|
||||
update.keychain.insert(keychain, last_active_index);
|
||||
last_active_indexes.insert(keychain, last_active_index);
|
||||
}
|
||||
}
|
||||
|
||||
for txid in txids.into_iter() {
|
||||
if update.graph.get_tx(txid).is_none() {
|
||||
match self.get_tx(&txid)? {
|
||||
Some(tx) => {
|
||||
let _ = update.graph.insert_tx(tx);
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
let mut txids = txids.into_iter();
|
||||
loop {
|
||||
let handles = txids
|
||||
.by_ref()
|
||||
.take(parallel_requests)
|
||||
.filter(|&txid| graph.get_tx(txid).is_none())
|
||||
.map(|txid| {
|
||||
std::thread::spawn({
|
||||
let client = self.clone();
|
||||
move || client.get_tx_status(&txid).map(|s| (txid, s))
|
||||
})
|
||||
})
|
||||
.collect::<Vec<JoinHandle<Result<(Txid, TxStatus), Error>>>>();
|
||||
|
||||
if handles.is_empty() {
|
||||
break;
|
||||
}
|
||||
match self.get_tx_status(&txid)? {
|
||||
tx_status @ TxStatus {
|
||||
confirmed: true, ..
|
||||
} => {
|
||||
if let Some(anchor) = map_confirmation_time_anchor(&tx_status, tip_at_start) {
|
||||
let _ = update.graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let (txid, status) = handle.join().expect("thread must not panic")?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
for op in outpoints.into_iter() {
|
||||
let mut op_txs = Vec::with_capacity(2);
|
||||
if let (
|
||||
Some(tx),
|
||||
tx_status @ TxStatus {
|
||||
confirmed: true, ..
|
||||
},
|
||||
) = (self.get_tx(&op.txid)?, self.get_tx_status(&op.txid)?)
|
||||
{
|
||||
op_txs.push((tx, tx_status));
|
||||
if let Some(OutputStatus {
|
||||
txid: Some(txid),
|
||||
status: Some(spend_status),
|
||||
..
|
||||
}) = self.get_output_status(&op.txid, op.vout as _)?
|
||||
{
|
||||
if let Some(spend_tx) = self.get_tx(&txid)? {
|
||||
op_txs.push((spend_tx, spend_status));
|
||||
if graph.get_tx(op.txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&op.txid)? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&op.txid)?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(op.txid, anchor);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _)? {
|
||||
if let Some(txid) = op_status.txid {
|
||||
if graph.get_tx(txid).is_none() {
|
||||
if let Some(tx) = self.get_tx(&txid)? {
|
||||
let _ = graph.insert_tx(tx);
|
||||
}
|
||||
let status = self.get_tx_status(&txid)?;
|
||||
if let Some(anchor) = anchor_from_status(&status) {
|
||||
let _ = graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (tx, status) in op_txs {
|
||||
let txid = tx.txid();
|
||||
let anchor = map_confirmation_time_anchor(&status, tip_at_start);
|
||||
|
||||
let _ = update.graph.insert_tx(tx);
|
||||
if let Some(anchor) = anchor {
|
||||
let _ = update.graph.insert_anchor(txid, anchor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tip_at_start.hash != self.get_block_hash(tip_at_start.height)? {
|
||||
// A reorg occurred, so let's find out where all the txids we found are now in the chain
|
||||
let txids_found = update
|
||||
.graph
|
||||
.full_txs()
|
||||
.map(|tx_node| tx_node.txid)
|
||||
.collect::<Vec<_>>();
|
||||
update.chain = EsploraExt::scan_without_keychain(
|
||||
self,
|
||||
local_chain,
|
||||
[],
|
||||
txids_found,
|
||||
[],
|
||||
parallel_requests,
|
||||
)?
|
||||
.chain;
|
||||
}
|
||||
|
||||
Ok(update)
|
||||
Ok((graph, last_active_indexes))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,16 +14,22 @@ mod async_ext;
|
||||
#[cfg(feature = "async")]
|
||||
pub use async_ext::*;
|
||||
|
||||
pub(crate) fn map_confirmation_time_anchor(
|
||||
tx_status: &TxStatus,
|
||||
tip_at_start: BlockId,
|
||||
) -> Option<ConfirmationTimeAnchor> {
|
||||
match (tx_status.block_time, tx_status.block_height) {
|
||||
(Some(confirmation_time), Some(confirmation_height)) => Some(ConfirmationTimeAnchor {
|
||||
anchor_block: tip_at_start,
|
||||
confirmation_height,
|
||||
confirmation_time,
|
||||
}),
|
||||
_ => None,
|
||||
const ASSUME_FINAL_DEPTH: u32 = 15;
|
||||
|
||||
fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeAnchor> {
|
||||
if let TxStatus {
|
||||
block_height: Some(height),
|
||||
block_hash: Some(hash),
|
||||
block_time: Some(time),
|
||||
..
|
||||
} = status.clone()
|
||||
{
|
||||
Some(ConfirmationTimeAnchor {
|
||||
anchor_block: BlockId { height, hash },
|
||||
confirmation_height: height,
|
||||
confirmation_time: time,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user