Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine fanout timeline #12507

Merged
merged 20 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8623f40
chore(endpoints/hybrid-timeline): don't pack inside getFromDb
anatawa12 Nov 28, 2023
5fe6625
chore(endpoints/hybrid-timeline): Redisから取得する部分のうちSTLに依存しなそうなところを別のSe…
anatawa12 Nov 28, 2023
f0e41d7
chore(endpoints/local-timeline): FanoutTimelineEndpointServiceで再実装
anatawa12 Nov 28, 2023
9bd88f1
chore(endpoints/channels/timeline): FanoutTimelineEndpointServiceで再実装
anatawa12 Nov 28, 2023
816b8b9
chore(endpoints/timeline): FanoutTimelineEndpointServiceで再実装
anatawa12 Nov 28, 2023
ef43513
chore(endpoints/user-list-timeline): FanoutTimelineEndpointServiceで再実装
anatawa12 Nov 28, 2023
d247134
chore(endpoints/users/notes): FanoutTimelineEndpointServiceで再実装
anatawa12 Nov 28, 2023
d8c6148
chore: add useDbFallback to FanoutTimelineEndpointService.timeline an…
anatawa12 Nov 28, 2023
1ac9a12
style: fix lint error
anatawa12 Nov 28, 2023
60a7529
chore: split logic to multiple functions
anatawa12 Nov 29, 2023
32fd234
chore: implement redis fallback
anatawa12 Nov 29, 2023
f6b7725
chore: 成功率を上げる
anatawa12 Nov 29, 2023
d196900
fix: db fallback not working
anatawa12 Nov 29, 2023
ae684e6
Merge branch 'develop' into refine-fanout-timeline
syuilo Dec 2, 2023
6148c86
feat: allowPartial
anatawa12 Dec 2, 2023
c100f8c
chore(frontend): set allowPartial
anatawa12 Dec 2, 2023
198be6c
chore(backend): remove fallbackIfEmpty
anatawa12 Dec 2, 2023
e159c22
fix: missing allowPartial in channel timeline
anatawa12 Dec 2, 2023
9e1ceaf
fix: type of timelineConfig in hybrid-timeline
anatawa12 Dec 2, 2023
9cd8ccb
Merge branch 'develop' into refine-fanout-timeline
anatawa12 Dec 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/backend/src/core/CoreModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import { Module } from '@nestjs/common';
import { FanoutTimelineEndpointService } from '@/core/FanoutTimelineEndpointService.js';
import { AccountMoveService } from './AccountMoveService.js';
import { AccountUpdateService } from './AccountUpdateService.js';
import { AiService } from './AiService.js';
Expand Down Expand Up @@ -195,6 +196,7 @@ const $SearchService: Provider = { provide: 'SearchService', useExisting: Search
const $ClipService: Provider = { provide: 'ClipService', useExisting: ClipService };
const $FeaturedService: Provider = { provide: 'FeaturedService', useExisting: FeaturedService };
const $FanoutTimelineService: Provider = { provide: 'FanoutTimelineService', useExisting: FanoutTimelineService };
const $FanoutTimelineEndpointService: Provider = { provide: 'FanoutTimelineEndpointService', useExisting: FanoutTimelineEndpointService };
const $ChannelFollowingService: Provider = { provide: 'ChannelFollowingService', useExisting: ChannelFollowingService };
const $RegistryApiService: Provider = { provide: 'RegistryApiService', useExisting: RegistryApiService };

Expand Down Expand Up @@ -331,6 +333,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
ClipService,
FeaturedService,
FanoutTimelineService,
FanoutTimelineEndpointService,
ChannelFollowingService,
RegistryApiService,
ChartLoggerService,
Expand Down Expand Up @@ -460,6 +463,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
$ClipService,
$FeaturedService,
$FanoutTimelineService,
$FanoutTimelineEndpointService,
$ChannelFollowingService,
$RegistryApiService,
$ChartLoggerService,
Expand Down Expand Up @@ -590,6 +594,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
ClipService,
FeaturedService,
FanoutTimelineService,
FanoutTimelineEndpointService,
ChannelFollowingService,
RegistryApiService,
FederationChart,
Expand Down Expand Up @@ -718,6 +723,7 @@ const $ApQuestionService: Provider = { provide: 'ApQuestionService', useExisting
$ClipService,
$FeaturedService,
$FanoutTimelineService,
$FanoutTimelineEndpointService,
$ChannelFollowingService,
$RegistryApiService,
$FederationChart,
Expand Down
123 changes: 123 additions & 0 deletions packages/backend/src/core/FanoutTimelineEndpointService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* SPDX-FileCopyrightText: syuilo and other misskey contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/

import { Inject, Injectable } from '@nestjs/common';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { MiUser } from '@/models/User.js';
import type { MiNote } from '@/models/Note.js';
import { Packed } from '@/misc/json-schema.js';
import type { NotesRepository } from '@/models/_.js';
import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';

@Injectable()
export class FanoutTimelineEndpointService {
constructor(
@Inject(DI.notesRepository)
private notesRepository: NotesRepository,

private noteEntityService: NoteEntityService,
private fanoutTimelineService: FanoutTimelineService,
) {
}

@bindThis
async timeline(ps: {
untilId: string | null,
sinceId: string | null,
limit: number,
allowPartial: boolean,
me?: { id: MiUser['id'] } | undefined | null,
useDbFallback: boolean,
redisTimelines: string[],
noteFilter: (note: MiNote) => boolean,
dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>,
}): Promise<Packed<'Note'>[]> {
return await this.noteEntityService.packMany(await this.getMiNotes(ps), ps.me);
}

@bindThis
private async getMiNotes(ps: {
untilId: string | null,
sinceId: string | null,
limit: number,
allowPartial: boolean,
me?: { id: MiUser['id'] } | undefined | null,
useDbFallback: boolean,
redisTimelines: string[],
noteFilter: (note: MiNote) => boolean,
dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>,
}): Promise<MiNote[]> {
let noteIds: string[];
let shouldFallbackToDb = false;

// 呼び出し元と以下の処理をシンプルにするためにdbFallbackを置き換える
if (!ps.useDbFallback) ps.dbFallback = () => Promise.resolve([]);

const redisResult = await this.fanoutTimelineService.getMulti(ps.redisTimelines, ps.untilId, ps.sinceId);

const redisResultIds = Array.from(new Set(redisResult.flat(1)));

redisResultIds.sort((a, b) => a > b ? -1 : 1);
noteIds = redisResultIds.slice(0, ps.limit);

shouldFallbackToDb = shouldFallbackToDb || (noteIds.length === 0);

if (!shouldFallbackToDb) {
const redisTimeline: MiNote[] = [];
let readFromRedis = 0;
let lastSuccessfulRate = 1; // rateをキャッシュする?
let trialCount = 1;

while ((redisResultIds.length - readFromRedis) !== 0) {
const remainingToRead = ps.limit - redisTimeline.length;

// DBからの取り直しを減らす初回と同じ割合以上で成功すると仮定するが、クエリの長さを考えて三倍まで
const countToGet = remainingToRead * Math.ceil(Math.min(1.1 / lastSuccessfulRate, 3));
noteIds = redisResultIds.slice(readFromRedis, readFromRedis + countToGet);

readFromRedis += noteIds.length;

const gotFromDb = await this.getAndFilterFromDb(noteIds, ps.noteFilter);
syuilo marked this conversation as resolved.
Show resolved Hide resolved
redisTimeline.push(...gotFromDb);
lastSuccessfulRate = gotFromDb.length / noteIds.length;

console.log(`fanoutTimelineTrial#${trialCount++}: req: ${ps.limit}, tried: ${noteIds.length}, got: ${gotFromDb.length}, rate: ${lastSuccessfulRate}, total: ${redisTimeline.length}, fromRedis: ${redisResultIds.length}`);

if (ps.allowPartial ? redisTimeline.length !== 0 : redisTimeline.length >= ps.limit) {
// 十分Redisからとれた
return redisTimeline.slice(0, ps.limit);
}
}

// まだ足りない分はDBにフォールバック
const remainingToRead = ps.limit - redisTimeline.length;
const gotFromDb = await ps.dbFallback(noteIds[noteIds.length - 1], ps.sinceId, remainingToRead);
redisTimeline.push(...gotFromDb);
console.log(`fanoutTimelineTrial#db: req: ${ps.limit}, tried: ${remainingToRead}, got: ${gotFromDb.length}, since: ${noteIds[noteIds.length - 1]}, until: ${ps.untilId}, total: ${redisTimeline.length}`);
return redisTimeline;
}

return await ps.dbFallback(ps.untilId, ps.sinceId, ps.limit);
}

private async getAndFilterFromDb(noteIds: string[], noteFilter: (note: MiNote) => boolean): Promise<MiNote[]> {
const query = this.notesRepository.createQueryBuilder('note')
.where('note.id IN (:...noteIds)', { noteIds: noteIds })
.innerJoinAndSelect('note.user', 'user')
.leftJoinAndSelect('note.reply', 'reply')
.leftJoinAndSelect('note.renote', 'renote')
.leftJoinAndSelect('reply.user', 'replyUser')
.leftJoinAndSelect('renote.user', 'renoteUser')
.leftJoinAndSelect('note.channel', 'channel');

const notes = (await query.getMany()).filter(noteFilter);

notes.sort((a, b) => a.id > b.id ? -1 : 1);

return notes;
}
}
116 changes: 54 additions & 62 deletions packages/backend/src/server/api/endpoints/channels/timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import { NoteEntityService } from '@/core/entities/NoteEntityService.js';
import ActiveUsersChart from '@/core/chart/charts/active-users.js';
import { DI } from '@/di-symbols.js';
import { IdService } from '@/core/IdService.js';
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
import { isUserRelated } from '@/misc/is-user-related.js';
import { CacheService } from '@/core/CacheService.js';
import { MetaService } from '@/core/MetaService.js';
import { FanoutTimelineEndpointService } from '@/core/FanoutTimelineEndpointService.js';
import { MiLocalUser } from '@/models/User.js';
import { ApiError } from '../../error.js';

export const meta = {
Expand Down Expand Up @@ -51,16 +52,14 @@ export const paramDef = {
untilId: { type: 'string', format: 'misskey:id' },
sinceDate: { type: 'integer' },
untilDate: { type: 'integer' },
allowPartial: { type: 'boolean', default: false }, // true is recommended but for compatibility false by default
},
required: ['channelId'],
} as const;

@Injectable()
export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-disable-line import/no-default-export
constructor(
@Inject(DI.redisForTimelines)
private redisForTimelines: Redis.Redis,

@Inject(DI.notesRepository)
private notesRepository: NotesRepository,

Expand All @@ -70,15 +69,14 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
private idService: IdService,
private noteEntityService: NoteEntityService,
private queryService: QueryService,
private fanoutTimelineService: FanoutTimelineService,
private fanoutTimelineEndpointService: FanoutTimelineEndpointService,
private cacheService: CacheService,
private activeUsersChart: ActiveUsersChart,
private metaService: MetaService,
) {
super(meta, paramDef, async (ps, me) => {
const untilId = ps.untilId ?? (ps.untilDate ? this.idService.gen(ps.untilDate!) : null);
const sinceId = ps.sinceId ?? (ps.sinceDate ? this.idService.gen(ps.sinceDate!) : null);
const isRangeSpecified = untilId != null && sinceId != null;

const serverSettings = await this.metaService.fetch();

Expand All @@ -92,64 +90,58 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-

if (me) this.activeUsersChart.read(me);

if (serverSettings.enableFanoutTimeline && (isRangeSpecified || sinceId == null)) {
const [
userIdsWhoMeMuting,
] = me ? await Promise.all([
this.cacheService.userMutingsCache.fetch(me.id),
]) : [new Set<string>()];

let noteIds = await this.fanoutTimelineService.get(`channelTimeline:${channel.id}`, untilId, sinceId);
noteIds = noteIds.slice(0, ps.limit);

if (noteIds.length > 0) {
const query = this.notesRepository.createQueryBuilder('note')
.where('note.id IN (:...noteIds)', { noteIds: noteIds })
.innerJoinAndSelect('note.user', 'user')
.leftJoinAndSelect('note.reply', 'reply')
.leftJoinAndSelect('note.renote', 'renote')
.leftJoinAndSelect('reply.user', 'replyUser')
.leftJoinAndSelect('renote.user', 'renoteUser')
.leftJoinAndSelect('note.channel', 'channel');

let timeline = await query.getMany();

timeline = timeline.filter(note => {
if (me && isUserRelated(note, userIdsWhoMeMuting)) return false;

return true;
});

// TODO: フィルタで件数が減った場合の埋め合わせ処理

timeline.sort((a, b) => a.id > b.id ? -1 : 1);

if (timeline.length > 0) {
return await this.noteEntityService.packMany(timeline, me);
}
}
}

//#region fallback to database
const query = this.queryService.makePaginationQuery(this.notesRepository.createQueryBuilder('note'), ps.sinceId, ps.untilId, ps.sinceDate, ps.untilDate)
.andWhere('note.channelId = :channelId', { channelId: channel.id })
.innerJoinAndSelect('note.user', 'user')
.leftJoinAndSelect('note.reply', 'reply')
.leftJoinAndSelect('note.renote', 'renote')
.leftJoinAndSelect('reply.user', 'replyUser')
.leftJoinAndSelect('renote.user', 'renoteUser')
.leftJoinAndSelect('note.channel', 'channel');

if (me) {
this.queryService.generateMutedUserQuery(query, me);
this.queryService.generateBlockedUserQuery(query, me);
if (!serverSettings.enableFanoutTimeline) {
return await this.noteEntityService.packMany(await this.getFromDb({ untilId, sinceId, limit: ps.limit, channelId: channel.id }, me), me);
}
//#endregion

const timeline = await query.limit(ps.limit).getMany();

return await this.noteEntityService.packMany(timeline, me);
//#endregion
const [
userIdsWhoMeMuting,
] = me ? await Promise.all([
this.cacheService.userMutingsCache.fetch(me.id),
]) : [new Set<string>()];

return await this.fanoutTimelineEndpointService.timeline({
untilId,
sinceId,
limit: ps.limit,
allowPartial: ps.allowPartial,
me,
useDbFallback: true,
redisTimelines: [`channelTimeline:${channel.id}`],
noteFilter: note => {
if (me && isUserRelated(note, userIdsWhoMeMuting)) return false;

return true;
},
dbFallback: async (untilId, sinceId, limit) => {
return await this.getFromDb({ untilId, sinceId, limit, channelId: channel.id }, me);
},
});
});
}

private async getFromDb(ps: {
untilId: string | null,
sinceId: string | null,
limit: number,
channelId: string
}, me: MiLocalUser | null) {
//#region fallback to database
const query = this.queryService.makePaginationQuery(this.notesRepository.createQueryBuilder('note'), ps.sinceId, ps.untilId)
.andWhere('note.channelId = :channelId', { channelId: ps.channelId })
.innerJoinAndSelect('note.user', 'user')
.leftJoinAndSelect('note.reply', 'reply')
.leftJoinAndSelect('note.renote', 'renote')
.leftJoinAndSelect('reply.user', 'replyUser')
.leftJoinAndSelect('renote.user', 'renoteUser')
.leftJoinAndSelect('note.channel', 'channel');

if (me) {
this.queryService.generateMutedUserQuery(query, me);
this.queryService.generateBlockedUserQuery(query, me);
}
//#endregion

return await query.limit(ps.limit).getMany();
}
}
Loading
Loading