Skip to content

Commit

Permalink
INGEST: Enable default pipelines
Browse files Browse the repository at this point in the history
* Add `default_pipeline` index setting
* Empty string pipeline argument is interpreted as no pipeline
* closes elastic#21101
  • Loading branch information
original-brownbear committed Jul 23, 2018
1 parent 33f11e6 commit c502262
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test index with default pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
index:
index: test
type: test
id: 1
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }

- do:
index:
index: test
type: test
id: 2
pipeline: ""
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- is_false: _source.bytes_target_field

Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
Expand Down Expand Up @@ -125,7 +127,27 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
if (bulkRequest.hasIndexRequestsWithPipelines()) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData != null) {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
if (!defaultPipeline.isEmpty()) {
indexRequest.setPipeline(defaultPipeline);
hasIndexRequestsWithPipelines = true;
}
}
} else if (!pipeline.isEmpty()) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ public final class IndexSettings {
public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length",
1000, 1, Property.Dynamic, Property.IndexScope);

public static final Setting<String> DEFAULT_PIPELINE =
Setting.simpleString("index.default_pipeline", "", Property.Dynamic, Property.IndexScope);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -293,6 +296,7 @@ public final class IndexSettings {
private volatile TimeValue searchIdleAfter;
private volatile int maxAnalyzedOffset;
private volatile int maxTermsCount;
private volatile String defaultPipeline;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -408,6 +412,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
Expand Down Expand Up @@ -447,6 +452,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
Expand Down Expand Up @@ -822,4 +828,12 @@ public boolean isExplicitRefresh() {
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }

public String getDefaultPipeline() {
return defaultPipeline;
}

public void setDefaultPipeline(String defaultPipeline) {
this.defaultPipeline = defaultPipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected void doRun() throws Exception {
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
//this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(null);
indexRequest.setPipeline("");
} catch (Exception e) {
itemFailureHandler.accept(indexRequest, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -45,6 +46,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
public void testNonExceptional() {
Expand Down Expand Up @@ -97,7 +99,11 @@ public void testSomeFail() {

private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
ClusterService clusterService = mock(ClusterService.class);
ClusterState state = mock(ClusterState.class);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
null, null, null, mock(ActionFilters.class), null, null) {
@Override
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -153,6 +154,7 @@ public void setupAction() {
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
Expand Down

0 comments on commit c502262

Please sign in to comment.