Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into interleave-bloom
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 21, 2024
2 parents ed9e576 + 29c07d0 commit abd81ef
Show file tree
Hide file tree
Showing 27 changed files with 825 additions and 134 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,19 @@ versions approximately every 2 months.

## Related Projects

There are two related crates in different repositories
There are several related crates in different repositories

| Crate | Description | Documentation |
| -------------- | --------------------------------------- | ----------------------------- |
| [`datafusion`] | In-memory query engine with SQL support | [(README)][datafusion-readme] |
| [`ballista`] | Distributed query execution | [(README)][ballista-readme] |
| Crate | Description | Documentation |
| ------------------------ | ------------------------------------------- | --------------------------------------- |
| [`datafusion`] | In-memory query engine with SQL support | [(README)][datafusion-readme] |
| [`ballista`] | Distributed query execution | [(README)][ballista-readme] |
| [`object_store_opendal`] | Use [`opendal`] as [`object_store`] backend | [(README)][object_store_opendal-readme] |

[`datafusion`]: https://crates.io/crates/datafusion
[`ballista`]: https://crates.io/crates/ballista
[`object_store_opendal`]: https://crates.io/crates/object_store_opendal
[`opendal`]: https://crates.io/crates/opendal
[object_store_opendal-readme]: https://github.com/apache/opendal/blob/main/integrations/object_store/README.md

Collectively, these crates support a wider array of functionality for analytic computations in Rust.

Expand Down
8 changes: 8 additions & 0 deletions arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,11 @@ criterion = { version = "0.5", default-features = false }
[[bench]]
name = "occupancy"
harness = false

[[bench]]
name = "gc_view_types"
harness = false

[[bench]]
name = "fixed_size_list_array"
harness = false
51 changes: 51 additions & 0 deletions arrow-array/benches/fixed_size_list_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{Array, FixedSizeListArray, Int32Array};
use arrow_schema::Field;
use criterion::*;
use rand::{thread_rng, Rng};
use std::sync::Arc;

/// Builds the [`FixedSizeListArray`] used as benchmark input.
///
/// `len` random `i32` values form the child array; `value_len` is passed as
/// the fixed list size. `None` is supplied for the list-level null buffer.
fn gen_fsl(len: usize, value_len: usize) -> FixedSizeListArray {
    let mut rng = thread_rng();
    let random_values: Vec<i32> = (0..len).map(|_| rng.gen::<i32>()).collect();
    let values = Arc::new(Int32Array::from(random_values));
    // The child field reuses the data type of the generated values.
    let item_field = Field::new("item", values.data_type().clone(), true);
    FixedSizeListArray::new(Arc::new(item_field), value_len as i32, values, None)
}

/// Benchmarks `FixedSizeListArray::value` for several list sizes.
///
/// For each `value_len`, an array backed by `len` child values is generated
/// and `value(i)` is called for every `i` in `0..len / value_len`.
fn criterion_benchmark(c: &mut Criterion) {
    let len = 4096;
    for value_len in [1, 32, 1024] {
        let fsl = gen_fsl(len, value_len);
        let list_count = len / value_len;
        let bench_name = format!("fixed_size_list_array(len: {len}, value_len: {value_len})");
        c.bench_function(&bench_name, |b| {
            b.iter(|| {
                (0..list_count).for_each(|i| {
                    // black_box keeps the optimizer from discarding the call.
                    black_box(fsl.value(i));
                });
            });
        });
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
48 changes: 48 additions & 0 deletions arrow-array/benches/gc_view_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::StringViewArray;
use criterion::*;

fn gen_view_array(size: usize) -> StringViewArray {
StringViewArray::from_iter((0..size).map(|v| match v % 3 {
0 => Some("small"),
1 => Some("larger than 12 bytes array"),
2 => None,
_ => unreachable!("unreachable"),
}))
}

/// Benchmarks `gc()` on a full 100_000-element view array and on a slice
/// covering its first half.
fn criterion_benchmark(c: &mut Criterion) {
    const SIZE: usize = 100_000;

    let array = gen_view_array(SIZE);
    let sliced = array.slice(0, SIZE / 2);

    let cases = [
        ("gc view types all", &array),
        ("gc view types slice half", &sliced),
    ];
    for (bench_name, input) in cases {
        c.bench_function(bench_name, |b| {
            b.iter(|| {
                black_box(input.gc());
            });
        });
    }
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
106 changes: 103 additions & 3 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::builder::GenericByteViewBuilder;
use crate::iterator::ArrayIter;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{Array, ArrayAccessor, ArrayRef};
use crate::{Array, ArrayAccessor, ArrayRef, Scalar};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
Expand Down Expand Up @@ -186,6 +186,11 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
}
}

/// Wraps `value` in a [`Scalar`] backed by a one-element array
///
/// The array is built with [`Self::from_iter_values`] from an iterator that
/// yields only `value`.
pub fn new_scalar(value: impl AsRef<T::Native>) -> Scalar<Self> {
    let single_value_array = Self::from_iter_values(std::iter::once(value));
    Scalar::new(single_value_array)
}

/// Creates a [`GenericByteViewArray`] based on an iterator of values without nulls
pub fn from_iter_values<Ptr, I>(iter: I) -> Self
where
Expand Down Expand Up @@ -239,8 +244,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
let v = self.views.get_unchecked(idx);
let len = *v as u32;
let b = if len <= 12 {
let ptr = self.views.as_ptr() as *const u8;
std::slice::from_raw_parts(ptr.add(idx * 16 + 4), len as usize)
Self::inline_value(v, len as usize)
} else {
let view = ByteView::from(*v);
let data = self.buffers.get_unchecked(view.buffer_index as usize);
Expand All @@ -250,6 +254,17 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
T::Native::from_bytes_unchecked(b)
}

/// Returns the bytes stored inline in `view`.
///
/// The returned slice starts 4 bytes into the 16-byte view and is `len`
/// bytes long.
///
/// # Safety
/// - `view` must be a valid element from `Self::views()` that adheres to the view layout.
/// - `len` must be the length of the inlined value, and must never exceed 12.
#[inline(always)]
pub unsafe fn inline_value(view: &u128, len: usize) -> &[u8] {
    debug_assert!(len <= 12);
    let view_bytes = view as *const u128 as *const u8;
    // Skip the first 4 bytes of the view; the inlined data follows them.
    let inlined = view_bytes.wrapping_add(4);
    std::slice::from_raw_parts(inlined, len)
}

/// constructs a new iterator
pub fn iter(&self) -> ArrayIter<&Self> {
ArrayIter::new(self)
Expand All @@ -265,6 +280,56 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
phantom: Default::default(),
}
}

