Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding back [Time series based workload desc order optimization through reverse segment read (#7244)] with fixes #7967

Merged
merged 5 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))


### Dependencies
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@

package org.opensearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.core.ParseField;
Expand All @@ -46,6 +50,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -59,6 +64,24 @@
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final String TIMESERIES_FIELDNAME = "@timestamp";
public static final Comparator<LeafReader> TIMESERIES_LEAF_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
PointValues points = r.getPointValues(TIMESERIES_FIELDNAME);
if (points != null) {
// could be a multipoint (probably not) but get the maximum time value anyway
byte[] sortValue = points.getMaxPackedValue();
// decode the first dimension because this should not be a multi dimension field
// it's a bug in the date field if it is
return LongPoint.decodeDimension(sortValue, 0);
} else {
// segment does not have a timestamp field, just return the minimum value
return Long.MIN_VALUE;
}
} catch (IOException e) {
throw new OpenSearchException("Not a timeseries Index! Field [{}] not found!", TIMESERIES_FIELDNAME);
}
}).reversed();

private final String name;
private final TimestampField timeStampField;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile long mappingTotalFieldsLimit;
private volatile long mappingDepthLimit;
private volatile long mappingFieldNameLengthLimit;
private volatile boolean searchSegmentOrderReversed;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -905,6 +906,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline);
}

private void setSearchSegmentOrderReversed(boolean reversed) {
this.searchSegmentOrderReversed = reversed;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
this.searchIdleAfter = searchIdleAfter;
}
Expand Down Expand Up @@ -1084,6 +1089,13 @@ public Settings getNodeSettings() {
return nodeSettings;
}

/**
* Returns true if index level setting for leaf reverse order search optimization is enabled
*/
public boolean getSearchSegmentOrderReversed() {
return this.searchSegmentOrderReversed;
}

/**
* Updates the settings and index metadata and notifies all registered settings consumers with the new settings iff at least one
* setting has changed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -102,6 +104,7 @@ public final class EngineConfig {
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator<LeafReader> leafSorter;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -204,6 +207,7 @@ private EngineConfig(Builder builder) {
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
this.leafSorter = builder.leafSorter;
}

/**
Expand Down Expand Up @@ -451,6 +455,15 @@ public TranslogDeletionPolicyFactory getCustomTranslogDeletionPolicyFactory() {
return translogDeletionPolicyFactory;
}

/**
* Returns subReaderSorter for org.apache.lucene.index.BaseCompositeReader.
* This gets used in lucene IndexReader and decides order of segment read.
* @return comparator
*/
public Comparator<LeafReader> getLeafSorter() {
return this.leafSorter;
}

/**
* Builder for EngineConfig class
*
Expand Down Expand Up @@ -483,6 +496,7 @@ public static class Builder {
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();
Comparator<LeafReader> leafSorter;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
Expand Down Expand Up @@ -614,6 +628,11 @@ public Builder translogFactory(TranslogFactory translogFactory) {
return this;
}

public Builder leafSorter(Comparator<LeafReader> leafSorter) {
this.leafSorter = leafSorter;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -36,6 +37,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -151,7 +153,8 @@ public EngineConfig newEngineConfig(
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -184,6 +187,7 @@ public EngineConfig newEngineConfig(
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.leafSorter(leafSorter)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,6 +2322,9 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
if (config().getLeafSorter() != null) {
iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order
}
return iwc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.mapper;

import org.apache.lucene.analysis.Analyzer;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.FieldNameAnalyzer;

Expand Down Expand Up @@ -261,6 +262,15 @@ public String getNestedScope(String path) {
return null;
}

/**
* If this index contains @timestamp field with Date type, it will return true
* @return true or false based on above condition
*/
public boolean containsTimeStampField() {
MappedFieldType timeSeriesFieldType = this.fieldTypeLookup.get(DataStream.TIMESERIES_FIELDNAME);
return timeSeriesFieldType != null && timeSeriesFieldType instanceof DateFieldMapper.DateFieldType; // has to be Date field type
}

private static String parentObject(String field) {
int lastDot = field.lastIndexOf('.');
if (lastDot == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -333,6 +334,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

public IndexShard(
Expand Down Expand Up @@ -451,6 +453,9 @@ public boolean shouldCache(Query query) {
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null)
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
}

Expand Down Expand Up @@ -3627,7 +3632,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
translogFactorySupplier.apply(indexSettings, shardRouting),
isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for
// timeseries
);
}

Expand All @@ -3639,6 +3646,14 @@ public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
}

/**
* @return true if segment reverse search optimization is enabled for time series based workload.
*/
public boolean isTimeSeriesDescSortOptimizationEnabled() {
// Do not change segment order in case of index sort.
return isTimeSeriesIndex && getIndexSort() == null;
}

/**
* @return True if settings indicate this shard is backed by a remote snapshot, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CombinedBitSet;
import org.apache.lucene.util.SparseFixedBitSet;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.search.DocValueFormat;
Expand Down Expand Up @@ -282,8 +283,17 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
if (shouldReverseLeafReaderContexts()) {
// reverse the segment search order if this flag is true.
// Certain queries can benefit if we reverse the segment read order,
// for example time series based queries if searched for desc sort order.
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
}

Expand Down Expand Up @@ -496,4 +506,24 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
}
return true;
}

private boolean shouldReverseLeafReaderContexts() {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext != null && searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
// Only reverse order for asc order sort queries
if (searchContext.sort() != null
&& searchContext.sort().sort != null
&& searchContext.sort().sort.getSort() != null
&& searchContext.sort().sort.getSort().length > 0
&& searchContext.sort().sort.getSort()[0].getReverse() == false
&& searchContext.sort().sort.getSort()[0].getField() != null
&& searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public void testCreateEngineConfigFromFactory() {
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
new InternalTranslogFactory(),
null
);

assertNotNull(config.getCodec());
Expand Down Expand Up @@ -148,7 +149,8 @@ public void testCreateCodecServiceFromFactory() {
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
new InternalTranslogFactory(),
null
);
assertNotNull(config.getCodec());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ public Settings indexSettings() {
).getStringRep()
);
}

return builder.build();
}

Expand Down