Mitigate date histogram slowdowns with non-fixed timezones. #30534

Merged — 5 commits — May 16, 2018
Changes from 1 commit
DateHistogramAggregationBuilder.java
@@ -19,6 +19,10 @@

package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.rounding.DateTimeUnit;
@@ -27,8 +31,14 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.AtomicNumericFieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Relation;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.BucketOrder;
@@ -44,6 +54,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.HashMap;
@@ -351,20 +362,76 @@ public String getType() {
return NAME;
}

/*
* NOTE: this can't be done in rewrite() because the timezone is then also used on the
* coordinating node in order to generate missing buckets, which may cross a transition
* even though data on the shards doesn't.
*/
DateTimeZone rewriteTimeZone(QueryShardContext context) throws IOException {
final DateTimeZone tz = timeZone();
if (field() != null &&
tz != null &&
tz.isFixed() == false &&
script() == null) {
final MappedFieldType ft = context.fieldMapper(field());
final IndexReader reader = context.getIndexReader();
if (ft != null && reader != null) {
Long anyInstant = null;
final IndexFieldData<?> fieldData = context.getForField(ft);
if (fieldData instanceof IndexNumericFieldData) {
Review thread on this line:

Contributor: Should this not be guaranteed to be an IndexNumericFieldData? If so, maybe this should either be an assert, or we should throw an exception if this condition is false?

Contributor (author): Good point. I had it implemented in rewrite() initially, where it felt wrong since validation happens afterwards, but here the values source is already resolved, so an exception would have been thrown already if the histogram had run against a non-numeric field.
for (LeafReaderContext ctx : reader.leaves()) {
AtomicNumericFieldData leafFD = ((IndexNumericFieldData) fieldData).load(ctx);
SortedNumericDocValues values = leafFD.getLongValues();
if (values.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
anyInstant = values.nextValue();
break;
}
}

if (anyInstant != null) {
final long prevTransition = tz.previousTransition(anyInstant);
final long nextTransition = tz.nextTransition(anyInstant);
final DocValueFormat format = ft.docValueFormat(null, null);
final String formattedPrevTransition = format.format(prevTransition);
final String formattedNextTransition = format.format(nextTransition);
if (ft.isFieldWithinQuery(reader, formattedPrevTransition, formattedNextTransition,
false, false, tz, null, context) == Relation.WITHIN) {
// All values in this reader have the same offset despite daylight saving times.
// This is very common for location-based timezones such as Europe/Paris in
// combination with time-based indices.
return DateTimeZone.forOffsetMillis(tz.getOffset(anyInstant));
}
}
}
}
}
return tz;
}

@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
Rounding rounding = createRounding();
final DateTimeZone tz = timeZone();
final Rounding rounding = createRounding(tz);
final DateTimeZone rewrittenTimeZone = rewriteTimeZone(context.getQueryShardContext());
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = createRounding(rewrittenTimeZone);
}

ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, context, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, interval, dateHistogramInterval, offset, order, keyed, minDocCount,
rounding, roundedBounds, context, parent, subFactoriesBuilder, metaData);
rounding, shardRounding, roundedBounds, context, parent, subFactoriesBuilder, metaData);
}

private Rounding createRounding() {
private Rounding createRounding(DateTimeZone timeZone) {
Rounding.Builder tzRoundingBuilder;
if (dateHistogramInterval != null) {
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
@@ -379,8 +446,8 @@ private Rounding createRounding() {
// the interval is an integer time value in millis?
tzRoundingBuilder = Rounding.builder(TimeValue.timeValueMillis(interval));
}
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
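The rewrite above can be read as a standalone idea: if every value on the shard lies strictly between the two transitions surrounding a sampled instant, the zone behaves like a fixed offset there. A minimal sketch, assuming Joda-Time; TimeZoneRewriteSketch, maybeRewrite, and minInstant/maxInstant are hypothetical names standing in for the bounds that isFieldWithinQuery derives from the index, not the actual implementation:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class TimeZoneRewriteSketch {

    // Returns a fixed-offset zone when no transition of tz can fall inside
    // [minInstant, maxInstant]; otherwise returns tz unchanged.
    static DateTimeZone maybeRewrite(DateTimeZone tz, long anyInstant, long minInstant, long maxInstant) {
        if (tz.isFixed()) {
            return tz; // nothing to gain
        }
        final long prevTransition = tz.previousTransition(anyInstant);
        final long nextTransition = tz.nextTransition(anyInstant);
        if (prevTransition < minInstant && maxInstant < nextTransition) {
            // all values share one UTC offset, so rounding can use cheap
            // fixed-offset arithmetic instead of calendar lookups
            return DateTimeZone.forOffsetMillis(tz.getOffset(anyInstant));
        }
        return tz;
    }

    public static void main(String[] args) {
        DateTimeZone paris = DateTimeZone.forID("Europe/Paris");
        long mid = new DateTime(2018, 1, 15, 12, 0, paris).getMillis();
        long day = 24L * 3600 * 1000;
        // one day of data in mid-January, far from any transition
        System.out.println(maybeRewrite(paris, mid, mid - day, mid + day)); // prints +01:00
    }
}

This matches the time-based-indices case called out in the code comment: a daily index of Europe/Paris data rarely straddles a DST transition, so most shards get the cheap fixed-offset path.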
DateHistogramAggregator.java
@@ -55,6 +55,7 @@ class DateHistogramAggregator extends BucketsAggregator {
private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final Rounding rounding;
private final Rounding shardRounding;
private final BucketOrder order;
private final boolean keyed;

@@ -64,14 +65,15 @@ class DateHistogramAggregator extends BucketsAggregator {
private final LongHash bucketOrds;
private long offset;

DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, long offset, BucketOrder order,
boolean keyed,
DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
long offset, BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Numeric valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.keyed = keyed;
@@ -105,7 +107,9 @@ public void collect(int doc, long bucket) throws IOException {
long previousRounded = Long.MIN_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long value = values.nextValue();
long rounded = rounding.round(value - offset) + offset;
// We can use shardRounding here, which is sometimes more efficient
// if daylight saving times are involved.
long rounded = shardRounding.round(value - offset) + offset;
assert rounded >= previousRounded;
if (rounded == previousRounded) {
continue;
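To see why a fixed offset makes this per-document rounding cheaper, here is a hedged illustration — roundToDayFixed is a made-up helper, not the actual Rounding implementation: with a constant offset, rounding down to the start of a local day is pure integer arithmetic, while a non-fixed zone needs a transition-aware calendar lookup for every value.

// Hypothetical fixed-offset rounding: no timezone rules are consulted.
static long roundToDayFixed(long utcMillis, int offsetMillis) {
    final long day = 24L * 3600 * 1000;
    final long local = utcMillis + offsetMillis;            // shift into local time
    return Math.floorDiv(local, day) * day - offsetMillis;  // floor, shift back to UTC
}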
@@ -138,6 +142,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));

// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`: empty buckets are
// created on the coordinating node for the whole requested range, which
// may cross a transition even when the data on this shard does not.
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
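A quick, illustrative check of why the empty-bucket path keeps the original zone (a sketch assuming Joda-Time, with illustrative values): a fixed +01:00 agrees with Europe/Paris only until the spring transition on 2018-03-25, after which the two zones yield different offsets and therefore different bucket keys.

DateTimeZone paris = DateTimeZone.forID("Europe/Paris");
DateTimeZone fixed = DateTimeZone.forOffsetHours(1);
// an instant after the 2018-03-25 spring-forward transition
long afterTransition = new DateTime(2018, 3, 26, 12, 0, DateTimeZone.UTC).getMillis();
System.out.println(paris.getOffset(afterTransition)); // 7200000, i.e. +02:00
System.out.println(fixed.getOffset(afterTransition)); // 3600000, i.e. +01:00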
DateHistogramAggregatorFactory.java
@@ -45,12 +45,14 @@ public final class DateHistogramAggregatorFactory
private final boolean keyed;
private final long minDocCount;
private final ExtendedBounds extendedBounds;
private Rounding rounding;
private final Rounding rounding;
private final Rounding shardRounding;

public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, long interval,
DateHistogramInterval dateHistogramInterval, long offset, BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, ExtendedBounds extendedBounds, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, SearchContext context,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metaData);
this.interval = interval;
this.dateHistogramInterval = dateHistogramInterval;
@@ -60,6 +62,7 @@ public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config,
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.rounding = rounding;
this.shardRounding = shardRounding;
}

public long minDocCount() {
Expand All @@ -77,8 +80,8 @@ protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggrega

private Aggregator createAggregator(ValuesSource.Numeric valuesSource, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateHistogramAggregator(name, factories, rounding, offset, order, keyed, minDocCount, extendedBounds, valuesSource,
config.format(), context, parent, pipelineAggregators, metaData);
return new DateHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), context, parent, pipelineAggregators, metaData);
}

@Override
DateHistogramTests.java
@@ -17,14 +17,27 @@
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBoundsTests;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.joda.time.DateTimeZone;
import org.junit.Assume;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@@ -120,4 +133,52 @@ private List<BucketOrder> randomOrder() {
return orders;
}

private static Document documentForDate(String field, long millis) {
Document doc = new Document();
doc.add(new LongPoint(field, millis));
doc.add(new SortedNumericDocValuesField(field, millis));
return doc;
}

public void testRewriteTimeZone() throws IOException {
Assume.assumeTrue(getCurrentTypes().length > 0); // we need mappings
FormatDateTimeFormatter format = Joda.forPattern("strict_date_optional_time");

try (Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {

w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-11T11:55:00").getMillis()));
w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-20T18:13:00").getMillis()));

try (IndexReader readerThatDoesntCross = DirectoryReader.open(w)) {

w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-25T02:44:00").getMillis()));

try (IndexReader readerThatCrosses = DirectoryReader.open(w)) {

QueryShardContext shardContextThatDoesntCross = createShardContext(readerThatDoesntCross);
QueryShardContext shardContextThatCrosses = createShardContext(readerThatCrosses);

DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(DATE_FIELD_NAME);
// no timeZone => no rewrite
assertNull(builder.rewriteTimeZone(shardContextThatDoesntCross));
assertNull(builder.rewriteTimeZone(shardContextThatCrosses));

// fixed timeZone => no rewrite
DateTimeZone tz = DateTimeZone.forOffsetHours(1);
builder.timeZone(tz);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// daylight-saving-times => rewrite if doesn't cross
tz = DateTimeZone.forID("Europe/Paris");
builder.timeZone(tz);
assertEquals(DateTimeZone.forOffsetHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
}
}
}
Review thread:

Contributor: Should we also have a test that makes sure fixed time zones are not changed?

Contributor (author): The assertions above check it?

Contributor: Sorry, I had missed that; you are right.

}

}
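For context on the dates used in testRewriteTimeZone above: Europe/Paris moved to summer time at 02:00 local on 2018-03-25, so the March 11 and March 20 documents sit on one side of the transition, while the March 25 document lands past it, making only the second reader cross. A short sketch to verify the transition instant, assuming Joda-Time's tz database:

DateTimeZone paris = DateTimeZone.forID("Europe/Paris");
long t = new DateTime(2018, 3, 20, 0, 0, paris).getMillis();
// the next transition after March 20 is the spring-forward instant
System.out.println(new DateTime(paris.nextTransition(t), paris)); // 2018-03-25T03:00:00.000+02:00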
BaseAggregationTestCase.java
@@ -38,6 +38,7 @@
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.test.AbstractBuilderTestCase;
import org.elasticsearch.test.AbstractQueryTestCase;
import org.elasticsearch.test.ESTestCase;

@@ -50,60 +51,12 @@
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.hasSize;

public abstract class BaseAggregationTestCase<AB extends AbstractAggregationBuilder<AB>> extends ESTestCase {
public abstract class BaseAggregationTestCase<AB extends AbstractAggregationBuilder<AB>> extends AbstractBuilderTestCase {

protected static final String STRING_FIELD_NAME = "mapped_string";
protected static final String INT_FIELD_NAME = "mapped_int";
protected static final String DOUBLE_FIELD_NAME = "mapped_double";
protected static final String BOOLEAN_FIELD_NAME = "mapped_boolean";
protected static final String DATE_FIELD_NAME = "mapped_date";
protected static final String IP_FIELD_NAME = "mapped_ip";

private String[] currentTypes;

protected String[] getCurrentTypes() {
return currentTypes;
}

private NamedWriteableRegistry namedWriteableRegistry;
private NamedXContentRegistry xContentRegistry;
protected abstract AB createTestAggregatorBuilder();

protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.emptyList();
}

/**
* Setup for the whole base test class.
*/
@Override
public void setUp() throws Exception {
super.setUp();
Settings settings = Settings.builder()
.put("node.name", AbstractQueryTestCase.class.toString())
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
PluginsService pluginsService = new PluginsService(settings, null, null, null, getPlugins());
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(indicesModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
namedWriteableRegistry = new NamedWriteableRegistry(entries);
xContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
//create some random type with some default field, those types will stick around for all of the subclasses
currentTypes = new String[randomIntBetween(0, 5)];
for (int i = 0; i < currentTypes.length; i++) {
String type = randomAlphaOfLengthBetween(1, 10);
currentTypes[i] = type;
}
}

@Override
protected NamedXContentRegistry xContentRegistry() {
return xContentRegistry;
}

/**
* Generic test that creates new AggregatorFactory from the test
* AggregatorFactory and checks both for equality and asserts equality on
@@ -157,7 +110,7 @@ public void testSerialization() throws IOException {
AB testAgg = createTestAggregatorBuilder();
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.writeNamedWriteable(testAgg);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry())) {
AggregationBuilder deserialized = in.readNamedWriteable(AggregationBuilder.class);
assertEquals(testAgg, deserialized);
assertEquals(testAgg.hashCode(), deserialized.hashCode());
@@ -181,12 +134,12 @@ public void testShallowCopy() {

// we use the streaming infra to create a copy of the query provided as
// argument
private AB copyAggregation(AB agg) throws IOException {
protected AB copyAggregation(AB agg) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
agg.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry())) {
@SuppressWarnings("unchecked")
AB secondAgg = (AB) namedWriteableRegistry.getReader(AggregationBuilder.class, agg.getWriteableName()).read(in);
AB secondAgg = (AB) namedWriteableRegistry().getReader(AggregationBuilder.class, agg.getWriteableName()).read(in);
return secondAgg;
}
}