feat!: Rework sqlite, changesets, persistence and wallet-construction

Rework sqlite: Instead of only supported one schema (defined in
`bdk_sqlite`), we have a schema per changeset type for more flexiblity.

* rm `bdk_sqlite` crate (as we don't need `bdk_sqlite::Store` anymore).
* add `sqlite` feature on `bdk_chain` which adds methods on each
  changeset type for initializing tables, loading the changeset and
  writing.

Rework changesets: Some callers may want to use `KeychainTxOutIndex`
where `K` may change per descriptor on every run. So we only want to
persist the last revealed indices by `DescriptorId` (which uniquely-ish
identifies the descriptor).

* rm `keychain_added` field from `keychain_txout`'s changeset.
* Add `keychain_added` to `CombinedChangeSet` (which is renamed to
  `WalletChangeSet`).

Rework persistence: add back some safety and convenience when persisting
our types. Working with changeset directly (as we were doing before) can
be cumbersome.

* Intoduce `struct Persisted<T>` which wraps a type `T` which stores
  staged changes to it. This adds safety when creating and or loading
  `T` from db.
* `struct Persisted<T>` methods, `create`, `load` and `persist`, are
  avaliable if `trait PersistWith<Db>` is implemented for `T`. `Db`
  represents the database connection and `PersistWith` should be
  implemented per database-type.
* For async, we have `trait PersistedAsyncWith<Db>`.
* `Wallet` has impls of `PersistedWith<rusqlite::Connection>`,
  `PersistedWith<rusqlite::Transaction>` and
  `PersistedWith<bdk_file_store::Store>` by default.

Rework wallet-construction: Before, we had multiple methods for loading
and creating with different input-counts so it would be unwieldly to add
more parameters in the future. This also makes it difficult to impl
`PersistWith` (which has a single method for `load` that takes in
`PersistWith::LoadParams` and a single method for `create` that takes in
`PersistWith::CreateParams`).

* Introduce a builder pattern when constructing a `Wallet`. For loading
  from persistence or `ChangeSet`, we have `LoadParams`. For creating a
  new wallet, we have `CreateParams`.
This commit is contained in:
志宇
2024-07-11 04:49:01 +00:00
parent d99b3ef4b4
commit 6b43001951
49 changed files with 2217 additions and 2058 deletions

View File

@@ -1,92 +1,206 @@
use crate::{ConfirmationBlockTime, Merge};
type IndexedTxGraphChangeSet =
crate::indexed_tx_graph::ChangeSet<ConfirmationBlockTime, crate::keychain_txout::ChangeSet>;
/// A changeset containing [`crate`] structures typically persisted together.
#[cfg(feature = "miniscript")]
#[derive(Debug, Clone, PartialEq)]
#[derive(Default, Debug, Clone, PartialEq)]
#[cfg_attr(
feature = "serde",
derive(crate::serde::Deserialize, crate::serde::Serialize),
serde(
crate = "crate::serde",
bound(
deserialize = "A: Ord + crate::serde::Deserialize<'de>, K: Ord + crate::serde::Deserialize<'de>",
serialize = "A: Ord + crate::serde::Serialize, K: Ord + crate::serde::Serialize",
),
)
serde(crate = "crate::serde")
)]
pub struct CombinedChangeSet<K, A> {
/// Changes to the [`LocalChain`](crate::local_chain::LocalChain).
pub chain: crate::local_chain::ChangeSet,
/// Changes to [`IndexedTxGraph`](crate::indexed_tx_graph::IndexedTxGraph).
pub indexed_tx_graph:
crate::indexed_tx_graph::ChangeSet<A, crate::indexer::keychain_txout::ChangeSet<K>>,
pub struct WalletChangeSet {
/// Descriptor for recipient addresses.
pub descriptor: Option<miniscript::Descriptor<miniscript::DescriptorPublicKey>>,
/// Descriptor for change addresses.
pub change_descriptor: Option<miniscript::Descriptor<miniscript::DescriptorPublicKey>>,
/// Stores the network type of the transaction data.
pub network: Option<bitcoin::Network>,
/// Changes to the [`LocalChain`](crate::local_chain::LocalChain).
pub local_chain: crate::local_chain::ChangeSet,
/// Changes to [`TxGraph`](crate::tx_graph::TxGraph).
pub tx_graph: crate::tx_graph::ChangeSet<crate::ConfirmationBlockTime>,
/// Changes to [`KeychainTxOutIndex`](crate::keychain_txout::KeychainTxOutIndex).
pub indexer: crate::keychain_txout::ChangeSet,
}
#[cfg(feature = "miniscript")]
impl<K, A> core::default::Default for CombinedChangeSet<K, A> {
fn default() -> Self {
Self {
chain: core::default::Default::default(),
indexed_tx_graph: core::default::Default::default(),
network: None,
}
}
}
#[cfg(feature = "miniscript")]
impl<K: Ord, A: crate::Anchor> crate::Merge for CombinedChangeSet<K, A> {
impl Merge for WalletChangeSet {
/// Merge another [`WalletChangeSet`] into itself.
///
/// The `keychains_added` field respects the invariants of... TODO: FINISH THIS!
fn merge(&mut self, other: Self) {
crate::Merge::merge(&mut self.chain, other.chain);
crate::Merge::merge(&mut self.indexed_tx_graph, other.indexed_tx_graph);
if other.descriptor.is_some() {
debug_assert!(
self.descriptor.is_none() || self.descriptor == other.descriptor,
"descriptor must never change"
);
self.descriptor = other.descriptor;
}
if other.change_descriptor.is_some() {
debug_assert!(
self.change_descriptor.is_none()
|| self.change_descriptor == other.change_descriptor,
"change descriptor must never change"
);
}
if other.network.is_some() {
debug_assert!(
self.network.is_none() || self.network == other.network,
"network type must either be just introduced or remain the same"
"network must never change"
);
self.network = other.network;
}
crate::Merge::merge(&mut self.local_chain, other.local_chain);
crate::Merge::merge(&mut self.tx_graph, other.tx_graph);
crate::Merge::merge(&mut self.indexer, other.indexer);
}
fn is_empty(&self) -> bool {
self.chain.is_empty() && self.indexed_tx_graph.is_empty() && self.network.is_none()
self.descriptor.is_none()
&& self.change_descriptor.is_none()
&& self.network.is_none()
&& self.local_chain.is_empty()
&& self.tx_graph.is_empty()
&& self.indexer.is_empty()
}
}
#[cfg(feature = "miniscript")]
impl<K, A> From<crate::local_chain::ChangeSet> for CombinedChangeSet<K, A> {
#[cfg(feature = "sqlite")]
impl WalletChangeSet {
/// Schema name for wallet.
pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet";
/// Name of table to store wallet descriptors and network.
pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet";
/// Initialize sqlite tables for wallet schema & table.
fn init_wallet_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
let schema_v0: &[&str] = &[&format!(
"CREATE TABLE {} ( \
id INTEGER PRIMARY KEY NOT NULL CHECK (id = 0), \
descriptor TEXT, \
change_descriptor TEXT, \
network TEXT \
) STRICT;",
Self::WALLET_TABLE_NAME,
)];
crate::sqlite::migrate_schema(db_tx, Self::WALLET_SCHEMA_NAME, &[schema_v0])
}
/// Recover a [`WalletChangeSet`] from sqlite database.
pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result<Self> {
Self::init_wallet_sqlite_tables(db_tx)?;
use crate::sqlite::Sql;
use miniscript::{Descriptor, DescriptorPublicKey};
use rusqlite::OptionalExtension;
let mut changeset = Self::default();
let mut wallet_statement = db_tx.prepare(&format!(
"SELECT descriptor, change_descriptor, network FROM {}",
Self::WALLET_TABLE_NAME,
))?;
let row = wallet_statement
.query_row([], |row| {
Ok((
row.get::<_, Sql<Descriptor<DescriptorPublicKey>>>("descriptor")?,
row.get::<_, Sql<Descriptor<DescriptorPublicKey>>>("change_descriptor")?,
row.get::<_, Sql<bitcoin::Network>>("network")?,
))
})
.optional()?;
if let Some((Sql(desc), Sql(change_desc), Sql(network))) = row {
changeset.descriptor = Some(desc);
changeset.change_descriptor = Some(change_desc);
changeset.network = Some(network);
}
changeset.local_chain = crate::local_chain::ChangeSet::from_sqlite(db_tx)?;
changeset.tx_graph = crate::tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?;
changeset.indexer = crate::indexer::keychain_txout::ChangeSet::from_sqlite(db_tx)?;
Ok(changeset)
}
/// Persist [`WalletChangeSet`] to sqlite database.
pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
Self::init_wallet_sqlite_tables(db_tx)?;
use crate::sqlite::Sql;
use rusqlite::named_params;
let mut descriptor_statement = db_tx.prepare_cached(&format!(
"INSERT INTO {}(id, descriptor) VALUES(:id, :descriptor) ON CONFLICT(id) DO UPDATE SET descriptor=:descriptor",
Self::WALLET_TABLE_NAME,
))?;
if let Some(descriptor) = &self.descriptor {
descriptor_statement.execute(named_params! {
":id": 0,
":descriptor": Sql(descriptor.clone()),
})?;
}
let mut change_descriptor_statement = db_tx.prepare_cached(&format!(
"INSERT INTO {}(id, change_descriptor) VALUES(:id, :change_descriptor) ON CONFLICT(id) DO UPDATE SET change_descriptor=:change_descriptor",
Self::WALLET_TABLE_NAME,
))?;
if let Some(change_descriptor) = &self.change_descriptor {
change_descriptor_statement.execute(named_params! {
":id": 0,
":change_descriptor": Sql(change_descriptor.clone()),
})?;
}
let mut network_statement = db_tx.prepare_cached(&format!(
"INSERT INTO {}(id, network) VALUES(:id, :network) ON CONFLICT(id) DO UPDATE SET network=:network",
Self::WALLET_TABLE_NAME,
))?;
if let Some(network) = self.network {
network_statement.execute(named_params! {
":id": 0,
":network": Sql(network),
})?;
}
self.local_chain.persist_to_sqlite(db_tx)?;
self.tx_graph.persist_to_sqlite(db_tx)?;
self.indexer.persist_to_sqlite(db_tx)?;
Ok(())
}
}
impl From<crate::local_chain::ChangeSet> for WalletChangeSet {
fn from(chain: crate::local_chain::ChangeSet) -> Self {
Self {
chain,
local_chain: chain,
..Default::default()
}
}
}
#[cfg(feature = "miniscript")]
impl<K, A> From<crate::indexed_tx_graph::ChangeSet<A, crate::indexer::keychain_txout::ChangeSet<K>>>
for CombinedChangeSet<K, A>
{
fn from(
indexed_tx_graph: crate::indexed_tx_graph::ChangeSet<
A,
crate::indexer::keychain_txout::ChangeSet<K>,
>,
) -> Self {
impl From<IndexedTxGraphChangeSet> for WalletChangeSet {
fn from(indexed_tx_graph: IndexedTxGraphChangeSet) -> Self {
Self {
indexed_tx_graph,
tx_graph: indexed_tx_graph.tx_graph,
indexer: indexed_tx_graph.indexer,
..Default::default()
}
}
}
#[cfg(feature = "miniscript")]
impl<K, A> From<crate::indexer::keychain_txout::ChangeSet<K>> for CombinedChangeSet<K, A> {
fn from(indexer: crate::indexer::keychain_txout::ChangeSet<K>) -> Self {
impl From<crate::tx_graph::ChangeSet<ConfirmationBlockTime>> for WalletChangeSet {
fn from(tx_graph: crate::tx_graph::ChangeSet<ConfirmationBlockTime>) -> Self {
Self {
indexed_tx_graph: crate::indexed_tx_graph::ChangeSet {
indexer,
..Default::default()
},
tx_graph,
..Default::default()
}
}
}
impl From<crate::keychain_txout::ChangeSet> for WalletChangeSet {
fn from(indexer: crate::keychain_txout::ChangeSet) -> Self {
Self {
indexer,
..Default::default()
}
}

View File

@@ -1,5 +1,7 @@
//! Contains the [`IndexedTxGraph`] and associated types. Refer to the
//! [`IndexedTxGraph`] documentation for more.
use core::fmt::Debug;
use alloc::vec::Vec;
use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid};
@@ -47,21 +49,24 @@ impl<A: Anchor, I: Indexer> IndexedTxGraph<A, I> {
pub fn apply_changeset(&mut self, changeset: ChangeSet<A, I::ChangeSet>) {
self.index.apply_changeset(changeset.indexer);
for tx in &changeset.graph.txs {
for tx in &changeset.tx_graph.txs {
self.index.index_tx(tx);
}
for (&outpoint, txout) in &changeset.graph.txouts {
for (&outpoint, txout) in &changeset.tx_graph.txouts {
self.index.index_txout(outpoint, txout);
}
self.graph.apply_changeset(changeset.graph);
self.graph.apply_changeset(changeset.tx_graph);
}
/// Determines the [`ChangeSet`] between `self` and an empty [`IndexedTxGraph`].
pub fn initial_changeset(&self) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.initial_changeset();
let indexer = self.index.initial_changeset();
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
}
@@ -89,21 +94,30 @@ where
pub fn apply_update(&mut self, update: TxGraph<A>) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.apply_update(update);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
/// Insert a floating `txout` of given `outpoint`.
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.insert_txout(outpoint, txout);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
/// Insert and index a transaction into the graph.
pub fn insert_tx(&mut self, tx: Transaction) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.insert_tx(tx);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
/// Insert an `anchor` for a given transaction.
@@ -151,7 +165,10 @@ where
}
}
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
/// Batch insert unconfirmed transactions, filtering out those that are irrelevant.
@@ -185,7 +202,10 @@ where
.map(|(tx, seen_at)| (tx.clone(), seen_at)),
);
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
/// Batch insert unconfirmed transactions.
@@ -203,7 +223,10 @@ where
) -> ChangeSet<A, I::ChangeSet> {
let graph = self.graph.batch_insert_unconfirmed(txs);
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
}
@@ -236,9 +259,9 @@ where
if self.index.is_tx_relevant(tx) {
let txid = tx.compute_txid();
let anchor = A::from_block_position(block, block_id, tx_pos);
changeset.graph.merge(self.graph.insert_tx(tx.clone()));
changeset.tx_graph.merge(self.graph.insert_tx(tx.clone()));
changeset
.graph
.tx_graph
.merge(self.graph.insert_anchor(txid, anchor));
}
}
@@ -265,7 +288,16 @@ where
graph.merge(self.graph.insert_tx(tx.clone()));
}
let indexer = self.index_tx_graph_changeset(&graph);
ChangeSet { graph, indexer }
ChangeSet {
tx_graph: graph,
indexer,
}
}
}
impl<A, I> AsRef<TxGraph<A>> for IndexedTxGraph<A, I> {
fn as_ref(&self) -> &TxGraph<A> {
&self.graph
}
}
@@ -285,7 +317,7 @@ where
#[must_use]
pub struct ChangeSet<A, IA> {
/// [`TxGraph`] changeset.
pub graph: tx_graph::ChangeSet<A>,
pub tx_graph: tx_graph::ChangeSet<A>,
/// [`Indexer`] changeset.
pub indexer: IA,
}
@@ -293,7 +325,7 @@ pub struct ChangeSet<A, IA> {
impl<A, IA: Default> Default for ChangeSet<A, IA> {
fn default() -> Self {
Self {
graph: Default::default(),
tx_graph: Default::default(),
indexer: Default::default(),
}
}
@@ -301,38 +333,30 @@ impl<A, IA: Default> Default for ChangeSet<A, IA> {
impl<A: Anchor, IA: Merge> Merge for ChangeSet<A, IA> {
fn merge(&mut self, other: Self) {
self.graph.merge(other.graph);
self.tx_graph.merge(other.tx_graph);
self.indexer.merge(other.indexer);
}
fn is_empty(&self) -> bool {
self.graph.is_empty() && self.indexer.is_empty()
self.tx_graph.is_empty() && self.indexer.is_empty()
}
}
impl<A, IA: Default> From<tx_graph::ChangeSet<A>> for ChangeSet<A, IA> {
fn from(graph: tx_graph::ChangeSet<A>) -> Self {
Self {
graph,
tx_graph: graph,
..Default::default()
}
}
}
#[cfg(feature = "miniscript")]
impl<A, K> From<crate::indexer::keychain_txout::ChangeSet<K>>
for ChangeSet<A, crate::indexer::keychain_txout::ChangeSet<K>>
{
fn from(indexer: crate::indexer::keychain_txout::ChangeSet<K>) -> Self {
impl<A> From<crate::keychain_txout::ChangeSet> for ChangeSet<A, crate::keychain_txout::ChangeSet> {
fn from(indexer: crate::keychain_txout::ChangeSet) -> Self {
Self {
graph: Default::default(),
tx_graph: Default::default(),
indexer,
}
}
}
impl<A, I> AsRef<TxGraph<A>> for IndexedTxGraph<A, I> {
fn as_ref(&self) -> &TxGraph<A> {
&self.graph
}
}

View File

@@ -5,7 +5,8 @@ use crate::{
collections::*,
miniscript::{Descriptor, DescriptorPublicKey},
spk_iter::BIP32_MAX_INDEX,
DescriptorExt, DescriptorId, Indexed, Indexer, KeychainIndexed, SpkIterator, SpkTxOutIndex,
spk_txout::SpkTxOutIndex,
DescriptorExt, DescriptorId, Indexed, Indexer, KeychainIndexed, SpkIterator,
};
use alloc::{borrow::ToOwned, vec::Vec};
use bitcoin::{Amount, OutPoint, Script, ScriptBuf, SignedAmount, Transaction, TxOut, Txid};
@@ -135,7 +136,7 @@ impl<K> Default for KeychainTxOutIndex<K> {
}
impl<K: Clone + Ord + Debug> Indexer for KeychainTxOutIndex<K> {
type ChangeSet = ChangeSet<K>;
type ChangeSet = ChangeSet;
fn index_txout(&mut self, outpoint: OutPoint, txout: &TxOut) -> Self::ChangeSet {
let mut changeset = ChangeSet::default();
@@ -154,7 +155,7 @@ impl<K: Clone + Ord + Debug> Indexer for KeychainTxOutIndex<K> {
}
fn index_tx(&mut self, tx: &bitcoin::Transaction) -> Self::ChangeSet {
let mut changeset = ChangeSet::<K>::default();
let mut changeset = ChangeSet::default();
let txid = tx.compute_txid();
for (op, txout) in tx.output.iter().enumerate() {
changeset.merge(self.index_txout(OutPoint::new(txid, op as u32), txout));
@@ -164,10 +165,6 @@ impl<K: Clone + Ord + Debug> Indexer for KeychainTxOutIndex<K> {
fn initial_changeset(&self) -> Self::ChangeSet {
ChangeSet {
keychains_added: self
.keychains()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
last_revealed: self.last_revealed.clone().into_iter().collect(),
}
}
@@ -354,7 +351,7 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
/// keychain <-> descriptor is a one-to-one mapping that cannot be changed. Attempting to do so
/// will return a [`InsertDescriptorError<K>`].
///
/// `[KeychainTxOutIndex]` will prevent you from inserting two descriptors which derive the same
/// [`KeychainTxOutIndex`] will prevent you from inserting two descriptors which derive the same
/// script pubkey at index 0, but it's up to you to ensure that descriptors don't collide at
/// other indices. If they do nothing catastrophic happens at the `KeychainTxOutIndex` level
/// (one keychain just becomes the defacto owner of that spk arbitrarily) but this may have
@@ -364,8 +361,7 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
&mut self,
keychain: K,
descriptor: Descriptor<DescriptorPublicKey>,
) -> Result<ChangeSet<K>, InsertDescriptorError<K>> {
let mut changeset = ChangeSet::<K>::default();
) -> Result<bool, InsertDescriptorError<K>> {
let did = descriptor.descriptor_id();
if !self.keychain_to_descriptor_id.contains_key(&keychain)
&& !self.descriptor_id_to_keychain.contains_key(&did)
@@ -374,33 +370,31 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
self.keychain_to_descriptor_id.insert(keychain.clone(), did);
self.descriptor_id_to_keychain.insert(did, keychain.clone());
self.replenish_inner_index(did, &keychain, self.lookahead);
changeset
.keychains_added
.insert(keychain.clone(), descriptor);
} else {
if let Some(existing_desc_id) = self.keychain_to_descriptor_id.get(&keychain) {
let descriptor = self.descriptors.get(existing_desc_id).expect("invariant");
if *existing_desc_id != did {
return Err(InsertDescriptorError::KeychainAlreadyAssigned {
existing_assignment: descriptor.clone(),
keychain,
});
}
}
return Ok(true);
}
if let Some(existing_keychain) = self.descriptor_id_to_keychain.get(&did) {
let descriptor = self.descriptors.get(&did).expect("invariant").clone();
if *existing_keychain != keychain {
return Err(InsertDescriptorError::DescriptorAlreadyAssigned {
existing_assignment: existing_keychain.clone(),
descriptor,
});
}
if let Some(existing_desc_id) = self.keychain_to_descriptor_id.get(&keychain) {
let descriptor = self.descriptors.get(existing_desc_id).expect("invariant");
if *existing_desc_id != did {
return Err(InsertDescriptorError::KeychainAlreadyAssigned {
existing_assignment: descriptor.clone(),
keychain,
});
}
}
Ok(changeset)
if let Some(existing_keychain) = self.descriptor_id_to_keychain.get(&did) {
let descriptor = self.descriptors.get(&did).expect("invariant").clone();
if *existing_keychain != keychain {
return Err(InsertDescriptorError::DescriptorAlreadyAssigned {
existing_assignment: existing_keychain.clone(),
descriptor,
});
}
}
Ok(false)
}
/// Gets the descriptor associated with the keychain. Returns `None` if the keychain doesn't
@@ -627,7 +621,7 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
}
/// Convenience method to call [`Self::reveal_to_target`] on multiple keychains.
pub fn reveal_to_target_multi(&mut self, keychains: &BTreeMap<K, u32>) -> ChangeSet<K> {
pub fn reveal_to_target_multi(&mut self, keychains: &BTreeMap<K, u32>) -> ChangeSet {
let mut changeset = ChangeSet::default();
for (keychain, &index) in keychains {
@@ -656,7 +650,7 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
&mut self,
keychain: &K,
target_index: u32,
) -> Option<(Vec<Indexed<ScriptBuf>>, ChangeSet<K>)> {
) -> Option<(Vec<Indexed<ScriptBuf>>, ChangeSet)> {
let mut changeset = ChangeSet::default();
let mut spks: Vec<Indexed<ScriptBuf>> = vec![];
while let Some((i, new)) = self.next_index(keychain) {
@@ -687,7 +681,7 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
/// 1. The descriptor has no wildcard and already has one script revealed.
/// 2. The descriptor has already revealed scripts up to the numeric bound.
/// 3. There is no descriptor associated with the given keychain.
pub fn reveal_next_spk(&mut self, keychain: &K) -> Option<(Indexed<ScriptBuf>, ChangeSet<K>)> {
pub fn reveal_next_spk(&mut self, keychain: &K) -> Option<(Indexed<ScriptBuf>, ChangeSet)> {
let (next_index, new) = self.next_index(keychain)?;
let mut changeset = ChangeSet::default();
@@ -717,7 +711,7 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
/// could be revealed (see [`reveal_next_spk`] for when this happens).
///
/// [`reveal_next_spk`]: Self::reveal_next_spk
pub fn next_unused_spk(&mut self, keychain: &K) -> Option<(Indexed<ScriptBuf>, ChangeSet<K>)> {
pub fn next_unused_spk(&mut self, keychain: &K) -> Option<(Indexed<ScriptBuf>, ChangeSet)> {
let next_unused = self
.unused_keychain_spks(keychain)
.next()
@@ -780,27 +774,80 @@ impl<K: Clone + Ord + Debug> KeychainTxOutIndex<K> {
}
/// Applies the `ChangeSet<K>` to the [`KeychainTxOutIndex<K>`]
///
/// Keychains added by the `keychains_added` field of `ChangeSet<K>` respect the one-to-one
/// keychain <-> descriptor invariant by silently ignoring attempts to violate it (but will
/// panic if `debug_assertions` are enabled).
pub fn apply_changeset(&mut self, changeset: ChangeSet<K>) {
let ChangeSet {
keychains_added,
last_revealed,
} = changeset;
for (keychain, descriptor) in keychains_added {
let _ignore_invariant_violation = self.insert_descriptor(keychain, descriptor);
}
for (&desc_id, &index) in &last_revealed {
pub fn apply_changeset(&mut self, changeset: ChangeSet) {
for (&desc_id, &index) in &changeset.last_revealed {
let v = self.last_revealed.entry(desc_id).or_default();
*v = index.max(*v);
self.replenish_inner_index_did(desc_id, self.lookahead);
}
}
}
#[cfg(feature = "sqlite")]
impl ChangeSet {
/// Schema name for the changeset.
pub const SCHEMA_NAME: &'static str = "bdk_keychaintxout";
/// Name for table that stores last revealed indices per descriptor id.
pub const LAST_REVEALED_TABLE_NAME: &'static str = "bdk_descriptor_last_revealed";
/// Initialize sqlite tables for persisting [`KeychainTxOutIndex`].
fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
let schema_v0: &[&str] = &[
// last revealed
&format!(
"CREATE TABLE {} ( \
descriptor_id TEXT PRIMARY KEY NOT NULL, \
last_revealed INTEGER NOT NULL \
) STRICT",
Self::LAST_REVEALED_TABLE_NAME,
),
];
crate::sqlite::migrate_schema(db_tx, Self::SCHEMA_NAME, &[schema_v0])
}
/// Construct [`KeychainTxOutIndex`] from sqlite database and given parameters.
pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result<Self> {
Self::init_sqlite_tables(db_tx)?;
use crate::sqlite::Sql;
let mut changeset = Self::default();
let mut statement = db_tx.prepare(&format!(
"SELECT descriptor_id, last_revealed FROM {}",
Self::LAST_REVEALED_TABLE_NAME,
))?;
let row_iter = statement.query_map([], |row| {
Ok((
row.get::<_, Sql<DescriptorId>>("descriptor_id")?,
row.get::<_, u32>("last_revealed")?,
))
})?;
for row in row_iter {
let (Sql(descriptor_id), last_revealed) = row?;
changeset.last_revealed.insert(descriptor_id, last_revealed);
}
for did in last_revealed.keys() {
self.replenish_inner_index_did(*did, self.lookahead);
Ok(changeset)
}
/// Persist `changeset` to the sqlite database.
pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
Self::init_sqlite_tables(db_tx)?;
use crate::rusqlite::named_params;
use crate::sqlite::Sql;
let mut statement = db_tx.prepare_cached(&format!(
"REPLACE INTO {}(descriptor_id, last_revealed) VALUES(:descriptor_id, :last_revealed)",
Self::LAST_REVEALED_TABLE_NAME,
))?;
for (&descriptor_id, &last_revealed) in &self.last_revealed {
statement.execute(named_params! {
":descriptor_id": Sql(descriptor_id),
":last_revealed": last_revealed,
})?;
}
Ok(())
}
}
@@ -860,49 +907,24 @@ impl<K: core::fmt::Debug> std::error::Error for InsertDescriptorError<K> {}
/// `keychains_added` is *not* monotone, once it is set any attempt to change it is subject to the
/// same *one-to-one* keychain <-> descriptor mapping invariant as [`KeychainTxOutIndex`] itself.
///
/// [`apply_changeset`]: KeychainTxOutIndex::apply_changeset
/// [`Merge`]: Self::merge
#[derive(Clone, Debug, PartialEq)]
/// [`KeychainTxOutIndex`]: crate::keychain_txout::KeychainTxOutIndex
/// [`apply_changeset`]: crate::keychain_txout::KeychainTxOutIndex::apply_changeset
/// [`merge`]: Self::merge
#[derive(Clone, Debug, Default, PartialEq)]
#[cfg_attr(
feature = "serde",
derive(serde::Deserialize, serde::Serialize),
serde(
crate = "serde_crate",
bound(
deserialize = "K: Ord + serde::Deserialize<'de>",
serialize = "K: Ord + serde::Serialize"
)
)
serde(crate = "serde_crate")
)]
#[must_use]
pub struct ChangeSet<K> {
/// Contains the keychains that have been added and their respective descriptor
pub keychains_added: BTreeMap<K, Descriptor<DescriptorPublicKey>>,
pub struct ChangeSet {
/// Contains for each descriptor_id the last revealed index of derivation
pub last_revealed: BTreeMap<DescriptorId, u32>,
}
impl<K: Ord> Merge for ChangeSet<K> {
/// Merge another [`ChangeSet<K>`] into self.
///
/// For the `keychains_added` field this method respects the invariants of
/// [`insert_descriptor`]. `last_revealed` always becomes the larger of the two.
///
/// [`insert_descriptor`]: KeychainTxOutIndex::insert_descriptor
impl Merge for ChangeSet {
/// Merge another [`ChangeSet`] into self.
fn merge(&mut self, other: Self) {
for (new_keychain, new_descriptor) in other.keychains_added {
// enforce 1-to-1 invariance
if !self.keychains_added.contains_key(&new_keychain)
// FIXME: very inefficient
&& self
.keychains_added
.values()
.all(|descriptor| descriptor != &new_descriptor)
{
self.keychains_added.insert(new_keychain, new_descriptor);
}
}
// for `last_revealed`, entries of `other` will take precedence ONLY if it is greater than
// what was originally in `self`.
for (desc_id, index) in other.last_revealed {
@@ -922,25 +944,6 @@ impl<K: Ord> Merge for ChangeSet<K> {
/// Returns whether the changeset are empty.
fn is_empty(&self) -> bool {
self.last_revealed.is_empty() && self.keychains_added.is_empty()
}
}
impl<K> Default for ChangeSet<K> {
fn default() -> Self {
Self {
last_revealed: BTreeMap::default(),
keychains_added: BTreeMap::default(),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
/// The keychain doesn't exist. Most likley hasn't been inserted with [`KeychainTxOutIndex::insert_descriptor`].
pub struct NoSuchKeychain<K>(K);
impl<K: Debug> core::fmt::Display for NoSuchKeychain<K> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "no such keychain {:?} exists", &self.0)
self.last_revealed.is_empty()
}
}

View File

@@ -208,7 +208,7 @@ impl<I: Clone + Ord + core::fmt::Debug> SpkTxOutIndex<I> {
/// # Example
///
/// ```rust
/// # use bdk_chain::SpkTxOutIndex;
/// # use bdk_chain::spk_txout::SpkTxOutIndex;
///
/// // imagine our spks are indexed like (keychain, derivation_index).
/// let txout_index = SpkTxOutIndex::<(u32, u32)>::default();

View File

@@ -28,7 +28,7 @@ pub use chain_data::*;
pub mod indexed_tx_graph;
pub use indexed_tx_graph::IndexedTxGraph;
pub mod indexer;
pub use indexer::spk_txout::*;
pub use indexer::spk_txout;
pub use indexer::Indexer;
pub mod local_chain;
mod tx_data_traits;
@@ -37,6 +37,8 @@ pub use tx_data_traits::*;
pub use tx_graph::TxGraph;
mod chain_oracle;
pub use chain_oracle::*;
mod persist;
pub use persist::*;
#[doc(hidden)]
pub mod example_utils;
@@ -51,8 +53,16 @@ pub use descriptor_ext::{DescriptorExt, DescriptorId};
mod spk_iter;
#[cfg(feature = "miniscript")]
pub use spk_iter::*;
#[cfg(feature = "miniscript")]
mod changeset;
#[cfg(feature = "miniscript")]
pub use changeset::*;
#[cfg(feature = "miniscript")]
pub use indexer::keychain_txout;
#[cfg(feature = "sqlite")]
pub mod sqlite;
#[cfg(feature = "sqlite")]
pub use rusqlite;
pub mod spk_client;
#[allow(unused_imports)]

View File

@@ -4,17 +4,11 @@ use core::convert::Infallible;
use core::ops::RangeBounds;
use crate::collections::BTreeMap;
use crate::{BlockId, ChainOracle};
use crate::{BlockId, ChainOracle, Merge};
use alloc::sync::Arc;
use bitcoin::block::Header;
use bitcoin::BlockHash;
/// The [`ChangeSet`] represents changes to [`LocalChain`].
///
/// The key represents the block height, and the value either represents added a new [`CheckPoint`]
/// (if [`Some`]), or removing a [`CheckPoint`] (if [`None`]).
pub type ChangeSet = BTreeMap<u32, Option<BlockHash>>;
/// A [`LocalChain`] checkpoint is used to find the agreement point between two chains and as a
/// transaction anchor.
///
@@ -216,7 +210,7 @@ impl CheckPoint {
/// Apply `changeset` to the checkpoint.
fn apply_changeset(mut self, changeset: &ChangeSet) -> Result<CheckPoint, MissingGenesisError> {
if let Some(start_height) = changeset.keys().next().cloned() {
if let Some(start_height) = changeset.blocks.keys().next().cloned() {
// changes after point of agreement
let mut extension = BTreeMap::default();
// point of agreement
@@ -231,7 +225,7 @@ impl CheckPoint {
}
}
for (&height, &hash) in changeset {
for (&height, &hash) in &changeset.blocks {
match hash {
Some(hash) => {
extension.insert(height, hash);
@@ -331,7 +325,7 @@ impl LocalChain {
/// Construct a [`LocalChain`] from an initial `changeset`.
pub fn from_changeset(changeset: ChangeSet) -> Result<Self, MissingGenesisError> {
let genesis_entry = changeset.get(&0).copied().flatten();
let genesis_entry = changeset.blocks.get(&0).copied().flatten();
let genesis_hash = match genesis_entry {
Some(hash) => hash,
None => return Err(MissingGenesisError),
@@ -521,12 +515,14 @@ impl LocalChain {
}
let mut changeset = ChangeSet::default();
changeset.insert(block_id.height, Some(block_id.hash));
changeset
.blocks
.insert(block_id.height, Some(block_id.hash));
self.apply_changeset(&changeset)
.map_err(|_| AlterCheckPointError {
height: 0,
original_hash: self.genesis_hash(),
update_hash: changeset.get(&0).cloned().flatten(),
update_hash: changeset.blocks.get(&0).cloned().flatten(),
})?;
Ok(changeset)
}
@@ -548,7 +544,7 @@ impl LocalChain {
if cp_id.height < block_id.height {
break;
}
changeset.insert(cp_id.height, None);
changeset.blocks.insert(cp_id.height, None);
if cp_id == block_id {
remove_from = Some(cp);
}
@@ -569,13 +565,16 @@ impl LocalChain {
/// Derives an initial [`ChangeSet`], meaning that it can be applied to an empty chain to
/// recover the current chain.
pub fn initial_changeset(&self) -> ChangeSet {
self.tip
.iter()
.map(|cp| {
let block_id = cp.block_id();
(block_id.height, Some(block_id.hash))
})
.collect()
ChangeSet {
blocks: self
.tip
.iter()
.map(|cp| {
let block_id = cp.block_id();
(block_id.height, Some(block_id.hash))
})
.collect(),
}
}
/// Iterate over checkpoints in descending height order.
@@ -587,7 +586,7 @@ impl LocalChain {
fn _check_changeset_is_applied(&self, changeset: &ChangeSet) -> bool {
let mut curr_cp = self.tip.clone();
for (height, exp_hash) in changeset.iter().rev() {
for (height, exp_hash) in changeset.blocks.iter().rev() {
match curr_cp.get(*height) {
Some(query_cp) => {
if query_cp.height() != *height || Some(query_cp.hash()) != *exp_hash {
@@ -630,6 +629,135 @@ impl LocalChain {
}
}
/// The [`ChangeSet`] represents changes to [`LocalChain`].
#[derive(Debug, Default, Clone, PartialEq)]
#[cfg_attr(
feature = "serde",
derive(serde::Deserialize, serde::Serialize),
serde(crate = "serde_crate")
)]
pub struct ChangeSet {
/// Changes to the [`LocalChain`] blocks.
///
/// The key represents the block height, and the value either represents added a new [`CheckPoint`]
/// (if [`Some`]), or removing a [`CheckPoint`] (if [`None`]).
pub blocks: BTreeMap<u32, Option<BlockHash>>,
}
impl Merge for ChangeSet {
fn merge(&mut self, other: Self) {
Merge::merge(&mut self.blocks, other.blocks)
}
fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
}
impl<B: IntoIterator<Item = (u32, Option<BlockHash>)>> From<B> for ChangeSet {
fn from(blocks: B) -> Self {
Self {
blocks: blocks.into_iter().collect(),
}
}
}
impl FromIterator<(u32, Option<BlockHash>)> for ChangeSet {
fn from_iter<T: IntoIterator<Item = (u32, Option<BlockHash>)>>(iter: T) -> Self {
Self {
blocks: iter.into_iter().collect(),
}
}
}
impl FromIterator<(u32, BlockHash)> for ChangeSet {
fn from_iter<T: IntoIterator<Item = (u32, BlockHash)>>(iter: T) -> Self {
Self {
blocks: iter
.into_iter()
.map(|(height, hash)| (height, Some(hash)))
.collect(),
}
}
}
#[cfg(feature = "sqlite")]
impl ChangeSet {
/// Schema name for the changeset.
pub const SCHEMA_NAME: &'static str = "bdk_localchain";
/// Name of sqlite table that stores blocks of [`LocalChain`].
pub const BLOCKS_TABLE_NAME: &'static str = "bdk_blocks";
/// Initialize sqlite tables for persisting [`LocalChain`].
fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
let schema_v0: &[&str] = &[
// blocks
&format!(
"CREATE TABLE {} ( \
block_height INTEGER PRIMARY KEY NOT NULL, \
block_hash TEXT NOT NULL \
) STRICT",
Self::BLOCKS_TABLE_NAME,
),
];
crate::sqlite::migrate_schema(db_tx, Self::SCHEMA_NAME, &[schema_v0])
}
/// Construct a [`LocalChain`] from sqlite database.
pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result<Self> {
Self::init_sqlite_tables(db_tx)?;
use crate::sqlite::Sql;
let mut changeset = Self::default();
let mut statement = db_tx.prepare(&format!(
"SELECT block_height, block_hash FROM {}",
Self::BLOCKS_TABLE_NAME,
))?;
let row_iter = statement.query_map([], |row| {
Ok((
row.get::<_, u32>("block_height")?,
row.get::<_, Sql<BlockHash>>("block_hash")?,
))
})?;
for row in row_iter {
let (height, Sql(hash)) = row?;
changeset.blocks.insert(height, Some(hash));
}
Ok(changeset)
}
/// Persist `changeset` to the sqlite database.
pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
Self::init_sqlite_tables(db_tx)?;
use crate::sqlite::Sql;
use rusqlite::named_params;
let mut replace_statement = db_tx.prepare_cached(&format!(
"REPLACE INTO {}(block_height, block_hash) VALUES(:block_height, :block_hash)",
Self::BLOCKS_TABLE_NAME,
))?;
let mut delete_statement = db_tx.prepare_cached(&format!(
"DELETE FROM {} WHERE block_height=:block_height",
Self::BLOCKS_TABLE_NAME,
))?;
for (&height, &hash) in &self.blocks {
match hash {
Some(hash) => replace_statement.execute(named_params! {
":block_height": height,
":block_hash": Sql(hash),
})?,
None => delete_statement.execute(named_params! {
":block_height": height,
})?,
};
}
Ok(())
}
}
/// An error which occurs when a [`LocalChain`] is constructed without a genesis checkpoint.
#[derive(Clone, Debug, PartialEq)]
pub struct MissingGenesisError;
@@ -761,7 +889,7 @@ fn merge_chains(
match (curr_orig.as_ref(), curr_update.as_ref()) {
// Update block that doesn't exist in the original chain
(o, Some(u)) if Some(u.height()) > o.map(|o| o.height()) => {
changeset.insert(u.height(), Some(u.hash()));
changeset.blocks.insert(u.height(), Some(u.hash()));
prev_update = curr_update.take();
}
// Original block that isn't in the update
@@ -813,9 +941,9 @@ fn merge_chains(
} else {
// We have an invalidation height so we set the height to the updated hash and
// also purge all the original chain block hashes above this block.
changeset.insert(u.height(), Some(u.hash()));
changeset.blocks.insert(u.height(), Some(u.hash()));
for invalidated_height in potentially_invalidated_heights.drain(..) {
changeset.insert(invalidated_height, None);
changeset.blocks.insert(invalidated_height, None);
}
prev_orig_was_invalidated = true;
}

135
crates/chain/src/persist.rs Normal file
View File

@@ -0,0 +1,135 @@
use core::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
};
use alloc::boxed::Box;
/// Trait that persists the type with `Db`.
///
/// Methods of this trait should not be called directly.
pub trait PersistWith<Db>: Sized {
/// Parameters for [`PersistWith::create`].
type CreateParams;
/// Parameters for [`PersistWith::load`].
type LoadParams;
/// Error type of [`PersistWith::create`].
type CreateError;
/// Error type of [`PersistWith::load`].
type LoadError;
/// Error type of [`PersistWith::persist`].
type PersistError;
/// Create the type and initialize the `Db`.
fn create(db: &mut Db, params: Self::CreateParams) -> Result<Self, Self::CreateError>;
/// Load the type from the `Db`.
fn load(db: &mut Db, params: Self::LoadParams) -> Result<Option<Self>, Self::LoadError>;
/// Persist staged changes into `Db`.
fn persist(&mut self, db: &mut Db) -> Result<bool, Self::PersistError>;
}
type FutureResult<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;
/// Trait that persists the type with an async `Db`.
pub trait PersistAsyncWith<Db>: Sized {
/// Parameters for [`PersistAsyncWith::create`].
type CreateParams;
/// Parameters for [`PersistAsyncWith::load`].
type LoadParams;
/// Error type of [`PersistAsyncWith::create`].
type CreateError;
/// Error type of [`PersistAsyncWith::load`].
type LoadError;
/// Error type of [`PersistAsyncWith::persist`].
type PersistError;
/// Create the type and initialize the `Db`.
fn create(db: &mut Db, params: Self::CreateParams) -> FutureResult<Self, Self::CreateError>;
/// Load the type from `Db`.
fn load(db: &mut Db, params: Self::LoadParams) -> FutureResult<Option<Self>, Self::LoadError>;
/// Persist staged changes into `Db`.
fn persist<'a>(&'a mut self, db: &'a mut Db) -> FutureResult<'a, bool, Self::PersistError>;
}
/// Represents a persisted `T`.
pub struct Persisted<T> {
inner: T,
}
impl<T> Persisted<T> {
/// Create a new persisted `T`.
pub fn create<Db>(db: &mut Db, params: T::CreateParams) -> Result<Self, T::CreateError>
where
T: PersistWith<Db>,
{
T::create(db, params).map(|inner| Self { inner })
}
/// Create a new persisted `T` with async `Db`.
pub async fn create_async<Db>(
db: &mut Db,
params: T::CreateParams,
) -> Result<Self, T::CreateError>
where
T: PersistAsyncWith<Db>,
{
T::create(db, params).await.map(|inner| Self { inner })
}
/// Construct a persisted `T` from `Db`.
pub fn load<Db>(db: &mut Db, params: T::LoadParams) -> Result<Option<Self>, T::LoadError>
where
T: PersistWith<Db>,
{
Ok(T::load(db, params)?.map(|inner| Self { inner }))
}
/// Contruct a persisted `T` from an async `Db`.
pub async fn load_async<Db>(
db: &mut Db,
params: T::LoadParams,
) -> Result<Option<Self>, T::LoadError>
where
T: PersistAsyncWith<Db>,
{
Ok(T::load(db, params).await?.map(|inner| Self { inner }))
}
/// Persist staged changes of `T` into `Db`.
pub fn persist<Db>(&mut self, db: &mut Db) -> Result<bool, T::PersistError>
where
T: PersistWith<Db>,
{
self.inner.persist(db)
}
/// Persist staged changes of `T` into an async `Db`.
pub async fn persist_async<'a, Db>(
&'a mut self,
db: &'a mut Db,
) -> Result<bool, T::PersistError>
where
T: PersistAsyncWith<Db>,
{
self.inner.persist(db).await
}
}
impl<T> Deref for Persisted<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> DerefMut for Persisted<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

332
crates/chain/src/sqlite.rs Normal file
View File

@@ -0,0 +1,332 @@
//! Module for stuff
use core::{fmt::Debug, ops::Deref, str::FromStr};
use alloc::{borrow::ToOwned, boxed::Box, string::ToString, vec::Vec};
use bitcoin::consensus::{Decodable, Encodable};
pub use rusqlite;
pub use rusqlite::Connection;
use rusqlite::OptionalExtension;
pub use rusqlite::Transaction;
use rusqlite::{
named_params,
types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef},
ToSql,
};
use crate::{Anchor, Merge};
/// Parameters for [`Persister`].
pub trait PersistParams {
/// Data type that is loaded and written to the database.
type ChangeSet: Default + Merge;
/// Initialize SQL tables.
fn initialize_tables(&self, db_tx: &Transaction) -> rusqlite::Result<()>;
/// Load all data from tables.
fn load_changeset(&self, db_tx: &Transaction) -> rusqlite::Result<Option<Self::ChangeSet>>;
/// Write data into table(s).
fn write_changeset(
&self,
db_tx: &Transaction,
changeset: &Self::ChangeSet,
) -> rusqlite::Result<()>;
}
// TODO: Use macros
impl<A: PersistParams, B: PersistParams> PersistParams for (A, B) {
type ChangeSet = (A::ChangeSet, B::ChangeSet);
fn initialize_tables(&self, db_tx: &Transaction) -> rusqlite::Result<()> {
self.0.initialize_tables(db_tx)?;
self.1.initialize_tables(db_tx)?;
Ok(())
}
fn load_changeset(&self, db_tx: &Transaction) -> rusqlite::Result<Option<Self::ChangeSet>> {
let changeset = (
self.0.load_changeset(db_tx)?.unwrap_or_default(),
self.1.load_changeset(db_tx)?.unwrap_or_default(),
);
if changeset.is_empty() {
Ok(None)
} else {
Ok(Some(changeset))
}
}
fn write_changeset(
&self,
db_tx: &Transaction,
changeset: &Self::ChangeSet,
) -> rusqlite::Result<()> {
self.0.write_changeset(db_tx, &changeset.0)?;
self.1.write_changeset(db_tx, &changeset.1)?;
Ok(())
}
}
/// Persists data in to a relational schema based [SQLite] database file.
///
/// The changesets loaded or stored represent changes to keychain and blockchain data.
///
/// [SQLite]: https://www.sqlite.org/index.html
#[derive(Debug)]
pub struct Persister<P> {
conn: rusqlite::Connection,
params: P,
}
impl<P: PersistParams> Persister<P> {
/// Persist changeset to the database connection.
pub fn persist(&mut self, changeset: &P::ChangeSet) -> rusqlite::Result<()> {
if !changeset.is_empty() {
let db_tx = self.conn.transaction()?;
self.params.write_changeset(&db_tx, changeset)?;
db_tx.commit()?;
}
Ok(())
}
}
/// Extends [`rusqlite::Connection`] to transform into a [`Persister`].
pub trait ConnectionExt: Sized {
/// Transform into a [`Persister`].
fn into_persister<P: PersistParams>(
self,
params: P,
) -> rusqlite::Result<(Persister<P>, Option<P::ChangeSet>)>;
}
impl ConnectionExt for rusqlite::Connection {
fn into_persister<P: PersistParams>(
mut self,
params: P,
) -> rusqlite::Result<(Persister<P>, Option<P::ChangeSet>)> {
let db_tx = self.transaction()?;
params.initialize_tables(&db_tx)?;
let changeset = params.load_changeset(&db_tx)?;
db_tx.commit()?;
let persister = Persister { conn: self, params };
Ok((persister, changeset))
}
}
/// Table name for schemas.
pub const SCHEMAS_TABLE_NAME: &str = "bdk_schemas";
/// Initialize the schema table.
fn init_schemas_table(db_tx: &Transaction) -> rusqlite::Result<()> {
let sql = format!("CREATE TABLE IF NOT EXISTS {}( name TEXT PRIMARY KEY NOT NULL, version INTEGER NOT NULL ) STRICT", SCHEMAS_TABLE_NAME);
db_tx.execute(&sql, ())?;
Ok(())
}
/// Get schema version of `schema_name`.
fn schema_version(db_tx: &Transaction, schema_name: &str) -> rusqlite::Result<Option<u32>> {
let sql = format!(
"SELECT version FROM {} WHERE name=:name",
SCHEMAS_TABLE_NAME
);
db_tx
.query_row(&sql, named_params! { ":name": schema_name }, |row| {
row.get::<_, u32>("version")
})
.optional()
}
/// Set the `schema_version` of `schema_name`.
fn set_schema_version(
db_tx: &Transaction,
schema_name: &str,
schema_version: u32,
) -> rusqlite::Result<()> {
let sql = format!(
"REPLACE INTO {}(name, version) VALUES(:name, :version)",
SCHEMAS_TABLE_NAME,
);
db_tx.execute(
&sql,
named_params! { ":name": schema_name, ":version": schema_version },
)?;
Ok(())
}
/// Runs logic that initializes/migrates the table schemas.
pub fn migrate_schema(
db_tx: &Transaction,
schema_name: &str,
versioned_scripts: &[&[&str]],
) -> rusqlite::Result<()> {
init_schemas_table(db_tx)?;
let current_version = schema_version(db_tx, schema_name)?;
let exec_from = current_version.map_or(0_usize, |v| v as usize + 1);
let scripts_to_exec = versioned_scripts.iter().enumerate().skip(exec_from);
for (version, &script) in scripts_to_exec {
set_schema_version(db_tx, schema_name, version as u32)?;
for statement in script {
db_tx.execute(statement, ())?;
}
}
Ok(())
}
/// A wrapper so that we can impl [FromSql] and [ToSql] for multiple types.
pub struct Sql<T>(pub T);
impl<T> From<T> for Sql<T> {
fn from(value: T) -> Self {
Self(value)
}
}
impl<T> Deref for Sql<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FromSql for Sql<bitcoin::Txid> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
bitcoin::Txid::from_str(value.as_str()?)
.map(Self)
.map_err(from_sql_error)
}
}
impl ToSql for Sql<bitcoin::Txid> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(self.to_string().into())
}
}
impl FromSql for Sql<bitcoin::BlockHash> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
bitcoin::BlockHash::from_str(value.as_str()?)
.map(Self)
.map_err(from_sql_error)
}
}
impl ToSql for Sql<bitcoin::BlockHash> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(self.to_string().into())
}
}
#[cfg(feature = "miniscript")]
impl FromSql for Sql<crate::DescriptorId> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
crate::DescriptorId::from_str(value.as_str()?)
.map(Self)
.map_err(from_sql_error)
}
}
#[cfg(feature = "miniscript")]
impl ToSql for Sql<crate::DescriptorId> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(self.to_string().into())
}
}
impl FromSql for Sql<bitcoin::Transaction> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
bitcoin::Transaction::consensus_decode_from_finite_reader(&mut value.as_bytes()?)
.map(Self)
.map_err(from_sql_error)
}
}
impl ToSql for Sql<bitcoin::Transaction> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
let mut bytes = Vec::<u8>::new();
self.consensus_encode(&mut bytes).map_err(to_sql_error)?;
Ok(bytes.into())
}
}
impl FromSql for Sql<bitcoin::ScriptBuf> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
Ok(bitcoin::Script::from_bytes(value.as_bytes()?)
.to_owned()
.into())
}
}
impl ToSql for Sql<bitcoin::ScriptBuf> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(self.as_bytes().into())
}
}
impl FromSql for Sql<bitcoin::Amount> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
Ok(bitcoin::Amount::from_sat(value.as_i64()?.try_into().map_err(from_sql_error)?).into())
}
}
impl ToSql for Sql<bitcoin::Amount> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
let amount: i64 = self.to_sat().try_into().map_err(to_sql_error)?;
Ok(amount.into())
}
}
impl<A: Anchor + serde_crate::de::DeserializeOwned> FromSql for Sql<A> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
serde_json::from_str(value.as_str()?)
.map(Sql)
.map_err(from_sql_error)
}
}
impl<A: Anchor + serde_crate::Serialize> ToSql for Sql<A> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
serde_json::to_string(&self.0)
.map(Into::into)
.map_err(to_sql_error)
}
}
#[cfg(feature = "miniscript")]
impl FromSql for Sql<miniscript::Descriptor<miniscript::DescriptorPublicKey>> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
miniscript::Descriptor::from_str(value.as_str()?)
.map(Self)
.map_err(from_sql_error)
}
}
#[cfg(feature = "miniscript")]
impl ToSql for Sql<miniscript::Descriptor<miniscript::DescriptorPublicKey>> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(self.to_string().into())
}
}
impl FromSql for Sql<bitcoin::Network> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
bitcoin::Network::from_str(value.as_str()?)
.map(Self)
.map_err(from_sql_error)
}
}
impl ToSql for Sql<bitcoin::Network> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(self.to_string().into())
}
}
fn from_sql_error<E: std::error::Error + Send + Sync + 'static>(err: E) -> FromSqlError {
FromSqlError::Other(Box::new(err))
}
fn to_sql_error<E: std::error::Error + Send + Sync + 'static>(err: E) -> rusqlite::Error {
rusqlite::Error::ToSqlConversionFailure(Box::new(err))
}

