[persist_redesign] Introduce redesigned persist types

This is a more generic version of `keychain::persist::*` structures.

Additional changes:

* The `Append` trait has a new method `is_empty`.
* Introduce `Store` structure for `bdk_file_store`.
This commit is contained in:
志宇
2023-05-09 09:59:42 +08:00
parent e3c137043f
commit 2aa08a5898
10 changed files with 560 additions and 100 deletions

View File

@@ -0,0 +1,100 @@
use bincode::Options;
use std::{
fs::File,
io::{self, Seek},
marker::PhantomData,
};
use crate::bincode_options;
/// Iterator over entries in a file store.
///
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
///
/// [`next`]: Self::next
pub struct EntryIter<'t, T> {
db_file: Option<&'t mut File>,
/// The file position for the first read of `db_file`.
start_pos: Option<u64>,
types: PhantomData<T>,
}
impl<'t, T> EntryIter<'t, T> {
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
Self {
db_file: Some(db_file),
start_pos: Some(start_pos),
types: PhantomData,
}
}
}
impl<'t, T> Iterator for EntryIter<'t, T>
where
T: serde::de::DeserializeOwned,
{
type Item = Result<T, IterError>;
fn next(&mut self) -> Option<Self::Item> {
// closure which reads a single entry starting from `self.pos`
let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
let pos = match start_pos {
Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
None => f.stream_position()?,
};
match bincode_options().deserialize_from(&*f) {
Ok(changeset) => {
f.stream_position()?;
Ok(Some(changeset))
}
Err(e) => {
if let bincode::ErrorKind::Io(inner) = &*e {
if inner.kind() == io::ErrorKind::UnexpectedEof {
let eof = f.seek(io::SeekFrom::End(0))?;
if pos == eof {
return Ok(None);
}
}
}
f.seek(io::SeekFrom::Start(pos))?;
Err(IterError::Bincode(*e))
}
}
};
let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
if result.is_err() {
self.db_file = None;
}
result.transpose()
}
}
impl From<io::Error> for IterError {
fn from(value: io::Error) -> Self {
IterError::Io(value)
}
}
/// Error type for [`EntryIter`].
#[derive(Debug)]
pub enum IterError {
/// Failure to read from the file.
Io(io::Error),
/// Failure to decode data from the file.
Bincode(bincode::ErrorKind),
}
impl core::fmt::Display for IterError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
}
}
}
impl std::error::Error for IterError {}

View File

