Skip to content

Commit

Permalink
SQL: Support pattern against compatible indices (#34718)
Browse files Browse the repository at this point in the history
Extend querying support on multiple indices from being strictly
identical to being just compatible.
Use FieldCapabilities API (extended through #33803) for mapping merging.

Close #31837 #31611
  • Loading branch information
costin authored Oct 23, 2018
1 parent 36baf38 commit ca6808e
Show file tree
Hide file tree
Showing 25 changed files with 727 additions and 348 deletions.
2 changes: 1 addition & 1 deletion docs/reference/sql/security.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ indices:

["source","yaml",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-tests}/security/roles.yml[cli_jdbc]
include-tagged::{sql-tests}/security/roles.yml[cli_drivers]
--------------------------------------------------

Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public enum DataType {
SCALED_FLOAT(JDBCType.FLOAT, Double.class, Double.BYTES, 19, 25, false, true, true),
KEYWORD( JDBCType.VARCHAR, String.class, Integer.MAX_VALUE, 256, 0),
TEXT( JDBCType.VARCHAR, String.class, Integer.MAX_VALUE, Integer.MAX_VALUE, 0, false, false, false),
OBJECT( JDBCType.STRUCT, null, -1, 0, 0),
NESTED( JDBCType.STRUCT, null, -1, 0, 0),
OBJECT( JDBCType.STRUCT, null, -1, 0, 0, false, false, false),
NESTED( JDBCType.STRUCT, null, -1, 0, 0, false, false, false),
BINARY( JDBCType.VARBINARY, byte[].class, -1, Integer.MAX_VALUE, 0),
// since ODBC and JDBC interpret precision for Date as display size,
// the precision is 23 (number of chars in ISO8601 with millis) + Z (the UTC timezone)
Expand Down Expand Up @@ -223,7 +223,11 @@ public static DataType fromODBCType(String odbcType) {
* For any dataType DataType.fromEsType(dataType.esType) == dataType
*/
public static DataType fromEsType(String esType) {
return DataType.valueOf(esType.toUpperCase(Locale.ROOT));
try {
return DataType.valueOf(esType.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException ex) {
return DataType.UNSUPPORTED;
}
}

public boolean isCompatibleWith(DataType other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest.Feature;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.IndicesOptions.Option;
import org.elasticsearch.action.support.IndicesOptions.WildcardStates;
Expand All @@ -24,23 +26,34 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
import org.elasticsearch.xpack.sql.type.DataType;
import org.elasticsearch.xpack.sql.type.DateEsField;
import org.elasticsearch.xpack.sql.type.EsField;
import org.elasticsearch.xpack.sql.type.KeywordEsField;
import org.elasticsearch.xpack.sql.type.TextEsField;
import org.elasticsearch.xpack.sql.type.Types;
import org.elasticsearch.xpack.sql.type.UnsupportedEsField;
import org.elasticsearch.xpack.sql.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;

public class IndexResolver {

Expand Down Expand Up @@ -222,64 +235,157 @@ private void filterResults(String javaRegex, GetAliasesResponse aliases, GetInde
listener.onResponse(result);
}


/**
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
*/
public void resolveWithSameMapping(String indexWildcard, String javaRegex, ActionListener<IndexResolution> listener) {
GetIndexRequest getIndexRequest = createGetIndexRequest(indexWildcard);
client.admin().indices().getIndex(getIndexRequest, ActionListener.wrap(response -> {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = response.getMappings();

List<IndexResolution> resolutions;
if (mappings.size() > 0) {
resolutions = new ArrayList<>(mappings.size());
Pattern pattern = javaRegex != null ? Pattern.compile(javaRegex) : null;
for (ObjectObjectCursor<String, ImmutableOpenMap<String, MappingMetaData>> indexMappings : mappings) {
String concreteIndex = indexMappings.key;
if (pattern == null || pattern.matcher(concreteIndex).matches()) {
resolutions.add(buildGetIndexResult(concreteIndex, concreteIndex, indexMappings.value));
public void resolveAsMergedMapping(String indexWildcard, String javaRegex, ActionListener<IndexResolution> listener) {
FieldCapabilitiesRequest fieldRequest = createFieldCapsRequest(indexWildcard);
client.fieldCaps(fieldRequest,
ActionListener.wrap(response -> listener.onResponse(mergedMapping(indexWildcard, response.get())), listener::onFailure));
}

static IndexResolution mergedMapping(String indexPattern, Map<String, Map<String, FieldCapabilities>> fieldCaps) {
if (fieldCaps == null || fieldCaps.isEmpty()) {
return IndexResolution.notFound(indexPattern);
}

StringBuilder errorMessage = new StringBuilder();

NavigableSet<Entry<String, Map<String, FieldCapabilities>>> sortedFields = new TreeSet<>(
// for some reason .reversed doesn't work (prolly due to inference)
Collections.reverseOrder(Comparator.comparing(Entry::getKey)));
sortedFields.addAll(fieldCaps.entrySet());

Map<String, EsField> hierarchicalMapping = new TreeMap<>();
Map<String, EsField> flattedMapping = new LinkedHashMap<>();

// sort keys descending in order to easily detect multi-fields (a.b.c multi-field of a.b)
// without sorting, they can still be detected however without the emptyMap optimization
// (fields without multi-fields have no children)
for (Entry<String, Map<String, FieldCapabilities>> entry : sortedFields) {
String name = entry.getKey();
// skip internal fields
if (!name.startsWith("_")) {
Map<String, FieldCapabilities> types = entry.getValue();
// field is mapped differently across indices
if (types.size() > 1) {
// build error message
for (Entry<String, FieldCapabilities> type : types.entrySet()) {
if (errorMessage.length() > 0) {
errorMessage.append(", ");
}
errorMessage.append("[");
errorMessage.append(type.getKey());
errorMessage.append("] in ");
errorMessage.append(Arrays.toString(type.getValue().indices()));
}

errorMessage.insert(0,
"[" + indexPattern + "] points to indices with incompatible mappings; " +
"field [" + name + "] is mapped in [" + types.size() + "] different ways: ");
}
if (errorMessage.length() > 0) {
return IndexResolution.invalid(errorMessage.toString());
}

FieldCapabilities fieldCap = types.values().iterator().next();
// validate search/agg-able
if (fieldCap.isAggregatable() && fieldCap.nonAggregatableIndices() != null) {
errorMessage.append("[" + indexPattern + "] points to indices with incompatible mappings: ");
errorMessage.append("field [" + name + "] is aggregateable except in ");
errorMessage.append(Arrays.toString(fieldCap.nonAggregatableIndices()));
}
if (fieldCap.isSearchable() && fieldCap.nonSearchableIndices() != null) {
if (errorMessage.length() > 0) {
errorMessage.append(",");
}
errorMessage.append("[" + indexPattern + "] points to indices with incompatible mappings: ");
errorMessage.append("field [" + name + "] is searchable except in ");
errorMessage.append(Arrays.toString(fieldCap.nonSearchableIndices()));
}
if (errorMessage.length() > 0) {
return IndexResolution.invalid(errorMessage.toString());
}

// validation passes - create the field
// and name wasn't added before
if (!flattedMapping.containsKey(name)) {
createField(name, fieldCap, fieldCaps, hierarchicalMapping, flattedMapping, false);
}
} else {
resolutions = emptyList();
}
}

listener.onResponse(merge(resolutions, indexWildcard));
}, listener::onFailure));
return IndexResolution.valid(new EsIndex(indexPattern, hierarchicalMapping));
}

static IndexResolution merge(List<IndexResolution> resolutions, String indexWildcard) {
IndexResolution merged = null;
for (IndexResolution resolution : resolutions) {
// everything that follows gets compared
if (!resolution.isValid()) {
return resolution;
}
// initialize resolution on first run
if (merged == null) {
merged = resolution;
}
// need the same mapping across all resolutions
if (!merged.get().mapping().equals(resolution.get().mapping())) {
return IndexResolution.invalid(
"[" + indexWildcard + "] points to indices [" + merged.get().name() + "] "
+ "and [" + resolution.get().name() + "] which have different mappings. "
+ "When using multiple indices, the mappings must be identical.");
private static EsField createField(String fieldName, FieldCapabilities caps, Map<String, Map<String, FieldCapabilities>> globalCaps,
Map<String, EsField> hierarchicalMapping, Map<String, EsField> flattedMapping, boolean hasChildren) {

Map<String, EsField> parentProps = hierarchicalMapping;

int dot = fieldName.lastIndexOf('.');
String fullFieldName = fieldName;

if (dot >= 0) {
String parentName = fieldName.substring(0, dot);
fieldName = fieldName.substring(dot + 1);
EsField parent = flattedMapping.get(parentName);
if (parent == null) {
Map<String, FieldCapabilities> map = globalCaps.get(parentName);
if (map == null) {
throw new SqlIllegalArgumentException("Cannot find field {}; this is likely a bug", parentName);
}
FieldCapabilities parentCap = map.values().iterator().next();
parent = createField(parentName, parentCap, globalCaps, hierarchicalMapping, flattedMapping, true);
}
parentProps = parent.getProperties();
}
if (merged != null) {
// at this point, we are sure there's the same mapping across all (if that's the case) indices
// to keep things simple, use the given pattern as index name
merged = IndexResolution.valid(new EsIndex(indexWildcard, merged.get().mapping()));
} else {
merged = IndexResolution.notFound(indexWildcard);

EsField field = null;
Map<String, EsField> props = hasChildren ? new TreeMap<>() : emptyMap();

DataType esType = DataType.fromEsType(caps.getType());
switch (esType) {
case TEXT:
field = new TextEsField(fieldName, props, false);
break;
case KEYWORD:
int length = DataType.KEYWORD.defaultPrecision;
// TODO: to check whether isSearchable/isAggregateable takes into account the presence of the normalizer
boolean normalized = false;
field = new KeywordEsField(fieldName, props, caps.isAggregatable(), length, normalized);
break;
case DATE:
field = new DateEsField(fieldName, props, caps.isAggregatable());
break;
case UNSUPPORTED:
field = new UnsupportedEsField(fieldName, caps.getType());
break;
default:
field = new EsField(fieldName, esType, props, caps.isAggregatable());
}
return merged;

parentProps.put(fieldName, field);
flattedMapping.put(fullFieldName, field);

return field;
}

private static FieldCapabilitiesRequest createFieldCapsRequest(String index) {
return new FieldCapabilitiesRequest()
.indices(Strings.commaDelimitedListToStringArray(index))
.fields("*")
//lenient because we throw our own errors looking at the response e.g. if something was not resolved
//also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable
.indicesOptions(IndicesOptions.lenientExpandOpen());
}

// TODO: Concrete indices still uses get mapping
// waiting on https://github.com/elastic/elasticsearch/pull/34071
//

/**
* Resolves a pattern to multiple, separate indices.
* Resolves a pattern to multiple, separate indices. Doesn't perform validation.
*/
public void resolveAsSeparateMappings(String indexWildcard, String javaRegex, ActionListener<List<EsIndex>> listener) {
GetIndexRequest getIndexRequest = createGetIndexRequest(indexWildcard);
Expand All @@ -306,7 +412,7 @@ public void resolveAsSeparateMappings(String indexWildcard, String javaRegex, Ac
listener.onResponse(results);
}, listener::onFailure));
}

private static GetIndexRequest createGetIndexRequest(String index) {
return new GetIndexRequest()
.local(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ protected NodeInfo<ShowColumns> info() {
@Override
public List<Attribute> output() {
return asList(new FieldAttribute(location(), "column", new KeywordEsField("column")),
new FieldAttribute(location(), "type", new KeywordEsField("type"))); }
new FieldAttribute(location(), "type", new KeywordEsField("type")),
new FieldAttribute(location(), "mapping", new KeywordEsField("mapping")));
}

@Override
public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
String idx = index != null ? index : (pattern != null ? pattern.asIndexNameWildcard() : "*");
String regex = pattern != null ? pattern.asJavaRegex() : null;
session.indexResolver().resolveWithSameMapping(idx, regex, ActionListener.wrap(
session.indexResolver().resolveAsMergedMapping(idx, regex, ActionListener.wrap(
indexResult -> {
List<List<?>> rows = emptyList();
if (indexResult.isValid()) {
Expand All @@ -69,8 +71,7 @@ public void execute(SqlSession session, ActionListener<SchemaRowSet> listener) {
}
listener.onResponse(Rows.of(output(), rows));
},
listener::onFailure
));
listener::onFailure));
}

private void fillInRows(Map<String, EsField> mapping, String prefix, List<List<?>> rows) {
Expand All @@ -79,7 +80,7 @@ private void fillInRows(Map<String, EsField> mapping, String prefix, List<List<?
DataType dt = field.getDataType();
String name = e.getKey();
if (dt != null) {
rows.add(asList(prefix != null ? prefix + "." + name : name, dt.sqlName()));
rows.add(asList(prefix != null ? prefix + "." + name : name, dt.sqlName(), dt.name()));
if (field.getProperties().isEmpty() == false) {
String newPrefix = prefix != null ? prefix + "." + name : name;
fillInRows(field.getProperties(), newPrefix, rows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private String aliasName(Attribute attr) {
// reference methods
//
private FieldExtraction topHitFieldRef(FieldAttribute fieldAttr) {
return new SearchHitFieldRef(aliasName(fieldAttr), fieldAttr.field().getDataType(), fieldAttr.field().hasDocValues());
return new SearchHitFieldRef(aliasName(fieldAttr), fieldAttr.field().getDataType(), fieldAttr.field().isAggregatable());
}

private Tuple<QueryContainer, FieldExtraction> nestedHitFieldRef(FieldAttribute attr) {
Expand All @@ -181,10 +181,10 @@ private Tuple<QueryContainer, FieldExtraction> nestedHitFieldRef(FieldAttribute

String name = aliasName(attr);
Query q = rewriteToContainNestedField(query, attr.location(),
attr.nestedParent().name(), name, attr.field().hasDocValues());
attr.nestedParent().name(), name, attr.field().isAggregatable());

SearchHitFieldRef nestedFieldRef = new SearchHitFieldRef(name, attr.field().getDataType(),
attr.field().hasDocValues(), attr.parent().name());
attr.field().isAggregatable(), attr.parent().name());
nestedRefs.add(nestedFieldRef);

return new Tuple<>(new QueryContainer(q, aggs, columns, aliases, pseudoFunctions, scalarFunctions, sort, limit), nestedFieldRef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private <T> void preAnalyze(LogicalPlan parsed, Function<IndexResolution, T> act
listener.onFailure(new MappingException("Cannot inspect indices in cluster/catalog [{}]", cluster));
}

indexResolver.resolveWithSameMapping(table.index(), null,
indexResolver.resolveAsMergedMapping(table.index(), null,
wrap(indexResult -> listener.onResponse(action.apply(indexResult)), listener::onFailure));
} else {
try {
Expand Down
Loading

0 comments on commit ca6808e

Please sign in to comment.