View File

@@ -1293,6 +1293,188 @@ impl<A> ChangeSet<A> {
}
}
#[cfg(feature = "sqlite")]
impl<A> ChangeSet<A>
where
A: Anchor + Clone + Ord + serde::Serialize + serde::de::DeserializeOwned,
{
/// Schema name for the [`ChangeSet`].
pub const SCHEMA_NAME: &'static str = "bdk_txgraph";
/// Name of table that stores full transactions and `last_seen` timestamps.
pub const TXS_TABLE_NAME: &'static str = "bdk_txs";
/// Name of table that stores floating txouts.
pub const TXOUTS_TABLE_NAME: &'static str = "bdk_txouts";
/// Name of table that stores [`Anchor`]s.
pub const ANCHORS_TABLE_NAME: &'static str = "bdk_anchors";
/// Initialize sqlite tables.
fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
let schema_v0: &[&str] = &[
// full transactions
&format!(
"CREATE TABLE {} ( \
txid TEXT PRIMARY KEY NOT NULL, \
raw_tx BLOB, \
last_seen INTEGER \
) STRICT",
Self::TXS_TABLE_NAME,
),
// floating txouts
&format!(
"CREATE TABLE {} ( \
txid TEXT NOT NULL, \
vout INTEGER NOT NULL, \
value INTEGER NOT NULL, \
script BLOB NOT NULL, \
PRIMARY KEY (txid, vout) \
) STRICT",
Self::TXOUTS_TABLE_NAME,
),
// anchors
&format!(
"CREATE TABLE {} ( \
txid TEXT NOT NULL REFERENCES {} (txid), \
block_height INTEGER NOT NULL, \
block_hash TEXT NOT NULL, \
anchor BLOB NOT NULL, \
PRIMARY KEY (txid, block_height, block_hash) \
) STRICT",
Self::ANCHORS_TABLE_NAME,
Self::TXS_TABLE_NAME,
),
];
crate::sqlite::migrate_schema(db_tx, Self::SCHEMA_NAME, &[schema_v0])
}
/// Construct a [`TxGraph`] from an sqlite database.
pub fn from_sqlite(db_tx: &rusqlite::Transaction) -> rusqlite::Result<Self> {
Self::init_sqlite_tables(db_tx)?;
use crate::sqlite::Sql;
let mut changeset = Self::default();
let mut statement = db_tx.prepare(&format!(
"SELECT txid, raw_tx, last_seen FROM {}",
Self::TXS_TABLE_NAME,
))?;
let row_iter = statement.query_map([], |row| {
Ok((
row.get::<_, Sql<Txid>>("txid")?,
row.get::<_, Option<Sql<Transaction>>>("raw_tx")?,
row.get::<_, Option<u64>>("last_seen")?,
))
})?;
for row in row_iter {
let (Sql(txid), tx, last_seen) = row?;
if let Some(Sql(tx)) = tx {
changeset.txs.insert(Arc::new(tx));
}
if let Some(last_seen) = last_seen {
changeset.last_seen.insert(txid, last_seen);
}
}
let mut statement = db_tx.prepare(&format!(
"SELECT txid, vout, value, script FROM {}",
Self::TXOUTS_TABLE_NAME,
))?;
let row_iter = statement.query_map([], |row| {
Ok((
row.get::<_, Sql<Txid>>("txid")?,
row.get::<_, u32>("vout")?,
row.get::<_, Sql<Amount>>("value")?,
row.get::<_, Sql<bitcoin::ScriptBuf>>("script")?,
))
})?;
for row in row_iter {
let (Sql(txid), vout, Sql(value), Sql(script_pubkey)) = row?;
changeset.txouts.insert(
OutPoint { txid, vout },
TxOut {
value,
script_pubkey,
},
);
}
let mut statement = db_tx.prepare(&format!(
"SELECT json(anchor), txid FROM {}",
Self::ANCHORS_TABLE_NAME,
))?;
let row_iter = statement.query_map([], |row| {
Ok((
row.get::<_, Sql<A>>("json(anchor)")?,
row.get::<_, Sql<Txid>>("txid")?,
))
})?;
for row in row_iter {
let (Sql(anchor), Sql(txid)) = row?;
changeset.anchors.insert((anchor, txid));
}
Ok(changeset)
}
/// Persist `changeset` to the sqlite database.
pub fn persist_to_sqlite(&self, db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> {
Self::init_sqlite_tables(db_tx)?;
use crate::rusqlite::named_params;
use crate::sqlite::Sql;
let mut statement = db_tx.prepare_cached(&format!(
"INSERT INTO {}(txid, raw_tx) VALUES(:txid, :raw_tx) ON CONFLICT(txid) DO UPDATE SET raw_tx=:raw_tx",
Self::TXS_TABLE_NAME,
))?;
for tx in &self.txs {
statement.execute(named_params! {
":txid": Sql(tx.compute_txid()),
":raw_tx": Sql(tx.as_ref().clone()),
})?;
}
let mut statement = db_tx
.prepare_cached(&format!(
"INSERT INTO {}(txid, last_seen) VALUES(:txid, :last_seen) ON CONFLICT(txid) DO UPDATE SET last_seen=:last_seen",
Self::TXS_TABLE_NAME,
))?;
for (&txid, &last_seen) in &self.last_seen {
statement.execute(named_params! {
":txid": Sql(txid),
":last_seen": Some(last_seen),
})?;
}
let mut statement = db_tx.prepare_cached(&format!(
"REPLACE INTO {}(txid, vout, value, script) VALUES(:txid, :vout, :value, :script)",
Self::TXOUTS_TABLE_NAME,
))?;
for (op, txo) in &self.txouts {
statement.execute(named_params! {
":txid": Sql(op.txid),
":vout": op.vout,
":value": Sql(txo.value),
":script": Sql(txo.script_pubkey.clone()),
})?;
}
let mut statement = db_tx.prepare_cached(&format!(
"REPLACE INTO {}(txid, block_height, block_hash, anchor) VALUES(:txid, :block_height, :block_hash, jsonb(:anchor))",
Self::ANCHORS_TABLE_NAME,
))?;
for (anchor, txid) in &self.anchors {
let anchor_block = anchor.anchor_block();
statement.execute(named_params! {
":txid": Sql(*txid),
":block_height": anchor_block.height,
":block_hash": Sql(anchor_block.hash),
":anchor": Sql(anchor.clone()),
})?;
}
Ok(())
}
}
impl<A: Ord> Merge for ChangeSet<A> {
fn merge(&mut self, other: Self) {
// We use `extend` instead of `BTreeMap::append` due to performance issues with `append`.