diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
index d473841d95..6ec85469f4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -59,9 +59,13 @@ public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Boolean> {
 
   public static void validate(FilterPredicate predicate, MessageType schema) {
+    validate(predicate, schema, true);
+  }
+
+  public static void validate(FilterPredicate predicate, MessageType schema, boolean isCaseSensitive) {
     checkNotNull(predicate, "predicate");
     checkNotNull(schema, "schema");
-    predicate.accept(new SchemaCompatibilityValidator(schema));
+    predicate.accept(new SchemaCompatibilityValidator(schema, isCaseSensitive));
   }
 
   // A map of column name to the type the user supplied for this column.
@@ -76,11 +80,14 @@ public static void validate(FilterPredicate predicate, MessageType schema) {
   // the original type of a column, keyed by path
   private final Map<ColumnPath, OriginalType> originalTypes = new HashMap<ColumnPath, OriginalType>();
 
-  private SchemaCompatibilityValidator(MessageType schema) {
+  private boolean isCaseSensitive = true;
+
+  private SchemaCompatibilityValidator(MessageType schema, boolean isCaseSensitive) {
+    this.isCaseSensitive = isCaseSensitive;
     for (ColumnDescriptor cd : schema.getColumns()) {
-      ColumnPath columnPath = ColumnPath.get(cd.getPath());
-      columnsAccordingToSchema.put(columnPath, cd);
+      ColumnPath columnPath = isCaseSensitive? ColumnPath.get(cd.getPath()):ColumnPath.getLowerCase(cd.getPath());
+      columnsAccordingToSchema.put(columnPath, cd);
 
       OriginalType ot = schema.getType(cd.getPath()).getOriginalType();
       if (ot != null) {
@@ -186,7 +193,8 @@ private <T extends Comparable<T>> void validateColumn(Column<T> column) {
   }
 
   private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
-    ColumnDescriptor cd = columnsAccordingToSchema.get(columnPath);
+    ColumnDescriptor cd = isCaseSensitive ? columnsAccordingToSchema.get(columnPath):
+        columnsAccordingToSchema.get(ColumnPath.getLowerCase(columnPath.toArray()));
     checkArgument(cd != null, "Column " + columnPath + " was not found in schema!");
     return cd;
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index bf98bfd21a..7241f0deeb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.parquet.io.InvalidRecordException;
 
@@ -38,7 +39,6 @@ public class GroupType extends Type {
 
   private final List<Type> fields;
   private final Map<String, Integer> indexByName;
-
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param name the name of the field
@@ -48,6 +48,10 @@ public GroupType(Repetition repetition, String name, List<Type> fields) {
     this(repetition, name, null, fields, null);
   }
 
+  public GroupType(Repetition repetition, String name, List<Type> fields, boolean isCaseSensitive) {
+    this(repetition, name, null, fields, null, isCaseSensitive);
+  }
+
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param name the name of the field
@@ -87,12 +91,17 @@ public GroupType(Repetition repetition, String name, OriginalType originalType,
    * @param id the id of the field
    */
   GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id) {
+    this(repetition, name, originalType, fields, id, true);
+  }
+
+  GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id, boolean isCaseSensitive) {
     super(name, repetition, originalType, id);
     if (fields.isEmpty()) {
       throw new InvalidSchemaException("A group type can not be empty. Parquet does not support empty group without leaves. Empty group: " + name);
     }
+
     this.fields = fields;
-    this.indexByName = new HashMap<String, Integer>();
+    this.indexByName = isCaseSensitive? new HashMap<String, Integer>() : new TreeMap<String, Integer>(String.CASE_INSENSITIVE_ORDER);
     for (int i = 0; i < fields.size(); i++) {
       indexByName.put(fields.get(i).getName(), i);
     }
@@ -391,4 +400,16 @@ List<Type> mergeFields(GroupType toMerge, boolean strict) {
     }
     return newFields;
   }
+
+  public List<Type> getContainedFields(GroupType base) {
+    List<Type> newFields = new ArrayList<Type>();
+    List<Type> baseFields = base.getFields();
+    for (Type type : this.getFields()) {
+      if (base.containsField(type.getName())) {
+        newFields.add(type);
+      }
+    }
+    return newFields;
+  }
+
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
index 1e26ed2425..7d1cb6906c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
@@ -50,6 +50,13 @@ public MessageType(String name, List<Type> fields) {
     super(Repetition.REPEATED, name, fields);
   }
 
+  public MessageType(String name, List<Type> fields, boolean isCaseSensitive) {
+    super(Repetition.REPEATED, name, fields, isCaseSensitive);
+  }
+
+  public MessageType(MessageType base, boolean isCaseSensitive) {
+    super(Repetition.REPEATED, base.getName(), base.getFields(), isCaseSensitive);
+  }
   /**
    * {@inheritDoc}
    */
diff --git a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java
index 4383b0f2f6..e2193f5c5d 100644
--- a/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java
+++ b/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java
@@ -39,6 +39,17 @@ protected ColumnPath toCanonical(ColumnPath value) {
     }
   };
 
+  private static Canonicalizer<ColumnPath> pathsLowerCase = new Canonicalizer<ColumnPath>() {
+    @Override
+    protected ColumnPath toCanonical(ColumnPath value) {
+      String[] path = new String[value.p.length];
+      for (int i = 0; i < value.p.length; i++) {
+        path[i] = value.p[i].toLowerCase().intern();
+      }
+      return new ColumnPath(path);
+    }
+  };
+
   public static ColumnPath fromDotString(String path) {
     checkNotNull(path, "path");
     return get(path.split("\\."));
@@ -48,6 +59,10 @@ public static ColumnPath get(String... path){
     return paths.canonicalize(new ColumnPath(path));
   }
 
+  public static ColumnPath getLowerCase(String... path){
+    return pathsLowerCase.canonicalize(new ColumnPath(path));
+  }
+
   private final String[] p;
 
   private ColumnPath(String[] path) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d85a231a31..7781f8ad44 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -40,15 +40,26 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
 
   private final List<BlockMetaData> blocks;
   private final MessageType schema;
+  private final boolean isCaseSensitive;
 
   public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
+    return filterRowGroups(filter, blocks, schema, true);
+  }
+
+  public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema,
+      boolean isCaseSensitive) {
     checkNotNull(filter, "filter");
-    return filter.accept(new RowGroupFilter(blocks, schema));
+    return filter.accept(new RowGroupFilter(blocks, schema, isCaseSensitive));
   }
 
   private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
+    this(blocks, schema, true);
+  }
+
+  private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema, boolean isCaseSensitive) {
     this.blocks = checkNotNull(blocks, "blocks");
     this.schema = checkNotNull(schema, "schema");
+    this.isCaseSensitive = isCaseSensitive;
   }
 
   @Override
@@ -56,7 +67,7 @@ public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredic
     FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
 
     // check that the schema of the filter matches the schema of the file
-    SchemaCompatibilityValidator.validate(filterPredicate, schema);
+    SchemaCompatibilityValidator.validate(filterPredicate, schema, isCaseSensitive);
 
     List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 6ff4eac24d..b0d2051ac0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -169,13 +169,23 @@ public void initialize(MessageType fileSchema,
     // initialize a ReadContext for this file
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         configuration, toSetMultiMap(fileMetadata), fileSchema));
-    this.requestedSchema = readContext.getRequestedSchema();
+    MessageType request = readContext.getRequestedSchema();
+    boolean isCaseSensitive = configuration.getBoolean(ParquetInputFormat.CASE_SENSITIVITY, true);
+
+    if (!isCaseSensitive) {
+      MessageType base = new MessageType(request, false);
+      this.requestedSchema = new MessageType(request.getName(), fileSchema.getContainedFields(base));
+    } else {
+      this.requestedSchema = request;
+    }
+
     this.fileSchema = fileSchema;
     this.file = file;
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(
         configuration, fileMetadata, fileSchema, readContext);
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
+
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     reader = new ParquetFileReader(configuration, file, blocks, columns);
     for (BlockMetaData block : blocks) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index a4baf98b0a..2c33c52866 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -113,6 +113,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
 
+  /**
+   * key to configure case sensitivity
+   */
+  public static final String CASE_SENSITIVITY = "parquet.read.case.sensitivity";
+
   /**
    * key to turn on or off task side metadata loading (default true)
    * if true then metadata is read on the task side and some tasks may finish immediately.
@@ -126,10 +131,18 @@ public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) {
     ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
   }
 
+  public static void setCaseSensitiveRead(Job job, boolean isCaseSensitive) {
+    ContextUtil.getConfiguration(job).setBoolean(CASE_SENSITIVITY, isCaseSensitive);
+  }
+
   public static boolean isTaskSideMetaData(Configuration configuration) {
     return configuration.getBoolean(TASK_SIDE_METADATA, TRUE);
   }
 
+  public static boolean isCaseSensitiveRead(Configuration configuration) {
+    return configuration.getBoolean(CASE_SENSITIVITY, TRUE);
+  }
+
   public static void setReadSupportClass(Job job, Class readSupportClass) {
     ContextUtil.getConfiguration(job).set(READ_SUPPORT_CLASS, readSupportClass.getName());
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index a45dde545c..d33372b503 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop;
 
+import static java.lang.Boolean.TRUE;
 import static org.apache.parquet.Preconditions.checkNotNull;
 
 import java.io.Closeable;
@@ -144,8 +145,9 @@ private void initReader() throws IOException {
       MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
+      boolean isCaseSensitive = conf.getBoolean(ParquetInputFormat.CASE_SENSITIVITY, TRUE);
       List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
-          filter, blocks, fileSchema);
+          filter, blocks, fileSchema, isCaseSensitive);
       reader = new InternalParquetRecordReader<T>(readSupport, filter);
       reader.initialize(fileSchema,