-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingest correctly handle upsert operations and drop processors together #104585
Ingest correctly handle upsert operations and drop processors together #104585
Conversation
It doesn't invoke new ShardId(...) which is where the exception was, but it's still nice to report this information if we have it (which we do!).
Failure is so much more normal/expected than dropping, so let's have that one first. Also it has a better explanation of what's going on and why, which I like having earlier (but don't want to duplicate).
Increasing the symmetry between markItemAsFailed and markItemAsDropped.
Hi @joegallo, I've created a changelog YAML for you. |
Seems like a good idea. One test that would be good to have (and this might be one you already have in mind) is a non-bulk upsert since the issue reports a different stack trace for non-bulk. I think the current tests only cover bulk, unless I missed something. |
Yes! That's the precisely the test I have in mind. |
This requires that we have a default_pipeline in place, so I've also extended the cleanup to include deleting the index -- otherwise if something goes wrong the cleanup can fail because you can't drop a pipeline if there's an index using it.
The response will be an UpdateResponse (not an IndexResponse) when a document is dropped on an upsert, both of those are a DocWriteResponse.
This requires that we have a default_pipeline in place, so I've also extended the cleanup to include deleting the index -- otherwise if something goes wrong the cleanup can fail because you can't drop a pipeline if there's an index using it.
Pinging @elastic/es-data-management (Team:Data Management) |
A bunch of the lines of code changed here were just whitespace twiddling in the tests, I'd recommend reviewing this PR with |
final DocWriteRequest<?> docWriteRequest = bulkRequest.requests().get(slot); | ||
final String id = Objects.requireNonNullElse(docWriteRequest.id(), DROPPED_OR_FAILED_ITEM_WITH_AUTO_GENERATED_ID); | ||
failedSlots.set(slot); | ||
UpdateResponse dropped = new UpdateResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this an UpdateResponse? I see that it was previously, but it leads to some weird things, like getting an UpdateResponse from an IndexRequest (which you also had to account for in this PR).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think IndexResponse lets us express anything except CREATED
or UPDATED
:
private static Result assertCreatedOrUpdated(Result result) {
assert result == Result.CREATED || result == Result.UPDATED;
return result;
}
But yeah, it is inconvenient (and also ancient!).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some discussion of this over at #99964.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Closes #36746
new ShardId(String index, ...)
checks that theString
arguments it receives are notnull
. When an upsert and adrop
processor meet, the thing returned fromgetIndexWriteRequest(...)
won't have the index populated, so we end up passingnull
where it can't go.But we don't need to call
getIndexWriteRequest(...)
at all! ThedocWriteRequest
argument to it already has everything we need or so I think (feel free to dis-abuse me of this notion).WIP because I have cleared all out my test changes and added some extra asserts. Once I get a green run from Jenkins on that, I'll revert the revert and get my new tests back (but lose the extra asserts). After that I might still add a test or two.