Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Search] Use async es client endpoints (#76872) #76912

Merged
merged 1 commit into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ const mockRollupResponse = {

describe('ES search strategy', () => {
const mockApiCaller = jest.fn();
const mockGetCaller = jest.fn();
const mockSubmitCaller = jest.fn();
const mockLogger: any = {
debug: () => {},
};
const mockContext = {
core: {
elasticsearch: { client: { asCurrentUser: { transport: { request: mockApiCaller } } } },
elasticsearch: {
client: {
asCurrentUser: {
asyncSearch: {
get: mockGetCaller,
submit: mockSubmitCaller,
},
transport: { request: mockApiCaller },
},
},
},
},
};
const mockConfig$ = pluginInitializerContextConfigMock<any>({}).legacy.globalConfig$;
Expand All @@ -56,47 +68,32 @@ describe('ES search strategy', () => {
});

it('makes a POST request to async search with params when no ID is provided', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });

expect(mockApiCaller).toBeCalled();
const { method, path, body } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/logstash-*/_async_search');
expect(body).toEqual({ query: {} });
expect(mockSubmitCaller).toBeCalled();
const request = mockSubmitCaller.mock.calls[0][0];
expect(request.index).toEqual(params.index);
expect(request.body).toEqual(params.body);
});

it('makes a GET request to async search with ID when ID is provided', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
mockGetCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'logstash-*', body: { query: {} } };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { id: 'foo', params });

expect(mockApiCaller).toBeCalled();
const { method, path, body } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('GET');
expect(path).toBe('/_async_search/foo');
expect(body).toEqual(undefined);
});

it('encodes special characters in the path', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'foo-程', body: {} };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });

expect(mockApiCaller).toBeCalled();
const { method, path } = mockApiCaller.mock.calls[0][0];
expect(method).toBe('POST');
expect(path).toBe('/foo-%E7%A8%8B/_async_search');
expect(mockGetCaller).toBeCalled();
const request = mockGetCaller.mock.calls[0][0];
expect(request.id).toEqual('foo');
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).toHaveProperty('keep_alive');
});

it('calls the rollup API if the index is a rollup type', async () => {
Expand All @@ -117,16 +114,16 @@ describe('ES search strategy', () => {
});

it('sets wait_for_completion_timeout and keep_alive in the request', async () => {
mockApiCaller.mockResolvedValueOnce(mockAsyncResponse);
mockSubmitCaller.mockResolvedValueOnce(mockAsyncResponse);

const params = { index: 'foo-*', body: {} };
const esSearch = await enhancedEsSearchStrategyProvider(mockConfig$, mockLogger);

await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });

expect(mockApiCaller).toBeCalled();
const { querystring } = mockApiCaller.mock.calls[0][0];
expect(querystring).toHaveProperty('wait_for_completion_timeout');
expect(querystring).toHaveProperty('keep_alive');
expect(mockSubmitCaller).toBeCalled();
const request = mockSubmitCaller.mock.calls[0][0];
expect(request).toHaveProperty('wait_for_completion_timeout');
expect(request).toHaveProperty('keep_alive');
});
});
47 changes: 18 additions & 29 deletions x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ export const enhancedEsSearchStrategyProvider = (

const cancel = async (context: RequestHandlerContext, id: string) => {
logger.debug(`cancel ${id}`);
await context.core.elasticsearch.client.asCurrentUser.transport.request({
method: 'DELETE',
path: encodeURI(`/_async_search/${id}`),
await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({
id,
});
};

Expand All @@ -84,39 +83,29 @@ async function asyncSearch(
request: IEnhancedEsSearchRequest,
options?: ISearchOptions
): Promise<IEsSearchResponse> {
const { timeout = undefined, restTotalHitsAsInt = undefined, ...params } = {
...request.params,
};

params.trackTotalHits = true; // Get the exact count of hits

// If we have an ID, then just poll for that ID, otherwise send the entire request body
const { body = undefined, index = undefined, ...queryParams } = request.id ? {} : params;

const method = request.id ? 'GET' : 'POST';
const path = encodeURI(request.id ? `/_async_search/${request.id}` : `/${index}/_async_search`);

// Only report partial results every 64 shards; this should be reduced when we actually display partial results
const batchedReduceSize = request.id ? undefined : 64;
let esResponse;

const asyncOptions = {
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
keepAlive: '1m', // Extend the TTL for this search request by one minute
};

const querystring = toSnakeCase({
...asyncOptions,
...(batchedReduceSize && { batchedReduceSize }),
...queryParams,
});
// If we have an ID, then just poll for that ID, otherwise send the entire request body
if (!request.id) {
const submitOptions = toSnakeCase({
batchedReduceSize: 64, // Only report partial results every 64 shards; this should be reduced when we actually display partial results
trackTotalHits: true, // Get the exact count of hits
...asyncOptions,
...request.params,
});

// TODO: replace with async endpoints once https://github.com/elastic/elasticsearch-js/issues/1280 is resolved
const esResponse = await client.transport.request({
method,
path,
body,
querystring,
});
esResponse = await client.asyncSearch.submit(submitOptions);
} else {
esResponse = await client.asyncSearch.get({
id: request.id,
...toSnakeCase(asyncOptions),
});
}

const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
return {
Expand Down