-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INGEST: Enable default pipelines #32286
Changes from 1 commit
c502262
aa5c5cf
91d6e0d
c8c4f62
1575e8e
97ceccb
431fa06
f005a40
b6d3ecb
6a9ebae
d98e0b0
d1b593d
f619fd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
--- | ||
teardown: | ||
- do: | ||
ingest.delete_pipeline: | ||
id: "my_pipeline" | ||
ignore: 404 | ||
|
||
--- | ||
"Test index with default pipeline": | ||
- do: | ||
ingest.put_pipeline: | ||
id: "my_pipeline" | ||
body: > | ||
{ | ||
"description": "_description", | ||
"processors": [ | ||
{ | ||
"bytes" : { | ||
"field" : "bytes_source_field", | ||
"target_field" : "bytes_target_field" | ||
} | ||
} | ||
] | ||
} | ||
- match: { acknowledged: true } | ||
|
||
- do: | ||
indices.create: | ||
index: test | ||
body: | ||
settings: | ||
index: | ||
default_pipeline: "my_pipeline" | ||
|
||
- do: | ||
index: | ||
index: test | ||
type: test | ||
id: 1 | ||
body: {bytes_source_field: "1kb"} | ||
|
||
- do: | ||
get: | ||
index: test | ||
type: test | ||
id: 1 | ||
- match: { _source.bytes_source_field: "1kb" } | ||
- match: { _source.bytes_target_field: 1024 } | ||
|
||
- do: | ||
index: | ||
index: test | ||
type: test | ||
id: 2 | ||
pipeline: "" | ||
body: {bytes_source_field: "1kb"} | ||
|
||
- do: | ||
get: | ||
index: test | ||
type: test | ||
id: 2 | ||
- match: { _source.bytes_source_field: "1kb" } | ||
- is_false: _source.bytes_target_field | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,13 +47,15 @@ | |
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.collect.ImmutableOpenMap; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.common.util.concurrent.AtomicArray; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.VersionType; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.indices.IndexClosedException; | ||
|
@@ -125,7 +127,27 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe | |
|
||
@Override | ||
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) { | ||
if (bulkRequest.hasIndexRequestsWithPipelines()) { | ||
boolean hasIndexRequestsWithPipelines = false; | ||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices(); | ||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) { | ||
if (actionRequest instanceof IndexRequest) { | ||
IndexRequest indexRequest = (IndexRequest) actionRequest; | ||
String pipeline = indexRequest.getPipeline(); | ||
if (pipeline == null) { | ||
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); | ||
if (indexMetaData != null) { | ||
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); | ||
if (!defaultPipeline.isEmpty()) { | ||
indexRequest.setPipeline(defaultPipeline); | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} else if (!pipeline.isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We prefer |
||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} | ||
if (hasIndexRequestsWithPipelines) { | ||
if (clusterService.localNode().isIngestNode()) { | ||
processBulkIndexIngestRequest(task, bulkRequest, listener); | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -254,6 +254,9 @@ public final class IndexSettings { | |
public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length", | ||
1000, 1, Property.Dynamic, Property.IndexScope); | ||
|
||
public static final Setting<String> DEFAULT_PIPELINE = | ||
Setting.simpleString("index.default_pipeline", "", Property.Dynamic, Property.IndexScope); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add validation to this setting so it is not empty. We can make the default the |
||
|
||
private final Index index; | ||
private final Version version; | ||
private final Logger logger; | ||
|
@@ -293,6 +296,7 @@ public final class IndexSettings { | |
private volatile TimeValue searchIdleAfter; | ||
private volatile int maxAnalyzedOffset; | ||
private volatile int maxTermsCount; | ||
private volatile String defaultPipeline; | ||
|
||
/** | ||
* The maximum number of refresh listeners allows on this shard. | ||
|
@@ -408,6 +412,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
this.mergePolicyConfig = new MergePolicyConfig(logger, this); | ||
this.indexSortConfig = new IndexSortConfig(this); | ||
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); | ||
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); | ||
|
||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); | ||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed); | ||
|
@@ -447,6 +452,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); | ||
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); | ||
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); | ||
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); | ||
} | ||
|
||
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } | ||
|
@@ -822,4 +828,12 @@ public boolean isExplicitRefresh() { | |
* Returns the time that an index shard becomes search idle unless it's accessed in between | ||
*/ | ||
public TimeValue getSearchIdleAfter() { return searchIdleAfter; } | ||
|
||
public String getDefaultPipeline() { | ||
return defaultPipeline; | ||
} | ||
|
||
public void setDefaultPipeline(String defaultPipeline) { | ||
this.defaultPipeline = defaultPipeline; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,7 @@ protected void doRun() throws Exception { | |
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); | ||
//this shouldn't be needed here but we do it for consistency with index api | ||
// which requires it to prevent double execution | ||
indexRequest.setPipeline(null); | ||
indexRequest.setPipeline(""); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had to change this since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you elaborate on what triggers this? I'm unsure about the original comment, but it seems like that should be fixed first, so that we can have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rjernst the problem is that I need to somehow differentiate between "no pipeline so set default" (when the request first hits So better move to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sorry I don't understand what you are suggesting here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. simply:
sorry, seems it's too late for my writing skills :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel strongly There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rjernst I like it :) Error for |
||
} catch (Exception e) { | ||
itemFailureHandler.accept(indexRequest, e); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While having rest tests is fine to ensure the param is passed through the rest layer correctly, we should first have unit tests.