perf(backend): terminate stalled websocket connections

Resolve #10885
This commit is contained in:
syuilo 2023-06-02 09:13:41 +09:00
parent 8bdf0dd135
commit 9eaca966a4
1 changed files with 21 additions and 2 deletions

View File

@ -19,6 +19,8 @@ import type * as http from 'node:http';
@Injectable()
export class StreamingApiServerService {
#wss: WebSocket.WebSocketServer;
#connections = new Map<WebSocket.WebSocket, number>();
#cleanConnectionsIntervalId: NodeJS.Timeout | null = null;
constructor(
@Inject(DI.config)
@ -109,7 +111,9 @@ export class StreamingApiServerService {
await stream.listen(ev, connection);
const intervalId = user ? setInterval(() => {
this.#connections.set(connection, Date.now());
const userUpdateIntervalId = user ? setInterval(() => {
this.usersRepository.update(user.id, {
lastActiveDate: new Date(),
});
@ -124,19 +128,34 @@ export class StreamingApiServerService {
ev.removeAllListeners();
stream.dispose();
this.redisForSub.off('message', onRedisMessage);
if (intervalId) clearInterval(intervalId);
if (userUpdateIntervalId) clearInterval(userUpdateIntervalId);
});
connection.on('message', async (data) => {
this.#connections.set(connection, Date.now());
if (data.toString() === 'ping') {
connection.send('pong');
}
});
});
this.#cleanConnectionsIntervalId = setInterval(() => {
const now = Date.now();
for (const [connection, lastActive] of this.#connections.entries()) {
if (now - lastActive > 1000 * 60 * 5) {
connection.terminate();
this.#connections.delete(connection);
}
}
}, 1000 * 60 * 5);
}
@bindThis
public detach(): Promise<void> {
if (this.#cleanConnectionsIntervalId) {
clearInterval(this.#cleanConnectionsIntervalId);
this.#cleanConnectionsIntervalId = null;
}
return new Promise((resolve) => {
this.#wss.close(() => resolve());
});