Skip to content
This repository has been archived by the owner on Oct 14, 2021. It is now read-only.

Commit

Permalink
#40 es bulk write implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtemisDicoTiar committed Oct 12, 2021
1 parent 88a72f8 commit 838f59c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 12 deletions.
35 changes: 31 additions & 4 deletions storyteller/elasticsearch/elastic_controller.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import json
from pathlib import Path
from typing import List, Dict, Union

from elasticsearch import Elasticsearch


Expand All @@ -10,6 +14,8 @@ def __init__(self,
self.elastic = Elasticsearch(cloud_id, http_auth=(user_id, user_pw))
self.index = index_name

self.batch_size = 10000

def read(self,
query: dict,
highlight: dict = None):
Expand Down Expand Up @@ -42,17 +48,38 @@ def read(self,
return self.elastic.search(index=self.index, query=query, highlight=highlight)

def write(self,
info,
info: Union[List[Dict], Dict],
bulk: bool):
"""
:param bulk:
:param info:
:return:
"""

if not (bulk and (type(info) == list)):
raise ValueError("Invalid info: (bulk is True) info must be list of dictionary.")

if not (not bulk and (type(info) == dict)):
raise ValueError("Invalid info: (bulk is False) info must be dictionary .")

if bulk:
# TODO: Bulk upload 구현.
return self.elastic.bulk(index=self.index, doc_type='_doc', body=info)
res = list()

for d in range((len(info) // self.batch_size) + 1):
cur_docs = info[d * self.batch_size: (d + 1) * self.batch_size]
cur_docs = '\n'.join(
list(
map(
lambda doc: '{"index": {}}\n' + json.dumps(doc),
cur_docs
)
)
)

r = self.elastic.bulk(index=self.index, doc_type='_doc', body=cur_docs)
res.append(r)

return res

else:
return self.elastic.index(index=self.index, doc_type='_doc', document=info)

14 changes: 6 additions & 8 deletions storyteller/examples/elasticsearch/explore_bulk_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ def explore_bulk():

res = es.write(
bulk=True,
info='{"index": {"_index": "testiiing", "_id": "1"}}\n'
'{"source": 4, "title": "3572", "sents": "sent1"}\n'
'{"index": {"_index": "testiiing", "_id": "2"}}\n'
'{"source": 2, "title": "45475", "sents": "sent2"}\n'
'{"index": {"_index": "testiiing", "_id": "3"}}\n'
'{"source": 9, "title": "96346", "sents": "sent3"}\n'
'{"index": {"_index": "testiiing", "_id": "4"}}\n'
'{"source": 8, "title": "30875", "sents": "sent4"}\n'
info=[
{"source": 4, "title": "35722", "sents": "sent1"},
{"source": 2, "title": "45475", "sents": "sent2"},
{"source": 9, "title": "96346", "sents": "sent3"},
{"source": 8, "title": "30875", "sents": "sent4"}
]
)

print(res)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import json


def explore_dict_to_target_json_for_es_bulk():
data = [
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 0},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 1},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 2},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 3},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 4},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 5},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 6},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 7},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 8},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 9},
{"source": 4, "title": "868", "sent": "test_sent", "sent_id": 10},
{"source": 5, "title": "32963", "sent": "test_sent", "sent_id": 0},
{"source": 5, "title": "32963", "sent": "test_sent", "sent_id": 1},
{"source": 5, "title": "32963", "sent": "test_sent", "sent_id": 2},
{"source": 5, "title": "32963", "sent": "test_sent", "sent_id": 3},
{"source": 5, "title": "32963", "sent": "test_sent", "sent_id": 4},
{"source": 2, "title": "43580", "sent": "test_sent", "sent_id": 0},
{"source": 2, "title": "43580", "sent": "test_sent", "sent_id": 1},
{"source": 2, "title": "43580", "sent": "test_sent", "sent_id": 2}
]

cur_docs = '\n'.join(
list(
map(
lambda doc: '{"index": {}}\n' + json.dumps(doc),
data
)
)
)

print(cur_docs)


if __name__ == '__main__':
explore_dict_to_target_json_for_es_bulk()

0 comments on commit 838f59c

Please sign in to comment.