diff --git a/.gitignore b/.gitignore index 8522abf189..7dbadf1ce1 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,6 @@ custom_uploads/* test app/api/files/specs/file1 app/api/files/specs/file2 +**/redis-bin +dump.rdb + diff --git a/app/api/config.ts b/app/api/config.ts index cde646219b..0b2f58e3dd 100644 --- a/app/api/config.ts +++ b/app/api/config.ts @@ -47,10 +47,11 @@ export const config = { customUploads: CUSTOM_UPLOADS_FOLDER || `${rootPath}/custom_uploads/`, temporalFiles: TEMPORAL_FILES_FOLDER || `${rootPath}/temporal_files/`, }, + externalServices: Boolean(process.env.EXTERNAL_SERVICES) || false, redis: { activated: CLUSTER_MODE, host: process.env.REDIS_HOST || 'localhost', - port: process.env.REDIS_PORT || 6379, + port: parseInt(process.env.REDIS_PORT || '', 10) || 6379, }, }; diff --git a/app/api/files/filesystem.ts b/app/api/files/filesystem.ts index 9777d642af..c5e7ebe5c5 100644 --- a/app/api/files/filesystem.ts +++ b/app/api/files/filesystem.ts @@ -136,6 +136,8 @@ const streamToString = async (stream: Readable): Promise => const getFileContent = async (fileName: FilePath): Promise => asyncFS.readFile(uploadsPath(fileName), 'utf8'); +const readFile = async (fileName: FilePath): Promise => asyncFS.readFile(fileName); + export { setupTestUploadedPaths, deleteUploadedFiles, @@ -154,4 +156,5 @@ export { activityLogPath, writeFile, appendFile, + readFile, }; diff --git a/app/api/services/pdfsegmentation/PDFSegmentation.ts b/app/api/services/pdfsegmentation/PDFSegmentation.ts new file mode 100644 index 0000000000..0f6e06ce3f --- /dev/null +++ b/app/api/services/pdfsegmentation/PDFSegmentation.ts @@ -0,0 +1,198 @@ +import { TaskManager, ResultsMessage } from 'api/services/tasksmanager/TaskManager'; +import { uploadsPath, fileFromReadStream, createDirIfNotExists, readFile } from 'api/files'; +import { Readable } from 'stream'; +import urljoin from 'url-join'; +import filesModel from 'api/files/filesModel'; +import path from 'path'; +import { FileType } from 'shared/types/fileType'; +import { Settings } from 'shared/types/settingsType'; +import settings from 'api/settings/settings'; +import { tenants } from 'api/tenants/tenantContext'; +import { ObjectIdSchema } from 'shared/types/commonTypes'; +import request from 'shared/JSONRequest'; +import { handleError } from 'api/utils'; +import { SegmentationModel } from './segmentationModel'; + +class PDFSegmentation { + SERVICE_NAME = 'segmentation'; + + public segmentationTaskManager: TaskManager; + + templatesWithInformationExtraction: string[] | undefined; + + features: Settings | undefined; + + batchSize = 10; + + constructor() { + this.segmentationTaskManager = new TaskManager({ + serviceName: this.SERVICE_NAME, + processResults: this.processResults, + }); + } + + segmentOnePdf = async ( + file: FileType & { filename: string; _id: ObjectIdSchema }, + serviceUrl: string, + tenant: string + ) => { + try { + const fileContent = await readFile(uploadsPath(file.filename)); + await request.uploadFile(urljoin(serviceUrl, tenant), file.filename, fileContent); + + await this.segmentationTaskManager.startTask({ + task: this.SERVICE_NAME, + tenant, + params: { + filename: file.filename, + }, + }); + + await this.storeProcess(file._id, file.filename); + } catch (err) { + if (err.code === 'ENOENT') { + await this.storeProcess(file._id, file.filename, false); + handleError(err); + return; + } + + throw err; + } + }; + + storeProcess = async (fileID: ObjectIdSchema, filename: string, proccessing = true) => + SegmentationModel.save({ + fileID, + filename, + status: proccessing ? 'processing' : 'failed', + }); + + getFilesToSegment = async (): Promise => + filesModel.db.aggregate([ + { + $match: { + type: 'document', + filename: { $exists: true }, + }, + }, + { + $lookup: { + from: 'segmentations', + localField: '_id', + foreignField: 'fileID', + as: 'segmentation', + }, + }, + { + $match: { + segmentation: { + $size: 0, + }, + }, + }, + { + $limit: this.batchSize, + }, + ]); + + segmentPdfs = async () => { + const pendingTasks = await this.segmentationTaskManager.countPendingTasks(); + if (pendingTasks > 0) { + return; + } + + try { + await Promise.all( + Object.keys(tenants.tenants).map(async tenant => { + await tenants.run(async () => { + const settingsValues = await settings.get(); + const segmentationServiceConfig = settingsValues?.features?.segmentation; + + if (!segmentationServiceConfig) { + return; + } + + const filesToSegment = await this.getFilesToSegment(); + + for (let i = 0; i < filesToSegment.length; i += 1) { + // eslint-disable-next-line no-await-in-loop + await this.segmentOnePdf(filesToSegment[i], segmentationServiceConfig.url, tenant); + } + }, tenant); + }) + ); + } catch (err) { + if (err.code === 'ECONNREFUSED') { + await new Promise(resolve => { + setTimeout(resolve, 60000); + }); + } + handleError(err, { useContext: false }); + } + }; + + requestResults = async (message: ResultsMessage) => { + const response = await request.get(message.data_url); + const fileStream = (await fetch(message.file_url!)).body; + + if (!fileStream) { + throw new Error( + `Error requesting for segmentation file: ${message.params!.filename}, tenant: ${ + message.tenant + }` + ); + } + return { data: JSON.parse(response.json), fileStream: (fileStream as unknown) as Readable }; + }; + + storeXML = async (filename: string, fileStream: Readable) => { + const folderPath = uploadsPath(this.SERVICE_NAME); + await createDirIfNotExists(folderPath); + const xmlname = `${path.basename(filename, path.extname(filename))}.xml`; + + await fileFromReadStream(xmlname, fileStream, folderPath); + }; + + saveSegmentation = async (filename: string, data: any) => { + const [segmentation] = await SegmentationModel.get({ filename }); + // eslint-disable-next-line camelcase + const { paragraphs, page_height, page_width } = data; + await SegmentationModel.save({ + ...segmentation, + segmentation: { page_height, page_width, paragraphs }, + autoexpire: null, + status: 'ready', + }); + }; + + saveSegmentationError = async (filename: string) => { + const [segmentation] = await SegmentationModel.get({ filename }); + if (segmentation) { + await SegmentationModel.save({ + ...segmentation, + filename, + autoexpire: null, + status: 'failed', + }); + } + }; + + processResults = async (message: ResultsMessage): Promise => { + await tenants.run(async () => { + try { + if (!message.success) { + await this.saveSegmentationError(message.params!.filename); + return; + } + + const { data, fileStream } = await this.requestResults(message); + await this.storeXML(message.params!.filename, fileStream); + await this.saveSegmentation(message.params!.filename, data); + } catch (error) { + handleError(error); + } + }, message.tenant); + }; +} + +export { PDFSegmentation }; diff --git a/app/api/services/pdfsegmentation/segmentationModel.ts b/app/api/services/pdfsegmentation/segmentationModel.ts new file mode 100644 index 0000000000..92e3d0c1d2 --- /dev/null +++ b/app/api/services/pdfsegmentation/segmentationModel.ts @@ -0,0 +1,18 @@ +import mongoose from 'mongoose'; +import { instanceModel } from 'api/odm'; +import { SegmentationType } from 'shared/types/segmentationType'; + +const props = { + autoexpire: { type: Date, expires: 86400, default: Date.now }, // 24 hours + file: { type: mongoose.Schema.Types.ObjectId, ref: 'File' }, + status: { type: String, enum: ['processing', 'failed', 'ready'], default: 'processing' }, +}; + +const mongoSchema = new mongoose.Schema(props, { + emitIndexErrors: true, + strict: false, +}); + +const SegmentationModel = instanceModel('segmentations', mongoSchema); + +export { SegmentationModel }; diff --git a/app/api/services/pdfsegmentation/specs/PDFSegmentation.spec.ts b/app/api/services/pdfsegmentation/specs/PDFSegmentation.spec.ts new file mode 100644 index 0000000000..5c1006cc58 --- /dev/null +++ b/app/api/services/pdfsegmentation/specs/PDFSegmentation.spec.ts @@ -0,0 +1,311 @@ +/* eslint-disable camelcase */ +/* eslint-disable max-lines */ + +import { fixturer, createNewMongoDB } from 'api/utils/testing_db'; +import { MongoMemoryServer } from 'mongodb-memory-server'; +import { + fixturesOneFile, + fixturesOtherFile, + fixturesPdfNameA, + fixturesPdfNameB, + fixturesTwelveFiles, + fixturesFiveFiles, + fixturesMissingPdf, +} from 'api/services/pdfsegmentation/specs/fixtures'; + +import asyncFS from 'api/utils/async-fs'; +import path from 'path'; + +import { tenants } from 'api/tenants/tenantContext'; +import { DB } from 'api/odm'; +import { Db } from 'mongodb'; +import request from 'shared/JSONRequest'; + +import { PDFSegmentation } from '../PDFSegmentation'; +import { SegmentationModel } from '../segmentationModel'; +import { ExternalDummyService } from '../../tasksmanager/specs/ExternalDummyService'; + +jest.mock('api/services/tasksmanager/TaskManager.ts'); + +describe('PDFSegmentation', () => { + let segmentPdfs: PDFSegmentation; + + const tenantOne = { + name: 'tenantOne', + dbName: 'tenantOne', + indexName: 'tenantOne', + uploadedDocuments: `${__dirname}/uploads`, + attachments: `${__dirname}/uploads`, + customUploads: `${__dirname}/uploads`, + temporalFiles: `${__dirname}/uploads`, + }; + + const tenantTwo = { + name: 'tenantTwo', + dbName: 'tenantTwo', + indexName: 'tenantTwo', + uploadedDocuments: `${__dirname}/uploads`, + attachments: `${__dirname}/uploads`, + customUploads: `${__dirname}/uploads`, + temporalFiles: `${__dirname}/uploads`, + }; + + let dbOne: Db; + let dbTwo: Db; + let fileA: Buffer; + let fileB: Buffer; + let mongod: MongoMemoryServer; + + afterAll(async () => { + await DB.disconnect(); + await mongod.stop(); + }); + + beforeAll(async () => { + mongod = await createNewMongoDB(); + const mongoUri = mongod.getUri(); + await DB.connect(mongoUri); + }); + + beforeEach(async () => { + segmentPdfs = new PDFSegmentation(); + dbOne = DB.connectionForDB(tenantOne.dbName).db; + dbTwo = DB.connectionForDB(tenantTwo.dbName).db; + + tenants.tenants = { tenantOne }; + fileA = await asyncFS.readFile( + `app/api/services/pdfsegmentation/specs/uploads/${fixturesPdfNameA}` + ); + fileB = await asyncFS.readFile( + `app/api/services/pdfsegmentation/specs/uploads/${fixturesPdfNameA}` + ); + jest.spyOn(request, 'uploadFile').mockResolvedValue({}); + jest.resetAllMocks(); + }); + + it('should send the pdf', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesOneFile); + + await segmentPdfs.segmentPdfs(); + expect(request.uploadFile).toHaveBeenCalledWith( + 'http://localhost:1234/files/tenantOne', + fixturesPdfNameA, + fileA + ); + }); + + it('should send other pdf to segment', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesOtherFile); + await segmentPdfs.segmentPdfs(); + expect(request.uploadFile).toHaveBeenCalledWith( + 'http://localhost:1234/files/tenantOne', + fixturesPdfNameB, + fileB + ); + }); + + it('should send 10 pdfs to segment', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesTwelveFiles); + await segmentPdfs.segmentPdfs(); + expect(request.uploadFile).toHaveBeenCalledTimes(10); + }); + + it('should send pdfs from different tenants with the information extraction on', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesOneFile); + await fixturer.clearAllAndLoad(dbTwo, fixturesOtherFile); + tenants.tenants = { tenantOne, tenantTwo }; + + await segmentPdfs.segmentPdfs(); + + expect(request.uploadFile).toHaveBeenCalledTimes(2); + }); + + it('should start the tasks', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesOneFile); + + await segmentPdfs.segmentPdfs(); + + expect(segmentPdfs.segmentationTaskManager?.startTask).toHaveBeenCalledWith({ + params: { filename: 'documentA.pdf' }, + tenant: 'tenantOne', + task: 'segmentation', + }); + }); + + it('should store the segmentation process state', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesOneFile); + + await segmentPdfs.segmentPdfs(); + await tenants.run(async () => { + const [segmentation] = await SegmentationModel.get(); + expect(segmentation.status).toBe('processing'); + expect(segmentation.filename).toBe(fixturesPdfNameA); + expect(segmentation.fileID).toEqual(fixturesOneFile.files![0]._id); + }, 'tenantOne'); + }); + + it('should only send pdfs not already segmented or in the process', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesFiveFiles); + await dbOne.collection('segmentations').insertMany([ + { + filename: fixturesFiveFiles.files![0].filename, + fileID: fixturesFiveFiles.files![0]._id, + status: 'processing', + }, + ]); + + await segmentPdfs.segmentPdfs(); + + expect(segmentPdfs.segmentationTaskManager?.startTask).toHaveBeenCalledTimes(4); + }); + + describe('if the file is missing', () => { + it('should throw an error and store the segmentation as failed', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesMissingPdf); + + await segmentPdfs.segmentPdfs(); + + await tenants.run(async () => { + const segmentations = await SegmentationModel.get(); + const [segmentation] = segmentations; + expect(segmentation.status).toBe('failed'); + expect(segmentation.filename).toBe(fixturesMissingPdf.files![0].filename); + expect(segmentations.length).toBe(1); + }, 'tenantOne'); + }); + }); + + describe('when there is pending tasks', () => { + it('should not put more', async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesFiveFiles); + + segmentPdfs.segmentationTaskManager!.countPendingTasks = async () => Promise.resolve(10); + + await segmentPdfs.segmentPdfs(); + + expect(segmentPdfs.segmentationTaskManager?.startTask).not.toHaveBeenCalled(); + }); + }); + + describe('when there is NOT segmentation config', () => { + it('should do nothing', async () => { + await fixturer.clearAllAndLoad(dbOne, { ...fixturesOneFile, settings: [{}] }); + await segmentPdfs.segmentPdfs(); + + expect(segmentPdfs.segmentationTaskManager?.startTask).not.toHaveBeenCalled(); + }); + }); + + describe('when the segmentation finsihes', () => { + let segmentationExternalService: ExternalDummyService; + let segmentationData: { + page_width: number; + page_height: number; + paragraphs: object[]; + }; + let segmentationFolder: string; + beforeEach(async () => { + await fixturer.clearAllAndLoad(dbOne, fixturesOneFile); + await segmentPdfs.segmentPdfs(); + segmentationFolder = path.join(tenantOne.uploadedDocuments, 'segmentation'); + if (await asyncFS.exists(segmentationFolder)) { + await asyncFS.rmdir(segmentationFolder, { recursive: true }); + } + segmentationExternalService = new ExternalDummyService(1235); + await segmentationExternalService.start(); + + segmentationData = { + page_width: 600, + page_height: 1200, + paragraphs: [ + { + left: 30, + top: 45, + width: 400, + height: 120, + page_number: 1, + text: 'El veloz murciélago hindú comía feliz cardillo y kiwi.', + }, + ], + }; + segmentationExternalService.setResults(segmentationData); + segmentationExternalService.setFileResults(path.join(__dirname, '/uploads/test.xml')); + }); + + afterEach(async () => { + await segmentationExternalService.stop(); + + if (await asyncFS.exists(segmentationFolder)) { + await asyncFS.rmdir(segmentationFolder, { recursive: true }); + } + }); + it('should store the segmentation', async () => { + await segmentPdfs.processResults({ + tenant: tenantOne.name, + params: { filename: 'documentA.pdf' }, + data_url: 'http://localhost:1235/results', + file_url: 'http://localhost:1235/file', + task: 'segmentation', + success: true, + }); + + await tenants.run(async () => { + const segmentations = await SegmentationModel.get(); + const [segmentation] = segmentations; + expect(segmentation.status).toBe('ready'); + expect(segmentation.filename).toBe(fixturesPdfNameA); + expect(segmentation.fileID).toEqual(fixturesOneFile.files![0]._id); + expect(segmentation.autoexpire).toBe(null); + + expect(segmentation.segmentation).toEqual( + expect.objectContaining({ + ...segmentationData, + paragraphs: [expect.objectContaining(segmentationData.paragraphs[0])], + }) + ); + }, tenantOne.name); + }); + + it('should store the xml file', async () => { + await segmentPdfs.processResults({ + tenant: tenantOne.name, + params: { filename: 'documentA.pdf' }, + data_url: 'http://localhost:1235/results', + file_url: 'http://localhost:1235/file', + task: 'segmentation', + success: true, + }); + const fileExists = await asyncFS.exists(path.join(segmentationFolder, 'documentA.xml')); + const fileContents = await asyncFS.readFile( + path.join(segmentationFolder, 'documentA.xml'), + 'utf8' + ); + expect(fileExists).toBe(true); + const xml = 'Cold shrimps soup'; + await expect(fileContents.includes(xml)).toBe(true); + }); + + describe('if the segmentation fails', () => { + it('should store it as failed', async () => { + await segmentPdfs.processResults({ + tenant: tenantOne.name, + params: { filename: 'documentA.pdf' }, + data_url: 'http://localhost:1235/results', + file_url: 'http://localhost:1235/file', + task: 'segmentation', + success: false, + }); + + await tenants.run(async () => { + const segmentations = await SegmentationModel.get(); + const [segmentation] = segmentations; + expect(segmentation.status).toBe('failed'); + expect(segmentation.filename).toBe(fixturesPdfNameA); + expect(segmentation.fileID).toEqual(fixturesOneFile.files![0]._id); + expect(segmentation.autoexpire).toBe(null); + expect(segmentations.length).toBe(1); + }, tenantOne.name); + }); + }); + }); +}); diff --git a/app/api/services/pdfsegmentation/specs/fixtures.ts b/app/api/services/pdfsegmentation/specs/fixtures.ts new file mode 100644 index 0000000000..7b3e1787fe --- /dev/null +++ b/app/api/services/pdfsegmentation/specs/fixtures.ts @@ -0,0 +1,124 @@ +import db, { DBFixture } from 'api/utils/testing_db'; +import { getFixturesFactory } from 'api/utils/fixturesFactory'; + +const factory = getFixturesFactory(); + +const settings = [ + { + features: { + metadataExtraction: [ + { + template: factory.id('templateToSegmentA'), + properties: ['property1', 'property2'], + }, + { + template: factory.id('templateToSegmentB'), + properties: ['property1'], + }, + ], + segmentation: { + url: 'http://localhost:1234/files', + }, + }, + }, +]; + +const otherSettings = [ + { + _id: db.id(), + features: { + metadataExtraction: [ + { + template: factory.id('templateToSegmentB'), + properties: ['property1'], + }, + ], + segmentation: { + url: 'http://localhost:1234/files', + }, + }, + }, +]; + +const fixturesPdfNameA = 'documentA.pdf'; +const fixturesPdfNameB = 'documentB.pdf'; + +const fixturesOneFile: DBFixture = { + entities: [factory.entity('A1', 'templateToSegmentA')], + settings, + files: [factory.file('F1', 'A1', 'document', fixturesPdfNameA)], +}; + +const fixturesOtherFile: DBFixture = { + entities: [factory.entity('A2', 'templateToSegmentB')], + settings: otherSettings, + files: [factory.file('F2', 'A2', 'document', fixturesPdfNameB)], +}; + +const fixturesMissingPdf: DBFixture = { + entities: [factory.entity('A1', 'templateToSegmentA')], + settings, + files: [factory.file('F1', 'A1', 'document', 'missing.pdf')], +}; + +const fixturesFiveFiles: DBFixture = { + settings, + entities: [ + factory.entity('A1', 'templateToSegmentA'), + factory.entity('A2', 'templateToSegmentA'), + factory.entity('A3', 'templateToSegmentA'), + factory.entity('A4', 'templateToSegmentA'), + factory.entity('A5', 'templateToSegmentA'), + ], + files: [ + factory.file('F1', 'A1', 'document', fixturesPdfNameA), + factory.file('F2', 'A2', 'document', fixturesPdfNameA), + factory.file('F3', 'A3', 'document', fixturesPdfNameA), + factory.file('F4', 'A4', 'document', fixturesPdfNameA), + factory.file('F5', 'A5', 'document', fixturesPdfNameA), + ], +}; + +const fixturesTwelveFiles: DBFixture = { + settings, + entities: [ + factory.entity('A1', 'templateToSegmentA'), + factory.entity('A2', 'templateToSegmentA'), + factory.entity('A3', 'templateToSegmentA'), + factory.entity('A4', 'templateToSegmentA'), + factory.entity('A5', 'templateToSegmentA'), + factory.entity('A6', 'templateToSegmentA'), + factory.entity('A7', 'templateToSegmentA'), + factory.entity('A8', 'templateToSegmentA'), + factory.entity('A9', 'templateToSegmentA'), + factory.entity('A10', 'templateToSegmentA'), + factory.entity('A11', 'templateToSegmentA'), + factory.entity('A12', 'templateToSegmentA'), + factory.entity('A13', 'templateToSegmentA'), + factory.entity('A14', 'templateToSegmentA'), + ], + files: [ + factory.file('F1', 'A1', 'document', fixturesPdfNameA), + factory.file('F2', 'A2', 'document', fixturesPdfNameA), + factory.file('F3', 'A3', 'document', fixturesPdfNameA), + factory.file('F4', 'A4', 'document', fixturesPdfNameA), + factory.file('F5', 'A5', 'document', fixturesPdfNameA), + factory.file('F6', 'A6', 'document', fixturesPdfNameA), + factory.file('F7', 'A7', 'document', fixturesPdfNameA), + factory.file('F8', 'A8', 'document', fixturesPdfNameA), + factory.file('F9', 'A9', 'document', fixturesPdfNameA), + factory.file('F10', 'A10', 'document', fixturesPdfNameA), + factory.file('F11', 'A11', 'document', fixturesPdfNameA), + factory.file('F12', 'A12', 'document', fixturesPdfNameA), + ], +}; + +export { + fixturesPdfNameA, + fixturesPdfNameB, + fixturesOneFile, + fixturesOtherFile, + fixturesTwelveFiles, + fixturesFiveFiles, + fixturesMissingPdf, +}; diff --git a/app/api/services/pdfsegmentation/specs/uploads/documentA.pdf b/app/api/services/pdfsegmentation/specs/uploads/documentA.pdf new file mode 100644 index 0000000000..02f8ffa7aa Binary files /dev/null and b/app/api/services/pdfsegmentation/specs/uploads/documentA.pdf differ diff --git a/app/api/services/pdfsegmentation/specs/uploads/documentB.pdf b/app/api/services/pdfsegmentation/specs/uploads/documentB.pdf new file mode 100644 index 0000000000..02f8ffa7aa Binary files /dev/null and b/app/api/services/pdfsegmentation/specs/uploads/documentB.pdf differ diff --git a/app/api/services/pdfsegmentation/specs/uploads/test.xml b/app/api/services/pdfsegmentation/specs/uploads/test.xml new file mode 100644 index 0000000000..3edbcb9b13 --- /dev/null +++ b/app/api/services/pdfsegmentation/specs/uploads/test.xml @@ -0,0 +1,26 @@ + + + Ceviche + $5.95 + Cold shrimps soup + 450 + + + Fideua + $7.95 + Hot shrimps pasta + 600 + + + Tofu + $8.95 + No one really knows + 1 + + + Cheese Burger + $4.50 + Hot beef sandwich + 950 + + \ No newline at end of file diff --git a/app/api/services/tasksmanager/DistributedLoop.ts b/app/api/services/tasksmanager/DistributedLoop.ts new file mode 100644 index 0000000000..ad9a4ba315 --- /dev/null +++ b/app/api/services/tasksmanager/DistributedLoop.ts @@ -0,0 +1,117 @@ +import Redis from 'redis'; +import Redlock from 'redlock'; +import { handleError } from 'api/utils/handleError'; + +export class DistributedLoop { + private lockName: string; + + private task: () => void; + + private redlock: Redlock | undefined; + + private stopTask: Function | undefined; + + private redisClient: Redis.RedisClient | undefined; + + private maxLockTime: number; + + private delayTimeBetweenTasks: number; + + private retryDelay: number; + + private port: number; + + private host: string; + + constructor( + lockName: string, + task: () => void, + options: { + maxLockTime?: number; + delayTimeBetweenTasks?: number; + retryDelay?: number; + port?: number; + } + ) { + const _options = { + maxLockTime: 2000, + delayTimeBetweenTasks: 1000, + retryDelay: 200, + port: 6379, + host: 'localhost', + ...options, + }; + this.maxLockTime = _options.maxLockTime; + this.retryDelay = _options.retryDelay; + this.delayTimeBetweenTasks = _options.delayTimeBetweenTasks; + this.lockName = `locks:${lockName}`; + this.task = task; + this.port = _options.port; + this.host = _options.host; + } + + async start() { + this.redisClient = await Redis.createClient(`redis://${this.host}:${this.port}`); + this.redlock = await new Redlock([this.redisClient], { + retryJitter: 0, + retryDelay: this.retryDelay, + }); + this.redisClient.on('error', error => { + if (error.code !== 'ECONNREFUSED') { + throw error; + } + }); + + // eslint-disable-next-line no-void + void this.lockTask(); + } + + async waitBetweenTasks(delay = this.delayTimeBetweenTasks) { + await new Promise(resolve => { + setTimeout(resolve, delay); + }); + } + + async runTask() { + try { + await this.task(); + } catch (error) { + handleError(error, { useContext: false }); + } + + await this.waitBetweenTasks(); + } + + async stop() { + await new Promise(resolve => { + this.stopTask = resolve; + }); + + await this.redlock?.quit(); + await this.redisClient?.end(true); + } + + async lockTask(): Promise { + try { + const lock = await this.redlock!.lock( + this.lockName, + this.maxLockTime + this.delayTimeBetweenTasks + ); + + if (this.stopTask) { + this.stopTask(); + return; + } + + await this.runTask(); + await lock.unlock(); + } catch (error) { + if (error instanceof Error && error.name !== 'LockError') { + throw error; + } + } + + // eslint-disable-next-line no-void + void this.lockTask(); + } +} diff --git a/app/api/services/tasksmanager/RedisServer.ts b/app/api/services/tasksmanager/RedisServer.ts new file mode 100644 index 0000000000..a651f2419e --- /dev/null +++ b/app/api/services/tasksmanager/RedisServer.ts @@ -0,0 +1,31 @@ +import { spawn } from 'child_process'; + +export class RedisServer { + server: any; + + port: number; + + pathToBin: string; + + constructor(port = 6379) { + this.pathToBin = 'redis-bin/redis-stable/src/redis-server'; + this.port = port; + } + + start() { + try { + this.server = spawn(this.pathToBin, ['--port', this.port.toString()]); + } catch (err) { + console.log(err); + } + } + + async stop(): Promise { + return new Promise((resolve, _reject) => { + this.server.on('close', () => { + resolve(); + }); + this.server.kill('SIGINT'); + }); + } +} diff --git a/app/api/services/tasksmanager/TaskManager.ts b/app/api/services/tasksmanager/TaskManager.ts new file mode 100644 index 0000000000..33e12c946a --- /dev/null +++ b/app/api/services/tasksmanager/TaskManager.ts @@ -0,0 +1,148 @@ +/* eslint-disable no-await-in-loop */ +import RedisSMQ, { QueueMessage } from 'rsmq'; +import Redis, { RedisClient } from 'redis'; +import { Repeater } from 'api/utils/Repeater'; +import { config } from 'api/config'; + +export interface TaskMessage { + tenant: string; + task: string; + params?: { + [key: string]: any; + }; +} + +/* eslint-disable camelcase */ +export interface ResultsMessage { + tenant: string; + task: string; + params?: { + [key: string]: any; + }; + data_url?: string; + file_url?: string; + success?: boolean; + error?: string; +} +/* eslint-enable camelcase */ + +export interface Service { + serviceName: string; + processResults?: (results: ResultsMessage) => Promise; + processRessultsMessageHiddenTime?: number; +} + +export class TaskManager { + redisSMQ: RedisSMQ; + + readonly service: Service; + + readonly taskQueue: string; + + readonly resultsQueue: string; + + private repeater: Repeater | undefined; + + redisClient: RedisClient; + + constructor(service: Service) { + this.service = service; + this.taskQueue = `${service.serviceName}_tasks`; + this.resultsQueue = `${service.serviceName}_results`; + const redisUrl = `redis://${config.redis.host}:${config.redis.port}`; + this.redisClient = Redis.createClient(redisUrl); + this.redisSMQ = new RedisSMQ({ client: this.redisClient }); + + this.subscribeToEvents(); + this.subscribeToResults(); + } + + subscribeToEvents() { + this.redisClient.on('error', (error: any | undefined) => { + if (error && error.code !== 'ECONNREFUSED') { + throw error; + } + }); + + this.redisClient.on('connect', () => { + this.redisSMQ.createQueue({ qname: this.taskQueue }, (err: Error | undefined) => { + if (err && err.name !== 'queueExists') { + throw err; + } + }); + this.redisSMQ.createQueue({ qname: this.resultsQueue }, (err: Error | undefined) => { + if (err && err.name !== 'queueExists') { + throw err; + } + }); + }); + } + + async clearQueue() { + while ((await this.countPendingTasks()) > 0) { + const message = (await this.redisSMQ.receiveMessageAsync({ + qname: this.taskQueue, + })) as QueueMessage; + + if (!message.id) { + break; + } + + await this.redisSMQ.deleteMessageAsync({ + qname: this.taskQueue, + id: message.id, + }); + } + } + + async countPendingTasks(): Promise { + const queueAttributes = await this.redisSMQ!.getQueueAttributesAsync({ + qname: this.taskQueue, + }); + return queueAttributes.msgs; + } + + private subscribeToResults(): void { + this.repeater = new Repeater(this.checkForResults.bind(this), 500); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.repeater.start(); + } + + private async checkForResults() { + if (!this.redisClient?.connected) { + return; + } + + const message = (await this.redisSMQ.receiveMessageAsync({ + qname: this.resultsQueue, + vt: this.service.processRessultsMessageHiddenTime, + })) as QueueMessage; + + if (message.id && this.service.processResults) { + const processedMessage = JSON.parse(message.message); + + await this.service.processResults(processedMessage); + + await this.redisSMQ.deleteMessageAsync({ + qname: this.resultsQueue, + id: message.id, + }); + } + } + + async startTask(taskMessage: TaskMessage) { + if (!this.redisClient.connected) { + throw new Error('Redis is not connected'); + } + + return this.redisSMQ.sendMessageAsync({ + qname: this.taskQueue, + message: JSON.stringify(taskMessage), + }); + } + + async stop() { + await this.repeater!.stop(); + await this.redisClient.end(true); + } +} diff --git a/app/api/services/tasksmanager/specs/ExternalDummyService.ts b/app/api/services/tasksmanager/specs/ExternalDummyService.ts new file mode 100644 index 0000000000..94186a619a --- /dev/null +++ b/app/api/services/tasksmanager/specs/ExternalDummyService.ts @@ -0,0 +1,181 @@ +/* eslint-disable camelcase */ +import express from 'express'; +import RedisSMQ, { QueueMessage } from 'rsmq'; +import Redis, { RedisClient } from 'redis'; +import { Server } from 'http'; +import bodyParser from 'body-parser'; +import { uploadMiddleware } from 'api/files'; +import { ResultsMessage } from '../TaskManager'; + +export class ExternalDummyService { + private app: express.Application; + + private readonly port: number; + + private redisSMQ: RedisSMQ | undefined; + + private server: Server | undefined; + + currentTask: string | undefined; + + materials: string[] = []; + + files: Buffer[] = []; + + filesNames: String[] = []; + + results: object | undefined; + + redisClient: RedisClient | undefined; + + fileResults: string | undefined; + + private readonly serviceName: string; + + constructor(port = 1234, serviceName = 'dummy') { + this.port = port; + this.serviceName = serviceName; + this.app = express(); + this.app.use(bodyParser.json()); + + this.app.post('/data', (req, res) => { + this.materials.push(req.body); + res.send('ok'); + }); + + this.app.post('/files/*', uploadMiddleware.multiple(), (req, res) => { + if (req.files.length) { + const files = req.files as { buffer: Buffer; originalname: string }[]; + this.files.push(files[0].buffer); + this.filesNames.push(files[0].originalname); + } + res.send('received'); + }); + + this.app.get('/results', (_req, res) => { + res.json(JSON.stringify(this.results)); + }); + + this.app.get('/file', (_req, res) => { + if (!this.fileResults) { + res.status(404).send('Not found'); + return; + } + + res.sendFile(this.fileResults); + }); + } + + setResults(results: object) { + this.results = results; + } + + setFileResults(file: string) { + this.fileResults = file; + } + + get rsmq() { + if (!this.redisSMQ) { + throw new Error('rsmq is not initialized'); + } + return this.redisSMQ; + } + + async deleteQueue(qname: string) { + try { + await this.rsmq.deleteQueueAsync({ qname }); + } catch (err) { + if (err instanceof Error && err.name !== 'queueNotFound') { + throw err; + } + } + } + + async createQueue(qname: string) { + try { + await this.rsmq.createQueueAsync({ qname }); + } catch (err) { + if (err instanceof Error && err.name !== 'queueExists') { + throw err; + } + } + } + + async resetQueue() { + await this.deleteQueue(`${this.serviceName}_tasks`); + await this.deleteQueue(`${this.serviceName}_results`); + + await this.createQueue(`${this.serviceName}_tasks`); + await this.createQueue(`${this.serviceName}_results`); + } + + async readFirstTaskMessage() { + const message: RedisSMQ.QueueMessage | {} = await this.rsmq.receiveMessageAsync({ + qname: `${this.serviceName}_tasks`, + }); + const queueMessage = message as QueueMessage; + + if (!queueMessage.id) { + return undefined; + } + + await this.rsmq.deleteMessageAsync({ + qname: `${this.serviceName}_tasks`, + id: queueMessage.id, + }); + + return queueMessage?.message; + } + + async readAllTaskMessages() { + const messages: string[] = []; + while (true) { + // eslint-disable-next-line no-await-in-loop + const message = await this.readFirstTaskMessage(); + if (!message) { + break; + } + messages.push(message); + } + + return messages; + } + + async start(redisUrl?: string) { + if (redisUrl) { + this.redisClient = await Redis.createClient(redisUrl); + + this.redisSMQ = await new RedisSMQ({ client: this.redisClient }); + await this.resetQueue(); + } + + const start = new Promise(resolve => { + this.server = this.app.listen(this.port, () => { + resolve(); + }); + }); + + return start; + } + + async stop() { + await this.redisClient?.end(true); + await this.server?.close(); + } + + async sendFinishedMessage(task: ResultsMessage) { + try { + await this.rsmq.sendMessageAsync({ + qname: `${this.serviceName}_results`, + message: JSON.stringify(task), + }); + } catch (err) { + console.log(err); + } + } + + reset() { + this.files = []; + this.filesNames = []; + } +} diff --git a/app/api/services/tasksmanager/specs/blank.pdf b/app/api/services/tasksmanager/specs/blank.pdf new file mode 100644 index 0000000000..02f8ffa7aa Binary files /dev/null and b/app/api/services/tasksmanager/specs/blank.pdf differ diff --git a/app/api/services/tasksmanager/specs/distributedLoop.spec.js b/app/api/services/tasksmanager/specs/distributedLoop.spec.js new file mode 100644 index 0000000000..5772294728 --- /dev/null +++ b/app/api/services/tasksmanager/specs/distributedLoop.spec.js @@ -0,0 +1,234 @@ +import * as errorHelper from 'api/utils/handleError'; +import waitForExpect from 'wait-for-expect'; +import { DistributedLoop } from '../DistributedLoop'; +import { RedisServer } from '../RedisServer'; + +/* eslint-disable max-statements */ +describe('DistributedLoopLock', () => { + let finishTask; + let task; + let rejectTask; + let redisServer; + let pendingTasks; + + beforeAll(async () => { + redisServer = new RedisServer(); + await redisServer.start(); + }); + + afterAll(async () => { + await redisServer.stop(); + }); + + beforeEach(async () => { + pendingTasks = []; + task = jasmine.createSpy('callbackone').and.callFake( + () => + new Promise((resolve, reject) => { + pendingTasks.push(resolve); + rejectTask = reject; + finishTask = resolve; + }) + ); + }); + + afterEach(async () => { + await pendingTasks.map(pendingTask => pendingTask()); + }); + + async function sleepTime(time) { + await new Promise(resolve => { + setTimeout(resolve, time); + }); + } + + it('should run one task at a time', async () => { + const nodeOne = new DistributedLoop('my_locked_task', task, { delayTimeBetweenTasks: 0 }); + const nodeTwo = new DistributedLoop('my_locked_task', task, { delayTimeBetweenTasks: 0 }); + await nodeOne.start(); + await nodeTwo.start(); + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(1); + }); + finishTask(); + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(2); + }); + finishTask(); + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(3); + }); + finishTask(); + await nodeOne.stop(); + finishTask(); + await nodeTwo.stop(); + }); + + it('should wait until the redis server is available to execute the task', async () => { + await redisServer.stop(); + const nodeOne = new DistributedLoop('my_locked_task', task, { + retryDelay: 20, + delayTimeBetweenTasks: 0, + }); + await nodeOne.start(); + + await sleepTime(50); + + expect(task).toHaveBeenCalledTimes(0); + + await redisServer.start(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(1); + }); + + finishTask(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(2); + }); + + finishTask(); + + await nodeOne.stop(); + }); + + it('should continue executing tasks after redis was unavailable for a while', async () => { + const unstableRedisServer = new RedisServer(6371); + await unstableRedisServer.start(); + const nodeOne = new DistributedLoop('my_locked_task', task, { + retryDelay: 20, + delayTimeBetweenTasks: 0, + port: 6371, + }); + await nodeOne.start(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(1); + }); + + await unstableRedisServer.stop(); + + finishTask(); + + await sleepTime(50); + expect(task).toHaveBeenCalledTimes(1); + + await unstableRedisServer.start(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(2); + }); + + finishTask(); + + await nodeOne.stop(); + await unstableRedisServer.stop(); + }); + + it('should handle when a lock fails for too many retries', async () => { + const nodeOne = new DistributedLoop('my_long_locked_task', task, { + retryDelay: 20, + delayTimeBetweenTasks: 0, + }); + const nodeTwo = new DistributedLoop('my_long_locked_task', task, { + retryDelay: 20, + delayTimeBetweenTasks: 0, + }); + + await nodeOne.start(); + await nodeTwo.start(); + + await sleepTime(250); + + expect(task).toHaveBeenCalledTimes(1); + + finishTask(); + await nodeOne.stop(); + finishTask(); + await nodeTwo.stop(); + }); + + it('should handle when a node fails to unlock the lock', async () => { + const nodeOne = new DistributedLoop('my_locked_task', task, { + maxLockTime: 50, + delayTimeBetweenTasks: 0, + }); + const nodeTwo = new DistributedLoop('my_locked_task', task, { + maxLockTime: 50, + delayTimeBetweenTasks: 0, + }); + + await nodeOne.start(); + await sleepTime(10); + const firstFinishTask = finishTask; + await nodeTwo.start(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(2); + }); + + firstFinishTask(); + await nodeOne.stop(); + finishTask(); + await nodeTwo.stop(); + }); + + it('should continue executing the task if one task fails', async () => { + jest.spyOn(errorHelper, 'handleError').mockImplementation(() => {}); + const nodeOne = new DistributedLoop('my_locked_task', task, { + maxLockTime: 500, + delayTimeBetweenTasks: 0, + }); + + await nodeOne.start(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(1); + }); + + const someError = { error: 'some error' }; + rejectTask(someError); + await waitForExpect(async () => { + expect(errorHelper.handleError).toHaveBeenLastCalledWith(someError, { useContext: false }); + }); + + finishTask(); + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(2); + }); + finishTask(); + await nodeOne.stop(); + }); + + // eslint-disable-next-line max-statements + it('should add a delay between task executions', async () => { + const nodeOne = new DistributedLoop('my_locked_task', task, { + maxLockTime: 50, + delayTimeBetweenTasks: 50, + retryDelay: 20, + }); + const nodeTwo = new DistributedLoop('my_locked_task', task, { + maxLockTime: 50, + delayTimeBetweenTasks: 50, + retryDelay: 20, + }); + + await nodeOne.start(); + await nodeTwo.start(); + + await waitForExpect(async () => { + expect(task).toHaveBeenCalledTimes(1); + }); + + finishTask(); + await sleepTime(25); + expect(task).toHaveBeenCalledTimes(1); + + finishTask(); + await nodeOne.stop(); + + finishTask(); + await nodeTwo.stop(); + }); +}); diff --git a/app/api/services/tasksmanager/specs/taskManager.spec.ts b/app/api/services/tasksmanager/specs/taskManager.spec.ts new file mode 100644 index 0000000000..81829238d6 --- /dev/null +++ b/app/api/services/tasksmanager/specs/taskManager.spec.ts @@ -0,0 +1,195 @@ +/* eslint-disable max-statements */ +import waitForExpect from 'wait-for-expect'; +import { TaskManager, Service } from 'api/services/tasksmanager/TaskManager'; +import { RedisServer } from '../RedisServer'; +import { ExternalDummyService } from './ExternalDummyService'; +import { config } from 'api/config'; + +describe('taskManager', () => { + let taskManager: TaskManager | undefined; + + let service: Service; + let redisServer: RedisServer; + let externalDummyService: ExternalDummyService; + + beforeAll(async () => { + const port = 6378; + config.redis.port = port; + + const redisUrl = `redis://${config.redis.host}:${config.redis.port}`; + service = { + serviceName: 'KonzNGaboHellKitchen', + processResults: jest.fn(), + processRessultsMessageHiddenTime: 1, + }; + redisServer = new RedisServer(port); + await redisServer.start(); + + externalDummyService = new ExternalDummyService(1234, service.serviceName); + await externalDummyService.start(redisUrl); + + taskManager = new TaskManager(service); + + await new Promise(resolve => setTimeout(resolve, 100)); // wait for redis to be ready + }); + + afterAll(async () => { + await taskManager?.stop(); + await externalDummyService.stop(); + await redisServer.stop(); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('startTask', () => { + it('should add a task', async () => { + await taskManager?.startTask({ + task: 'CheeseBurger', + tenant: 'Rafa', + }); + const message = await externalDummyService.readFirstTaskMessage(); + expect(message).toBe('{"task":"CheeseBurger","tenant":"Rafa"}'); + }); + + describe('when multiple tasks are added', () => { + it('services get them in order', async () => { + await taskManager?.startTask({ + task: 'CheeseBurger', + tenant: 'Joan', + }); + + await taskManager?.startTask({ + task: 'Fries', + tenant: 'Joan', + }); + + await taskManager?.startTask({ + task: 'Ribs', + tenant: 'Fede', + }); + + let message = await externalDummyService.readFirstTaskMessage(); + expect(message).toBe('{"task":"CheeseBurger","tenant":"Joan"}'); + + message = await externalDummyService.readFirstTaskMessage(); + expect(message).toBe('{"task":"Fries","tenant":"Joan"}'); + + message = await externalDummyService.readFirstTaskMessage(); + expect(message).toBe('{"task":"Ribs","tenant":"Fede"}'); + }); + }); + }); + + describe('count tasks', () => { + it('should count the pending tasks', async () => { + await taskManager?.startTask({ + task: 'CheeseBurger', + tenant: 'Rafa', + }); + + await taskManager?.startTask({ + task: 'Fries', + tenant: 'Joan', + }); + + await taskManager?.startTask({ + task: 'Ribs', + tenant: 'Fede', + }); + + const pendingTasks = await taskManager?.countPendingTasks(); + expect(pendingTasks).toBe(3); + }); + }); + + describe('when the task finishes', () => { + it('should call process results once and delete the result message', async () => { + const task = { + task: 'Tofu', + tenant: 'Gabo', + data_url: 'http://localhost:1234/results', + }; + + await externalDummyService.sendFinishedMessage(task); + + await waitForExpect(async () => { + expect(service.processResults).toHaveBeenCalledWith(task); + }); + + const queueAttributes = await taskManager?.redisSMQ!.getQueueAttributesAsync({ + qname: taskManager.resultsQueue, + }); + + expect(queueAttributes!.msgs).toBe(0); + }); + }); + + describe('when redis server is not available', () => { + it('taskManager should fail to start task', async () => { + await redisServer.stop(); + const task = { task: 'Spagueti', tenant: 'Konz' }; + + try { + await taskManager?.startTask(task); + fail('It should throw'); + } catch (e) { + expect(e).toEqual(Error('Redis is not connected')); + } + + await redisServer.start(); + }); + + describe('and redis comes back', () => { + it('should send tasks again', async () => { + await externalDummyService.resetQueue(); + await redisServer.stop(); + + const task = { task: 'Ceviche', tenant: 'Mercy' }; + + try { + await taskManager?.startTask(task); + fail('It should throw'); + } catch (e) { + expect(e).toEqual(Error('Redis is not connected')); + } + + await redisServer.start(); + + await waitForExpect(async () => { + expect(taskManager?.redisClient.connected).toBe(true); + }); + await taskManager?.startTask(task); + + const message = await externalDummyService.readFirstTaskMessage(); + expect(message).toBe('{"task":"Ceviche","tenant":"Mercy"}'); + }); + + it('should read pending messages', async () => { + await taskManager?.stop(); + const task = { + task: 'Ceviche', + tenant: 'Mercy', + results_url: 'http://localhost:1234/results', + }; + externalDummyService.setResults({ + results: 'Ceviche', + }); + + await externalDummyService.sendFinishedMessage(task); + + await redisServer.stop(); + + taskManager = new TaskManager(service); + expect(service.processResults).not.toHaveBeenCalled(); + + await redisServer.start(); + + await waitForExpect(async () => { + expect(service.processResults).toHaveBeenCalledWith(task); + }); + }); + }); + }); +}); diff --git a/app/api/utils/Repeater.js b/app/api/utils/Repeater.js index bb91f06df9..379535ca68 100644 --- a/app/api/utils/Repeater.js +++ b/app/api/utils/Repeater.js @@ -3,18 +3,27 @@ const timeout = async interval => setTimeout(resolve, interval); }); -export default { - stopped: false, +export class Repeater { + constructor(cb, interval) { + this.cb = cb; + this.interval = interval; + this.stopped = null; + } - async start(cb, interval) { - if (!this.stopped) { - await cb(); - await timeout(interval); - await this.start(cb, interval); + async start() { + if (this.stopped) { + this.stopped(); + return; } - }, - stop() { - this.stopped = true; - }, -}; + await this.cb(); + await timeout(this.interval); + await this.start(this.cb, this.interval); + } + + async stop() { + return new Promise(resolve => { + this.stopped = resolve; + }); + } +} diff --git a/app/api/utils/async-fs.js b/app/api/utils/async-fs.js index 396fedac43..86cd045d9d 100644 --- a/app/api/utils/async-fs.js +++ b/app/api/utils/async-fs.js @@ -12,4 +12,5 @@ export default { readFile: promisify(fs.readFile), readdir: promisify(fs.readdir), mkdir: promisify(fs.mkdir), + rmdir: promisify(fs.rmdir), }; diff --git a/app/api/utils/downloadRedis.js b/app/api/utils/downloadRedis.js new file mode 100644 index 0000000000..da9bdea3cd --- /dev/null +++ b/app/api/utils/downloadRedis.js @@ -0,0 +1,31 @@ +/* eslint-disable no-console */ +import { execSync } from 'child_process'; +import path from 'path'; +import fs from 'fs'; + +export const downloadRedis = () => { + const pathToBin = path.join(__dirname, '../../../redis-bin/redis-stable/src/redis-server'); + console.log(pathToBin); + if (fs.existsSync(pathToBin)) { + return; + } + console.log('Downloading redis...'); + execSync( + `mkdir redis-bin && cd redis-bin + curl -O http://download.redis.io/redis-stable.tar.gz + tar xzvf redis-stable.tar.gz`, + { stdio: 'ignore' } + ); + + execSync('cd redis-bin && tar xzvf redis-stable.tar.gz', { stdio: 'ignore' }); + console.log('Downloading redis... Done'); + console.log('Installing redis...'); + execSync( + `cd redis-bin && + cd redis-stable && + make distclean && + make`, + { stdio: 'ignore' } + ); + console.log('Installing redis... Done'); +}; diff --git a/app/api/utils/fixturesFactory.ts b/app/api/utils/fixturesFactory.ts index a2b8a04f73..e16cc093c7 100644 --- a/app/api/utils/fixturesFactory.ts +++ b/app/api/utils/fixturesFactory.ts @@ -2,6 +2,7 @@ import { ObjectId } from 'mongodb'; import db from 'api/utils/testing_db'; import { EntitySchema } from 'shared/types/entityType'; import { PropertySchema, MetadataSchema } from 'shared/types/commonTypes'; +import { FileType } from 'shared/types/fileType'; export function getIdMapper() { const map = new Map(); @@ -43,6 +44,20 @@ export function getFixturesFactory() { }; }, + file: ( + id: string, + entity: string, + type: 'custom' | 'document' | 'thumbnail' | 'attachment' | undefined, + filename: string, + language: string = 'en' + ): FileType => ({ + _id: idMapper(`${id}`), + entity, + language, + type, + filename, + }), + inherit(name: string, content: string, property: string, props = {}): PropertySchema { return this.relationshipProp(name, content, { inherit: { property: idMapper(property).toString() }, diff --git a/app/api/utils/handleError.js b/app/api/utils/handleError.js index c019113fcf..e2d6997c4a 100644 --- a/app/api/utils/handleError.js +++ b/app/api/utils/handleError.js @@ -132,7 +132,7 @@ function simplifyError(result, error) { return simplifiedError; } -const handleError = (_error, { req = undefined, uncaught = false } = {}) => { +const handleError = (_error, { req = undefined, uncaught = false, useContext = true } = {}) => { const errorData = typeof _error === 'string' ? createError(_error, 500) : _error; const error = errorData || new Error('Unexpected error has occurred'); @@ -141,7 +141,9 @@ const handleError = (_error, { req = undefined, uncaught = false } = {}) => { } const result = prettifyError(error, { req, uncaught }); - result.requestId = appContext.get('requestId'); + if (useContext) { + result.requestId = appContext.get('requestId'); + } sendLog(result, error, {}); diff --git a/app/api/utils/specs/Repeater.spec.js b/app/api/utils/specs/Repeater.spec.js index 745ea67b9c..83e846f9ed 100644 --- a/app/api/utils/specs/Repeater.spec.js +++ b/app/api/utils/specs/Repeater.spec.js @@ -1,31 +1,43 @@ -import repeater from '../Repeater'; +import { Repeater } from '../Repeater'; -describe('repeat', () => { - let callback; - let counter = 0; - const stopOn = 15; +describe('Repeater', () => { + let callbackOne; + let callbackTwo; + + let repeaterOne; + let repeaterTwo; + + // one does not simply test timeouts + function advanceTime(time) { + jest.advanceTimersByTime(time); + return new Promise(resolve => setImmediate(resolve)); + } + + afterEach(() => { + jest.useRealTimers(); + }); beforeEach(() => { - counter = 1; - callback = jasmine.createSpy('callback').and.callFake( - () => - new Promise(resolve => { - setTimeout(() => { - if (counter === stopOn) { - resolve(); - repeater.stop(); - } else { - counter += 1; - resolve(); - } - }, 1); - }) - ); + jest.useFakeTimers(); + + callbackOne = jasmine.createSpy('callbackone').and.callFake(() => Promise.resolve()); + callbackTwo = jasmine.createSpy('callbackone').and.callFake(() => Promise.resolve()); }); - it('should repeat callback call when callback finishes', async () => { - await repeater.start(callback, 0); - expect(callback).toHaveBeenCalledTimes(stopOn); - expect(counter).toBe(stopOn); + it('should be able to have two independant repeaters', async () => { + repeaterOne = new Repeater(callbackOne, 1); + repeaterTwo = new Repeater(callbackTwo, 1); + + repeaterTwo.start(); + repeaterOne.start(); + + await advanceTime(1); + + repeaterOne.stop(); + + await advanceTime(1); + + expect(callbackOne).toHaveBeenCalledTimes(1); + expect(callbackTwo).toHaveBeenCalledTimes(2); }); }); diff --git a/app/api/utils/testing_db.ts b/app/api/utils/testing_db.ts index e7e724baae..d217be91ca 100644 --- a/app/api/utils/testing_db.ts +++ b/app/api/utils/testing_db.ts @@ -140,7 +140,7 @@ const testingDB: { }, }; -export { testingDB }; +export { testingDB, fixturer }; // deprecated, for backward compatibility export default testingDB; diff --git a/app/jestServerGlobalSetup.js b/app/jestServerGlobalSetup.js index 25c31c8eb7..c8d7efb186 100644 --- a/app/jestServerGlobalSetup.js +++ b/app/jestServerGlobalSetup.js @@ -1,6 +1,8 @@ import { createMongoInstance as checkMongoVersion } from './api/utils/createMongoInstance.js'; +import { downloadRedis } from './api/utils/downloadRedis.js'; module.exports = async () => { const mongod = await checkMongoVersion(); await mongod.stop(); + downloadRedis(); }; diff --git a/app/react/MetadataExtraction/MetadataExtractionDashboard.tsx b/app/react/MetadataExtraction/MetadataExtractionDashboard.tsx index 1ec29aecdf..c28a773501 100644 --- a/app/react/MetadataExtraction/MetadataExtractionDashboard.tsx +++ b/app/react/MetadataExtraction/MetadataExtractionDashboard.tsx @@ -28,7 +28,7 @@ export interface MetadataExtractionDashboardStateTypes { function mapStateToProps({ settings, templates }: any) { return { - extractionSettings: settings.collection.get('features')?.get('metadata-extraction'), + extractionSettings: settings.collection.get('features')?.get('metadataExtraction'), templates, }; } @@ -52,8 +52,8 @@ class MetadataExtractionDashboard extends React.Component< const formatted: FormattedSettingsData = {}; this.props.extractionSettings.forEach(setting => { - const template = setting.has('id') - ? this.props.templates.find(temp => temp?.get('_id') === setting.get('id')) + const template = setting.has('template') + ? this.props.templates.find(temp => temp?.get('_id') === setting.get('template')) : this.props.templates.find(temp => temp?.get('name') === setting.get('name')); if (!template) { diff --git a/app/react/MetadataExtraction/specs/MetadataExtractionDashboard.spec.js b/app/react/MetadataExtraction/specs/MetadataExtractionDashboard.spec.js index 81afee4d49..a27aebabd0 100644 --- a/app/react/MetadataExtraction/specs/MetadataExtractionDashboard.spec.js +++ b/app/react/MetadataExtraction/specs/MetadataExtractionDashboard.spec.js @@ -35,17 +35,17 @@ const templates = Immutable.fromJS([ const settings = { collection: Immutable.fromJS({ features: { - 'metadata-extraction': [ + metadataExtraction: [ { - id: factory.id('templateA'), + template: factory.id('templateA'), properties: ['AonlyText', 'ABsharedDate', 'ACsharedMarkdown', 'ABC shared Number'], }, { - id: factory.id('templateB'), + template: factory.id('templateB'), properties: ['BonlyText', 'ABsharedDate', 'BCsharedMarkdown', 'ABC shared number'], }, { - id: factory.id('templateC'), + template: factory.id('templateC'), properties: ['ConlyText', 'ACsharedMarkdown', 'BCsharedMarkdown', 'abc shared number'], }, ], diff --git a/app/server.js b/app/server.js index 8b35e7cf5e..d19f834ae9 100644 --- a/app/server.js +++ b/app/server.js @@ -10,6 +10,8 @@ import mongoose from 'mongoose'; import path from 'path'; import { TaskProvider } from 'shared/tasks/tasks'; +import { PDFSegmentation } from 'api/services/pdfsegmentation/PDFSegmentation'; +import { DistributedLoop } from 'api/services/tasksmanager/DistributedLoop'; import { appContextMiddleware } from 'api/utils/appContextMiddleware'; import { requestIdMiddleware } from 'api/utils/requestIdMiddleware'; @@ -24,8 +26,8 @@ import { migrator } from './api/migrations/migrator'; import settings from './api/settings'; import syncWorker from './api/sync/syncWorker'; import errorHandlingMiddleware from './api/utils/error_handling_middleware'; -import { handleError } from './api/utils'; -import repeater from './api/utils/Repeater'; +import { handleError } from './api/utils/handleError.js'; +import { Repeater } from './api/utils/Repeater'; import serverRenderingRoutes from './react/server.js'; import { DB } from './api/odm'; import { tenants } from './api/tenants/tenantContext'; @@ -126,19 +128,21 @@ DB.connect(config.DBHOST, dbAuth).then(async () => { const { evidencesVault, features } = await settings.get(); if (evidencesVault && evidencesVault.token && evidencesVault.template) { console.info('==> 📥 evidences vault config detected, started sync ....'); - repeater.start( + const vaultSyncRepeater = new Repeater( () => vaultSync.sync(evidencesVault.token, evidencesVault.template), 10000 ); + vaultSyncRepeater.start(); } if (features && features.tocGeneration && features.tocGeneration.url) { console.info('==> 🗂️ automatically generating TOCs using external service'); const service = tocService(features.tocGeneration.url); - repeater.start(() => service.processNext(), 10000); + const tocServiceRepeater = new Repeater(() => service.processNext(), 10000); + tocServiceRepeater.start(); } - repeater.start( + const topicClassificationRepeater = new Repeater( () => TaskProvider.runAndWait('TopicClassificationSync', 'TopicClassificationSync', { mode: 'onlynew', @@ -147,6 +151,18 @@ DB.connect(config.DBHOST, dbAuth).then(async () => { }), 10000 ); + topicClassificationRepeater.start(); + + if (config.externalServices) { + const segmentationConnector = new PDFSegmentation(); + const segmentationRepeater = new DistributedLoop( + 'segmentation_repeat', + segmentationConnector.segmentPdfs, + { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 2000 } + ); + + segmentationRepeater.start(); + } } }); diff --git a/app/shared/JSONRequest.js b/app/shared/JSONRequest.js index 66da4d5196..1511dd0697 100644 --- a/app/shared/JSONRequest.js +++ b/app/shared/JSONRequest.js @@ -139,7 +139,7 @@ export default { head: (url, data, headers) => _fetch(url, data, 'HEAD', headers), - // TEST!!!! Fully untested function + // TEST!!!!! Fully untested function uploadFile: (url, filename, file, _cookie) => new Promise((resolve, reject) => { superagent diff --git a/app/shared/types/segmentationSchema.ts b/app/shared/types/segmentationSchema.ts new file mode 100644 index 0000000000..6e0f14e28d --- /dev/null +++ b/app/shared/types/segmentationSchema.ts @@ -0,0 +1,52 @@ +import Ajv from 'ajv'; +import { objectIdSchema } from 'shared/types/commonSchemas'; +import { wrapValidator } from 'shared/tsUtils'; +import { SegmentationType } from './segmentationType'; + +export const emitSchemaTypes = true; + +const ajv = Ajv({ allErrors: true, removeAdditional: true }); + +export const segmentationSchema = { + $schema: 'http://json-schema.org/schema#', + $async: true, + type: 'object', + additionalProperties: false, + title: 'SegmentationType', + definitions: { objectIdSchema }, + properties: { + _id: objectIdSchema, + autoexpire: { oneOf: [{ type: 'number' }, { type: 'null' }] }, + fileID: objectIdSchema, + filename: { type: 'string', minLength: 1 }, + status: { type: 'string', enum: ['processing', 'failed', 'ready'] }, + segmentation: { + type: 'object', + additionalProperties: false, + properties: { + page_width: { type: 'number' }, + page_height: { type: 'number' }, + paragraphs: { + type: 'array', + items: { + type: 'object', + additionalProperties: false, + properties: { + left: { type: 'number' }, + top: { type: 'number' }, + width: { type: 'number' }, + height: { type: 'number' }, + page_number: { type: 'number' }, + text: { type: 'string' }, + }, + }, + }, + }, + }, + }, +}; + +const validate = wrapValidator(ajv.compile(segmentationSchema)); + +export const validateFile = async (file: SegmentationType): Promise => + validate({ ...file }); diff --git a/app/shared/types/segmentationType.d.ts b/app/shared/types/segmentationType.d.ts new file mode 100644 index 0000000000..7c4be57d77 --- /dev/null +++ b/app/shared/types/segmentationType.d.ts @@ -0,0 +1,24 @@ +/* eslint-disable */ +/**AUTO-GENERATED. RUN yarn emit-types to update.*/ + +import { ObjectIdSchema } from 'shared/types/commonTypes'; + +export interface SegmentationType { + _id?: ObjectIdSchema; + autoexpire?: number | null; + fileID?: ObjectIdSchema; + filename?: string; + status?: 'processing' | 'failed' | 'ready'; + segmentation?: { + page_width?: number; + page_height?: number; + paragraphs?: { + left?: number; + top?: number; + width?: number; + height?: number; + page_number?: number; + text?: string; + }[]; + }; +} diff --git a/app/shared/types/settingsSchema.ts b/app/shared/types/settingsSchema.ts index 400e1f5f08..5bbbbeca92 100644 --- a/app/shared/types/settingsSchema.ts +++ b/app/shared/types/settingsSchema.ts @@ -148,6 +148,26 @@ export const settingsSchema = { }, topicClassification: { type: 'boolean' }, favorites: { type: 'boolean' }, + segmentation: { + type: 'object', + additionalProperties: false, + required: ['url'], + properties: { + url: { type: 'string' }, + }, + }, + metadataExtraction: { + type: 'array', + items: { + type: 'object', + additionalProperties: false, + required: ['template', 'properties'], + properties: { + template: objectIdSchema, + properties: { type: 'array', items: { type: 'string' } }, + }, + }, + }, }, }, mapStartingPoint: geolocationSchema, diff --git a/app/shared/types/settingsType.d.ts b/app/shared/types/settingsType.d.ts index b957454f45..6d4b460f8a 100644 --- a/app/shared/types/settingsType.d.ts +++ b/app/shared/types/settingsType.d.ts @@ -94,6 +94,13 @@ export interface Settings { }; topicClassification?: boolean; favorites?: boolean; + segmentation?: { + url: string; + }; + metadataExtraction?: { + template: ObjectIdSchema; + properties: string[]; + }[]; [k: string]: unknown | undefined; }; mapStartingPoint?: { diff --git a/package.json b/package.json index 62d1cd3aa9..2d48c07483 100644 --- a/package.json +++ b/package.json @@ -80,6 +80,9 @@ "@types/lodash": "^4.14.170", "@types/react-dropzone": "^4.2.2", "@types/react-modal": "^3.12.0", + "@types/redis": "^2.8.31", + "@types/redlock": "^4.0.2", + "@types/url-join": "^4.0.1", "ajv": "^6.12.3", "ajv-keywords": "^3.4.1", "async": "2.6.3", @@ -160,12 +163,15 @@ "react-text-selection-handler": "0.1.0", "react-widgets": "v4.5.0", "recharts": "1.3.6", - "redis": "^3.0.2", + "redis": "^3.1.2", + "redis-mock": "^0.56.3", + "redlock": "^4.2.0", "redux": "^3.7.2", "redux-devtools-extension": "^2.13.2", "redux-thunk": "^2.0.1", "reselect": "^4.0.0", "rison-node": "^2.1.1", + "rsmq": "^0.12.4", "rtlcss": "^2.6.0", "sanitize-filename": "^1.6.3", "serialize-javascript": "^5.0.1", diff --git a/yarn.lock b/yarn.lock index 5ea1c89b47..90a13641b7 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1820,6 +1820,11 @@ dependencies: "@babel/types" "^7.3.0" +"@types/bluebird@*": + version "3.5.36" + resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.36.tgz#00d9301d4dc35c2f6465a8aec634bb533674c652" + integrity sha512-HBNx4lhkxN7bx6P0++W8E289foSu8kO8GCk2unhuVggO+cE7rh9DhZUyPhUxNRG9m+5B5BTKxZQ5ZP92x/mx9Q== + "@types/body-parser@*": version "1.17.1" resolved "https://registry.yarnpkg.com/@types/body-parser/-/body-parser-1.17.1.tgz#18fcf61768fb5c30ccc508c21d6fd2e8b3bf7897" @@ -2337,6 +2342,20 @@ "@types/d3-shape" "^1" "@types/react" "*" +"@types/redis@^2.8.31": + version "2.8.31" + resolved "https://registry.yarnpkg.com/@types/redis/-/redis-2.8.31.tgz#c11c1b269fec132ac2ec9eb891edf72fc549149e" + integrity sha512-daWrrTDYaa5iSDFbgzZ9gOOzyp2AJmYK59OlG/2KGBgYWF3lfs8GDKm1c//tik5Uc93hDD36O+qLPvzDolChbA== + dependencies: + "@types/node" "*" + +"@types/redlock@^4.0.2": + version "4.0.2" + resolved "https://registry.yarnpkg.com/@types/redlock/-/redlock-4.0.2.tgz#3ae94bc236d4cff12815b62b9b9e51a8e7f9f25f" + integrity sha512-3MMTCWOXrfQFU8dLAbQWDOsftnFagQxmUkfR5KK/DB/zKPUh0ZzPFkNV84nfw1yMFYLfd4MgITGT+XolYd8d1w== + dependencies: + "@types/bluebird" "*" + "@types/redux-mock-store@^1.0.1": version "1.0.1" resolved "https://registry.yarnpkg.com/@types/redux-mock-store/-/redux-mock-store-1.0.1.tgz#90ca701d640aef7c007f564a9a4f8dc03180b0f7" @@ -2420,6 +2439,11 @@ dependencies: source-map "^0.6.1" +"@types/url-join@^4.0.1": + version "4.0.1" + resolved "https://registry.yarnpkg.com/@types/url-join/-/url-join-4.0.1.tgz#4989c97f969464647a8586c7252d97b449cdc045" + integrity sha512-wDXw9LEEUHyV+7UWy7U315nrJGJ7p1BzaCxDpEoLr789Dk1WDVMMlf3iBfbG2F8NdWnYyFbtTxUn2ZNbm1Q4LQ== + "@types/webpack-env@1.15.2": version "1.15.2" resolved "https://registry.yarnpkg.com/@types/webpack-env/-/webpack-env-1.15.2.tgz#927997342bb9f4a5185a86e6579a0a18afc33b0a" @@ -3373,7 +3397,7 @@ bluebird@3.5.1, bluebird@^3.5.1: version "3.5.1" resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.5.1.tgz#d9551f9de98f1fcda1e683d17ee91a0602ee2eb9" -bluebird@^3.5.5: +bluebird@^3.5.5, bluebird@^3.7.2: version "3.7.2" resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f" integrity sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg== @@ -5051,6 +5075,11 @@ denque@^1.4.1: resolved "https://registry.yarnpkg.com/denque/-/denque-1.4.1.tgz#6744ff7641c148c3f8a69c307e51235c1f4a37cf" integrity sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ== +denque@^1.5.0: + version "1.5.1" + resolved "https://registry.yarnpkg.com/denque/-/denque-1.5.1.tgz#07f670e29c9a78f8faecb2566a1e2c11929c5cbf" + integrity sha512-XwE+iZ4D6ZUB7mfYRMb5wByE8L74HCn30FBN7sWnXksWc1LO1bPDl67pBR9o/kC4z/xSNAwkMYcGgqDV3BE3Hw== + depd@2.0.0, depd@~2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/depd/-/depd-2.0.0.tgz#b696163cc757560d09cf22cc8fad1571b79e76df" @@ -12592,11 +12621,21 @@ redis-commands@^1.5.0: resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.5.0.tgz#80d2e20698fe688f227127ff9e5164a7dd17e785" integrity sha512-6KxamqpZ468MeQC3bkWmCB1fp56XL64D4Kf0zJSwDZbVLLm7KFkoIcHrgRvQ+sk8dnhySs7+yBg94yIkAK7aJg== +redis-commands@^1.7.0: + version "1.7.0" + resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.7.0.tgz#15a6fea2d58281e27b1cd1acfb4b293e278c3a89" + integrity sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ== + redis-errors@^1.0.0, redis-errors@^1.2.0: version "1.2.0" resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad" integrity sha1-62LSrbFeTq9GEMBK/hUpOEJQq60= +redis-mock@^0.56.3: + version "0.56.3" + resolved "https://registry.yarnpkg.com/redis-mock/-/redis-mock-0.56.3.tgz#e96471bcc774ddc514c2fc49cdd03cab2baecd89" + integrity sha512-ynaJhqk0Qf3Qajnwvy4aOjS4Mdf9IBkELWtjd+NYhpiqu4QCNq6Vf3Q7c++XRPGiKiwRj9HWr0crcwy7EiPjYQ== + redis-parser@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-3.0.0.tgz#b66d828cdcafe6b4b8a428a7def4c6bcac31c8b4" @@ -12604,7 +12643,7 @@ redis-parser@^3.0.0: dependencies: redis-errors "^1.0.0" -redis@^3.0.0, redis@^3.0.2: +redis@^3.0.0: version "3.0.2" resolved "https://registry.yarnpkg.com/redis/-/redis-3.0.2.tgz#bd47067b8a4a3e6a2e556e57f71cc82c7360150a" integrity sha512-PNhLCrjU6vKVuMOyFu7oSP296mwBkcE6lrAjruBYG5LgdSqtRBoVQIylrMyVZD/lkF24RSNNatzvYag6HRBHjQ== @@ -12614,6 +12653,23 @@ redis@^3.0.0, redis@^3.0.2: redis-errors "^1.2.0" redis-parser "^3.0.0" +redis@^3.1.2: + version "3.1.2" + resolved "https://registry.yarnpkg.com/redis/-/redis-3.1.2.tgz#766851117e80653d23e0ed536254677ab647638c" + integrity sha512-grn5KoZLr/qrRQVwoSkmzdbw6pwF+/rwODtrOr6vuBRiR/f3rjSTGupbF90Zpqm2oenix8Do6RV7pYEkGwlKkw== + dependencies: + denque "^1.5.0" + redis-commands "^1.7.0" + redis-errors "^1.2.0" + redis-parser "^3.0.0" + +redlock@^4.2.0: + version "4.2.0" + resolved "https://registry.yarnpkg.com/redlock/-/redlock-4.2.0.tgz#c26590768559afd5fff76aa1133c94b411ff4f5f" + integrity sha512-j+oQlG+dOwcetUt2WJWttu4CZVeRzUrcVcISFmEmfyuwCVSJ93rDT7YSgg7H7rnxwoRyk/jU46kycVka5tW7jA== + dependencies: + bluebird "^3.7.2" + reduce-css-calc@~1.3.0: version "1.3.0" resolved "https://registry.yarnpkg.com/reduce-css-calc/-/reduce-css-calc-1.3.0.tgz#747c914e049614a4c9cfbba629871ad1d2927716" @@ -13071,6 +13127,14 @@ router-ips@^1.0.0: resolved "https://registry.yarnpkg.com/router-ips/-/router-ips-1.0.0.tgz#44e00858ebebc0133d58e40b2cd8a1fbb04203f5" integrity sha1-ROAIWOvrwBM9WOQLLNih+7BCA/U= +rsmq@^0.12.4: + version "0.12.4" + resolved "https://registry.yarnpkg.com/rsmq/-/rsmq-0.12.4.tgz#de42490956666aa7ddc245f4a5dff9deea91615e" + integrity sha512-xXs0MudraTG0ndRUo8QIG6/G0/xcPFag/kxbfZZ9Xuz7hTeaQLJKaZH5y0B4fcpLkWBXjC8A9ynIHula65bGBQ== + dependencies: + lodash "^4.17.21" + redis "^3.1.2" + rst-selector-parser@^2.2.3: version "2.2.3" resolved "https://registry.yarnpkg.com/rst-selector-parser/-/rst-selector-parser-2.2.3.tgz#81b230ea2fcc6066c89e3472de794285d9b03d91"