Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validation for data stream creation #54083

Merged
merged 9 commits into from
Mar 26, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -47,7 +48,10 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
Expand Down Expand Up @@ -161,15 +165,63 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}

static ClusterState createDataStream(ClusterState currentState, Request request) {
List<String> validationErrors = new ArrayList<>();
if (currentState.metaData().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
validationErrors.add("data_stream [" + request.name + "] already exists");
}

MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
validationErrors.addAll(validateDataStreamName(request.name));

logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metaData(builder).build();
if (currentState.metaData().hasIndex(request.name)) {
validationErrors.add("data_stream [" + request.name + "] conflicts with existing index");
}

if (currentState.metaData().hasAlias(request.name)) {
validationErrors.add("data_stream [" + request.name + "] conflicts with existing alias");
}

final String backingIndexPrefix = (request.name.startsWith(".") ? "" : ".") + request.name + "-";
for (String indexName : currentState.metaData().getConcreteAllIndices()) {
if (indexName.startsWith(backingIndexPrefix)) {
validationErrors.add(
"data_stream [" + request.name + "] could create backing indices that conflict with existing indices");
break;
}
}

if (validationErrors.isEmpty()) {
MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metaData(builder).build();
} else {
ValidationException ex = new ValidationException();
ex.addValidationErrors(validationErrors);
throw new IllegalArgumentException(ex);
}
}

private static List<String> validateDataStreamName(String name) {
danhermann marked this conversation as resolved.
Show resolved Hide resolved
List<String> validationErrors = new ArrayList<>();
if (name.contains(" ")) {
validationErrors.add("name must not contain a space");
}
if (name.contains(",")) {
validationErrors.add("name must not contain a ','");
}
if (name.contains("#")) {
validationErrors.add("name must not contain a '#'");
}
if (name.startsWith("_")) {
validationErrors.add("name must not start with '_'");
}
if (name.endsWith("-")) {
validationErrors.add("name must not end with '-'");
}
if (name.toLowerCase(Locale.ROOT).equals(name) == false) {
validationErrors.add("name must be lower cased");
}
return validationErrors;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
Expand Down Expand Up @@ -82,4 +85,65 @@ public void testCreateDuplicateDataStream() {
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
}

public void testCreateDataStreamWithInvalidName() {
final String dataStreamName = "_My-da#ta- ,stream-";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
assertThat(e.getMessage(), containsString("name must not contain a space"));
assertThat(e.getMessage(), containsString("name must not contain a ','"));
assertThat(e.getMessage(), containsString("name must not contain a '#'"));
assertThat(e.getMessage(), containsString("name must not start with '_'"));
assertThat(e.getMessage(), containsString("name must not end with '-'"));
assertThat(e.getMessage(), containsString("name must be lower cased"));
}

public void testCreateDataStreamThatConflictsWithIndex() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().put(IndexMetaData.builder(dataStreamName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build(), false).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] conflicts with existing index"));
}

public void testCreateDataStreamThatConflictsWithAlias() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().put(IndexMetaData.builder(dataStreamName + "z")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.putAlias(AliasMetaData.builder(dataStreamName).build())
.build(), false).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] conflicts with existing alias"));
}

public void testCreateDataStreamWithConflictingBackingIndices() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().put(IndexMetaData.builder("." + dataStreamName + "-00001")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build(), false).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
assertThat(e.getMessage(),
containsString("data_stream [" + dataStreamName + "] could create backing indices that conflict with existing indices"));
}
}