@@ -6,14 +6,15 @@ use bdk_chain::{
keychain::{KeychainChangeSet, KeychainTracker},
sparse_chain,
};
use bincode::{DefaultOptions, Options};
use core::marker::PhantomData;
use bincode::Options;
use std::{
fs::{File, OpenOptions},
io::{self, Read, Seek, Write},
path::Path,
};
use crate::{bincode_options, EntryIter, IterError};
/// BDK File Store magic bytes length.
const MAGIC_BYTES_LEN: usize = 12;
@@ -28,10 +29,6 @@ pub struct KeychainStore<K, P> {
changeset_type_params: core::marker::PhantomData<(K, P)>,
}
fn bincode() -> impl bincode::Options {
DefaultOptions::new().with_varint_encoding()
}
impl<K, P> KeychainStore<K, P>
where
K: Ord + Clone + core::fmt::Debug,
@@ -85,11 +82,8 @@ where
/// **WARNING**: This method changes the write position in the underlying file. You should
/// always iterate over all entries until `None` is returned if you want your next write to go
/// at the end; otherwise, you will write over existing entries.
pub fn iter_changesets(&mut self) -> Result<EntryIter<'_, KeychainChangeSet<K, P>>, io::Error> {
self.db_file
.seek(io::SeekFrom::Start(MAGIC_BYTES_LEN as _))?;
Ok(EntryIter::new(&mut self.db_file))
pub fn iter_changesets(&mut self) -> Result<EntryIter<KeychainChangeSet<K, P>>, io::Error> {
Ok(EntryIter::new(MAGIC_BYTES_LEN as u64, &mut self.db_file))
}
/// Loads all the changesets that have been stored as one giant changeset.
@@ -144,7 +138,7 @@ where
return Ok(());
}
bincode()
bincode_options()
.serialize_into(&mut self.db_file, changeset)
.map_err(|e| match *e {
bincode::ErrorKind::Io(inner) => inner,
@@ -197,92 +191,6 @@ impl From<io::Error> for FileError {
impl std::error::Error for FileError {}
/// Error type for [`EntryIter`].
#[derive(Debug)]
pub enum IterError {
/// Failure to read from the file.
Io(io::Error),
/// Failure to decode data from the file.
Bincode(bincode::ErrorKind),
}
impl core::fmt::Display for IterError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
}
}
}
impl std::error::Error for IterError {}
/// Iterator over entries in a file store.
///
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
///
/// [`next`]: Self::next
pub struct EntryIter<'a, V> {
db_file: &'a mut File,
types: PhantomData<V>,
error_exit: bool,
}
impl<'a, V> EntryIter<'a, V> {
pub fn new(db_file: &'a mut File) -> Self {
Self {
db_file,
types: PhantomData,
error_exit: false,
}
}
}
impl<'a, V> Iterator for EntryIter<'a, V>
where
V: serde::de::DeserializeOwned,
{
type Item = Result<V, IterError>;
fn next(&mut self) -> Option<Self::Item> {
let result = (|| {
let pos = self.db_file.stream_position()?;
match bincode().deserialize_from(&mut self.db_file) {
Ok(changeset) => Ok(Some(changeset)),
Err(e) => {
if let bincode::ErrorKind::Io(inner) = &*e {
if inner.kind() == io::ErrorKind::UnexpectedEof {
let eof = self.db_file.seek(io::SeekFrom::End(0))?;
if pos == eof {
return Ok(None);
}
}
}
self.db_file.seek(io::SeekFrom::Start(pos))?;
Err(IterError::Bincode(*e))
}
}
})();
let result = result.transpose();
if let Some(Err(_)) = &result {
self.error_exit = true;
}
result
}
}
impl From<io::Error> for IterError {
fn from(value: io::Error) -> Self {
IterError::Io(value)
}
}
#[cfg(test)]
mod test {
use super::*;
@@ -290,6 +198,7 @@ mod test {
keychain::{DerivationAdditions, KeychainChangeSet},
TxHeight,
};
use bincode::DefaultOptions;
use std::{
io::{Read, Write},
vec::Vec,

View File

@@ -1,10 +1,51 @@
#![doc = include_str!("../README.md")]
mod file_store;
mod entry_iter;
mod keychain_store;
mod store;
use std::io;
use bdk_chain::{
keychain::{KeychainChangeSet, KeychainTracker, PersistBackend},
sparse_chain::ChainPosition,
};
pub use file_store::*;
use bincode::{DefaultOptions, Options};
pub use entry_iter::*;
pub use keychain_store::*;
pub use store::*;
pub(crate) fn bincode_options() -> impl bincode::Options {
DefaultOptions::new().with_varint_encoding()
}
/// Error that occurs due to problems encountered with the file.
#[derive(Debug)]
pub enum FileError<'a> {
/// IO error, this may mean that the file is too short.
Io(io::Error),
/// Magic bytes do not match what is expected.
InvalidMagicBytes { got: Vec<u8>, expected: &'a [u8] },
}
impl<'a> core::fmt::Display for FileError<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Io(e) => write!(f, "io error trying to read file: {}", e),
Self::InvalidMagicBytes { got, expected } => write!(
f,
"file has invalid magic bytes: expected={:?} got={:?}",
expected, got,
),
}
}
}
impl<'a> From<io::Error> for FileError<'a> {
fn from(value: io::Error) -> Self {
Self::Io(value)
}
}
impl<'a> std::error::Error for FileError<'a> {}
impl<K, P> PersistBackend<K, P> for KeychainStore<K, P>
where

View File

