optimize data structures for advanced GBT algorithm

This commit is contained in:
Mononaut
2023-05-02 17:46:48 -06:00
parent d9db142c65
commit 6d4c2f8f8f
4 changed files with 142 additions and 62 deletions

View File

@@ -1,5 +1,5 @@
import logger from '../logger';
import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor } from '../mempool.interfaces';
import { MempoolBlock, TransactionExtended, ThreadTransaction, TransactionStripped, MempoolBlockWithTransactions, MempoolBlockDelta, Ancestor, CompactThreadTransaction } from '../mempool.interfaces';
import { Common } from './common';
import config from '../config';
import { Worker } from 'worker_threads';
@@ -10,6 +10,9 @@ class MempoolBlocks {
private mempoolBlockDeltas: MempoolBlockDelta[] = [];
private txSelectionWorker: Worker | null = null;
private nextUid: number = 1;
private uidMap: Map<number, string> = new Map(); // map short numerical uids to full txids
constructor() {}
public getMempoolBlocks(): MempoolBlock[] {
@@ -175,18 +178,26 @@ class MempoolBlocks {
}
public async $makeBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, saveResults: boolean = false): Promise<MempoolBlockWithTransactions[]> {
// reset mempool short ids
this.resetUids();
for (const tx of Object.values(newMempool)) {
this.setUid(tx);
}
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const strippedMempool: { [txid: string]: ThreadTransaction } = {};
const strippedMempool: Map<number, CompactThreadTransaction> = new Map();
Object.values(newMempool).forEach(entry => {
strippedMempool[entry.txid] = {
txid: entry.txid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
};
if (entry.uid != null) {
strippedMempool.set(entry.uid, {
uid: entry.uid,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[],
});
}
});
// (re)initialize tx selection worker thread
@@ -205,7 +216,7 @@ class MempoolBlocks {
// run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener;
try {
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => {
const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map<number, number[]> }>((resolve, reject) => {
threadErrorListener = reject;
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
@@ -213,7 +224,7 @@ class MempoolBlocks {
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'set', mempool: strippedMempool });
let { blocks, clusters } = await workerResultPromise;
let { blocks, clusters } = this.convertResultTxids(await workerResultPromise);
// filter out stale transactions
const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool)));
@@ -232,37 +243,42 @@ class MempoolBlocks {
return this.mempoolBlocks;
}
public async $updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: string[], saveResults: boolean = false): Promise<void> {
public async $updateBlockTemplates(newMempool: { [txid: string]: TransactionExtended }, added: TransactionExtended[], removed: TransactionExtended[], saveResults: boolean = false): Promise<void> {
if (!this.txSelectionWorker) {
// need to reset the worker
await this.$makeBlockTemplates(newMempool, saveResults);
return;
}
for (const tx of Object.values(added)) {
this.setUid(tx);
}
const removedUids = removed.map(tx => this.getUid(tx)).filter(uid => uid != null) as number[];
// prepare a stripped down version of the mempool with only the minimum necessary data
// to reduce the overhead of passing this data to the worker thread
const addedStripped: ThreadTransaction[] = added.map(entry => {
const addedStripped: CompactThreadTransaction[] = added.filter(entry => entry.uid != null).map(entry => {
return {
txid: entry.txid,
uid: entry.uid || 0,
fee: entry.fee,
weight: entry.weight,
feePerVsize: entry.fee / (entry.weight / 4),
effectiveFeePerVsize: entry.fee / (entry.weight / 4),
vin: entry.vin.map(v => v.txid),
inputs: entry.vin.map(v => this.getUid(newMempool[v.txid])).filter(uid => uid != null) as number[],
};
});
// run the block construction algorithm in a separate thread, and wait for a result
let threadErrorListener;
try {
const workerResultPromise = new Promise<{ blocks: ThreadTransaction[][], clusters: { [root: string]: string[] } }>((resolve, reject) => {
const workerResultPromise = new Promise<{ blocks: CompactThreadTransaction[][], clusters: Map<number, number[]> }>((resolve, reject) => {
threadErrorListener = reject;
this.txSelectionWorker?.once('message', (result): void => {
resolve(result);
});
this.txSelectionWorker?.once('error', reject);
});
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed });
let { blocks, clusters } = await workerResultPromise;
this.txSelectionWorker.postMessage({ type: 'update', added: addedStripped, removed: removedUids });
let { blocks, clusters } = this.convertResultTxids(await workerResultPromise);
// filter out stale transactions
const unfilteredCount = blocks.reduce((total, block) => { return total + block.length; }, 0);
blocks = blocks.map(block => block.filter(tx => (tx.txid && tx.txid in newMempool)));
@@ -271,6 +287,8 @@ class MempoolBlocks {
logger.warn(`tx selection worker thread returned ${unfilteredCount - filteredCount} stale transactions from updateBlockTemplates`);
}
this.removeUids(removedUids);
// clean up thread error listener
this.txSelectionWorker?.removeListener('error', threadErrorListener);
@@ -280,7 +298,7 @@ class MempoolBlocks {
}
}
private processBlockTemplates(mempool, blocks, clusters, saveResults): MempoolBlockWithTransactions[] {
private processBlockTemplates(mempool, blocks: ThreadTransaction[][], clusters, saveResults): MempoolBlockWithTransactions[] {
// update this thread's mempool with the results
blocks.forEach((block, blockIndex) => {
let runningVsize = 0;
@@ -371,6 +389,54 @@ class MempoolBlocks {
transactions: fitTransactions.map((tx) => Common.stripTransaction(tx)),
};
}
private resetUids(): void {
this.uidMap.clear();
this.nextUid = 1;
}
private setUid(tx: TransactionExtended): number {
const uid = this.nextUid;
this.nextUid++;
this.uidMap.set(uid, tx.txid);
tx.uid = uid;
return uid;
}
private getUid(tx: TransactionExtended): number | void {
if (tx?.uid != null && this.uidMap.has(tx.uid)) {
return tx.uid;
}
}
private removeUids(uids: number[]): void {
for (const uid of uids) {
this.uidMap.delete(uid);
}
}
private convertResultTxids({ blocks, clusters }: { blocks: any[][], clusters: Map<number, number[]>})
: { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }} {
for (const block of blocks) {
for (const tx of block) {
tx.txid = this.uidMap.get(tx.uid);
if (tx.cpfpRoot) {
tx.cpfpRoot = this.uidMap.get(tx.cpfpRoot);
}
}
}
const convertedClusters = {};
for (const rootUid of clusters.keys()) {
const rootTxid = this.uidMap.get(rootUid);
if (rootTxid) {
const members = clusters.get(rootUid)?.map(uid => {
return this.uidMap.get(uid);
});
convertedClusters[rootTxid] = members;
}
}
return { blocks, clusters: convertedClusters } as { blocks: ThreadTransaction[][], clusters: { [root: string]: string[] }};
}
}
export default new MempoolBlocks();