-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* chore(endpoints/hybrid-timeline): don't pack inside getFromDb * chore(endpoints/hybrid-timeline): Redisから取得する部分のうちSTLに依存しなそうなところを別のServiceに切り出し * chore(endpoints/local-timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/channels/timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/user-list-timeline): FanoutTimelineEndpointServiceで再実装 * chore(endpoints/users/notes): FanoutTimelineEndpointServiceで再実装 * chore: add useDbFallback to FanoutTimelineEndpointService.timeline and always true for channel / user note list * style: fix lint error * chore: split logic to multiple functions * chore: implement redis fallback * chore: 成功率を上げる * fix: db fallback not working * feat: allowPartial * chore(frontend): set allowPartial * chore(backend): remove fallbackIfEmpty HTL will never be purged so it's no longer required * fix: missing allowPartial in channel timeline * fix: type of timelineConfig in hybrid-timeline --------- Co-authored-by: syuilo <Syuilotan@yahoo.co.jp>
- Loading branch information
Showing
9 changed files
with
449 additions
and
405 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
123 changes: 123 additions & 0 deletions
123
packages/backend/src/core/FanoutTimelineEndpointService.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
/* | ||
* SPDX-FileCopyrightText: syuilo and other misskey contributors | ||
* SPDX-License-Identifier: AGPL-3.0-only | ||
*/ | ||
|
||
import { Inject, Injectable } from '@nestjs/common'; | ||
import { DI } from '@/di-symbols.js'; | ||
import { bindThis } from '@/decorators.js'; | ||
import type { MiUser } from '@/models/User.js'; | ||
import type { MiNote } from '@/models/Note.js'; | ||
import { Packed } from '@/misc/json-schema.js'; | ||
import type { NotesRepository } from '@/models/_.js'; | ||
import { NoteEntityService } from '@/core/entities/NoteEntityService.js'; | ||
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js'; | ||
|
||
@Injectable() | ||
export class FanoutTimelineEndpointService { | ||
constructor( | ||
@Inject(DI.notesRepository) | ||
private notesRepository: NotesRepository, | ||
|
||
private noteEntityService: NoteEntityService, | ||
private fanoutTimelineService: FanoutTimelineService, | ||
) { | ||
} | ||
|
||
@bindThis | ||
async timeline(ps: { | ||
untilId: string | null, | ||
sinceId: string | null, | ||
limit: number, | ||
allowPartial: boolean, | ||
me?: { id: MiUser['id'] } | undefined | null, | ||
useDbFallback: boolean, | ||
redisTimelines: string[], | ||
noteFilter: (note: MiNote) => boolean, | ||
dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>, | ||
}): Promise<Packed<'Note'>[]> { | ||
return await this.noteEntityService.packMany(await this.getMiNotes(ps), ps.me); | ||
} | ||
|
||
@bindThis | ||
private async getMiNotes(ps: { | ||
untilId: string | null, | ||
sinceId: string | null, | ||
limit: number, | ||
allowPartial: boolean, | ||
me?: { id: MiUser['id'] } | undefined | null, | ||
useDbFallback: boolean, | ||
redisTimelines: string[], | ||
noteFilter: (note: MiNote) => boolean, | ||
dbFallback: (untilId: string | null, sinceId: string | null, limit: number) => Promise<MiNote[]>, | ||
}): Promise<MiNote[]> { | ||
let noteIds: string[]; | ||
let shouldFallbackToDb = false; | ||
|
||
// 呼び出し元と以下の処理をシンプルにするためにdbFallbackを置き換える | ||
if (!ps.useDbFallback) ps.dbFallback = () => Promise.resolve([]); | ||
|
||
const redisResult = await this.fanoutTimelineService.getMulti(ps.redisTimelines, ps.untilId, ps.sinceId); | ||
|
||
const redisResultIds = Array.from(new Set(redisResult.flat(1))); | ||
|
||
redisResultIds.sort((a, b) => a > b ? -1 : 1); | ||
noteIds = redisResultIds.slice(0, ps.limit); | ||
|
||
shouldFallbackToDb = shouldFallbackToDb || (noteIds.length === 0); | ||
|
||
if (!shouldFallbackToDb) { | ||
const redisTimeline: MiNote[] = []; | ||
let readFromRedis = 0; | ||
let lastSuccessfulRate = 1; // rateをキャッシュする? | ||
let trialCount = 1; | ||
|
||
while ((redisResultIds.length - readFromRedis) !== 0) { | ||
const remainingToRead = ps.limit - redisTimeline.length; | ||
|
||
// DBからの取り直しを減らす初回と同じ割合以上で成功すると仮定するが、クエリの長さを考えて三倍まで | ||
const countToGet = remainingToRead * Math.ceil(Math.min(1.1 / lastSuccessfulRate, 3)); | ||
noteIds = redisResultIds.slice(readFromRedis, readFromRedis + countToGet); | ||
|
||
readFromRedis += noteIds.length; | ||
|
||
const gotFromDb = await this.getAndFilterFromDb(noteIds, ps.noteFilter); | ||
redisTimeline.push(...gotFromDb); | ||
lastSuccessfulRate = gotFromDb.length / noteIds.length; | ||
|
||
console.log(`fanoutTimelineTrial#${trialCount++}: req: ${ps.limit}, tried: ${noteIds.length}, got: ${gotFromDb.length}, rate: ${lastSuccessfulRate}, total: ${redisTimeline.length}, fromRedis: ${redisResultIds.length}`); | ||
|
||
if (ps.allowPartial ? redisTimeline.length !== 0 : redisTimeline.length >= ps.limit) { | ||
// 十分Redisからとれた | ||
return redisTimeline.slice(0, ps.limit); | ||
} | ||
} | ||
|
||
// まだ足りない分はDBにフォールバック | ||
const remainingToRead = ps.limit - redisTimeline.length; | ||
const gotFromDb = await ps.dbFallback(noteIds[noteIds.length - 1], ps.sinceId, remainingToRead); | ||
redisTimeline.push(...gotFromDb); | ||
console.log(`fanoutTimelineTrial#db: req: ${ps.limit}, tried: ${remainingToRead}, got: ${gotFromDb.length}, since: ${noteIds[noteIds.length - 1]}, until: ${ps.untilId}, total: ${redisTimeline.length}`); | ||
return redisTimeline; | ||
} | ||
|
||
return await ps.dbFallback(ps.untilId, ps.sinceId, ps.limit); | ||
} | ||
|
||
private async getAndFilterFromDb(noteIds: string[], noteFilter: (note: MiNote) => boolean): Promise<MiNote[]> { | ||
const query = this.notesRepository.createQueryBuilder('note') | ||
.where('note.id IN (:...noteIds)', { noteIds: noteIds }) | ||
.innerJoinAndSelect('note.user', 'user') | ||
.leftJoinAndSelect('note.reply', 'reply') | ||
.leftJoinAndSelect('note.renote', 'renote') | ||
.leftJoinAndSelect('reply.user', 'replyUser') | ||
.leftJoinAndSelect('renote.user', 'renoteUser') | ||
.leftJoinAndSelect('note.channel', 'channel'); | ||
|
||
const notes = (await query.getMany()).filter(noteFilter); | ||
|
||
notes.sort((a, b) => a.id > b.id ? -1 : 1); | ||
|
||
return notes; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.