From a55ab2af53e4dc295f111749a27c6121212b919c Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Sun, 8 Aug 2021 11:04:09 -0500 Subject: [PATCH] Fallback on scroll for Elasticsearch <7.12 --- .ci/test-matrix.yml | 1 + eland/operations.py | 124 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 105 insertions(+), 20 deletions(-) diff --git a/.ci/test-matrix.yml b/.ci/test-matrix.yml index ecb2daf5..cb7f6a0f 100755 --- a/.ci/test-matrix.yml +++ b/.ci/test-matrix.yml @@ -4,6 +4,7 @@ ELASTICSEARCH_VERSION: - 8.0.0-SNAPSHOT - 7.x-SNAPSHOT - 7.14-SNAPSHOT + - 7.11-SNAPSHOT PANDAS_VERSION: - 1.2.0 diff --git a/eland/operations.py b/eland/operations.py index c72d7e06..2b8f5005 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -63,6 +63,7 @@ from numpy.typing import DTypeLike from eland.arithmetics import ArithmeticSeries + from eland.common import es_version from eland.field_mappings import Field from eland.filter import BooleanFilter from eland.query_compiler import QueryCompiler @@ -1244,7 +1245,7 @@ def _es_results( body["sort"] = [sort_params] es_results = list( - search_after_with_pit( + search_yield_hits( query_compiler=query_compiler, body=body, max_number_of_hits=result_size ) ) @@ -1509,14 +1510,14 @@ def show_progress(self) -> bool: return self._show_progress -def search_after_with_pit( +def search_yield_hits( query_compiler: "QueryCompiler", body: Dict[str, Any], max_number_of_hits: Optional[int], ) -> Generator[Dict[str, Any], None, None]: """ This is a generator used to initialize point in time API and query the - search API and return generator which yields an individual document + search API and return a generator which yields individual documents Parameters ---------- @@ -1530,39 +1531,122 @@ def search_after_with_pit( Examples -------- - >>> results = list(search_after_with_pit(query_compiler, body, 2)) # doctest: +SKIP + >>> results = list(search_yield_hits(query_compiler, body, 2)) # doctest: +SKIP [{'_index': 'flights',
'_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]}, {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}] - """ + # Make a copy of 'body' to avoid mutating it outside this function. + body = body.copy() + + # Use the default search size + body.setdefault("size", DEFAULT_SEARCH_SIZE) + + # Improves performance by not tracking # of hits. We only + # care about the hit itself for these queries. + body.setdefault("track_total_hits", False) + + # Elasticsearch 7.12 added '_shard_doc' sort tiebreaker for PITs which + # means we're guaranteed to be safe on documents with a duplicate sort rank. + if es_version(query_compiler._client) >= (7, 12, 0): + yield from _search_with_pit_and_search_after( + query_compiler=query_compiler, + body=body, + max_number_of_hits=max_number_of_hits, + ) + + # Otherwise we use 'scroll' like we used to. + else: + yield from _search_with_scroll( + query_compiler=query_compiler, + body=body, + max_number_of_hits=max_number_of_hits, + ) + + +def _search_with_scroll( + query_compiler: "QueryCompiler", + body: Dict[str, Any], + max_number_of_hits: Optional[int], +) -> Generator[Dict[str, Any], None, None]: # No documents, no reason to send a search. if max_number_of_hits == 0: return + client = query_compiler._client + hits_yielded = 0 + + # Make the initial search with 'scroll' set + resp = client.search( + index=query_compiler._index_pattern, + body=body, + scroll=DEFAULT_PIT_KEEP_ALIVE, + ) + scroll_id: Optional[str] = resp.get("_scroll_id", None) + + try: + while scroll_id and ( + max_number_of_hits is None or hits_yielded < max_number_of_hits + ): + hits: List[Dict[str, Any]] = resp["hits"]["hits"] + + # If we didn't receive any hits it means we've reached the end. 
+ if not hits: + break + + # Calculate which hits should be yielded from this batch + if max_number_of_hits is None: + hits_to_yield = len(hits) + else: + hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded) + + # Yield the hits we need to and then track the total number. + yield from hits[:hits_to_yield] + hits_yielded += hits_to_yield + + # Retrieve the next set of results + resp = client.scroll( + body={"scroll_id": scroll_id, "scroll": DEFAULT_PIT_KEEP_ALIVE}, + ) + scroll_id = resp.get("_scroll_id", None) # Update the scroll ID. + + finally: + # Close the scroll if we have one open + if scroll_id is not None: + try: + client.clear_scroll(body={"scroll_id": [scroll_id]}) + except NotFoundError: + pass + + +def _search_with_pit_and_search_after( + query_compiler: "QueryCompiler", + body: Dict[str, Any], + max_number_of_hits: Optional[int], +) -> Generator[Dict[str, Any], None, None]: + + # No documents, no reason to send a search. + if max_number_of_hits == 0: + return + + client = query_compiler._client hits_yielded = 0 # Track the total number of hits yielded. pit_id: Optional[str] = None + + # Pagination with 'search_after' must have a 'sort' setting. + # Using '_doc:asc' is the most efficient as it reads documents + # in the order that they're written on disk in Lucene. + body.setdefault("sort", [{"_doc": "asc"}]) + try: - pit_id = query_compiler._client.open_point_in_time( + pit_id = client.open_point_in_time( index=query_compiler._index_pattern, keep_alive=DEFAULT_PIT_KEEP_ALIVE )["id"] # Modify the search with the new point in time ID and keep-alive time. body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE} - # Use the default search size - body.setdefault("size", DEFAULT_SEARCH_SIZE) - - # Improves performance by not tracking # of hits. We only - # care about the hit itself for these queries. - body.setdefault("track_total_hits", False) - - # Pagination with 'search_after' must have a 'sort' setting.
- # Using '_doc:asc' is the most efficient as reads documents - # in the order that they're written on disk in Lucene. - body.setdefault("sort", [{"_doc": "asc"}]) - while max_number_of_hits is None or hits_yielded < max_number_of_hits: - resp = query_compiler._client.search(body=body) + resp = client.search(body=body) hits: List[Dict[str, Any]] = resp["hits"]["hits"] # The point in time ID can change between searches so we @@ -1593,7 +1677,7 @@ def search_after_with_pit( # to keep our memory footprint low. if pit_id is not None: try: - query_compiler._client.close_point_in_time(body={"id": pit_id}) + client.close_point_in_time(body={"id": pit_id}) except NotFoundError: # If a point in time is already closed Elasticsearch throws NotFoundError pass