Skip to content

Commit

Permalink
[8.12] Ingest correctly handle upsert operations and drop processors …
Browse files Browse the repository at this point in the history
…together ( #104585) (#104884)
  • Loading branch information
joegallo authored Jan 30, 2024
1 parent 4f94a2a commit 5d3894d
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 109 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/104585.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104585
summary: Ingest correctly handle upsert operations and drop processors together
area: Ingest Node
type: bug
issues:
- 36746
Original file line number Diff line number Diff line change
@@ -1,94 +1,185 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404
- do:
indices.delete:
index: "test"
ignore_unavailable: true
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test Drop Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
}
}
}
]
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body: {
foo: "bar"
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body: {
foo: "bar"
}

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
catch: missing
get:
index: test
id: "1"
- match: { found: false }

- do:
get:
index: test
id: "2"
- match: { _source.foo: "blub" }

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
catch: missing
get:
index: test
id: "1"
- match: { found: false }

- do:
get:
index: test
id: "2"
- match: { _source.foo: "blub" }

---
"Test Drop Processor On Failure":
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "3"
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}

- do:
catch: missing
get:
index: test
id: "3"

---
"Test Drop Processor with Upsert (_bulk)":
- skip:
version: ' - 8.12.0'
reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 8.12.1'
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
"drop": {
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "3"
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}

- do:
catch: missing
get:
index: test
id: "3"
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
pipeline: "my_pipeline"
body:
- update:
_index: test
_id: 4
- '{"upsert":{"some":"fields"},"script":"ctx"}'
- match: { errors: false }
- match: { items.0.update._index: test }
- match: { items.0.update._id: "4" }
- match: { items.0.update._version: -3 }
- match: { items.0.update.result: noop }
- match: { items.0.update.status: 200 }

- do:
catch: missing
get:
index: test
id: "4"

---
"Test Drop Processor with Upsert (_update)":
- skip:
version: ' - 8.12.0'
reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 8.12.1'
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"drop": {
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
update:
index: test
id: "5"
body:
script:
source: "ctx._source.foo = 'bar'"
upsert:
foo: "bar"

- match: { _index: test }
- match: { _id: "5" }
- match: { result: noop }

- do:
catch: missing
get:
index: test
id: "5"
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
teardown:
- do:
indices.delete:
index: "test"
ignore_unavailable: true
- do:
ingest.delete_pipeline:
id: "my_pipeline"
Expand All @@ -10,7 +14,7 @@ teardown:
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
body: >
{
"description": "_description",
"processors": [
Expand All @@ -36,7 +40,7 @@ teardown:
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
body: >
{
"description": "_description",
"processors": [
Expand Down Expand Up @@ -69,3 +73,87 @@ teardown:
index: test
id: "1"
- match: { _source.error_message: "fail_processor_ran" }

---
"Test Fail Processor with Upsert (bulk)":
- skip:
version: ' - 8.12.0'
reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 8.12.1'
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "error-message"
}
}
]
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
pipeline: "my_pipeline"
body:
- update:
_index: test
_id: 3
- '{"upsert":{"some":"fields"},"script":"ctx"}'
- match: { errors: true }
- match: { items.0.update._index: test }
- match: { items.0.update._id: "3" }
- match: { items.0.update.status: 500 }
- match: { items.0.update.error.type: fail_processor_exception }
- match: { items.0.update.error.reason: /error-message/ }

- do:
catch: missing
get:
index: test
id: "3"

---
"Test Fail Processor with Upsert (_update)":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "error-message"
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
update:
index: test
id: "4"
body:
script:
source: "ctx._source.foo = 'bar'"
upsert:
foo: "bar"
catch: /error-message/

- do:
catch: missing
get:
index: test
id: "4"
Loading

0 comments on commit 5d3894d

Please sign in to comment.