Skip to content

Commit

Permalink
Improve efficiency of 'pandas_to_eland()' using 'parallel_bulk()'
Browse files Browse the repository at this point in the history
  • Loading branch information
V1NAY8 authored Oct 8, 2020
1 parent 225a23a commit 0dd247b
Showing 1 changed file with 44 additions and 29 deletions.
73 changes: 44 additions & 29 deletions eland/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# under the License.

import csv
from typing import Union, List, Tuple, Optional, Mapping, Dict, Any

from typing import Generator, Union, List, Tuple, Optional, Mapping, Dict, Any
from collections import deque
import pandas as pd # type: ignore
from pandas.io.parsers import _c_parser_defaults # type: ignore

Expand All @@ -26,7 +26,7 @@
from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
from eland.utils import deprecated_api
from elasticsearch import Elasticsearch # type: ignore
from elasticsearch.helpers import bulk # type: ignore
from elasticsearch.helpers import parallel_bulk # type: ignore


@deprecated_api("eland.DataFrame()")
Expand Down Expand Up @@ -67,6 +67,7 @@ def pandas_to_eland(
es_refresh: bool = False,
es_dropna: bool = False,
es_type_overrides: Optional[Mapping[str, str]] = None,
thread_count: int = 4,
chunksize: Optional[int] = None,
use_pandas_index_for_es_ids: bool = True,
) -> DataFrame:
Expand Down Expand Up @@ -95,6 +96,8 @@ def pandas_to_eland(
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of field_name: es_data_type that overrides default es data types
thread_count: int, default 4
Number of threads to use for the bulk requests
chunksize: int, default None
Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
use_pandas_index_for_es_ids: bool, default 'True'
Expand Down Expand Up @@ -205,33 +208,45 @@ def pandas_to_eland(
else:
es_client.indices.create(index=es_dest_index, body=mapping)

# NOTE(review): pre-parallel_bulk implementation (removed side of this diff) —
# builds one action dict per DataFrame row and flushes synchronously every
# `chunksize` rows with a blocking bulk() call.
# Now add data
actions = []
n = 0
for row in pd_df.iterrows():
# Optionally drop missing (NaN/None) values from the row before indexing;
# keeping them may cause the bulk request to fail.
if es_dropna:
values = row[1].dropna().to_dict()
else:
values = row[1].to_dict()

if use_pandas_index_for_es_ids:
# Use index as _id
id = row[0]

# Use integer as id field for repeatable results
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
else:
action = {"_index": es_dest_index, "_source": values}

actions.append(action)

n = n + 1

# Flush a full chunk synchronously; any remainder (< chunksize rows) is
# flushed by the final bulk() call after the loop.
if n % chunksize == 0:
bulk(client=es_client, actions=actions, refresh=es_refresh)
actions = []
def action_generator(
    pd_df: pd.DataFrame,
    es_dropna: bool,
    use_pandas_index_for_es_ids: bool,
    es_dest_index: str,
) -> Generator[Dict[str, Any], None, None]:
    """Lazily convert DataFrame rows into Elasticsearch bulk actions.

    Parameters
    ----------
    pd_df: pd.DataFrame
        Source frame; one bulk action is yielded per row.
    es_dropna: bool
        If True, drop missing (NaN/None) values from each row before indexing.
        If False, missing values are included and may cause the bulk call to fail.
    use_pandas_index_for_es_ids: bool
        If True, use the row's pandas index value (stringified) as the
        document '_id' so results are repeatable.
    es_dest_index: str
        Name of the destination Elasticsearch index.

    Yields
    ------
    Dict[str, Any]
        An action dict consumable by the elasticsearch.helpers bulk APIs.
    """
    # Unpack (index, Series) directly instead of row[0]/row[1], and avoid
    # shadowing the builtin `id`.
    for row_id, row in pd_df.iterrows():
        if es_dropna:
            values = row.dropna().to_dict()
        else:
            values = row.to_dict()

        if use_pandas_index_for_es_ids:
            # Use index as _id for repeatable results
            yield {
                "_index": es_dest_index,
                "_source": values,
                "_id": str(row_id),
            }
        else:
            yield {"_index": es_dest_index, "_source": values}

# parallel_bulk() returns a lazy generator of per-item results; consume it
# with a zero-length deque (maxlen=0) so indexing actually runs while the
# per-action results we don't need are discarded immediately.
# FIX: chunk_size must be an integer. True division (chunksize / thread_count)
# yields a float, and the helpers' chunker compares the running action count
# with '==', so a non-integer chunk size never triggers a count-based flush.
# Floor-divide and clamp to at least 1 so small chunksize values still work.
deque(
    parallel_bulk(
        client=es_client,
        actions=action_generator(
            pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
        ),
        thread_count=thread_count,
        chunk_size=max(1, chunksize // thread_count),
    ),
    maxlen=0,
)

if es_refresh:
es_client.indices.refresh(index=es_dest_index)

bulk(client=es_client, actions=actions, refresh=es_refresh)
return DataFrame(es_client, es_dest_index)


Expand Down

0 comments on commit 0dd247b

Please sign in to comment.