Skip to content

Commit

Permalink
[Search] Re-add support for aborting when a connection is closed (ela…
Browse files Browse the repository at this point in the history
…stic#76470)

* [Search] Re-add support for aborting when a connection is closed

* Fix types

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
lukasolson and elasticmachine committed Sep 13, 2020
1 parent 1f87b08 commit 432a2ab
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ export const esSearchStrategyProvider = (
});

try {
const esResponse = (await context.core.elasticsearch.client.asCurrentUser.search(
params
)) as ApiResponse<SearchResponse<any>>;
const rawResponse = esResponse.body;
// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
const promise = context.core.elasticsearch.client.asCurrentUser.search(params);
if (options?.abortSignal)
options.abortSignal.addEventListener('abort', () => promise.abort());
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;

if (usage) usage.trackSuccess(rawResponse.took);

Expand Down
26 changes: 18 additions & 8 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { first } from 'rxjs/operators';
import { SearchResponse } from 'elasticsearch';
import { Observable } from 'rxjs';
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
import { SharedGlobalConfig, RequestHandlerContext, Logger } from '../../../../../src/core/server';
import {
getTotalLoaded,
Expand Down Expand Up @@ -40,8 +41,8 @@ export const enhancedEsSearchStrategyProvider = (

try {
const response = isAsync
? await asyncSearch(context, request)
: await rollupSearch(context, request);
? await asyncSearch(context, request, options)
: await rollupSearch(context, request, options);

if (
usage &&
Expand Down Expand Up @@ -69,9 +70,10 @@ export const enhancedEsSearchStrategyProvider = (

async function asyncSearch(
context: RequestHandlerContext,
request: IEnhancedEsSearchRequest
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
): Promise<IEsSearchResponse> {
let esResponse;
let promise: TransportRequestPromise<any>;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const uiSettingsClient = await context.core.uiSettings.client;

Expand All @@ -89,14 +91,17 @@ export const enhancedEsSearchStrategyProvider = (
...request.params,
});

esResponse = await esClient.asyncSearch.submit(submitOptions);
promise = esClient.asyncSearch.submit(submitOptions);
} else {
esResponse = await esClient.asyncSearch.get({
promise = esClient.asyncSearch.get({
id: request.id,
...toSnakeCase(asyncOptions),
});
}

// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
if (options?.abortSignal) options.abortSignal.addEventListener('abort', () => promise.abort());
const esResponse = await promise;
const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
return {
id,
Expand All @@ -109,7 +114,8 @@ export const enhancedEsSearchStrategyProvider = (

const rollupSearch = async function (
context: RequestHandlerContext,
request: IEnhancedEsSearchRequest
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
): Promise<IEsSearchResponse> {
const esClient = context.core.elasticsearch.client.asCurrentUser;
const uiSettingsClient = await context.core.uiSettings.client;
Expand All @@ -123,13 +129,17 @@ export const enhancedEsSearchStrategyProvider = (
...params,
});

const esResponse = await esClient.transport.request({
const promise = esClient.transport.request({
method,
path,
body,
querystring,
});

// Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297
if (options?.abortSignal) options.abortSignal.addEventListener('abort', () => promise.abort());
const esResponse = await promise;

const response = esResponse.body as SearchResponse<any>;
return {
rawResponse: response,
Expand Down

0 comments on commit 432a2ab

Please sign in to comment.