Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest ingest then create index #39607

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -271,28 +271,6 @@ POST test/_doc/1?pipeline=drop_guests_network
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
"_index": "test",
"_type": "_doc",
"_id": "1",
"found": false
}
--------------------------------------------------
// TESTRESPONSE
////

Copy link
Contributor Author

@jakelandis jakelandis Mar 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these were removed because now that the ingest happens before index creation, if the index doesn't exist yet, and we only drop a document, the index never exists, and thus the response is different.

Thanks to the `?.` operator the following document will not throw an error.
If the pipeline used a `.` the following document would throw a NullPointerException
since the `network` object is not part of the source document.
Expand Down Expand Up @@ -392,28 +370,6 @@ POST test/_doc/3?pipeline=drop_guests_network
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/3
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
"_index": "test",
"_type": "_doc",
"_id": "3",
"found": false
}
--------------------------------------------------
// TESTRESPONSE
////

The `?.` operators works well for use in the `if` conditional
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
returns null if the object is null and `==` is null safe (as well as many other
Expand Down Expand Up @@ -511,28 +467,6 @@ POST test/_doc/1?pipeline=not_prod_dropper
The document is <<drop-processor,dropped>> since `prod` (case insensitive)
is not found in the tags.

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
"_index": "test",
"_type": "_doc",
"_id": "1",
"found": false
}
--------------------------------------------------
// TESTRESPONSE
////

The following document is indexed (i.e. not dropped) since
`prod` (case insensitive) is found in the tags.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ teardown:
get:
index: test
id: 3
- match: { found: false }
Copy link
Contributor Author

@jakelandis jakelandis Mar 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is removed because with this change the test index is no longer created since the index creation only happens after ingest (and this ingest drops the index request, thus no index ).


Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "retarget"
ignore: 404

- do:
indices.delete:
index: foo

---
"Test Change Target Index with Explicit Pipeline":

- do:
ingest.put_pipeline:
id: "retarget"
body: >
{
"processors": [
{
"set" : {
"field" : "_index",
"value" : "foo"
}
}
]
}
- match: { acknowledged: true }

# no indices
- do:
cat.indices: {}

- match:
$body: |
/^$/

- do:
index:
index: test
id: 1
pipeline: "retarget"
body: {
a: true
}

- do:
get:
index: foo
id: 1
- match: { _source.a: true }

# only the foo index
- do:
cat.indices:
h: i

- match:
$body: |
/^foo\n$/

---
"Test Change Target Index with Default Pipeline":

- do:
indices.put_template:
name: index_template
body:
index_patterns: test
settings:
default_pipeline: "retarget"

- do:
ingest.put_pipeline:
id: "retarget"
body: >
{
"processors": [
{
"set" : {
"field" : "_index",
"value" : "foo"
}
}
]
}
- match: { acknowledged: true }

# no indices
- do:
cat.indices: {}

- match:
$body: |
/^$/

- do:
index:
index: test
id: 1
body: {
a: true
}

- do:
get:
index: foo
id: 1
- match: { _source.a: true }

# only the foo index
- do:
cat.indices:
h: i

- match:
$body: |
/^foo\n$/
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -151,6 +154,71 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

boolean hasIndexRequestsWithPipelines = false;
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
// get pipeline from request
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
// start to look for default pipeline via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
if (indexMetaData == null && indexRequest.index() != null) {
// if the write request if through an alias use the write index's meta data
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
// Find the the default pipeline if one is defined from and existing index.
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
} else if (indexRequest.index() != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the ingest node execution before the create index is the primary change of this PR.

However, to allow the prevent re-introducing #32758, we need to look up the default pipeline from the matched templates which is what this code block does.

// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
// apply templates, here, in reverse order, since first ones are better matching
for (int i = templates.size() - 1; i >= 0; i--) {
original-brownbear marked this conversation as resolved.
Show resolved Hide resolved
Settings settings = templates.get(i).settings();
if(IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
}
}
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}

if (hasIndexRequestsWithPipelines) {
// this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
// also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
// this path is never taken.
try {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
return;
}

if (needToCheck()) {
// Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request
Expand Down Expand Up @@ -181,15 +249,15 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
}
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
if (autoCreateIndices.isEmpty()) {
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
}

Expand All @@ -205,7 +273,7 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
Expand All @@ -215,56 +283,7 @@ public void onFailure(Exception e) {
}
}
} else {
executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}

private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
boolean hasIndexRequestsWithPipelines = false;
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if(indexRequest != null){
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
if (indexMetaData == null && indexRequest.index() != null) {
//check the alias
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
try {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
} else {
executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}

Expand Down
Loading