feat: introduce retention-rate aggregation

This commit is contained in:
syuilo 2022-12-25 09:09:46 +09:00
parent b11f8b0aae
commit 3e81913b6a
15 changed files with 220 additions and 3 deletions

View file

@ -25,6 +25,7 @@ You should also include the user name that made the change.
- Push notification of Antenna note @tamaina
- AVIF support @tamaina
- Add Cloudflare Turnstile CAPTCHA support @CyberRex0
- Introduce retention-rate aggregation @syuilo
- Server: improve syslog performance @syuilo
- Server: improve note scoring for featured notes @CyberRex0
- Server: delete outdated notifications regularly to improve db performance @syuilo

View file

@ -0,0 +1,13 @@
export class RetentionAggregation1671924750884 {
name = 'RetentionAggregation1671924750884'
async up(queryRunner) {
await queryRunner.query(`CREATE TABLE "retention_aggregation" ("id" character varying(32) NOT NULL, "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL, "userIds" character varying(32) array NOT NULL, "data" jsonb NOT NULL DEFAULT '{}', CONSTRAINT "PK_22aad3e8640b15fb3b90ee02d18" PRIMARY KEY ("id")); COMMENT ON COLUMN "retention_aggregation"."createdAt" IS 'The created date of the Note.'`);
await queryRunner.query(`CREATE INDEX "IDX_09f4e5b9e4a2f268d3e284e4b3" ON "retention_aggregation" ("createdAt") `);
}
async down(queryRunner) {
await queryRunner.query(`DROP INDEX "public"."IDX_09f4e5b9e4a2f268d3e284e4b3"`);
await queryRunner.query(`DROP TABLE "retention_aggregation"`);
}
}

View file

@ -0,0 +1,15 @@
export class RetentionAggregation21671926422832 {
name = 'RetentionAggregation21671926422832'
async up(queryRunner) {
await queryRunner.query(`ALTER TABLE "retention_aggregation" ADD "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL`);
await queryRunner.query(`COMMENT ON COLUMN "retention_aggregation"."updatedAt" IS 'The updated date of the GalleryPost.'`);
await queryRunner.query(`ALTER TABLE "retention_aggregation" ADD "usersCount" integer NOT NULL`);
}
async down(queryRunner) {
await queryRunner.query(`ALTER TABLE "retention_aggregation" DROP COLUMN "usersCount"`);
await queryRunner.query(`COMMENT ON COLUMN "retention_aggregation"."updatedAt" IS 'The updated date of the GalleryPost.'`);
await queryRunner.query(`ALTER TABLE "retention_aggregation" DROP COLUMN "updatedAt"`);
}
}

View file

@ -68,5 +68,6 @@ export const DI = {
webhooksRepository: Symbol('webhooksRepository'),
adsRepository: Symbol('adsRepository'),
passwordResetRequestsRepository: Symbol('passwordResetRequestsRepository'),
retentionAggregationsRepository: Symbol('retentionAggregationsRepository'),
//#endregion
};

View file