/// Returns a "compacted" version of this array
///
/// The original array will *not* be modified
///
/// # Garbage Collection
///
/// Before GC:
/// ```text
/// ┌──────┐
/// │......│
/// │......│
/// ┌────────────────────┐ ┌ ─ ─ ─ ▶ │Data1 │ Large buffer
/// │ View 1 │─ ─ ─ ─ │......│ with data that
/// ├────────────────────┤ │......│ is not referred
/// │ View 2 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data2 │ to by View 1 or
/// └────────────────────┘ │......│ View 2
/// │......│
/// 2 views, refer to │......│
/// small portions of a └──────┘
/// large buffer
/// ```
///
/// After GC:
///
/// ```text
/// ┌────────────────────┐ ┌─────┐ After gc, only
/// │ View 1 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data1│ data that is
/// ├────────────────────┤ ┌ ─ ─ ─ ▶ │Data2│ pointed to by
/// │ View 2 │─ ─ ─ ─ └─────┘ the views is
/// └────────────────────┘ left
///
///
/// 2 views
/// ```
/// This method will compact the data buffers by recreating the view array and only include the data
/// that is pointed to by the views.
///
/// Note that it will copy the array regardless of whether the original array is compact.
/// Use with caution as this can be an expensive operation, only use it when you are sure that the view
/// array is significantly smaller than when it is originally created, e.g., after filtering or slicing.
pub fn gc(&self) -> Self {
    // Re-append every element (nulls included) into a fresh builder created
    // with capacity for `self.len()` entries, then finish it into a new array.
    let mut compacted = GenericByteViewBuilder::<T>::with_capacity(self.len());
    self.iter()
        .for_each(|item| compacted.append_option(item));
    compacted.finish()
}
}

impl<T: ByteViewType + ?Sized> Debug for GenericByteViewArray<T> {
Expand Down Expand Up @@ -645,4 +710,39 @@ mod tests {

StringViewArray::new(views, buffers, None);
}

#[test]
fn test_gc() {
    let test_data = [
        Some("longer than 12 bytes"),
        Some("short"),
        Some("t"),
        Some("longer than 12 bytes"),
        None,
        Some("short"),
    ];

    // create multiple buffers
    let mut builder = StringViewBuilder::new().with_block_size(8);
    for value in test_data {
        builder.append_option(value);
    }
    let array = builder.finish();
    assert!(array.buffers.len() > 1);

    // The compacted array must have a different number of data buffers while
    // holding the same elements and the same length as the input.
    fn check_gc(to_test: &StringViewArray) {
        let gc = to_test.gc();
        assert_ne!(to_test.data_buffers().len(), gc.data_buffers().len());

        for (a, b) in to_test.iter().zip(gc.iter()) {
            assert_eq!(a, b);
        }
        assert_eq!(to_test.len(), gc.len());
    }

    check_gc(&array);
    for (offset, length) in [(1, 3), (2, 1), (2, 2), (3, 1)] {
        check_gc(&array.slice(offset, length));
    }
}
}
8 changes: 4 additions & 4 deletions arrow-array/src/array/fixed_size_list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,15 @@ impl FixedSizeListArray {
/// Returns ith value of this list array.
pub fn value(&self, i: usize) -> ArrayRef {
self.values
.slice(self.value_offset(i) as usize, self.value_length() as usize)
.slice(self.value_offset_at(i), self.value_length() as usize)
}

/// Returns the offset for value at index `i`.
///
/// Note this doesn't do any bound checking, for performance reason.
#[inline]
pub fn value_offset(&self, i: usize) -> i32 {
self.value_offset_at(i)
self.value_offset_at(i) as i32
}

/// Returns the length for an element.
Expand All @@ -265,8 +265,8 @@ impl FixedSizeListArray {
}

#[inline]
const fn value_offset_at(&self, i: usize) -> i32 {
i as i32 * self.value_length
const fn value_offset_at(&self, i: usize) -> usize {
i * self.value_length as usize
}

/// Returns a zero-copy slice of this array with the indicated offset and length.
Expand Down
Loading

0 comments on commit abd81ef

Please sign in to comment.