Skip to content

Commit

Permalink
don't keep Arc<Statistic> in PartitionedFile.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 6, 2024
1 parent 2b56774 commit 56cc8ea
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct PartitionedFile {
///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
pub statistics: Option<Arc<Statistics>>,
pub statistics: Option<Statistics>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,11 +973,10 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let mut part_file = part_file?;
let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics))
} else {
Ok((
Expand Down Expand Up @@ -1014,8 +1013,7 @@ impl ListingTable {
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> Result<Arc<Statistics>> {
let statistics_cache = self.collected_statistics.clone();
match statistics_cache
match self.collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics.clone()),
Expand All @@ -1031,7 +1029,7 @@ impl ListingTable {
)
.await?;
let statistics = Arc::new(statistics);
statistics_cache.put_with_extra(
self.collected_statistics.put_with_extra(
&part_file.object_meta.location,
statistics.clone(),
&part_file.object_meta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ mod tests {
},
partition_values: vec![ScalarValue::from(file.date)],
range: None,
statistics: Some(Arc::new(Statistics {
statistics: Some(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: file
Expand All @@ -1213,7 +1213,7 @@ mod tests {
.unwrap_or_default()
})
.collect::<Vec<_>>(),
})),
}),
extensions: None,
}
}
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ pub async fn get_statistics_with_limit(
let mut all_files = Box::pin(all_files.fuse());

if let Some(first_file) = all_files.next().await {
let (file, file_stats) = first_file?;
let (mut file, file_stats) = first_file?;
file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);


// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows.clone();
total_byte_size = file_stats.total_byte_size.clone();
Expand All @@ -81,7 +83,8 @@ pub async fn get_statistics_with_limit(
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let (file, file_stats) = current?;
let (mut file, file_stats) = current?;
file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);
if !collect_stats {
continue;
Expand Down

0 comments on commit 56cc8ea

Please sign in to comment.