Requested changes
V1NAY8 committed Oct 14, 2020
1 parent 19086f3 commit dd4e085
Showing 8 changed files with 181 additions and 134 deletions.
2 changes: 1 addition & 1 deletion eland/common.py
@@ -31,7 +31,7 @@
DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
DEFAULT_ES_MAX_RESULT_WINDOW = 10000 # index.max_result_window
DEFAULT_PAGINATION_SIZE = 10000 # for composite aggregations
DEFAULT_PAGINATION_SIZE = 5000 # for composite aggregations


with warnings.catch_warnings():
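For context, DEFAULT_PAGINATION_SIZE bounds how many buckets each composite-aggregation page returns when eland groups by columns. A rough sketch of the request shape this size feeds into (the index and field names are illustrative, not part of this commit):

PAGINATION_SIZE = 5000  # mirrors DEFAULT_PAGINATION_SIZE above

# Illustrative Elasticsearch composite aggregation body; eland builds the
# real request through its Query helpers.
body = {
    "size": 0,
    "aggs": {
        "groupby_buckets": {
            "composite": {
                "size": PAGINATION_SIZE,  # buckets returned per page
                "sources": [
                    {"groupby_DestCountry": {"terms": {"field": "DestCountry"}}}
                ],
            }
        }
    },
}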
22 changes: 11 additions & 11 deletions eland/dataframe.py
@@ -1447,10 +1447,10 @@ def groupby(
Examples
--------
>>> ed_flights = df = ed.DataFrame('localhost', 'flights', columns=['AvgTicketPrice', 'DistanceKilometers', 'timestamp', 'DestCountry'])
>>> ed_flights.groupby(["DestCountry","Cancelled"]).agg(["min", "max"],True) # doctest: +SKIP
AvgTicketPrice dayOfWeek
min max min max
>>> ed_flights = ed.DataFrame('localhost', 'flights', columns=["AvgTicketPrice", "Cancelled", "dayOfWeek", "timestamp", "DestCountry"])
>>> ed_flights.groupby(["DestCountry", "Cancelled"]).agg(["min", "max"], True)
AvgTicketPrice dayOfWeek
min max min max
DestCountry Cancelled
AE False 110.799911 1126.148682 0.0 6.0
True 132.443756 817.931030 0.0 6.0
@@ -1463,10 +1463,10 @@
True 102.153069 1192.429932 0.0 6.0
ZA False 102.002663 1196.186157 0.0 6.0
True 121.280296 1175.709961 0.0 6.0
<BLANKLINE>
[63 rows x 4 columns]
>>> ed_flights.groupby(["DestCountry","Cancelled"]).mean(True) # doctest: +SKIP
AvgTicketPrice dayOfWeek
>>> ed_flights.groupby(["DestCountry", "Cancelled"]).mean(True)
AvgTicketPrice dayOfWeek
DestCountry Cancelled
AE False 643.956793 2.717949
True 388.828809 2.571429
@@ -1479,18 +1479,18 @@
True 579.799066 2.767068
ZA False 636.998605 2.738589
True 677.794078 2.928571
<BLANKLINE>
[63 rows x 2 columns]
"""
if by is None:
raise TypeError("by parameter should be specified for groupby")
raise TypeError("by parameter should be specified to groupby")
if isinstance(by, str):
if by not in self._query_compiler.columns:
raise KeyError(f"Requested column [{by}] is not in the DataFrame.")
by = [by]
if isinstance(by, list):
if isinstance(by, (list, tuple)):
if set(by) - set(self._query_compiler.columns):
raise KeyError("Requested column/s not in the DataFrame.")
raise KeyError("Requested columns not in the DataFrame.")

return GroupByDataFrame(
by=by, query_compiler=self._query_compiler, dropna=dropna
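As a usage note, the reworked validation accepts a single column name or a list/tuple of names, and anything not in the DataFrame raises a KeyError. A minimal standalone sketch of the same normalization logic (column names are illustrative):

from typing import List, Optional, Tuple, Union

def normalize_by(
    by: Optional[Union[str, List[str], Tuple[str, ...]]], columns: List[str]
) -> List[str]:
    # Mirrors the validation in DataFrame.groupby(); `columns` stands in
    # for self._query_compiler.columns.
    if by is None:
        raise TypeError("by parameter should be specified to groupby")
    if isinstance(by, str):
        if by not in columns:
            raise KeyError(f"Requested column [{by}] is not in the DataFrame.")
        by = [by]
    if isinstance(by, (list, tuple)):
        if set(by) - set(columns):
            raise KeyError("Requested columns not in the DataFrame.")
    return list(by)

# normalize_by("DestCountry", ["DestCountry", "Cancelled"])        -> ["DestCountry"]
# normalize_by(("DestCountry", "Cancelled"), ["DestCountry", "Cancelled"])
#                                                                  -> ["DestCountry", "Cancelled"]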
10 changes: 5 additions & 5 deletions eland/field_mappings.py
@@ -37,7 +37,6 @@
TYPE_CHECKING,
List,
Set,
Union,
)

if TYPE_CHECKING:
@@ -728,18 +727,19 @@ def groupby_source_fields(self, by: List[str]) -> Tuple[List[Field], List[Field]
a Tuple consisting of a list of field mappings for groupby and non-groupby fields
"""
groupby_fields: Union[List[Field], List[None]] = [None] * len(by)
groupby_fields: Dict[str, Field] = {}
# groupby_fields: Union[List[Field], List[None]] = [None] * len(by)
aggregatable_fields: List[Field] = []
for index_name, row in self._mappings_capabilities.iterrows():
row = row.to_dict()
row["index"] = index_name
if index_name not in by:
aggregatable_fields.append(Field(**row))
else:
# Maintain groupby order as given input
groupby_fields[by.index(index_name)] = Field(**row)
groupby_fields[index_name] = Field(**row)

return groupby_fields, aggregatable_fields
# Maintain groupby order as given input
return [groupby_fields[column] for column in by], aggregatable_fields

def metric_source_fields(self, include_bool=False, include_timestamp=False):
"""
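The rewritten groupby_source_fields collects matches into a dict keyed by field name and then reads them back in the caller's order, so the groupby columns come out in the order they were requested. A small standalone illustration of that pattern (the field data is made up):

# Order-preserving lookup: build a dict while iterating in arbitrary order,
# then emit values in the order the caller asked for.
mappings_order = ["Cancelled", "DestCountry", "AvgTicketPrice"]  # iteration order
by = ["DestCountry", "Cancelled"]                                # caller's order

groupby_fields = {}
aggregatable_fields = []
for name in mappings_order:
    if name in by:
        groupby_fields[name] = f"Field({name})"
    else:
        aggregatable_fields.append(f"Field({name})")

ordered = [groupby_fields[column] for column in by]
print(ordered)              # ['Field(DestCountry)', 'Field(Cancelled)']
print(aggregatable_fields)  # ['Field(AvgTicketPrice)']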
14 changes: 4 additions & 10 deletions eland/groupby.py
@@ -140,14 +140,6 @@ class GroupByDataFrame(GroupBy):
"""

def __init__(
self,
by: Union[str, List[str], None] = None,
query_compiler: Optional["QueryCompiler"] = None,
dropna: bool = True,
) -> None:
super().__init__(by=by, query_compiler=query_compiler, dropna=dropna)

def aggregate(self, func: Union[str, List[str]], numeric_only: bool = False):
"""
Used to groupby and aggregate
@@ -160,17 +152,19 @@ def aggregate(self, func: Union[str, List[str]], numeric_only: bool = False):
Accepted combinations are:
- function
- list of functions
TODO Implement other functions present in pandas groupby
TODO Implement other combinations present in pandas groupby
numeric_only: {True, False, None} Default is None
Which datatype to be returned
- True: returns all values with float64, NaN/NaT are ignored.
- False: returns all values with float64.
- None: returns all values with default datatype.
"""
if isinstance(func, str):
func = [func]
# numeric_only is by default False because pandas does the same
return self._query_compiler.groupby(
by=self._by,
pd_aggs=([func] if isinstance(func, str) else func),
pd_aggs=func,
dropna=self._dropna,
numeric_only=numeric_only,
is_agg=True,
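As a usage note, aggregate() now treats a bare string the same as a one-element list before delegating to the query compiler. A hedged sketch of calling it (assumes a local Elasticsearch with the 'flights' sample index, as in the docstrings above):

import eland as ed

ed_flights = ed.DataFrame(
    "localhost", "flights", columns=["AvgTicketPrice", "dayOfWeek", "DestCountry"]
)

grouped = ed_flights.groupby("DestCountry")

# Both forms hit the same code path; the string is wrapped into ["min"].
per_country_min = grouped.aggregate("min")
per_country_min_max = grouped.aggregate(["min", "max"])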
163 changes: 87 additions & 76 deletions eland/operations.py
@@ -17,7 +17,17 @@

import copy
import warnings
from typing import Optional, Sequence, Tuple, List, Dict, Any
from typing import (
Generator,
Optional,
Sequence,
Tuple,
List,
Dict,
Any,
TYPE_CHECKING,
Union,
)

import numpy as np
import pandas as pd
@@ -435,8 +445,8 @@ def _calculate_single_agg(
-------
a dictionary on which agg calculations are done.
"""

results: Dict[str, Any] = {}

for field in fields:
values = []
for es_agg, pd_agg in zip(es_aggs, pd_aggs):
@@ -452,73 +462,75 @@
elif is_dataframe_agg and numeric_only:
if pd_agg == "mad":
values.append(field.nan_value)
else:
if isinstance(es_agg, tuple):
agg_value = response["aggregations"][
continue

if isinstance(es_agg, tuple):
agg_value = response["aggregations"][
f"{es_agg[0]}_{field.es_field_name}"
]

# Pull multiple values from 'percentiles' result.
if es_agg[0] == "percentiles":
agg_value = agg_value["values"]

agg_value = agg_value[es_agg[1]]

# Need to convert 'Population' stddev and variance
# from Elasticsearch into 'Sample' stddev and variance
# which is what pandas uses.
if es_agg[1] in ("std_deviation", "variance"):
# Neither transformation works with count <=1
count = response["aggregations"][
f"{es_agg[0]}_{field.es_field_name}"
]

# Pull multiple values from 'percentiles' result.
if es_agg[0] == "percentiles":
agg_value = agg_value["values"]

agg_value = agg_value[es_agg[1]]

# Need to convert 'Population' stddev and variance
# from Elasticsearch into 'Sample' stddev and variance
# which is what pandas uses.
if es_agg[1] in ("std_deviation", "variance"):
# Neither transformation works with count <=1
count = response["aggregations"][
f"{es_agg[0]}_{field.es_field_name}"
]["count"]

# All of the below calculations result in NaN if count<=1
if count <= 1:
agg_value = np.NaN

elif es_agg[1] == "std_deviation":
agg_value *= count / (count - 1.0)

else: # es_agg[1] == "variance"
# sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
# population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
# sample_std=\sqrt{\frac{N}{N-1}population_std}
agg_value = np.sqrt(
(count / (count - 1.0)) * agg_value * agg_value
)
else:
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["value"]
]["count"]

# Null usually means there were no results.
if agg_value is None or np.isnan(agg_value):
if is_dataframe_agg and not numeric_only:
agg_value = np.NaN
elif not is_dataframe_agg and numeric_only is False:
# All of the below calculations result in NaN if count<=1
if count <= 1:
agg_value = np.NaN

# Cardinality is always either NaN or integer.
elif pd_agg == "nunique":
agg_value = int(agg_value)

# If this is a non-null timestamp field convert to a pd.Timestamp()
elif field.is_timestamp:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
# we're only converting to bool for lossless aggs like min, max, and median.
if pd_agg in {"max", "min", "median", "sum"}:
# 'sum' isn't representable with bool, use int64
if pd_agg == "sum" and field.is_bool:
agg_value = np.int64(agg_value)
else:
agg_value = field.np_dtype.type(agg_value)

values.append(agg_value)
elif es_agg[1] == "std_deviation":
agg_value *= count / (count - 1.0)

else: # es_agg[1] == "variance"
# sample_std=\sqrt{\frac{1}{N-1}\sum_{i=1}^N(x_i-\bar{x})^2}
# population_std=\sqrt{\frac{1}{N}\sum_{i=1}^N(x_i-\bar{x})^2}
# sample_std=\sqrt{\frac{N}{N-1}population_std}
agg_value = np.sqrt(
(count / (count - 1.0)) * agg_value * agg_value
)
else:
agg_value = response["aggregations"][
f"{es_agg}_{field.es_field_name}"
]["value"]

# Null usually means there were no results.
if agg_value is None or np.isnan(agg_value):
if is_dataframe_agg and not numeric_only:
agg_value = np.NaN
elif not is_dataframe_agg and numeric_only is False:
agg_value = np.NaN

# Cardinality is always either NaN or integer.
elif pd_agg == "nunique":
agg_value = int(agg_value)

# If this is a non-null timestamp field convert to a pd.Timestamp()
elif field.is_timestamp:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, field.es_date_format
)
# If numeric_only is False | None then maintain column datatype
elif not numeric_only:
# we're only converting to bool for lossless aggs like min, max, and median.
if pd_agg in {"max", "min", "median", "sum"}:
# 'sum' isn't representable with bool, use int64
if pd_agg == "sum" and field.is_bool:
agg_value = np.int64(agg_value)
else:
agg_value = field.np_dtype.type(agg_value)

values.append(agg_value)

# If numeric_only is True and We only have a NaN type field then we check for empty.
if values:
results[field.index] = values if len(values) > 1 else values[0]
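For context on the std_deviation/variance handling above: Elasticsearch's extended_stats reports population statistics, while pandas reports sample statistics, so a count / (count - 1) rescaling is applied. A small standalone check of that population-to-sample relationship (the data is made up and independent of the code above):

import numpy as np

values = np.array([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
n = len(values)

population_var = values.var(ddof=0)        # what extended_stats reports
sample_var = population_var * n / (n - 1.0)
sample_std = np.sqrt(sample_var)

# numpy's ddof=1 gives the sample statistics pandas uses
assert np.isclose(sample_var, values.var(ddof=1))
assert np.isclose(sample_std, values.std(ddof=1))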
@@ -565,7 +577,9 @@ def groupby(
is_agg=is_agg,
numeric_only=numeric_only,
)

agg_df = pd.DataFrame(results, columns=results.keys()).set_index(by)

if is_agg:
# Convert header columns to MultiIndex
agg_df.columns = pd.MultiIndex.from_product([headers, pd_aggs])
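The from_product call above pairs every result column with every aggregation name to build the two-level header. A tiny standalone pandas illustration (values are placeholders):

import pandas as pd

headers = ["AvgTicketPrice", "dayOfWeek"]
pd_aggs = ["min", "max"]

agg_df = pd.DataFrame([[110.8, 1126.1, 0.0, 6.0]])
agg_df.columns = pd.MultiIndex.from_product([headers, pd_aggs])
# Columns become: (AvgTicketPrice, min), (AvgTicketPrice, max),
#                 (dayOfWeek, min), (dayOfWeek, max)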
@@ -628,7 +642,7 @@ def _groupby_aggs(

# Construct Query
for b in by:
# groupby fields will be termed aggregations
# groupby fields will be term aggregations
body.term_aggs(f"groupby_{b.index}", b.index)

for field in fields:
@@ -673,7 +687,7 @@ def response_generator() -> Generator[List[str], None, List[str]]:
Returns
-------
A generator which initially yields the bucket
If after_value if found use it to fetch the next set of buckets.
If after_key is found, use it to fetch the next set of buckets.
"""
while True:
)
# Pagination Logic
if "after_key" in res["aggregations"]["groupby_buckets"]:

# yield the bucket which contains the result
yield res["aggregations"]["groupby_buckets"]["buckets"]
body.composite_agg(
after_key=res["aggregations"]["groupby_buckets"]["after_key"]

body.composite_agg_after_key(
name="groupby_buckets",
after_key=res["aggregations"]["groupby_buckets"]["after_key"],
)
else:
return res["aggregations"]["groupby_buckets"]["buckets"]
for bucket in buckets:
# groupby columns are added to result same way they are returned
for b in by:
agg_value = bucket["key"][f"groupby_{b.index}"]
# Convert to timestamp
if b.is_timestamp:
agg_value = elasticsearch_date_to_pandas_date(
agg_value, b.es_date_format
)
response[b.index].append(agg_value)
response[b.index].append(bucket["key"][f"groupby_{b.index}"])

agg_calculation = self._calculate_single_agg(
fields=fields,
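The response_generator above pages through composite-aggregation buckets by feeding each response's after_key back into the next request. A hedged standalone sketch of the same loop against the low-level elasticsearch-py client (index, field, and size are illustrative; this is not eland's Query/body builder):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

composite = {
    "size": 5000,  # buckets per page, cf. DEFAULT_PAGINATION_SIZE
    "sources": [{"groupby_DestCountry": {"terms": {"field": "DestCountry"}}}],
}

buckets = []
while True:
    res = es.search(
        index="flights",
        body={"size": 0, "aggs": {"groupby_buckets": {"composite": composite}}},
    )
    agg = res["aggregations"]["groupby_buckets"]
    buckets.extend(agg["buckets"])
    if "after_key" not in agg:
        break
    composite["after"] = agg["after_key"]  # resume after the last returned bucket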