From 29ce3c652d1631848a5b1cf22906ada40ed28e80 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Fri, 19 Jan 2024 19:27:20 -0500 Subject: [PATCH] Temporarily blow away the new tests and add asserts --- .../test/ingest/220_drop_processor.yml | 187 +++++++----------- .../rest-api-spec/test/ingest/60_fail.yml | 43 +--- .../action/bulk/TransportBulkAction.java | 12 ++ 3 files changed, 89 insertions(+), 153 deletions(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml index fe17ef1d8531d..dcf201666dfeb 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/220_drop_processor.yml @@ -1,131 +1,94 @@ --- teardown: - - do: - ingest.delete_pipeline: - id: "my_pipeline" - ignore: 404 +- do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 --- "Test Drop Processor": - - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "description" : "pipeline with drop", - "processors" : [ - { - "drop" : { - "if": "ctx.foo == 'bar'" - } +- do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description" : "pipeline with drop", + "processors" : [ + { + "drop" : { + "if": "ctx.foo == 'bar'" } - ] - } - - match: { acknowledged: true } - - - do: - index: - index: test - id: "1" - pipeline: "my_pipeline" - body: { - foo: "bar" + } + ] } +- match: { acknowledged: true } - - do: - index: - index: test - id: "2" - pipeline: "my_pipeline" - body: { - foo: "blub" - } - - - do: - catch: missing - get: - index: test - id: "1" - - match: { found: false } - - - do: - get: - index: test - id: "2" - - match: { _source.foo: "blub" } +- do: + index: + index: test + id: "1" + pipeline: "my_pipeline" + body: { + foo: "bar" + } ---- -"Test Drop Processor On Failure": - - do: - ingest.put_pipeline: - id: "my_pipeline_with_failure" - body: > - { - "description" : "pipeline with on failure drop", - "processors": [ - { - "fail": { - "message": "failed", - "on_failure": [ - { - "drop": {} - } - ] - } - } - ] - } - - match: { acknowledged: true } +- do: + index: + index: test + id: "2" + pipeline: "my_pipeline" + body: { + foo: "blub" + } - - do: - index: - index: test - id: "3" - pipeline: "my_pipeline_with_failure" - body: { - foo: "bar" - } +- do: + catch: missing + get: + index: test + id: "1" +- match: { found: false } - - do: - catch: missing - get: - index: test - id: "3" +- do: + get: + index: test + id: "2" +- match: { _source.foo: "blub" } --- -"Test Drop Processor with Upsert": - - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "processors": [ +"Test Drop Processor On Failure": +- do: + ingest.put_pipeline: + id: "my_pipeline_with_failure" + body: > + { + "description" : "pipeline with on failure drop", + "processors": [ { - "drop": { + "fail": { + "message": "failed", + "on_failure": [ + { + "drop": {} + } + ] } } ] - } - - match: { acknowledged: true } + } +- match: { acknowledged: true } + +- do: + index: + index: test + id: "3" + pipeline: "my_pipeline_with_failure" + body: { + foo: "bar" + } - - do: - bulk: - refresh: true - pipeline: "my_pipeline" - body: - - update: - _index: test - _id: 4 - - '{"upsert":{"some":"fields"},"script":"ctx"}' - - match: { errors: false } - - match: { items.0.update._index: test } - - match: { items.0.update._id: "4" } - - match: { items.0.update._version: -3 } - - match: { items.0.update.result: noop } - - match: { items.0.update.status: 200 } +- do: + catch: missing + get: + index: test + id: "3" - - do: - catch: missing - get: - index: test - id: "4" diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml index 3349fdd4f3e41..e2c331deae340 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/60_fail.yml @@ -10,7 +10,7 @@ teardown: - do: ingest.put_pipeline: id: "my_pipeline" - body: > + body: > { "description": "_description", "processors": [ @@ -36,7 +36,7 @@ teardown: - do: ingest.put_pipeline: id: "my_pipeline" - body: > + body: > { "description": "_description", "processors": [ @@ -69,42 +69,3 @@ teardown: index: test id: "1" - match: { _source.error_message: "fail_processor_ran" } - ---- -"Test Fail Processor with Upsert": - - do: - ingest.put_pipeline: - id: "my_pipeline" - body: > - { - "processors": [ - { - "fail": { - "message": "error-message" - } - } - ] - } - - match: { acknowledged: true } - - - do: - bulk: - refresh: true - pipeline: "my_pipeline" - body: - - update: - _index: test - _id: 3 - - '{"upsert":{"some":"fields"},"script":"ctx"}' - - match: { errors: true } - - match: { items.0.update._index: test } - - match: { items.0.update._id: "3" } - - match: { items.0.update.status: 500 } - - match: { items.0.update.error.type: fail_processor_exception } - - match: { items.0.update.error.reason: error-message } - - - do: - catch: missing - get: - index: test - id: "3" diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b5f30771ba53d..250539b1da17d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -1111,6 +1111,12 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte synchronized void markItemAsFailed(int slot, Exception e) { final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); + // temporary asserts for a run of the tests to make sure these are actually equivalent + final IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + assert Objects.equals(indexRequest.id(), docWriteRequest.id()); + assert Objects.equals(indexRequest.opType(), docWriteRequest.opType()); + assert Objects.equals(indexRequest.index(), docWriteRequest.index()); + assert Objects.equals(indexRequest.version(), docWriteRequest.version()); final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); // We hit a error during preprocessing a request, so we: // 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed @@ -1123,6 +1129,12 @@ synchronized void markItemAsFailed(int slot, Exception e) { synchronized void markItemAsDropped(int slot) { final DocWriteRequest docWriteRequest = bulkRequest.requests().get(slot); + // temporary asserts for a run of the tests to make sure these are actually equivalent + final IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + assert Objects.equals(indexRequest.id(), docWriteRequest.id()); + assert Objects.equals(indexRequest.opType(), docWriteRequest.opType()); + assert Objects.equals(indexRequest.index(), docWriteRequest.index()); + assert Objects.equals(indexRequest.version(), docWriteRequest.version()); final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); failedSlots.set(slot); UpdateResponse dropped = new UpdateResponse(