
ARROW-14658: [C++] Add basic support for nested field refs in scanning #11704

Closed

Conversation


@lidavidm (Member) commented Nov 15, 2021

This implements the following:

  • Being able to project and filter on nested fields in the scanner/query engine.

Parquet, ORC, and Feather are supported/tested. For ORC and Feather, we will read the entire top-level column. (CSV does not support reading any nested types, though if it does in the future, it should behave the same as Feather/ORC.) For Parquet, we could materialize only the leaf nodes necessary for the projection, but without ARROW-1888 this will fail later on in the scanning pipeline, so we behave the same as Feather/ORC.
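For illustration, projecting and filtering on a nested field through the scanner would look roughly like the sketch below (written against the C++ Datasets API; the dataset variable and the c.e field are hypothetical, and exact signatures may differ by version):

#include <arrow/api.h>
#include <arrow/compute/expression.h>
#include <arrow/dataset/api.h>

namespace cp = arrow::compute;
namespace ds = arrow::dataset;

// Assume `dataset` has a struct column "c" with a child field "e".
arrow::Result<std::shared_ptr<arrow::Table>> ScanNested(
    const std::shared_ptr<ds::Dataset>& dataset) {
  ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
  // Filter on the nested field c.e ...
  ARROW_RETURN_NOT_OK(builder->Filter(
      cp::greater(cp::field_ref(arrow::FieldRef("c", "e")), cp::literal(3))));
  // ... and project it out under a new top-level name "e".
  ARROW_RETURN_NOT_OK(builder->Project(
      {cp::field_ref(arrow::FieldRef("c", "e"))}, {"e"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
  return scanner->ToTable();
}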

The following are not implemented:

  • Normally, the scanner can fill in a column of nulls if a requested column does not exist in a file. This is not supported for nested field refs because we need ARROW-1888 to be implemented.

  • A nested field ref cannot be used as a key/target of an aggregation or join. However, you can first project the nested fields into their own fields, then aggregate/join on them as usual.

    This limitation is because the aggregate/join nodes currently compute a FieldPath to resolve a FieldRef, but then throw away the path, keeping only the first index. To implement this, we would need to store the FieldPath and use the struct_field kernel to resolve the actual array; however, this would have more overhead, and we should be careful about regressions here, especially in the common case of no nested field refs.

  • Only FieldRefs consisting of field names are supported. For FieldRefs consisting of FieldPath (= a sequence of indices), the semantics are unclear. So far, the scanner is robust to individual files having fields in a different order than the overall dataset, but this won't work for FieldPath, so either we must require that the schema is consistent across files, or come up with some way to map file schemas onto the dataset schema so that indices have a consistent meaning.
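To make the last point concrete, the same index path can resolve to different fields in two files whose columns are ordered differently (a sketch; the schemas are made up):

#include <arrow/type.h>

// The dataset schema stores columns as (a, b); one file stores them as (b, a).
auto dataset_schema = arrow::schema(
    {arrow::field("a", arrow::int32()), arrow::field("b", arrow::utf8())});
auto file_schema = arrow::schema(
    {arrow::field("b", arrow::utf8()), arrow::field("a", arrow::int32())});

// FieldPath({0}) means "the first column" and ignores names entirely:
arrow::FieldPath path({0});
auto in_dataset = path.Get(*dataset_schema);  // resolves to field "a"
auto in_file = path.Get(*file_schema);        // resolves to field "b"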

@nealrichardson (Member) commented Nov 15, 2021

> • A nested field ref cannot be used as a key/target of an aggregation or join.

If I project a = struct_col.some_nested_field in one step, can I then aggregate a?

@lidavidm (Member Author):

> • A nested field ref cannot be used as a key/target of an aggregation or join.
>
> If I project a = struct_col.some_nested_field in one step, can I then aggregate a?

Yes, you can (I just pushed a test to confirm that).

@nealrichardson (Member):

> • A nested field ref cannot be used as a key/target of an aggregation or join.
>
> If I project a = struct_col.some_nested_field in one step, can I then aggregate a?
>
> Yes, you can (I just pushed a test to confirm that).

Excellent. Can you note that on the PR description then? (i.e. you can't directly use a nested field ref there but you can project and then use what you projected)

@lidavidm (Member Author):

Done (also clarified the comment about CSV)

@westonpace (Member) left a comment:

Looks like a great addition. I have some minor nits on const auto* that you're welcome to ignore, and a few questions. I think my biggest concern is that the scanner should have a consistent output schema regardless of the format. But maybe I'm reading that test wrong.

Comment on lines +1660 to +1677
const std::vector<FieldRef>* nested_refs() const {
return util::holds_alternative<std::vector<FieldRef>>(impl_)
? &util::get<std::vector<FieldRef>>(impl_)
: NULLPTR;
}
Member:

Why is the logic here different than the logic above in IsNested? I would expect this would be return IsNested() ? ...

Member Author:

Ah, this is because IsNested is checking whether it's either a FieldPath or a series of Names, but this accessor only wants the latter case. (I think the IsNested naming is a little unfortunate…)
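For reference, the distinction is roughly the following (a paraphrase, not the actual source): IsNested answers "does this ref descend more than one level?", which covers both a multi-index FieldPath and a sequence of names, while nested_refs() only exposes the latter variant.

bool IsNested() const {
  if (util::holds_alternative<std::string>(impl_)) return false;  // a single name
  if (util::holds_alternative<FieldPath>(impl_)) {
    // A FieldPath is nested only if it descends more than one level.
    return util::get<FieldPath>(impl_).indices().size() > 1;
  }
  return true;  // a std::vector<FieldRef> of names is always treated as nested
}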

@@ -709,6 +808,35 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
ASSERT_EQ(row_count, expected_rows());
}
}
void TestScanWithDuplicateColumn() {
Member:

Is there any particular reason to allow this?

Member Author:

Perhaps not - I just wanted to make sure I didn't break this inadvertently since it was working.

ASSERT_EQ(row_count, expected_rows());
}
{
// File includes an extra child in struct2
Member:

It seems arbitrary that we can't handle this case but we're fine with a missing child. Though maybe I am reading the test incorrectly.

Member Author:

We can handle missing fields just fine because (once ARROW-1888 is implemented) we can synthesize a null child to stand in for it. But, we can't handle a duplicate name because it's ambiguous which child we're referring to.
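For illustration, FieldRef's resolution API makes the ambiguity concrete (a sketch; the duplicate-name schema is made up):

#include <arrow/type.h>

// struct2 has two children that are both named "x".
auto schema = arrow::schema({arrow::field(
    "struct2", arrow::struct_({arrow::field("x", arrow::int32()),
                               arrow::field("x", arrow::utf8())}))});

auto ref = arrow::FieldRef("struct2", "x");
auto all = ref.FindAll(*schema);  // two candidate paths: {0, 0} and {0, 1}
auto one = ref.FindOne(*schema);  // fails: the ref matches multiple fields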

Member Author:

Ah, the comment here is a little misleading. I'll edit it to reflect that it's a duplicate name.

Comment on lines 1477 to 1492
auto batch_it = record_batches.begin();
for (int fragment_index = 0; fragment_index < 2; ++fragment_index) {
for (int batch_index = 0; batch_index < 2; ++batch_index) {
const auto& batch = *batch_it++;

// the scanned ExecBatches will begin with physical columns
batches.emplace_back(*batch);

// scanned batches will be augmented with fragment and batch indices
batches.back().values.emplace_back(fragment_index);
batches.back().values.emplace_back(batch_index);

// ... and with the last-in-fragment flag
batches.back().values.emplace_back(batch_index == 1);
}
}
Member:

This logic feels like it belongs in a helper method somewhere. Maybe a DatasetAndBatchesFromJSON

Comment on lines 1453 to 1461
RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null, "c": {"e": 0}},
{"a": 2, "b": true, "c": {"e": 1}}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c": {"e": 2}},
{"a": 3, "b": false, "c": {"e": null}}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c": {"e": 4}},
{"a": 4, "b": false, "c": {"e": 5}}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null, "c": {"e": 6}},
{"a": 6, "b": false, "c": {"e": 7}},
{"a": 7, "b": false, "c": {"e": null}}])"),
Member:

Nit: Add some top-level nulls? Or cases where c is null?

const std::unordered_map<std::string, const SchemaField*>& field_lookup,
const std::unordered_set<std::string>& duplicate_fields,
std::vector<int>* columns_selection) {
if (const auto* name = field_ref.name()) {
Member:

Suggested change
if (const auto* name = field_ref.name()) {
if (const std::string* name = field_ref.name()) {

Same optional nit as above.

if (const auto* refs = field_ref.nested_refs()) {
// Only supports a sequence of names
for (const auto& ref : *refs) {
if (const auto* name = ref.name()) {
Member:

Suggested change
if (const auto* name = ref.name()) {
if (const std::string* name = ref.name()) {

Of course, by this point, I think I'm unlikely to forget the rule 😆 Feel free to ignore these.

}

const SchemaField* field = nullptr;
if (const auto* refs = field_ref.nested_refs()) {
Member:

Suggested change
if (const auto* refs = field_ref.nested_refs()) {
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {

@@ -534,6 +543,8 @@ class FileFormatFixtureMixin : public ::testing::Test {
std::shared_ptr<ScanOptions> opts_;
};

MATCHER(PointeesEquals, "") { return std::get<0>(arg)->Equals(*std::get<1>(arg)); }
Member:

This seems general enough to go in a test util file?
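For context on usage: since the matcher compares std::get<0> and std::get<1> of a tuple, it is meant for gmock's Pointwise-style assertions over two containers, e.g. (a hypothetical helper, not part of this PR):

#include <gmock/gmock.h>

// Compare two vectors of schemas element-wise, dereferencing each pointer pair.
void AssertSchemasEqual(
    const std::vector<std::shared_ptr<arrow::Schema>>& actual,
    const std::vector<std::shared_ptr<arrow::Schema>>& expected) {
  EXPECT_THAT(actual, ::testing::Pointwise(PointeesEquals(), expected));
}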

Comment on lines 671 to 675
if (fine_grained_selection) {
// Some formats, like Parquet, let you pluck only a part of a complex type
expected_schema = schema({
field("struct1", struct_({f32})),
field("struct2", struct_({i64, struct1})),
});
} else {
expected_schema = schema({struct1, struct2});
}
Member:

If I'm understanding this correctly, I'm not sure I like it. I would expect the resulting schema to be the same regardless of whether the underlying format supports partial projection or not. For formats that don't support partial projection, I would expect it to be simulated by a full read and then a cast.

Member Author:

The overall schema will be the same once we pass through projection, i.e. the cast is done in the scanner instead of inside every file format. However, the tests here are reading from the fragment directly to check the physical schema, instead of the post-projection schema. I'll make sure both cases are covered in tests, though.

Member Author:

Ah…this is a little problematic since filtering/projection cast to the dataset schema first, and then we run into ARROW-1888 again as a result. I might go implement that first since this PR becomes a lot more useful with that.

Member Author:

(And, well, ARROW-1888 is a little easier with ARROW-7051…)

@westonpace (Member) commented Nov 16, 2021:

Yes, projection is now inside the exec plan. Also, projection doesn't occur until near the end of the exec plan (e.g. the filter step runs on the unprojected data). So it is important for the scan to do its own internal projection to the dataset schema.

If the blocker is ARROW-1888 I think it would be fine to implement this as-is with comments for a follow-up JIRA next to the test behavior we expect to change.

Member Author:

No tests will change (other than the one marked already), but as-is, you can't scan a Parquet dataset and project a nested field (since when we project from the specific schema to the dataset schema, we'll fail).

Member Author:

Alright, I've disabled fine-grained projection for now and marked it with ARROW-1888.

@lidavidm (Member Author) commented Dec 3, 2021

CC @westonpace if you have any final comments

@kszucs (Member) commented Jan 14, 2022

@lidavidm this requires a rebase. It'd be a nice addition to 7.0, but if it won't make it, please postpone the JIRA to 8.0.

@lidavidm (Member Author):

I'll try to rebase by EOD today, thanks for the ping.

@lidavidm (Member Author):

Rebased, cc @westonpace if you have any final comments

@westonpace (Member) left a comment:

I think this is good. Some thoughts:

Projecting foo out of struct: { "foo": int32 } yields a field named foo. I think my initial assumption would have been struct/foo, but that begs the question "what happens if a field name includes the delimiter (/)?" so I think this is fine. Plus, users can always supply names when they project if they care.

I wonder if sometime down the road we might want to convert nested field refs to integer arrays at a relatively high level in the API (e.g. sanitizing user input). For example, FieldRef("struct", "foo") becomes [3, 0] (assuming "struct" is the fourth field in the schema and "foo" is the first field in the struct). This would allow users to potentially specify nested refs in the presence of duplicate fields (if the user is willing to specify the ref as an integer array). I think this is closer to how nested refs are going to be coming from Substrait as well. Then, if we only accept integer arrays at the lower levels, it simplifies the logic and avoids the risk of the quadratic-time mapping.
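As a sketch of that idea, a name-based ref can already be resolved to an index path up front and only the indices passed further down (the schema here is hypothetical, matching the [3, 0] example above):

#include <arrow/type.h>

auto schema = arrow::schema(
    {arrow::field("a", arrow::int32()), arrow::field("b", arrow::utf8()),
     arrow::field("c", arrow::float64()),
     arrow::field("struct", arrow::struct_({arrow::field("foo", arrow::int32())}))});

// Resolve the name-based ref once at the API boundary...
auto path = arrow::FieldRef("struct", "foo").FindOne(*schema);  // FieldPath({3, 0})
// ...then hand only the integer path to the lower levels of the scanner.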

Comment on lines 396 to 398
TEST_P(TestCsvFileFormatScan, ScanRecordBatchReaderWithDuplicateColumn) {
// The CSV reader rejects duplicate columns
}
Member:

Nit: Can't we just omit this test? I don't suppose it causes any harm.

Member Author:

I meant this as 'documentation', so I've turned it into just a comment.

@lidavidm (Member Author):

I was thinking about the same thing in regards to indices. The current design, for better or worse, is "robust" to fields being reordered, but I don't think that was intentional and we may not want (or need) that property. If so, I agree that resolving fields to indices ASAP is best.

@lidavidm closed this in 5fb2243 on Jan 18, 2022
@ursabot commented Jan 18, 2022

Benchmark runs are scheduled for baseline = 30ddc2f and contender = 5fb2243. 5fb2243 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.0% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.04% ⬆️0.0%] ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
