Skip to content

Commit

Permalink
wip batch imports
Browse files Browse the repository at this point in the history
  • Loading branch information
etnoy committed Dec 12, 2024
1 parent 69b273d commit f980219
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 79 deletions.
2 changes: 1 addition & 1 deletion e2e/test-assets
Submodule test-assets updated 0 files
1 change: 1 addition & 0 deletions server/src/interfaces/asset.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ export const IAssetRepository = 'IAssetRepository';

export interface IAssetRepository {
create(asset: AssetCreate): Promise<AssetEntity>;
createAll(assets: AssetCreate[]): Promise<AssetEntity[]>;
getByIds(
ids: string[],
relations?: FindOptionsRelations<AssetEntity>,
Expand Down
4 changes: 2 additions & 2 deletions server/src/interfaces/job.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export enum JobName {
// library management
LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files',
LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets',
LIBRARY_SYNC_FILE = 'library-sync-file',
LIBRARY_SYNC_FILES = 'library-sync-files',
LIBRARY_SYNC_ASSETS = 'library-sync-assets',
LIBRARY_DELETE = 'library-delete',
LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all',
Expand Down Expand Up @@ -290,7 +290,7 @@ export type JobItem =
| { name: JobName.ASSET_DELETION_CHECK; data?: IBaseJob }

// Library Management
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
| { name: JobName.LIBRARY_SYNC_FILES; data: ILibraryFileJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryBulkIdsJob }
Expand Down
4 changes: 4 additions & 0 deletions server/src/repositories/asset.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ export class AssetRepository implements IAssetRepository {
return this.repository.save(asset);
}

/**
 * Persists a batch of assets through a single repository `save` call.
 *
 * @param assets - Creation payloads, one per asset to store.
 * @returns The promise produced by the repository's `save`, resolving to the stored entities.
 */
createAll(assets: AssetCreate[]): Promise<AssetEntity[]> {
  const pendingSave = this.repository.save(assets);
  return pendingSave;
}

@GenerateSql({ params: [[DummyValue.UUID], { day: 1, month: 1 }] })
getByDayOfYear(ownerIds: string[], { day, month }: MonthDay): Promise<AssetEntity[]> {
return this.repository
Expand Down
111 changes: 35 additions & 76 deletions server/src/services/library.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,15 @@ export class LibraryService extends BaseService {
return mapLibrary(library);
}

/**
 * Imports a batch of newly discovered files into a library.
 *
 * Builds one asset payload per path, stores all of them with a single `createAll` call and then
 * queues the post-sync jobs for the created asset ids. Files are no longer fanned out into one
 * `LIBRARY_SYNC_FILE` job each; doing both would import every path twice.
 *
 * NOTE(review): the method is registered as the `LIBRARY_SYNC_FILES` job handler, yet its signature
 * is `(library, assetPaths)` rather than a job payload — confirm how the job is meant to be dispatched.
 *
 * @param library - Target library; only `id` and `ownerId` are read.
 * @param assetPaths - Paths of the files to import.
 * @returns `JobStatus.SUCCESS` once the assets are stored and their follow-up jobs are queued.
 */
@OnJob({ name: JobName.LIBRARY_SYNC_FILES, queue: QueueName.LIBRARY })
private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) {
  // Nothing to import: skip the database round trip and the empty queue submissions.
  if (assetPaths.length === 0) {
    return JobStatus.SUCCESS;
  }

  const newAssets = assetPaths.map((assetPath) => this.prepareAsset(assetPath, ownerId, id));

  const createdAssets = await this.assetRepository.createAll(newAssets);
  const assetIds = createdAssets.map((asset) => asset.id);

  await this.queuePostSyncJobs(assetIds);

  return JobStatus.SUCCESS;
}

private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
Expand Down Expand Up @@ -332,72 +330,26 @@ export class LibraryService extends BaseService {
return JobStatus.SUCCESS;
}

/**
 * Imports a single file into a library (per-file import path).
 *
 * Stats the file, looks for an `.xmp` sidecar next to it, creates the asset and queues the
 * post-sync jobs for it.
 *
 * @param job - Job payload; `assetPath`, `libraryId` and `ownerId` are read.
 * @returns `JobStatus.SKIPPED` when the file does not exist (`ENOENT`), `JobStatus.FAILED` on any
 *   other stat error, `JobStatus.SUCCESS` once the asset is created and its jobs are queued.
 */
@OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY })
async handleSyncFile(job: JobOf<JobName.LIBRARY_SYNC_FILE>): Promise<JobStatus> {
  /* For performance reasons, we don't check if the asset is already imported.
     This is instead handled by a previous step in the scan process.
     In the edge case of an asset being imported between that check
     and this function call, the database constraint will prevent duplicates.
  */

  const assetPath = path.normalize(job.assetPath);

  // TODO: we can replace this get call with an exists call
  /* let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.libraryId, assetPath);
  if (asset) {
    return await this.handleSyncAssets({ libraryId: job.libraryId, assetIds: [asset.id] });
  } */

  this.logger.log(`Importing new asset ${assetPath} into library ${job.libraryId}`);

  let stat;
  try {
    stat = await this.storageRepository.stat(assetPath);
  } catch (error: any) {
    if (error.code === 'ENOENT') {
      this.logger.error(`File not found: ${assetPath}`);
      return JobStatus.SKIPPED;
    }
    this.logger.error(`Error reading file: ${assetPath}. Error: ${error}`);
    return JobStatus.FAILED;
  }

  // TODO: device asset id is deprecated, remove it
  const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, '');

