Skip to content

Commit

Permalink
fix: reading deletion on failure (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekystyle authored Nov 13, 2024
1 parent cecb938 commit cd5a7fe
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
2 changes: 2 additions & 0 deletions src/enmon/enmon.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { UploadReadingRepository } from './upload-reading.repository.js';
import { EnmonApiClient } from './ApiClient.js';
import { READINGS_QUEUE_NAME } from './constants.js';
import { Config } from '../config/schemas.js';
import { UploadErrorFilter } from './upload-error.filter.js';

export interface ModuleOptions {
contactEmail?: string | undefined;
Expand All @@ -26,6 +27,7 @@ export interface ModuleOptions {
providers: [
ReadingProcessor,
UploadReadingRepository,
UploadErrorFilter,
{
provide: EnmonApiClient,
inject: [ConfigService],
Expand Down
28 changes: 13 additions & 15 deletions src/enmon/reading.processor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Define, Queue } from 'agenda-nest';
import { AxiosError } from 'axios';
import { EnmonApiClient } from './ApiClient.js';
import { UploadReading } from './upload-reading.schema.js';
import { UploadReadingRepository } from './upload-reading.repository.js';
import { READINGS_QUEUE_NAME, UPLOAD_JOB_NAME } from './constants.js';
import { UploadErrorFilter } from './upload-error.filter.js';

@Injectable()
@Queue(READINGS_QUEUE_NAME)
Expand All @@ -19,6 +19,8 @@ export class ReadingProcessor {
private uploadDataRepository: UploadReadingRepository,
@Inject(AsyncLocalStorage)
private als: AsyncLocalStorage<{ readingId: unknown }>,
@Inject(UploadErrorFilter)
private uploadErrorFilter: UploadErrorFilter,
) {}

@Define(UPLOAD_JOB_NAME)
Expand All @@ -30,12 +32,17 @@ export class ReadingProcessor {
// eslint-disable-next-line no-restricted-syntax
for await (const data of cursor) {
await this.als.run({ readingId: data._id }, async () => {
this.logger.log('uploading reading...');
await this.uploadReading(data).catch(reason => this.handleUploadReadingError(reason));
this.logger.log({ message: 'uploading reading...', id: data._id });

// eslint-disable-next-line no-underscore-dangle
this.logger.log('deleting uploaded reading...');
await data.deleteOne();
try {
await this.uploadReading(data);

// eslint-disable-next-line no-underscore-dangle
this.logger.log('deleting uploaded reading...');
await data.deleteOne();
} catch (e) {
this.uploadErrorFilter.catch(e);
}
});
}

Expand Down Expand Up @@ -63,13 +70,4 @@ export class ReadingProcessor {

this.logger.log({ message: 'reading uploaded', status, statusText });
}

private handleUploadReadingError(e: unknown) {
if (e instanceof AxiosError) {
const { status, statusText } = e.response ?? {};
this.logger.log({ message: 'upload reading failed', status, statusText, data: e.response?.data as unknown });
} else {
throw e;
}
}
}
23 changes: 23 additions & 0 deletions src/enmon/upload-error.filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { Injectable, Logger } from '@nestjs/common';
import { AxiosError } from 'axios';

@Injectable()
export class UploadErrorFilter {
private readonly logger = new Logger(UploadErrorFilter.name);

catch(e: unknown) {
if (e instanceof AxiosError) {
const { status, statusText } = e.response ?? {};
this.logger.log({
message: `upload reading failed: ${e.message}`,
reason: e.message,
error: e.toJSON(),
status,
statusText,
data: e.response?.data as unknown,
});
}

throw e;
}
}

0 comments on commit cd5a7fe

Please sign in to comment.