From 3ac8a7a59fdde438573b942128ca8a8b7384a692 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Tue, 31 Mar 2020 11:34:03 +0000 Subject: [PATCH 1/2] Adding ignore_index to pandas_to_eland Parameter is useful when adding multiple pd.DataFrame's to the same index. Also, updated test module to pandas.testing for 1.0.x compliance, --- eland/tests/common.py | 2 +- eland/tests/dataframe/test_utils_pytest.py | 34 ++++++++++++++++++++++ eland/utils.py | 19 ++++++++---- 3 files changed, 48 insertions(+), 7 deletions(-) diff --git a/eland/tests/common.py b/eland/tests/common.py index 7db8cdcd..d04fe13e 100644 --- a/eland/tests/common.py +++ b/eland/tests/common.py @@ -15,7 +15,7 @@ import os import pandas as pd -from pandas.util.testing import assert_frame_equal, assert_series_equal +from pandas.testing import assert_frame_equal, assert_series_equal import eland as ed diff --git a/eland/tests/dataframe/test_utils_pytest.py b/eland/tests/dataframe/test_utils_pytest.py index 2478fdd5..62063101 100644 --- a/eland/tests/dataframe/test_utils_pytest.py +++ b/eland/tests/dataframe/test_utils_pytest.py @@ -65,6 +65,40 @@ def test_generate_es_mappings(self): assert_pandas_eland_frame_equal(df, ed_df_head) + ES_TEST_CLIENT.indices.delete(index=index_name) + + def test_pandas_to_eland_ignore_index(self): + df = pd.DataFrame( + data={ + "A": np.random.rand(3), + "B": 1, + "C": "foo", + "D": pd.Timestamp("20190102"), + "E": [1.0, 2.0, 3.0], + "F": False, + "G": [1, 2, 3], + }, + index=["0", "1", "2"], + ) + + # Now create index + index_name = "test_pandas_to_eland_ignore_index" + + ed_df = ed.pandas_to_eland( + df, + ES_TEST_CLIENT, + index_name, + es_if_exists="replace", + es_refresh=True, + ignore_index=True, + ) + pd_df = ed.eland_to_pandas(ed_df) + + # Compare values excluding index + assert df.values.all() == pd_df.values.all() + + ES_TEST_CLIENT.indices.delete(index=index_name) + def test_eland_to_pandas_performance(self): # TODO quantify this ed.eland_to_pandas(self.ed_flights(), show_progress=True) diff --git a/eland/utils.py b/eland/utils.py index 1b8fb0e0..9fc921cf 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -58,6 +58,7 @@ def pandas_to_eland( es_dropna=False, es_geo_points=None, chunksize=None, + ignore_index=False, ): """ Append a pandas DataFrame to an Elasticsearch index. @@ -86,7 +87,10 @@ def pandas_to_eland( es_geo_points: list, default None List of columns to map to geo_point data type chunksize: int, default None - number of pandas.DataFrame rows to read before bulk index into Elasticsearch + Number of pandas.DataFrame rows to read before bulk index into Elasticsearch + ignore_index: bool, default 'False' + Ignore pandas.DataFrame.index when indexing into Elasticsearch? + If 'False' pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields. Returns ------- @@ -185,16 +189,19 @@ def pandas_to_eland( actions = [] n = 0 for row in pd_df.iterrows(): - # Use index as _id - id = row[0] - if es_dropna: values = row[1].dropna().to_dict() else: values = row[1].to_dict() - # Use integer as id field for repeatable results - action = {"_index": es_dest_index, "_source": values, "_id": str(id)} + if ignore_index: + action = {"_index": es_dest_index, "_source": values} + else: + # Use index as _id + id = row[0] + + # Use integer as id field for repeatable results + action = {"_index": es_dest_index, "_source": values, "_id": str(id)} actions.append(action) From 93959b85bc5ee9ac6500fb4be03af0addf29db76 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Tue, 31 Mar 2020 14:06:17 +0000 Subject: [PATCH 2/2] Resolving review comments --- eland/tests/dataframe/test_utils_pytest.py | 5 ++++- eland/utils.py | 14 +++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/eland/tests/dataframe/test_utils_pytest.py b/eland/tests/dataframe/test_utils_pytest.py index 62063101..a8963978 100644 --- a/eland/tests/dataframe/test_utils_pytest.py +++ b/eland/tests/dataframe/test_utils_pytest.py @@ -90,13 +90,16 @@ def test_pandas_to_eland_ignore_index(self): index_name, es_if_exists="replace", es_refresh=True, - ignore_index=True, + use_pandas_index_for_es_ids=False, ) pd_df = ed.eland_to_pandas(ed_df) # Compare values excluding index assert df.values.all() == pd_df.values.all() + # Ensure that index is populated by ES. + assert not (df.index == pd_df.index).any() + ES_TEST_CLIENT.indices.delete(index=index_name) def test_eland_to_pandas_performance(self): diff --git a/eland/utils.py b/eland/utils.py index 9fc921cf..c83e4106 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -58,7 +58,7 @@ def pandas_to_eland( es_dropna=False, es_geo_points=None, chunksize=None, - ignore_index=False, + use_pandas_index_for_es_ids=True, ): """ Append a pandas DataFrame to an Elasticsearch index. @@ -88,9 +88,9 @@ def pandas_to_eland( List of columns to map to geo_point data type chunksize: int, default None Number of pandas.DataFrame rows to read before bulk index into Elasticsearch - ignore_index: bool, default 'False' - Ignore pandas.DataFrame.index when indexing into Elasticsearch? - If 'False' pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields. + use_pandas_index_for_es_ids: bool, default 'True' + * True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields. + * False: Ignore pandas.DataFrame.index when indexing into Elasticsearch Returns ------- @@ -194,14 +194,14 @@ def pandas_to_eland( else: values = row[1].to_dict() - if ignore_index: - action = {"_index": es_dest_index, "_source": values} - else: + if use_pandas_index_for_es_ids: # Use index as _id id = row[0] # Use integer as id field for repeatable results action = {"_index": es_dest_index, "_source": values, "_id": str(id)} + else: + action = {"_index": es_dest_index, "_source": values} actions.append(action)