diff --git a/eland/etl.py b/eland/etl.py index 3a604521..7be5744c 100644 --- a/eland/etl.py +++ b/eland/etl.py @@ -48,6 +48,7 @@ def pandas_to_eland( es_refresh: bool = False, es_dropna: bool = False, es_type_overrides: Optional[Mapping[str, str]] = None, + es_verify_mapping_compatibility: bool = True, thread_count: int = 4, chunksize: Optional[int] = None, use_pandas_index_for_es_ids: bool = True, @@ -77,6 +78,9 @@ def pandas_to_eland( * False: Include missing values - may cause bulk to fail es_type_overrides: dict, default None Dict of field_name: es_data_type that overrides default es data types + es_verify_mapping_compatibility: bool, default 'True' + * True: Verify that the dataframe schema matches the Elasticsearch index schema + * False: Do not verify schema thread_count: int number of the threads to use for the bulk requests chunksize: int, default None @@ -177,7 +181,7 @@ def pandas_to_eland( es_client.indices.delete(index=es_dest_index) es_api_compat(es_client.indices.create, index=es_dest_index, body=mapping) - elif es_if_exists == "append": + elif es_if_exists == "append" and es_verify_mapping_compatibility: dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[ es_dest_index ] diff --git a/tests/etl/test_pandas_to_eland.py b/tests/etl/test_pandas_to_eland.py index c99f57c3..36cd1fe3 100644 --- a/tests/etl/test_pandas_to_eland.py +++ b/tests/etl/test_pandas_to_eland.py @@ -137,7 +137,7 @@ def test_es_if_exists_append(self): pd_df3 = pd_df.append(pd_df2) assert_pandas_eland_frame_equal(pd_df3, df2) - def test_es_if_exists_append_mapping_mismatch(self): + def test_es_if_exists_append_mapping_mismatch_schema_enforcement(self): df1 = pandas_to_eland( pd_df, es_client=ES_TEST_CLIENT, @@ -162,9 +162,60 @@ def test_es_if_exists_append_mapping_mismatch(self): "- 'Z' is missing from ES index mapping\n" "- 'a' column type ('keyword') not compatible with ES index mapping type ('long')" ) + # Assert that the index isn't modified assert_pandas_eland_frame_equal(pd_df, df1) + def test_es_if_exists_append_mapping_mismatch_no_schema_enforcement(self): + pandas_to_eland( + pd_df, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + ) + + pd_df2 = pd.DataFrame( + { + "a": [4, 5, 6], + "b": [-1.0, -2.0, -3.0], + "d": [dt, dt - timedelta(1), dt - timedelta(2)], + "e": ["A", "B", "C"], + }, + index=["3", "4", "5"], + ) + + pandas_to_eland( + pd_df2, + es_client=ES_TEST_CLIENT, + es_dest_index="test-index", + es_if_exists="append", + es_refresh=True, + es_verify_mapping_compatibility=False, + ) + + final_df = pd.DataFrame( + { + "a": [1, 2, 3, 4, 5, 6], + "b": [1.0, 2.0, 3.0, -1.0, -2.0, -3.0], + "c": ["A", "B", "C", None, None, None], + "d": [ + dt, + dt + timedelta(1), + dt + timedelta(2), + dt, + dt - timedelta(1), + dt - timedelta(2), + ], + "e": [None, None, None, "A", "B", "C"], + }, + index=["0", "1", "2", "3", "4", "5"], + ) + + eland_df = DataFrame(ES_TEST_CLIENT, "test-index") + # Assert that the index isn't modified + assert_pandas_eland_frame_equal(final_df, eland_df) + def test_es_if_exists_append_es_type_coerce_error(self): df1 = pandas_to_eland( pd_df,