Skip to content

Commit

Permalink
Add groupby to dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
V1NAY8 committed Oct 4, 2020
1 parent 4d96ad3 commit 66f7d13
Show file tree
Hide file tree
Showing 8 changed files with 779 additions and 99 deletions.
1 change: 1 addition & 0 deletions eland/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
DEFAULT_ES_MAX_RESULT_WINDOW = 10000 # index.max_result_window
DEFAULT_PAGINATION_SIZE = 10000 # for composite aggregations


with warnings.catch_warnings():
Expand Down
68 changes: 67 additions & 1 deletion eland/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import warnings
from io import StringIO
import re
from typing import Optional, Sequence, Union, Tuple, List
from typing import Dict, List, Optional, Sequence, Union, Tuple

import numpy as np
import pandas as pd
Expand All @@ -39,6 +39,7 @@
from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter
from eland.filter import BooleanFilter
from eland.utils import deprecated_api, is_valid_attr_name
from eland.groupby import GroupByDataFrame


class DataFrame(NDFrame):
Expand Down Expand Up @@ -1430,6 +1431,71 @@ def aggregate(

hist = gfx.ed_hist_frame

def groupby(
    self, by: Union[List[str], Dict[str, str], None] = None, dropna: bool = True
) -> "GroupByDataFrame":
    """
    Used to perform groupby operations

    Parameters
    ----------
    by: str or List[str]
        Column name or list of column names to group the DataFrame by.
    dropna: bool
        Default is True: None/NaN/NaT values are dropped while grouping.

    Returns
    -------
    GroupByDataFrame

    Examples
    --------
    >>> df = ed.DataFrame('localhost', 'flights', columns=['AvgTicketPrice', 'DistanceKilometers', 'timestamp', 'DestCountry'])
    >>> df.groupby(["DestCountry", "Cancelled"]).agg(["min", "max"], True) # doctest: +SKIP
                          AvgTicketPrice                dayOfWeek
                                     min          max         min  max
    DestCountry Cancelled
    AE          False         110.799911  1126.148682         0.0  6.0
                True          132.443756   817.931030         0.0  6.0
    ...                              ...          ...         ...  ...
    ZA          False         102.002663  1196.186157         0.0  6.0
                True          121.280296  1175.709961         0.0  6.0
    <BLANKLINE>
    [63 rows x 4 columns]
    >>> df.groupby(["DestCountry", "Cancelled"]).mean(True) # doctest: +SKIP
                           AvgTicketPrice  dayOfWeek
    DestCountry Cancelled
    AE          False          643.956793   2.717949
                True           388.828809   2.571429
    ...                               ...        ...
    ZA          False          636.998605   2.738589
                True           677.794078   2.928571
    <BLANKLINE>
    [63 rows x 2 columns]
    """
    if by is None:
        raise TypeError("by parameter should be specified for groupby")
    # Normalize a single column name to a one-element list so the
    # validation below covers both the str and list cases.
    if isinstance(by, str):
        by = [by]
    if isinstance(by, list):
        # Report exactly which requested columns are absent instead of
        # a generic "column/s not in the DataFrame" message.
        missing = sorted(set(by) - set(self._query_compiler.columns))
        if missing:
            raise KeyError(f"Requested columns {missing} not in the DataFrame.")
    # NOTE(review): a Dict[str, str] `by` is accepted by the signature but
    # passed through unvalidated — confirm dict support is implemented
    # downstream in the query compiler.

    return GroupByDataFrame(
        by=by, query_compiler=self._query_compiler, dropna=dropna
    )

def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
Expand Down
40 changes: 38 additions & 2 deletions eland/field_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
Mapping,
Dict,
Any,
Tuple,
TYPE_CHECKING,
List,
Set,
Union,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -697,14 +699,48 @@ def numeric_source_fields(self):
pd_dtypes, es_field_names, es_date_formats = self.metric_source_fields()
return es_field_names

def all_source_fields(self):
source_fields = []
def all_source_fields(self) -> List[Field]:
    """
    Build a ``Field`` mapping object for every field capability row.

    Returns
    -------
    List[Field]
        One ``Field`` per row of ``self._mappings_capabilities``, with the
        row's index name attached as the ``index`` attribute.
    """
    return [
        Field(**{**capabilities.to_dict(), "index": field_name})
        for field_name, capabilities in self._mappings_capabilities.iterrows()
    ]

def groupby_source_fields(self, by: List[str]) -> Tuple[List[Field], List[Field]]:
    """
    Return all Field mappings partitioned into groupby and non-groupby fields.

    Parameters
    ----------
    by:
        List of groupby column names; the returned groupby fields
        preserve this order.

    Returns
    -------
    Tuple of (groupby field mappings, non-groupby aggregatable field mappings).
    """
    # Precompute each groupby column's position so order is preserved
    # without an O(len(by)) list.index() scan for every matched field.
    by_position = {name: position for position, name in enumerate(by)}
    # The list can legitimately hold None at this point; the annotation
    # reflects that (the original Union[List[Field], List[None]] did not).
    groupby_fields: List[Union[Field, None]] = [None] * len(by)
    aggregatable_fields: List[Field] = []
    for index_name, row in self._mappings_capabilities.iterrows():
        attrs = row.to_dict()
        attrs["index"] = index_name
        position = by_position.get(index_name)
        if position is None:
            aggregatable_fields.append(Field(**attrs))
        else:
            # Maintain groupby order as given in `by`.
            groupby_fields[position] = Field(**attrs)

    # NOTE(review): if a `by` name has no mapping row, its slot stays None
    # and is returned silently — confirm callers handle (or cannot hit) this.
    return groupby_fields, aggregatable_fields

def metric_source_fields(self, include_bool=False, include_timestamp=False):
"""
Returns
Expand Down
179 changes: 179 additions & 0 deletions eland/groupby.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from abc import ABC
from typing import List, Optional, Union
from eland.query_compiler import QueryCompiler


class GroupBy(ABC):
    """
    Base class holding the groupby aggregation methods shared by all
    groupby objects.

    Parameters
    ----------
    by:
        List of columns to groupby
    query_compiler:
        Query compiler object
    dropna:
        default is True, drop None/NaT/NaN values while grouping
    """

    def __init__(
        self,
        by: Union[str, List[str], None] = None,
        query_compiler: Optional["QueryCompiler"] = None,
        dropna: bool = True,
    ) -> None:
        # Copy the compiler so groupby operations never mutate the
        # originating DataFrame's query state.
        self._query_compiler: "QueryCompiler" = QueryCompiler(to_copy=query_compiler)
        self._dropna: bool = dropna
        self._by: Union[str, List[str]] = by

    def _agg(self, pd_agg: str, numeric_only: Optional[bool]):
        # Single forwarding point to the query compiler; every public
        # aggregation method below is a thin wrapper around this.
        return self._query_compiler.groupby(
            by=self._by,
            pd_aggs=[pd_agg],
            dropna=self._dropna,
            numeric_only=numeric_only,
        )

    # For all methods below, numeric_only=True by default because pandas
    # does the same.
    def mean(self, numeric_only: bool = True):
        return self._agg("mean", numeric_only)

    def var(self, numeric_only: bool = True):
        return self._agg("var", numeric_only)

    def std(self, numeric_only: bool = True):
        return self._agg("std", numeric_only)

    def mad(self, numeric_only: bool = True):
        return self._agg("mad", numeric_only)

    def median(self, numeric_only: bool = True):
        return self._agg("median", numeric_only)

    def sum(self, numeric_only: bool = True):
        return self._agg("sum", numeric_only)

    def min(self, numeric_only: bool = True):
        return self._agg("min", numeric_only)

    def max(self, numeric_only: bool = True):
        return self._agg("max", numeric_only)

    def nunique(self):
        # nunique counts distinct values of any dtype, so numeric_only
        # is always False here.
        return self._agg("nunique", numeric_only=False)


class GroupByDataFrame(GroupBy):
    """
    This holds all the groupby methods for DataFrame

    Parameters
    ----------
    by:
        List of columns to groupby
    query_compiler:
        Query compiler object
    dropna:
        default is True, drop None/NaT/NaN values while grouping
    """

    def __init__(
        self,
        by: Union[str, List[str], None] = None,
        query_compiler: Optional["QueryCompiler"] = None,
        dropna: bool = True,
    ) -> None:
        super().__init__(by=by, query_compiler=query_compiler, dropna=dropna)

    def aggregate(self, func: Union[str, List[str]], numeric_only: bool = False):
        """
        Used to groupby and aggregate

        Parameters
        ----------
        func:
            Functions to use for aggregating the data.

            Accepted combinations are:
            - function
            - list of functions

            TODO Implement other functions present in pandas groupby
        numeric_only: {True, False, None} Default is False
            Which datatype to be returned
            - True: returns all values with float64, NaN/NaT are ignored.
            - False: returns all values with float64.
            - None: returns all values with default datatype.
        """
        # numeric_only is False by default because pandas does the same.
        # A single function name is normalized to a one-element list.
        return self._query_compiler.groupby(
            by=self._by,
            pd_aggs=[func] if isinstance(func, str) else func,
            dropna=self._dropna,
            numeric_only=numeric_only,
            is_agg=True,
        )

    # pandas-style short alias for aggregate.
    agg = aggregate
Loading

0 comments on commit 66f7d13

Please sign in to comment.