From 66f7d130ffa5b8c8cd84e261933efc651fe8e0c8 Mon Sep 17 00:00:00 2001
From: "P. Sai Vinay"
Date: Sun, 4 Oct 2020 23:04:21 +0530
Subject: [PATCH] Add groupby to dataframe

---
 eland/common.py                              |   1 +
 eland/dataframe.py                           |  68 ++-
 eland/field_mappings.py                      |  40 +-
 eland/groupby.py                             | 179 ++++++++
 eland/operations.py                          | 446 +++++++++++++++----
 eland/query.py                               |  80 +++-
 eland/query_compiler.py                      |  12 +-
 eland/tests/dataframe/test_groupby_pytest.py |  52 +++
 8 files changed, 779 insertions(+), 99 deletions(-)
 create mode 100644 eland/groupby.py
 create mode 100644 eland/tests/dataframe/test_groupby_pytest.py

diff --git a/eland/common.py b/eland/common.py
index a9078f8b..10a02346 100644
--- a/eland/common.py
+++ b/eland/common.py
@@ -31,6 +31,7 @@
 DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
 DEFAULT_ES_MAX_RESULT_WINDOW = 10000  # index.max_result_window
+DEFAULT_PAGINATION_SIZE = 10000  # for composite aggregations
 
 with warnings.catch_warnings():
diff --git a/eland/dataframe.py b/eland/dataframe.py
index bd25a75c..d7772124 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -19,7 +19,7 @@
 import warnings
 from io import StringIO
 import re
-from typing import Optional, Sequence, Union, Tuple, List
+from typing import Dict, List, Optional, Sequence, Union, Tuple
 
 import numpy as np
 import pandas as pd
@@ -39,6 +39,7 @@
 from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter
 from eland.filter import BooleanFilter
 from eland.utils import deprecated_api, is_valid_attr_name
+from eland.groupby import GroupByDataFrame
 
 
 class DataFrame(NDFrame):
@@ -1430,6 +1431,71 @@ def aggregate(
 
     hist = gfx.ed_hist_frame
 
+    def groupby(
+        self, by: Union[List[str], Dict[str, str], None] = None, dropna: bool = True
+    ) -> "GroupByDataFrame":
+        """
+        Used to perform groupby operations on the DataFrame.
+
+        Parameters
+        ----------
+        by: str or List[str]
+            Column or list of columns used to group the DataFrame.
+        dropna: bool, default True
+            If True, drop None/NaT/NaN values while grouping.
+
+        Returns
+        -------
+        GroupByDataFrame
+            An object on which aggregations can be performed.
+
+        Examples
+        --------
+        >>> ed_flights = ed.DataFrame('localhost', 'flights', columns=['AvgTicketPrice', 'Cancelled', 'dayOfWeek', 'timestamp', 'DestCountry'])
+        >>> ed_flights.groupby(["DestCountry", "Cancelled"]).agg(["min", "max"], True) # doctest: +SKIP
+                              AvgTicketPrice               dayOfWeek
+                                         min          max        min  max
+        DestCountry Cancelled
+        AE          False         110.799911  1126.148682        0.0  6.0
+                    True          132.443756   817.931030        0.0  6.0
+        AR          False         125.589394  1199.642822        0.0  6.0
+                    True          251.389603  1172.382568        0.0  6.0
+        AT          False         100.020531  1181.835815        0.0  6.0
+        ...                              ...          ...        ...  ...
+        TR          True          307.915649   307.915649        0.0  0.0
+        US          False         100.145966  1199.729004        0.0  6.0
+                    True          102.153069  1192.429932        0.0  6.0
+        ZA          False         102.002663  1196.186157        0.0  6.0
+                    True          121.280296  1175.709961        0.0  6.0
+
+        [63 rows x 4 columns]
+        >>> ed_flights.groupby(["DestCountry", "Cancelled"]).mean(True) # doctest: +SKIP
+                              AvgTicketPrice  dayOfWeek
+        DestCountry Cancelled
+        AE          False         643.956793   2.717949
+                    True          388.828809   2.571429
+        AR          False         673.551677   2.746154
+                    True          682.197241   2.733333
+        AT          False         647.158290   2.819936
+        ...                              ...        ...
+        TR          True          307.915649   0.000000
+        US          False         598.063146   2.752014
+                    True          579.799066   2.767068
+        ZA          False         636.998605   2.738589
+                    True          677.794078   2.928571
+
+        [63 rows x 2 columns]
+        """
+        if by is None:
+            raise TypeError("by parameter must be specified for groupby")
+        if isinstance(by, str):
+            if by not in self._query_compiler.columns:
+                raise KeyError(f"Requested column [{by}] is not in the DataFrame.")
+            by = [by]
+        if isinstance(by, list):
+            missing = set(by) - set(self._query_compiler.columns)
+            if missing:
+                raise KeyError(f"Requested columns {missing} are not in the DataFrame.")
+
+        return GroupByDataFrame(
+            by=by, query_compiler=self._query_compiler, dropna=dropna
+        )
+
     def query(self, expr) -> "DataFrame":
         """
         Query the columns of a DataFrame with a boolean expression.
diff --git a/eland/field_mappings.py b/eland/field_mappings.py
index 32b71c41..78fcea47 100644
--- a/eland/field_mappings.py
+++ b/eland/field_mappings.py
@@ -33,9 +33,11 @@
     Mapping,
     Dict,
     Any,
+    Tuple,
     TYPE_CHECKING,
     List,
     Set,
+    Union,
 )
 
 if TYPE_CHECKING:
@@ -697,14 +699,48 @@ def numeric_source_fields(self):
         pd_dtypes, es_field_names, es_date_formats = self.metric_source_fields()
         return es_field_names
 
-    def all_source_fields(self):
-        source_fields = []
+    def all_source_fields(self) -> List[Field]:
+        """
+        Returns the Field mappings for all source fields.
+
+        Returns
+        -------
+        a list of Field mappings
+        """
+        source_fields: List[Field] = []
         for index, row in self._mappings_capabilities.iterrows():
             row = row.to_dict()
             row["index"] = index
             source_fields.append(Field(**row))
         return source_fields
 
+    def groupby_source_fields(self, by: List[str]) -> Tuple[List[Field], List[Field]]:
+        """
+        Returns the Field mappings for the groupby fields and for the
+        remaining (non-groupby) fields.
+
+        Parameters
+        ----------
+        by:
+            a list of groupby fields
+
+        Returns
+        -------
+        a tuple of two lists: the Field mappings for the groupby fields
+        (in the order given by `by`) and for the non-groupby fields
+        """
+        groupby_fields: List[Union[Field, None]] = [None] * len(by)
+        aggregatable_fields: List[Field] = []
+        for index_name, row in self._mappings_capabilities.iterrows():
+            row = row.to_dict()
+            row["index"] = index_name
+            if index_name not in by:
+                aggregatable_fields.append(Field(**row))
+            else:
+                # Maintain the groupby order as given in the input
+                groupby_fields[by.index(index_name)] = Field(**row)
+
+        return groupby_fields, aggregatable_fields
+
     def metric_source_fields(self, include_bool=False, include_timestamp=False):
         """
diff --git a/eland/groupby.py b/eland/groupby.py
new file mode 100644
index 00000000..19d7a1c0
--- /dev/null
+++ b/eland/groupby.py
@@ -0,0 +1,179 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
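+
+# Illustrative usage (an assumption for this sketch: a local cluster with the
+# example 'flights' index, as in DataFrame.groupby's docstring):
+#
+#   df = ed.DataFrame('localhost', 'flights')
+#   df.groupby('DestCountry').mean()                       # single aggregation
+#   df.groupby(['DestCountry', 'Cancelled']).agg(['min', 'max'])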
+
+from abc import ABC
+from typing import List, Optional, Union
+from eland.query_compiler import QueryCompiler
+
+
+class GroupBy(ABC):
+    """
+    Base class which holds all the groupby methods
+
+    Parameters
+    ----------
+    by:
+        List of columns to groupby
+    query_compiler:
+        Query compiler object
+    dropna:
+        default is True, drop None/NaT/NaN values while grouping
+    """
+
+    def __init__(
+        self,
+        by: Union[str, List[str], None] = None,
+        query_compiler: Optional["QueryCompiler"] = None,
+        dropna: bool = True,
+    ) -> None:
+        self._query_compiler: "QueryCompiler" = QueryCompiler(to_copy=query_compiler)
+        self._dropna: bool = dropna
+        self._by: Union[str, List[str]] = by
+
+    def mean(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["mean"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def var(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["var"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def std(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["std"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def mad(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["mad"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def median(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["median"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def sum(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["sum"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def min(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["min"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def max(self, numeric_only: bool = True):
+        # numeric_only=True because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["max"],
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+        )
+
+    def nunique(self):
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=["nunique"],
+            dropna=self._dropna,
+            numeric_only=False,
+        )
+
+
+class GroupByDataFrame(GroupBy):
+    """
+    This holds all the groupby methods for DataFrame
+
+    Parameters
+    ----------
+    by:
+        List of columns to groupby
+    query_compiler:
+        Query compiler object
+    dropna:
+        default is True, drop None/NaT/NaN values while grouping
+
+    """
+
+    def __init__(
+        self,
+        by: Union[str, List[str], None] = None,
+        query_compiler: Optional["QueryCompiler"] = None,
+        dropna: bool = True,
+    ) -> None:
+        super().__init__(by=by, query_compiler=query_compiler, dropna=dropna)
+
+    def aggregate(self, func: Union[str, List[str]], numeric_only: bool = False):
+        """
+        Used to groupby and aggregate
+
+        Parameters
+        ----------
+        func:
+            Functions to use for aggregating the data.
+
+            Accepted combinations are:
+            - function
+            - list of functions
+            TODO Implement other functions present in pandas groupby
+        numeric_only: {True, False, None} Default is False
+            Which datatype to be returned
+            - True: returns all values as float64, NaN/NaT values are ignored.
+            - False: returns all values as float64.
+            - None: returns all values with the default datatype.
+        """
+        # numeric_only is by default False because pandas does the same
+        return self._query_compiler.groupby(
+            by=self._by,
+            pd_aggs=([func] if isinstance(func, str) else func),
+            dropna=self._dropna,
+            numeric_only=numeric_only,
+            is_agg=True,
+        )
+
+    agg = aggregate
diff --git a/eland/operations.py b/eland/operations.py
index cd48ef5c..3af8d768 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -16,12 +16,12 @@
 # under the License.
 
 import copy
-import typing
 import warnings
-from typing import Optional, Tuple, List, Dict, Any
+from typing import Generator, Optional, Tuple, List, Dict, Any, TYPE_CHECKING, Union
 
 import numpy as np
 import pandas as pd
+from collections import defaultdict
 from elasticsearch.helpers import scan
 
 from eland.index import Index
@@ -31,6 +31,7 @@
     DEFAULT_ES_MAX_RESULT_WINDOW,
     elasticsearch_date_to_pandas_date,
     build_pd_series,
+    DEFAULT_PAGINATION_SIZE,
 )
 from eland.query import Query
 from eland.actions import SortFieldAction
@@ -45,8 +46,9 @@
     SizeTask,
 )
 
-if typing.TYPE_CHECKING:
+if TYPE_CHECKING:
     from eland.query_compiler import QueryCompiler
+    from eland.field_mappings import Field
 
 
 class QueryParams:
@@ -185,10 +187,29 @@ def aggs(self, query_compiler, pd_aggs, numeric_only=None) -> pd.DataFrame:
     def _metric_aggs(
         self,
         query_compiler: "QueryCompiler",
-        pd_aggs,
+        pd_aggs: List[str],
         numeric_only: Optional[bool] = None,
         is_dataframe_agg: bool = False,
-    ) -> Dict:
+    ) -> Dict[str, Any]:
+        """
+        Used to calculate metric aggregations
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html
+
+        Parameters
+        ----------
+        query_compiler:
+            Query Compiler object
+        pd_aggs:
+            aggregations that are to be performed on the dataframe or series
+        numeric_only:
+            return either all numeric values or NaN/NaT
+        is_dataframe_agg:
+            whether this method is called from agg() or from a single
+            aggregation such as mean()
+
+        Returns
+        -------
+        A dictionary which contains all the calculated aggregations.
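+
+        For example, for pd_aggs=["mean", "min"] the result is keyed by
+        column, with one value per aggregation (the numbers shown here
+        are illustrative only):
+
+            {"AvgTicketPrice": [628.25, 100.02], "dayOfWeek": [2.83, 0.0]}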
+ """ query_params, post_processing = self._resolve_tasks(query_compiler) size = self._size(query_params, post_processing) @@ -197,7 +218,6 @@ def _metric_aggs( f"Can not count field matches if size is set {size}" ) - results = {} fields = query_compiler._mappings.all_source_fields() if numeric_only: # Consider if field is Int/Float/Bool @@ -239,95 +259,15 @@ def _metric_aggs( sum 8.204365e+06 9.261629e+07 5.754909e+07 618150 min 1.000205e+02 0.000000e+00 0.000000e+00 0 """ - for field in fields: - values = [] - for es_agg, pd_agg in zip(es_aggs, pd_aggs): - # is_dataframe_agg is used to differentiate agg() and an aggregation called through .mean() - # If the field and agg aren't compatible we add a NaN/NaT for agg - # If the field and agg aren't compatible we don't add NaN/NaT for an aggregation called through .mean() - if not field.is_es_agg_compatible(es_agg): - if is_dataframe_agg and not numeric_only: - values.append(field.nan_value) - elif not is_dataframe_agg and numeric_only is False: - values.append(field.nan_value) - # Explicit condition for mad to add NaN because it doesn't support bool - elif is_dataframe_agg and numeric_only: - if pd_agg == "mad": - values.append(field.nan_value) - continue - - if isinstance(es_agg, tuple): - agg_value = response["aggregations"][ - f"{es_agg[0]}_{field.es_field_name}" - ] - - # Pull multiple values from 'percentiles' result. - if es_agg[0] == "percentiles": - agg_value = agg_value["values"] - - agg_value = agg_value[es_agg[1]] - - # Need to convert 'Population' stddev and variance - # from Elasticsearch into 'Sample' stddev and variance - # which is what pandas uses. - if es_agg[1] in ("std_deviation", "variance"): - # Neither transformation works with count <=1 - count = response["aggregations"][ - f"{es_agg[0]}_{field.es_field_name}" - ]["count"] - - # All of the below calculations result in NaN if count<=1 - if count <= 1: - agg_value = np.NaN - - elif es_agg[1] == "std_deviation": - agg_value *= count / (count - 1.0) - - else: # es_agg[1] == "variance" - # sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2} - # population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2} - # sample_std=\sqrt{\frac{N}{N-1}population_std} - agg_value = np.sqrt( - (count / (count - 1.0)) * agg_value * agg_value - ) - else: - agg_value = response["aggregations"][ - f"{es_agg}_{field.es_field_name}" - ]["value"] - - # Null usually means there were no results. - if agg_value is None or np.isnan(agg_value): - if is_dataframe_agg and not numeric_only: - agg_value = np.NaN - elif not is_dataframe_agg and numeric_only is False: - agg_value = np.NaN - - # Cardinality is always either NaN or integer. - elif pd_agg == "nunique": - agg_value = int(agg_value) - - # If this is a non-null timestamp field convert to a pd.Timestamp() - elif field.is_timestamp: - agg_value = elasticsearch_date_to_pandas_date( - agg_value, field.es_date_format - ) - # If numeric_only is False | None then maintain column datatype - elif not numeric_only: - # we're only converting to bool for lossless aggs like min, max, and median. - if pd_agg in {"max", "min", "median", "sum"}: - # 'sum' isn't representable with bool, use int64 - if pd_agg == "sum" and field.is_bool: - agg_value = np.int64(agg_value) - else: - agg_value = field.np_dtype.type(agg_value) - - values.append(agg_value) - # If numeric_only is True and We only have a NaN type field then we check for empty. 
-            if values:
-                results[field.index] = values if len(values) > 1 else values[0]
-
-        return results
+        return self._calculate_single_agg(
+            fields=fields,
+            es_aggs=es_aggs,
+            pd_aggs=pd_aggs,
+            response=response,
+            numeric_only=numeric_only,
+            is_dataframe_agg=is_dataframe_agg,
+        )
 
     def _terms_aggs(self, query_compiler, func, es_size=None):
         """
@@ -464,6 +404,324 @@ def _hist_aggs(self, query_compiler, num_bins):
         df_weights = pd.DataFrame(data=weights)
         return df_bins, df_weights
 
+    def _calculate_single_agg(
+        self,
+        fields: List["Field"],
+        es_aggs: Union[List[str], List[Tuple[str, str]]],
+        pd_aggs: List[str],
+        response: Dict[str, Any],
+        numeric_only: Optional[bool],
+        is_dataframe_agg: bool = False,
+    ):
+        """
+        Calculates single-agg aggregations. Common to both metric aggs
+        and groupby aggs.
+
+        Parameters
+        ----------
+        fields:
+            a list of Field mappings
+        es_aggs:
+            Elasticsearch equivalents of the pandas aggs
+        pd_aggs:
+            a list of pandas aggs
+        response:
+            a dict containing the response from Elasticsearch
+        numeric_only:
+            return either numeric values or NaN/NaT
+        is_dataframe_agg:
+            whether this method is called from agg() or from a single
+            aggregation
+
+        Returns
+        -------
+        a dictionary keyed by field with the calculated aggregation values.
+        """
+
+        results: Dict[str, Any] = {}
+        for field in fields:
+            values = []
+            for es_agg, pd_agg in zip(es_aggs, pd_aggs):
+                # is_dataframe_agg is used to differentiate agg() and an aggregation called through .mean()
+                # If the field and agg aren't compatible we add a NaN/NaT for agg()
+                # If the field and agg aren't compatible we don't add NaN/NaT for an aggregation called through .mean()
+                if not field.is_es_agg_compatible(es_agg):
+                    if is_dataframe_agg and not numeric_only:
+                        values.append(field.nan_value)
+                    elif not is_dataframe_agg and numeric_only is False:
+                        values.append(field.nan_value)
+                    # Explicit condition for mad to add NaN because it doesn't support bool
+                    elif is_dataframe_agg and numeric_only:
+                        if pd_agg == "mad":
+                            values.append(field.nan_value)
+                else:
+                    if isinstance(es_agg, tuple):
+                        agg_value = response["aggregations"][
+                            f"{es_agg[0]}_{field.es_field_name}"
+                        ]
+
+                        # Pull multiple values from 'percentiles' result.
+                        if es_agg[0] == "percentiles":
+                            agg_value = agg_value["values"]
+
+                        agg_value = agg_value[es_agg[1]]
+
+                        # Need to convert 'Population' stddev and variance
+                        # from Elasticsearch into 'Sample' stddev and variance
+                        # which is what pandas uses.
+                        if es_agg[1] in ("std_deviation", "variance"):
+                            # Neither transformation works with count <=1
+                            count = response["aggregations"][
+                                f"{es_agg[0]}_{field.es_field_name}"
+                            ]["count"]
+
+                            # All of the below calculations result in NaN if count<=1
+                            if count <= 1:
+                                agg_value = np.NaN
+
+                            elif es_agg[1] == "std_deviation":
+                                agg_value *= count / (count - 1.0)
+
+                            else:  # es_agg[1] == "variance"
+                                # sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
+                                # population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
+                                # sample_std=\sqrt{\frac{N}{N-1}population_std}
+                                agg_value = np.sqrt(
+                                    (count / (count - 1.0)) * agg_value * agg_value
+                                )
+                    else:
+                        agg_value = response["aggregations"][
+                            f"{es_agg}_{field.es_field_name}"
+                        ]["value"]
+
+                    # Null usually means there were no results.
+                    if agg_value is None or np.isnan(agg_value):
+                        if is_dataframe_agg and not numeric_only:
+                            agg_value = np.NaN
+                        elif not is_dataframe_agg and numeric_only is False:
+                            agg_value = np.NaN
+
+                    # Cardinality is always either NaN or integer.
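+                    # Note: Elasticsearch computes cardinality with the
+                    # approximate HyperLogLog++ algorithm, so nunique can
+                    # differ slightly from pandas' exact count.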
+                    elif pd_agg == "nunique":
+                        agg_value = int(agg_value)
+
+                    # If this is a non-null timestamp field convert to a pd.Timestamp()
+                    elif field.is_timestamp:
+                        agg_value = elasticsearch_date_to_pandas_date(
+                            agg_value, field.es_date_format
+                        )
+                    # If numeric_only is False | None then maintain column datatype
+                    elif not numeric_only:
+                        # we're only converting to bool for lossless aggs like min, max, and median.
+                        if pd_agg in {"max", "min", "median", "sum"}:
+                            # 'sum' isn't representable with bool, use int64
+                            if pd_agg == "sum" and field.is_bool:
+                                agg_value = np.int64(agg_value)
+                            else:
+                                agg_value = field.np_dtype.type(agg_value)
+
+                    values.append(agg_value)
+            # If numeric_only is True and we only have a NaN-type field then we check for empty.
+            if values:
+                results[field.index] = values if len(values) > 1 else values[0]
+
+        return results
+
+    def groupby(
+        self,
+        query_compiler: "QueryCompiler",
+        by: Union[str, List[str]],
+        pd_aggs: List[str],
+        dropna: bool = True,
+        is_agg: bool = False,
+        numeric_only: bool = True,
+    ) -> pd.DataFrame:
+        """
+        Constructs the groupby DataFrame
+
+        Parameters
+        ----------
+        query_compiler:
+            A Query compiler
+        by:
+            a list of columns on which the groupby operations are performed
+        pd_aggs:
+            a list of aggregations to be performed
+        dropna:
+            Drop None values if True.
+            TODO Not yet implemented
+        is_agg:
+            Whether the call comes from agg() (True) or from a single
+            aggregation (False).
+        numeric_only:
+            return either numeric values or NaN/NaT
+
+        Returns
+        -------
+        A DataFrame which contains the grouped data
+        """
+        headers, results = self._groupby_aggs(
+            query_compiler,
+            by=by,
+            pd_aggs=pd_aggs,
+            dropna=dropna,
+            is_agg=is_agg,
+            numeric_only=numeric_only,
+        )
+        agg_df = pd.DataFrame(results, columns=results.keys()).set_index(by)
+        if is_agg:
+            # Convert the header columns to a MultiIndex
+            agg_df.columns = pd.MultiIndex.from_product([headers, pd_aggs])
+
+        return agg_df
+
+    def _groupby_aggs(
+        self,
+        query_compiler: "QueryCompiler",
+        by: Union[str, List[str]],
+        pd_aggs: List[str],
+        dropna: bool = True,
+        is_agg: bool = False,
+        numeric_only: bool = True,
+    ) -> Tuple[List[str], Dict[str, Any]]:
+        """
+        Calculates the groupby aggregations
+
+        Parameters
+        ----------
+        query_compiler:
+            A Query compiler
+        by:
+            a list of columns on which the groupby operations are performed
+        pd_aggs:
+            a list of aggregations to be performed
+        dropna:
+            Drop None values if True.
+            TODO Not yet implemented
+        is_agg:
+            Whether the call comes from agg() (True) or from a single
+            aggregation (False).
+        numeric_only:
+            return either numeric values or NaN/NaT
+
+        Returns
+        -------
+        headers: the columns to which the groupby MultiIndex is applied
+        response: a dictionary of the groupby aggregated values
+        """
+        query_params, post_processing = self._resolve_tasks(query_compiler)
+
+        size = self._size(query_params, post_processing)
+        if size is not None:
+            raise NotImplementedError(
+                f"Can not count field matches if size is set {size}"
+            )
+
+        by, fields = query_compiler._mappings.groupby_source_fields(by=by)
+
+        # A defaultdict avoids having to initialize each column with an empty list
+        response: Dict[str, List[Any]] = defaultdict(list)
+
+        if numeric_only:
+            fields = [field for field in fields if (field.is_numeric or field.is_bool)]
+
+        body = Query(query_params.query)
+
+        # Convert pandas aggs to their Elasticsearch equivalents
+        es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
+
+        # Construct the query
+        for b in by:
+            # groupby fields are expressed as terms aggregations
+            body.term_aggs(f"groupby_{b.index}", b.index)
+
+        for field in fields:
+            for es_agg in es_aggs:
+                if not field.is_es_agg_compatible(es_agg):
+                    continue
+
+                # If we have multiple 'extended_stats' etc. here we simply NOOP on the 2nd call
+                if isinstance(es_agg, tuple):
+                    body.metric_aggs(
+                        f"{es_agg[0]}_{field.es_field_name}",
+                        es_agg[0],
+                        field.aggregatable_es_field_name,
+                    )
+                else:
+                    body.metric_aggs(
+                        f"{es_agg}_{field.es_field_name}",
+                        es_agg,
+                        field.aggregatable_es_field_name,
+                    )
+
+        # Composite aggregation
+        body.composite_agg(
+            size=DEFAULT_PAGINATION_SIZE, name="groupby_buckets", dropna=dropna
+        )
+
+        def response_generator() -> Generator[List[Dict[str, Any]], None, None]:
+            """
+            e.g.
+            "aggregations": {
+                "groupby_buckets": {
+                    "after_key": {"total_quantity": 8},
+                    "buckets": [
+                        {
+                            "key": {"total_quantity": 1},
+                            "doc_count": 87,
+                            "taxful_total_price_avg": {"value": 48.035978536496216},
+                        }
+                    ],
+                }
+            }
+
+            Returns
+            -------
+            A generator which yields each page of buckets. If 'after_key' is
+            present in the response, it is used to fetch the next page.
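+
+            Each page contains at most DEFAULT_PAGINATION_SIZE (10,000)
+            composite buckets; the composite agg's 'after' option is updated
+            between pages via Query.composite_agg(after_key=...).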
+
+            """
+            while True:
+                res = query_compiler._client.search(
+                    index=query_compiler._index_pattern,
+                    size=0,
+                    body=body.to_search_body(),
+                )
+                # Pagination logic
+                if "after_key" in res["aggregations"]["groupby_buckets"]:
+                    # yield the current page of buckets
+                    yield res["aggregations"]["groupby_buckets"]["buckets"]
+                    body.composite_agg(
+                        after_key=res["aggregations"]["groupby_buckets"]["after_key"]
+                    )
+                else:
+                    # No 'after_key' means this is the last page, so yield
+                    # the remaining buckets (if any) and stop.
+                    yield res["aggregations"]["groupby_buckets"]["buckets"]
+                    return
+
+        for buckets in response_generator():
+            # We receive the response row-wise
+            for bucket in buckets:
+                # The groupby columns are added to the result in the same
+                # order as they are returned
+                for b in by:
+                    agg_value = bucket["key"][f"groupby_{b.index}"]
+                    # Convert timestamps to pd.Timestamp
+                    if b.is_timestamp:
+                        agg_value = elasticsearch_date_to_pandas_date(
+                            agg_value, b.es_date_format
+                        )
+                    response[b.index].append(agg_value)
+
+                agg_calculation = self._calculate_single_agg(
+                    fields=fields,
+                    es_aggs=es_aggs,
+                    pd_aggs=pd_aggs,
+                    response={"aggregations": bucket},
+                    numeric_only=numeric_only,
+                    is_dataframe_agg=is_agg,
+                )
+                # Merge the calculated agg values into the response
+                for key, value in agg_calculation.items():
+                    if not is_agg:
+                        response[key].append(value)
+                    else:
+                        for pd_agg, val in zip(pd_aggs, value):
+                            response[f"{key}_{pd_agg}"].append(val)
+
+        return [field.index for field in fields], response
+
     @staticmethod
     def _map_pd_aggs_to_es_aggs(pd_aggs):
         """
diff --git a/eland/query.py b/eland/query.py
index 30f16b4c..e6d36fa4 100644
--- a/eland/query.py
+++ b/eland/query.py
@@ -17,7 +17,7 @@
 
 import warnings
 from copy import deepcopy
-from typing import Optional, Dict, List, Any
+from typing import Optional, Dict, List, Any, Union
 
 from eland.filter import (
     RandomScoreFilter,
@@ -136,6 +136,84 @@ def metric_aggs(self, name: str, func: str, field: str) -> None:
         agg = {func: {"field": field}}
         self._aggs[name] = agg
 
+    def term_aggs(self, name: str, field: str) -> None:
+        """
+        Add a terms aggregation e.g.
+
+        "aggs": {
+            "name": {
+                "terms": {
+                    "field": "AvgTicketPrice"
+                }
+            }
+        }
+        """
+        agg = {"terms": {"field": field}}
+        self._aggs[name] = agg
+
+    def composite_agg(
+        self,
+        size: int = 10000,
+        name: str = "groupby_buckets",
+        after_key: Union[Dict[str, Any], None] = None,
+        dropna: bool = True,
+    ) -> None:
+        """
+        Add a composite aggregation e.g.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html
+
+        "aggs": {
+            "groupby_buckets": {
+                "composite": {
+                    "size": 10,
+                    "sources": [
+                        {"total_quantity": {"terms": {"field": "total_quantity"}}}
+                    ],
+                    "after": {"total_quantity": 8},
+                },
+                "aggregations": {
+                    "taxful_total_price_avg": {
+                        "avg": {"field": "taxful_total_price"}
+                    }
+                },
+            }
+        }
+
+        Parameters
+        ----------
+        size: int
+            Pagination size.
+        name: str
+            Name of the composite aggregation's buckets
+        after_key: Dict[str, Any]
+            The 'after_key' from the previous response, used to fetch the
+            next page of results
+        dropna: bool
+            Drop None values if True.
+            When False, 'missing_bucket' is enabled on the terms sources so
+            that documents with missing values form their own group.
+
+        """
+
+        if after_key is not None:
+            self._aggs[name]["composite"]["after"] = after_key
+        else:
+            sources: List[Dict[str, Dict[str, str]]] = []
+            aggregations: Dict[str, Dict[str, str]] = {}
+
+            for _name, agg in self._aggs.items():
+                if agg.get("terms"):
+                    if not dropna:
+                        agg["terms"]["missing_bucket"] = "true"
+                    sources.append({_name: agg})
+                else:
+                    aggregations[_name] = agg
+
+            agg = {
+                "composite": {"size": size, "sources": sources},
+                "aggregations": aggregations,
+            }
+            self._aggs.clear()
+            self._aggs[name] = agg
+
     def hist_aggs(
         self,
         name: str,
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 60309f7f..47883059 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -17,7 +17,7 @@
 
 import copy
 from datetime import datetime
-from typing import Optional, TYPE_CHECKING, List
+from typing import Dict, Optional, TYPE_CHECKING, List, Union
 
 import numpy as np
 import pandas as pd
@@ -541,6 +541,16 @@ def nunique(self):
             self, ["nunique"], numeric_only=False
         )
 
+    def groupby(
+        self,
+        by: Union[str, List[str], Dict[str, str]],
+        pd_aggs: List[str],
+        dropna: bool = True,
+        is_agg: bool = False,
+        numeric_only: bool = True,
+    ) -> pd.DataFrame:
+        return self._operations.groupby(
+            self, by, pd_aggs, dropna, is_agg, numeric_only
+        )
+
     def value_counts(self, es_size):
         return self._operations.value_counts(self, es_size)
 
diff --git a/eland/tests/dataframe/test_groupby_pytest.py b/eland/tests/dataframe/test_groupby_pytest.py
new file mode 100644
index 00000000..bc9bb279
--- /dev/null
+++ b/eland/tests/dataframe/test_groupby_pytest.py
@@ -0,0 +1,52 @@
+# Licensed to Elasticsearch B.V. under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Elasticsearch B.V. licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# File called _pytest for PyCharm compatibility
+
+import pytest
+from pandas.testing import assert_frame_equal
+from eland.tests.common import TestData
+
+
+class TestGroupbyDataFrame(TestData):
+    funcs = ["max", "min", "mean", "sum"]
+    extended_funcs = ["median", "mad", "var", "std"]
+    filter_data = [
+        "AvgTicketPrice",
+        "Cancelled",
+        "dayOfWeek",
+        "timestamp",
+        "DestCountry",
+    ]
+
+    @pytest.mark.parametrize("numeric_only", [True])
+    def test_groupby_aggregate(self, numeric_only):
+        # TODO: test numeric_only=False and numeric_only=None as well
+        # TODO: add more tests
+        pd_flights = self.pd_flights().filter(self.filter_data)
+        ed_flights = self.ed_flights().filter(self.filter_data)
+
+        pd_groupby = pd_flights.groupby("Cancelled").agg(self.funcs, numeric_only)
+        ed_groupby = ed_flights.groupby("Cancelled").agg(self.funcs, numeric_only)
+
+        # Check only the values because dtypes are checked in other tests
+        assert_frame_equal(pd_groupby, ed_groupby, check_exact=False, check_dtype=False)
+
+    def test_groupby_single_agg(self):
+        # TODO: Write single-agg tests once the single aggregation output
+        # is verified against pandas.
+        pass
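
A minimal sketch of what test_groupby_single_agg could look like once the
single-agg output is verified against pandas (illustrative only, not part of
the patch; it reuses the fixtures and tolerance settings from
test_groupby_aggregate above):

    def test_groupby_single_agg(self):
        pd_flights = self.pd_flights().filter(self.filter_data)
        ed_flights = self.ed_flights().filter(self.filter_data)

        # eland's GroupBy.min() defaults to numeric_only=True, so restrict
        # the pandas side to numeric columns for a like-for-like comparison.
        pd_min = pd_flights.groupby("Cancelled").min(numeric_only=True)
        ed_min = ed_flights.groupby("Cancelled").min()

        assert_frame_equal(pd_min, ed_min, check_exact=False, check_dtype=False)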