diff --git a/docs/changelog/104585.yaml b/docs/changelog/104585.yaml
new file mode 100644
index 0000000000000..8c2b20fe54d0c
--- /dev/null
+++ b/docs/changelog/104585.yaml
@@ -0,0 +1,6 @@
+pr: 104585
+summary: Ingest correctly handles upsert operations and drop processors together
+area: Ingest Node
+type: bug
+issues:
+ - 36746
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml
index dcf201666dfeb..8a33f1c157ad3 100644
--- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml
@@ -1,94 +1,185 @@
 ---
 teardown:
-- do:
-    ingest.delete_pipeline:
-      id: "my_pipeline"
-      ignore: 404
+  - do:
+      indices.delete:
+        index: "test"
+        ignore_unavailable: true
+  - do:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+        ignore: 404

 ---
 "Test Drop Processor":
-- do:
-    ingest.put_pipeline:
-      id: "my_pipeline"
-      body: >
-        {
-          "description" : "pipeline with drop",
-          "processors" : [
-            {
-              "drop" : {
-                "if": "ctx.foo == 'bar'"
-              }
-            }
-          ]
-        }
-- match: { acknowledged: true }
-
-- do:
-    index:
-      index: test
-      id: "1"
-      pipeline: "my_pipeline"
-      body: {
-        foo: "bar"
-      }
-
-- do:
-    index:
-      index: test
-      id: "2"
-      pipeline: "my_pipeline"
-      body: {
-        foo: "blub"
-      }
-
-- do:
-    catch: missing
-    get:
-      index: test
-      id: "1"
-- match: { found: false }
-
-- do:
-    get:
-      index: test
-      id: "2"
-- match: { _source.foo: "blub" }
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body: >
+          {
+            "description" : "pipeline with drop",
+            "processors" : [
+              {
+                "drop" : {
+                  "if": "ctx.foo == 'bar'"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: "1"
+        pipeline: "my_pipeline"
+        body: {
+          foo: "bar"
+        }
+
+  - do:
+      index:
+        index: test
+        id: "2"
+        pipeline: "my_pipeline"
+        body: {
+          foo: "blub"
+        }
+
+  - do:
+      catch: missing
+      get:
+        index: test
+        id: "1"
+  - match: { found: false }
+
+  - do:
+      get:
+        index: test
+        id: "2"
+  - match: { _source.foo: "blub" }

 ---
 "Test Drop Processor On Failure":
-- do:
-    ingest.put_pipeline:
-      id: "my_pipeline_with_failure"
-      body: >
-        {
-          "description" : "pipeline with on failure drop",
-          "processors": [
-            {
-              "fail": {
-                "message": "failed",
-                "on_failure": [
-                  {
-                    "drop": {}
-                  }
-                ]
-              }
-            }
-          ]
-        }
-- match: { acknowledged: true }
-
-- do:
-    index:
-      index: test
-      id: "3"
-      pipeline: "my_pipeline_with_failure"
-      body: {
-        foo: "bar"
-      }
-
-- do:
-    catch: missing
-    get:
-      index: test
-      id: "3"
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline_with_failure"
+        body: >
+          {
+            "description" : "pipeline with on failure drop",
+            "processors": [
+              {
+                "fail": {
+                  "message": "failed",
+                  "on_failure": [
+                    {
+                      "drop": {}
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: "3"
+        pipeline: "my_pipeline_with_failure"
+        body: {
+          foo: "bar"
+        }
+
+  - do:
+      catch: missing
+      get:
+        index: test
+        id: "3"
+
+---
+"Test Drop Processor with Upsert (_bulk)":
+  - skip:
+      version: ' - 7.17.17'
+      reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 7.17.18'
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body: >
+          {
+            "processors": [
+              {
+                "drop": {
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      bulk:
+        refresh: true
+        pipeline: "my_pipeline"
+        body:
+          - update:
+              _index: test
+              _id: 4
+          - '{"upsert":{"some":"fields"},"script":"ctx"}'
+  - match: { errors: false }
+  - match: { items.0.update._index: test }
+  - match: { items.0.update._id: "4" }
+  - match: { items.0.update._version: -3 }
+  - match: { items.0.update.result: noop }
+  - match: { items.0.update.status: 200 }
+
+  - do:
+      catch: missing
+      get:
+        index: test
+        id: "4"
+
+---
+"Test Drop Processor with Upsert (_update)":
+  - skip:
+      version: ' - 7.17.17'
+      reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 7.17.18'
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body: >
+          {
+            "processors": [
+              {
+                "drop": {
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            index:
+              default_pipeline: "my_pipeline"
+
+  - do:
+      update:
+        index: test
+        id: "5"
+        body:
+          script:
+            source: "ctx._source.foo = 'bar'"
+          upsert:
+            foo: "bar"
+
+  - match: { _index: test }
+  - match: { _id: "5" }
+  - match: { result: noop }
+
+  - do:
+      catch: missing
+      get:
+        index: test
+        id: "5"
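Note on the new `_bulk` test above: it pins down the caller-visible contract of this fix, namely that an upsert whose document is dropped by the pipeline completes as a successful `noop` item rather than failing or never completing (issue #36746). A minimal client-side sketch of that contract, assuming a 7.17 high-level REST client and the same illustrative index/pipeline names as the test:

```java
import java.util.Collections;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.Script;

public final class DroppedUpsertSketch {
    // Send one upsert through a pipeline that contains a drop processor.
    static void checkDroppedUpsert(RestHighLevelClient client) throws Exception {
        BulkRequest bulk = new BulkRequest().pipeline("my_pipeline") // global pipeline, as in the YAML test
            .add(
                new UpdateRequest("test", "4")
                    // same placeholder script as the YAML test; it never runs because
                    // the document does not exist, so the upsert path is taken
                    .script(new Script("ctx"))
                    .upsert(Collections.singletonMap("some", "fields"))
            );

        BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
        for (BulkItemResponse item : response.getItems()) {
            // After this fix, a dropped upsert is a *success* with result NOOP (HTTP 200),
            // not a failure and not a missing item.
            boolean dropped = item.isFailed() == false
                && item.getResponse().getResult() == DocWriteResponse.Result.NOOP;
            System.out.println("slot " + item.getItemId() + " dropped=" + dropped);
        }
    }
}
```

The `getItems()`/`getResponse()` accessors are the standard `BulkItemResponse` API; nothing in the sketch is added by this PR.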
diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml
index e2c331deae340..f0504108ef7c9 100644
--- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml
+++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml
@@ -1,5 +1,9 @@
 ---
 teardown:
+  - do:
+      indices.delete:
+        index: "test"
+        ignore_unavailable: true
   - do:
       ingest.delete_pipeline:
         id: "my_pipeline"
         ignore: 404
@@ -10,7 +14,7 @@ teardown:
   - do:
       ingest.put_pipeline:
         id: "my_pipeline"
-        body: >
+        body: >
           {
             "description": "_description",
             "processors": [
@@ -36,7 +40,7 @@ teardown:
   - do:
       ingest.put_pipeline:
         id: "my_pipeline"
-        body: >
+        body: >
           {
             "description": "_description",
             "processors": [
@@ -69,3 +73,87 @@ teardown:
         index: test
         id: "1"
   - match: { _source.error_message: "fail_processor_ran" }
+
+---
+"Test Fail Processor with Upsert (bulk)":
+  - skip:
+      version: ' - 7.17.17'
+      reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 7.17.18'
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body: >
+          {
+            "processors": [
+              {
+                "fail": {
+                  "message": "error-message"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      bulk:
+        refresh: true
+        pipeline: "my_pipeline"
+        body:
+          - update:
+              _index: test
+              _id: 3
+          - '{"upsert":{"some":"fields"},"script":"ctx"}'
+  - match: { errors: true }
+  - match: { items.0.update._index: test }
+  - match: { items.0.update._id: "3" }
+  - match: { items.0.update.status: 500 }
+  - match: { items.0.update.error.type: fail_processor_exception }
+  - match: { items.0.update.error.reason: /error-message/ }
+
+  - do:
+      catch: missing
+      get:
+        index: test
+        id: "3"
+
+---
+"Test Fail Processor with Upsert (_update)":
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body: >
+          {
+            "processors": [
+              {
+                "fail": {
+                  "message": "error-message"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            index:
+              default_pipeline: "my_pipeline"
+
+  - do:
+      catch: /error-message/
+      update:
+        index: test
+        id: "4"
+        body:
+          script:
+            source: "ctx._source.foo = 'bar'"
+          upsert:
+            foo: "bar"
+
+  - do:
+      catch: missing
+      get:
+        index: test
+        id: "4"
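The `fail` processor tests assert the mirror-image behavior: with the pipeline wired in as the index's `default_pipeline`, an upsert must now report the pipeline error to the caller, and the document must not be indexed. A sketch of what that looks like from a 7.17 high-level REST client; the index name, id, and field values are illustrative, not part of this change:

```java
import java.util.Collections;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.Script;

public final class FailedUpsertSketch {
    // Upsert into an index whose default_pipeline contains a fail processor.
    static void expectPipelineFailure(RestHighLevelClient client) throws Exception {
        UpdateRequest request = new UpdateRequest("test", "4")
            .script(new Script("ctx._source.foo = 'bar'"))
            .upsert(Collections.singletonMap("foo", "bar"));
        try {
            client.update(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchException e) {
            // Before the fix this path could mishandle the pipeline failure (issue #36746);
            // after it, the fail processor's message surfaces to the update caller.
            System.out.println("upsert rejected by pipeline: " + e.getDetailedMessage());
        }
    }
}
```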
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index c57dcc529c19e..60fb5b584219b 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -103,7 +103,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
         synchronized void markItemAsFailed(int slot, Exception e) {
-            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
+            final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
+            final String id = docWriteRequest.id() == null ? DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID : docWriteRequest.id();
             // We hit a error during preprocessing a request, so we:
             // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
             // 2) Add a bulk item failure for this request
             // 3) Continue with the next request in the bulk.
             failedSlots.set(slot);
-            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(
-                indexRequest.index(),
-                indexRequest.type(),
-                indexRequest.id(),
-                e
-            );
-            itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure));
+            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(docWriteRequest.index(), docWriteRequest.type(), id, e);
+            itemResponses.add(BulkItemResponse.failure(slot, docWriteRequest.opType(), failure));
         }

+        synchronized void markItemAsDropped(int slot) {
+            final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
+            failedSlots.set(slot);
+            final String id = docWriteRequest.id() == null ? DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID : docWriteRequest.id();
+            UpdateResponse dropped = new UpdateResponse(
+                new ShardId(docWriteRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0),
+                docWriteRequest.type(),
+                id,
+                SequenceNumbers.UNASSIGNED_SEQ_NO,
+                SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+                docWriteRequest.version(),
+                DocWriteResponse.Result.NOOP
+            );
+            itemResponses.add(BulkItemResponse.success(slot, docWriteRequest.opType(), dropped));
+        }
     }
 }
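The `_version: -3` and `status: 200` assertions in the YAML tests fall straight out of how `markItemAsDropped` builds its synthetic response: an update request whose version was never set carries `Versions.MATCH_ANY` (`-3`), and a `NOOP` result maps to HTTP 200. A standalone sketch of that response shape, with illustrative values; it mirrors the construction above rather than adding anything:

```java
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

public final class DroppedResponseShapeSketch {
    public static void main(String[] args) {
        // The shard id is a placeholder: the dropped request never reached a real shard.
        UpdateResponse dropped = new UpdateResponse(
            new ShardId("test", IndexMetadata.INDEX_UUID_NA_VALUE, 0),
            "_doc",                            // type is still present on the 7.17 branch
            "4",                               // the request id, or the auto-generated-id marker
            SequenceNumbers.UNASSIGNED_SEQ_NO,
            SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
            Versions.MATCH_ANY,                // -3: why the test expects _version: -3
            DocWriteResponse.Result.NOOP
        );
        assert dropped.status() == RestStatus.OK; // a noop result reports HTTP 200
        System.out.println(dropped.getResult() + " version=" + dropped.getVersion());
    }
}
```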
diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index 54e4a47d9c0ae..8478bbac2d6c7 100644
--- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -12,13 +12,13 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.TransportActions;
@@ -206,7 +206,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
                 IndexRequest upsertRequest = result.action();
                 // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                 final BytesReference upsertSourceBytes = upsertRequest.source();
-                client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
+                client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
                     UpdateResponse update = new UpdateResponse(
                         response.getShardInfo(),
                         response.getShardId(),
@@ -247,7 +247,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
                 IndexRequest indexRequest = result.action();
                 // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
                 final BytesReference indexSourceBytes = indexRequest.source();
-                client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(ActionListener.<IndexResponse>wrap(response -> {
+                client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
                     UpdateResponse update = new UpdateResponse(
                         response.getShardInfo(),
                         response.getShardId(),
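One reading of why the `TransportUpdateAction` hunks are needed at all: the single-item bulk round trip previously assumed the lone item always came back as an `IndexResponse`, but once a pipeline may drop the upserted document, the item is the `NOOP` `UpdateResponse` synthesized by `markItemAsDropped`, and the narrow `<IndexResponse>` listener type turned that into a `ClassCastException`. Widening the type witness to `DocWriteResponse` accepts both shapes. A self-contained sketch of the old failure mode; this is illustrative, not code from this PR:

```java
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;

public final class CastFailureSketch {
    public static void main(String[] args) {
        // What the single-item bulk now hands back when the pipeline drops the upsert:
        DocWriteResponse fromBulk = new UpdateResponse(
            new ShardId("test", IndexMetadata.INDEX_UUID_NA_VALUE, 0),
            "_doc",
            "5",
            SequenceNumbers.UNASSIGNED_SEQ_NO,
            SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
            Versions.MATCH_ANY,
            DocWriteResponse.Result.NOOP
        );
        // The pre-fix listener was effectively typed to IndexResponse, so it
        // performed this cast, which throws for dropped documents:
        IndexResponse cast = (IndexResponse) fromBulk; // ClassCastException at runtime
        System.out.println(cast);
    }
}
```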