Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-422 - Fix issue with Predicate push down when lower/upper bounds are set #348

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ public String getMinimum() {

@Override
public String getMaximum() {
/* if we have upper bound is set (in case of truncation)
/* if we have upper bound set (in case of truncation)
getMaximum will be null */
if(isUpperBoundSet) {
return null;
Expand Down
148 changes: 98 additions & 50 deletions java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@
*/
package org.apache.orc.impl;

import org.apache.orc.CompressionKind;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import org.apache.orc.OrcFile;
import org.apache.orc.util.BloomFilter;
import org.apache.orc.util.BloomFilterIO;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.util.TimestampUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.BooleanColumnStatistics;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
Expand All @@ -42,28 +38,29 @@
import org.apache.orc.DoubleColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
import org.apache.orc.TimestampColumnStatistics;
import org.apache.orc.TypeDescription;
import org.apache.orc.util.BloomFilter;
import org.apache.orc.util.BloomFilterIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.ql.util.TimestampUtils;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

public class RecordReaderImpl implements RecordReader {
static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
Expand Down Expand Up @@ -318,16 +315,30 @@ enum Location {
* @param <T> the type of the comparision
* @return the location of the point
*/
static <T> Location compareToRange(Comparable<T> point, T min, T max) {
int minCompare = point.compareTo(min);
static <T> Location compareToRange(Comparable<T> point, T min, T max, T lowerBound, T upperBound) {

final boolean isLowerBoundSet = (min == null && lowerBound != null) ? true : false;
final boolean isUpperBoundSet = (max == null && upperBound != null) ? true : false;

final int minCompare = isLowerBoundSet ? point.compareTo(lowerBound) : point.compareTo(min);
if (minCompare < 0) {
return Location.BEFORE;
}

/* since min value is truncated when we have compare=0, it means the predicate string is BEFORE the min value*/
else if (minCompare == 0 && isLowerBoundSet) {
return Location.BEFORE;
} else if (minCompare == 0) {
return Location.MIN;
}
int maxCompare = point.compareTo(max);

int maxCompare = isUpperBoundSet ? point.compareTo(upperBound) : point.compareTo(max);
if (maxCompare > 0) {
return Location.AFTER;
}
/* if upperbound is set then location here will be AFTER */
else if (maxCompare == 0 && isUpperBoundSet) {
return Location.AFTER;
} else if (maxCompare == 0) {
return Location.MAX;
}
Expand Down Expand Up @@ -359,7 +370,7 @@ static Object getMax(ColumnStatistics index, boolean useUTCTimestamp) {
} else if (index instanceof DoubleColumnStatistics) {
return ((DoubleColumnStatistics) index).getMaximum();
} else if (index instanceof StringColumnStatistics) {
return ((StringColumnStatistics) index).getMaximum();
return ((StringColumnStatistics) index).getUpperBound();
} else if (index instanceof DateColumnStatistics) {
return ((DateColumnStatistics) index).getMaximum();
} else if (index instanceof DecimalColumnStatistics) {
Expand Down Expand Up @@ -406,7 +417,7 @@ static Object getMin(ColumnStatistics index, boolean useUTCTimestamp) {
} else if (index instanceof DoubleColumnStatistics) {
return ((DoubleColumnStatistics) index).getMinimum();
} else if (index instanceof StringColumnStatistics) {
return ((StringColumnStatistics) index).getMinimum();
return ((StringColumnStatistics) index).getLowerBound();
} else if (index instanceof DateColumnStatistics) {
return ((DateColumnStatistics) index).getMinimum();
} else if (index instanceof DecimalColumnStatistics) {
Expand Down Expand Up @@ -464,6 +475,7 @@ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
* @return the set of truth values that may be returned for the given
* predicate.
*/

static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
PredicateLeaf predicate,
OrcProto.Stream.Kind kind,
Expand All @@ -490,9 +502,21 @@ static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
return TruthValue.YES_NO_NULL;
}
}

String lowerBound = null;
String upperBound = null;

if(cs instanceof StringColumnStatistics) {
lowerBound = ((StringColumnStatistics) cs).getLowerBound();
minValue = ((StringColumnStatistics) cs).getMinimum();

upperBound = ((StringColumnStatistics) cs).getUpperBound();
maxValue = ((StringColumnStatistics) cs).getMaximum();
}

return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(),
BloomFilterIO.deserialize(kind, encoding, writerVersion, type, bloomFilter),
useUTCTimestamp);
useUTCTimestamp, lowerBound, upperBound);
}

/**
Expand Down Expand Up @@ -527,13 +551,26 @@ public static TruthValue evaluatePredicate(ColumnStatistics stats,
boolean useUTCTimestamp) {
Object minValue = getMin(stats, useUTCTimestamp);
Object maxValue = getMax(stats, useUTCTimestamp);
return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter, useUTCTimestamp);

String lowerBound = null;
String upperBound = null;

if(stats instanceof StringColumnStatistics) {
lowerBound = ((StringColumnStatistics) stats).getLowerBound();
minValue = ((StringColumnStatistics) stats).getMinimum();

upperBound = ((StringColumnStatistics) stats).getUpperBound();
maxValue = ((StringColumnStatistics) stats).getMaximum();
}

return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter, useUTCTimestamp, lowerBound, upperBound);
}

static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
Object max, boolean hasNull, BloomFilter bloomFilter, boolean useUTCTimestamp) {
Object max, boolean hasNull, BloomFilter bloomFilter,
boolean useUTCTimestamp, Object lowerBound, Object upperBound) {
// if we didn't have any values, everything must have been null
if (min == null) {
if (min == null && lowerBound == null) {
if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
return TruthValue.YES;
} else {
Expand All @@ -543,14 +580,18 @@ static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
return TruthValue.YES_NO_NULL;
}

if(max == UNKNOWN_VALUE) {
return TruthValue.YES_NO;
}

TruthValue result;
Object baseObj = predicate.getLiteral();
// Predicate object and stats objects are converted to the type of the predicate object.
Object minValue = getBaseObjectForComparison(predicate.getType(), min);
Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);

result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull, lowerBound, upperBound);
if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
return evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull, useUTCTimestamp);
} else {
Expand All @@ -577,28 +618,30 @@ private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
Object minValue,
Object maxValue,
boolean hasNull) {
boolean hasNull,
moresandeep marked this conversation as resolved.
Show resolved Hide resolved
Object lowerBound,
Object upperBound) {
Location loc;

switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
loc = compareToRange((Comparable) predObj, minValue, maxValue, lowerBound, upperBound);
if (loc == Location.BEFORE || loc == Location.AFTER) {
return TruthValue.NO;
} else {
return TruthValue.YES_NO;
}
case EQUALS:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (minValue.equals(maxValue) && loc == Location.MIN) {
loc = compareToRange((Comparable) predObj, minValue, maxValue, lowerBound, upperBound);
if (minValue != null && minValue.equals(maxValue) && loc == Location.MIN) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc == Location.BEFORE || loc == Location.AFTER) {
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case LESS_THAN:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
loc = compareToRange((Comparable) predObj, minValue, maxValue, lowerBound, upperBound);
if (loc == Location.AFTER) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc == Location.BEFORE || loc == Location.MIN) {
Expand All @@ -607,7 +650,7 @@ private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Objec
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case LESS_THAN_EQUALS:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
loc = compareToRange((Comparable) predObj, minValue, maxValue, lowerBound, upperBound);
if (loc == Location.AFTER || loc == Location.MAX) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc == Location.BEFORE) {
Expand All @@ -616,12 +659,17 @@ private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Objec
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case IN:
if (minValue.equals(maxValue)) {
boolean minEqualsMax = predicate.getType()
.equals(PredicateLeaf.Type.STRING) ?
lowerBound.equals(upperBound) :
minValue.equals(maxValue);

if (minEqualsMax) {
// for a single value, look through to see if that value is in the
// set
for (Object arg : predicate.getLiteralList()) {
predObj = getBaseObjectForComparison(predicate.getType(), arg);
loc = compareToRange((Comparable) predObj, minValue, maxValue);
loc = compareToRange((Comparable) predObj, minValue, maxValue, lowerBound, upperBound);
if (loc == Location.MIN) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
}
Expand All @@ -631,7 +679,7 @@ private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Objec
// are all of the values outside of the range?
for (Object arg : predicate.getLiteralList()) {
predObj = getBaseObjectForComparison(predicate.getType(), arg);
loc = compareToRange((Comparable) predObj, minValue, maxValue);
loc = compareToRange((Comparable) predObj, minValue, maxValue, lowerBound, upperBound);
if (loc == Location.MIN || loc == Location.MIDDLE ||
loc == Location.MAX) {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
Expand All @@ -646,10 +694,10 @@ private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Objec
}
Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));

loc = compareToRange((Comparable) predObj1, minValue, maxValue);
loc = compareToRange((Comparable) predObj1, minValue, maxValue, lowerBound, upperBound);
if (loc == Location.BEFORE || loc == Location.MIN) {
Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue, lowerBound, upperBound);
if (loc2 == Location.AFTER || loc2 == Location.MAX) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc2 == Location.BEFORE) {
Expand Down
Loading