Skip to content

Commit

Permalink
Temporarily blow away the new tests and add asserts
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Jan 20, 2024
1 parent 5872ce1 commit 29ce3c6
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 153 deletions.
Original file line number Diff line number Diff line change
@@ -1,131 +1,94 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test Drop Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
}
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body: {
foo: "bar"
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
catch: missing
get:
index: test
id: "1"
- match: { found: false }

- do:
get:
index: test
id: "2"
- match: { _source.foo: "blub" }
- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body: {
foo: "bar"
}

---
"Test Drop Processor On Failure":
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
}
}
]
}
- match: { acknowledged: true }
- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
index:
index: test
id: "3"
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}
- do:
catch: missing
get:
index: test
id: "1"
- match: { found: false }

- do:
catch: missing
get:
index: test
id: "3"
- do:
get:
index: test
id: "2"
- match: { _source.foo: "blub" }

---
"Test Drop Processor with Upsert":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
"Test Drop Processor On Failure":
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
{
"drop": {
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
}
}
]
}
- match: { acknowledged: true }
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "3"
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}

- do:
bulk:
refresh: true
pipeline: "my_pipeline"
body:
- update:
_index: test
_id: 4
- '{"upsert":{"some":"fields"},"script":"ctx"}'
- match: { errors: false }
- match: { items.0.update._index: test }
- match: { items.0.update._id: "4" }
- match: { items.0.update._version: -3 }
- match: { items.0.update.result: noop }
- match: { items.0.update.status: 200 }
- do:
catch: missing
get:
index: test
id: "3"

- do:
catch: missing
get:
index: test
id: "4"
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ teardown:
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
body: >
{
"description": "_description",
"processors": [
Expand All @@ -36,7 +36,7 @@ teardown:
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
body: >
{
"description": "_description",
"processors": [
Expand Down Expand Up @@ -69,42 +69,3 @@ teardown:
index: test
id: "1"
- match: { _source.error_message: "fail_processor_ran" }

---
"Test Fail Processor with Upsert":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "error-message"
}
}
]
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
pipeline: "my_pipeline"
body:
- update:
_index: test
_id: 3
- '{"upsert":{"some":"fields"},"script":"ctx"}'
- match: { errors: true }
- match: { items.0.update._index: test }
- match: { items.0.update._id: "3" }
- match: { items.0.update.status: 500 }
- match: { items.0.update.error.type: fail_processor_exception }
- match: { items.0.update.error.reason: error-message }

- do:
catch: missing
get:
index: test
id: "3"
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,12 @@ private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkIte

synchronized void markItemAsFailed(int slot, Exception e) {
final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
// temporary asserts for a run of the tests to make sure these are actually equivalent
final IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
assert Objects.equals(indexRequest.id(), docWriteRequest.id());
assert Objects.equals(indexRequest.opType(), docWriteRequest.opType());
assert Objects.equals(indexRequest.index(), docWriteRequest.index());
assert Objects.equals(indexRequest.version(), docWriteRequest.version());
final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
// We hit a error during preprocessing a request, so we:
// 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed
Expand All @@ -1123,6 +1129,12 @@ synchronized void markItemAsFailed(int slot, Exception e) {

synchronized void markItemAsDropped(int slot) {
final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot);
// temporary asserts for a run of the tests to make sure these are actually equivalent
final IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
assert Objects.equals(indexRequest.id(), docWriteRequest.id());
assert Objects.equals(indexRequest.opType(), docWriteRequest.opType());
assert Objects.equals(indexRequest.index(), docWriteRequest.index());
assert Objects.equals(indexRequest.version(), docWriteRequest.version());
final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID);
failedSlots.set(slot);
UpdateResponse dropped = new UpdateResponse(
Expand Down

0 comments on commit 29ce3c6

Please sign in to comment.