Skip to content

Commit

Permalink
PARQUET-568: Enable top-level column selection.
Browse files Browse the repository at this point in the history
Author: Aliaksei Sandryhaila <aliaksei.sandryhaila@hp.com>

Closes apache#81 from asandryh/PARQUET-568 and squashes the following commits:

f619ed0 [Aliaksei Sandryhaila] Addressed PR comments.
bf12164 [Aliaksei Sandryhaila] Added column selection capability to parquet_reader.

Change-Id: I5cb658f51e9f761e83be22424f2f36593a169766
  • Loading branch information
Aliaksei Sandryhaila authored and wesm committed Mar 23, 2016
1 parent 3b897aa commit 93faeb8
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 16 deletions.
38 changes: 25 additions & 13 deletions cpp/src/parquet/file/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,24 @@ std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
// the fixed initial size is just for an example
#define COL_WIDTH "20"

void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
void ParquetFileReader::DebugPrint(std::ostream& stream,
std::list<int> selected_columns, bool print_values) {
stream << "File statistics:\n";
stream << "Total rows: " << this->num_rows() << "\n";
stream << "Total rows: " << num_rows() << "\n";

for (int i = 0; i < num_columns(); ++i) {
if (selected_columns.size() == 0) {
for (int i = 0; i < num_columns(); i++) {
selected_columns.push_back(i);
}
} else {
for (auto i : selected_columns) {
if (i < 0 || i >= num_columns()) {
throw ParquetException("Selected column is out of range");
}
}
}

for (auto i : selected_columns) {
const ColumnDescriptor* descr = schema_->Column(i);
stream << "Column " << i << ": "
<< descr->name()
Expand All @@ -152,9 +165,7 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
auto group_reader = RowGroup(r);

// Print column metadata
int num_columns = group_reader->num_columns();

for (int i = 0; i < num_columns; ++i) {
for (auto i : selected_columns) {
RowGroupStatistics stats = group_reader->GetColumnStats(i);

stream << "Column " << i << ": "
Expand All @@ -174,9 +185,10 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
static constexpr int bufsize = 25;
char buffer[bufsize];

// Create readers for all columns and print contents
vector<std::shared_ptr<Scanner> > scanners(num_columns, NULL);
for (int i = 0; i < num_columns; ++i) {
// Create readers for selected columns and print contents
vector<std::shared_ptr<Scanner> > scanners(selected_columns.size(), NULL);
int j = 0;
for (auto i : selected_columns) {
std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);

std::stringstream ss;
Expand All @@ -188,17 +200,17 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {

// This is OK in this method as long as the RowGroupReader does not get
// deleted
scanners[i] = Scanner::Make(col_reader);
scanners[j++] = Scanner::Make(col_reader);
}
stream << "\n";

bool hasRow;
do {
hasRow = false;
for (int i = 0; i < num_columns; ++i) {
if (scanners[i]->HasNext()) {
for (auto scanner : scanners) {
if (scanner->HasNext()) {
hasRow = true;
scanners[i]->PrintNext(stream, 17);
scanner->PrintNext(stream, 17);
}
}
stream << "\n";
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/file/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <list>
#include <string>

#include "parquet/column/page.h"
Expand Down Expand Up @@ -119,7 +120,8 @@ class ParquetFileReader {
return schema_->Column(i);
}

void DebugPrint(std::ostream& stream, bool print_values = true);
void DebugPrint(std::ostream& stream, std::list<int> selected_columns,
bool print_values = true);

private:
// PIMPL idiom
Expand Down
29 changes: 27 additions & 2 deletions cpp/src/parquet/reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,38 @@ TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
TEST_F(TestAllTypesPlain, DebugPrintWorks) {
std::stringstream ss;

// Automatically parses metadata
reader_->DebugPrint(ss);
std::list<int> columns;
reader_->DebugPrint(ss, columns);

std::string result = ss.str();
ASSERT_GT(result.size(), 0);
}

// Asks DebugPrint for an explicit subset of columns and checks that it
// writes some output to the stream.
TEST_F(TestAllTypesPlain, ColumnSelection) {
  // Column indices are passed in the order 5, 0, 10 (not ascending).
  const std::list<int> picked{5, 0, 10};

  std::stringstream out;
  reader_->DebugPrint(out, picked);

  // Only non-emptiness is verified; the exact text is not inspected.
  const std::string printed = out.str();
  ASSERT_GT(printed.size(), 0);
}

// Checks that DebugPrint throws ParquetException when the selection holds
// an index the test expects to be invalid: 100, and then -1.
TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
  std::stringstream out;

  // A single index of 100.
  const std::list<int> too_large{100};
  ASSERT_THROW(reader_->DebugPrint(out, too_large), ParquetException);

  // A single negative index.
  const std::list<int> negative{-1};
  ASSERT_THROW(reader_->DebugPrint(out, negative), ParquetException);
}


class TestLocalFileSource : public ::testing::Test {
public:
Expand Down

0 comments on commit 93faeb8

Please sign in to comment.