diff --git a/eland/common.py b/eland/common.py
index 10a02346..772457ab 100644
--- a/eland/common.py
+++ b/eland/common.py
@@ -31,7 +31,7 @@
 DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
 DEFAULT_ES_MAX_RESULT_WINDOW = 10000  # index.max_result_window
-DEFAULT_PAGINATION_SIZE = 10000  # for composite aggregations
+DEFAULT_PAGINATION_SIZE = 5000  # for composite aggregations
 
 with warnings.catch_warnings():
diff --git a/eland/dataframe.py b/eland/dataframe.py
index d7772124..26b5f542 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -1447,10 +1447,10 @@ def groupby(
 
         Examples
         --------
-        >>> ed_flights = df = ed.DataFrame('localhost', 'flights', columns=['AvgTicketPrice', 'DistanceKilometers', 'timestamp', 'DestCountry'])
-        >>> ed_flights.groupby(["DestCountry","Cancelled"]).agg(["min", "max"],True) # doctest: +SKIP
-           AvgTicketPrice  dayOfWeek
-            min max min max
+        >>> ed_flights = ed.DataFrame('localhost', 'flights', columns=["AvgTicketPrice", "Cancelled", "dayOfWeek", "timestamp", "DestCountry"])
+        >>> ed_flights.groupby(["DestCountry", "Cancelled"]).agg(["min", "max"], True)
+                              AvgTicketPrice               dayOfWeek
+                                         min          max       min  max
         DestCountry Cancelled
         AE          False         110.799911  1126.148682       0.0  6.0
                     True          132.443756   817.931030       0.0  6.0
@@ -1463,10 +1463,10 @@ def groupby(
                     True          102.153069  1192.429932       0.0  6.0
         ZA          False         102.002663  1196.186157       0.0  6.0
                     True          121.280296  1175.709961       0.0  6.0
-
+        <BLANKLINE>
         [63 rows x 4 columns]
-        >>> ed_flights.groupby(["DestCountry","Cancelled"]).mean(True) # doctest: +SKIP
-           AvgTicketPrice  dayOfWeek
+        >>> ed_flights.groupby(["DestCountry", "Cancelled"]).mean(True)
+                              AvgTicketPrice  dayOfWeek
         DestCountry Cancelled
         AE          False         643.956793   2.717949
                     True          388.828809   2.571429
@@ -1479,18 +1479,18 @@ def groupby(
                     True          579.799066   2.767068
         ZA          False         636.998605   2.738589
                     True          677.794078   2.928571
-
+        <BLANKLINE>
         [63 rows x 2 columns]
         """
         if by is None:
-            raise TypeError("by parameter should be specified for groupby")
+            raise TypeError("by parameter should be specified to groupby")
         if isinstance(by, str):
             if by not in self._query_compiler.columns:
                 raise KeyError(f"Requested column [{by}] is not in the DataFrame.")
             by = [by]
-        if isinstance(by, list):
+        if isinstance(by, (list, tuple)):
             if set(by) - set(self._query_compiler.columns):
-                raise KeyError("Requested column/s not in the DataFrame.")
+                raise KeyError("Requested columns not in the DataFrame.")
 
         return GroupByDataFrame(
             by=by, query_compiler=self._query_compiler, dropna=dropna
diff --git a/eland/field_mappings.py b/eland/field_mappings.py
index 378b71d3..94dd29df 100644
--- a/eland/field_mappings.py
+++ b/eland/field_mappings.py
@@ -37,7 +37,6 @@
     TYPE_CHECKING,
     List,
     Set,
-    Union,
 )
 
 if TYPE_CHECKING:
@@ -728,7 +727,7 @@ def groupby_source_fields(self, by: List[str]) -> Tuple[List[Field], List[Field]]:
         -------
         a Tuple consisting of a list of field mappings for groupby and non-groupby fields
         """
-        groupby_fields: Union[List[Field], List[None]] = [None] * len(by)
+        groupby_fields: Dict[str, Field] = {}
         aggregatable_fields: List[Field] = []
         for index_name, row in self._mappings_capabilities.iterrows():
             row = row.to_dict()
@@ -736,10 +736,10 @@ def groupby_source_fields(self, by: List[str]) -> Tuple[List[Field], List[Field]]:
             if index_name not in by:
                 aggregatable_fields.append(Field(**row))
             else:
-                # Maintain groupby order as given input
-                groupby_fields[by.index(index_name)] = Field(**row)
+                groupby_fields[index_name] = Field(**row)
 
-        return groupby_fields, aggregatable_fields
+        # Maintain groupby order as given input
+        return [groupby_fields[column] for column in by], aggregatable_fields
 
     def metric_source_fields(self, include_bool=False, include_timestamp=False):
         """
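A note on the `groupby_source_fields` change above: the pre-sized `[None] * len(by)` list is replaced by a dict keyed on field name, and the caller's ordering is restored at return time with a comprehension over `by`. This also avoids the O(n) `by.index(...)` scan per mapping row. A minimal self-contained sketch of the idea (the `Field` stand-in below is simplified, not eland's actual class):

```python
from typing import Dict, List, NamedTuple, Tuple


class Field(NamedTuple):
    # Simplified stand-in for eland's field-mapping record.
    index: str


def split_fields(rows: List[Field], by: List[str]) -> Tuple[List[Field], List[Field]]:
    groupby_fields: Dict[str, Field] = {}  # mapping rows arrive in arbitrary order
    aggregatable_fields: List[Field] = []
    for field in rows:
        if field.index in by:
            groupby_fields[field.index] = field
        else:
            aggregatable_fields.append(field)
    # Re-emit groupby fields in the order the caller asked for them.
    return [groupby_fields[column] for column in by], aggregatable_fields


rows = [Field("AvgTicketPrice"), Field("Cancelled"), Field("DestCountry")]
print(split_fields(rows, by=["DestCountry", "Cancelled"])[0])
# [Field(index='DestCountry'), Field(index='Cancelled')]
```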
diff --git a/eland/groupby.py b/eland/groupby.py
index 19d7a1c0..fa9ccd96 100644
--- a/eland/groupby.py
+++ b/eland/groupby.py
@@ -140,14 +140,6 @@ class GroupByDataFrame(GroupBy):
     """
 
-    def __init__(
-        self,
-        by: Union[str, List[str], None] = None,
-        query_compiler: Optional["QueryCompiler"] = None,
-        dropna: bool = True,
-    ) -> None:
-        super().__init__(by=by, query_compiler=query_compiler, dropna=dropna)
-
     def aggregate(self, func: Union[str, List[str]], numeric_only: bool = False):
         """
         Used to groupby and aggregate
@@ -160,17 +152,19 @@ def aggregate(self, func: Union[str, List[str]], numeric_only: bool = False):
            Accepted combinations are:
            - function
            - list of functions
-            TODO Implement other functions present in pandas groupby
+            TODO Implement other combinations present in pandas groupby
         numeric_only: {True, False, None} Default is None
            Which datatype to be returned
            - True: returns all values with float64, NaN/NaT are ignored.
            - False: returns all values with float64.
            - None: returns all values with default datatype.
         """
+        if isinstance(func, str):
+            func = [func]
         # numeric_only is by default False because pandas does the same
         return self._query_compiler.groupby(
             by=self._by,
-            pd_aggs=([func] if isinstance(func, str) else func),
+            pd_aggs=func,
             dropna=self._dropna,
             numeric_only=numeric_only,
             is_agg=True,
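With the `__init__` boilerplate removed (it only forwarded the same arguments to the base class), `GroupByDataFrame.aggregate` now normalizes a bare function name into a list up front, so the query compiler always receives a `List[str]`. Illustrative usage, assuming the flights demo index from the docstring examples is available on a local Elasticsearch:

```python
import eland as ed

ed_flights = ed.DataFrame("localhost", "flights")
grouped = ed_flights.groupby(["DestCountry", "Cancelled"])

# A bare string and a one-element list now take the same code path:
grouped.aggregate("min", True)           # func is normalized to ["min"]
grouped.aggregate(["min", "max"], True)  # columns become a (field, agg) MultiIndex
```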
diff --git a/eland/operations.py b/eland/operations.py
index 6af3ac71..4731a2eb 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -17,7 +17,17 @@
 import copy
 import warnings
-from typing import Optional, Sequence, Tuple, List, Dict, Any
+from typing import (
+    Generator,
+    Optional,
+    Sequence,
+    Tuple,
+    List,
+    Dict,
+    Any,
+    TYPE_CHECKING,
+    Union,
+)
 
 import numpy as np
 import pandas as pd
@@ -435,8 +445,8 @@ def _calculate_single_agg(
         -------
         a dictionary on which agg calculations are done.
         """
-
         results: Dict[str, Any] = {}
+
         for field in fields:
             values = []
             for es_agg, pd_agg in zip(es_aggs, pd_aggs):
@@ -452,73 +462,73 @@ def _calculate_single_agg(
             elif is_dataframe_agg and numeric_only:
                 if pd_agg == "mad":
                     values.append(field.nan_value)
-            else:
-                if isinstance(es_agg, tuple):
-                    agg_value = response["aggregations"][
+                continue
+
+            if isinstance(es_agg, tuple):
+                agg_value = response["aggregations"][
+                    f"{es_agg[0]}_{field.es_field_name}"
+                ]
+
+                # Pull multiple values from 'percentiles' result.
+                if es_agg[0] == "percentiles":
+                    agg_value = agg_value["values"]
+
+                agg_value = agg_value[es_agg[1]]
+
+                # Need to convert 'Population' stddev and variance
+                # from Elasticsearch into 'Sample' stddev and variance
+                # which is what pandas uses.
+                if es_agg[1] in ("std_deviation", "variance"):
+                    # Neither transformation works with count <=1
+                    count = response["aggregations"][
                         f"{es_agg[0]}_{field.es_field_name}"
-                    ]
-
-                    # Pull multiple values from 'percentiles' result.
-                    if es_agg[0] == "percentiles":
-                        agg_value = agg_value["values"]
-
-                    agg_value = agg_value[es_agg[1]]
-
-                    # Need to convert 'Population' stddev and variance
-                    # from Elasticsearch into 'Sample' stddev and variance
-                    # which is what pandas uses.
-                    if es_agg[1] in ("std_deviation", "variance"):
-                        # Neither transformation works with count <=1
-                        count = response["aggregations"][
-                            f"{es_agg[0]}_{field.es_field_name}"
-                        ]["count"]
-
-                        # All of the below calculations result in NaN if count<=1
-                        if count <= 1:
-                            agg_value = np.NaN
-
-                        elif es_agg[1] == "std_deviation":
-                            agg_value *= count / (count - 1.0)
-
-                        else:  # es_agg[1] == "variance"
-                            # sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
-                            # population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
-                            # sample_std=\sqrt{\frac{N}{N-1}population_std}
-                            agg_value = np.sqrt(
-                                (count / (count - 1.0)) * agg_value * agg_value
-                            )
-                    else:
-                        agg_value = response["aggregations"][
-                            f"{es_agg}_{field.es_field_name}"
-                        ]["value"]
+                    ]["count"]
 
-                # Null usually means there were no results.
-                if agg_value is None or np.isnan(agg_value):
-                    if is_dataframe_agg and not numeric_only:
-                        agg_value = np.NaN
-                    elif not is_dataframe_agg and numeric_only is False:
+                    # All of the below calculations result in NaN if count<=1
+                    if count <= 1:
                         agg_value = np.NaN
-                # Cardinality is always either NaN or integer.
-                elif pd_agg == "nunique":
-                    agg_value = int(agg_value)
-
-                # If this is a non-null timestamp field convert to a pd.Timestamp()
-                elif field.is_timestamp:
-                    agg_value = elasticsearch_date_to_pandas_date(
-                        agg_value, field.es_date_format
-                    )
-                # If numeric_only is False | None then maintain column datatype
-                elif not numeric_only:
-                    # we're only converting to bool for lossless aggs like min, max, and median.
-                    if pd_agg in {"max", "min", "median", "sum"}:
-                        # 'sum' isn't representable with bool, use int64
-                        if pd_agg == "sum" and field.is_bool:
-                            agg_value = np.int64(agg_value)
-                        else:
-                            agg_value = field.np_dtype.type(agg_value)
-
-                values.append(agg_value)
+                    elif es_agg[1] == "std_deviation":
+                        agg_value *= np.sqrt(count / (count - 1.0))
+
+                    else:  # es_agg[1] == "variance"
+                        # sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
+                        # population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
+                        # sample_std=\sqrt{\frac{N}{N-1}}population_std
+                        agg_value = (count / (count - 1.0)) * agg_value
+            else:
+                agg_value = response["aggregations"][
+                    f"{es_agg}_{field.es_field_name}"
+                ]["value"]
+
+            # Null usually means there were no results.
+            if agg_value is None or np.isnan(agg_value):
+                if is_dataframe_agg and not numeric_only:
+                    agg_value = np.NaN
+                elif not is_dataframe_agg and numeric_only is False:
+                    agg_value = np.NaN
+
+            # Cardinality is always either NaN or integer.
+            elif pd_agg == "nunique":
+                agg_value = int(agg_value)
+
+            # If this is a non-null timestamp field convert to a pd.Timestamp()
+            elif field.is_timestamp:
+                agg_value = elasticsearch_date_to_pandas_date(
+                    agg_value, field.es_date_format
+                )
+            # If numeric_only is False | None then maintain column datatype
+            elif not numeric_only:
+                # we're only converting to bool for lossless aggs like min, max, and median.
+                if pd_agg in {"max", "min", "median", "sum"}:
+                    # 'sum' isn't representable with bool, use int64
+                    if pd_agg == "sum" and field.is_bool:
+                        agg_value = np.int64(agg_value)
+                    else:
+                        agg_value = field.np_dtype.type(agg_value)
+
+            values.append(agg_value)
+        # If numeric_only is True and we only have a NaN-type field then we check for empty.
         if values:
             results[field.index] = values if len(values) > 1 else values[0]
@@ -565,7 +577,9 @@ def groupby(
             is_agg=is_agg,
             numeric_only=numeric_only,
         )
+
         agg_df = pd.DataFrame(results, columns=results.keys()).set_index(by)
+
         if is_agg:
             # Convert header columns to MultiIndex
             agg_df.columns = pd.MultiIndex.from_product([headers, pd_aggs])
@@ -628,7 +642,7 @@ def _groupby_aggs(
 
         # Construct Query
         for b in by:
-            # groupby fields will be termed aggregations
+            # groupby fields will be term aggregations
            body.term_aggs(f"groupby_{b.index}", b.index)
 
         for field in fields:
@@ -673,7 +687,7 @@ def response_generator() -> Generator[List[str], None, List[str]]:
            Returns
            -------
            A generator which initially yields the bucket
-           If after_value if found use it to fetch the next set of buckets.
+           If after_key is found, use it to fetch the next set of buckets.
            """
            while True:
@@ -684,10 +698,13 @@ def response_generator() -> Generator[List[str], None, List[str]]:
                )
                # Pagination Logic
                if "after_key" in res["aggregations"]["groupby_buckets"]:
+                    # Yield the current batch of buckets
                     yield res["aggregations"]["groupby_buckets"]["buckets"]
-                    body.composite_agg(
-                        after_key=res["aggregations"]["groupby_buckets"]["after_key"]
+
+                    body.composite_agg_after_key(
+                        name="groupby_buckets",
+                        after_key=res["aggregations"]["groupby_buckets"]["after_key"],
                     )
                 else:
                     return res["aggregations"]["groupby_buckets"]["buckets"]
@@ -697,13 +714,7 @@ def _groupby_aggs(
         for bucket in buckets:
             # groupby columns are added to result same way they are returned
             for b in by:
-                agg_value = bucket["key"][f"groupby_{b.index}"]
-                # Convert to timestamp
-                if b.is_timestamp:
-                    agg_value = elasticsearch_date_to_pandas_date(
-                        agg_value, b.es_date_format
-                    )
-                response[b.index].append(agg_value)
+                response[b.index].append(bucket["key"][f"groupby_{b.index}"])
 
             agg_calculation = self._calculate_single_agg(
                 fields=fields,
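A note on the std/variance correction in `_calculate_single_agg`: Elasticsearch's `extended_stats` reports population statistics while pandas reports sample statistics, hence the `count / (count - 1)` rescaling (under a square root for the standard deviation, as the in-code comments state) and the NaN guard for `count <= 1`. The same transformation in isolation (NumPy only; the function name is hypothetical):

```python
import numpy as np


def population_to_sample(pop_std: float, pop_var: float, count: int):
    # Both corrections are undefined for a single observation.
    if count <= 1:
        return np.nan, np.nan
    sample_std = pop_std * np.sqrt(count / (count - 1.0))
    sample_var = pop_var * count / (count - 1.0)
    return sample_std, sample_var


x = np.array([1.0, 2.0, 3.0, 4.0])
std, var = population_to_sample(x.std(), x.var(), x.size)  # ddof=0 (population) inputs
assert np.isclose(std, x.std(ddof=1)) and np.isclose(var, x.var(ddof=1))
```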
diff --git a/eland/query.py b/eland/query.py
index e6d36fa4..6961bd1c 100644
--- a/eland/query.py
+++ b/eland/query.py
@@ -17,7 +17,7 @@
 import warnings
 from copy import deepcopy
-from typing import Optional, Dict, List, Any, Union
+from typing import Optional, Dict, List, Any
 
 from eland.filter import (
     RandomScoreFilter,
@@ -153,9 +153,8 @@ def term_aggs(self, name: str, field: str) -> None:
 
     def composite_agg(
         self,
-        size: int = 10000,
-        name: str = "groupby_buckets",
-        after_key: Union[Dict[str, Any], None] = None,
+        name: str,
+        size: int,
         dropna: bool = True,
     ) -> None:
         """
@@ -185,34 +184,41 @@ def composite_agg(
            Pagination size.
         name: str
            Name of the buckets
-        after_key: str
-            After key to fetch next bunch of results
         dropna: bool
            Drop None values if True.
            TODO Not yet implemented
         """
+        sources: List[Dict[str, Dict[str, str]]] = []
+        aggregations: Dict[str, Dict[str, str]] = {}
 
-        if after_key is not None:
-            self._aggs[name]["composite"]["after"] = after_key
-        else:
-            sources: List[Dict[str, Dict[str, str]]] = []
-            aggregations: Dict[str, Dict[str, str]] = {}
+        for _name, agg in self._aggs.items():
+            if agg.get("terms"):
+                if not dropna:
+                    agg["terms"]["missing_bucket"] = "true"
+                sources.append({_name: agg})
+            else:
+                aggregations[_name] = agg
 
-            for _name, agg in self._aggs.items():
-                if agg.get("terms"):
-                    if not dropna:
-                        agg["terms"]["missing_bucket"] = "true"
-                    sources.append({_name: agg})
-                else:
-                    aggregations[_name] = agg
+        agg = {
+            "composite": {"size": size, "sources": sources},
+            "aggregations": aggregations,
+        }
+        self._aggs.clear()
+        self._aggs[name] = agg
 
-            agg = {
-                "composite": {"size": size, "sources": sources},
-                "aggregations": aggregations,
-            }
-            self._aggs.clear()
-            self._aggs[name] = agg
+    def composite_agg_after_key(self, name: str, after_key: Dict[str, Any]) -> None:
+        """
+        Adds after_key to the existing query to fetch the next set of results.
+
+        Parameters
+        ----------
+        name: str
+            Name of the buckets
+        after_key: Dict[str, Any]
+            Dictionary returned from the previous query results
+        """
+        self._aggs[name]["composite"]["after"] = after_key
 
     def hist_aggs(
         self,
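For orientation, the aggregation body that `composite_agg` assembles looks roughly like the following (field names are illustrative; the metric sub-aggregations depend on which functions were requested), and `composite_agg_after_key` later splices the checkpoint from the previous page into the same structure:

```python
# Sketch of the aggregation produced for groupby(["DestCountry", "Cancelled"]).mean()
aggs = {
    "groupby_buckets": {
        "composite": {
            "size": 5000,  # DEFAULT_PAGINATION_SIZE
            "sources": [
                {"groupby_DestCountry": {"terms": {"field": "DestCountry"}}},
                {"groupby_Cancelled": {"terms": {"field": "Cancelled"}}},
            ],
        },
        "aggregations": {
            "avg_AvgTicketPrice": {"avg": {"field": "AvgTicketPrice"}},
        },
    }
}

# composite_agg_after_key("groupby_buckets", after_key=...) then resumes pagination
# from where the previous page left off:
aggs["groupby_buckets"]["composite"]["after"] = {
    "groupby_DestCountry": "AE",
    "groupby_Cancelled": False,
}
```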
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 29938159..580f7a34 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -17,7 +17,7 @@
 import copy
 from datetime import datetime
-from typing import Optional, Sequence, TYPE_CHECKING, List
+from typing import Dict, Optional, Sequence, TYPE_CHECKING, List, Union
 
 import numpy as np
 import pandas as pd
@@ -553,7 +553,7 @@ def nunique(self):
     def groupby(
         self,
         by: Union[str, List[str], Dict[str, str]],
-        pd_aggs: List[str] = [],
+        pd_aggs: List[str],
         dropna: bool = True,
         is_agg: bool = False,
         numeric_only: bool = True,
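Dropping the `pd_aggs: List[str] = []` default is more than signature tidying: a mutable default is created once at function-definition time and shared across every call, a classic Python trap. Making the parameter required is the simplest fix here, since `GroupByDataFrame.aggregate` always passes a list. A minimal demonstration of the pitfall:

```python
def broken(aggs=[]):  # one list object shared by every call
    aggs.append("min")
    return aggs


def safe(aggs=None):  # create a fresh list per call instead
    aggs = [] if aggs is None else aggs
    aggs.append("min")
    return aggs


first, second = broken(), broken()
print(first is second, second)  # True ['min', 'min'] -- state leaked across calls
print(safe(), safe())           # ['min'] ['min']
```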
diff --git a/eland/tests/dataframe/test_groupby_pytest.py b/eland/tests/dataframe/test_groupby_pytest.py
index bc9bb279..a2343259 100644
--- a/eland/tests/dataframe/test_groupby_pytest.py
+++ b/eland/tests/dataframe/test_groupby_pytest.py
@@ -35,8 +35,6 @@ class TestGroupbyDataFrame(TestData):
 
     @pytest.mark.parametrize("numeric_only", [True])
     def test_groupby_aggregate(self, numeric_only):
-        # TODO numeric_only False and None
-        # TODO Add more tests
         pd_flights = self.pd_flights().filter(self.filter_data)
         ed_flights = self.ed_flights().filter(self.filter_data)
 
@@ -46,7 +44,45 @@ def test_groupby_aggregate(self, numeric_only):
         # checking only values because dtypes are checked in other tests
         assert_frame_equal(pd_groupby, ed_groupby, check_exact=False, check_dtype=False)
 
-    def test_groupby_single_agg(self):
-        # Write tests when grouped is implemented in eland.
-        # Should write single agg tests
+    @pytest.mark.parametrize("pd_agg", ["max", "min", "mean", "sum", "median"])
+    def test_groupby_aggs_1(self, pd_agg):
+        pd_flights = self.pd_flights().filter(self.filter_data)
+        ed_flights = self.ed_flights().filter(self.filter_data)
+
+        pd_groupby = getattr(pd_flights.groupby("Cancelled"), pd_agg)(numeric_only=True)
+        ed_groupby = getattr(ed_flights.groupby("Cancelled"), pd_agg)(numeric_only=True)
+
+        # checking only values because dtypes are checked in other tests
+        assert_frame_equal(
+            pd_groupby, ed_groupby, check_exact=False, check_dtype=False, rtol=4
+        )
+
+    @pytest.mark.parametrize("pd_agg", ["mad", "var", "std"])
+    def test_groupby_aggs_2(self, pd_agg):
+        pd_flights = self.pd_flights().filter(self.filter_data)
+        ed_flights = self.ed_flights().filter(self.filter_data)
+
+        pd_groupby = getattr(pd_flights.groupby("Cancelled"), pd_agg)()
+        ed_groupby = getattr(ed_flights.groupby("Cancelled"), pd_agg)(numeric_only=True)
+
+        # checking only values because dtypes are checked in other tests
+        assert_frame_equal(
+            pd_groupby, ed_groupby, check_exact=False, check_dtype=False, rtol=4
+        )
+
+    @pytest.mark.parametrize("pd_agg", ["nunique"])
+    def test_groupby_aggs_nunique(self, pd_agg):
+        pd_flights = self.pd_flights().filter(self.filter_data)
+        ed_flights = self.ed_flights().filter(self.filter_data)
+
+        pd_groupby = getattr(pd_flights.groupby("Cancelled"), pd_agg)()
+        ed_groupby = getattr(ed_flights.groupby("Cancelled"), pd_agg)()
+
+        # checking only values because dtypes are checked in other tests
+        assert_frame_equal(
+            pd_groupby, ed_groupby, check_exact=False, check_dtype=False, rtol=4
+        )
+
+    def test_groupby_dropna(self):
+        # TODO Add tests once dropna is implemented
         pass
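Usage note, not part of the diff: assuming the standard eland test setup (an Elasticsearch instance preloaded with the flights test data), the new parametrized cases can be run selectively with pytest, e.g.:

```python
import pytest

# Run only the new groupby aggregation tests, verbosely.
pytest.main(["eland/tests/dataframe/test_groupby_pytest.py", "-k", "groupby_aggs", "-v"])
```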