Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve efficiency of pandas_to_eland() using parallel_bulk() #279

Merged
merged 2 commits into from
Oct 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 44 additions & 29 deletions eland/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# under the License.

import csv
from typing import Union, List, Tuple, Optional, Mapping, Dict, Any

from typing import Generator, Union, List, Tuple, Optional, Mapping, Dict, Any
from collections import deque
import pandas as pd # type: ignore
from pandas.io.parsers import _c_parser_defaults # type: ignore

Expand All @@ -26,7 +26,7 @@
from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
from eland.utils import deprecated_api
from elasticsearch import Elasticsearch # type: ignore
from elasticsearch.helpers import bulk # type: ignore
from elasticsearch.helpers import parallel_bulk # type: ignore


@deprecated_api("eland.DataFrame()")
Expand Down Expand Up @@ -67,6 +67,7 @@ def pandas_to_eland(
es_refresh: bool = False,
es_dropna: bool = False,
es_type_overrides: Optional[Mapping[str, str]] = None,
thread_count: int = 4,
chunksize: Optional[int] = None,
use_pandas_index_for_es_ids: bool = True,
) -> DataFrame:
Expand Down Expand Up @@ -95,6 +96,8 @@ def pandas_to_eland(
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of field_name: es_data_type that overrides default es data types
thread_count: int
    Number of threads to use for the parallel bulk requests
chunksize: int, default None
Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
use_pandas_index_for_es_ids: bool, default 'True'
Expand Down Expand Up @@ -205,33 +208,45 @@ def pandas_to_eland(
else:
es_client.indices.create(index=es_dest_index, body=mapping)

# Now add data
actions = []
n = 0
for row in pd_df.iterrows():
if es_dropna:
values = row[1].dropna().to_dict()
else:
values = row[1].to_dict()

if use_pandas_index_for_es_ids:
# Use index as _id
id = row[0]

# Use integer as id field for repeatable results
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
else:
action = {"_index": es_dest_index, "_source": values}

actions.append(action)

n = n + 1

if n % chunksize == 0:
bulk(client=es_client, actions=actions, refresh=es_refresh)
actions = []
def action_generator(
    pd_df: pd.DataFrame,
    es_dropna: bool,
    use_pandas_index_for_es_ids: bool,
    es_dest_index: str,
) -> Generator[Dict[str, Any], None, None]:
    """Lazily yield one Elasticsearch bulk action per DataFrame row.

    Parameters
    ----------
    pd_df:
        Source DataFrame; each row becomes one action's ``_source``.
    es_dropna:
        If True, missing values (NaN/None) are dropped from each row
        before indexing instead of being sent as nulls.
    use_pandas_index_for_es_ids:
        If True, the row's index label (stringified) is used as the
        document ``_id`` so repeated uploads produce repeatable ids.
    es_dest_index:
        Name of the destination Elasticsearch index.

    Yields
    ------
    dict
        An action dict consumable by the elasticsearch bulk helpers.
    """
    # NOTE: renamed the loop variable from `id` to `doc_id` so the
    # builtin id() is not shadowed; behavior is unchanged.
    for doc_id, row in pd_df.iterrows():
        # Series.dropna() omits NaN/None fields from the indexed
        # document rather than storing them as null.
        values = row.dropna().to_dict() if es_dropna else row.to_dict()

        action = {"_index": es_dest_index, "_source": values}
        if use_pandas_index_for_es_ids:
            # Use the pandas index label as _id for repeatable results.
            action["_id"] = str(doc_id)

        yield action

# parallel_bulk is a lazy generator: drain it with a zero-length deque
# so the bulk requests are actually executed.  maxlen=0 discards the
# per-action results, which we do not need here.
deque(
    parallel_bulk(
        client=es_client,
        actions=action_generator(
            pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
        ),
        thread_count=thread_count,
        # chunk_size must be a positive int: floor-divide instead of
        # true division (which yields a float) and clamp to >= 1 for
        # the case chunksize < thread_count.
        chunk_size=max(1, chunksize // thread_count),
    ),
    maxlen=0,
)

if es_refresh:
es_client.indices.refresh(index=es_dest_index)

bulk(client=es_client, actions=actions, refresh=es_refresh)
return DataFrame(es_client, es_dest_index)


Expand Down