Add SessionContext::read_batches
#9197
) -> Result<DataFrame> {
    // check schema uniqueness
    let mut batches = batches.into_iter().peekable();
    let schema: SchemaRef = batches.peek().unwrap().schema().clone();
Is there any way we can avoid an unwrap here?
Perhaps something like
-    let schema: SchemaRef = batches.peek().unwrap().schema().clone();
+    let schema: SchemaRef = if let Some(batch) = batches.peek() {
+        batch.schema().clone()
+    } else {
+        // use empty schema
+        Arc::new(Schema::empty())
+    };
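The peek-with-fallback idea in this suggestion can be sketched without any Arrow dependency. In the sketch below, `Schema`, `SchemaRef`, and `RecordBatch` are hypothetical stand-ins for the real Arrow types, and `schema_and_batches` is an illustrative helper name, not code from this PR. It only shows that peeking at the first item and falling back to an empty schema removes the panic on empty input.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for arrow's Schema and RecordBatch, so that the
// sketch compiles with the standard library alone.
#[derive(Debug, Default, PartialEq)]
struct Schema {
    fields: Vec<String>,
}

type SchemaRef = Arc<Schema>;

#[derive(Debug, Clone)]
struct RecordBatch {
    schema: SchemaRef,
}

impl RecordBatch {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

/// Returns the schema of the first batch (or an empty schema when the input
/// yields no batches) together with all of the batches.
fn schema_and_batches<I>(batches: I) -> (SchemaRef, Vec<RecordBatch>)
where
    I: IntoIterator<Item = RecordBatch>,
{
    let mut batches = batches.into_iter().peekable();
    // peek() borrows the first item without consuming it, so the batch is
    // still available when the iterator is collected below.
    let schema = match batches.peek() {
        Some(batch) => batch.schema(),
        // Zero batches: fall back to an empty schema instead of panicking.
        None => Arc::new(Schema::default()),
    };
    (schema, batches.collect())
}
```

Calling `schema_and_batches` with an empty `Vec` returns an empty schema and no batches, which is the zero-batch case that the `unwrap` would have turned into a panic.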
Got it!
Thank you very much @Lordworms 🙏
As @Jefffrey pointed out, I think the only thing left for this PR is to handle the case with zero batches and add a test for that case too.
        ],
    )
    .unwrap(),
    RecordBatch::try_new(
Minor: I suggest making the test shorter by feeding in 2 batches rather than 5
Thank you @Lordworms -- this looks great.
@@ -1489,7 +1489,7 @@ async fn test_read_batches_empty() -> Result<()> {
     let state = SessionState::new_with_config_rt(config, runtime);
     let ctx = SessionContext::new_with_state(state);

-    let schema = Arc::new(Schema::new(vec![
+    let _schema = Arc::new(Schema::new(vec![
I think you could simply delete this statement entirely as the Schema is not needed
done
CI failure seems unrelated -- I retriggered it
This looks good, just have a minor question 👍
    let provider =
        MemTable::try_new(schema, batches.map(|batch| vec![batch]).collect())?;
Hmm, is there any difference to mapping each batch into its own vec (which seemingly represents a different partition?) vs just collecting the batches into a single vec and passing in a single vec?
e.g. something like
-    let provider =
-        MemTable::try_new(schema, batches.map(|batch| vec![batch]).collect())?;
+    let provider =
+        MemTable::try_new(schema, vec![batches.collect()])?;
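To make the two shapes concrete, here is a minimal standard-library sketch of what each expression produces. It is generic over the item type, so plain integers stand in for record batches. The function names `one_partition_per_batch` and `single_partition` are hypothetical and used only for illustration. The sketch assumes, as the question does, that each inner `Vec` represents one partition.

```rust
/// One inner Vec per item: mirrors `batches.map(|batch| vec![batch]).collect()`.
fn one_partition_per_batch<T>(batches: impl IntoIterator<Item = T>) -> Vec<Vec<T>> {
    batches.into_iter().map(|batch| vec![batch]).collect()
}

/// A single inner Vec holding every item: mirrors `vec![batches.collect()]`.
fn single_partition<T>(batches: impl IntoIterator<Item = T>) -> Vec<Vec<T>> {
    vec![batches.into_iter().collect()]
}
```

With three input items, the first function yields three inner vectors of one item each, while the second yields one inner vector of three items. With empty input, the first yields no inner vectors at all and the second yields one empty inner vector, so the zero-batch case also differs between the two forms.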
There is a difference (as you say, the code in this PR puts each batch in its own partition). I think you are right that a single partition might be better (and DataFusion will repartition the plan into multiple partitions if necessary).
Is this something you can do @Lordworms ? Otherwise we can merge this PR as is and make it as a follow on change.
Sure, so what we need to do is flatten all the batches into a single vec? I wonder in what situations DataFusion would do the repartitioning. Is there any doc for this?
TLDR is that it is added by https://docs.rs/datafusion/latest/datafusion/physical_optimizer/enforce_distribution/struct.EnforceDistribution.html -- the rules about when it happens are non-trivial but basically are "when it helps"
Thanks, I will read about it and commit the changes soon.
Thanks for this 👍
Which issue does this PR close?
Closes #9157
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?