@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { DI } from '@/di-symbols.js';
import { User, Note, Announcement, AnnouncementRead, App, NoteFavorite, NoteThreadMuting, NoteReaction, NoteUnread, Notification, Poll, PollVote, UserProfile, UserKeypair, UserPending, AttestationChallenge, UserSecurityKey, UserPublickey, UserList, UserListJoining, UserGroup, UserGroupJoining, UserGroupInvitation, UserNotePining, UserIp, UsedUsername, Following, FollowRequest, Instance, Emoji, DriveFile, DriveFolder, Meta, Muting, Blocking, SwSubscription, Hashtag, AbuseUserReport, RegistrationTicket, AuthSession, AccessToken, Signin, MessagingMessage, Page, PageLike, GalleryPost, GalleryLike, ModerationLog, Clip, ClipNote, Antenna, AntennaNote, PromoNote, PromoRead, Relay, MutedNote, Channel, ChannelFollowing, ChannelNotePining, RegistryItem, Webhook, Ad, PasswordResetRequest } from './index.js';
import { User, Note, Announcement, AnnouncementRead, App, NoteFavorite, NoteThreadMuting, NoteReaction, NoteUnread, Notification, Poll, PollVote, UserProfile, UserKeypair, UserPending, AttestationChallenge, UserSecurityKey, UserPublickey, UserList, UserListJoining, UserGroup, UserGroupJoining, UserGroupInvitation, UserNotePining, UserIp, UsedUsername, Following, FollowRequest, Instance, Emoji, DriveFile, DriveFolder, Meta, Muting, Blocking, SwSubscription, Hashtag, AbuseUserReport, RegistrationTicket, AuthSession, AccessToken, Signin, MessagingMessage, Page, PageLike, GalleryPost, GalleryLike, ModerationLog, Clip, ClipNote, Antenna, AntennaNote, PromoNote, PromoRead, Relay, MutedNote, Channel, ChannelFollowing, ChannelNotePining, RegistryItem, Webhook, Ad, PasswordResetRequest, RetentionAggregation } from './index.js';
import type { DataSource } from 'typeorm';
import type { Provider } from '@nestjs/common';
@ -382,6 +382,12 @@ const $passwordResetRequestsRepository: Provider = {
inject: [DI.db],
};
const $retentionAggregationsRepository: Provider = {
provide: DI.retentionAggregationsRepository,
useFactory: (db: DataSource) => db.getRepository(RetentionAggregation),
inject: [DI.db],
};
@Module({
imports: [
],
@ -449,6 +455,7 @@ const $passwordResetRequestsRepository: Provider = {
$webhooksRepository,
$adsRepository,
$passwordResetRequestsRepository,
$retentionAggregationsRepository,
],
exports: [
$usersRepository,
@ -514,6 +521,7 @@ const $passwordResetRequestsRepository: Provider = {
$webhooksRepository,
$adsRepository,
$passwordResetRequestsRepository,
$retentionAggregationsRepository,
],
})
export class RepositoryModule {}

View file

@ -0,0 +1,35 @@
import { Entity, PrimaryColumn, Index, Column } from 'typeorm';
import { id } from '../id.js';
import type { User } from './User.js';
@Entity()
export class RetentionAggregation {
@PrimaryColumn(id())
public id: string;
@Index()
@Column('timestamp with time zone', {
comment: 'The created date of the Note.',
})
public createdAt: Date;
@Column('timestamp with time zone', {
comment: 'The updated date of the GalleryPost.',
})
public updatedAt: Date;
@Column({
...id(),
array: true,
})
public userIds: User['id'][];
@Column('integer', {
})
public usersCount: number;
@Column('jsonb', {
default: {},
})
public data: Record<string, number>;
}

View file

@ -61,6 +61,7 @@ import { UserPublickey } from '@/models/entities/UserPublickey.js';
import { UserSecurityKey } from '@/models/entities/UserSecurityKey.js';
import { Webhook } from '@/models/entities/Webhook.js';
import { Channel } from '@/models/entities/Channel.js';
import { RetentionAggregation } from '@/models/entities/RetentionAggregation.js';
import type { Repository } from 'typeorm';
export {
@ -127,6 +128,7 @@ export {
UserSecurityKey,
Webhook,
Channel,
RetentionAggregation,
};
export type AbuseUserReportsRepository = Repository<AbuseUserReport>;
@ -192,3 +194,4 @@ export type UserPublickeysRepository = Repository<UserPublickey>;
export type UserSecurityKeysRepository = Repository<UserSecurityKey>;
export type WebhooksRepository = Repository<Webhook>;
export type ChannelsRepository = Repository<Channel>;
export type RetentionAggregationsRepository = Repository<RetentionAggregation>;

View file

@ -69,6 +69,7 @@ import { UserPublickey } from '@/models/entities/UserPublickey.js';
import { UserSecurityKey } from '@/models/entities/UserSecurityKey.js';
import { Webhook } from '@/models/entities/Webhook.js';
import { Channel } from '@/models/entities/Channel.js';
import { RetentionAggregation } from '@/models/entities/RetentionAggregation.js';
import { Config } from '@/config.js';
import MisskeyLogger from '@/logger.js';
@ -182,6 +183,7 @@ export const entities = [
UserPending,
Webhook,
UserIp,
RetentionAggregation,
...charts,
];

View file

@ -29,6 +29,7 @@ import { ImportMutingProcessorService } from './processors/ImportMutingProcessor
import { ImportUserListsProcessorService } from './processors/ImportUserListsProcessorService.js';
import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
@Module({
imports: [
@ -63,6 +64,7 @@ import { TickChartsProcessorService } from './processors/TickChartsProcessorServ
EndedPollNotificationProcessorService,
DeliverProcessorService,
InboxProcessorService,
AggregateRetentionProcessorService,
QueueProcessorService,
],
exports: [

View file

@ -4,6 +4,7 @@ import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import type Logger from '@/logger.js';
import { QueueService } from '@/core/QueueService.js';
import { bindThis } from '@/decorators.js';
import { getJobInfo } from './get-job-info.js';
import { SystemQueueProcessorsService } from './SystemQueueProcessorsService.js';
import { ObjectStorageQueueProcessorsService } from './ObjectStorageQueueProcessorsService.js';
@ -13,7 +14,6 @@ import { EndedPollNotificationProcessorService } from './processors/EndedPollNot
import { DeliverProcessorService } from './processors/DeliverProcessorService.js';
import { InboxProcessorService } from './processors/InboxProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
import { bindThis } from '@/decorators.js';
@Injectable()
export class QueueProcessorService {
@ -133,6 +133,12 @@ export class QueueProcessorService {
repeat: { cron: '0 0 * * *' },
removeOnComplete: true,
});
this.queueService.systemQueue.add('aggregateRetention', {
}, {
repeat: { cron: '0 0 * * *' },
removeOnComplete: true,
});
this.queueService.systemQueue.add('clean', {
}, {

View file

@ -1,13 +1,14 @@
import { Inject, Injectable } from '@nestjs/common';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import { bindThis } from '@/decorators.js';
import { TickChartsProcessorService } from './processors/TickChartsProcessorService.js';
import { ResyncChartsProcessorService } from './processors/ResyncChartsProcessorService.js';
import { CleanChartsProcessorService } from './processors/CleanChartsProcessorService.js';
import { CheckExpiredMutingsProcessorService } from './processors/CheckExpiredMutingsProcessorService.js';
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import type Bull from 'bull';
import { bindThis } from '@/decorators.js';
@Injectable()
export class SystemQueueProcessorsService {
@ -18,6 +19,7 @@ export class SystemQueueProcessorsService {
private tickChartsProcessorService: TickChartsProcessorService,
private resyncChartsProcessorService: ResyncChartsProcessorService,
private cleanChartsProcessorService: CleanChartsProcessorService,
private aggregateRetentionProcessorService: AggregateRetentionProcessorService,
private checkExpiredMutingsProcessorService: CheckExpiredMutingsProcessorService,
private cleanProcessorService: CleanProcessorService,
) {
@ -28,6 +30,7 @@ export class SystemQueueProcessorsService {
q.process('tickCharts', (job, done) => this.tickChartsProcessorService.process(job, done));
q.process('resyncCharts', (job, done) => this.resyncChartsProcessorService.process(job, done));
q.process('cleanCharts', (job, done) => this.cleanChartsProcessorService.process(job, done));
q.process('aggregateRetention', (job, done) => this.aggregateRetentionProcessorService.process(job, done));
q.process('checkExpiredMutings', (job, done) => this.checkExpiredMutingsProcessorService.process(job, done));
q.process('clean', (job, done) => this.cleanProcessorService.process(job, done));
}

View file

@ -0,0 +1,75 @@
import { Inject, Injectable } from '@nestjs/common';
import { In, IsNull, MoreThan } from 'typeorm';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import type { RetentionAggregationsRepository, UsersRepository } from '@/models/index.js';
import { deepClone } from '@/misc/clone.js';
import { IdService } from '@/core/IdService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type Bull from 'bull';
@Injectable()
export class AggregateRetentionProcessorService {
private logger: Logger;
constructor(
@Inject(DI.config)
private config: Config,
@Inject(DI.usersRepository)
private usersRepository: UsersRepository,
@Inject(DI.retentionAggregationsRepository)
private retentionAggregationsRepository: RetentionAggregationsRepository,
private idService: IdService,
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('aggregate-retention');
}
@bindThis
public async process(job: Bull.Job<Record<string, unknown>>, done: () => void): Promise<void> {
this.logger.info('Aggregating retention...');
const now = new Date();
const dateKey = `${now.getFullYear()}-${now.getMonth() + 1}-${now.getDate()}`;
// 過去(だいたい)30日分のレコードを取得
const pastRecords = await this.retentionAggregationsRepository.findBy({
createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24 * 31))),
});
// 今日登録したユーザーを全て取得
const targetUsers = await this.usersRepository.findBy({
host: IsNull(),
createdAt: MoreThan(new Date(Date.now() - (1000 * 60 * 60 * 24))),
});
const targetUserIds = targetUsers.map(u => u.id);
await this.retentionAggregationsRepository.insert({
id: this.idService.genId(),
createdAt: now,
updatedAt: now,
userIds: targetUserIds,
usersCount: targetUserIds.length,
});
for (const record of pastRecords) {
const retention = record.userIds.filter(id => targetUserIds.includes(id)).length;
const data = deepClone(record.data);
data[dateKey] = retention;
this.retentionAggregationsRepository.update(record.id, {
updatedAt: now,
data,
});
}
this.logger.succ('Retention aggregated.');
done();
}
}

View file

@ -315,6 +315,7 @@ import * as ep___users_show from './endpoints/users/show.js';
import * as ep___users_stats from './endpoints/users/stats.js';
import * as ep___fetchRss from './endpoints/fetch-rss.js';
import * as ep___admin_driveCapOverride from './endpoints/admin/drive-capacity-override.js';
import * as ep___retention from './endpoints/retention.js';
import { GetterService } from './GetterService.js';
import { ApiLoggerService } from './ApiLoggerService.js';
import type { Provider } from '@nestjs/common';
@ -633,6 +634,7 @@ const $users_show: Provider = { provide: 'ep:users/show', useClass: ep___users_s
const $users_stats: Provider = { provide: 'ep:users/stats', useClass: ep___users_stats.default };
const $admin_driveCapOverride: Provider = { provide: 'ep:admin/drive-capacity-override', useClass: ep___admin_driveCapOverride.default };
const $fetchRss: Provider = { provide: 'ep:fetch-rss', useClass: ep___fetchRss.default };
const $retention: Provider = { provide: 'ep:retention', useClass: ep___retention.default };
@Module({
imports: [
@ -955,6 +957,7 @@ const $fetchRss: Provider = { provide: 'ep:fetch-rss', useClass: ep___fetchRss.d
$users_stats,
$admin_driveCapOverride,
$fetchRss,
$retention,
],
exports: [
$admin_meta,
@ -1269,6 +1272,7 @@ const $fetchRss: Provider = { provide: 'ep:fetch-rss', useClass: ep___fetchRss.d
$users_stats,
$admin_driveCapOverride,
$fetchRss,
$retention,
],
})
export class EndpointsModule {}

View file

@ -314,6 +314,7 @@ import * as ep___users_show from './endpoints/users/show.js';
import * as ep___users_stats from './endpoints/users/stats.js';
import * as ep___fetchRss from './endpoints/fetch-rss.js';
import * as ep___admin_driveCapOverride from './endpoints/admin/drive-capacity-override.js';
import * as ep___retention from './endpoints/retention.js';
const eps = [
['admin/meta', ep___admin_meta],
@ -630,6 +631,7 @@ const eps = [
['users/stats', ep___users_stats],
['admin/drive-capacity-override', ep___admin_driveCapOverride],
['fetch-rss', ep___fetchRss],
['retention', ep___retention],
];
export interface IEndpointMeta {

View file

@ -0,0 +1,47 @@
import { IsNull } from 'typeorm';
import { Inject, Injectable } from '@nestjs/common';
import type { RetentionAggregationsRepository } from '@/models/index.js';
import { Endpoint } from '@/server/api/endpoint-base.js';
import { DI } from '@/di-symbols.js';
export const meta = {
tags: ['users'],
requireCredential: false,
res: {
},
allowGet: true,
cacheSec: 60 * 60,
} as const;
export const paramDef = {
type: 'object',
properties: {},
required: [],
} as const;
// eslint-disable-next-line import/no-default-export
@Injectable()
export default class extends Endpoint<typeof meta, typeof paramDef> {
constructor(
@Inject(DI.retentionAggregationsRepository)
private retentionAggregationsRepository: RetentionAggregationsRepository,
) {
super(meta, paramDef, async (ps, me) => {
const records = await this.retentionAggregationsRepository.find({
order: {
id: 'DESC',
},
take: 30,
});
return records.map(record => ({
createdAt: record.createdAt.toISOString(),
users: record.usersCount,
data: record.data,
}));
});
}
}