/* * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ import { Inject, Injectable } from '@nestjs/common'; import * as Redis from 'ioredis'; import { DI } from '@/di-symbols.js'; import { bindThis } from '@/decorators.js'; import { IdService } from '@/core/IdService.js'; export type FanoutTimelineName = // home timeline | `homeTimeline:${string}` | `homeTimelineWithFiles:${string}` // only notes with files are included // local timeline | `localTimeline` // replies are not included | `localTimelineWithFiles` // only non-reply notes with files are included | `localTimelineWithReplies` // only replies are included | `localTimelineWithReplyTo:${string}` // Only replies to specific local user are included. Parameter is reply user id. // antenna | `antennaTimeline:${string}` // user timeline | `userTimeline:${string}` // replies are not included | `userTimelineWithFiles:${string}` // only non-reply notes with files are included | `userTimelineWithReplies:${string}` // only replies are included | `userTimelineWithChannel:${string}` // only channel notes are included, replies are included // user list timelines | `userListTimeline:${string}` | `userListTimelineWithFiles:${string}` // only notes with files are included // channel timelines | `channelTimeline:${string}` // replies are included // role timelines | `roleTimeline:${string}` // any notes are included @Injectable() export class FanoutTimelineService { constructor( @Inject(DI.redisForTimelines) private redisForTimelines: Redis.Redis, private idService: IdService, ) { } @bindThis public push(tl: FanoutTimelineName, id: string, maxlen: number, pipeline: Redis.ChainableCommander) { // リモートから遅れて届いた(もしくは後から追加された)投稿日時が古い投稿が追加されるとページネーション時に問題を引き起こすため、 // 3分以内に投稿されたものでない場合、Redisにある最古のIDより新しい場合のみ追加する if (this.idService.parse(id).date.getTime() > Date.now() - 1000 * 60 * 3) { pipeline.lpush('list:' + tl, id); if (Math.random() < 0.1) { // 10%の確率でトリム pipeline.ltrim('list:' + tl, 0, maxlen - 1); } } else { // 末尾のIDを取得 this.redisForTimelines.lindex('list:' + tl, -1).then(lastId => { if (lastId == null || (this.idService.parse(id).date.getTime() > this.idService.parse(lastId).date.getTime())) { this.redisForTimelines.lpush('list:' + tl, id); } else { Promise.resolve(); } }); } } @bindThis public get(name: FanoutTimelineName, untilId?: string | null, sinceId?: string | null) { if (untilId && sinceId) { return this.redisForTimelines.lrange('list:' + name, 0, -1) .then(ids => ids.filter(id => id < untilId && id > sinceId).sort((a, b) => a > b ? -1 : 1)); } else if (untilId) { return this.redisForTimelines.lrange('list:' + name, 0, -1) .then(ids => ids.filter(id => id < untilId).sort((a, b) => a > b ? -1 : 1)); } else if (sinceId) { return this.redisForTimelines.lrange('list:' + name, 0, -1) .then(ids => ids.filter(id => id > sinceId).sort((a, b) => a < b ? -1 : 1)); } else { return this.redisForTimelines.lrange('list:' + name, 0, -1) .then(ids => ids.sort((a, b) => a > b ? -1 : 1)); } } @bindThis public getMulti(name: FanoutTimelineName[], untilId?: string | null, sinceId?: string | null): Promise { const pipeline = this.redisForTimelines.pipeline(); for (const n of name) { pipeline.lrange('list:' + n, 0, -1); } return pipeline.exec().then(res => { if (res == null) return []; const tls = res.map(r => r[1] as string[]); return tls.map(ids => (untilId && sinceId) ? ids.filter(id => id < untilId && id > sinceId).sort((a, b) => a > b ? -1 : 1) : untilId ? ids.filter(id => id < untilId).sort((a, b) => a > b ? -1 : 1) : sinceId ? ids.filter(id => id > sinceId).sort((a, b) => a < b ? -1 : 1) : ids.sort((a, b) => a > b ? -1 : 1), ); }); } @bindThis public purge(name: FanoutTimelineName) { return this.redisForTimelines.del('list:' + name); } }