diff --git a/eland/operations.py b/eland/operations.py
index 4464000c..48c33448 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -1234,13 +1234,14 @@ def _es_results(
         if sort_params:
             body["sort"] = [sort_params]
 
-        es_results = list(
-            search_yield_hits(
+        es_results: List[Dict[str, Any]] = sum(
+            _search_yield_hits(
                 query_compiler=query_compiler, body=body, max_number_of_hits=result_size
-            )
+            ),
+            [],
         )
 
-        _, df = query_compiler._es_results_to_pandas(
+        df = query_compiler._es_results_to_pandas(
             results=es_results, show_progress=show_progress
         )
         df = self._apply_df_post_processing(df, post_processing)
@@ -1447,14 +1448,16 @@ def quantile_to_percentile(quantile: Union[int, float]) -> float:
     return float(min(100, max(0, quantile * 100)))
 
 
-def search_yield_hits(
+def _search_yield_hits(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
    max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     """
     This is a generator used to initialize point in time API and query the
-    search API and return generator which yields an individual documents
+    search API, and return a generator which yields batches of hits as
+    they come in. No empty batches will be yielded; if there are no hits
+    then no batches will be yielded at all.
 
     Parameters
     ----------
@@ -1469,8 +1472,8 @@ def search_yield_hits(
     Examples
     --------
     >>> results = list(search_yield_hits(query_compiler, body, 2)) # doctest: +SKIP
-    [{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
-     {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]
+    [[{'_index': 'flights', '_type': '_doc', '_id': '0', '_score': None, '_source': {...}, 'sort': [...]},
+      {'_index': 'flights', '_type': '_doc', '_id': '1', '_score': None, '_source': {...}, 'sort': [...]}]]
     """
     # Make a copy of 'body' to avoid mutating it outside this function.
     body = body.copy()
@@ -1500,7 +1503,7 @@ def _search_with_scroll(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
         return
@@ -1533,8 +1536,11 @@ def _search_with_scroll(
             hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
 
         # Yield the hits we need to and then track the total number.
-        yield from hits[:hits_to_yield]
-        hits_yielded += hits_to_yield
+        # Never yield an empty list as that makes things simpler for
+        # downstream consumers.
+        if hits and hits_to_yield > 0:
+            yield hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
 
         # Retrieve the next set of results
         resp = client.scroll(
@@ -1555,7 +1561,7 @@ def _search_with_pit_and_search_after(
     query_compiler: "QueryCompiler",
     body: Dict[str, Any],
     max_number_of_hits: Optional[int],
-) -> Generator[Dict[str, Any], None, None]:
+) -> Generator[List[Dict[str, Any]], None, None]:
     # No documents, no reason to send a search.
     if max_number_of_hits == 0:
         return
@@ -1602,8 +1608,11 @@ def _search_with_pit_and_search_after(
             hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
 
         # Yield the hits we need to and then track the total number.
-        yield from hits[:hits_to_yield]
-        hits_yielded += hits_to_yield
+        # Never yield an empty list as that makes things simpler for
+        # downstream consumers.
+        if hits and hits_to_yield > 0:
+            yield hits[:hits_to_yield]
+            hits_yielded += hits_to_yield
 
         # Set the 'search_after' for the next request
         # to be the last sort value for this set of hits.
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 00dda543..207d1c15 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -148,7 +148,6 @@ def es_dtypes(self) -> pd.Series:
     def _es_results_to_pandas(
         self,
         results: List[Dict[str, Any]],
-        batch_size: Optional[int] = None,
         show_progress: bool = False,
     ) -> "pd.Dataframe":
         """
@@ -239,10 +238,8 @@ def _es_results_to_pandas(
         (which isn't great)
         NOTE - using this lists is generally not a good way to use this API
         """
-        partial_result = False
-
-        if results is None:
-            return partial_result, self._empty_pd_ef()
+        if not results:
+            return self._empty_pd_ef()
 
         # This is one of the most performance critical areas of eland, and it repeatedly calls
         # self._mappings.field_name_pd_dtype and self._mappings.date_field_format
@@ -253,8 +250,7 @@ def _es_results_to_pandas(
 
         index = []
         i = 0
-        for hit in results:
-            i = i + 1
+        for i, hit in enumerate(results, 1):
 
             if "_source" in hit:
                 row = hit["_source"]
@@ -277,11 +273,6 @@ def _es_results_to_pandas(
             # flatten row to map correctly to 2D DataFrame
             rows.append(self._flatten_dict(row, field_mapping_cache))
 
-            if batch_size is not None:
-                if i >= batch_size:
-                    partial_result = True
-                    break
-
             if show_progress:
                 if i % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0:
                     print(f"{datetime.now()}: read {i} rows")
@@ -310,7 +301,7 @@ def _es_results_to_pandas(
         if show_progress:
             print(f"{datetime.now()}: read {i} rows")
 
-        return partial_result, df
+        return df
 
     def _flatten_dict(self, y, field_mapping_cache: "FieldMappingCache"):
         out = {}
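For reference, the contract this diff introduces (pagination helpers that yield only non-empty batches, which the caller then flattens) can be sketched independently of Elasticsearch. This is a minimal illustrative sketch, not the eland implementation: yield_batches and pages are hypothetical stand-ins for _search_yield_hits and the scroll/PIT responses that the real helpers fetch lazily.

from typing import Any, Dict, Generator, List, Optional

Hit = Dict[str, Any]


def yield_batches(
    pages: List[List[Hit]],
    max_number_of_hits: Optional[int],
) -> Generator[List[Hit], None, None]:
    # Hypothetical stand-in for _search_yield_hits: 'pages' replaces the
    # scroll/PIT responses that the real helpers request one at a time.
    hits_yielded = 0
    for hits in pages:
        if max_number_of_hits is None:
            hits_to_yield = len(hits)
        else:
            hits_to_yield = min(len(hits), max_number_of_hits - hits_yielded)
        # Never yield an empty batch, matching the behavior added above.
        if hits and hits_to_yield > 0:
            yield hits[:hits_to_yield]
            hits_yielded += hits_to_yield
        # Stop paginating once the cap has been reached.
        if max_number_of_hits is not None and hits_yielded >= max_number_of_hits:
            return


pages = [[{"_id": "0"}, {"_id": "1"}], [], [{"_id": "2"}]]

# Capped at two hits: the empty page produces no batch at all.
assert list(yield_batches(pages, 2)) == [[{"_id": "0"}, {"_id": "1"}]]

# Flattening batches the way the updated _es_results does:
es_results: List[Hit] = sum(yield_batches(pages, None), [])
assert [hit["_id"] for hit in es_results] == ["0", "1", "2"]

One caveat on the flattening choice: sum(batches, []) copies the accumulated list once per batch, so it is quadratic in the number of batches; list(itertools.chain.from_iterable(batches)) is the linear alternative if result sets span many pages.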