Skip to content

Commit

Permalink
feat: reconnect pushdown to v2 (#2913)
Browse files Browse the repository at this point in the history
There is still a bit more testing work to do before pushdown is fully
supported in v2 and until we start using `LanceDfFieldDecoderStrategy`
in the file reader it won't be accessible to users. However, this PR has
a number of structural refactors for v2 and is big enough as it is.

This adds a cache to the v2 schedulers. This is needed in this PR
because we want to use the cache to store zone maps. However, it will be
needed in future 2.1 work as well because we want to cache things like
"rows per chunk" and "dictionaries".

This adds an initialization routine to v2 schedulers. Again, this is
needed for zone maps but will also be used by 2.1 features.

Lastly, this PR does, in fact, reconnect the zone maps feature,
restoring blocks that had been commented out.
  • Loading branch information
westonpace authored Sep 27, 2024
1 parent 7cc14d9 commit d97a93d
Show file tree
Hide file tree
Showing 29 changed files with 839 additions and 410 deletions.
12 changes: 9 additions & 3 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use arrow_schema::Schema as ArrowSchema;
use bytes::Bytes;
use futures::stream::StreamExt;
use lance::io::{ObjectStore, RecordBatchStream};
use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
use lance_file::{
v2::{
Expand Down Expand Up @@ -331,9 +332,14 @@ impl LanceFileReader {
},
);
let file = scheduler.open_file(&path).await.infer_error()?;
let inner = FileReader::try_open(file, None, DecoderMiddlewareChain::default())
.await
.infer_error()?;
let inner = FileReader::try_open(
file,
None,
Arc::<DecoderMiddlewareChain>::default(),
&FileMetadataCache::no_cache(),
)
.await
.infer_error()?;
Ok(Self {
inner: Arc::new(inner),
})
Expand Down
89 changes: 78 additions & 11 deletions rust/lance-core/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::Future;
use moka::sync::Cache;
use object_store::path::Path;

use crate::utils::path::LancePathExt;
use crate::Result;

pub const DEFAULT_INDEX_CACHE_SIZE: usize = 128;
Expand All @@ -21,7 +22,7 @@ type ArcAny = Arc<dyn Any + Send + Sync>;
#[derive(Clone)]
struct SizedRecord {
record: ArcAny,
size_accessor: Arc<dyn Fn(ArcAny) -> usize + Send + Sync>,
size_accessor: Arc<dyn Fn(&ArcAny) -> usize + Send + Sync>,
}

impl std::fmt::Debug for SizedRecord {
Expand All @@ -35,7 +36,7 @@ impl std::fmt::Debug for SizedRecord {
impl SizedRecord {
fn new<T: DeepSizeOf + Send + Sync + 'static>(record: Arc<T>) -> Self {
let size_accessor =
|record: ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
|record: &ArcAny| -> usize { record.downcast_ref::<T>().unwrap().deep_size_of() };
Self {
record,
size_accessor: Arc::new(size_accessor),
Expand All @@ -48,38 +49,104 @@ impl SizedRecord {
/// The cache is keyed by the file path and the type of metadata.
#[derive(Clone, Debug)]
pub struct FileMetadataCache {
cache: Arc<Cache<(Path, TypeId), SizedRecord>>,
cache: Option<Arc<Cache<(Path, TypeId), SizedRecord>>>,
base_path: Option<Path>,
}

impl DeepSizeOf for FileMetadataCache {
fn deep_size_of_children(&self, _: &mut Context) -> usize {
self.cache
.iter()
.map(|(_, v)| (v.size_accessor)(v.record))
.sum()
.as_ref()
.map(|cache| {
cache
.iter()
.map(|(_, v)| (v.size_accessor)(&v.record))
.sum()
})
.unwrap_or(0)
}
}

pub enum CapacityMode {
Items,
Bytes,
}

impl FileMetadataCache {
/// Instantiates a new cache which, for legacy reasons, uses Items capacity mode.
pub fn new(capacity: usize) -> Self {
Self {
cache: Arc::new(Cache::new(capacity as u64)),
cache: Some(Arc::new(Cache::new(capacity as u64))),
base_path: None,
}
}

/// Instantiates a dummy cache that will never cache anything.
pub fn no_cache() -> Self {
Self {
cache: None,
base_path: None,
}
}

/// Instantiates a new cache with a given capacity and capacity mode.
pub fn with_capacity(capacity: usize, mode: CapacityMode) -> Self {
match mode {
CapacityMode::Items => Self::new(capacity),
CapacityMode::Bytes => Self {
cache: Some(Arc::new(
Cache::builder()
.weigher(|_, v: &SizedRecord| {
(v.size_accessor)(&v.record).try_into().unwrap_or(u32::MAX)
})
.build(),
)),
base_path: None,
},
}
}

/// Creates a new cache which shares the same underlying cache but prepends `base_path` to all
/// keys.
pub fn with_base_path(&self, base_path: Path) -> Self {
Self {
cache: self.cache.clone(),
base_path: Some(base_path),
}
}

pub fn size(&self) -> usize {
self.cache.entry_count() as usize
if let Some(cache) = self.cache.as_ref() {
cache.entry_count() as usize
} else {
0
}
}

pub fn get<T: Send + Sync + 'static>(&self, path: &Path) -> Option<Arc<T>> {
self.cache
let cache = self.cache.as_ref()?;
let temp: Path;
let path = if let Some(base_path) = &self.base_path {
temp = base_path.child_path(path);
&temp
} else {
path
};
cache
.get(&(path.to_owned(), TypeId::of::<T>()))
.map(|metadata| metadata.record.clone().downcast::<T>().unwrap())
}

pub fn insert<T: DeepSizeOf + Send + Sync + 'static>(&self, path: Path, metadata: Arc<T>) {
self.cache
.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
let Some(cache) = self.cache.as_ref() else {
return;
};
let path = if let Some(base_path) = &self.base_path {
base_path.child_path(&path)
} else {
path
};
cache.insert((path, TypeId::of::<T>()), SizedRecord::new(metadata));
}

/// Get an item
Expand Down
1 change: 1 addition & 0 deletions rust/lance-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod cpu;
pub mod deletion;
pub mod futures;
pub mod mask;
pub mod path;
pub mod testing;
pub mod tokio;
pub mod tracing;
18 changes: 18 additions & 0 deletions rust/lance-core/src/utils/path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use object_store::path::Path;

pub trait LancePathExt {
fn child_path(&self, path: &Path) -> Path;
}

impl LancePathExt for Path {
fn child_path(&self, path: &Path) -> Path {
let mut new_path = self.clone();
for part in path.parts() {
new_path = path.child(part);
}
new_path
}
}
39 changes: 24 additions & 15 deletions rust/lance-datafusion/src/substrait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,22 +271,31 @@ pub async fn parse_substrait(expr: &[u8], input_schema: Arc<Schema>) -> Result<E
}),
}?;

let (substrait_schema, input_schema, index_mapping) =
remove_extension_types(envelope.base_schema.as_ref().unwrap(), input_schema.clone())?;
let (substrait_schema, input_schema) =
if envelope.base_schema.as_ref().unwrap().r#struct.is_some() {
let (substrait_schema, input_schema, index_mapping) = remove_extension_types(
envelope.base_schema.as_ref().unwrap(),
input_schema.clone(),
)?;

if substrait_schema.r#struct.as_ref().unwrap().types.len()
!= envelope
.base_schema
.as_ref()
.unwrap()
.r#struct
.as_ref()
.unwrap()
.types
.len()
{
remap_expr_references(&mut expr, &index_mapping)?;
}
if substrait_schema.r#struct.as_ref().unwrap().types.len()
!= envelope
.base_schema
.as_ref()
.unwrap()
.r#struct
.as_ref()
.unwrap()
.types
.len()
{
remap_expr_references(&mut expr, &index_mapping)?;
}

(substrait_schema, input_schema)
} else {
(envelope.base_schema.as_ref().unwrap().clone(), input_schema)
};

// Datafusion's substrait consumer only supports Plan (not ExtendedExpression) and so
// we need to create a dummy plan with a single project node
Expand Down
55 changes: 25 additions & 30 deletions rust/lance-encoding-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::{Arc, Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use arrow_schema::DataType;
use lance_core::{
Expand All @@ -27,7 +30,7 @@ struct LanceDfFieldDecoderState {
rows_per_map: Option<u32>,
/// As we visit the decoding tree we populate this with the pushdown
/// information that is available.
zone_map_buffers: Vec<UnloadedPushdown>,
zone_map_buffers: HashMap<u32, UnloadedPushdown>,
}

/// This strategy is responsible for creating the field scheduler
Expand Down Expand Up @@ -60,15 +63,20 @@ impl LanceDfFieldDecoderStrategy {
if state.is_none() {
*state = Some(LanceDfFieldDecoderState {
rows_per_map: None,
zone_map_buffers: Vec::new(),
zone_map_buffers: HashMap::new(),
});
true
} else {
false
}
}

fn add_pushdown_field(&self, rows_per_map: u32, unloaded_pushdown: UnloadedPushdown) {
fn add_pushdown_field(
&self,
field: &Field,
rows_per_map: u32,
unloaded_pushdown: UnloadedPushdown,
) {
let mut state = self.state.lock().unwrap();
let state = state.as_mut().unwrap();
match state.rows_per_map {
Expand All @@ -79,7 +87,9 @@ impl LanceDfFieldDecoderStrategy {
state.rows_per_map = Some(rows_per_map);
}
}
state.zone_map_buffers.push(unloaded_pushdown);
state
.zone_map_buffers
.insert(field.id as u32, unloaded_pushdown);
}
}

Expand All @@ -96,55 +106,40 @@ impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
)> {
let is_root = self.initialize();

if let Some((rows_per_map, unloaded_pushdown)) = extract_zone_info(
column_infos.next().unwrap(),
&field.data_type(),
chain.current_path(),
) {
if let Some((rows_per_map, unloaded_pushdown)) =
extract_zone_info(column_infos, &field.data_type(), chain.current_path())
{
// If there is pushdown info then record it and unwrap the
// pushdown encoding layer.
self.add_pushdown_field(rows_per_map, unloaded_pushdown);
self.add_pushdown_field(field, rows_per_map, unloaded_pushdown);
}
// Delegate to the rest of the chain to create the decoder
let (chain, next) = chain.next(field, column_infos, buffers)?;

// If this is the top level decoder then wrap it with our
// pushdown filtering scheduler.
let state = if is_root {
self.state.lock().unwrap().take()
} else {
None
};
let schema = self.schema.clone();
let _io = chain.io().clone();

let next = next?;
if is_root {
let state = state.unwrap();
let state = self.state.lock().unwrap().take().unwrap();
let schema = self.schema.clone();
let rows_per_map = state.rows_per_map;
let zone_map_buffers = state.zone_map_buffers;
let next = next?;
let num_rows = next.num_rows();
if rows_per_map.is_none() {
// No columns had any pushdown info
Ok((chain, Ok(next)))
} else {
let mut _scheduler = ZoneMapsFieldScheduler::new(
let scheduler = ZoneMapsFieldScheduler::new(
next,
schema,
zone_map_buffers,
rows_per_map.unwrap(),
num_rows,
);
// Load all the zone maps from disk
// TODO: it would be slightly more efficient to do this
// later when we know what columns are actually used
// for filtering.
// scheduler.initialize(io.as_ref()).await?;
// Ok(Arc::new(scheduler) as Arc<dyn FieldScheduler>)
todo!()
Ok((chain, Ok(Arc::new(scheduler))))
}
} else {
Ok((chain, Ok(next)))
Ok((chain, next))
}
}
}
Expand Down
Loading

0 comments on commit d97a93d

Please sign in to comment.