Fix parquet reads with limit across row groups
desmondcheongzx committed Aug 27, 2024
1 parent 457289b commit e55282e
Showing 3 changed files with 27 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/daft-parquet/src/stream_reader.rs
@@ -364,7 +364,7 @@ pub(crate) fn local_parquet_read_into_arrow(
             rg,
             schema.fields.clone(),
             Some(chunk_size),
-            num_rows,
+            Some(rg_range.num_rows),
             None,
         );
         let single_rg_column_iter = single_rg_column_iter?;
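Context for the one-line change above: each row group's column iterator is now capped at that row group's own row count (Some(rg_range.num_rows)) rather than at the overall num_rows limit for the read. The sketch below is plain Python, not Daft's reader code, and split_limit_across_row_groups is a hypothetical helper; it only illustrates the accounting a limit spanning row groups requires, where each row group contributes min(remaining, its own size) rows.

# A minimal sketch, assuming row groups are read in order and the limit is
# applied globally; illustrative only, not Daft's actual reader logic.
def split_limit_across_row_groups(row_group_sizes: list[int], limit: int) -> list[int]:
    """Return how many rows to take from each row group for a global limit."""
    remaining = limit
    takes = []
    for size in row_group_sizes:
        if remaining == 0:
            break
        take = min(remaining, size)  # never ask one row group for more rows than it holds
        takes.append(take)
        remaining -= take
    return takes

# Four 1024-row row groups with limit 1034 (the unit test's row_group_size + 10):
assert split_limit_across_row_groups([1024, 1024, 1024, 1024], 1034) == [1024, 10]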
4 changes: 4 additions & 0 deletions tests/integration/io/parquet/test_reads_public_data.py
@@ -174,6 +174,10 @@ def get_filesystem_from_path(path: str, **kwargs) -> fsspec.AbstractFileSystem:
"parquet-benchmarking/s3a-mvp",
"s3a://daft-public-data/test_fixtures/parquet-dev/mvp.parquet",
),
(
"parquet/test_parquet_limits_across_row_groups",
"s3://daft-public-data/test_fixtures/parquet-dev/tpch-issue#2730.parquet",
),
(
"azure/mvp/az",
"az://public-anonymous/mvp.parquet",
Expand Down
24 changes: 22 additions & 2 deletions tests/io/test_parquet.py
@@ -6,6 +6,7 @@
 import tempfile
 import uuid
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as papq
 import pytest
@@ -159,8 +160,6 @@ def test_row_groups():
 @pytest.mark.integration()
 @pytest.mark.parametrize("chunk_size", [5, 1024, 2048, 4096])
 def test_parquet_rows_cross_page_boundaries(tmpdir, minio_io_config, chunk_size):
-    import numpy as np
-
     int64_min = -(2**63)
     int64_max = 2**63 - 1
 
@@ -346,3 +345,24 @@ def test_parquet_helper(data_and_type, use_daft_writer):
     # One column uses a single dictionary-encoded data page, and the other contains data pages with
     # 4096 values each.
     test_parquet_helper(get_string_data_and_type(8192, 300, 1), True)
+
+
+def test_parquet_limits_across_row_groups(tmpdir):
+    row_group_size = 1024
+    int_array = np.full(shape=4096, fill_value=3, dtype=np.int32)
+    before = pa.Table.from_arrays(
+        [
+            pa.array(int_array, type=pa.int32()),
+        ],
+        names=["col"],
+    )
+    file_path = f"{tmpdir}/{str(uuid.uuid4())}.parquet"
+    papq.write_table(before, file_path, row_group_size=row_group_size)
+    assert (
+        before.take(list(range(min(before.num_rows, row_group_size + 10)))).to_pydict()
+        == daft.read_parquet(file_path).limit(row_group_size + 10).to_pydict()
+    )
+    assert (
+        before.take(list(range(min(before.num_rows, row_group_size * 2)))).to_pydict()
+        == daft.read_parquet(file_path).limit(row_group_size * 2).to_pydict()
+    )
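For reference, the table written by this test holds 4096 rows stored as four 1024-row row groups, so limit(row_group_size + 10) must stop ten rows into the second row group and limit(row_group_size * 2) must consume exactly the first two row groups. These are the limits-across-row-groups cases the commit fixes.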
