diff --git a/eland/Untitled-1.ipynb b/eland/Untitled-1.ipynb new file mode 100644 index 00000000..103c02fc --- /dev/null +++ b/eland/Untitled-1.ipynb @@ -0,0 +1,369 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "from datetime import datetime, timedelta\n", + "\n", + "dt = datetime.utcnow()" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "df1 = pd.DataFrame(\n", + " {\n", + " \"a\": [1, 2, 3],\n", + " \"b\": [1.0, 2.0, 3.0],\n", + " \"c\": [\"A\", \"B\", \"C\"],\n", + " \"d\": [dt, dt + timedelta(1), dt + timedelta(2)],\n", + " },\n", + " index=[\"0\", \"1\", \"2\"],\n", + ")\n", + "\n", + "df2 = pd.DataFrame({\"Z\": [3, 2, 1], \"a\": [4, 5, 6]}, index=[\"0\", \"1\", \"2\"])\n", + "es_index = \"test3\"" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "from elasticsearch import Elasticsearch\n", + "\n", + "es_client = Elasticsearch(\"http://localhost:9200\")" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "import eland as ed" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/sidhuas/.pyenv/versions/3.9.1/envs/eland/lib/python3.9/site-packages/elasticsearch/connection/base.py:200: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.16/security-minimal-setup.html to enable security.\n", + " warnings.warn(message, category=ElasticsearchWarning)\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
abcd
\n", + "
\n", + "

0 rows × 4 columns

" + ], + "text/plain": [ + "Empty DataFrame\n", + "Columns: [a, b, c, d]\n", + "Index: []\n", + "\n", + "[0 rows x 4 columns]" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ed.pandas_to_eland(df1, es_client, es_index, es_dropna=True, \n", + " es_if_exists='append', use_pandas_index_for_es_ids=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/sidhuas/.pyenv/versions/3.9.1/envs/eland/lib/python3.9/site-packages/elasticsearch/connection/base.py:200: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.16/security-minimal-setup.html to enable security.\n", + " warnings.warn(message, category=ElasticsearchWarning)\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Zabcd
6KaKA34BeVG4RuLOG_yXNaN11.0A2021-12-29 00:13:36.956031
6aaKA34BeVG4RuLOG_yXNaN22.0B2021-12-30 00:13:36.956031
6qaKA34BeVG4RuLOG_yXNaN33.0C2021-12-31 00:13:36.956031
\n", + "
\n", + "

3 rows × 5 columns

" + ], + "text/plain": [ + " Z a b c d\n", + "6KaKA34BeVG4RuLOG_yX NaN 1 1.0 A 2021-12-29 00:13:36.956031\n", + "6aaKA34BeVG4RuLOG_yX NaN 2 2.0 B 2021-12-30 00:13:36.956031\n", + "6qaKA34BeVG4RuLOG_yX NaN 3 3.0 C 2021-12-31 00:13:36.956031\n", + "\n", + "[3 rows x 5 columns]" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ed.pandas_to_eland(df2, es_client, es_index, es_dropna=True, enforce_index_schema=False,\n", + " es_if_exists='append', use_pandas_index_for_es_ids=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/sidhuas/.pyenv/versions/3.9.1/envs/eland/lib/python3.9/site-packages/elasticsearch/connection/base.py:200: ElasticsearchWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.16/security-minimal-setup.html to enable security.\n", + " warnings.warn(message, category=ElasticsearchWarning)\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Zabcd
6KaKA34BeVG4RuLOG_yXNaN11.0A2021-12-29 00:13:36.956031
6aaKA34BeVG4RuLOG_yXNaN22.0B2021-12-30 00:13:36.956031
6qaKA34BeVG4RuLOG_yXNaN33.0C2021-12-31 00:13:36.956031
66aKA34BeVG4RuLOH_xK3.04NaNNaNNaT
7KaKA34BeVG4RuLOH_xK2.05NaNNaNNaT
7aaKA34BeVG4RuLOH_xK1.06NaNNaNNaT
\n", + "
\n", + "

6 rows × 5 columns

" + ], + "text/plain": [ + " Z a b c d\n", + "6KaKA34BeVG4RuLOG_yX NaN 1 1.0 A 2021-12-29 00:13:36.956031\n", + "6aaKA34BeVG4RuLOG_yX NaN 2 2.0 B 2021-12-30 00:13:36.956031\n", + "6qaKA34BeVG4RuLOG_yX NaN 3 3.0 C 2021-12-31 00:13:36.956031\n", + "66aKA34BeVG4RuLOH_xK 3.0 4 NaN NaN NaT\n", + "7KaKA34BeVG4RuLOH_xK 2.0 5 NaN NaN NaT\n", + "7aaKA34BeVG4RuLOH_xK 1.0 6 NaN NaN NaT\n", + "\n", + "[6 rows x 5 columns]" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ed.DataFrame(es_client, es_index)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "interpreter": { + "hash": "a4e377a64fa06eb866bfeb92d02adfe8420325b7057dd732fb302fa0a637b068" + }, + "kernelspec": { + "display_name": "Python 3.9.1 64-bit ('eland': pyenv)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.1" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/eland/etl.py b/eland/etl.py index 3a604521..4ef11817 100644 --- a/eland/etl.py +++ b/eland/etl.py @@ -48,6 +48,7 @@ def pandas_to_eland( es_refresh: bool = False, es_dropna: bool = False, es_type_overrides: Optional[Mapping[str, str]] = None, + enforce_index_schema: bool = True, thread_count: int = 4, chunksize: Optional[int] = None, use_pandas_index_for_es_ids: bool = True, @@ -177,7 +178,7 @@ def pandas_to_eland( es_client.indices.delete(index=es_dest_index) es_api_compat(es_client.indices.create, index=es_dest_index, body=mapping) - elif es_if_exists == "append": + elif es_if_exists == "append" and enforce_index_schema: dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[ es_dest_index ] diff --git a/tests/etl/test_pandas_to_eland.py b/tests/etl/test_pandas_to_eland.py index c99f57c3..155029d8 100644 --- a/tests/etl/test_pandas_to_eland.py +++ b/tests/etl/test_pandas_to_eland.py @@ -137,7 +137,7 @@ def test_es_if_exists_append(self): pd_df3 = pd_df.append(pd_df2) assert_pandas_eland_frame_equal(pd_df3, df2) - def test_es_if_exists_append_mapping_mismatch(self): + def test_es_if_exists_append_mapping_mismatch_schema_enforcement(self): df1 = pandas_to_eland( pd_df, es_client=ES_TEST_CLIENT, @@ -154,14 +154,29 @@ def test_es_if_exists_append_mapping_mismatch(self): es_if_exists="append", ) - assert str(e.value) == ( - "DataFrame dtypes and Elasticsearch index mapping aren't compatible:\n" - "- 'b' is missing from DataFrame columns\n" - "- 'c' is missing from DataFrame columns\n" - "- 'd' is missing from DataFrame columns\n" - "- 'Z' is missing from ES index mapping\n" - "- 'a' column type ('keyword') not compatible with ES index mapping type ('long')" + + # Assert that the index isn't modified + assert_pandas_eland_frame_equal(pd_df, df1) + + def test_es_if_exists_append_mapping_mismatch_no_schema_enforcement(self): + df1 = pandas_to_eland( + pd_df, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, ) + + df2 = pandas_to_eland( + pd_df2, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + ) + + pr + + # Assert that the index isn't modified assert_pandas_eland_frame_equal(pd_df, df1)