From 05f9ad11bb51ca932cba7716302924dfaa7aafdd Mon Sep 17 00:00:00 2001 From: syuilo Date: Thu, 11 Oct 2018 18:09:41 +0900 Subject: [PATCH] =?UTF-8?q?Redis=E3=81=8C=E3=82=A4=E3=83=B3=E3=82=B9?= =?UTF-8?q?=E3=83=88=E3=83=BC=E3=83=AB=E3=81=95=E3=82=8C=E3=81=A6=E3=81=84?= =?UTF-8?q?=E3=82=8B=E3=81=A8=E3=81=8D=E3=81=AF=E3=82=A4=E3=83=99=E3=83=B3?= =?UTF-8?q?=E3=83=88=E3=81=AE=E5=85=B1=E6=9C=89=E3=81=ABRedis=E3=81=AEpub/?= =?UTF-8?q?sub=E3=82=92=E4=BD=BF=E3=81=86=E3=82=88=E3=81=86=E3=81=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/api/stream/index.ts | 6 +++--- src/server/api/streaming.ts | 32 ++++++++++++++++++++++++++++++-- src/stream.ts | 15 +++++++++++++-- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/src/server/api/stream/index.ts b/src/server/api/stream/index.ts index 838d847004..afedd4362c 100644 --- a/src/server/api/stream/index.ts +++ b/src/server/api/stream/index.ts @@ -1,6 +1,5 @@ import autobind from 'autobind-decorator'; import * as websocket from 'websocket'; -import Xev from 'xev'; import * as debug from 'debug'; import User, { IUser } from '../../../models/user'; @@ -11,6 +10,7 @@ import readNote from '../../../services/note/read'; import Channel from './channel'; import channels from './channels'; +import { EventEmitter } from 'events'; const log = debug('misskey'); @@ -21,14 +21,14 @@ export default class Connection { public user?: IUser; public app: IApp; private wsConnection: websocket.connection; - public subscriber: Xev; + public subscriber: EventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; public sendMessageToWsOverride: any = null; // 後方互換性のため constructor( wsConnection: websocket.connection, - subscriber: Xev, + subscriber: EventEmitter, user: IUser, app: IApp ) { diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index a0a219a317..249157e222 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -1,11 +1,14 @@ import * as http from 'http'; import * as websocket from 'websocket'; +import * as redis from 'redis'; import Xev from 'xev'; import MainStreamConnection from './stream'; import { ParsedUrlQuery } from 'querystring'; import authenticate from './authenticate'; import channels from './stream/channels'; +import { EventEmitter } from 'events'; +import config from '../../config'; module.exports = (server: http.Server) => { // Init websocket server @@ -16,11 +19,36 @@ module.exports = (server: http.Server) => { ws.on('request', async (request) => { const connection = request.accept(); - const ev = new Xev(); - const q = request.resourceURL.query as ParsedUrlQuery; const [user, app] = await authenticate(q.i as string); + let ev: EventEmitter; + + if (config.redis) { + // Connect to Redis + const subscriber = redis.createClient( + config.redis.port, config.redis.host); + + subscriber.subscribe('misskey'); + + ev = new EventEmitter(); + + subscriber.on('message', async (_, data) => { + const obj = JSON.parse(data); + + console.log(obj); + + ev.emit(obj.channel, obj.message); + }); + + connection.once('close', () => { + subscriber.unsubscribe(); + subscriber.quit(); + }); + } else { + ev = new Xev(); + } + const main = new MainStreamConnection(connection, ev, user, app); // 後方互換性のため diff --git a/src/stream.ts b/src/stream.ts index 45b353d904..b222a45ca9 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,4 +1,5 @@ import * as mongo from 'mongodb'; +import redis from './db/redis'; import Xev from 'xev'; import Meta, { IMeta } from './models/meta'; @@ -9,7 +10,10 @@ class Publisher { private meta: IMeta; constructor() { - this.ev = new Xev(); + // Redisがインストールされてないときはプロセス間通信を使う + if (redis == null) { + this.ev = new Xev(); + } setInterval(async () => { this.meta = await Meta.findOne({}); @@ -28,7 +32,14 @@ class Publisher { { type: type, body: null } : { type: type, body: value }; - this.ev.emit(channel, message); + if (this.ev) { + this.ev.emit(channel, message); + } else { + redis.publish('misskey', JSON.stringify({ + channel: channel, + message: message + })); + } } public publishMainStream = (userId: ID, type: string, value?: any): void => {