PARQUET-304: Add an option to make requested schema case insensitive in read path #210

Open. Wants to merge 1 commit into master.
SchemaCompatibilityValidator.java
@@ -59,9 +59,13 @@
public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Void> {

public static void validate(FilterPredicate predicate, MessageType schema) {
validate(predicate, schema, true);
}

public static void validate(FilterPredicate predicate, MessageType schema, boolean isCaseSensitive) {
Review comment (Member): I would think that isCaseSensitive should be a property of the FilterPredicate.

checkNotNull(predicate, "predicate");
checkNotNull(schema, "schema");
predicate.accept(new SchemaCompatibilityValidator(schema));
predicate.accept(new SchemaCompatibilityValidator(schema, isCaseSensitive));
}

// A map of column name to the type the user supplied for this column.
@@ -76,11 +80,14 @@ public static void validate(FilterPredicate predicate, MessageType schema) {
// the original type of a column, keyed by path
private final Map<ColumnPath, OriginalType> originalTypes = new HashMap<ColumnPath, OriginalType>();

private SchemaCompatibilityValidator(MessageType schema) {
private boolean isCaseSensitive = true;

private SchemaCompatibilityValidator(MessageType schema, boolean isCaseSensitive) {

this.isCaseSensitive = isCaseSensitive;
for (ColumnDescriptor cd : schema.getColumns()) {
ColumnPath columnPath = ColumnPath.get(cd.getPath());
columnsAccordingToSchema.put(columnPath, cd);
ColumnPath columnPath = isCaseSensitive ? ColumnPath.get(cd.getPath()) : ColumnPath.getLowerCase(cd.getPath());
columnsAccordingToSchema.put(columnPath, cd);

OriginalType ot = schema.getType(cd.getPath()).getOriginalType();
if (ot != null) {
@@ -186,7 +193,8 @@ private <T extends Comparable<T>> void validateColumn(Column<T> column) {
}

private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
ColumnDescriptor cd = columnsAccordingToSchema.get(columnPath);
ColumnDescriptor cd = isCaseSensitive ? columnsAccordingToSchema.get(columnPath) :
    columnsAccordingToSchema.get(ColumnPath.getLowerCase(columnPath.toArray()));
checkArgument(cd != null, "Column " + columnPath + " was not found in schema!");
return cd;
}
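For context, the net effect of the validator change is that when isCaseSensitive is false, both the schema columns and the predicate's column path are canonicalized to lower case before the map lookup, so names that differ only in case resolve to the same ColumnDescriptor. A minimal standalone sketch of that lookup behavior follows; the class, map, and helper names are made up for illustration and are not part of the patch.

import java.util.HashMap;
import java.util.Map;

public class CaseInsensitiveLookupSketch {
  // Stand-in for ColumnPath.get / ColumnPath.getLowerCase: join the segments,
  // lower-casing them when case-insensitive matching is requested.
  private static String key(String[] path, boolean caseSensitive) {
    StringBuilder sb = new StringBuilder();
    for (String segment : path) {
      if (sb.length() > 0) sb.append('.');
      sb.append(caseSensitive ? segment : segment.toLowerCase());
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    boolean caseSensitive = false;
    Map<String, String> columnsByPath = new HashMap<String, String>();
    // Index the file schema column under its (lower-cased) path.
    columnsByPath.put(key(new String[] {"User", "Name"}, caseSensitive), "binary column");
    // A predicate written with different casing still finds the column.
    System.out.println(columnsByPath.get(key(new String[] {"user", "NAME"}, caseSensitive)));
  }
}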
GroupType.java
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.parquet.io.InvalidRecordException;

@@ -38,7 +39,6 @@ public class GroupType extends Type {

private final List<Type> fields;
private final Map<String, Integer> indexByName;

/**
* @param repetition OPTIONAL, REPEATED, REQUIRED
* @param name the name of the field
@@ -48,6 +48,10 @@ public GroupType(Repetition repetition, String name, List<Type> fields) {
this(repetition, name, null, fields, null);
}

public GroupType(Repetition repetition, String name, List<Type> fields, boolean isCaseSensitive) {
Review comment (Member): this should not be a property of the type itself, but a property of the selection.

this(repetition, name, null, fields, null, isCaseSensitive);
}

/**
* @param repetition OPTIONAL, REPEATED, REQUIRED
* @param name the name of the field
@@ -87,12 +91,17 @@ public GroupType(Repetition repetition, String name, OriginalType originalType,
* @param id the id of the field
*/
GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id) {
this(repetition, name, originalType, fields, id, true);
}

GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id, boolean isCaseSensitive) {
super(name, repetition, originalType, id);
if (fields.isEmpty()) {
throw new InvalidSchemaException("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: " + name);
}

this.fields = fields;
this.indexByName = new HashMap<String, Integer>();
this.indexByName = isCaseSensitive ? new HashMap<String, Integer>() : new TreeMap<String, Integer>(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < fields.size(); i++) {
indexByName.put(fields.get(i).getName(), i);
}
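The key detail in the new constructor is the switch from HashMap to a TreeMap ordered by String.CASE_INSENSITIVE_ORDER, which makes the name-to-index lookup ignore case. A short self-contained demo of that map behavior (the field names are invented for illustration):

import java.util.Map;
import java.util.TreeMap;

public class CaseInsensitiveIndexDemo {
  public static void main(String[] args) {
    Map<String, Integer> indexByName = new TreeMap<String, Integer>(String.CASE_INSENSITIVE_ORDER);
    indexByName.put("UserId", 0);
    indexByName.put("userName", 1);
    // Lookups succeed regardless of the caller's casing.
    System.out.println(indexByName.get("userid"));   // 0
    System.out.println(indexByName.get("USERNAME")); // 1
    // Caveat: keys that differ only by case collide; put("USERID", 2) would overwrite index 0.
  }
}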
@@ -391,4 +400,16 @@ List<Type> mergeFields(GroupType toMerge, boolean strict) {
}
return newFields;
}

public List<Type> getContainedFields(GroupType base) {
Review comment (Member): I'd make this method optionally case sensitive.

List<Type> newFields = new ArrayList<Type>();
List<Type> baseFields = base.getFields();
for (Type type : this.getFields()) {
if (base.containsField(type.getName())) {
newFields.add(type);
}
}
return newFields;
}

}
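To illustrate the reviewer's suggestion above, an optionally case-sensitive variant of getContainedFields could look roughly like the sketch below. It is hypothetical, not part of this patch, and relies only on the existing getFields(), getName(), and containsField(String) accessors.

// Hypothetical overload inside GroupType: callers choose case sensitivity per selection
// instead of baking it into the type.
public List<Type> getContainedFields(GroupType base, boolean isCaseSensitive) {
  List<Type> newFields = new ArrayList<Type>();
  for (Type type : this.getFields()) {
    if (isCaseSensitive) {
      if (base.containsField(type.getName())) {
        newFields.add(type);
      }
    } else {
      // Case-insensitive: linear scan of the base fields.
      for (Type baseField : base.getFields()) {
        if (baseField.getName().equalsIgnoreCase(type.getName())) {
          newFields.add(type);
          break;
        }
      }
    }
  }
  return newFields;
}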
MessageType.java
@@ -50,6 +50,13 @@ public MessageType(String name, List<Type> fields) {
super(Repetition.REPEATED, name, fields);
}

public MessageType(String name, List<Type> fields, boolean isCaseSensitive) {
super(Repetition.REPEATED, name, fields, isCaseSensitive);
}

public MessageType(MessageType base, boolean isCaseSensitive) {
super(Repetition.REPEATED, base.getName(), base.getFields(), isCaseSensitive);
}
Review comment (Member): same comment, I don't think the property belongs here.

/**
* {@inheritDoc}
*/
ColumnPath.java
@@ -39,6 +39,17 @@ protected ColumnPath toCanonical(ColumnPath value) {
}
};

private static Canonicalizer<ColumnPath> pathsLowerCase = new Canonicalizer<ColumnPath>() {
@Override
protected ColumnPath toCanonical(ColumnPath value) {
String[] path = new String[value.p.length];
for (int i = 0; i < value.p.length; i++) {
path[i] = value.p[i].toLowerCase().intern();
}
return new ColumnPath(path);
}
};

public static ColumnPath fromDotString(String path) {
checkNotNull(path, "path");
return get(path.split("\\."));
@@ -48,6 +59,10 @@ public static ColumnPath get(String... path){
return paths.canonicalize(new ColumnPath(path));
}

public static ColumnPath getLowerCase(String... path){
return pathsLowerCase.canonicalize(new ColumnPath(path));
}
Review comment (Member): It seems that the toLowerCase can be applied to path directly and then call the existing get(). There's no need for a separate Canonicalizer.
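A sketch of the simplification described in the comment above: lower-case the segments first and delegate to the existing get(), so no second Canonicalizer is needed. This is illustrative only, not part of the patch.

// Hypothetical replacement for getLowerCase: reuse get() on lower-cased segments.
public static ColumnPath getLowerCase(String... path) {
  String[] lower = new String[path.length];
  for (int i = 0; i < path.length; i++) {
    lower[i] = path[i].toLowerCase();
  }
  return get(lower);
}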


private final String[] p;

private ColumnPath(String[] path) {
RowGroupFilter.java
@@ -40,23 +40,34 @@
public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
private final List<BlockMetaData> blocks;
private final MessageType schema;
private final boolean isCaseSensitive;

public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
return filterRowGroups(filter, blocks, schema, true);
}

public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema,
boolean isCaseSensitive) {
checkNotNull(filter, "filter");
return filter.accept(new RowGroupFilter(blocks, schema));
return filter.accept(new RowGroupFilter(blocks, schema, isCaseSensitive));
}

private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
this(blocks, schema, true);
}

private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema, boolean isCaseSensitive) {
this.blocks = checkNotNull(blocks, "blocks");
this.schema = checkNotNull(schema, "schema");
this.isCaseSensitive = isCaseSensitive;
}

@Override
public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();

// check that the schema of the filter matches the schema of the file
SchemaCompatibilityValidator.validate(filterPredicate, schema);
SchemaCompatibilityValidator.validate(filterPredicate, schema, isCaseSensitive);
Review comment (Member): if the predicate has the isCaseSensitive field it can be applied here.


List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();

InternalParquetRecordReader.java
@@ -169,13 +169,23 @@ public void initialize(MessageType fileSchema,
// initialize a ReadContext for this file
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
configuration, toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
MessageType request = readContext.getRequestedSchema();
Review comment (Member): Whether the requestedContext is caseSensitive can come from the readContext. I don't think we need a Hadoop property for this.

boolean isCaseSensitive = configuration.getBoolean(ParquetInputFormat.CASE_SENSITIVITY, true);
Review comment (Member): I would prefer we don't use configuration deep in the code for such things. The ReadSupport implementation can pass things through the ReadContext. You can add a field there.


if (!isCaseSensitive) {
MessageType base = new MessageType(request, false);
this.requestedSchema = new MessageType(request.getName(), fileSchema.getContainedFields(base));
Review comment (Member): instead of adding isCaseSensitive in base, it can be passed to getContainedFields.

} else {
this.requestedSchema = request;
}

this.fileSchema = fileSchema;
this.file = file;
this.columnCount = requestedSchema.getPaths().size();
this.recordConverter = readSupport.prepareForRead(
configuration, fileMetadata, fileSchema, readContext);
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);

List<ColumnDescriptor> columns = requestedSchema.getColumns();
reader = new ParquetFileReader(configuration, file, blocks, columns);
for (BlockMetaData block : blocks) {
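To make the reviewer's alternative concrete: instead of reading a Hadoop key here, the case-sensitivity flag could be carried on the ReadContext returned by ReadSupport.init. The field and accessor shown below do not exist in ReadContext today; this is only a sketch of the suggested direction.

// Hypothetical: ReadContext gains a caseSensitive field plus an isCaseSensitive() accessor,
// populated by the ReadSupport implementation in init(). initialize() would then do:
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
    configuration, toSetMultiMap(fileMetadata), fileSchema));
MessageType request = readContext.getRequestedSchema();
boolean isCaseSensitive = readContext.isCaseSensitive(); // hypothetical accessor
this.requestedSchema = isCaseSensitive
    ? request
    : new MessageType(request.getName(), fileSchema.getContainedFields(new MessageType(request, false)));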
ParquetInputFormat.java
@@ -113,6 +113,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";

/**
* key to configure case sensitivity
*/
public static final String CASE_SENSITIVITY = "parquet.read.case.sensitivity";
Review comment (Member): I don't think we need a property in that case.


/**
* key to turn on or off task side metadata loading (default true)
* if true then metadata is read on the task side and some tasks may finish immediately.
@@ -126,10 +131,18 @@ public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) {
ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
}

public static void setCaseSensitiveRead(Job job, boolean isCaseSensitive) {
ContextUtil.getConfiguration(job).setBoolean(CASE_SENSITIVITY, isCaseSensitive);
}

public static boolean isTaskSideMetaData(Configuration configuration) {
return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
}

public static boolean isCaseSensitiveRead(Configuration configuration) {
return configuration.getBoolean(CASE_SENSITIVITY, TRUE);
}

public static void setReadSupportClass(Job job, Class<?> readSupportClass) {
ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
}
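If this patch is applied, enabling case-insensitive reads from a job would look roughly like the following; the class name and job setup are placeholders.

import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetInputFormat;

public class CaseInsensitiveJobSetup {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // Ask the read path to match column names without regard to case.
    // The default (true) keeps the current case-sensitive behavior.
    ParquetInputFormat.setCaseSensitiveRead(job, false);
    job.setInputFormatClass(ParquetInputFormat.class);
  }
}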
ParquetReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop;

import static java.lang.Boolean.TRUE;
import static org.apache.parquet.Preconditions.checkNotNull;

import java.io.Closeable;
@@ -144,8 +145,9 @@ private void initReader() throws IOException {

MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();

boolean isCaseSensitive = conf.getBoolean(ParquetInputFormat.CASE_SENSITIVITY, TRUE);
List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
filter, blocks, fileSchema);
filter, blocks, fileSchema, isCaseSensitive);

reader = new InternalParquetRecordReader<T>(readSupport, filter);
reader.initialize(fileSchema,