-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INGEST: Enable default pipelines #32286
Changes from 6 commits
c502262
aa5c5cf
91d6e0d
c8c4f62
1575e8e
97ceccb
431fa06
f005a40
b6d3ecb
6a9ebae
d98e0b0
d1b593d
f619fd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
--- | ||
teardown: | ||
- do: | ||
ingest.delete_pipeline: | ||
id: "my_pipeline" | ||
ignore: 404 | ||
|
||
--- | ||
"Test index with default pipeline": | ||
- do: | ||
ingest.put_pipeline: | ||
id: "my_pipeline" | ||
body: > | ||
{ | ||
"description": "_description", | ||
"processors": [ | ||
{ | ||
"bytes" : { | ||
"field" : "bytes_source_field", | ||
"target_field" : "bytes_target_field" | ||
} | ||
} | ||
] | ||
} | ||
- match: { acknowledged: true } | ||
|
||
- do: | ||
indices.create: | ||
index: test | ||
body: | ||
settings: | ||
index: | ||
default_pipeline: "my_pipeline" | ||
|
||
- do: | ||
index: | ||
index: test | ||
type: test | ||
id: 1 | ||
body: {bytes_source_field: "1kb"} | ||
|
||
- do: | ||
get: | ||
index: test | ||
type: test | ||
id: 1 | ||
- match: { _source.bytes_source_field: "1kb" } | ||
- match: { _source.bytes_target_field: 1024 } | ||
|
||
- do: | ||
index: | ||
index: test | ||
type: test | ||
id: 2 | ||
pipeline: "_none" | ||
body: {bytes_source_field: "1kb"} | ||
|
||
- do: | ||
get: | ||
index: test | ||
type: test | ||
id: 2 | ||
- match: { _source.bytes_source_field: "1kb" } | ||
- is_false: _source.bytes_target_field | ||
|
||
- do: | ||
catch: bad_request | ||
index: | ||
index: test | ||
type: test | ||
id: 3 | ||
pipeline: "" | ||
body: {bytes_source_field: "1kb"} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,13 +47,15 @@ | |
import org.elasticsearch.cluster.metadata.MappingMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.collect.ImmutableOpenMap; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.common.util.concurrent.AtomicArray; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.VersionType; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.indices.IndexClosedException; | ||
|
@@ -125,7 +127,27 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe | |
|
||
@Override | ||
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) { | ||
if (bulkRequest.hasIndexRequestsWithPipelines()) { | ||
boolean hasIndexRequestsWithPipelines = false; | ||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices(); | ||
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) { | ||
if (actionRequest instanceof IndexRequest) { | ||
IndexRequest indexRequest = (IndexRequest) actionRequest; | ||
String pipeline = indexRequest.getPipeline(); | ||
if (pipeline == null) { | ||
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); | ||
if (indexMetaData != null) { | ||
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); | ||
if (defaultPipeline.isEmpty() == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shouldn't be possible if we are validating a non-empty string? |
||
indexRequest.setPipeline(defaultPipeline); | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { | ||
hasIndexRequestsWithPipelines = true; | ||
} | ||
} | ||
} | ||
if (hasIndexRequestsWithPipelines) { | ||
if (clusterService.localNode().isIngestNode()) { | ||
processBulkIndexIngestRequest(task, bulkRequest, listener); | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -254,6 +254,9 @@ public final class IndexSettings { | |
public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length", | ||
1000, 1, Property.Dynamic, Property.IndexScope); | ||
|
||
public static final Setting<String> DEFAULT_PIPELINE = | ||
Setting.simpleString("index.default_pipeline", "", Property.Dynamic, Property.IndexScope); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add validation to this setting so it is not empty. We can make the default the |
||
|
||
private final Index index; | ||
private final Version version; | ||
private final Logger logger; | ||
|
@@ -293,6 +296,7 @@ public final class IndexSettings { | |
private volatile TimeValue searchIdleAfter; | ||
private volatile int maxAnalyzedOffset; | ||
private volatile int maxTermsCount; | ||
private volatile String defaultPipeline; | ||
|
||
/** | ||
* The maximum number of refresh listeners allows on this shard. | ||
|
@@ -408,6 +412,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
this.mergePolicyConfig = new MergePolicyConfig(logger, this); | ||
this.indexSortConfig = new IndexSortConfig(this); | ||
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); | ||
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); | ||
|
||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); | ||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed); | ||
|
@@ -447,6 +452,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); | ||
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); | ||
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); | ||
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); | ||
} | ||
|
||
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } | ||
|
@@ -822,4 +828,12 @@ public boolean isExplicitRefresh() { | |
* Returns the time that an index shard becomes search idle unless it's accessed in between | ||
*/ | ||
public TimeValue getSearchIdleAfter() { return searchIdleAfter; } | ||
|
||
public String getDefaultPipeline() { | ||
return defaultPipeline; | ||
} | ||
|
||
public void setDefaultPipeline(String defaultPipeline) { | ||
this.defaultPipeline = defaultPipeline; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,12 +73,16 @@ protected void doRun() throws Exception { | |
UpdateRequest updateRequest = (UpdateRequest) actionRequest; | ||
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); | ||
} | ||
if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) { | ||
if (indexRequest == null) { | ||
continue; | ||
} | ||
String pipeline = indexRequest.getPipeline(); | ||
if (Strings.hasText(pipeline) && IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is hasText necessary if we have validated this is a non empty string? |
||
try { | ||
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); | ||
//this shouldn't be needed here but we do it for consistency with index api | ||
// which requires it to prevent double execution | ||
indexRequest.setPipeline(null); | ||
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); | ||
} catch (Exception e) { | ||
itemFailureHandler.accept(indexRequest, e); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While having rest tests is fine to ensure the param is passed through the rest layer correctly, we should first have unit tests.