Skip to content

Commit

Permalink
Allow dict in es_type_overrides, text fields by default get keyword s…
Browse files Browse the repository at this point in the history
…ub-field
  • Loading branch information
sethmlarson authored Oct 29, 2020
1 parent cb4cd08 commit b936e98
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 22 deletions.
32 changes: 11 additions & 21 deletions eland/field_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Optional,
Set,
Tuple,
Union,
)

import numpy as np
Expand Down Expand Up @@ -515,26 +516,7 @@ def _generate_es_mappings(
-------
mapping : str
"""

"""
"mappings" : {
"properties" : {
"AvgTicketPrice" : {
"type" : "float"
},
"Cancelled" : {
"type" : "boolean"
},
"Carrier" : {
"type" : "keyword"
},
"Dest" : {
"type" : "keyword"
}
}
}
"""
es_dtype: str
es_dtype: Union[str, Dict[str, Any]]

mapping_props: Dict[str, Any] = {}

Expand All @@ -550,10 +532,18 @@ def _generate_es_mappings(
for column, dtype in dataframe.dtypes.iteritems():
if es_type_overrides is not None and column in es_type_overrides:
es_dtype = es_type_overrides[column]
if es_dtype == "text":
es_dtype = {
"type": "text",
"fields": {"keyword": {"type": "keyword"}},
}
else:
es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype)

mapping_props[column] = {"type": es_dtype}
if isinstance(es_dtype, str):
mapping_props[column] = {"type": es_dtype}
else:
mapping_props[column] = es_dtype

return {"mappings": {"properties": mapping_props}}

Expand Down
5 changes: 5 additions & 0 deletions eland/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ def aggs_groupby(

# Construct Query
for by_field in by_fields:
if by_field.aggregatable_es_field_name is None:
raise ValueError(
f"Cannot use {by_field.column!r} with groupby() because "
f"it has no aggregatable fields in Elasticsearch"
)
# groupby fields will be term aggregations
body.composite_agg_bucket_terms(
name=f"groupby_{by_field.column}",
Expand Down
5 changes: 4 additions & 1 deletion eland/tests/dataframe/test_utils_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ def test_pandas_to_eland_ignore_index(self):
"E": {"type": "double"},
"F": {"type": "boolean"},
"G": {"type": "long"},
"H": {"type": "text"},
"H": {
"type": "text",
"fields": {"keyword": {"type": "keyword"}},
},
"I": {"type": "geo_point"},
}
}
Expand Down
44 changes: 44 additions & 0 deletions eland/tests/etl/test_pandas_to_eland.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,47 @@ def test_es_if_exists_append_es_type_coerce_error(self):

# Assert that the value 128 caused the index error
assert "Value [128] is out of range for a byte" in str(e.value)

def test_pandas_to_eland_text_inserts_keyword(self):
es = ES_TEST_CLIENT
df1 = pandas_to_eland(
pd_df,
es_client=es,
es_dest_index="test-index",
es_if_exists="append",
es_refresh=True,
es_type_overrides={
"c": "text",
"b": {"type": "float"},
"d": {"type": "text"},
},
)
assert es.indices.get_mapping(index="test-index") == {
"test-index": {
"mappings": {
"properties": {
"a": {"type": "long"},
"b": {"type": "float"},
"c": {
"fields": {"keyword": {"type": "keyword"}},
"type": "text",
},
"d": {"type": "text"},
}
}
}
}

# 'c' is aggregatable on 'keyword'
assert df1.groupby("c").mean().to_dict() == {
"a": {"A": 1.0, "B": 2.0, "C": 3.0},
"b": {"A": 1.0, "B": 2.0, "C": 3.0},
}

# 'd' isn't aggregatable because it's missing the 'keyword'
with pytest.raises(ValueError) as e:
df1.groupby("d").mean()
assert str(e.value) == (
"Cannot use 'd' with groupby() because it has "
"no aggregatable fields in Elasticsearch"
)

0 comments on commit b936e98

Please sign in to comment.