From 6830b7a910a47b52d10cc61d42fd096c20cbb6bb Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Thu, 10 Dec 2020 15:27:57 -0600 Subject: [PATCH] [7.x] Migrate aliased indices to data stream (#66176) --- .../metadata/ComposableIndexTemplate.java | 6 + .../cluster/metadata/DataStream.java | 5 +- .../cluster/metadata/IndexAbstraction.java | 2 - .../MetadataCreateDataStreamService.java | 96 +++-- .../MetadataMigrateToDataStreamService.java | 217 +++++++++++ .../MetadataCreateDataStreamServiceTests.java | 1 - ...tadataMigrateToDataStreamServiceTests.java | 336 ++++++++++++++++++ .../xpack/core/MigrateToDataStreamAction.java | 88 +++++ .../datastreams/DataStreamIT.java | 10 +- .../datastreams/DataStreamMigrationIT.java | 273 ++++++++++++++ .../xpack/datastreams/DataStreamsPlugin.java | 3 + .../MigrateToDataStreamTransportAction.java | 69 ++++ .../DataStreamTimestampFieldMapper.java | 4 +- .../DataStreamTimestampFieldMapperTests.java | 15 +- .../xpack/security/operator/Constants.java | 1 + .../security/authz/AuthorizationService.java | 9 +- 16 files changed, 1088 insertions(+), 47 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/MigrateToDataStreamAction.java create mode 100644 x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamMigrationIT.java create mode 100644 x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/MigrateToDataStreamTransportAction.java diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index 3a92a4db832b7..90897b63e674f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -112,6 +112,12 @@ public ComposableIndexTemplate(List indexPatterns, @Nullable Template te this(indexPatterns, template, componentTemplates, priority, version, metadata, null, null); } + public ComposableIndexTemplate(List indexPatterns, @Nullable Template template, @Nullable List componentTemplates, + @Nullable Long priority, @Nullable Long version, @Nullable Map metadata, + @Nullable DataStreamTemplate dataStreamTemplate) { + this(indexPatterns, template, componentTemplates, priority, version, metadata, dataStreamTemplate, null); + } + public ComposableIndexTemplate(List indexPatterns, @Nullable Template template, @Nullable List componentTemplates, @Nullable Long priority, @Nullable Long version, @Nullable Map metadata, @Nullable DataStreamTemplate dataStreamTemplate, @Nullable Boolean allowAutoCreate) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 95c40187b25cb..a7dffb9b44087 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -71,7 +71,6 @@ public DataStream(String name, TimestampField timeStampField, List indice this.hidden = hidden; this.replicated = replicated; assert indices.size() > 0; - assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation)); } public DataStream(String name, TimestampField timeStampField, List indices) { @@ -94,6 +93,10 @@ public long getGeneration() { return generation; } + public Index getWriteIndex() { + return indices.get(indices.size() - 1); + } + @Nullable public Map getMetadata() { return metadata; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 3cfebe47c6ed2..c29b854101ba1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -31,7 +31,6 @@ import java.util.Objects; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING; import static org.elasticsearch.common.collect.List.copyOf; @@ -309,7 +308,6 @@ public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream, List this.dataStream = dataStream; this.dataStreamIndices = copyOf(dataStreamIndices); this.writeIndex = dataStreamIndices.get(dataStreamIndices.size() - 1); - assert writeIndex.getIndex().getName().equals(getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration())); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 589f7e91101c3..ed111c41cffc0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -33,21 +33,24 @@ import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ObjectPath; +import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MetadataFieldMapper; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; public class MetadataCreateDataStreamService { @@ -116,54 +119,81 @@ public CreateDataStreamClusterStateUpdateRequest(String name, static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, ClusterState currentState, CreateDataStreamClusterStateUpdateRequest request) throws Exception { + return createDataStream(metadataCreateIndexService, currentState, request.name, org.elasticsearch.common.collect.List.of(), null); + } + + /** + * Creates a data stream with the specified properties. + * + * @param metadataCreateIndexService Used if a new write index must be created + * @param currentState Cluster state + * @param dataStreamName Name of the data stream + * @param backingIndices List of backing indices. May be empty + * @param writeIndex Write index for the data stream. If null, a new write index will be created. + * @return Cluster state containing the new data stream + */ + static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, + ClusterState currentState, + String dataStreamName, + List backingIndices, + IndexMetadata writeIndex) throws Exception + { if (currentState.nodes().getMinNodeVersion().before(Version.V_7_9_0)) { throw new IllegalStateException("data streams require minimum node version of " + Version.V_7_9_0); } - - if (currentState.metadata().dataStreams().containsKey(request.name)) { - throw new ResourceAlreadyExistsException("data_stream [" + request.name + "] already exists"); + if (writeIndex == null) { + Objects.requireNonNull(metadataCreateIndexService); + } + Objects.requireNonNull(currentState); + Objects.requireNonNull(backingIndices); + if (currentState.metadata().dataStreams().containsKey(dataStreamName)) { + throw new ResourceAlreadyExistsException("data_stream [" + dataStreamName + "] already exists"); } - MetadataCreateIndexService.validateIndexOrAliasName(request.name, + MetadataCreateIndexService.validateIndexOrAliasName(dataStreamName, (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2)); - if (request.name.toLowerCase(Locale.ROOT).equals(request.name) == false) { - throw new IllegalArgumentException("data_stream [" + request.name + "] must be lowercase"); + if (dataStreamName.toLowerCase(Locale.ROOT).equals(dataStreamName) == false) { + throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must be lowercase"); } - if (request.name.startsWith(DataStream.BACKING_INDEX_PREFIX)) { - throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '" + if (dataStreamName.startsWith(DataStream.BACKING_INDEX_PREFIX)) { + throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must not start with '" + DataStream.BACKING_INDEX_PREFIX + "'"); } - ComposableIndexTemplate template = lookupTemplateForDataStream(request.name, currentState.metadata()); - - String firstBackingIndexName = DataStream.getDefaultBackingIndexName(request.name, 1); - CreateIndexClusterStateUpdateRequest createIndexRequest = - new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) - .dataStreamName(request.name) - .settings(Settings.builder().put("index.hidden", true).build()); - try { - currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); - } catch (ResourceAlreadyExistsException e) { - // Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during - // auto index/data stream creation. - // (otherwise bulk execution fails later, because data stream will also not have been created) - throw new ElasticsearchStatusException("data stream could not be created because backing index [{}] already exists", - RestStatus.BAD_REQUEST, e, firstBackingIndexName); + ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + + if (writeIndex == null) { + String firstBackingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + CreateIndexClusterStateUpdateRequest createIndexRequest = + new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) + .dataStreamName(dataStreamName) + .settings(Settings.builder().put("index.hidden", true).build()); + try { + currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); + } catch (ResourceAlreadyExistsException e) { + // Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during + // auto index/data stream creation. + // (otherwise bulk execution fails later, because data stream will also not have been created) + throw new ElasticsearchStatusException("data stream could not be created because backing index [{}] already exists", + RestStatus.BAD_REQUEST, e, firstBackingIndexName); + } + writeIndex = currentState.metadata().index(firstBackingIndexName); } - IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName); - assert firstBackingIndex != null; - assert firstBackingIndex.mapping() != null : "no mapping found for backing index [" + firstBackingIndexName + "]"; + assert writeIndex != null; + assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]"; String fieldName = template.getDataStreamTemplate().getTimestampField(); DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName); + List dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()); + dsBackingIndices.add(writeIndex.getIndex()); boolean hidden = template.getDataStreamTemplate().isHidden(); - DataStream newDataStream = - new DataStream(request.name, timestampField, - Collections.singletonList(firstBackingIndex.getIndex()), 1L, - template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null, hidden, false); + DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L, + template.metadata() != null ? org.elasticsearch.common.collect.Map.copyOf(template.metadata()) : null, hidden, false); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); - logger.info("adding data stream [{}]", request.name); + logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName, + writeIndex.getIndex().getName(), + Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray())); return ClusterState.builder(currentState).metadata(builder).build(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java new file mode 100644 index 0000000000000..4ef994b6dc2b5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ActiveShardsObserver; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Map; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class MetadataMigrateToDataStreamService { + + private static final Logger logger = LogManager.getLogger(MetadataMigrateToDataStreamService.class); + + private final ClusterService clusterService; + private final ActiveShardsObserver activeShardsObserver; + private final IndicesService indexServices; + + @Inject + public MetadataMigrateToDataStreamService(ThreadPool threadPool, + ClusterService clusterService, + IndicesService indexServices) { + this.clusterService = clusterService; + this.indexServices = indexServices; + this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + } + + public void migrateToDataStream(MigrateToDataStreamClusterStateUpdateRequest request, + ActionListener finalListener) { + AtomicReference writeIndexRef = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap( + response -> { + if (response.isAcknowledged()) { + String writeIndexName = writeIndexRef.get(); + assert writeIndexName != null; + activeShardsObserver.waitForActiveShards( + new String[]{writeIndexName}, + ActiveShardCount.DEFAULT, + request.masterNodeTimeout(), + shardsAcked -> { + finalListener.onResponse(AcknowledgedResponse.TRUE); + }, + finalListener::onFailure); + } else { + finalListener.onResponse(AcknowledgedResponse.FALSE); + } + }, + finalListener::onFailure + ); + clusterService.submitStateUpdateTask("migrate-to-data-stream [" + request.aliasName + "]", + new AckedClusterStateUpdateTask(Priority.HIGH, request, listener) { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + ClusterState clusterState = migrateToDataStream( + currentState, + indexMetadata -> { + try { + return indexServices.createIndexMapperService(indexMetadata); + } catch (IOException e) { + throw new IllegalStateException(e); + } + }, + request); + writeIndexRef.set(clusterState.metadata().dataStreams().get(request.aliasName).getWriteIndex().getName()); + return clusterState; + } + }); + } + + static ClusterState migrateToDataStream(ClusterState currentState, + Function mapperSupplier, + MigrateToDataStreamClusterStateUpdateRequest request) throws Exception { + if (currentState.nodes().getMinNodeVersion().before(Version.V_7_11_0)) { + throw new IllegalStateException("data stream migration requires minimum node version of " + Version.V_7_11_0); + } + + validateRequest(currentState, request); + IndexAbstraction.Alias alias = (IndexAbstraction.Alias) currentState.metadata().getIndicesLookup().get(request.aliasName); + + validateBackingIndices(currentState, request.aliasName); + Metadata.Builder mb = Metadata.builder(currentState.metadata()); + for (IndexMetadata im : alias.getIndices()) { + prepareBackingIndex(mb, im, request.aliasName, mapperSupplier); + } + currentState = ClusterState.builder(currentState).metadata(mb).build(); + + IndexMetadata writeIndex = alias.getWriteIndex(); + + List backingIndices = alias.getIndices() + .stream() + .filter(x -> writeIndex == null || x.getIndex().getName().equals(writeIndex.getIndex().getName()) == false) + .collect(Collectors.toList()); + + logger.info("submitting request to migrate alias [{}] to a data stream", request.aliasName); + return MetadataCreateDataStreamService.createDataStream(null, currentState, request.aliasName, backingIndices, writeIndex); + } + + // package-visible for testing + static void validateRequest(ClusterState currentState, MigrateToDataStreamClusterStateUpdateRequest request) { + IndexAbstraction ia = currentState.metadata().getIndicesLookup().get(request.aliasName); + if (ia == null || ia.getType() != IndexAbstraction.Type.ALIAS) { + throw new IllegalArgumentException("alias [" + request.aliasName + "] does not exist"); + } + IndexAbstraction.Alias alias = (IndexAbstraction.Alias) ia; + + if (alias.getWriteIndex() == null) { + throw new IllegalArgumentException("alias [" + request.aliasName + "] must specify a write index"); + } + + // check for "clean" alias without routing or filter query + AliasMetadata aliasMetadata = alias.getFirstAliasMetadata(); + assert aliasMetadata != null : "alias metadata may not be null"; + if (aliasMetadata.filteringRequired() || aliasMetadata.getIndexRouting() != null || aliasMetadata.getSearchRouting() != null) { + throw new IllegalArgumentException("alias [" + request.aliasName + "] may not have custom filtering or routing"); + } + } + + private static void prepareBackingIndex( + Metadata.Builder b, + IndexMetadata im, + String dataStreamName, + Function mapperSupplier) throws IOException { + // hides the index, removes the original alias, and adds data stream timestamp field mapper + MappingMetadata mm = im.mapping(); + if (mm == null) { + throw new IllegalArgumentException("backing index [" + im.getIndex().getName() + "] must have mappings for a timestamp field"); + } + + MapperService mapperService = mapperSupplier.apply(im); + mapperService.merge(im, MapperService.MergeReason.MAPPING_RECOVERY); + mapperService.merge("_doc", Map.of("_data_stream_timestamp", Map.of("enabled", true)), MapperService.MergeReason.MAPPING_UPDATE); + DocumentMapper mapper = mapperService.documentMapper(); + + b.put(IndexMetadata.builder(im) + .removeAlias(dataStreamName) + .settings(Settings.builder().put(im.getSettings()).put("index.hidden", "true").build()) + .settingsVersion(im.getSettingsVersion() + 1) + .mappingVersion(im.getMappingVersion() + 1) + .putMapping(new MappingMetadata(mapper))); + } + + // package-visible for testing + static void validateBackingIndices(ClusterState currentState, String dataStreamName) { + IndexAbstraction ia = currentState.metadata().getIndicesLookup().get(dataStreamName); + if (ia == null || ia.getType() != IndexAbstraction.Type.ALIAS) { + throw new IllegalArgumentException("alias [" + dataStreamName + "] does not exist"); + } + IndexAbstraction.Alias alias = (IndexAbstraction.Alias) ia; + + // ensure that no other aliases reference indices + List indicesWithOtherAliases = new ArrayList<>(); + for (IndexMetadata im : alias.getIndices()) { + if (im.getAliases().size() > 1 || im.getAliases().containsKey(alias.getName()) == false) { + indicesWithOtherAliases.add(im.getIndex().getName()); + } + } + if (indicesWithOtherAliases.size() > 0) { + throw new IllegalArgumentException("other aliases referencing indices [" + + Strings.collectionToCommaDelimitedString(indicesWithOtherAliases) + "] must be removed before migrating to a data stream"); + } + } + + public static final class MigrateToDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest { + + private final String aliasName; + + public MigrateToDataStreamClusterStateUpdateRequest(String aliasName, + TimeValue masterNodeTimeout, + TimeValue timeout) { + this.aliasName = aliasName; + masterNodeTimeout(masterNodeTimeout); + ackTimeout(timeout); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index 2c72871668391..a2ba6d5bac15a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -172,5 +172,4 @@ private static MetadataCreateIndexService getMetadataCreateIndexService() throws return s; } - } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java new file mode 100644 index 0000000000000..35657557a3f23 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java @@ -0,0 +1,336 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.DataStreamTestHelper; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.plugins.Plugin; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class MetadataMigrateToDataStreamServiceTests extends MapperServiceTestCase { + + public void testValidateRequestWithNonexistentAlias() { + ClusterState cs = ClusterState.EMPTY_STATE; + String nonExistentAlias = "nonexistent_alias"; + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> MetadataMigrateToDataStreamService + .validateRequest(cs, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest( + nonExistentAlias, TimeValue.ZERO, TimeValue.ZERO))); + assertThat(e.getMessage(), containsString("alias [" + nonExistentAlias + "] does not exist")); + } + + public void testValidateRequestWithFilteredAlias() { + String filteredAliasName = "filtered_alias"; + AliasMetadata filteredAlias = AliasMetadata.builder(filteredAliasName).filter("{\"term\":{\"user.id\":\"kimchy\"}}").build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")).metadata( + Metadata.builder().put(IndexMetadata.builder("foo") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(filteredAlias) + .numberOfShards(1) + .numberOfReplicas(0)) + ).build(); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> MetadataMigrateToDataStreamService + .validateRequest(cs, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest( + filteredAliasName, TimeValue.ZERO, TimeValue.ZERO))); + assertThat(e.getMessage(), containsString("alias [" + filteredAliasName + "] may not have custom filtering or routing")); + } + + public void testValidateRequestWithAliasWithRouting() { + String routedAliasName = "routed_alias"; + AliasMetadata aliasWithRouting = AliasMetadata.builder(routedAliasName).routing("foo").build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")).metadata( + Metadata.builder().put(IndexMetadata.builder("foo") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(aliasWithRouting) + .numberOfShards(1) + .numberOfReplicas(0)) + ).build(); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> MetadataMigrateToDataStreamService + .validateRequest(cs, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest( + routedAliasName, TimeValue.ZERO, TimeValue.ZERO))); + assertThat(e.getMessage(), containsString("alias [" + routedAliasName + "] may not have custom filtering or routing")); + } + + public void testValidateRequestWithAliasWithoutWriteIndex() { + String aliasWithoutWriteIndex = "alias"; + AliasMetadata alias1 = AliasMetadata.builder(aliasWithoutWriteIndex).build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")).metadata( + Metadata.builder() + .put(IndexMetadata.builder("foo1") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo2") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo3") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo4") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + ).build(); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> MetadataMigrateToDataStreamService + .validateRequest(cs, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest( + aliasWithoutWriteIndex, TimeValue.ZERO, TimeValue.ZERO))); + assertThat(e.getMessage(), containsString("alias [" + aliasWithoutWriteIndex + "] must specify a write index")); + } + + public void testValidateRequest() { + String aliasName = "alias"; + AliasMetadata alias1 = AliasMetadata.builder(aliasName).build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")).metadata( + Metadata.builder() + .put(IndexMetadata.builder("foo1") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo2") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo3") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo4") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(AliasMetadata.builder(aliasName).writeIndex(true)) + .numberOfShards(1) + .numberOfReplicas(0)) + ).build(); + MetadataMigrateToDataStreamService.validateRequest(cs, + new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(aliasName, TimeValue.ZERO, TimeValue.ZERO)); + } + + public void testValidateRequestWithIndicesWithMultipleAliasReferences() { + String aliasName = "alias"; + AliasMetadata alias1 = AliasMetadata.builder(aliasName).build(); + AliasMetadata alias2 = AliasMetadata.builder(aliasName + "2").build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")).metadata( + Metadata.builder() + .put(IndexMetadata.builder("foo1") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo2") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .putAlias(alias2) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo3") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .putAlias(alias2) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetadata.builder("foo4") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias1) + .numberOfShards(1) + .numberOfReplicas(0)) + ).build(); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> MetadataMigrateToDataStreamService.validateBackingIndices(cs, aliasName)); + String emsg = e.getMessage(); + assertThat(emsg, containsString("other aliases referencing indices [")); + assertThat(emsg, containsString("] must be removed before migrating to a data stream")); + String referencedIndices = emsg.substring(emsg.indexOf('[') + 1, emsg.indexOf(']')); + Set indices = Strings.commaDelimitedListToSet(referencedIndices); + assertThat(indices, containsInAnyOrder("foo2", "foo3")); + } + + public void testCreateDataStreamWithSuppliedWriteIndex() throws Exception { + String dataStreamName = "foo"; + AliasMetadata alias = AliasMetadata.builder(dataStreamName).build(); + IndexMetadata + foo1 = + IndexMetadata.builder("foo1") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(AliasMetadata.builder(dataStreamName).writeIndex(true).build()) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp", "date")) + .build(); + IndexMetadata + foo2 = + IndexMetadata.builder("foo2") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp", "date")) + .build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")) + .metadata(Metadata.builder() + .put(foo1, false) + .put(foo2, false) + .put("template", new ComposableIndexTemplate(org.elasticsearch.common.collect.List.of(dataStreamName + "*"), null, null, + null, null, null, new ComposableIndexTemplate.DataStreamTemplate()))) + .build(); + + ClusterState newState = MetadataMigrateToDataStreamService.migrateToDataStream(cs, this::getMapperService, + new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, + TimeValue.ZERO, + TimeValue.ZERO)); + IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName); + assertThat(ds, notNullValue()); + assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); + assertThat(ds.getIndices().size(), equalTo(2)); + List backingIndexNames = ds.getIndices().stream().map(x -> x.getIndex().getName()).collect(Collectors.toList()); + assertThat(backingIndexNames, containsInAnyOrder("foo1", "foo2")); + assertThat(ds.getWriteIndex().getIndex().getName(), equalTo("foo1")); + for (IndexMetadata im : ds.getIndices()) { + assertThat(im.getSettings().get("index.hidden"), equalTo("true")); + assertThat(im.getAliases().size(), equalTo(0)); + } + } + + public void testCreateDataStreamHidesBackingIndicesAndRemovesAlias() throws Exception { + String dataStreamName = "foo"; + AliasMetadata alias = AliasMetadata.builder(dataStreamName).build(); + IndexMetadata foo1 = IndexMetadata.builder("foo1") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(AliasMetadata.builder(dataStreamName).writeIndex(true).build()) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp", "date")) + .build(); + IndexMetadata foo2 = IndexMetadata.builder("foo2") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp", "date")) + .build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")).metadata( + Metadata.builder() + .put(foo1, false) + .put(foo2, false) + .put("template", new ComposableIndexTemplate(org.elasticsearch.common.collect.List.of(dataStreamName + "*"), null, null, + null, null, null, new ComposableIndexTemplate.DataStreamTemplate()))) + .build(); + + ClusterState newState = MetadataMigrateToDataStreamService.migrateToDataStream(cs, this::getMapperService, + new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, + TimeValue.ZERO, + TimeValue.ZERO)); + IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName); + assertThat(ds, notNullValue()); + assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); + assertThat(ds.getIndices().size(), equalTo(2)); + List backingIndexNames = ds.getIndices().stream().map(x -> x.getIndex().getName()).collect(Collectors.toList()); + assertThat(backingIndexNames, containsInAnyOrder("foo1", "foo2")); + assertThat(ds.getWriteIndex().getIndex().getName(), equalTo("foo1")); + for (IndexMetadata im : ds.getIndices()) { + assertThat(im.getSettings().get("index.hidden"), equalTo("true")); + assertThat(im.getAliases().size(), equalTo(0)); + } + } + + public void testCreateDataStreamWithoutSuppliedWriteIndex() throws Exception { + String dataStreamName = "foo"; + AliasMetadata alias = AliasMetadata.builder(dataStreamName).build(); + IndexMetadata + foo1 = + IndexMetadata.builder("foo1") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp", "date")) + .build(); + IndexMetadata + foo2 = + IndexMetadata.builder("foo2") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .putAlias(alias) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp", "date")) + .build(); + ClusterState cs = ClusterState.builder(new ClusterName("dummy")) + .metadata(Metadata.builder() + .put(foo1, false) + .put(foo2, false) + .put("template", new ComposableIndexTemplate(org.elasticsearch.common.collect.List.of(dataStreamName + "*"), null, null, + null, null, null, new ComposableIndexTemplate.DataStreamTemplate()))) + .build(); + + IllegalArgumentException e = + expectThrows(IllegalArgumentException.class, + () -> MetadataMigrateToDataStreamService.migrateToDataStream(cs, + this::getMapperService, + new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, + TimeValue.ZERO, + TimeValue.ZERO))); + assertThat(e.getMessage(), containsString("alias [" + dataStreamName + "] must specify a write index")); + } + + private static MappingMetadata generateMapping(String timestampFieldName, String type) throws IOException { + String source = DataStreamTestHelper.generateMapping(timestampFieldName, type); + return new MappingMetadata(MapperService.SINGLE_MAPPING_NAME, + XContentHelper.convertToMap(XContentFactory.xContent(source), source, true)); + } + + private MapperService getMapperService(IndexMetadata im) { + try { + return createMapperService("_doc", "{\"_doc\": " + im.mapping().source().toString() + "}"); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override + protected Collection getPlugins() { + return org.elasticsearch.common.collect.List.of(new MetadataIndexTemplateServiceTests.DummyPlugin()); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/MigrateToDataStreamAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/MigrateToDataStreamAction.java new file mode 100644 index 0000000000000..1adf02f72f9bd --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/MigrateToDataStreamAction.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +public class MigrateToDataStreamAction extends ActionType { + + public static final MigrateToDataStreamAction INSTANCE = new MigrateToDataStreamAction(); + public static final String NAME = "indices:admin/data_stream/migrate"; + + private MigrateToDataStreamAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest { + + private final String aliasName; + + public Request(String aliasName) { + this.aliasName = aliasName; + } + + public String getAliasName() { + return aliasName; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (Strings.hasText(aliasName) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + return validationException; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.aliasName = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(aliasName); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MigrateToDataStreamAction.Request request = (MigrateToDataStreamAction.Request) o; + return aliasName.equals(request.aliasName); + } + + @Override + public int hashCode() { + return Objects.hash(aliasName); + } + + @Override + public String[] indices() { + return new String[]{aliasName}; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } + +} diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index c25acc6e834f8..0efbff2049dbc 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -1194,7 +1194,7 @@ private static void verifyResolvability( } } - private static void indexDocs(String dataStream, int numDocs) { + static void indexDocs(String dataStream, int numDocs) { BulkRequest bulkRequest = new BulkRequest(); for (int i = 0; i < numDocs; i++) { String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); @@ -1215,17 +1215,21 @@ private static void indexDocs(String dataStream, int numDocs) { client().admin().indices().refresh(new RefreshRequest(dataStream)).actionGet(); } - private static void verifyDocs(String dataStream, long expectedNumHits, long minGeneration, long maxGeneration) { + static void verifyDocs(String dataStream, long expectedNumHits, java.util.List expectedIndices) { SearchRequest searchRequest = new SearchRequest(dataStream); searchRequest.source().size((int) expectedNumHits); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(expectedNumHits)); + Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> { assertTrue(expectedIndices.contains(hit.getIndex())); }); + } + + static void verifyDocs(String dataStream, long expectedNumHits, long minGeneration, long maxGeneration) { java.util.List expectedIndices = new ArrayList<>(); for (long k = minGeneration; k <= maxGeneration; k++) { expectedIndices.add(DataStream.getDefaultBackingIndexName(dataStream, k)); } - Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> { assertTrue(expectedIndices.contains(hit.getIndex())); }); + verifyDocs(dataStream, expectedNumHits, expectedIndices); } public static void putComposableIndexTemplate(String id, java.util.List patterns) throws IOException { diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamMigrationIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamMigrationIT.java new file mode 100644 index 0000000000000..c1ff3f3694e9b --- /dev/null +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamMigrationIT.java @@ -0,0 +1,273 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.datastreams; + +import joptsimple.internal.Strings; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction; +import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.collect.List; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.core.MigrateToDataStreamAction; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; +import org.junit.After; + +import java.util.Collection; +import java.util.Locale; + +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; +import static org.elasticsearch.datastreams.DataStreamIT.putComposableIndexTemplate; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +@ESIntegTestCase.ClusterScope(transportClientRatio = 0) +public class DataStreamMigrationIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class); + } + + @After + public void cleanup() { + AcknowledgedResponse response = client().execute( + DeleteDataStreamAction.INSTANCE, + new DeleteDataStreamAction.Request(new String[] { "*" }) + ).actionGet(); + assertAcked(response); + + DeleteDataStreamAction.Request deleteDSRequest = new DeleteDataStreamAction.Request(new String[] { "*" }); + client().execute(DeleteDataStreamAction.INSTANCE, deleteDSRequest).actionGet(); + DeleteComposableIndexTemplateAction.Request deleteTemplateRequest = new DeleteComposableIndexTemplateAction.Request("*"); + client().execute(DeleteComposableIndexTemplateAction.INSTANCE, deleteTemplateRequest).actionGet(); + } + + public void testBasicMigration() throws Exception { + putComposableIndexTemplate("id1", List.of("migrate*")); + + admin().indices().create(new CreateIndexRequest("index1")).get(); + admin().indices().create(new CreateIndexRequest("index2")).get(); + + int numDocs1 = randomIntBetween(2, 16); + indexDocs("index1", numDocs1); + int numDocs2 = randomIntBetween(2, 16); + indexDocs("index2", numDocs2); + + String alias = "migrate-to-data-stream"; + IndicesAliasesRequest request = new IndicesAliasesRequest(); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index1").alias(alias).writeIndex(true)); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index2").alias(alias).writeIndex(false)); + assertAcked(admin().indices().aliases(request).get()); + + ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request( + new String[] { "*" }, + IndicesOptions.fromOptions(true, true, true, true, true) + ); + ResolveIndexAction.Response resolveResponse = admin().indices().resolveIndex(resolveRequest).get(); + assertThat(resolveResponse.getAliases().size(), equalTo(1)); + assertThat(resolveResponse.getAliases().get(0).getName(), equalTo(alias)); + assertThat(resolveResponse.getDataStreams().size(), equalTo(0)); + assertThat(resolveResponse.getIndices().size(), equalTo(2)); + + client().execute(MigrateToDataStreamAction.INSTANCE, new MigrateToDataStreamAction.Request(alias)).get(); + + resolveResponse = admin().indices().resolveIndex(resolveRequest).get(); + assertThat(resolveResponse.getAliases().size(), equalTo(0)); + assertThat(resolveResponse.getDataStreams().size(), equalTo(1)); + assertThat(resolveResponse.getDataStreams().get(0).getName(), equalTo(alias)); + assertThat(resolveResponse.getDataStreams().get(0).getBackingIndices(), arrayContaining("index2", "index1")); + assertThat(resolveResponse.getIndices().size(), equalTo(2)); + + int numDocsDs = randomIntBetween(2, 16); + indexDocs(alias, numDocsDs); + DataStreamIT.verifyDocs(alias, numDocs1 + numDocs2 + numDocsDs, List.of("index1", "index2")); + } + + public void testMigrationWithoutTemplate() throws Exception { + admin().indices().create(new CreateIndexRequest("index1")).get(); + admin().indices().create(new CreateIndexRequest("index2")).get(); + + int numDocs1 = randomIntBetween(2, 16); + indexDocs("index1", numDocs1); + int numDocs2 = randomIntBetween(2, 16); + indexDocs("index2", numDocs2); + + String alias = "migrate-to-data-stream"; + IndicesAliasesRequest request = new IndicesAliasesRequest(); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index1").alias(alias).writeIndex(true)); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index2").alias(alias).writeIndex(false)); + assertAcked(admin().indices().aliases(request).get()); + + ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request( + new String[] { "*" }, + IndicesOptions.fromOptions(true, true, true, true, true) + ); + ResolveIndexAction.Response resolveResponse = admin().indices().resolveIndex(resolveRequest).get(); + assertThat(resolveResponse.getAliases().size(), equalTo(1)); + assertThat(resolveResponse.getAliases().get(0).getName(), equalTo(alias)); + assertThat(resolveResponse.getDataStreams().size(), equalTo(0)); + assertThat(resolveResponse.getIndices().size(), equalTo(2)); + + Exception e = expectThrows( + Exception.class, + () -> client().execute(MigrateToDataStreamAction.INSTANCE, new MigrateToDataStreamAction.Request(alias)).get() + ); + + assertTrue( + throwableOrItsCause(e, IllegalArgumentException.class, "no matching index template found for data stream [" + alias + "]") + ); + } + + public void testMigrationWithoutIndexMappings() throws Exception { + putComposableIndexTemplate("id1", List.of("migrate*")); + + admin().indices().create(new CreateIndexRequest("index1")).get(); + admin().indices().create(new CreateIndexRequest("index2")).get(); + + String alias = "migrate-to-data-stream"; + IndicesAliasesRequest request = new IndicesAliasesRequest(); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index1").alias(alias).writeIndex(true)); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index2").alias(alias).writeIndex(false)); + assertAcked(admin().indices().aliases(request).get()); + + ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request( + new String[] { "*" }, + IndicesOptions.fromOptions(true, true, true, true, true) + ); + ResolveIndexAction.Response resolveResponse = admin().indices().resolveIndex(resolveRequest).get(); + assertThat(resolveResponse.getAliases().size(), equalTo(1)); + assertThat(resolveResponse.getAliases().get(0).getName(), equalTo(alias)); + assertThat(resolveResponse.getDataStreams().size(), equalTo(0)); + assertThat(resolveResponse.getIndices().size(), equalTo(2)); + + Exception e = expectThrows( + Exception.class, + () -> client().execute(MigrateToDataStreamAction.INSTANCE, new MigrateToDataStreamAction.Request(alias)).get() + ); + + assertTrue(throwableOrItsCause(e, IllegalArgumentException.class, "must have mappings for a timestamp field")); + } + + public void testMigrationWithoutTimestampMapping() throws Exception { + putComposableIndexTemplate("id1", List.of("migrate*")); + + admin().indices().create(new CreateIndexRequest("index1")).get(); + admin().indices().create(new CreateIndexRequest("index2")).get(); + + int numDocs1 = randomIntBetween(2, 16); + indexDocs("index1", numDocs1, "foo"); + int numDocs2 = randomIntBetween(2, 16); + indexDocs("index2", numDocs2, "foo"); + + String alias = "migrate-to-data-stream"; + IndicesAliasesRequest request = new IndicesAliasesRequest(); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index1").alias(alias).writeIndex(true)); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index2").alias(alias).writeIndex(false)); + assertAcked(admin().indices().aliases(request).get()); + + ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request( + new String[] { "*" }, + IndicesOptions.fromOptions(true, true, true, true, true) + ); + ResolveIndexAction.Response resolveResponse = admin().indices().resolveIndex(resolveRequest).get(); + assertThat(resolveResponse.getAliases().size(), equalTo(1)); + assertThat(resolveResponse.getAliases().get(0).getName(), equalTo(alias)); + assertThat(resolveResponse.getDataStreams().size(), equalTo(0)); + assertThat(resolveResponse.getIndices().size(), equalTo(2)); + + Exception e = expectThrows( + Exception.class, + () -> client().execute(MigrateToDataStreamAction.INSTANCE, new MigrateToDataStreamAction.Request(alias)).get() + ); + + assertTrue(throwableOrItsCause(e, IllegalArgumentException.class, "data stream timestamp field [@timestamp] does not exist")); + } + + public void testMigrationWithoutWriteIndex() throws Exception { + putComposableIndexTemplate("id1", List.of("migrate*")); + + admin().indices().create(new CreateIndexRequest("index1")).get(); + admin().indices().create(new CreateIndexRequest("index2")).get(); + + int numDocs1 = randomIntBetween(2, 16); + indexDocs("index1", numDocs1); + int numDocs2 = randomIntBetween(2, 16); + indexDocs("index2", numDocs2); + + String alias = "migrate-to-data-stream"; + IndicesAliasesRequest request = new IndicesAliasesRequest(); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index1").alias(alias).writeIndex(false)); + request.addAliasAction(IndicesAliasesRequest.AliasActions.add().index("index2").alias(alias).writeIndex(false)); + assertAcked(admin().indices().aliases(request).get()); + + ResolveIndexAction.Request resolveRequest = new ResolveIndexAction.Request( + new String[] { "*" }, + IndicesOptions.fromOptions(true, true, true, true, true) + ); + ResolveIndexAction.Response resolveResponse = admin().indices().resolveIndex(resolveRequest).get(); + assertThat(resolveResponse.getAliases().size(), equalTo(1)); + assertThat(resolveResponse.getAliases().get(0).getName(), equalTo(alias)); + assertThat(resolveResponse.getDataStreams().size(), equalTo(0)); + assertThat(resolveResponse.getIndices().size(), equalTo(2)); + + Exception e = expectThrows( + Exception.class, + () -> client().execute(MigrateToDataStreamAction.INSTANCE, new MigrateToDataStreamAction.Request(alias)).get() + ); + + assertTrue(throwableOrItsCause(e, IllegalArgumentException.class, "alias [" + alias + "] must specify a write index")); + } + + static boolean throwableOrItsCause(Throwable t, Class clazz, String message) { + boolean found = false; + Throwable throwable = t; + while (throwable != null && found == false) { + found = throwable.getMessage().contains(message) && throwable.getClass().equals(clazz); + throwable = throwable.getCause(); + } + return found; + } + + static void indexDocs(String index, int numDocs) { + indexDocs(index, numDocs, Strings.EMPTY); + } + + static void indexDocs(String index, int numDocs, String fieldPrefix) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE) + .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", fieldPrefix + DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON) + ); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + for (BulkItemResponse itemResponse : bulkResponse) { + assertThat(itemResponse.getFailureMessage(), nullValue()); + assertThat(itemResponse.status(), equalTo(RestStatus.CREATED)); + } + client().admin().indices().refresh(new RefreshRequest(index)).actionGet(); + } + +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java index d39ee82f0f12f..92ae98f9dc5ce 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamsPlugin.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.xpack.core.MigrateToDataStreamAction; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.action.DataStreamsStatsAction; import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; @@ -27,6 +28,7 @@ import org.elasticsearch.xpack.core.action.PromoteDataStreamAction; import org.elasticsearch.xpack.datastreams.action.DataStreamsStatsTransportAction; import org.elasticsearch.xpack.datastreams.action.PromoteDataStreamTransportAction; +import org.elasticsearch.xpack.datastreams.action.MigrateToDataStreamTransportAction; import org.elasticsearch.xpack.datastreams.rest.RestCreateDataStreamAction; import org.elasticsearch.xpack.datastreams.rest.RestDataStreamsStatsAction; import org.elasticsearch.xpack.datastreams.rest.RestDeleteDataStreamAction; @@ -66,6 +68,7 @@ public Map getMetadataMappers() { new ActionHandler<>(DeleteDataStreamAction.INSTANCE, DeleteDataStreamTransportAction.class), new ActionHandler<>(GetDataStreamAction.INSTANCE, GetDataStreamsTransportAction.class), new ActionHandler<>(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class), + new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class), new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class) ); } diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/MigrateToDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/MigrateToDataStreamTransportAction.java new file mode 100644 index 0000000000000..d3da08963b051 --- /dev/null +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/MigrateToDataStreamTransportAction.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.datastreams.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataMigrateToDataStreamService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.MigrateToDataStreamAction; + +public class MigrateToDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction { + + private final MetadataMigrateToDataStreamService metadataMigrateToDataStreamService; + + @Inject + public MigrateToDataStreamTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + MetadataMigrateToDataStreamService metadataMigrateToDataStreamService + ) { + super( + MigrateToDataStreamAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + MigrateToDataStreamAction.Request::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + this.metadataMigrateToDataStreamService = metadataMigrateToDataStreamService; + } + + @Override + protected void masterOperation( + MigrateToDataStreamAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest updateRequest = + new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest( + request.getAliasName(), + request.masterNodeTimeout(), + request.timeout() + ); + metadataMigrateToDataStreamService.migrateToDataStream(updateRequest, listener); + } + + @Override + protected ClusterBlockException checkBlock(MigrateToDataStreamAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapper.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapper.java index 1710b2d4b5119..e83b7a72a64d0 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapper.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapper.java @@ -73,7 +73,9 @@ private static DataStreamTimestampFieldMapper toType(FieldMapper in) { public static class Builder extends MetadataFieldMapper.Builder { - private final Parameter enabled = Parameter.boolParam("enabled", false, m -> toType(m).enabled, false); + private final Parameter enabled = Parameter.boolParam("enabled", true, m -> toType(m).enabled, false) + // this field mapper may be enabled but once enabled, may not be disabled + .setMergeValidator((previous, current, conflicts) -> (previous == current) || (previous == false && current)); public Builder() { super(NAME); diff --git a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapperTests.java b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapperTests.java index 06ba31fb2c332..adb355b58402c 100644 --- a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapperTests.java +++ b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/mapper/DataStreamTimestampFieldMapperTests.java @@ -280,17 +280,26 @@ public void testValidateNotDisallowedAttribute() throws IOException { assertThat(e.getMessage(), equalTo("data stream timestamp field [@timestamp] has disallowed attributes: [store]")); } - public void testCannotUpdateTimestampField() throws IOException { + public void testCanUpdateTimestampFieldMapperFromDisabledToEnabled() throws IOException { MapperService mapperService = createIndex("test").mapperService(); String mapping1 = "{\"type\":{\"_data_stream_timestamp\":{\"enabled\":false}, \"properties\": {\"@timestamp\": {\"type\": \"date\"}}}}}"; String mapping2 = "{\"type\":{\"_data_stream_timestamp\":{\"enabled\":true}, \"properties\": {\"@timestamp2\": " + "{\"type\": \"date\"},\"@timestamp\": {\"type\": \"date\"}}}})"; - assertConflicts(mapping1, mapping2, mapperService, "Mapper for [_data_stream_timestamp]", "[enabled] from [false] to [true]"); + assertConflicts(mapping1, mapping2, mapperService); mapping1 = "{\"type\":{\"properties\":{\"@timestamp\": {\"type\": \"date\"}}}}}"; mapping2 = "{\"type\":{\"_data_stream_timestamp\":{\"enabled\":true}, \"properties\": " + "{\"@timestamp2\": {\"type\": \"date\"},\"@timestamp\": {\"type\": \"date\"}}}})"; - assertConflicts(mapping1, mapping2, mapperService, "Mapper for [_data_stream_timestamp]", "[enabled] from [false] to [true]"); + assertConflicts(mapping1, mapping2, mapperService); + } + + public void testCannotUpdateTimestampFieldMapperFromEnabledToDisabled() throws IOException { + MapperService mapperService = createIndex("test").mapperService(); + String mapping1 = + "{\"type\":{\"_data_stream_timestamp\":{\"enabled\":true}, \"properties\": {\"@timestamp\": {\"type\": \"date\"}}}}}"; + String mapping2 = "{\"type\":{\"_data_stream_timestamp\":{\"enabled\":false}, \"properties\": {\"@timestamp2\": " + + "{\"type\": \"date\"},\"@timestamp\": {\"type\": \"date\"}}}})"; + assertConflicts(mapping1, mapping2, mapperService, "Cannot update parameter [enabled] from [true] to [false]"); } } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 04b8240fe268f..fc38b624d9653 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -275,6 +275,7 @@ public class Constants { "indices:admin/data_stream/create", "indices:admin/data_stream/delete", "indices:admin/data_stream/get", + "indices:admin/data_stream/migrate", "indices:admin/data_stream/promote", "indices:admin/delete", "indices:admin/exists", diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 8a1d4d5e24e8d..80920f19db38e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -15,7 +15,6 @@ import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.TransportShardBulkAction; @@ -39,6 +38,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.xpack.core.MigrateToDataStreamAction; +import org.elasticsearch.xpack.core.action.CreateDataStreamAction; import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.GetUserPrivilegesResponse; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -320,8 +321,10 @@ private void handleIndexActionAuthorizationResult(final IndexAuthorizationResult } //if we are creating an index we need to authorize potential aliases created at the same time if (IndexPrivilege.CREATE_INDEX_MATCHER.test(action)) { - assert (request instanceof CreateIndexRequest) || (request instanceof CreateDataStreamAction.Request); - if (request instanceof CreateDataStreamAction.Request || ((CreateIndexRequest) request).aliases().isEmpty()) { + assert (request instanceof CreateIndexRequest) || (request instanceof MigrateToDataStreamAction.Request) || + (request instanceof CreateDataStreamAction.Request); + if (request instanceof CreateDataStreamAction.Request || (request instanceof MigrateToDataStreamAction.Request) || + ((CreateIndexRequest) request).aliases().isEmpty()) { runRequestInterceptors(requestInfo, authzInfo, authorizationEngine, listener); } else { Set aliases = ((CreateIndexRequest) request).aliases();