diff --git a/server/modules/search/aws/engine.js b/server/modules/search/aws/engine.js index 6c05496b2c..072a69c5de 100644 --- a/server/modules/search/aws/engine.js +++ b/server/modules/search/aws/engine.js @@ -1,6 +1,6 @@ const _ = require('lodash') const AWS = require('aws-sdk') -const { pipeline } = require('stream') +const { pipeline, Transform } = require('stream') module.exports = { async activate() { @@ -20,6 +20,13 @@ module.exports = { secretAccessKey: this.config.secretAccessKey, region: this.config.region }) + this.clientDomain = new AWS.CloudSearchDomain({ + apiVersion: '2013-01-01', + endpoint: this.config.endpoint, + accessKeyId: this.config.accessKeyId, + secretAccessKey: this.config.secretAccessKey, + region: this.config.region + }) let rebuildIndex = false @@ -141,10 +148,30 @@ module.exports = { */ async query(q, opts) { try { + let suggestions = [] + const results = await this.clientDomain.search({ + query: q, + partial: true, + size: 50 + }).promise() + if (results.hits.found < 5) { + const suggestResults = await this.clientDomain.suggest({ + query: q, + suggester: 'default_suggester', + size: 5 + }).promise() + suggestions = suggestResults.suggest.suggestions.map(s => s.suggestion) + } return { - results: [], - suggestions: [], - totalHits: 0 + results: _.map(results.hits.hit, r => ({ + id: r.id, + path: _.head(r.fields.path), + locale: _.head(r.fields.locale), + title: _.head(r.fields.title), + description: _.head(r.fields.description) + })), + suggestions: suggestions, + totalHits: results.hits.found } } catch (err) { WIKI.logger.warn('Search Engine Error:') @@ -157,16 +184,22 @@ module.exports = { * @param {Object} page Page to create */ async created(page) { - await this.client.indexes.use(this.config.indexName).index([ - { - id: page.hash, - locale: page.localeCode, - path: page.path, - title: page.title, - description: page.description, - content: page.content - } - ]) + await this.clientDomain.uploadDocuments({ + contentType: 'application/json', + documents: JSON.stringify([ + { + type: 'add', + id: page.hash, + fields: { + locale: page.localeCode, + path: page.path, + title: page.title, + description: page.description, + content: page.content + } + } + ]) + }).promise() }, /** * UPDATE @@ -174,16 +207,22 @@ module.exports = { * @param {Object} page Page to update */ async updated(page) { - await this.client.indexes.use(this.config.indexName).index([ - { - id: page.hash, - locale: page.localeCode, - path: page.path, - title: page.title, - description: page.description, - content: page.content - } - ]) + await this.clientDomain.uploadDocuments({ + contentType: 'application/json', + documents: JSON.stringify([ + { + type: 'add', + id: page.hash, + fields: { + locale: page.localeCode, + path: page.path, + title: page.title, + description: page.description, + content: page.content + } + } + ]) + }).promise() }, /** * DELETE @@ -191,12 +230,15 @@ module.exports = { * @param {Object} page Page to delete */ async deleted(page) { - await this.client.indexes.use(this.config.indexName).index([ - { - '@search.action': 'delete', - id: page.hash - } - ]) + await this.clientDomain.uploadDocuments({ + contentType: 'application/json', + documents: JSON.stringify([ + { + type: 'delete', + id: page.hash + } + ]) + }).promise() }, /** * RENAME @@ -204,33 +246,122 @@ module.exports = { * @param {Object} page Page to rename */ async renamed(page) { - await this.client.indexes.use(this.config.indexName).index([ - { - '@search.action': 'delete', - id: page.sourceHash - } - ]) - await this.client.indexes.use(this.config.indexName).index([ - { - id: page.destinationHash, - locale: page.localeCode, - path: page.destinationPath, - title: page.title, - description: page.description, - content: page.content - } - ]) + await this.clientDomain.uploadDocuments({ + contentType: 'application/json', + documents: JSON.stringify([ + { + type: 'delete', + id: page.sourceHash + } + ]) + }).promise() + await this.clientDomain.uploadDocuments({ + contentType: 'application/json', + documents: JSON.stringify([ + { + type: 'add', + id: page.destinationHash, + fields: { + locale: page.localeCode, + path: page.destinationPath, + title: page.title, + description: page.description, + content: page.content + } + } + ]) + }).promise() }, /** * REBUILD INDEX */ async rebuild() { + WIKI.logger.info(`(SEARCH/AWS) Rebuilding Index...`) + + const MAX_DOCUMENT_BYTES = Math.pow(2, 20) + const MAX_INDEXING_BYTES = 5 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength + const MAX_INDEXING_COUNT = 1000 + const COMMA_BYTES = Buffer.from(',').byteLength + + let chunks = [] + let bytes = 0 + + const processDocument = async (cb, doc) => { + try { + if (doc) { + const docBytes = Buffer.from(JSON.stringify(doc)).byteLength + // -> Document too large + if (docBytes >= MAX_DOCUMENT_BYTES) { + throw new Error('Document exceeds maximum size allowed by AWS CloudSearch.') + } + + // -> Current batch exceeds size hard limit, flush + if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) { + await flushBuffer() + } + + if (chunks.length > 0) { + bytes += COMMA_BYTES + } + bytes += docBytes + chunks.push(doc) + + // -> Current batch exceeds count soft limit, flush + if (chunks.length >= MAX_INDEXING_COUNT) { + await flushBuffer() + } + } else { + // -> End of stream, flush + await flushBuffer() + } + cb() + } catch (err) { + cb(err) + } + } + + const flushBuffer = async () => { + WIKI.logger.info(`(SEARCH/AWS) Sending batch of ${chunks.length}...`) + try { + const resp = await this.clientDomain.uploadDocuments({ + contentType: 'application/json', + documents: JSON.stringify(_.map(chunks, doc => ({ + type: 'add', + id: doc.id, + fields: { + locale: doc.locale, + path: doc.path, + title: doc.title, + description: doc.description, + content: doc.content + } + }))) + }).promise() + } catch (err) { + WIKI.logger.warn('(SEARCH/AWS) Failed to send batch to AWS CloudSearch: ', err) + } + chunks.length = 0 + bytes = 0 + } + await pipeline( WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'content').select().from('pages').where({ isPublished: true, isPrivate: false }).stream(), - this.client.indexes.use(this.config.indexName).createIndexingStream() + new Transform({ + objectMode: true, + transform: async (chunk, enc, cb) => await processDocument(cb, chunk), + flush: async (cb) => await processDocument(cb) + }) ) + + WIKI.logger.info(`(SEARCH/AWS) Requesting Index Rebuild...`) + await this.client.indexDocuments({ + DomainName: this.config.domain + }).promise() + + WIKI.logger.info(`(SEARCH/AWS) Index rebuilt successfully.`) } } +