Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: queueing bulk follow/unfollow and block/unblock #10544

Merged
merged 13 commits into from
Apr 12, 2023
13 changes: 11 additions & 2 deletions packages/backend/src/core/QueueModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Bull from 'bull';
import { DI } from '@/di-symbols.js';
import type { Config } from '@/config.js';
import type { Provider } from '@nestjs/common';
import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData } from '../queue/types.js';
import type { DeliverJobData, InboxJobData, DbJobData, ObjectStorageJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData, DbJobMap } from '../queue/types.js';

function q<T>(config: Config, name: string, limitPerSec = -1) {
return new Bull<T>(name, {
Expand Down Expand Up @@ -41,7 +41,8 @@ export type SystemQueue = Bull.Queue<Record<string, unknown>>;
export type EndedPollNotificationQueue = Bull.Queue<EndedPollNotificationJobData>;
export type DeliverQueue = Bull.Queue<DeliverJobData>;
export type InboxQueue = Bull.Queue<InboxJobData>;
export type DbQueue = Bull.Queue<DbJobData>;
export type DbQueue = Bull.Queue<DbJobData<keyof DbJobMap>>;
export type RelationshipQueue = Bull.Queue<RelationshipJobData>;
export type ObjectStorageQueue = Bull.Queue<ObjectStorageJobData>;
export type WebhookDeliverQueue = Bull.Queue<WebhookDeliverJobData>;

Expand Down Expand Up @@ -75,6 +76,12 @@ const $db: Provider = {
inject: [DI.config],
};

const $relationship: Provider = {
provide: 'queue:relationship',
useFactory: (config: Config) => q(config, 'relationship'),
inject: [DI.config],
};

const $objectStorage: Provider = {
provide: 'queue:objectStorage',
useFactory: (config: Config) => q(config, 'objectStorage'),
Expand All @@ -96,6 +103,7 @@ const $webhookDeliver: Provider = {
$deliver,
$inbox,
$db,
$relationship,
$objectStorage,
$webhookDeliver,
],
Expand All @@ -105,6 +113,7 @@ const $webhookDeliver: Provider = {
$deliver,
$inbox,
$db,
$relationship,
$objectStorage,
$webhookDeliver,
],
Expand Down
113 changes: 94 additions & 19 deletions packages/backend/src/core/QueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import type { Webhook, webhookEventTypes } from '@/models/entities/Webhook.js';
import type { Config } from '@/config.js';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
import type { ThinUser } from '../queue/types.js';
import type { DbQueue, DeliverQueue, EndedPollNotificationQueue, InboxQueue, ObjectStorageQueue, RelationshipQueue, SystemQueue, WebhookDeliverQueue } from './QueueModule.js';
import type { DbJobData, RelationshipJobData, ThinUser } from '../queue/types.js';
import type httpSignature from '@peertube/http-signature';
import Bull from 'bull';

@Injectable()
export class QueueService {
Expand All @@ -21,6 +22,7 @@ export class QueueService {
@Inject('queue:deliver') public deliverQueue: DeliverQueue,
@Inject('queue:inbox') public inboxQueue: InboxQueue,
@Inject('queue:db') public dbQueue: DbQueue,
@Inject('queue:relationship') public relationshipQueue: RelationshipQueue,
@Inject('queue:objectStorage') public objectStorageQueue: ObjectStorageQueue,
@Inject('queue:webhookDeliver') public webhookDeliverQueue: WebhookDeliverQueue,
) {}
Expand Down Expand Up @@ -56,7 +58,7 @@ export class QueueService {
activity: activity,
signature,
};

return this.inboxQueue.add(data, {
attempts: this.config.inboxJobMaxAttempts ?? 8,
timeout: 5 * 60 * 1000, // 5min
Expand All @@ -71,7 +73,7 @@ export class QueueService {
@bindThis
public createDeleteDriveFilesJob(user: ThinUser) {
return this.dbQueue.add('deleteDriveFiles', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -81,7 +83,7 @@ export class QueueService {
@bindThis
public createExportCustomEmojisJob(user: ThinUser) {
return this.dbQueue.add('exportCustomEmojis', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -91,7 +93,7 @@ export class QueueService {
@bindThis
public createExportNotesJob(user: ThinUser) {
return this.dbQueue.add('exportNotes', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -101,7 +103,7 @@ export class QueueService {
@bindThis
public createExportFavoritesJob(user: ThinUser) {
return this.dbQueue.add('exportFavorites', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -111,7 +113,7 @@ export class QueueService {
@bindThis
public createExportFollowingJob(user: ThinUser, excludeMuting = false, excludeInactive = false) {
return this.dbQueue.add('exportFollowing', {
user: user,
user: { id: user.id },
excludeMuting,
excludeInactive,
}, {
Expand All @@ -123,7 +125,7 @@ export class QueueService {
@bindThis
public createExportMuteJob(user: ThinUser) {
return this.dbQueue.add('exportMuting', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -133,7 +135,7 @@ export class QueueService {
@bindThis
public createExportBlockingJob(user: ThinUser) {
return this.dbQueue.add('exportBlocking', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -143,7 +145,7 @@ export class QueueService {
@bindThis
public createExportUserListsJob(user: ThinUser) {
return this.dbQueue.add('exportUserLists', {
user: user,
user: { id: user.id },
}, {
removeOnComplete: true,
removeOnFail: true,
Expand All @@ -153,18 +155,24 @@ export class QueueService {
@bindThis
public createImportFollowingJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importFollowing', {
user: user,
user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
removeOnFail: true,
});
}

@bindThis
public createImportFollowingToDbJob(user: ThinUser, targets: string[]) {
const jobs = targets.map(rel => this.generateToDbJobData('importFollowingToDb', { user, target: rel }));
return this.dbQueue.addBulk(jobs);
}

@bindThis
public createImportMutingJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importMuting', {
user: user,
user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
Expand All @@ -175,18 +183,40 @@ export class QueueService {
@bindThis
public createImportBlockingJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importBlocking', {
user: user,
user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
removeOnFail: true,
});
}

@bindThis
public createImportBlockingToDbJob(user: ThinUser, targets: string[]) {
const jobs = targets.map(rel => this.generateToDbJobData('importBlockingToDb', { user, target: rel }));
return this.dbQueue.addBulk(jobs);
}

@bindThis
private generateToDbJobData<T extends 'importFollowingToDb' | 'importBlockingToDb', D extends DbJobData<T>>(name: T, data: D): {
name: string,
data: D,
opts: Bull.JobOptions,
} {
return {
name,
data,
opts: {
removeOnComplete: true,
removeOnFail: true,
},
};
}

@bindThis
public createImportUserListsJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importUserLists', {
user: user,
user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
Expand All @@ -197,7 +227,7 @@ export class QueueService {
@bindThis
public createImportCustomEmojisJob(user: ThinUser, fileId: DriveFile['id']) {
return this.dbQueue.add('importCustomEmojis', {
user: user,
user: { id: user.id },
fileId: fileId,
}, {
removeOnComplete: true,
Expand All @@ -208,14 +238,59 @@ export class QueueService {
@bindThis
public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
return this.dbQueue.add('deleteAccount', {
user: user,
user: { id: user.id },
soft: opts.soft,
}, {
removeOnComplete: true,
removeOnFail: true,
});
}

@bindThis
public createFollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string, silent?: boolean }[]) {
const jobs = followings.map(rel => this.generateRelationshipJobData('follow', rel));
return this.relationshipQueue.addBulk(jobs);
}

@bindThis
public createUnfollowJob(followings: { from: ThinUser, to: ThinUser, requestId?: string }[]) {
const jobs = followings.map(rel => this.generateRelationshipJobData('unfollow', rel));
return this.relationshipQueue.addBulk(jobs);
}

@bindThis
public createBlockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
const jobs = blockings.map(rel => this.generateRelationshipJobData('block', rel));
return this.relationshipQueue.addBulk(jobs);
}

@bindThis
public createUnblockJob(blockings: { from: ThinUser, to: ThinUser, silent?: boolean }[]) {
const jobs = blockings.map(rel => this.generateRelationshipJobData('unblock', rel));
return this.relationshipQueue.addBulk(jobs);
}

@bindThis
private generateRelationshipJobData(name: 'follow' | 'unfollow' | 'block' | 'unblock', data: RelationshipJobData): {
name: string,
data: RelationshipJobData,
opts: Bull.JobOptions,
} {
return {
name,
data: {
from: { id: data.from.id },
to: { id: data.to.id },
silent: data.silent,
requestId: data.requestId,
},
opts: {
removeOnComplete: true,
removeOnFail: true,
},
};
}

@bindThis
public createDeleteObjectStorageFileJob(key: string) {
return this.objectStorageQueue.add('deleteFile', {
Expand Down Expand Up @@ -246,7 +321,7 @@ export class QueueService {
createdAt: Date.now(),
eventId: uuid(),
};

return this.webhookDeliverQueue.add(data, {
attempts: 4,
timeout: 1 * 60 * 1000, // 1min
Expand All @@ -264,7 +339,7 @@ export class QueueService {
//deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
this.deliverQueue.clean(0, 'delayed');

this.inboxQueue.once('cleaned', (jobs, status) => {
//inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
});
Expand Down
16 changes: 8 additions & 8 deletions packages/backend/src/core/UserBlockingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class UserBlockingService implements OnModuleInit {

constructor(
private moduleRef: ModuleRef,

@Inject(DI.followRequestsRepository)
private followRequestsRepository: FollowRequestsRepository,

Expand Down Expand Up @@ -54,12 +54,12 @@ export class UserBlockingService implements OnModuleInit {
}

@bindThis
public async block(blocker: User, blockee: User) {
public async block(blocker: User, blockee: User, silent = false) {
await Promise.all([
this.cancelRequest(blocker, blockee),
this.cancelRequest(blockee, blocker),
this.userFollowingService.unfollow(blocker, blockee),
this.userFollowingService.unfollow(blockee, blocker),
this.cancelRequest(blocker, blockee, silent),
this.cancelRequest(blockee, blocker, silent),
this.userFollowingService.unfollow(blocker, blockee, silent),
this.userFollowingService.unfollow(blockee, blocker, silent),
this.removeFromList(blockee, blocker),
]);

Expand Down Expand Up @@ -89,7 +89,7 @@ export class UserBlockingService implements OnModuleInit {
}

@bindThis
private async cancelRequest(follower: User, followee: User) {
private async cancelRequest(follower: User, followee: User, silent = false) {
const request = await this.followRequestsRepository.findOneBy({
followeeId: followee.id,
followerId: follower.id,
Expand All @@ -110,7 +110,7 @@ export class UserBlockingService implements OnModuleInit {
}).then(packed => this.globalEventService.publishMainStream(followee.id, 'meUpdated', packed));
}

if (this.userEntityService.isLocalUser(follower)) {
if (this.userEntityService.isLocalUser(follower) && !silent) {
this.userEntityService.pack(followee, follower, {
detail: true,
}).then(async packed => {
Expand Down
9 changes: 5 additions & 4 deletions packages/backend/src/core/UserFollowingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class UserFollowingService implements OnModuleInit {

constructor(
private moduleRef: ModuleRef,

@Inject(DI.usersRepository)
private usersRepository: UsersRepository,

Expand Down Expand Up @@ -79,7 +79,7 @@ export class UserFollowingService implements OnModuleInit {
}

@bindThis
public async follow(_follower: { id: User['id'] }, _followee: { id: User['id'] }, requestId?: string): Promise<void> {
public async follow(_follower: { id: User['id'] }, _followee: { id: User['id'] }, requestId?: string, silent = false): Promise<void> {
const [follower, followee] = await Promise.all([
this.usersRepository.findOneByOrFail({ id: _follower.id }),
this.usersRepository.findOneByOrFail({ id: _followee.id }),
Expand Down Expand Up @@ -139,7 +139,7 @@ export class UserFollowingService implements OnModuleInit {
}
}

await this.insertFollowingDoc(followee, follower);
await this.insertFollowingDoc(followee, follower, silent);

if (this.userEntityService.isRemoteUser(follower) && this.userEntityService.isLocalUser(followee)) {
const content = this.apRendererService.addContext(this.apRendererService.renderAccept(this.apRendererService.renderFollow(follower, followee, requestId), followee));
Expand All @@ -155,6 +155,7 @@ export class UserFollowingService implements OnModuleInit {
follower: {
id: User['id']; host: User['host']; uri: User['host']; inbox: User['inbox']; sharedInbox: User['sharedInbox']
},
silent = false,
): Promise<void> {
if (follower.id === followee.id) return;

Expand Down Expand Up @@ -233,7 +234,7 @@ export class UserFollowingService implements OnModuleInit {
this.perUserFollowingChart.update(follower, followee, true);

// Publish follow event
if (this.userEntityService.isLocalUser(follower)) {
if (this.userEntityService.isLocalUser(follower) && !silent) {
this.userEntityService.pack(followee.id, follower, {
detail: true,
}).then(async packed => {
Expand Down
Loading