ARROW-14658: [C++] Address feedback
lidavidm committed Nov 16, 2021
1 parent 356be43 commit ddc1eb4
Showing 6 changed files with 108 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/test_util.cc
@@ -195,7 +195,7 @@ BatchesWithSchema MakeNestedBatches() {
BatchesWithSchema out;
out.batches = {
ExecBatchFromJSON(
{ty}, R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool": false}]])"),
{ty}, R"([[{"i32": null, "bool": true}], [{"i32": 4, "bool": false}], [null]])"),
ExecBatchFromJSON(
{ty},
R"([[{"i32": 5, "bool": null}], [{"i32": 6, "bool": false}], [{"i32": 7, "bool": false}]])")};
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/file_csv.cc
@@ -117,12 +117,14 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
// Preprocess field refs. We try to avoid FieldRef::GetFoo here since that's
// quadratic (and this is significant overhead with 1000+ columns)
for (const auto& ref : field_refs) {
if (const auto* name = ref.name()) {
if (const std::string* name = ref.name()) {
// Common case
materialized_fields.emplace(*name);
continue;
}
// CSV doesn't really support nested types so do our best
// Currently CSV reader doesn't support reading any nested types, so this
// path shouldn't be hit. However, implement it in the same way as IPC/ORC:
// load the entire top-level field if a nested field is selected.
ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOneOrNone(*scan_options->dataset_schema));
if (column_names.find(field->name()) == column_names.end()) continue;
// Only read the requested columns
6 changes: 3 additions & 3 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -236,7 +236,7 @@ Status ResolveOneFieldRef(
const std::unordered_map<std::string, const SchemaField*>& field_lookup,
const std::unordered_set<std::string>& duplicate_fields,
std::vector<int>* columns_selection) {
if (const auto* name = field_ref.name()) {
if (const std::string* name = field_ref.name()) {
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
AddColumnIndices(*it->second, columns_selection);
@@ -251,10 +251,10 @@
}

const SchemaField* field = nullptr;
if (const auto* refs = field_ref.nested_refs()) {
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
// Only supports a sequence of names
for (const auto& ref : *refs) {
if (const auto* name = ref.name()) {
if (const std::string* name = ref.name()) {
if (!field) {
// First lookup, top-level field
auto it = field_lookup.find(*name);
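A side note on the FieldRef accessors used in this hunk: FieldRef behaves like a tagged union, and name() / nested_refs() return nullptr when the ref holds a different alternative, which is what the dispatch above relies on. Below is a minimal standalone sketch of that pattern (illustrative only, not part of this commit; DescribeRef and the example refs are made up):

#include <iostream>
#include <string>
#include <vector>

#include "arrow/type.h"  // arrow::FieldRef

// Print which alternative a FieldRef holds, mirroring the dispatch in
// ResolveOneFieldRef above.
void DescribeRef(const arrow::FieldRef& ref) {
  if (const std::string* name = ref.name()) {
    // Common case: a single top-level field name.
    std::cout << "name ref: " << *name << "\n";
  } else if (const std::vector<arrow::FieldRef>* refs = ref.nested_refs()) {
    // Nested case: a sequence of child refs, e.g. FieldRef("struct2", "f64").
    std::cout << "nested ref with " << refs->size() << " components\n";
  } else {
    // Remaining case: a positional FieldPath of integer indices.
    std::cout << "positional ref\n";
  }
}

int main() {
  DescribeRef(arrow::FieldRef("a"));               // name ref
  DescribeRef(arrow::FieldRef("struct2", "f64"));  // nested ref
  return 0;
}
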
179 changes: 94 additions & 85 deletions cpp/src/arrow/dataset/scanner_test.cc
@@ -1370,65 +1370,100 @@ struct DatasetAndBatches {
std::vector<compute::ExecBatch> batches;
};

DatasetAndBatches MakeBasicDataset() {
const auto dataset_schema = ::arrow::schema({
field("a", int32()),
field("b", boolean()),
field("c", int32()),
});

const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});

RecordBatchVector record_batches{
RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null},
{"a": 2, "b": true}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true},
{"a": 3, "b": false}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true},
{"a": 4, "b": false}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null},
{"a": 6, "b": false},
{"a": 7, "b": false}])"),
};

auto dataset = std::make_shared<FragmentDataset>(
dataset_schema,
FragmentVector{
std::make_shared<InMemoryFragment>(
physical_schema, RecordBatchVector{record_batches[0], record_batches[1]},
equal(field_ref("c"), literal(23))),
std::make_shared<InMemoryFragment>(
physical_schema, RecordBatchVector{record_batches[2], record_batches[3]},
equal(field_ref("c"), literal(47))),
});
DatasetAndBatches DatasetAndBatchesFromJSON(
const std::shared_ptr<Schema>& dataset_schema,
const std::shared_ptr<Schema>& physical_schema,
const std::vector<std::vector<std::string>>& fragment_batch_strs,
const std::vector<compute::Expression>& guarantees,
std::function<void(compute::ExecBatch*, const RecordBatch&)> make_exec_batch = {}) {
if (!guarantees.empty()) {
EXPECT_EQ(fragment_batch_strs.size(), guarantees.size());
}
RecordBatchVector record_batches;
FragmentVector fragments;
fragments.reserve(fragment_batch_strs.size());
for (size_t i = 0; i < fragment_batch_strs.size(); i++) {
const auto& batch_strs = fragment_batch_strs[i];
RecordBatchVector fragment_batches;
fragment_batches.reserve(batch_strs.size());
for (const auto& batch_str : batch_strs) {
fragment_batches.push_back(RecordBatchFromJSON(physical_schema, batch_str));
}
record_batches.insert(record_batches.end(), fragment_batches.begin(),
fragment_batches.end());
fragments.push_back(std::make_shared<InMemoryFragment>(
physical_schema, std::move(fragment_batches),
guarantees.empty() ? literal(true) : guarantees[i]));
}

std::vector<compute::ExecBatch> batches;

auto batch_it = record_batches.begin();
for (int fragment_index = 0; fragment_index < 2; ++fragment_index) {
for (int batch_index = 0; batch_index < 2; ++batch_index) {
for (size_t fragment_index = 0; fragment_index < fragment_batch_strs.size();
++fragment_index) {
for (size_t batch_index = 0; batch_index < fragment_batch_strs[fragment_index].size();
++batch_index) {
const auto& batch = *batch_it++;

// the scanned ExecBatches will begin with physical columns
batches.emplace_back(*batch);

// a placeholder will be inserted for partition field "c"
batches.back().values.emplace_back(std::make_shared<Int32Scalar>());
// allow customizing the ExecBatch (e.g. to fill in placeholders for partition
// fields)
if (make_exec_batch) {
make_exec_batch(&batches.back(), *batch);
}

// scanned batches will be augmented with fragment and batch indices
batches.back().values.emplace_back(fragment_index);
batches.back().values.emplace_back(batch_index);
batches.back().values.emplace_back(static_cast<int>(fragment_index));
batches.back().values.emplace_back(static_cast<int>(batch_index));

// ... and with the last-in-fragment flag
batches.back().values.emplace_back(batch_index == 1);
batches.back().values.emplace_back(batch_index ==
fragment_batch_strs[fragment_index].size() - 1);

// each batch carries a guarantee inherited from its Fragment's partition expression
batches.back().guarantee =
equal(field_ref("c"), literal(fragment_index == 0 ? 23 : 47));
batches.back().guarantee = fragments[fragment_index]->partition_expression();
}
}

return {dataset, batches};
auto dataset = std::make_shared<FragmentDataset>(dataset_schema, std::move(fragments));
return {std::move(dataset), std::move(batches)};
}

DatasetAndBatches MakeBasicDataset() {
const auto dataset_schema = ::arrow::schema({
field("a", int32()),
field("b", boolean()),
field("c", int32()),
});

const auto physical_schema = SchemaFromColumnNames(dataset_schema, {"a", "b"});

return DatasetAndBatchesFromJSON(
dataset_schema, physical_schema,
{
{
R"([{"a": 1, "b": null},
{"a": 2, "b": true}])",
R"([{"a": null, "b": true},
{"a": 3, "b": false}])",
},
{
R"([{"a": null, "b": true},
{"a": 4, "b": false}])",
R"([{"a": 5, "b": null},
{"a": 6, "b": false},
{"a": 7, "b": false}])",
},
},
{
equal(field_ref("c"), literal(23)),
equal(field_ref("c"), literal(47)),
},
[](compute::ExecBatch* batch, const RecordBatch&) {
// a placeholder will be inserted for partition field "c"
batch->values.emplace_back(std::make_shared<Int32Scalar>());
});
}

DatasetAndBatches MakeNestedDataset() {
@@ -1440,7 +1475,6 @@ DatasetAndBatches MakeNestedDataset() {
field("e", float64()),
})),
});

const auto physical_schema = ::arrow::schema({
field("a", int32()),
field("b", boolean()),
@@ -1449,49 +1483,24 @@
})),
});

RecordBatchVector record_batches{
RecordBatchFromJSON(physical_schema, R"([{"a": 1, "b": null, "c": {"e": 0}},
{"a": 2, "b": true, "c": {"e": 1}}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c": {"e": 2}},
{"a": 3, "b": false, "c": {"e": null}}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": null, "b": true, "c": {"e": 4}},
{"a": 4, "b": false, "c": {"e": 5}}])"),
RecordBatchFromJSON(physical_schema, R"([{"a": 5, "b": null, "c": {"e": 6}},
return DatasetAndBatchesFromJSON(dataset_schema, physical_schema,
{
{
R"([{"a": 1, "b": null, "c": {"e": 0}},
{"a": 2, "b": true, "c": {"e": 1}}])",
R"([{"a": null, "b": true, "c": {"e": 2}},
{"a": 3, "b": false, "c": {"e": null}}])",
R"([{"a": null, "b": null, "c": null}])",
},
{
R"([{"a": null, "b": true, "c": {"e": 4}},
{"a": 4, "b": false, "c": null}])",
R"([{"a": 5, "b": null, "c": {"e": 6}},
{"a": 6, "b": false, "c": {"e": 7}},
{"a": 7, "b": false, "c": {"e": null}}])"),
};

auto dataset = std::make_shared<FragmentDataset>(
dataset_schema,
FragmentVector{
std::make_shared<InMemoryFragment>(
physical_schema, RecordBatchVector{record_batches[0], record_batches[1]},
literal(true)),
std::make_shared<InMemoryFragment>(
physical_schema, RecordBatchVector{record_batches[2], record_batches[3]},
literal(true)),
});

std::vector<compute::ExecBatch> batches;

auto batch_it = record_batches.begin();
for (int fragment_index = 0; fragment_index < 2; ++fragment_index) {
for (int batch_index = 0; batch_index < 2; ++batch_index) {
const auto& batch = *batch_it++;

// the scanned ExecBatches will begin with physical columns
batches.emplace_back(*batch);

// scanned batches will be augmented with fragment and batch indices
batches.back().values.emplace_back(fragment_index);
batches.back().values.emplace_back(batch_index);

// ... and with the last-in-fragment flag
batches.back().values.emplace_back(batch_index == 1);
}
}

return {dataset, batches};
{"a": 7, "b": false, "c": {"e": null}}])",
},
},
/*guarantees=*/{});
}

compute::Expression Materialize(std::vector<std::string> names,
5 changes: 2 additions & 3 deletions cpp/src/arrow/dataset/test_util.h
@@ -45,6 +45,7 @@
#include "arrow/testing/future_util.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/io_util.h"
@@ -543,8 +544,6 @@ class FileFormatFixtureMixin : public ::testing::Test {
std::shared_ptr<ScanOptions> opts_;
};

MATCHER(PointeesEquals, "") { return std::get<0>(arg)->Equals(*std::get<1>(arg)); }

template <typename FormatHelper>
class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
public ::testing::WithParamInterface<TestFormatParams> {
@@ -697,7 +696,7 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
ASSERT_EQ(row_count, expected_rows());
}
{
// File includes an extra child in struct2
// File includes a duplicated name in struct2
auto struct2_physical = field("struct2", struct_({f64, i64, struct1, i64}));
auto reader = this->GetRecordBatchReader(
schema({struct1, struct2_physical, f32, f64, i32, i64}));
4 changes: 4 additions & 0 deletions cpp/src/arrow/testing/matchers.h
@@ -28,6 +28,10 @@

namespace arrow {

// A matcher that checks that the values pointed to are Equals().
// Useful in conjunction with other googletest matchers.
MATCHER(PointeesEquals, "") { return std::get<0>(arg)->Equals(*std::get<1>(arg)); }

template <typename ResultMatcher>
class FutureMatcher {
public:
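
For context on how the relocated matcher is used: it pairs with gtest's Pointwise combinator to compare containers of pointers by the value of their pointees rather than by pointer identity. A hedged usage sketch (the test name and schema are made up for illustration, not taken from this commit):

#include <memory>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "arrow/testing/matchers.h"
#include "arrow/type.h"

TEST(PointeesEqualsExample, ComparesByValue) {
  // Two vectors of shared_ptr<Schema> holding distinct objects that are
  // nonetheless equal by value.
  std::vector<std::shared_ptr<arrow::Schema>> actual = {
      arrow::schema({arrow::field("a", arrow::int32())})};
  std::vector<std::shared_ptr<arrow::Schema>> expected = {
      arrow::schema({arrow::field("a", arrow::int32())})};
  // Pointwise zips the two containers; PointeesEquals then dereferences each
  // pair and compares the pointees with Equals().
  EXPECT_THAT(actual, ::testing::Pointwise(arrow::PointeesEquals(), expected));
}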