-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RUST-1149 Prose tests for change streams. #561
Merged
abr-egn
merged 34 commits into
mongodb:master
from
abr-egn:RUST-523/change-stream-tests
Jan 24, 2022
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
d46f32d
wip
abr-egn 7b9b5e9
fix change stream event source; debugging
abr-egn 2d87c08
parsed tokens
abr-egn 80d925a
prose test 1
abr-egn 38dac23
error on missing token
abr-egn 0190ab0
prose test 2
abr-egn e78667e
prose 3 wip
abr-egn 279e719
wip debugging
abr-egn f382596
test 3
abr-egn 1b355ad
test 4
abr-egn f30f089
test 7
abr-egn 3e75457
test 8
abr-egn 03103d8
topology
abr-egn c597828
prose test 9 wip
abr-egn e0effa7
tidy
abr-egn 1257098
make tracks_resume_token resilient to server logic
abr-egn 50e735b
test 9
abr-egn 3247eb3
test 11
abr-egn 0d5638d
test 12
abr-egn 57b687c
test 13
abr-egn 5a6d9b5
test 14
abr-egn 00f51dd
test 17
abr-egn 1e4fd63
test 18
abr-egn 3763a00
fix tests 17 & 18
abr-egn def2405
cmt and flippy
abr-egn 8a0f54a
fix empty_batch_not_closed
abr-egn 05f43e6
fix resume_uses_*
abr-egn 3b02c79
fix tracks_resume_token
abr-egn 7323d3c
rustfmt
abr-egn 2abb868
tweak test client options
abr-egn c2c2ee0
fmt
abr-egn f5d65a5
non_exhaustive
abr-egn 2f4e3d8
assert a resume
abr-egn a2196ac
merge fail points
abr-egn File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,7 @@ use crate::{ | |
options::ChangeStreamOptions, | ||
}, | ||
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture}, | ||
error::{Error, Result}, | ||
error::{Error, ErrorKind, Result}, | ||
operation::AggregateTarget, | ||
options::AggregateOptions, | ||
selection_criteria::{ReadPreference, SelectionCriteria}, | ||
|
@@ -220,10 +220,14 @@ fn get_resume_token( | |
) -> Result<Option<ResumeToken>> { | ||
Ok(match batch_value { | ||
BatchValue::Some { doc, is_last } => { | ||
let doc_token = match doc.get("_id")? { | ||
Some(val) => ResumeToken(val.to_raw_bson()), | ||
None => return Err(ErrorKind::MissingResumeToken.into()), | ||
}; | ||
if *is_last && batch_token.is_some() { | ||
batch_token.cloned() | ||
} else { | ||
doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson())) | ||
Some(doc_token) | ||
} | ||
} | ||
BatchValue::Empty => batch_token.cloned(), | ||
|
@@ -236,50 +240,56 @@ where | |
T: DeserializeOwned + Unpin + Send + Sync, | ||
{ | ||
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> { | ||
if let Some(mut pending) = self.pending_resume.take() { | ||
match Pin::new(&mut pending).poll(cx) { | ||
Poll::Pending => { | ||
self.pending_resume = Some(pending); | ||
return Poll::Pending; | ||
loop { | ||
if let Some(mut pending) = self.pending_resume.take() { | ||
match Pin::new(&mut pending).poll(cx) { | ||
Poll::Pending => { | ||
self.pending_resume = Some(pending); | ||
return Poll::Pending; | ||
} | ||
Poll::Ready(Ok(new_stream)) => { | ||
// Ensure that the old cursor is killed on the server selected for the new | ||
// one. | ||
self.cursor | ||
.set_drop_address(new_stream.cursor.address().clone()); | ||
self.cursor = new_stream.cursor; | ||
self.args = new_stream.args; | ||
continue; | ||
} | ||
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), | ||
} | ||
Poll::Ready(Ok(new_stream)) => { | ||
// Ensure that the old cursor is killed on the server selected for the new one. | ||
self.cursor | ||
.set_drop_address(new_stream.cursor.address().clone()); | ||
self.cursor = new_stream.cursor; | ||
self.args = new_stream.args; | ||
return Poll::Pending; | ||
} | ||
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), | ||
} | ||
} | ||
let out = self.cursor.poll_next_in_batch(cx); | ||
match &out { | ||
Poll::Ready(Ok(bv)) => { | ||
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? { | ||
self.data.resume_token = Some(token); | ||
let out = self.cursor.poll_next_in_batch(cx); | ||
match &out { | ||
Poll::Ready(Ok(bv)) => { | ||
if let Some(token) = | ||
get_resume_token(bv, self.cursor.post_batch_resume_token())? | ||
{ | ||
self.data.resume_token = Some(token); | ||
} | ||
if matches!(bv, BatchValue::Some { .. }) { | ||
self.data.document_returned = true; | ||
} | ||
} | ||
if matches!(bv, BatchValue::Some { .. }) { | ||
self.data.document_returned = true; | ||
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => { | ||
self.data.resume_attempted = true; | ||
let client = self.cursor.client().clone(); | ||
let args = self.args.clone(); | ||
let mut data = self.data.take(); | ||
data.implicit_session = self.cursor.take_implicit_session(); | ||
self.pending_resume = Some(Box::pin(async move { | ||
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client | ||
.execute_watch(args.pipeline, args.options, args.target, Some(data)) | ||
.await; | ||
new_stream.map(|cs| cs.with_type::<T>()) | ||
})); | ||
// Iterate the loop so the new future gets polled and can register wakers. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was the point of wrapping the logic in a loop - there needs to be an underlying future that's been polled rather than just returning |
||
continue; | ||
} | ||
_ => {} | ||
} | ||
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => { | ||
self.data.resume_attempted = true; | ||
let client = self.cursor.client().clone(); | ||
let args = self.args.clone(); | ||
let mut data = self.data.take(); | ||
data.implicit_session = self.cursor.take_implicit_session(); | ||
self.pending_resume = Some(Box::pin(async move { | ||
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client | ||
.execute_watch(args.pipeline, args.options, args.target, Some(data)) | ||
.await; | ||
new_stream.map(|cs| cs.with_type::<T>()) | ||
})); | ||
return Poll::Pending; | ||
} | ||
_ => {} | ||
return out; | ||
} | ||
out | ||
} | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible this could change in the future? if so, we should keep it
non_exhaustive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, done.