  const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`);

  // TODO: doesn't xmp replace the file extension? Will need investigation
  let sidecarPath: string | null = null;
  if (await this.storageRepository.checkFileExists(`${assetPath}.xmp`, R_OK)) {
    sidecarPath = `${assetPath}.xmp`;
  }

  const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE;

  // The file's modification time is used for all three timestamps.
  const mtime = stat.mtime;

  const asset = await this.assetRepository.create({
    ownerId: job.ownerId,
    libraryId: job.libraryId,
    checksum: pathHash,
    originalPath: assetPath,
    deviceAssetId,
    deviceId: 'Library Import',
    fileCreatedAt: mtime,
    fileModifiedAt: mtime,
    localDateTime: mtime,
    type: assetType,
    originalFileName: parse(assetPath).base,
    sidecarPath,
    isExternal: true,
  });

  this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`);

  await this.queuePostSyncJobs([asset.id]);

  return JobStatus.SUCCESS;
}

/**
 * Builds the creation payload for an external library asset from its path alone.
 *
 * No file-system access happens here: the file is not stat-ed and no sidecar is looked up, so the
 * three timestamps are set to the current time and `sidecarPath` is left unset.
 * NOTE(review): presumably later jobs replace these placeholder values — confirm.
 *
 * @param assetPath - Path of the file as discovered by the crawl.
 * @param ownerId - Id of the user who owns the library.
 * @param libraryId - Id of the library the asset is imported into.
 * @returns A plain object with the fields passed on to asset creation.
 */
private prepareAsset(assetPath: string, ownerId: string, libraryId: string) {
  const normalizedPath = path.normalize(assetPath);
  const now = new Date();

  return {
    ownerId,
    libraryId,
    // Hash the normalized path so the checksum always agrees with `originalPath`.
    checksum: this.cryptoRepository.hashSha1(`path:${normalizedPath}`),
    originalPath: normalizedPath,
    // TODO: device asset id is deprecated, remove it
    deviceAssetId: basename(normalizedPath).replaceAll(/\s+/g, ''),
    deviceId: 'Library Import',
    fileCreatedAt: now,
    fileModifiedAt: now,
    localDateTime: now,
    type: mimeTypes.isVideo(normalizedPath) ? AssetType.VIDEO : AssetType.IMAGE,
    originalFileName: parse(normalizedPath).base,
    isExternal: true,
  };
}

async queuePostSyncJobs(assetIds: string[]) {
Expand All @@ -407,6 +359,13 @@ export class LibraryService extends BaseService {
data: { id: assetId, source: 'upload' },
})),
);

await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.SIDECAR_DISCOVERY,
data: { id: assetId, source: 'upload' },
})),
);
}

async queueScan(id: string) {
Expand Down Expand Up @@ -607,21 +566,21 @@ export class LibraryService extends BaseService {
importCount += newPaths.length;
await this.syncFiles(library, newPaths);
if (newPaths.length < pathBatch.length) {
this.logger.log(
this.logger.debug(
`Current crawl batch: ${newPaths.length} of ${pathBatch.length} file(s) are new, queued import for library ${library.id}...`,
);
} else {
this.logger.log(
this.logger.debug(
`Current crawl batch: ${newPaths.length} new file(s), queued import for library ${library.id}...`,
);
}
} else {
this.logger.log(`Current crawl batch: ${pathBatch.length} asset(s) already in library ${library.id}`);
this.logger.debug(`Current crawl batch: ${pathBatch.length} asset(s) already in library ${library.id}`);
}
}

if (importCount > 0 && importCount === crawlCount) {
this.logger.log(`Finished crawling and queueing ${crawlCount} files for import for library ${library.id}`);
this.logger.log(`Finished crawling and queueing ${crawlCount} file(s) for import for library ${library.id}`);
} else if (importCount > 0) {
this.logger.log(
`Finished crawling ${crawlCount} file(s) of which ${importCount} are queued for import for library ${library.id}`,
Expand Down

0 comments on commit f980219

Please sign in to comment.