
ARROW-14658: [C++] Enable nested field refs in scanning
lidavidm committed Jan 14, 2022
1 parent 093fdad · commit b12fb6e
Showing 18 changed files with 695 additions and 124 deletions.
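This commit lets scan filters and projections refer to fields inside nested (struct) columns, e.g. FieldRef("struct", "i32") in the new exec-plan tests below. As a rough sketch (not part of this commit; it only reuses the FieldRef calls that appear in the diff), a nested ref is a sequence of names that resolves against a schema to a path of child indices:

// Illustration only (not in this commit): resolving a nested FieldRef,
// using the same calls exercised in the diff below.
#include <iostream>

#include "arrow/api.h"

int main() {
  // Same shape as MakeNestedBatches(): one column "struct" of struct<i32, bool>.
  auto ty = arrow::struct_(
      {arrow::field("i32", arrow::int32()), arrow::field("bool", arrow::boolean())});
  auto schema = arrow::schema({arrow::field("struct", ty)});

  arrow::FieldRef ref("struct", "i32");
  std::cout << "top-level name ref? " << (ref.name() != nullptr) << "\n";  // prints 0
  std::cout << "nested ref? " << (ref.nested_refs() != nullptr) << "\n";   // prints 1

  // Resolves to the path of child indices [0, 0]; an empty path means "not found".
  auto path = ref.FindOneOrNone(*schema).ValueOrDie();
  for (int i : path.indices()) std::cout << i << " ";
  std::cout << "\n";
  return 0;
}
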
67 changes: 67 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -793,6 +793,73 @@ TEST(ExecPlanExecution, SourceGroupedSum) {
}
}

TEST(ExecPlanExecution, NestedSourceFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

auto input = MakeNestedBatches();
auto empty = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([])");
auto expected = ExecBatchFromJSON({input.schema->field(0)->type()}, R"([
[{"i32": 5, "bool": null}],
[{"i32": 6, "bool": false}],
[{"i32": 7, "bool": false}]
])");

ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

ASSERT_OK(Declaration::Sequence(
{
{"source", SourceNodeOptions{input.schema,
input.gen(parallel, /*slow=*/false)}},
{"filter", FilterNodeOptions{greater_equal(
field_ref(FieldRef("struct", "i32")), literal(5))}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray({empty, expected}))));
}
}

TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");

auto input = MakeNestedBatches();
auto expected = ExecBatchFromJSON({int64(), boolean()}, R"([
[null, true],
[17, false],
[5, null]
])");

ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

ASSERT_OK(
Declaration::Sequence(
{
{"source",
SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
{"project", ProjectNodeOptions{{
field_ref(FieldRef("struct", "i32")),
field_ref(FieldRef("struct", "bool")),
},
{"i32", "bool"}}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
/*targets=*/{"i32"},
/*names=*/{"sum(i32)"},
/*keys=*/{"bool"}}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray({expected}))));
}
}

TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
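For reference, the expected rows in NestedSourceProjectGroupedSum follow directly from MakeNestedBatches() (added in test_util.cc below): after projecting struct.i32 and struct.bool, grouping by bool gives true -> sum(null) = null, false -> 4 + 6 + 7 = 17, and null -> 5 (the all-null struct row contributes null to both columns and is skipped by the sum), matching the three expected rows above.
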
14 changes: 14 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.cc
@@ -190,6 +190,20 @@ BatchesWithSchema MakeBasicBatches() {
return out;
}

BatchesWithSchema MakeNestedBatches() {
auto ty = struct_({field("i32", int32()), field("bool", boolean())});
BatchesWithSchema out;
out.batches = {
ExecBatchFromJSON(
{ty},
R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool": false}], [null]])"),
ExecBatchFromJSON(
{ty},
R"([[{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}]])")};
out.schema = schema({field("struct", ty)});
return out;
}

BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
int num_batches, int batch_size) {
BatchesWithSchema out;
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.h
@@ -87,6 +87,9 @@ Future<std::vector<ExecBatch>> StartAndCollect(
ARROW_TESTING_EXPORT
BatchesWithSchema MakeBasicBatches();

ARROW_TESTING_EXPORT
BatchesWithSchema MakeNestedBatches();

ARROW_TESTING_EXPORT
BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,
int num_batches = 10, int batch_size = 4);
25 changes: 22 additions & 3 deletions cpp/src/arrow/dataset/file_csv.cc
@@ -116,9 +116,28 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(

if (!scan_options) return convert_options;

-  auto materialized = scan_options->MaterializedFields();
-  std::unordered_set<std::string> materialized_fields(materialized.begin(),
-                                                      materialized.end());
auto field_refs = scan_options->MaterializedFields();
std::unordered_set<std::string> materialized_fields;
materialized_fields.reserve(field_refs.size());
// Preprocess field refs. We try to avoid FieldRef::GetFoo here since that's
// quadratic (and this is significant overhead with 1000+ columns)
for (const auto& ref : field_refs) {
if (const std::string* name = ref.name()) {
// Common case
materialized_fields.emplace(*name);
continue;
}
// Currently CSV reader doesn't support reading any nested types, so this
// path shouldn't be hit. However, implement it in the same way as IPC/ORC:
// load the entire top-level field if a nested field is selected.
ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOneOrNone(*scan_options->dataset_schema));
if (column_names.find(field->name()) == column_names.end()) continue;
// Only read the requested columns
convert_options.include_columns.push_back(field->name());
// Properly set conversion types
convert_options.column_types[field->name()] = field->type();
}

for (auto field : scan_options->dataset_schema->fields()) {
if (materialized_fields.find(field->name()) == materialized_fields.end()) continue;
// Ignore virtual columns.
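To make the effect of the loop above concrete: for a scan that materializes top-level columns "a" (int64) and "b" (utf8), the resulting convert options would look roughly like this (illustration only; the column names and types here are made up, not taken from the commit):

#include "arrow/api.h"
#include "arrow/csv/options.h"

arrow::csv::ConvertOptions ExampleConvertOptions() {
  auto convert_options = arrow::csv::ConvertOptions::Defaults();
  // Only read the requested columns...
  convert_options.include_columns = {"a", "b"};
  // ...and convert each one to the type declared in the dataset schema.
  convert_options.column_types["a"] = arrow::int64();
  convert_options.column_types["b"] = arrow::utf8();
  return convert_options;
}
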
14 changes: 11 additions & 3 deletions cpp/src/arrow/dataset/file_csv_test.cc
@@ -384,13 +384,21 @@ class TestCsvFileFormatScan : public FileFormatScanMixin<CsvFormatHelper> {};

TEST_P(TestCsvFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestCsvFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
-TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
-  TestScanWithVirtualColumn();
-}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
// NOTE(ARROW-14658): TestScanProjectedNested is ignored since CSV
// doesn't have any nested types for us to work with
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
// The CSV reader rejects duplicate columns
}
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}

INSTANTIATE_TEST_SUITE_P(TestScan, TestCsvFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
@@ -75,10 +75,10 @@ static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsyn
}

static inline Result<std::vector<int>> GetIncludedFields(
-    const Schema& schema, const std::vector<std::string>& materialized_fields) {
const Schema& schema, const std::vector<FieldRef>& materialized_fields) {
std::vector<int> included_fields;

-  for (FieldRef ref : materialized_fields) {
for (const auto& ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(schema));
if (match.indices().empty()) continue;

15 changes: 12 additions & 3 deletions cpp/src/arrow/dataset/file_ipc_test.cc
@@ -135,13 +135,22 @@ class TestIpcFileFormatScan : public FileFormatScanMixin<IpcFormatHelper> {};

TEST_P(TestIpcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
TEST_P(TestIpcFileFormatScan, ScanBatchSize) { TestScanBatchSize(); }
-TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
-  TestScanWithVirtualColumn();
-}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedNested) {
TestScanProjectedNested();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
TestScanWithDuplicateColumn();
}
TEST_P(TestIpcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}
TEST_P(TestIpcFileFormatScan, FragmentScanOptions) {
auto reader = GetRecordBatchReader(
// ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash
5 changes: 2 additions & 3 deletions cpp/src/arrow/dataset/file_orc.cc
@@ -79,12 +79,11 @@ class OrcScanTask {
// filter out virtual columns
std::vector<std::string> included_fields;
ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema());
-    for (auto name : materialized_fields) {
-      FieldRef ref(name);
for (const auto& ref : materialized_fields) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*schema));
if (match.indices().empty()) continue;

-      included_fields.push_back(name);
included_fields.push_back(schema->field(match.indices()[0])->name());
}

return RecordBatchIterator(
16 changes: 14 additions & 2 deletions cpp/src/arrow/dataset/file_orc_test.cc
@@ -69,13 +69,25 @@ TEST_F(TestOrcFileFormat, CountRows) { TestCountRows(); }
class TestOrcFileFormatScan : public FileFormatScanMixin<OrcFormatHelper> {};

TEST_P(TestOrcFileFormatScan, ScanRecordBatchReader) { TestScan(); }
-TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
-  TestScanWithVirtualColumn();
TEST_P(TestOrcFileFormatScan, ScanBatchSize) {
// TODO(ARROW-14153): TestScanBatchSize();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjected) { TestScanProjected(); }
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedNested) {
TestScanProjectedNested();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderProjectedMissingCols) {
TestScanProjectedMissingCols();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithVirtualColumn) {
TestScanWithVirtualColumn();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
TestScanWithDuplicateColumn();
}
TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
TestScanWithDuplicateColumnError();
}
INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan,
::testing::ValuesIn(TestFormatParams::Values()),
TestFormatParams::ToTestNameString);
121 changes: 102 additions & 19 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -155,30 +155,112 @@ void AddColumnIndices(const SchemaField& schema_field,
}
}

-// Compute the column projection out of an optional arrow::Schema
-std::vector<int> InferColumnProjection(const parquet::arrow::FileReader& reader,
-                                       const ScanOptions& options) {
Status ResolveOneFieldRef(
const SchemaManifest& manifest, const FieldRef& field_ref,
const std::unordered_map<std::string, const SchemaField*>& field_lookup,
const std::unordered_set<std::string>& duplicate_fields,
std::vector<int>* columns_selection) {
if (const std::string* name = field_ref.name()) {
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
AddColumnIndices(*it->second, columns_selection);
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
// We shouldn't generally get here because SetProjection will reject such references
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
}
// "Virtual" column: field is not in file but is in the ScanOptions.
// Ignore it here, as projection will pad the batch with a null column.
return Status::OK();
}

const SchemaField* toplevel = nullptr;
const SchemaField* field = nullptr;
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
// Only supports a sequence of names
for (const auto& ref : *refs) {
if (const std::string* name = ref.name()) {
if (!field) {
// First lookup, top-level field
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
field = it->second;
toplevel = field;
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
} else {
// Virtual column
return Status::OK();
}
} else {
const SchemaField* result = nullptr;
for (const auto& child : field->children) {
if (child.field->name() == *name) {
if (!result) {
result = &child;
} else {
return Status::Invalid("Ambiguous nested reference to column '", *name,
"' which occurs more than once in field ",
field->field->ToString());
}
}
}
if (!result) {
// Virtual column
return Status::OK();
}
field = result;
}
continue;
}
return Status::NotImplemented("Inferring column projection from FieldRef ",
field_ref.ToString());
}
} else {
return Status::NotImplemented("Inferring column projection from FieldRef ",
field_ref.ToString());
}

if (field) {
// TODO(ARROW-1888): support fine-grained column projection. We should be
// able to materialize only the child fields requested, and not the entire
// top-level field.
// Right now, if enabled, projection/filtering will fail when they cast the
// physical schema to the dataset schema.
AddColumnIndices(*toplevel, columns_selection);
}
return Status::OK();
}

// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
auto manifest = reader.manifest();
// Checks if the field is needed in either the projection or the filter.
-  auto field_names = options.MaterializedFields();
-  std::unordered_set<std::string> materialized_fields{field_names.cbegin(),
-                                                      field_names.cend()};
-  auto should_materialize_column = [&materialized_fields](const std::string& f) {
-    return materialized_fields.find(f) != materialized_fields.end();
-  };
-
-  std::vector<int> columns_selection;
-  // Note that the loop is using the file's schema to iterate instead of the
-  // materialized fields of the ScanOptions. This ensures that missing
-  // fields in the file (but present in the ScanOptions) will be ignored. The
-  // scanner's projector will take care of padding the column with the proper
-  // values.
auto field_refs = options.MaterializedFields();

// Build a lookup table from top level field name to field metadata.
// This is to avoid quadratic-time mapping of projected fields to
// column indices, in the common case of selecting top level
// columns. For nested fields, we will pay the cost of a linear scan
// assuming for now that this is relatively rare, but this can be
// optimized. (Also, we don't want to pay the cost of building all
// the lookup tables up front if they're rarely used.)
std::unordered_map<std::string, const SchemaField*> field_lookup;
std::unordered_set<std::string> duplicate_fields;
for (const auto& schema_field : manifest.schema_fields) {
-    if (should_materialize_column(schema_field.field->name())) {
-      AddColumnIndices(schema_field, &columns_selection);
const auto it = field_lookup.emplace(schema_field.field->name(), &schema_field);
if (!it.second) {
duplicate_fields.emplace(schema_field.field->name());
}
}

std::vector<int> columns_selection;
for (const auto& ref : field_refs) {
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
return columns_selection;
}

@@ -351,7 +433,8 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
parquet_fragment->FilterRowGroups(options->filter));
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
-  auto column_projection = InferColumnProjection(*reader, *options);
ARROW_ASSIGN_OR_RAISE(auto column_projection,
InferColumnProjection(*reader, *options));
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
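The comment in InferColumnProjection above motivates hashing the top-level field names once instead of re-scanning the manifest for every reference. A standalone sketch of that lookup pattern (plain standard library, not Arrow code; the names here are hypothetical):

#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Resolve requested column names to indices with one pass over the schema
// (build the map once), rather than scanning all columns per request.
std::vector<int> ResolveColumns(const std::vector<std::string>& schema_names,
                                const std::vector<std::string>& requested) {
  std::unordered_map<std::string, int> lookup;
  std::unordered_set<std::string> duplicates;
  for (int i = 0; i < static_cast<int>(schema_names.size()); ++i) {
    if (!lookup.emplace(schema_names[i], i).second) duplicates.insert(schema_names[i]);
  }
  std::vector<int> indices;
  for (const auto& name : requested) {
    if (duplicates.count(name)) continue;  // ambiguous: a real reader would raise an error
    auto it = lookup.find(name);
    if (it != lookup.end()) indices.push_back(it->second);  // missing names are skipped
  }
  return indices;
}

int main() {
  for (int i : ResolveColumns({"a", "b", "c"}, {"c", "a", "zzz"})) std::cout << i << "\n";
  return 0;  // prints 2 then 0
}
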