[server] express server also listens on unix socket

This commit is contained in:
nymkappa
2024-04-07 18:39:37 +09:00
parent 501b79fdce
commit bed00fbd41
7 changed files with 93 additions and 32 deletions

View File

@@ -44,7 +44,7 @@ const wantable = [
];
class WebsocketHandler {
private wss: WebSocket.Server | undefined;
private webSocketServers: WebSocket.Server[] = [];
private extraInitProperties = {};
private numClients = 0;
@@ -57,8 +57,8 @@ class WebsocketHandler {
constructor() { }
setWebsocketServer(wss: WebSocket.Server) {
this.wss = wss;
addWebsocketServer(wss: WebSocket.Server) {
this.webSocketServers.push(wss);
}
setExtraInitData(property: string, value: any) {
@@ -102,11 +102,13 @@ class WebsocketHandler {
}
setupConnectionHandling() {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.wss.on('connection', (client: WebSocket, req) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.on('connection', (client: WebSocket, req) => {
this.numConnected++;
client['remoteAddress'] = req.headers['x-forwarded-for'] || req.socket?.remoteAddress || 'unknown';
client.on('error', (e) => {
@@ -369,14 +371,17 @@ class WebsocketHandler {
}
});
});
}
}
handleNewDonation(id: string) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@@ -384,43 +389,50 @@ class WebsocketHandler {
client.send(JSON.stringify({ donationConfirmed: true }));
}
});
}
}
handleLoadingChanged(indicators: ILoadingIndicators) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.updateSocketDataFields({ 'loadingIndicators': indicators });
const response = JSON.stringify({ loadingIndicators: indicators });
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
client.send(response);
});
}
}
handleNewConversionRates(conversionRates: ApiPrice) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.updateSocketDataFields({ 'conversions': conversionRates });
const response = JSON.stringify({ conversions: conversionRates });
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
client.send(response);
});
}
}
handleNewStatistic(stats: OptimizedStatistic) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.printLogs();
@@ -429,7 +441,9 @@ class WebsocketHandler {
'live-2h-chart': stats
});
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@@ -440,11 +454,12 @@ class WebsocketHandler {
client.send(response);
});
}
}
handleReorg(): void {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
const da = difficultyAdjustment.getDifficultyAdjustment();
@@ -455,7 +470,9 @@ class WebsocketHandler {
'da': da?.previousTime ? da : undefined,
});
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@@ -473,13 +490,14 @@ class WebsocketHandler {
client.send(this.serializeResponse(response));
}
});
}
}
async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
candidates?: GbtCandidates): Promise<void> {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.printLogs();
@@ -552,7 +570,9 @@ class WebsocketHandler {
// pre-compute new tracked outspends
const outspendCache: { [txid: string]: { [vout: number]: { vin: number, txid: string } } } = {};
const trackedTxs = new Set<string>();
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client['track-tx']) {
trackedTxs.add(client['track-tx']);
}
@@ -562,6 +582,7 @@ class WebsocketHandler {
}
}
});
}
if (trackedTxs.size > 0) {
for (const tx of newTransactions) {
for (let i = 0; i < tx.vin.length; i++) {
@@ -581,7 +602,9 @@ class WebsocketHandler {
const addressCache = this.makeAddressCache(newTransactions);
const removedAddressCache = this.makeAddressCache(deletedTransactions);
this.wss.clients.forEach(async (client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach(async (client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@@ -821,11 +844,12 @@ class WebsocketHandler {
client.send(this.serializeResponse(response));
}
});
}
}
async handleNewBlock(block: BlockExtended, txIds: string[], transactions: MempoolTransactionExtended[]): Promise<void> {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.printLogs();
@@ -969,7 +993,10 @@ class WebsocketHandler {
return responseCache[key];
}
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@@ -1150,6 +1177,7 @@ class WebsocketHandler {
client.send(this.serializeResponse(response));
}
});
}
await statistics.runStatistics();
}
@@ -1231,13 +1259,15 @@ class WebsocketHandler {
}
private printLogs(): void {
if (this.wss) {
if (this.webSocketServers.length) {
let numTxSubs = 0;
let numTxsSubs = 0;
let numProjectedSubs = 0;
let numRbfSubs = 0;
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client['track-tx']) {
numTxSubs++;
}
@@ -1251,8 +1281,12 @@ class WebsocketHandler {
numRbfSubs++;
}
})
}
const count = this.wss?.clients?.size || 0;
let count = 0;
for (const server of this.webSocketServers) {
count += server.clients?.size || 0;
}
const diff = count - this.numClients;
this.numClients = count;
logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`);