Refactor database pool use

This commit is contained in:
nymkappa
2022-04-12 15:15:57 +09:00
parent 0814816950
commit c18d525f4a
10 changed files with 164 additions and 358 deletions

View File

@@ -1,4 +1,3 @@
import { PoolConnection } from 'mysql2/promise';
import config from '../config';
import { DB } from '../database';
import logger from '../logger';
@@ -77,116 +76,112 @@ class DatabaseMigration {
await this.$setStatisticsAddedIndexedFlag(databaseSchemaVersion);
const isBitcoin = ['mainnet', 'testnet', 'signet'].includes(config.MEMPOOL.NETWORK);
const connection = await DB.getConnection();
try {
await this.$executeQuery(connection, this.getCreateElementsTableQuery(), await this.$checkIfTableExists('elements_pegs'));
await this.$executeQuery(connection, this.getCreateStatisticsQuery(), await this.$checkIfTableExists('statistics'));
await this.$executeQuery(this.getCreateElementsTableQuery(), await this.$checkIfTableExists('elements_pegs'));
await this.$executeQuery(this.getCreateStatisticsQuery(), await this.$checkIfTableExists('statistics'));
if (databaseSchemaVersion < 2 && this.statisticsAddedIndexed === false) {
await this.$executeQuery(connection, `CREATE INDEX added ON statistics (added);`);
await this.$executeQuery(`CREATE INDEX added ON statistics (added);`);
}
if (databaseSchemaVersion < 3) {
await this.$executeQuery(connection, this.getCreatePoolsTableQuery(), await this.$checkIfTableExists('pools'));
await this.$executeQuery(this.getCreatePoolsTableQuery(), await this.$checkIfTableExists('pools'));
}
if (databaseSchemaVersion < 4) {
await this.$executeQuery(connection, 'DROP table IF EXISTS blocks;');
await this.$executeQuery(connection, this.getCreateBlocksTableQuery(), await this.$checkIfTableExists('blocks'));
await this.$executeQuery('DROP table IF EXISTS blocks;');
await this.$executeQuery(this.getCreateBlocksTableQuery(), await this.$checkIfTableExists('blocks'));
}
if (databaseSchemaVersion < 5 && isBitcoin === true) {
logger.warn(`'blocks' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE blocks;'); // Need to re-index
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD `reward` double unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('TRUNCATE blocks;'); // Need to re-index
await this.$executeQuery('ALTER TABLE blocks ADD `reward` double unsigned NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 6 && isBitcoin === true) {
logger.warn(`'blocks' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE blocks;'); // Need to re-index
await this.$executeQuery('TRUNCATE blocks;'); // Need to re-index
// Cleanup original blocks fields type
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `height` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `tx_count` smallint unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `size` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `weight` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `difficulty` double NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `height` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `tx_count` smallint unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `size` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `weight` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `difficulty` double NOT NULL DEFAULT "0"');
// We also fix the pools.id type so we need to drop/re-create the foreign key
await this.$executeQuery(connection, 'ALTER TABLE blocks DROP FOREIGN KEY IF EXISTS `blocks_ibfk_1`');
await this.$executeQuery(connection, 'ALTER TABLE pools MODIFY `id` smallint unsigned AUTO_INCREMENT');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `pool_id` smallint unsigned NULL');
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD FOREIGN KEY (`pool_id`) REFERENCES `pools` (`id`)');
await this.$executeQuery('ALTER TABLE blocks DROP FOREIGN KEY IF EXISTS `blocks_ibfk_1`');
await this.$executeQuery('ALTER TABLE pools MODIFY `id` smallint unsigned AUTO_INCREMENT');
await this.$executeQuery('ALTER TABLE blocks MODIFY `pool_id` smallint unsigned NULL');
await this.$executeQuery('ALTER TABLE blocks ADD FOREIGN KEY (`pool_id`) REFERENCES `pools` (`id`)');
// Add new block indexing fields
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD `version` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD `bits` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD `nonce` bigint unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD `merkle_root` varchar(65) NOT NULL DEFAULT ""');
await this.$executeQuery(connection, 'ALTER TABLE blocks ADD `previous_block_hash` varchar(65) NULL');
await this.$executeQuery('ALTER TABLE blocks ADD `version` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks ADD `bits` integer unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks ADD `nonce` bigint unsigned NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks ADD `merkle_root` varchar(65) NOT NULL DEFAULT ""');
await this.$executeQuery('ALTER TABLE blocks ADD `previous_block_hash` varchar(65) NULL');
}
if (databaseSchemaVersion < 7 && isBitcoin === true) {
await this.$executeQuery(connection, 'DROP table IF EXISTS hashrates;');
await this.$executeQuery(connection, this.getCreateDailyStatsTableQuery(), await this.$checkIfTableExists('hashrates'));
await this.$executeQuery('DROP table IF EXISTS hashrates;');
await this.$executeQuery(this.getCreateDailyStatsTableQuery(), await this.$checkIfTableExists('hashrates'));
}
if (databaseSchemaVersion < 8 && isBitcoin === true) {
logger.warn(`'hashrates' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE hashrates;'); // Need to re-index
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` DROP INDEX `PRIMARY`');
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` ADD `id` int NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST');
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` ADD `share` float NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` ADD `type` enum("daily", "weekly") DEFAULT "daily"');
await this.$executeQuery('TRUNCATE hashrates;'); // Need to re-index
await this.$executeQuery('ALTER TABLE `hashrates` DROP INDEX `PRIMARY`');
await this.$executeQuery('ALTER TABLE `hashrates` ADD `id` int NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST');
await this.$executeQuery('ALTER TABLE `hashrates` ADD `share` float NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE `hashrates` ADD `type` enum("daily", "weekly") DEFAULT "daily"');
}
if (databaseSchemaVersion < 9 && isBitcoin === true) {
logger.warn(`'hashrates' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE hashrates;'); // Need to re-index
await this.$executeQuery(connection, 'ALTER TABLE `state` CHANGE `name` `name` varchar(100)');
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` ADD UNIQUE `hashrate_timestamp_pool_id` (`hashrate_timestamp`, `pool_id`)');
await this.$executeQuery('TRUNCATE hashrates;'); // Need to re-index
await this.$executeQuery('ALTER TABLE `state` CHANGE `name` `name` varchar(100)');
await this.$executeQuery('ALTER TABLE `hashrates` ADD UNIQUE `hashrate_timestamp_pool_id` (`hashrate_timestamp`, `pool_id`)');
}
if (databaseSchemaVersion < 10 && isBitcoin === true) {
await this.$executeQuery(connection, 'ALTER TABLE `blocks` ADD INDEX `blockTimestamp` (`blockTimestamp`)');
await this.$executeQuery('ALTER TABLE `blocks` ADD INDEX `blockTimestamp` (`blockTimestamp`)');
}
if (databaseSchemaVersion < 11 && isBitcoin === true) {
logger.warn(`'blocks' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE blocks;'); // Need to re-index
await this.$executeQuery(connection, `ALTER TABLE blocks
await this.$executeQuery('TRUNCATE blocks;'); // Need to re-index
await this.$executeQuery(`ALTER TABLE blocks
ADD avg_fee INT UNSIGNED NULL,
ADD avg_fee_rate INT UNSIGNED NULL
`);
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `reward` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `median_fee` INT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `fees` INT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `reward` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `median_fee` INT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `fees` INT UNSIGNED NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 12 && isBitcoin === true) {
// No need to re-index because the new data type can contain larger values
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `fees` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `fees` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 13 && isBitcoin === true) {
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `difficulty` DOUBLE UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `median_fee` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `avg_fee` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery(connection, 'ALTER TABLE blocks MODIFY `avg_fee_rate` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `difficulty` DOUBLE UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `median_fee` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `avg_fee` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('ALTER TABLE blocks MODIFY `avg_fee_rate` BIGINT UNSIGNED NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 14 && isBitcoin === true) {
logger.warn(`'hashrates' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE hashrates;'); // Need to re-index
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` DROP FOREIGN KEY `hashrates_ibfk_1`');
await this.$executeQuery(connection, 'ALTER TABLE `hashrates` MODIFY `pool_id` SMALLINT UNSIGNED NOT NULL DEFAULT "0"');
await this.$executeQuery('TRUNCATE hashrates;'); // Need to re-index
await this.$executeQuery('ALTER TABLE `hashrates` DROP FOREIGN KEY `hashrates_ibfk_1`');
await this.$executeQuery('ALTER TABLE `hashrates` MODIFY `pool_id` SMALLINT UNSIGNED NOT NULL DEFAULT "0"');
}
if (databaseSchemaVersion < 16 && isBitcoin === true) {
logger.warn(`'hashrates' table has been truncated. Re-indexing from scratch.`);
await this.$executeQuery(connection, 'TRUNCATE hashrates;'); // Need to re-index because we changed timestamps
await this.$executeQuery('TRUNCATE hashrates;'); // Need to re-index because we changed timestamps
}
if (databaseSchemaVersion < 17 && isBitcoin === true) {
await this.$executeQuery(connection, 'ALTER TABLE `pools` ADD `slug` CHAR(50) NULL');
await this.$executeQuery('ALTER TABLE `pools` ADD `slug` CHAR(50) NULL');
}
connection.release();
} catch (e) {
connection.release();
throw e;
}
}
@@ -203,13 +198,11 @@ class DatabaseMigration {
return;
}
const connection = await DB.getConnection();
try {
// We don't use "CREATE INDEX IF NOT EXISTS" because it is not supported on old mariadb version 5.X
const query = `SELECT COUNT(1) hasIndex FROM INFORMATION_SCHEMA.STATISTICS
WHERE table_schema=DATABASE() AND table_name='statistics' AND index_name='added';`;
const [rows] = await this.$executeQuery(connection, query, true);
const [rows] = await this.$executeQuery(query, true);
if (rows[0].hasIndex === 0) {
logger.debug('MIGRATIONS: `statistics.added` is not indexed');
this.statisticsAddedIndexed = false;
@@ -223,28 +216,24 @@ class DatabaseMigration {
logger.err('MIGRATIONS: Unable to check if `statistics.added` INDEX exist or not.');
this.statisticsAddedIndexed = true;
}
connection.release();
}
/**
* Small query execution wrapper to log all executed queries
*/
private async $executeQuery(connection: PoolConnection, query: string, silent: boolean = false): Promise<any> {
private async $executeQuery(query: string, silent: boolean = false): Promise<any> {
if (!silent) {
logger.debug('MIGRATIONS: Execute query:\n' + query);
}
return connection.query<any>({ sql: query, timeout: this.queryTimeout });
return DB.query({ sql: query, timeout: this.queryTimeout });
}
/**
* Check if 'table' exists in the database
*/
private async $checkIfTableExists(table: string): Promise<boolean> {
const connection = await DB.getConnection();
const query = `SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '${config.DATABASE.DATABASE}' AND TABLE_NAME = '${table}'`;
const [rows] = await connection.query<any>({ sql: query, timeout: this.queryTimeout });
connection.release();
const [rows] = await DB.query({ sql: query, timeout: this.queryTimeout });
return rows[0]['COUNT(*)'] === 1;
}
@@ -252,10 +241,8 @@ class DatabaseMigration {
* Get current database version
*/
private async $getSchemaVersionFromDatabase(): Promise<number> {
const connection = await DB.getConnection();
const query = `SELECT number FROM state WHERE name = 'schema_version';`;
const [rows] = await this.$executeQuery(connection, query, true);
connection.release();
const [rows] = await this.$executeQuery(query, true);
return rows[0]['number'];
}
@@ -263,8 +250,6 @@ class DatabaseMigration {
* Create the `state` table
*/
private async $createMigrationStateTable(): Promise<void> {
const connection = await DB.getConnection();
try {
const query = `CREATE TABLE IF NOT EXISTS state (
name varchar(25) NOT NULL,
@@ -272,15 +257,12 @@ class DatabaseMigration {
string varchar(100) NULL,
CONSTRAINT name_unique UNIQUE (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;`;
await this.$executeQuery(connection, query);
await this.$executeQuery(query);
// Set initial values
await this.$executeQuery(connection, `INSERT INTO state VALUES('schema_version', 0, NULL);`);
await this.$executeQuery(connection, `INSERT INTO state VALUES('last_elements_block', 0, NULL);`);
connection.release();
await this.$executeQuery(`INSERT INTO state VALUES('schema_version', 0, NULL);`);
await this.$executeQuery(`INSERT INTO state VALUES('last_elements_block', 0, NULL);`);
} catch (e) {
connection.release();
throw e;
}
}
@@ -295,18 +277,14 @@ class DatabaseMigration {
}
transactionQueries.push(this.getUpdateToLatestSchemaVersionQuery());
const connection = await DB.getConnection();
try {
await this.$executeQuery(connection, 'START TRANSACTION;');
await this.$executeQuery('START TRANSACTION;');
for (const query of transactionQueries) {
await this.$executeQuery(connection, query);
await this.$executeQuery(query);
}
await this.$executeQuery(connection, 'COMMIT;');
connection.release();
await this.$executeQuery('COMMIT;');
} catch (e) {
await this.$executeQuery(connection, 'ROLLBACK;');
connection.release();
await this.$executeQuery('ROLLBACK;');
throw e;
}
}
@@ -346,14 +324,12 @@ class DatabaseMigration {
* Print current database version
*/
private async $printDatabaseVersion() {
const connection = await DB.getConnection();
try {
const [rows] = await this.$executeQuery(connection, 'SELECT VERSION() as version;', true);
const [rows] = await this.$executeQuery('SELECT VERSION() as version;', true);
logger.debug(`MIGRATIONS: Database engine version '${rows[0].version}'`);
} catch (e) {
logger.debug(`MIGRATIONS: Could not fetch database engine version. ` + e);
}
connection.release();
}
// Couple of wrappers to clean the main logic
@@ -490,24 +466,22 @@ class DatabaseMigration {
public async $truncateIndexedData(tables: string[]) {
const allowedTables = ['blocks', 'hashrates'];
const connection = await DB.getConnection();
try {
for (const table of tables) {
if (!allowedTables.includes(table)) {
logger.debug(`Table ${table} cannot to be re-indexed (not allowed)`);
continue;
};
}
await this.$executeQuery(connection, `TRUNCATE ${table}`, true);
await this.$executeQuery(`TRUNCATE ${table}`, true);
if (table === 'hashrates') {
await this.$executeQuery(connection, 'UPDATE state set number = 0 where name = "last_hashrates_indexing"', true);
await this.$executeQuery('UPDATE state set number = 0 where name = "last_hashrates_indexing"', true);
}
logger.notice(`Table ${table} has been truncated`);
}
} catch (e) {
logger.warn(`Unable to erase indexed data`);
}
connection.release();
}
}