Skip to content

Commit

Permalink
Add iterrows() and itertuples() to DataFrame
Browse files Browse the repository at this point in the history
Co-authored-by: Seth Michael Larson <seth.larson@elastic.co>
  • Loading branch information
kxbin and sethmlarson authored Aug 20, 2021
1 parent e4f88a3 commit 1aa193d
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.iterrows.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.DataFrame.iterrows
========================

.. currentmodule:: eland

.. automethod:: DataFrame.iterrows
6 changes: 6 additions & 0 deletions docs/sphinx/reference/api/eland.DataFrame.itertuples.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
eland.DataFrame.itertuples
==========================

.. currentmodule:: eland

.. automethod:: DataFrame.itertuples
2 changes: 2 additions & 0 deletions docs/sphinx/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Indexing, Iteration
DataFrame.get
DataFrame.query
DataFrame.sample
DataFrame.iterrows
DataFrame.itertuples

Function Application, GroupBy & Window
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
117 changes: 116 additions & 1 deletion eland/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import sys
import warnings
from io import StringIO
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd # type: ignore
Expand Down Expand Up @@ -1446,6 +1446,121 @@ def keys(self) -> pd.Index:
"""
return self.columns

def iterrows(self) -> Iterable[Tuple[Union[str, Tuple[str, ...]], pd.Series]]:
    """
    Iterate over eland.DataFrame rows as (index, pandas.Series) pairs.

    Yields
    ------
    index: index
        The index of the row.
    data: pandas Series
        The data of the row as a pandas Series.

    See Also
    --------
    eland.DataFrame.itertuples: Iterate over eland.DataFrame rows as namedtuples.

    Examples
    --------
    >>> df = ed.DataFrame('localhost:9200', 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
    >>> df
       AvgTicketPrice  Cancelled
    0      841.265642      False
    1      882.982662      False
    2      190.636904      False
    3      181.694216       True
    4      730.041778      False
    <BLANKLINE>
    [5 rows x 2 columns]
    >>> for index, row in df.iterrows():
    ...     print(row)
    AvgTicketPrice    841.265642
    Cancelled              False
    Name: 0, dtype: object
    AvgTicketPrice    882.982662
    Cancelled              False
    Name: 1, dtype: object
    AvgTicketPrice    190.636904
    Cancelled              False
    Name: 2, dtype: object
    AvgTicketPrice    181.694216
    Cancelled               True
    Name: 3, dtype: object
    AvgTicketPrice    730.041778
    Cancelled              False
    Name: 4, dtype: object
    """
    # Results come back from Elasticsearch in batches; flatten each batch
    # into (index, row) pairs so callers never need the full result set
    # materialized in memory at once.
    for batch in self._query_compiler.search_yield_pandas_dataframes():
        for index_and_row in batch.iterrows():
            yield index_and_row

def itertuples(
    self, index: bool = True, name: Union[str, None] = "Eland"
) -> Iterable[Tuple[Any, ...]]:
    """
    Iterate over eland.DataFrame rows as namedtuples.

    Parameters
    ----------
    index: bool, default True
        If True, return the index as the first element of the tuple.
    name: str or None, default "Eland"
        The name of the returned namedtuples or None to return regular tuples.

    Returns
    -------
    iterator
        An object to iterate over namedtuples for each row in the
        DataFrame with the first field possibly being the index and
        following fields being the column values.

    See Also
    --------
    eland.DataFrame.iterrows: Iterate over eland.DataFrame rows as (index, pandas.Series) pairs.

    Examples
    --------
    >>> df = ed.DataFrame('localhost:9200', 'flights', columns=['AvgTicketPrice', 'Cancelled']).head()
    >>> df
       AvgTicketPrice  Cancelled
    0      841.265642      False
    1      882.982662      False
    2      190.636904      False
    3      181.694216       True
    4      730.041778      False
    <BLANKLINE>
    [5 rows x 2 columns]
    >>> for row in df.itertuples():
    ...     print(row)
    Eland(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
    Eland(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
    Eland(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
    Eland(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
    Eland(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)

    By setting the `index` parameter to False we can remove the index as the first element of the tuple:

    >>> for row in df.itertuples(index=False):
    ...     print(row)
    Eland(AvgTicketPrice=841.2656419677076, Cancelled=False)
    Eland(AvgTicketPrice=882.9826615595518, Cancelled=False)
    Eland(AvgTicketPrice=190.6369038508356, Cancelled=False)
    Eland(AvgTicketPrice=181.69421554118, Cancelled=True)
    Eland(AvgTicketPrice=730.041778346198, Cancelled=False)

    With the `name` parameter set we set a custom name for the yielded namedtuples:

    >>> for row in df.itertuples(name='Flight'):
    ...     print(row)
    Flight(Index='0', AvgTicketPrice=841.2656419677076, Cancelled=False)
    Flight(Index='1', AvgTicketPrice=882.9826615595518, Cancelled=False)
    Flight(Index='2', AvgTicketPrice=190.6369038508356, Cancelled=False)
    Flight(Index='3', AvgTicketPrice=181.69421554118, Cancelled=True)
    Flight(Index='4', AvgTicketPrice=730.041778346198, Cancelled=False)
    """
    # Delegate tuple construction to pandas, one fetched batch at a time,
    # so only a single batch is ever resident in memory.
    for batch in self._query_compiler.search_yield_pandas_dataframes():
        for row_tuple in batch.itertuples(index=index, name=name):
            yield row_tuple

def aggregate(
self,
func: Union[str, List[str]],
Expand Down
30 changes: 30 additions & 0 deletions eland/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,36 @@ def to_csv(
df = self._es_results(query_compiler, show_progress)
return df.to_csv(**kwargs) # type: ignore[no-any-return]

def search_yield_pandas_dataframes(
    self, query_compiler: "QueryCompiler"
) -> Generator["pd.DataFrame", None, None]:
    """Execute the compiled query and stream the results as pandas
    DataFrames, one per batch of Elasticsearch hits.

    Parameters
    ----------
    query_compiler: QueryCompiler
        The query compiler whose tasks/filters define the search.

    Yields
    ------
    pd.DataFrame
        One DataFrame per batch of hits, with post-processing applied.
    """
    query_params, post_processing = self._resolve_tasks(query_compiler)

    result_size, sort_params = Operations._query_params_to_size_and_sort(
        query_params
    )

    search_body = Query(query_params.query).to_search_body()

    # Scripted fields must be requested explicitly in the search body.
    script_fields = query_params.script_fields
    if script_fields is not None:
        search_body["script_fields"] = script_fields

    # Restrict "_source" to the requested field names; an explicit False
    # disables source retrieval entirely when nothing was requested.
    source_fields = query_compiler.get_field_names(include_scripted_fields=False)
    search_body["_source"] = source_fields if source_fields else False

    if sort_params:
        search_body["sort"] = [sort_params]

    for hits in _search_yield_hits(
        query_compiler=query_compiler,
        body=search_body,
        max_number_of_hits=result_size,
    ):
        batch_df = query_compiler._es_results_to_pandas(hits)
        yield self._apply_df_post_processing(batch_df, post_processing)

def _es_results(
self, query_compiler: "QueryCompiler", show_progress: bool = False
) -> pd.DataFrame:
Expand Down
4 changes: 4 additions & 0 deletions eland/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TYPE_CHECKING,
Any,
Dict,
Generator,
List,
Optional,
Sequence,
Expand Down Expand Up @@ -527,6 +528,9 @@ def to_csv(self, **kwargs) -> Optional[str]:
"""
return self._operations.to_csv(self, **kwargs)

def search_yield_pandas_dataframes(self) -> Generator["pd.DataFrame", None, None]:
    """Stream search results as pandas DataFrames, one per batch of hits.

    Thin delegation to the underlying Operations object.
    """
    operations = self._operations
    return operations.search_yield_pandas_dataframes(self)

# __getitem__ methods
def getitem_column_array(self, key, numeric=False):
"""Get column data for target labels.
Expand Down
64 changes: 64 additions & 0 deletions tests/dataframe/test_iterrows_itertuples_pytest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# File called _pytest for PyCharm compatibility

import pytest
from pandas.testing import assert_series_equal

from tests.common import TestData


class TestDataFrameIterrowsItertuples(TestData):
    """Integration tests comparing eland's iterrows()/itertuples()
    against pandas on the flights dataset."""

    def test_iterrows(self):
        ed_flights = self.ed_flights()
        pd_flights = self.pd_flights()

        ed_flights_iterrows = ed_flights.iterrows()
        pd_flights_iterrows = pd_flights.iterrows()

        for ed_index, ed_row in ed_flights_iterrows:
            pd_index, pd_row = next(pd_flights_iterrows)

            assert ed_index == pd_index
            assert_series_equal(ed_row, pd_row)

        # Assert that both are the same length and are exhausted.
        with pytest.raises(StopIteration):
            next(ed_flights_iterrows)
        with pytest.raises(StopIteration):
            next(pd_flights_iterrows)

    def test_itertuples(self):
        ed_flights = self.ed_flights()
        pd_flights = self.pd_flights()

        ed_flights_itertuples = list(ed_flights.itertuples(name=None))
        pd_flights_itertuples = list(pd_flights.itertuples(name=None))

        # zip() below truncates silently, so assert equal lengths first —
        # otherwise a short eland result would pass undetected.
        assert len(ed_flights_itertuples) == len(pd_flights_itertuples)

        def assert_tuples_almost_equal(left, right):
            # Shim which uses pytest.approx() for floating point values inside tuples.
            assert len(left) == len(right)
            assert all(
                (lt == rt)  # Not floats? Use ==
                if not isinstance(lt, float) and not isinstance(rt, float)
                else (lt == pytest.approx(rt))  # If both are floats use pytest.approx()
                for lt, rt in zip(left, right)
            )

        for ed_tuple, pd_tuple in zip(ed_flights_itertuples, pd_flights_itertuples):
            assert_tuples_almost_equal(ed_tuple, pd_tuple)

0 comments on commit 1aa193d

Please sign in to comment.