Skip to content

Commit

Permalink
ARROW-14658: [C++] Disable fine-grained projection for now
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Nov 29, 2021
1 parent a13ee91 commit 3bc7ac8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 14 deletions.
9 changes: 8 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ Status ResolveOneFieldRef(
return Status::OK();
}

const SchemaField* toplevel = nullptr;
const SchemaField* field = nullptr;
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
// Only supports a sequence of names
Expand All @@ -260,6 +261,7 @@ Status ResolveOneFieldRef(
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
field = it->second;
toplevel = field;
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
Expand Down Expand Up @@ -297,7 +299,12 @@ Status ResolveOneFieldRef(
}

if (field) {
AddColumnIndices(*field, columns_selection);
// TODO(ARROW-1888): support fine-grained column projection. We should be
// able to materialize only the child fields requested, and not the entire
// top-level field.
// Right now, if enabled, projection/filtering will fail when they cast the
// physical schema to the dataset schema.
AddColumnIndices(*toplevel, columns_selection);
}
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ TEST_P(TestParquetFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestParquetFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderProjectedNested) {
TestScanProjectedNested(/*fine_grained_selection=*/true);
// TODO(ARROW-1888): enable fine-grained column projection.
TestScanProjectedNested(/*fine_grained_selection=*/false);
}
TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
Expand Down
42 changes: 30 additions & 12 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,18 +664,22 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto struct2 = field("struct2", struct_({f64, i64, struct1}));
this->SetSchema({struct1, struct2, f32, f64, i32, i64});
this->ProjectNested({".struct1.f32", ".struct2.struct1", ".struct2.struct1.f32"});
this->SetFilter(equal(field_ref(FieldRef("struct2", "i64")), literal(0)));
this->SetFilter(greater_equal(field_ref(FieldRef("struct2", "i64")), literal(0)));

std::shared_ptr<Schema> expected_schema;
std::shared_ptr<Schema> physical_schema;
if (fine_grained_selection) {
// Some formats, like Parquet, let you pluck only a part of a complex type
expected_schema = schema({
field("struct1", struct_({f32})),
field("struct2", struct_({i64, struct1})),
});
physical_schema = schema(
{field("struct1", struct_({f32})), field("struct2", struct_({i64, struct1}))});
} else {
expected_schema = schema({struct1, struct2});
// Otherwise, the entire top-level field is returned
physical_schema = schema({struct1, struct2});
}
std::shared_ptr<Schema> projected_schema = schema({
field(".struct1.f32", float32()),
field(".struct2.struct1", struct1->type()),
field(".struct2.struct1.f32", float32()),
});

{
auto reader = this->GetRecordBatchReader(opts_->dataset_schema);
Expand All @@ -688,13 +692,27 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
row_count += batch->num_rows();
ASSERT_THAT(
batch->schema()->fields(),
::testing::UnorderedPointwise(PointeesEquals(), expected_schema->fields()))
::testing::UnorderedPointwise(PointeesEquals(), physical_schema->fields()))
<< "EXPECTED:\n"
<< expected_schema->ToString() << "\nACTUAL:\n"
<< physical_schema->ToString() << "\nACTUAL:\n"
<< batch->schema()->ToString();
}
ASSERT_EQ(row_count, expected_rows());
}
{
auto reader = this->GetRecordBatchReader(opts_->dataset_schema);
auto source = this->GetFileSource(reader.get());
auto fragment = this->MakeFragment(*source);

int64_t row_count = 0;
for (auto maybe_batch : Batches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
AssertSchemaEqual(*batch->schema(), *projected_schema, /*check_metadata=*/false);
}
ASSERT_LE(row_count, expected_rows());
ASSERT_GT(row_count, 0);
}
{
// File includes a duplicated name in struct2
auto struct2_physical = field("struct2", struct_({f64, i64, struct1, i64}));
Expand All @@ -715,17 +733,17 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto source = this->GetFileSource(reader.get());
auto fragment = this->MakeFragment(*source);

expected_schema = schema({expected_schema->field(1)});
physical_schema = schema({physical_schema->field(1)});

int64_t row_count = 0;
for (auto maybe_batch : PhysicalBatches(fragment)) {
ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
row_count += batch->num_rows();
ASSERT_THAT(
batch->schema()->fields(),
::testing::UnorderedPointwise(PointeesEquals(), expected_schema->fields()))
::testing::UnorderedPointwise(PointeesEquals(), physical_schema->fields()))
<< "EXPECTED:\n"
<< expected_schema->ToString() << "\nACTUAL:\n"
<< physical_schema->ToString() << "\nACTUAL:\n"
<< batch->schema()->ToString();
}
ASSERT_EQ(row_count, expected_rows());
Expand Down

0 comments on commit 3bc7ac8

Please sign in to comment.