sync updates
kkrik-es committed Nov 5, 2024
1 parent 60b7bcb commit ccf9707
Showing 22 changed files with 230 additions and 133 deletions.
@@ -1,10 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
@@ -89,7 +91,7 @@ public void testIndexSearchAndRetrieval() throws Exception {
.template(
new Template(
Settings.builder()
.put("index.mode", "logs")
.put("index.mode", "logsdb")
.put("index.routing_path", "metricset,k8s.pod.uid")
.put("index.number_of_replicas", 0)
// Reduce sync interval to speed up this integration test,
@@ -135,7 +137,7 @@ public void testIndexSearchAndRetrieval() throws Exception {
var searchRequest = new SearchRequest(dataStreamName);
searchRequest.source().trackTotalHits(true);
assertResponse(client().search(searchRequest), searchResponse -> {
- assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) numBulkRequests * numDocsPerBulk));
+ assertThat(searchResponse.getHits().getTotalHits().value(), equalTo((long) numBulkRequests * numDocsPerBulk));
String id = searchResponse.getHits().getHits()[0].getId();
assertThat(id, notNullValue());

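For context: the template above now selects the renamed "logsdb" index mode (previously "logs"). As a minimal sketch, the equivalent standalone settings block would look like this, assuming only the keys visible in the hunk:

    import org.elasticsearch.common.settings.Settings;

    Settings logsdbSettings = Settings.builder()
        .put("index.mode", "logsdb")                         // renamed from "logs" in this commit
        .put("index.routing_path", "metricset,k8s.pod.uid")  // fields the routing hash is derived from
        .put("index.number_of_replicas", 0)
        .build();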
24 changes: 7 additions & 17 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
@@ -116,7 +116,7 @@ public IdFieldMapper buildIdFieldMapper(BooleanSupplier fieldDataEnabled) {

@Override
public DocumentDimensions buildDocumentDimensions(IndexSettings settings) {
- return new DocumentDimensions.OnlySingleValueAllowed();
+ return DocumentDimensions.Noop.INSTANCE;
}

@Override
@@ -128,8 +128,8 @@ public boolean shouldValidateTimestamp() {
public void validateSourceFieldMapper(SourceFieldMapper sourceFieldMapper) {}

@Override
- public boolean isSyntheticSourceEnabled() {
- return false;
+ public SourceFieldMapper.Mode defaultSourceMode() {
+ return SourceFieldMapper.Mode.STORED;
}
},
TIME_SERIES("time_series") {
@@ -281,14 +281,6 @@ public MetadataFieldMapper indexModeIdFieldMapper() {
@Override
+ public MetadataFieldMapper routingHashFieldMapper() {
+ return DimensionRoutingHashFieldMapper.INSTANCE;
- public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
- // non time-series indices must not have a TimeSeriesRoutingIdFieldMapper
- return null;
}

- @Override
- public DocumentDimensions buildDocumentDimensions(IndexSettings settings) {
- return DocumentDimensions.Noop.INSTANCE;
- }

@Override
@@ -346,14 +338,12 @@ public TimestampBounds getTimestampBound(IndexMetadata indexMetadata) {
}

@Override
- public MetadataFieldMapper timeSeriesIdFieldMapper() {
- // non time-series indices must not have a TimeSeriesIdFieldMapper
+ public MetadataFieldMapper indexModeIdFieldMapper() {
return null;
}

@Override
- public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
- // non time-series indices must not have a TimeSeriesRoutingIdFieldMapper
+ public MetadataFieldMapper routingHashFieldMapper() {
return null;
}

@@ -490,7 +480,7 @@ public String getName() {
* Get default mapping for this index or {@code null} if there is none.
*/
@Nullable
- public abstract CompressedXContent getDefaultMapping();
+ public abstract CompressedXContent getDefaultMapping(IndexSettings indexSettings);

/**
* Build the {@link FieldMapper} for {@code _id}.
@@ -532,7 +522,7 @@ public DocumentDimensions buildDocumentDimensions(IndexSettings settings) {
IndexRouting.ExtractFromSource routing = (IndexRouting.ExtractFromSource) settings.getIndexRouting();
return new RoutingDimensions(routing.builder());
}
- return new DocumentDimensions.OnlySingleValueAllowed();
+ return DocumentDimensions.Noop.INSTANCE;
}

/**
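For context, the IndexMode.java hunks above rename the time-series-specific hooks into mode-agnostic ones so that logsdb can share them. A condensed summary with a small usage sketch (grounded in the hunks above, not a complete listing):

    // timeSeriesIdFieldMapper()            -> indexModeIdFieldMapper()
    // timeSeriesRoutingHashFieldMapper()   -> routingHashFieldMapper()
    // isSyntheticSourceEnabled() : boolean -> defaultSourceMode() : SourceFieldMapper.Mode
    // getDefaultMapping()                  -> getDefaultMapping(IndexSettings indexSettings)
    SourceFieldMapper.Mode sourceMode = IndexMode.LOGSDB.defaultSourceMode(); // returns a Mode instead of a boolean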
@@ -1184,7 +1184,7 @@ public IndexMode getMode() {

public boolean usesRoutingPath() {
return mode == IndexMode.TIME_SERIES
|| (mode == IndexMode.LOGS && IndexMetadata.INDEX_ROUTING_PATH.get(settings).isEmpty() == false);
|| (mode == IndexMode.LOGSDB && IndexMetadata.INDEX_ROUTING_PATH.get(settings).isEmpty() == false);
}

/**
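For context: usesRoutingPath() now treats logsdb like time series, but only when the index actually defines index.routing_path. A minimal usage sketch, assuming an IndexSettings instance built from logsdb settings like those shown earlier:

    // true for TIME_SERIES, and for LOGSDB with a non-empty index.routing_path
    boolean routed = indexSettings.usesRoutingPath();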
@@ -155,7 +155,7 @@ public IndexSortConfig(IndexSettings indexSettings) {
return;
}

- if (this.indexMode == IndexMode.LOGS) {
+ if (this.indexMode == IndexMode.LOGSDB) {
this.sortSpecs = LOGS_SORT;
return;
}
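For context, an index created with index.mode set to "logsdb" now picks up the preset LOGS_SORT unless a sort is configured explicitly. A sketch of an explicit override; index.sort.field is the standard index-sorting setting, while the exact contents of LOGS_SORT are not shown in this hunk:

    Settings sorted = Settings.builder()
        .put("index.mode", "logsdb")
        .putList("index.sort.field", "host.name", "@timestamp") // explicit sort; omit to fall back to LOGS_SORT
        .build();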
@@ -1,15 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.mapper;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
+ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.hash.Murmur3Hasher;
@@ -22,6 +24,7 @@
import java.io.IOException;
import java.util.Base64;
import java.util.LinkedHashMap;
+ import java.util.List;
import java.util.Map;

/**
@@ -67,26 +70,38 @@ public static BytesReference build(RoutingDimensions routingDimensions) {
int index = StreamOutput.putVInt(bytes, len, 0);

hasher.reset();
- for (final RoutingDimensions.Dimension dimension : dimensions) {
- hasher.update(dimension.name().bytes);
+ for (final BytesRef name : dimensions.keySet()) {
+ hasher.update(name.bytes);
}
index = writeHash128(hasher.digestHash(), bytes, index);

// NOTE: concatenate all dimension value hashes up to a certain number of dimensions
int startIndex = index;
- for (final RoutingDimensions.Dimension dimension : dimensions) {
+ for (final List<BytesReference> values : dimensions.values()) {
if ((index - startIndex) >= 4 * numberOfDimensions) {
break;
}
- final BytesRef value = dimension.value().toBytesRef();
- ByteUtils.writeIntLE(StringHelper.murmurhash3_x86_32(value.bytes, value.offset, value.length, SEED), bytes, index);
+ assert values.isEmpty() == false : "dimension values are empty";
+ final BytesRef dimensionValueBytesRef = values.get(0).toBytesRef();
+ ByteUtils.writeIntLE(
+ StringHelper.murmurhash3_x86_32(
+ dimensionValueBytesRef.bytes,
+ dimensionValueBytesRef.offset,
+ dimensionValueBytesRef.length,
+ SEED
+ ),
+ bytes,
+ index
+ );
index += 4;
}

- // NOTE: hash all dimension field values
+ // NOTE: hash all dimension field allValues
hasher.reset();
- for (final RoutingDimensions.Dimension dimension : dimensions) {
- hasher.update(dimension.value().toBytesRef().bytes);
+ for (final List<BytesReference> values : dimensions.values()) {
+ for (BytesReference v : values) {
+ hasher.update(v.toBytesRef().bytes);
+ }
}
index = writeHash128(hasher.digestHash(), bytes, index);

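For context, the build(RoutingDimensions) rewrite above switches dimensions from single-valued name/value pairs to a name-to-values map, while keeping the three-part hash layout: a 128-bit hash over all dimension names, then one 32-bit murmur hash of each dimension's first value (capped at 4 * numberOfDimensions bytes), then a 128-bit hash over every value of every dimension. A toy, self-contained illustration of that layout; MD5 and String.hashCode stand in for the Murmur3 hashes, and the class and variable names are illustrative, not the production API:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class RoutingHashLayoutSketch {
        public static void main(String[] args) throws Exception {
            // Dimensions are now a name -> list-of-values map, so multi-valued fields can participate.
            Map<String, List<String>> dimensions = new TreeMap<>(
                Map.of("metricset", List.of("pod"), "k8s.pod.uid", List.of("uid-1", "uid-2"))
            );

            // 1) 128-bit hash over all dimension names (MD5 standing in for Murmur3Hasher).
            MessageDigest md = MessageDigest.getInstance("MD5");
            for (String name : dimensions.keySet()) {
                md.update(name.getBytes(StandardCharsets.UTF_8));
            }
            byte[] nameHash = md.digest();

            // 2) One 32-bit hash per dimension, first value only, capped like the hunk above.
            int written = 0;
            for (List<String> values : dimensions.values()) {
                if (written >= 4 * dimensions.size()) break;    // cap: 4 bytes per dimension
                int firstValueHash = values.get(0).hashCode();  // stand-in for murmurhash3_x86_32
                written += 4;
            }

            // 3) 128-bit hash over every value of every dimension, not just the first.
            md.reset();
            for (List<String> values : dimensions.values()) {
                for (String v : values) {
                    md.update(v.getBytes(StandardCharsets.UTF_8));
                }
            }
            byte[] allValuesHash = md.digest();

            System.out.printf("layout: [%d-byte name hash][%d bytes of first-value hashes][%d-byte all-values hash]%n",
                nameHash.length, written, allValuesHash.length);
        }
    }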
@@ -116,7 +131,7 @@ public static Object encode(final BytesRef bytesRef) {
private static String base64Encode(final BytesRef bytesRef) {
byte[] bytes = new byte[bytesRef.length];
System.arraycopy(bytesRef.bytes, bytesRef.offset, bytes, 0, bytesRef.length);
- return BASE64_ENCODER.encodeToString(bytes);
+ return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(bytes);
}

public static Map<String, Object> decodeAsMap(BytesRef bytesRef) {
@@ -128,38 +143,44 @@ public static Map<String, Object> decodeAsMap(BytesRef bytesRef) {
}

public static Map<String, Object> decodeAsMap(StreamInput in) throws IOException {
-     int size = in.readVInt();
-     Map<String, Object> result = new LinkedHashMap<>(size);
-
-     for (int i = 0; i < size; i++) {
-         String name = null;
-         try {
-             name = in.readSlicedBytesReference().utf8ToString();
-         } catch (AssertionError ae) {
-             throw new IllegalArgumentException("Error parsing keyword dimension: " + ae.getMessage(), ae);
-         }
-
-         int type = in.read();
-         switch (type) {
-             case (byte) 's' -> {
-                 // parse a string
-                 try {
-                     result.put(name, in.readSlicedBytesReference().utf8ToString());
-                 } catch (AssertionError ae) {
-                     throw new IllegalArgumentException("Error parsing keyword dimension: " + ae.getMessage(), ae);
-                 }
-             }
-             case (byte) 'l' -> // parse a long
-                 result.put(name, in.readLong());
-             case (byte) 'u' -> { // parse an unsigned_long
-                 Object ul = DocValueFormat.UNSIGNED_LONG_SHIFTED.format(in.readLong());
-                 result.put(name, ul);
-             }
-             case (byte) 'd' -> // parse a double
-                 result.put(name, in.readDouble());
-             default -> throw new IllegalArgumentException("Cannot parse [" + name + "]: Unknown type [" + type + "]");
-         }
-     }
-     return result;
+     try {
+         int size = in.readVInt();
+         Map<String, Object> result = new LinkedHashMap<>(size);
+
+         for (int i = 0; i < size; i++) {
+             String name = null;
+             try {
+                 name = in.readSlicedBytesReference().utf8ToString();
+             } catch (AssertionError ae) {
+                 throw new IllegalArgumentException("Error parsing keyword dimension: " + ae.getMessage(), ae);
+             }
+
+             int type = in.read();
+             switch (type) {
+                 case (byte) 's' -> {
+                     // parse a string
+                     try {
+                         result.put(name, in.readSlicedBytesReference().utf8ToString());
+                     } catch (AssertionError ae) {
+                         throw new IllegalArgumentException("Error parsing keyword dimension: " + ae.getMessage(), ae);
+                     }
+                 }
+                 case (byte) 'l' -> // parse a long
+                     result.put(name, in.readLong());
+                 case (byte) 'u' -> { // parse an unsigned_long
+                     Object ul = DocValueFormat.UNSIGNED_LONG_SHIFTED.format(in.readLong());
+                     result.put(name, ul);
+                 }
+                 case (byte) 'd' -> // parse a double
+                     result.put(name, in.readDouble());
+                 case (byte) 'b' -> // parse a boolean
+                     result.put(name, in.read() == 't');
+                 default -> throw new IllegalArgumentException("Cannot parse [" + name + "]: Unknown type [" + type + "]");
+             }
+         }
+         return result;
+     } catch (IOException | IllegalArgumentException e) {
+         throw new IllegalArgumentException("Error while hashing dimensions: " + e.getMessage(), e);
+     }
}
}
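For context, decodeAsMap reads the tagged format the encoder writes: a varint entry count, then for each entry the name bytes followed by a one-byte type tag ('s' string, 'l' long, 'u' unsigned_long, 'd' double, plus the newly added 'b' boolean) and its payload; decoding failures are now rewrapped as IllegalArgumentException. A toy round-trip of just the tag scheme, using plain java.io streams instead of StreamInput/StreamOutput, with simplified, illustrative framing:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class DimensionTagSketch {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF("is_error"); // toy name framing; the real code reads length-prefixed byte slices
            out.writeByte('b');       // type tag
            out.writeByte('t');       // booleans travel as 't' / 'f'

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            String name = in.readUTF();
            int type = in.read();
            Object value = switch (type) {
                case 'l' -> in.readLong();
                case 'd' -> in.readDouble();
                case 'b' -> in.read() == 't'; // the new boolean case from this commit
                default -> throw new IllegalArgumentException("Cannot parse [" + name + "]: Unknown type [" + type + "]");
            };
            System.out.println(name + " = " + value); // prints: is_error = true
        }
    }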
@@ -16,7 +16,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.features.NodeFeature;
- import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -52,6 +51,7 @@ public class DimensionRoutingHashFieldMapper extends MetadataFieldMapper {
}
return null;
});
+ static final NodeFeature TS_ROUTING_HASH_FIELD_PARSES_BYTES_REF = new NodeFeature("tsdb.ts_routing_hash_doc_value_parse_byte_ref");

static final class DimensionRoutingHashFieldType extends MappedFieldType {

@@ -13,6 +13,7 @@
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.util.BytesRef;
+ import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexMode;
@@ -151,6 +152,7 @@ private DocumentParserContext(
SourceToParse sourceToParse,
Set<String> ignoreFields,
List<IgnoredSourceFieldMapper.NameValue> ignoredFieldValues,
+ Scope currentScope,
Map<String, List<Mapper>> dynamicMappers,
Map<String, ObjectMapper> dynamicObjectMappers,
Map<String, List<RuntimeField>> dynamicRuntimeFields,
@@ -828,7 +830,7 @@ public final DynamicTemplate findDynamicTemplate(String fieldName, DynamicTempla
* Identify the fields that match the routing path, for indexes in logs mode. These fields are equivalent to TSDB dimensions.
*/
public final void getDimensionsForLogsMode() {
- if (indexSettings().getMode() == IndexMode.LOGS
+ if (indexSettings().getMode() == IndexMode.LOGSDB
&& indexSettings().getIndexRouting() instanceof IndexRouting.ExtractFromSource dimensionRouting) {
for (var mapper : mappingLookup().fieldMappers()) {
if (mapper instanceof FieldMapper fieldMapper && dimensionRouting.matchesField(fieldMapper.fullPath())) {
@@ -1,9 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.mapper;
@@ -1,9 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.mapper;
@@ -28,7 +29,7 @@

/**
* Mapper for {@code _logs_id} field generated when the index is
- * {@link IndexMode#LOGS used for logs} and it contains a routing path.
+ * {@link IndexMode#LOGSDB used for logs} and it contains a routing path.
*/
public class LogsIdFieldMapper extends MetadataFieldMapper {

@@ -127,9 +128,4 @@ public void postParse(DocumentParserContext context) throws IOException {
protected String contentType() {
return NAME;
}

- @Override
- public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
- return SourceLoader.SyntheticFieldLoader.NOTHING;
- }
}
@@ -51,7 +51,7 @@ public Set<NodeFeature> getFeatures() {
FlattenedFieldMapper.IGNORE_ABOVE_SUPPORT,
IndexSettings.IGNORE_ABOVE_INDEX_LEVEL_SETTING,
SourceFieldMapper.SYNTHETIC_SOURCE_COPY_TO_INSIDE_OBJECTS_FIX,
- TimeSeriesRoutingHashFieldMapper.TS_ROUTING_HASH_FIELD_PARSES_BYTES_REF,
+ DimensionRoutingHashFieldMapper.TS_ROUTING_HASH_FIELD_PARSES_BYTES_REF,
FlattenedFieldMapper.IGNORE_ABOVE_WITH_ARRAYS_SUPPORT,
DenseVectorFieldMapper.BBQ_FORMAT
);
