Skip to content

Commit

Permalink
make file scan task serializable (apache#377)
Browse files: browse the repository at this point in the history
Co-authored-by: ZENOTME <st810918843@gmail.com>
  • Loading branch information
2 people authored and shaeqahmed committed Dec 9, 2024
1 parent d10586a commit 32424e8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ArrowReader {
Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_file = file_io
.new_input(task.data().data_file().file_path())?;
.new_input(task.data_file_path())?;
let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub mod avro;
pub mod io;
pub mod spec;

mod scan;
pub mod scan;

#[allow(dead_code)]
pub mod expr;
Expand Down
24 changes: 13 additions & 11 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::spec::{
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef,
SnapshotRef, TableMetadataRef,
DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef, SnapshotRef,
TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -55,7 +56,7 @@ pub struct TableScanBuilder<'a> {
}

impl<'a> TableScanBuilder<'a> {
pub fn new(table: &'a Table) -> Self {
pub(crate) fn new(table: &'a Table) -> Self {
Self {
table,
column_names: vec![],
Expand Down Expand Up @@ -265,7 +266,7 @@ impl TableScan {
}
DataContentType::Data => {
let scan_task: Result<FileScanTask> = Ok(FileScanTask {
data_manifest_entry: manifest_entry.clone(),
data_file_path: manifest_entry.data_file().file_path().to_string(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
});
Expand Down Expand Up @@ -463,18 +464,19 @@ impl ManifestEvaluatorCache {
}

/// A task to scan part of file.
#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
data_manifest_entry: ManifestEntryRef,
data_file_path: String,
#[allow(dead_code)]
start: u64,
#[allow(dead_code)]
length: u64,
}

impl FileScanTask {
pub fn data(&self) -> ManifestEntryRef {
self.data_manifest_entry.clone()
/// Returns the data file path of this file scan task.
pub fn data_file_path(&self) -> &str {
&self.data_file_path
}
}

Expand Down Expand Up @@ -794,17 +796,17 @@ mod tests {

assert_eq!(tasks.len(), 2);

tasks.sort_by_key(|t| t.data().data_file().file_path().to_string());
tasks.sort_by_key(|t| t.data_file_path().to_string());

// Check first task is added data file
assert_eq!(
tasks[0].data().data_file().file_path(),
tasks[0].data_file_path(),
format!("{}/1.parquet", &fixture.table_location)
);

// Check second task is existing data file
assert_eq!(
tasks[1].data().data_file().file_path(),
tasks[1].data_file_path(),
format!("{}/3.parquet", &fixture.table_location)
);
}
Expand Down

0 comments on commit 32424e8

Please sign in to comment.