Add mode() method to DataFrame and Series
V1NAY8 authored Jan 7, 2021
1 parent 27717ee commit 421d84f
Showing 14 changed files with 320 additions and 16 deletions.
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.mode.rst
@@ -0,0 +1,6 @@
eland.DataFrame.mode
====================

.. currentmodule:: eland

.. automethod:: DataFrame.mode
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.Series.mode.rst
@@ -0,0 +1,6 @@
eland.Series.mode
====================

.. currentmodule:: eland

.. automethod:: Series.mode
1 change: 1 addition & 0 deletions docs/sphinx/reference/dataframe.rst
@@ -89,6 +89,7 @@ Computations / Descriptive Stats
DataFrame.var
DataFrame.sum
DataFrame.nunique
DataFrame.mode

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1 change: 1 addition & 0 deletions docs/sphinx/reference/series.rst
@@ -79,6 +79,7 @@ Computations / Descriptive Stats
Series.var
Series.nunique
Series.value_counts
Series.mode

Reindexing / Selection / Label Manipulation
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
62 changes: 62 additions & 0 deletions eland/dataframe.py
@@ -1624,6 +1624,68 @@ def groupby(
by=by, query_compiler=self._query_compiler.copy(), dropna=dropna
)

def mode(
self,
numeric_only: bool = False,
dropna: bool = True,
es_size: int = 10,
) -> pd.DataFrame:
"""
Calculate the mode of a DataFrame.

Parameters
----------
numeric_only: {True, False} Default is False
Which datatypes to return
- True: Returns all numeric or timestamp columns
- False: Returns all columns
dropna: {True, False} Default is True
- True: Don't consider counts of NaN/NaT.
- False: Consider counts of NaN/NaT.
es_size: default 10
Number of rows to return if the mode has multiple values

See Also
--------
:pandas_api_docs:`pandas.DataFrame.mode`

Examples
--------
>>> ed_ecommerce = ed.DataFrame('localhost', 'ecommerce')
>>> ed_df = ed_ecommerce.filter(["total_quantity", "geoip.city_name", "customer_birth_date", "day_of_week", "taxful_total_price"])
>>> ed_df.mode(numeric_only=False)
total_quantity geoip.city_name customer_birth_date day_of_week taxful_total_price
0 2 New York NaT Thursday 53.98
>>> ed_df.mode(numeric_only=True)
total_quantity taxful_total_price
0 2 53.98
>>> ed_df = ed_ecommerce.filter(["products.tax_amount","order_date"])
>>> ed_df.mode()
products.tax_amount order_date
0 0.0 2016-12-02 20:36:58
1 NaN 2016-12-04 23:44:10
2 NaN 2016-12-08 06:21:36
3 NaN 2016-12-08 09:38:53
4 NaN 2016-12-12 11:38:24
5 NaN 2016-12-12 19:46:34
6 NaN 2016-12-14 18:00:00
7 NaN 2016-12-15 11:38:24
8 NaN 2016-12-22 19:39:22
9 NaN 2016-12-24 06:21:36
>>> ed_df.mode(es_size = 3)
products.tax_amount order_date
0 0.0 2016-12-02 20:36:58
1 NaN 2016-12-04 23:44:10
2 NaN 2016-12-08 06:21:36
"""
# TODO dropna=False
return self._query_compiler.mode(
numeric_only=numeric_only, dropna=True, is_dataframe=True, es_size=es_size
)

def query(self, expr) -> "DataFrame":
"""
Query the columns of a DataFrame with a boolean expression.
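The Series-side counterpart is also part of this commit (see the eland.Series.mode.rst and series.rst additions above), but its diff is not rendered on this page. A minimal usage sketch, assuming a local Elasticsearch cluster with the 'ecommerce' sample index as in the docstring examples, and assuming Series.mode() mirrors the DataFrame signature:

import eland as ed

# Assumes a local cluster with the 'ecommerce' sample index, matching the
# DataFrame.mode docstring examples above.
ed_ecommerce = ed.DataFrame("localhost", "ecommerce")

# Series.mode() returns a pandas Series of the most frequent value(s);
# es_size (assumed to match the DataFrame parameter) caps how many values
# come back when the mode is not unique.
print(ed_ecommerce["day_of_week"].mode())                 # e.g. 0    Thursday
print(ed_ecommerce["taxful_total_price"].mode(es_size=3))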
8 changes: 6 additions & 2 deletions eland/field_mappings.py
@@ -102,9 +102,13 @@ def is_es_agg_compatible(self, es_agg) -> bool:
# Except "median_absolute_deviation" which doesn't support bool
if es_agg == "median_absolute_deviation" and self.is_bool:
return False
# Cardinality and Count work for all types
# Cardinality, Count and mode work for all types
# Numerics and bools work for all aggs
if es_agg in ("cardinality", "value_count") or self.is_numeric or self.is_bool:
if (
es_agg in {"cardinality", "value_count", "mode"}
or self.is_numeric
or self.is_bool
):
return True
# Timestamps also work for 'min', 'max' and 'avg'
if es_agg in {"min", "max", "avg", "percentiles"} and self.is_timestamp:
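The hunk above encodes the compatibility rules: cardinality, value_count, and now mode are valid for every field type, numerics and bools work with every aggregation, and timestamps additionally support min, max, avg, and percentiles. The sketch below restates those rules as a plain function purely for illustration; it is not eland's actual Field API, field traits are passed in as booleans, and any rules beyond the lines shown above are omitted.

def is_agg_compatible(es_agg: str, is_numeric: bool, is_bool: bool,
                      is_timestamp: bool) -> bool:
    # "median_absolute_deviation" doesn't support bool
    if es_agg == "median_absolute_deviation" and is_bool:
        return False
    # Cardinality, count and mode work for all types;
    # numerics and bools work for all aggs
    if es_agg in {"cardinality", "value_count", "mode"} or is_numeric or is_bool:
        return True
    # Timestamps also work for 'min', 'max', 'avg' and 'percentiles'
    if es_agg in {"min", "max", "avg", "percentiles"} and is_timestamp:
        return True
    return False

# A keyword field (no numeric/bool/timestamp traits) is now aggregatable via "mode":
assert is_agg_compatible("mode", False, False, False)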
3 changes: 3 additions & 0 deletions eland/groupby.py
@@ -617,3 +617,6 @@ def count(self) -> "pd.DataFrame":
numeric_only=False,
is_dataframe_agg=False,
)

def mode(self) -> None:
raise NotImplementedError("Currently mode is not supported for groupby")
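A quick sketch of what a caller sees after this stub, assuming the same 'ecommerce' setup as the earlier examples:

import eland as ed

ed_ecommerce = ed.DataFrame("localhost", "ecommerce")

try:
    ed_ecommerce.groupby("day_of_week").mode()
except NotImplementedError as err:
    print(err)  # Currently mode is not supported for groupby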
121 changes: 111 additions & 10 deletions eland/operations.py
@@ -181,7 +181,7 @@ def _metric_agg_series(
dtype = "object"
return build_pd_series(results, index=results.keys(), dtype=dtype)

def value_counts(self, query_compiler, es_size):
def value_counts(self, query_compiler: "QueryCompiler", es_size: int) -> pd.Series:
return self._terms_aggs(query_compiler, "terms", es_size)

def hist(self, query_compiler, bins):
@@ -195,12 +195,54 @@ def aggs(self, query_compiler, pd_aggs, numeric_only=None) -> pd.DataFrame:
results, index=pd_aggs, dtype=(np.float64 if numeric_only else None)
)

def mode(
self,
query_compiler: "QueryCompiler",
pd_aggs: List[str],
is_dataframe: bool,
es_size: int,
numeric_only: bool = False,
dropna: bool = True,
) -> Union[pd.DataFrame, pd.Series]:

results = self._metric_aggs(
query_compiler,
pd_aggs=pd_aggs,
numeric_only=numeric_only,
dropna=dropna,
es_mode_size=es_size,
)

pd_dict: Dict[str, Any] = {}
row_diff: Optional[int] = None

if is_dataframe:
# If multiple mode values are returned for a particular column,
# find the maximum length and use it to pad the dataframe with NaN/NaT
rows_len = max([len(value) for value in results.values()])
for key, values in results.items():
row_diff = rows_len - len(values)
# Convert np.ndarray to list
values = list(values)
if row_diff:
if isinstance(values[0], pd.Timestamp):
values.extend([pd.NaT] * row_diff)
else:
values.extend([np.NaN] * row_diff)
pd_dict[key] = values

return pd.DataFrame(pd_dict)
else:
return pd.DataFrame(results.values()).iloc[0].rename()
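For the DataFrame case, the method pads each column's list of mode values up to the length of the longest one, using NaT for timestamps and NaN otherwise, before building the result. A minimal standalone illustration of that padding step, with a hand-written results dict standing in for the _metric_aggs() output (values taken from the DataFrame.mode docstring example):

import numpy as np
import pandas as pd

results = {
    "products.tax_amount": [0.0],  # single mode value
    "order_date": [
        pd.Timestamp("2016-12-02 20:36:58"),
        pd.Timestamp("2016-12-04 23:44:10"),
        pd.Timestamp("2016-12-08 06:21:36"),
    ],
}

rows_len = max(len(values) for values in results.values())
pd_dict = {}
for key, values in results.items():
    values = list(values)
    row_diff = rows_len - len(values)
    if row_diff:
        # Timestamp columns are padded with NaT, everything else with NaN
        filler = pd.NaT if isinstance(values[0], pd.Timestamp) else np.nan
        values.extend([filler] * row_diff)
    pd_dict[key] = values

print(pd.DataFrame(pd_dict))
#    products.tax_amount          order_date
# 0                  0.0 2016-12-02 20:36:58
# 1                  NaN 2016-12-04 23:44:10
# 2                  NaN 2016-12-08 06:21:36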

def _metric_aggs(
self,
query_compiler: "QueryCompiler",
pd_aggs: List[str],
numeric_only: Optional[bool] = None,
is_dataframe_agg: bool = False,
es_mode_size: Optional[int] = None,
dropna: bool = True,
) -> Dict[str, Any]:
"""
Used to calculate metric aggregations
@@ -216,6 +258,10 @@ def _metric_aggs(
return either all numeric values or NaN/NaT
is_dataframe_agg:
whether this method is called from a single-agg or an aggregation method
es_mode_size:
number of rows to return when multiple mode values are present.
dropna:
drop NaN/NaT for a dataframe
Returns
-------
@@ -252,6 +298,15 @@
es_agg[0],
field.aggregatable_es_field_name,
)
elif es_agg == "mode":
# TODO for dropna=False: check if the field is a timestamp, boolean, or numeric,
# then use the missing parameter for the terms aggregation.
body.terms_aggs(
f"{es_agg}_{field.es_field_name}",
"terms",
field.aggregatable_es_field_name,
es_mode_size,
)
else:
body.metric_aggs(
f"{es_agg}_{field.es_field_name}",
@@ -280,7 +335,9 @@
is_dataframe_agg=is_dataframe_agg,
)

def _terms_aggs(self, query_compiler, func, es_size=None):
def _terms_aggs(
self, query_compiler: "QueryCompiler", func: str, es_size: int
) -> pd.Series:
"""
Parameters
----------
@@ -499,13 +556,43 @@ def _unpack_metric_aggs(
agg_value = np.sqrt(
(count / (count - 1.0)) * agg_value * agg_value
)
elif es_agg == "mode":
# For terms aggregation buckets are returned
# agg_value will be of type list
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["buckets"]
else:
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["value"]

if isinstance(agg_value, list):
# include top-terms in the result.
if not agg_value:
# If all the documents for a field are empty
agg_value = [field.nan_value]
else:
max_doc_count = agg_value[0]["doc_count"]
# We keep only the keys whose doc_count equals max_doc_count;
# lesser values are ignored
agg_value = [
item["key"]
for item in agg_value
if item["doc_count"] == max_doc_count
]

# Maintain the datatype by default because pandas does the same;
# text is returned as-is
if field.is_bool or field.is_numeric:
agg_value = [
field.np_dtype.type(value) for value in agg_value
]

# Null usually means there were no results.
if agg_value is None or np.isnan(agg_value):
if not isinstance(agg_value, list) and (
agg_value is None or np.isnan(agg_value)
):
if is_dataframe_agg and not numeric_only:
agg_value = np.NaN
elif not is_dataframe_agg and numeric_only is False:
@@ -517,13 +604,22 @@

# If this is a non-null timestamp field convert to a pd.Timestamp()
elif field.is_timestamp:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
if isinstance(agg_value, list):
# convert mode results to timestamps
agg_value = [
elasticsearch_date_to_pandas_date(
value, field.es_date_format
)
for value in agg_value
]
else:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
# we're only converting to bool for lossless aggs like min, max, and median.
if pd_agg in {"max", "min", "median", "sum"}:
if pd_agg in {"max", "min", "median", "sum", "mode"}:
# 'sum' isn't representable with bool, use int64
if pd_agg == "sum" and field.is_bool:
agg_value = np.int64(agg_value)
@@ -791,10 +887,15 @@ def _map_pd_aggs_to_es_aggs(pd_aggs):
elif pd_agg == "median":
es_aggs.append(("percentiles", "50.0"))

# Not implemented
elif pd_agg == "mode":
# We could do this via top term
raise NotImplementedError(pd_agg, " not currently implemented")
if len(pd_aggs) != 1:
raise NotImplementedError(
"Currently mode is not supported in df.agg(...). Try df.mode()"
)
else:
es_aggs.append("mode")

# Not implemented
elif pd_agg == "quantile":
# TODO
raise NotImplementedError(pd_agg, " not currently implemented")
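For "mode", _unpack_metric_aggs() receives the raw terms-aggregation buckets and keeps only the keys tied for the highest doc_count. A small sketch of that filtering step against a hand-written response fragment (the key/doc_count bucket shape is standard for Elasticsearch terms aggregations; the counts here are made up):

response = {
    "aggregations": {
        "mode_day_of_week": {
            "buckets": [
                {"key": "Thursday", "doc_count": 775},
                {"key": "Friday", "doc_count": 770},
                {"key": "Saturday", "doc_count": 736},
            ]
        }
    }
}

buckets = response["aggregations"]["mode_day_of_week"]["buckets"]

# Keep every key tied for the highest doc_count and drop the rest; with a
# single winner this is the one-row mode seen in the DataFrame.mode docstring.
max_doc_count = buckets[0]["doc_count"]
agg_value = [b["key"] for b in buckets if b["doc_count"] == max_doc_count]
print(agg_value)  # ['Thursday']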
19 changes: 16 additions & 3 deletions eland/query.py
@@ -101,20 +101,33 @@ def regexp(self, field: str, value: str) -> None:
else:
self._query = self._query & Rlike(field, value)

def terms_aggs(self, name: str, func: str, field: str, es_size: int) -> None:
def terms_aggs(
self,
name: str,
func: str,
field: str,
es_size: Optional[int] = None,
missing: Optional[Any] = None,
) -> None:
"""
Add a terms agg, e.g.
"aggs": {
"name": {
"terms": {
"field": "Airline",
"size": 10
"size": 10,
"missing": "null"
}
}
}
"""
agg = {func: {"field": field, "size": es_size}}
agg = {func: {"field": field}}
if es_size:
agg[func]["size"] = str(es_size)

if missing:
agg[func]["missing"] = missing
self._aggs[name] = agg

def metric_aggs(self, name: str, func: str, field: str) -> None:
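With this change terms_aggs() only emits "size" and "missing" when they are provided, and the size value is stringified (str(es_size)). The sketch below restates the updated body as a plain function to show what df.mode() ends up sending for a keyword field, assuming the mode_<field> agg naming used in operations.py above:

from typing import Any, Dict, Optional

def build_terms_agg(func: str, field: str,
                    es_size: Optional[int] = None,
                    missing: Optional[Any] = None) -> Dict[str, Any]:
    agg: Dict[str, Any] = {func: {"field": field}}
    if es_size:
        agg[func]["size"] = str(es_size)  # stringified, as in the diff
    if missing:
        agg[func]["missing"] = missing
    return agg

# df.mode() with the default es_size=10 on a keyword field:
print({"mode_day_of_week": build_terms_agg("terms", "day_of_week", es_size=10)})
# {'mode_day_of_week': {'terms': {'field': 'day_of_week', 'size': '10'}}}
# The dropna=False TODO would add e.g. missing="null", as in the docstring above.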
18 changes: 17 additions & 1 deletion eland/query_compiler.py
@@ -621,6 +621,22 @@ def nunique(self):
self, ["nunique"], numeric_only=False
)

def mode(
self,
es_size: int,
numeric_only: bool = False,
dropna: bool = True,
is_dataframe: bool = True,
) -> Union[pd.DataFrame, pd.Series]:
return self._operations.mode(
self,
pd_aggs=["mode"],
numeric_only=numeric_only,
dropna=dropna,
is_dataframe=is_dataframe,
es_size=es_size,
)

def aggs_groupby(
self,
by: List[str],
@@ -638,7 +654,7 @@ def aggs_groupby(
numeric_only=numeric_only,
)

def value_counts(self, es_size):
def value_counts(self, es_size: int) -> pd.Series:
return self._operations.value_counts(self, es_size)

def es_info(self, buf):