From 5dea23c3b2cfa6a110bb7158dcc57eb3fe1531e6 Mon Sep 17 00:00:00 2001 From: Liza Katz Date: Tue, 8 Sep 2020 12:34:40 +0300 Subject: [PATCH] [Search] Use async es client endpoints (#76872) * Use ES Client asyncSearch * Rename to queryOptions * Simplify options * Update jest test and use delete route * Common async options --- .../server/search/es_search_strategy.test.ts | 61 +++++++++---------- .../server/search/es_search_strategy.ts | 47 ++++++-------- 2 files changed, 47 insertions(+), 61 deletions(-) diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts index 054baa6ac81d1..a287f72ca9161 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.test.ts @@ -35,12 +35,24 @@ const mockRollupResponse = { describe('ES search strategy', () => { const mockApiCaller = jest.fn(); + const mockGetCaller = jest.fn(); + const mockSubmitCaller = jest.fn(); const mockLogger: any = { debug: () => {}, }; const mockContext = { core: { - elasticsearch: { client: { asCurrentUser: { transport: { request: mockApiCaller } } } }, + elasticsearch: { + client: { + asCurrentUser: { + asyncSearch: { + get: mockGetCaller, + submit: mockSubmitCaller, + }, + transport: { request: mockApiCaller }, + }, + }, + }, }, }; const mockConfig$ = pluginInitializerContextConfigMock({}).legacy.globalConfig$; @@ -56,47 +68,32 @@ describe('ES search strategy', () => { }); it('makes a POST request to async search with params when no ID is provided', async () => { - mockApiCaller.mockResolvedValueOnce(mockAsyncResponse); + mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); const params = { index: 'logstash-*', body: { query: {} } }; const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params }); - expect(mockApiCaller).toBeCalled(); - const { method, path, body } = mockApiCaller.mock.calls[0][0]; - expect(method).toBe('POST'); - expect(path).toBe('/logstash-*/_async_search'); - expect(body).toEqual({ query: {} }); + expect(mockSubmitCaller).toBeCalled(); + const request = mockSubmitCaller.mock.calls[0][0]; + expect(request.index).toEqual(params.index); + expect(request.body).toEqual(params.body); }); it('makes a GET request to async search with ID when ID is provided', async () => { - mockApiCaller.mockResolvedValueOnce(mockAsyncResponse); + mockGetCaller.mockResolvedValueOnce(mockAsyncResponse); const params = { index: 'logstash-*', body: { query: {} } }; const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); await esSearch.search((mockContext as unknown) as RequestHandlerContext, { id: 'foo', params }); - expect(mockApiCaller).toBeCalled(); - const { method, path, body } = mockApiCaller.mock.calls[0][0]; - expect(method).toBe('GET'); - expect(path).toBe('/_async_search/foo'); - expect(body).toEqual(undefined); - }); - - it('encodes special characters in the path', async () => { - mockApiCaller.mockResolvedValueOnce(mockAsyncResponse); - - const params = { index: 'foo-程', body: {} }; - const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); - - await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params }); - - expect(mockApiCaller).toBeCalled(); - const { method, path } = mockApiCaller.mock.calls[0][0]; - expect(method).toBe('POST'); - expect(path).toBe('/foo-%E7%A8%8B/_async_search'); + expect(mockGetCaller).toBeCalled(); + const request = mockGetCaller.mock.calls[0][0]; + expect(request.id).toEqual('foo'); + expect(request).toHaveProperty('wait_for_completion_timeout'); + expect(request).toHaveProperty('keep_alive'); }); it('calls the rollup API if the index is a rollup type', async () => { @@ -117,16 +114,16 @@ describe('ES search strategy', () => { }); it('sets wait_for_completion_timeout and keep_alive in the request', async () => { - mockApiCaller.mockResolvedValueOnce(mockAsyncResponse); + mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse); const params = { index: 'foo-*', body: {} }; const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger); await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params }); - expect(mockApiCaller).toBeCalled(); - const { querystring } = mockApiCaller.mock.calls[0][0]; - expect(querystring).toHaveProperty('wait_for_completion_timeout'); - expect(querystring).toHaveProperty('keep_alive'); + expect(mockSubmitCaller).toBeCalled(); + const request = mockSubmitCaller.mock.calls[0][0]; + expect(request).toHaveProperty('wait_for_completion_timeout'); + expect(request).toHaveProperty('keep_alive'); }); }); diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts index 67a42b9954c9d..4ace1c4c5385b 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts @@ -70,9 +70,8 @@ export const enhancedEsSearchStrategyProvider = ( const cancel = async (context: RequestHandlerContext, id: string) => { logger.debug(`cancel ${id}`); - await context.core.elasticsearch.client.asCurrentUser.transport.request({ - method: 'DELETE', - path: encodeURI(`/_async_search/${id}`), + await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({ + id, }); }; @@ -84,39 +83,29 @@ async function asyncSearch( request: IEnhancedEsSearchRequest, options?: ISearchOptions ): Promise { - const { timeout = undefined, restTotalHitsAsInt = undefined, ...params } = { - ...request.params, - }; - - params.trackTotalHits = true; // Get the exact count of hits - - // If we have an ID, then just poll for that ID, otherwise send the entire request body - const { body = undefined, index = undefined, ...queryParams } = request.id ? {} : params; - - const method = request.id ? 'GET' : 'POST'; - const path = encodeURI(request.id ? `/_async_search/${request.id}` : `/${index}/_async_search`); - - // Only report partial results every 64 shards; this should be reduced when we actually display partial results - const batchedReduceSize = request.id ? undefined : 64; + let esResponse; const asyncOptions = { waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return keepAlive: '1m', // Extend the TTL for this search request by one minute }; - const querystring = toSnakeCase({ - ...asyncOptions, - ...(batchedReduceSize && { batchedReduceSize }), - ...queryParams, - }); + // If we have an ID, then just poll for that ID, otherwise send the entire request body + if (!request.id) { + const submitOptions = toSnakeCase({ + batchedReduceSize: 64, // Only report partial results every 64 shards; this should be reduced when we actually display partial results + trackTotalHits: true, // Get the exact count of hits + ...asyncOptions, + ...request.params, + }); - // TODO: replace with async endpoints once https://github.com/elastic/elasticsearch-js/issues/1280 is resolved - const esResponse = await client.transport.request({ - method, - path, - body, - querystring, - }); + esResponse = await client.asyncSearch.submit(submitOptions); + } else { + esResponse = await client.asyncSearch.get({ + id: request.id, + ...toSnakeCase(asyncOptions), + }); + } const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body; return {