Skip to content

Commit

Permalink
backport of: Add rollover-creation-date setting to rolled over index (#…
Browse files Browse the repository at this point in the history
…31144) (#31413)

This commit introduces a new property to IndexMetaData called
RolloverInfo. This object contains a map containing the aliases
that were used to rollover the related index, which conditions
were met, and at what time the rollover took place.

much like the `index.creation_date`, it captures the approximate time
that the index was rolled over to a new one.

set version serialization check to 6.4
  • Loading branch information
talevy authored Jun 20, 2018
1 parent ee023b2 commit a270984
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

Expand Down Expand Up @@ -64,4 +65,12 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(NAME, value.getStringRep());
}

public static MaxAgeCondition fromXContent(XContentParser parser) throws IOException {
if (parser.nextToken() == XContentParser.Token.VALUE_STRING) {
return new MaxAgeCondition(TimeValue.parseTimeValue(parser.text(), NAME));
} else {
throw new IllegalArgumentException("invalid token: " + parser.currentToken());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

Expand Down Expand Up @@ -61,4 +62,12 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(NAME, value);
}

public static MaxDocsCondition fromXContent(XContentParser parser) throws IOException {
if (parser.nextToken() == XContentParser.Token.VALUE_NUMBER) {
return new MaxDocsCondition(parser.longValue());
} else {
throw new IllegalArgumentException("invalid token: " + parser.currentToken());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;

Expand Down Expand Up @@ -70,4 +71,12 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(NAME, value.getStringRep());
}

public static MaxSizeCondition fromXContent(XContentParser parser) throws IOException {
if (parser.nextToken() == XContentParser.Token.VALUE_STRING) {
return new MaxSizeCondition(ByteSizeValue.parseBytesSizeValue(parser.text(), NAME));
} else {
throw new IllegalArgumentException("invalid token: " + parser.currentToken());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* Class for holding Rollover related information within an index
*/
public class RolloverInfo extends AbstractDiffable<RolloverInfo> implements Writeable, ToXContentFragment {

public static final ParseField CONDITION_FIELD = new ParseField("met_conditions");
public static final ParseField TIME_FIELD = new ParseField("time");

@SuppressWarnings("unchecked")
public static ConstructingObjectParser<RolloverInfo, String> PARSER = new ConstructingObjectParser<>("rollover_info", false,
(a, alias) -> new RolloverInfo(alias, (List<Condition>) a[0], (Long) a[1]));
static {
PARSER.declareNamedObjects(ConstructingObjectParser.constructorArg(),
(p, c, n) -> p.namedObject(Condition.class, n, c), CONDITION_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_FIELD);
}

private final String alias;
private final List<Condition> metConditions;
private final long time;

public RolloverInfo(String alias, List<Condition> metConditions, long time) {
this.alias = alias;
this.metConditions = metConditions;
this.time = time;
}

public RolloverInfo(StreamInput in) throws IOException {
this.alias = in.readString();
this.time = in.readVLong();
this.metConditions = in.readNamedWriteableList(Condition.class);
}

public static RolloverInfo parse(XContentParser parser, String alias) {
return PARSER.apply(parser, alias);
}

public String getAlias() {
return alias;
}

public List<Condition> getMetConditions() {
return metConditions;
}

public long getTime() {
return time;
}

public static Diff<RolloverInfo> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(RolloverInfo::new, in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(alias);
out.writeVLong(time);
out.writeNamedWriteableList(metConditions);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(alias);
builder.startObject(CONDITION_FIELD.getPreferredName());
for (Condition condition : metConditions) {
condition.toXContent(builder, params);
}
builder.endObject();
builder.field(TIME_FIELD.getPreferredName(), time);
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(alias, metConditions, time);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
RolloverInfo other = (RolloverInfo) obj;
return Objects.equals(alias, other.alias) &&
Objects.equals(metConditions, other.metConditions) &&
Objects.equals(time, other.time);
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
Expand Down Expand Up @@ -131,7 +132,9 @@ public void onResponse(IndicesStatsResponse statsResponse) {
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false, false, false));
return;
}
if (conditionResults.size() == 0 || conditionResults.values().stream().anyMatch(result -> result)) {
List<Condition> metConditions = rolloverRequest.getConditions().values().stream()
.filter(condition -> conditionResults.get(condition.toString())).collect(Collectors.toList());
if (conditionResults.size() == 0 || metConditions.size() > 0) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(unresolvedName, rolloverIndexName,
rolloverRequest);
createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> {
Expand All @@ -141,13 +144,33 @@ public void onResponse(IndicesStatsResponse statsResponse) {
rolloverRequest),
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
clusterService.submitStateUpdateTask("update_rollover_info", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData())
.put(IndexMetaData.builder(currentState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
});
} else {
listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults,
false, true, false, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
Expand Down Expand Up @@ -149,9 +150,11 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);

SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(NetworkModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
entries.addAll(indicesModule.getNamedWriteables());
entries.addAll(ClusterModule.getNamedWriteables());
entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream())
Expand Down
Loading

0 comments on commit a270984

Please sign in to comment.