Added Logging and refactored a bit

This commit is contained in:
junderw
2023-06-26 19:29:32 -07:00
committed by Mononaut
parent 702c4c123e
commit 6650541b2d
7 changed files with 265 additions and 23 deletions

View File

@@ -17,6 +17,9 @@ bytes = "1.4.0"
napi = { version = "2.13.2", features = ["napi8", "tokio_rt"] }
napi-derive = "2.13.0"
bytemuck = "1.13.1"
tracing = "0.1.36"
tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"]}
[build-dependencies]
napi-build = "2.0.1"

View File

@@ -9,7 +9,7 @@ use std::{
};
#[allow(clippy::struct_excessive_bools)]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct AuditTransaction {
pub uid: u32,
pub fee: u64,
@@ -97,11 +97,6 @@ impl AuditTransaction {
self.score
}
#[inline]
pub const fn ancestor_fee(&self) -> u64 {
self.ancestor_fee
}
#[inline]
pub const fn ancestor_weight(&self) -> u32 {
self.ancestor_weight
@@ -112,6 +107,23 @@ impl AuditTransaction {
self.ancestor_sigops
}
#[inline]
pub fn cluster_rate(&self) -> f64 {
// Safety: self.ancestor_weight can never be 0.
// Even if it could, as it approaches 0, the value inside the min() call
// grows, so if we think of 0 as "grew infinitely" then dependency_rate would be
// the smaller of the two. If either side is NaN, the other side is returned.
self.dependency_rate
.min(self.ancestor_fee as f64 / (f64::from(self.ancestor_weight) / 4.0))
}
pub fn set_dirty_if_different(&mut self, cluster_rate: f64) {
if self.effective_fee_per_vsize != cluster_rate {
self.effective_fee_per_vsize = cluster_rate;
self.dirty = true;
}
}
/// Safety: This function must NEVER set score to NaN.
#[inline]
fn calc_new_score(&mut self) {

View File

@@ -3,6 +3,7 @@ use std::{
cmp::Ordering,
collections::{HashMap, HashSet},
};
use tracing::{info, trace};
use crate::{
audit_transaction::AuditTransaction,
@@ -20,6 +21,7 @@ const MAX_BLOCKS: usize = 8;
type AuditPool = HashMap<u32, AuditTransaction, U32HasherState>;
type ModifiedQueue = PriorityQueue<u32, TxPriority, U32HasherState>;
#[derive(Debug)]
struct TxPriority {
uid: u32,
score: f64,
@@ -60,7 +62,7 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
let mut mempool_stack: Vec<u32> = Vec::with_capacity(STARTING_CAPACITY);
let mut clusters: Vec<Vec<u32>> = Vec::new();
// Initialize working structs
info!("Initializing working structs");
for (uid, tx) in mempool {
let audit_tx = AuditTransaction::from_thread_transaction(tx);
// Safety: audit_pool and mempool_stack must always contain the same transactions
@@ -68,12 +70,13 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
mempool_stack.push(*uid);
}
// Build relatives graph & calculate ancestor scores
info!("Building relatives graph & calculate ancestor scores");
for txid in &mempool_stack {
set_relatives(*txid, &mut audit_pool);
}
trace!("Post relative graph Audit Pool: {:#?}", audit_pool);
// Sort by descending ancestor score
info!("Sorting by descending ancestor score");
mempool_stack.sort_unstable_by(|a, b| {
let a_tx = audit_pool
.get(a)
@@ -84,8 +87,8 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
a_tx.cmp(b_tx)
});
// Build blocks by greedily choosing the highest feerate package
// (i.e. the package rooted in the transaction with the best ancestor score)
info!("Building blocks by greedily choosing the highest feerate package");
info!("(i.e. the package rooted in the transaction with the best ancestor score)");
let mut blocks: Vec<Vec<u32>> = Vec::new();
let mut block_weight: u32 = BLOCK_RESERVED_WEIGHT;
let mut block_sigops: u32 = 0;
@@ -94,6 +97,22 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
let mut overflow: Vec<u32> = Vec::new();
let mut failures = 0;
while !mempool_stack.is_empty() || !modified.is_empty() {
// This trace log storm is big, so to make scrolling through
// Each iteration easier, leaving a bunch of empty rows
// And a header of ======
trace!("\n\n\n\n\n\n\n\n\n\n==================================");
trace!("mempool_array: {:#?}", mempool_stack);
trace!("clusters: {:#?}", clusters);
trace!("modified: {:#?}", modified);
trace!("audit_pool: {:#?}", audit_pool);
trace!("blocks: {:#?}", blocks);
trace!("block_weight: {:#?}", block_weight);
trace!("block_sigops: {:#?}", block_sigops);
trace!("transactions: {:#?}", transactions);
trace!("overflow: {:#?}", overflow);
trace!("failures: {:#?}", failures);
trace!("\n==================================");
let next_from_stack = next_valid_from_stack(&mut mempool_stack, &audit_pool);
let next_from_queue = next_valid_from_queue(&mut modified, &audit_pool);
if next_from_stack.is_none() && next_from_queue.is_none() {
@@ -134,18 +153,13 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
package.sort_unstable_by_key(|a| a.1);
package.push((next_tx.uid, next_tx.ancestors.len()));
let cluster_rate = next_tx
.dependency_rate
.min(next_tx.ancestor_fee() as f64 / (f64::from(next_tx.ancestor_weight()) / 4.0));
let cluster_rate = next_tx.cluster_rate();
for (txid, _) in &package {
cluster.push(*txid);
if let Some(tx) = audit_pool.get_mut(txid) {
tx.used = true;
if tx.effective_fee_per_vsize != cluster_rate {
tx.effective_fee_per_vsize = cluster_rate;
tx.dirty = true;
}
tx.set_dirty_if_different(cluster_rate);
transactions.push(tx.uid);
block_weight += tx.weight;
block_sigops += tx.sigops;
@@ -202,10 +216,15 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
// make a list of dirty transactions and their new rates
let mut rates: Vec<Vec<f64>> = Vec::new();
for (txid, tx) in audit_pool {
trace!("txid: {}, is_dirty: {}", txid, tx.dirty);
if tx.dirty {
rates.push(vec![f64::from(txid), tx.effective_fee_per_vsize]);
}
}
trace!("\n\n\n\n\n====================");
trace!("blocks: {:#?}", blocks);
trace!("clusters: {:#?}", clusters);
trace!("rates: {:#?}\n====================\n\n\n\n\n", rates);
GbtResult {
blocks,
@@ -214,7 +233,10 @@ pub fn gbt(mempool: &mut ThreadTransactionsMap) -> GbtResult {
}
}
fn next_valid_from_stack<'a>(mempool_stack: &mut Vec<u32>, audit_pool: &'a AuditPool) -> Option<&'a AuditTransaction> {
fn next_valid_from_stack<'a>(
mempool_stack: &mut Vec<u32>,
audit_pool: &'a AuditPool,
) -> Option<&'a AuditTransaction> {
let mut next_txid = mempool_stack.last()?;
let mut tx: &AuditTransaction = audit_pool.get(next_txid)?;
while tx.used || tx.modified {
@@ -225,7 +247,10 @@ fn next_valid_from_stack<'a>(mempool_stack: &mut Vec<u32>, audit_pool: &'a Audit
Some(tx)
}
fn next_valid_from_queue<'a>(queue: &mut ModifiedQueue, audit_pool: &'a AuditPool) -> Option<&'a AuditTransaction> {
fn next_valid_from_queue<'a>(
queue: &mut ModifiedQueue,
audit_pool: &'a AuditPool,
) -> Option<&'a AuditTransaction> {
let mut next_txid = queue.peek()?.0;
let mut tx: &AuditTransaction = audit_pool.get(next_txid)?;
while tx.used {

View File

@@ -8,17 +8,21 @@
use napi::bindgen_prelude::{Result, Uint8Array};
use napi_derive::napi;
use tracing::{debug, info, trace};
use tracing_log::LogTracer;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use u32_hasher_types::{u32hashmap_with_capacity, U32HasherState};
mod audit_transaction;
mod gbt;
mod thread_transaction;
mod u32_hasher_types;
mod utils;
use thread_transaction::ThreadTransaction;
use u32_hasher_types::{u32hashmap_with_capacity, U32HasherState};
/// This is the starting capacity for HashMap/Vec/etc. that deal with transactions.
/// `HashMap` doubles capacity when it hits it, so 2048 is a decent tradeoff between
@@ -35,12 +39,36 @@ pub struct GbtGenerator {
thread_transactions: Arc<Mutex<ThreadTransactionsMap>>,
}
#[napi::module_init]
fn init() {
// Set all `tracing` logs to print to STDOUT
// Note: Passing RUST_LOG env variable to the node process
// will change the log level for the rust module.
tracing::subscriber::set_global_default(
FmtSubscriber::builder()
.with_env_filter(EnvFilter::from_default_env())
.with_ansi(
// Default to no-color logs.
// Setting RUST_LOG_COLOR to 1 or true|TRUE|True etc.
// will enable color
std::env::var("RUST_LOG_COLOR")
.map(|s| ["1", "true"].contains(&&*s.to_lowercase()))
.unwrap_or(false),
)
.finish(),
)
.expect("Logging subscriber failed");
// Convert all `log` logs into `tracing` events
LogTracer::init().expect("Legacy log subscriber failed");
}
#[napi]
impl GbtGenerator {
#[napi(constructor)]
#[allow(clippy::new_without_default)]
#[must_use]
pub fn new() -> Self {
debug!("Created new GbtGenerator");
Self {
thread_transactions: Arc::new(Mutex::new(u32hashmap_with_capacity(STARTING_CAPACITY))),
}
@@ -51,6 +79,7 @@ impl GbtGenerator {
/// Rejects if the thread panics or if the Mutex is poisoned.
#[napi]
pub async fn make(&self, mempool_buffer: Uint8Array) -> Result<GbtResult> {
trace!("make: Current State {:#?}", self.thread_transactions);
run_task(Arc::clone(&self.thread_transactions), move |map| {
for tx in ThreadTransaction::batch_from_buffer(&mempool_buffer) {
map.insert(tx.uid, tx);
@@ -64,6 +93,7 @@ impl GbtGenerator {
/// Rejects if the thread panics or if the Mutex is poisoned.
#[napi]
pub async fn update(&self, new_txs: Uint8Array, remove_txs: Uint8Array) -> Result<GbtResult> {
trace!("update: Current State {:#?}", self.thread_transactions);
run_task(Arc::clone(&self.thread_transactions), move |map| {
for tx in ThreadTransaction::batch_from_buffer(&new_txs) {
map.insert(tx.uid, tx);
@@ -105,12 +135,28 @@ async fn run_task<F>(
where
F: FnOnce(&mut ThreadTransactionsMap) + Send + 'static,
{
debug!("Spawning thread...");
let handle = napi::tokio::task::spawn_blocking(move || {
debug!(
"Getting lock for thread_transactions from thread {:?}...",
std::thread::current().id()
);
let mut map = thread_transactions
.lock()
.map_err(|_| napi::Error::from_reason("THREAD_TRANSACTIONS Mutex poisoned"))?;
callback(&mut map);
Ok(gbt::gbt(&mut map))
info!("Starting gbt algorithm for {} elements...", map.len());
let result = gbt::gbt(&mut map);
info!("Finished gbt algorithm for {} elements...", map.len());
debug!(
"Releasing lock for thread_transactions from thread {:?}...",
std::thread::current().id()
);
drop(map);
Ok(result)
});
handle

View File

@@ -1,6 +1,7 @@
use bytes::buf::Buf;
use std::io::Cursor;
#[derive(Debug)]
pub struct ThreadTransaction {
pub uid: u32,
pub fee: u64,

View File

@@ -1,6 +1,7 @@
use priority_queue::PriorityQueue;
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
hash::{BuildHasher, Hasher},
};
@@ -25,6 +26,12 @@ pub fn u32hashset_new() -> HashSet<u32, U32HasherState> {
#[derive(Clone)]
pub struct U32HasherState(());
impl Debug for U32HasherState {
fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
impl BuildHasher for U32HasherState {
type Hasher = U32Hasher;