@@ -0,0 +1,289 @@
use std::{
fmt::Debug,
fs::{File, OpenOptions},
io::{self, Read, Seek, Write},
marker::PhantomData,
path::Path,
};
use bdk_chain::{Append, PersistBackend};
use bincode::Options;
use crate::{bincode_options, EntryIter, FileError, IterError};
/// Persists an append-only list of changesets (`C`) to a single file.
///
/// The changesets are the results of altering a tracker implementation (`T`).
#[derive(Debug)]
pub struct Store<'a, C> {
magic: &'a [u8],
db_file: File,
marker: PhantomData<C>,
}
impl<'a, C> PersistBackend<C> for Store<'a, C>
where
C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
{
type WriteError = std::io::Error;
type LoadError = IterError;
fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> {
self.append_changeset(changeset)
}
fn load_from_persistence(&mut self) -> Result<C, Self::LoadError> {
let (changeset, result) = self.aggregate_changesets();
result.map(|_| changeset)
}
}
impl<'a, C> Store<'a, C>
where
C: Default + Append + serde::Serialize + serde::de::DeserializeOwned,
{
/// Creates a new store from a [`File`].
///
/// The file must have been opened with read and write permissions.
///
/// [`File`]: std::fs::File
pub fn new(magic: &'a [u8], mut db_file: File) -> Result<Self, FileError> {
db_file.rewind()?;
let mut magic_buf = Vec::from_iter((0..).take(magic.len()));
db_file.read_exact(magic_buf.as_mut())?;
if magic_buf != magic {
return Err(FileError::InvalidMagicBytes {
got: magic_buf,
expected: magic,
});
}
Ok(Self {
magic,
db_file,
marker: Default::default(),
})
}
/// Creates or loads a store from `db_path`.
///
/// If no file exists there, it will be created.
pub fn new_from_path<P>(magic: &'a [u8], db_path: P) -> Result<Self, FileError>
where
P: AsRef<Path>,
{
let already_exists = db_path.as_ref().exists();
let mut db_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(db_path)?;
if !already_exists {
db_file.write_all(magic)?;
}
Self::new(magic, db_file)
}
/// Iterates over the stored changeset from first to last, changing the seek position at each
/// iteration.
///
/// The iterator may fail to read an entry and therefore return an error. However, the first time
/// it returns an error will be the last. After doing so, the iterator will always yield `None`.
///
/// **WARNING**: This method changes the write position in the underlying file. You should
/// always iterate over all entries until `None` is returned if you want your next write to go
/// at the end; otherwise, you will write over existing entries.
pub fn iter_changesets(&mut self) -> EntryIter<C> {
EntryIter::new(self.magic.len() as u64, &mut self.db_file)
}
/// Loads all the changesets that have been stored as one giant changeset.
///
/// This function returns a tuple of the aggregate changeset and a result that indicates
/// whether an error occurred while reading or deserializing one of the entries. If so the
/// changeset will consist of all of those it was able to read.
///
/// You should usually check the error. In many applications, it may make sense to do a full
/// wallet scan with a stop-gap after getting an error, since it is likely that one of the
/// changesets it was unable to read changed the derivation indices of the tracker.
///
/// **WARNING**: This method changes the write position of the underlying file. The next
/// changeset will be written over the erroring entry (or the end of the file if none existed).
pub fn aggregate_changesets(&mut self) -> (C, Result<(), IterError>) {
let mut changeset = C::default();
let result = (|| {
for next_changeset in self.iter_changesets() {
changeset.append(next_changeset?);
}
Ok(())
})();
(changeset, result)
}
/// Append a new changeset to the file and truncate the file to the end of the appended
/// changeset.
///
/// The truncation is to avoid the possibility of having a valid but inconsistent changeset
/// directly after the appended changeset.
pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> {
// no need to write anything if changeset is empty
if changeset.is_empty() {
return Ok(());
}
bincode_options()
.serialize_into(&mut self.db_file, changeset)
.map_err(|e| match *e {
bincode::ErrorKind::Io(inner) => inner,
unexpected_err => panic!("unexpected bincode error: {}", unexpected_err),
})?;
// truncate file after this changeset addition
// if this is not done, data after this changeset may represent valid changesets, however
// applying those changesets on top of this one may result in an inconsistent state
let pos = self.db_file.stream_position()?;
self.db_file.set_len(pos)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use bincode::DefaultOptions;
use std::{
io::{Read, Write},
vec::Vec,
};
use tempfile::NamedTempFile;
const TEST_MAGIC_BYTES_LEN: usize = 12;
const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
[98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
#[derive(
Debug,
Clone,
Copy,
PartialOrd,
Ord,
PartialEq,
Eq,
Hash,
serde::Serialize,
serde::Deserialize,
)]
enum TestKeychain {
External,
Internal,
}
impl core::fmt::Display for TestKeychain {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::External => write!(f, "external"),
Self::Internal => write!(f, "internal"),
}
}
}
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct TestChangeSet {
pub changes: Vec<String>,
}
impl Append for TestChangeSet {
fn append(&mut self, mut other: Self) {
self.changes.append(&mut other.changes)
}
fn is_empty(&self) -> bool {
self.changes.is_empty()
}
}
#[derive(Debug)]
struct TestTracker;
#[test]
fn new_fails_if_file_is_too_short() {
let mut file = NamedTempFile::new().unwrap();
file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1])
.expect("should write");
match Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) {
Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof),
unexpected => panic!("unexpected result: {:?}", unexpected),
};
}
#[test]
fn new_fails_if_magic_bytes_are_invalid() {
let invalid_magic_bytes = "ldkfs0000000";
let mut file = NamedTempFile::new().unwrap();
file.write_all(invalid_magic_bytes.as_bytes())
.expect("should write");
match Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) {
Err(FileError::InvalidMagicBytes { got, .. }) => {
assert_eq!(got, invalid_magic_bytes.as_bytes())
}
unexpected => panic!("unexpected result: {:?}", unexpected),
};
}
#[test]
fn append_changeset_truncates_invalid_bytes() {
// initial data to write to file (magic bytes + invalid data)
let mut data = [255_u8; 2000];
data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
let changeset = TestChangeSet {
changes: vec!["one".into(), "two".into(), "three!".into()],
};
let mut file = NamedTempFile::new().unwrap();
file.write_all(&data).expect("should write");
let mut store = Store::<TestChangeSet>::new(&TEST_MAGIC_BYTES, file.reopen().unwrap())
.expect("should open");
match store.iter_changesets().next() {
Some(Err(IterError::Bincode(_))) => {}
unexpected_res => panic!("unexpected result: {:?}", unexpected_res),
}
store.append_changeset(&changeset).expect("should append");
drop(store);
let got_bytes = {
let mut buf = Vec::new();
file.reopen()
.unwrap()
.read_to_end(&mut buf)
.expect("should read");
buf
};
let expected_bytes = {
let mut buf = TEST_MAGIC_BYTES.to_vec();
DefaultOptions::new()
.with_varint_encoding()
.serialize_into(&mut buf, &changeset)
.expect("should encode");
buf
};
assert_eq!(got_bytes, expected_bytes);
}
}