From 09fb54b6930dddbf6f821d343b7ecf13498cdc63 Mon Sep 17 00:00:00 2001
From: "P. Sai Vinay"
Date: Mon, 5 Oct 2020 20:38:03 +0530
Subject: [PATCH 1/2] Improve efficiency of pandas_to_eland() using parallel_bulk()

---
 eland/etl.py | 71 +++++++++++++++++++++++++++++++---------------------
 1 file changed, 42 insertions(+), 29 deletions(-)

diff --git a/eland/etl.py b/eland/etl.py
index 1700e90b..b6bab7e8 100644
--- a/eland/etl.py
+++ b/eland/etl.py
@@ -16,8 +16,8 @@
 # under the License.
 
 import csv
-from typing import Union, List, Tuple, Optional, Mapping, Dict, Any
-
+from typing import Generator, Union, List, Tuple, Optional, Mapping, Dict, Any
+from collections import deque
 import pandas as pd  # type: ignore
 from pandas.io.parsers import _c_parser_defaults  # type: ignore
 
@@ -26,7 +26,7 @@
 from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
 from eland.utils import deprecated_api
 from elasticsearch import Elasticsearch  # type: ignore
-from elasticsearch.helpers import bulk  # type: ignore
+from elasticsearch.helpers import parallel_bulk  # type: ignore
 
 
 @deprecated_api("eland.DataFrame()")
@@ -67,6 +67,7 @@ def pandas_to_eland(
     es_refresh: bool = False,
     es_dropna: bool = False,
     es_type_overrides: Optional[Mapping[str, str]] = None,
+    thread_count: int = 4,
     chunksize: Optional[int] = None,
     use_pandas_index_for_es_ids: bool = True,
 ) -> DataFrame:
@@ -95,6 +96,8 @@ def pandas_to_eland(
         * False: Include missing values - may cause bulk to fail
     es_type_overrides: dict, default None
         Dict of field_name: es_data_type that overrides default es data types
+    thread_count: int, default 4
+        Number of threads to use for the bulk requests
     chunksize: int, default None
         Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
     use_pandas_index_for_es_ids: bool, default 'True'
@@ -205,33 +208,43 @@ def pandas_to_eland(
     else:
         es_client.indices.create(index=es_dest_index, body=mapping)
 
-    # Now add data
-    actions = []
-    n = 0
-    for row in pd_df.iterrows():
-        if es_dropna:
-            values = row[1].dropna().to_dict()
-        else:
-            values = row[1].to_dict()
-
-        if use_pandas_index_for_es_ids:
-            # Use index as _id
-            id = row[0]
-
-            # Use integer as id field for repeatable results
-            action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
-        else:
-            action = {"_index": es_dest_index, "_source": values}
-
-        actions.append(action)
-
-        n = n + 1
-
-        if n % chunksize == 0:
-            bulk(client=es_client, actions=actions, refresh=es_refresh)
-            actions = []
+    def action_generator(
+        pd_df: pd.DataFrame,
+        es_dropna: bool,
+        use_pandas_index_for_es_ids: bool,
+        es_dest_index: str,
+    ) -> Generator[Dict[str, Any], None, None]:
+        for row in pd_df.iterrows():
+            if es_dropna:
+                values = row[1].dropna().to_dict()
+            else:
+                values = row[1].to_dict()
+
+            if use_pandas_index_for_es_ids:
+                # Use index as _id
+                id = row[0]
+
+                # Use integer as id field for repeatable results
+                action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
+            else:
+                action = {"_index": es_dest_index, "_source": values}
+
+            yield action
+
+    # parallel_bulk is lazy generator so use deque to consume them immediately
+    deque(
+        parallel_bulk(
+            client=es_client,
+            actions=action_generator(
+                pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
+            ),
+            thread_count=thread_count,
+            chunk_size=chunksize / thread_count,
+            refresh=es_refresh,
+        ),
+        maxlen=0,
+    )
 
-    bulk(client=es_client, actions=actions, refresh=es_refresh)
 
     return DataFrame(es_client, es_dest_index)
 

From 040e10a2d5434bf615446348bf34df13d7e9643c Mon Sep 17 00:00:00 2001
From: "P. 
Sai Vinay"
Date: Tue, 6 Oct 2020 12:04:14 +0530
Subject: [PATCH 2/2] Changes requested

---
 eland/etl.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/eland/etl.py b/eland/etl.py
index b6bab7e8..e24b3967 100644
--- a/eland/etl.py
+++ b/eland/etl.py
@@ -224,7 +224,6 @@ def action_generator(
                 # Use index as _id
                 id = row[0]
 
-                # Use integer as id field for repeatable results
                 action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
             else:
                 action = {"_index": es_dest_index, "_source": values}
@@ -232,17 +231,20 @@ def action_generator(
         yield action
 
     # parallel_bulk is lazy generator so use deque to consume them immediately
+    # maxlen=0 because we don't need the results of parallel_bulk
     deque(
         parallel_bulk(
            client=es_client,
             actions=action_generator(
                 pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
             ),
             thread_count=thread_count,
-            chunk_size=chunksize / thread_count,
-            refresh=es_refresh,
+            chunk_size=max(1, chunksize // thread_count),
         ),
         maxlen=0,
     )
 
+    if es_refresh:
+        es_client.indices.refresh(index=es_dest_index)
+
     return DataFrame(es_client, es_dest_index)