Skip to content

Commit

Permalink
The PartitionEstimator now correctly supports ConstantValues when examining Statement WHERE clauses.
Browse files Browse the repository at this point in the history
Not that I think this occurs that often in queries, but it's good to have...
  • Loading branch information
apavlo committed Jul 7, 2013
1 parent 362976a commit 81b0f02
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 80 deletions.
174 changes: 98 additions & 76 deletions src/frontend/edu/brown/utils/PartitionEstimator.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import org.voltdb.VoltTableRow;
import org.voltdb.VoltType;
import org.voltdb.catalog.CatalogMap;
import org.voltdb.catalog.CatalogType;
import org.voltdb.catalog.Column;
import org.voltdb.catalog.ConstantValue;
import org.voltdb.catalog.PlanFragment;
import org.voltdb.catalog.ProcParameter;
import org.voltdb.catalog.Procedure;
Expand All @@ -57,6 +59,7 @@
import org.voltdb.plannodes.AbstractPlanNode;
import org.voltdb.types.ExpressionType;
import org.voltdb.types.QueryType;
import org.voltdb.utils.VoltTypeUtil;

import edu.brown.catalog.CatalogKey;
import edu.brown.catalog.CatalogPair;
Expand Down Expand Up @@ -126,9 +129,10 @@ public class PartitionEstimator {
private final Map<String, Set<CacheEntry>> table_cache_xref = new HashMap<String, Set<CacheEntry>>();

/**
* CacheEntry ColumnKey -> StmtParameter Offset Array
* CacheEntry ColumnKey -> Parameter List
* The parameters could be either StmtParameters or ConstantValues
*/
private class CacheEntry extends HashMap<Column, int[]> {
private class CacheEntry extends HashMap<Column, List<CatalogType>> {
private static final long serialVersionUID = 1L;
private final QueryType query_type;
private boolean contains_or = false;
Expand All @@ -148,30 +152,17 @@ public CacheEntry(QueryType query_type) {

/**
* @param key
* @param param_idx
* @param param
* @param catalog_tbls
*/
public void put(Column key, int param_idx, Table... catalog_tbls) {
int params[] = this.get(key);
boolean dirty = true;
public void put(Column key, CatalogType param, Table... catalog_tbls) {
assert(param instanceof StmtParameter || param instanceof ConstantValue);
List<CatalogType> params = this.get(key);
if (params == null) {
params = new int[]{ param_idx };
} else {
for (int idx : params) {
if (idx == param_idx) {
dirty = false;
break;
}
} // FOR

if (dirty) {
int temp[] = new int[params.length + 1];
System.arraycopy(params, 0, temp, 0, params.length);
temp[temp.length-1] = param_idx;
params = temp;
}
params = new ArrayList<CatalogType>();
this.put(key, params);
}
if (dirty) this.put(key, params);
params.add(param);
for (Table catalog_tbl : catalog_tbls) {
this.table_keys.add(CatalogKey.createKey(catalog_tbl));
} // FOR
Expand Down Expand Up @@ -569,11 +560,11 @@ private synchronized void generateCache(final Statement catalog_stmt) throws Exc
}
continue;
}

// Look for predicates with StmtParameters
// Look for predicates with StmtParameters or ConstantValues
for (Table catalog_tbl : frag_tables) {
Column catalog_col = null;
StmtParameter catalog_param = null;
CatalogType catalog_param = null;

// *********************************** DEBUG ***********************************
if (trace.val) {
Expand Down Expand Up @@ -603,14 +594,18 @@ private synchronized void generateCache(final Statement catalog_stmt) throws Exc
}
// *********************************** DEBUG ***********************************

// Column = StmtParameter
if (entry.getFirst().getParent() != null && entry.getFirst().getParent().equals(catalog_tbl) && entry.getSecond() instanceof StmtParameter) {
// Column = (StmtParameter or ConstantValue)
if (entry.getFirst().getParent() != null && entry.getFirst().getParent().equals(catalog_tbl) &&
(entry.getSecond() instanceof StmtParameter || entry.getSecond() instanceof ConstantValue) ) {
catalog_col = (Column) entry.getFirst();
catalog_param = (StmtParameter) entry.getSecond();
// StmtParameter = Column
} else if (entry.getSecond().getParent() != null && entry.getSecond().getParent().equals(catalog_tbl) && entry.getFirst() instanceof StmtParameter) {
catalog_param = entry.getSecond();

}
// (StmtParameter or ConstantValue) = Column
else if (entry.getSecond().getParent() != null && entry.getSecond().getParent().equals(catalog_tbl) &&
(entry.getFirst() instanceof StmtParameter || entry.getFirst() instanceof ConstantValue)) {
catalog_col = (Column) entry.getSecond();
catalog_param = (StmtParameter) entry.getFirst();
catalog_param = entry.getFirst();
}
if (catalog_col != null && catalog_param != null) {
// If this table is a view, then we need to check whether
Expand All @@ -626,8 +621,8 @@ private synchronized void generateCache(final Statement catalog_stmt) throws Exc
CatalogUtil.getDisplayName(catalog_frag),
CatalogUtil.getDisplayName(catalog_col),
CatalogUtil.getDisplayName(catalog_param)));
stmt_cache.put(catalog_col, catalog_param.getIndex(), catalog_tbl);
frag_cache.put(catalog_col, catalog_param.getIndex(), catalog_tbl);
stmt_cache.put(catalog_col, catalog_param, catalog_tbl);
frag_cache.put(catalog_col, catalog_param, catalog_tbl);
}
} // FOR (tables)
if (trace.val)
Expand All @@ -651,11 +646,12 @@ private synchronized void generateCache(final Statement catalog_stmt) throws Exc
// this guy was used against a StmtParameter somewhere else in the Statement.
// If this is the case, then we can substitute that parameter in its place
if (stmt_cache.containsKey(catalog_col)) {
for (int param_idx : stmt_cache.get(catalog_col)) {
for (CatalogType param : stmt_cache.get(catalog_col)) {
if (trace.val)
LOG.trace("Linking " + CatalogUtil.getDisplayName(other_col) + " to parameter #" + param_idx + " because of " + catalog_col.fullName());
stmt_cache.put(other_col, param_idx, (Table) other_col.getParent());
frag_cache.put(other_col, param_idx, (Table) other_col.getParent());
LOG.trace(String.format("Linking %s to parameter %s because of %s",
other_col.fullName(), param.fullName(), catalog_col.fullName()));
stmt_cache.put(other_col, param, (Table) other_col.getParent());
frag_cache.put(other_col, param, (Table) other_col.getParent());
} // FOR (StmtParameter.Index)
}
} // FOR (Column)
Expand Down Expand Up @@ -698,23 +694,26 @@ private synchronized void generateCache(final Statement catalog_stmt) throws Exc
boolean found = false;
for (CatalogPair entry : update_cset) {
Column catalog_col = null;
StmtParameter catalog_param = null;
CatalogType catalog_param = null;

// For now we only care up look-ups using parameters
if (entry.getFirst() instanceof StmtParameter) {
catalog_param = (StmtParameter) entry.getFirst();
// For now we only care about look-ups using StmtParameters or ConstantValues
if (entry.getFirst() instanceof StmtParameter || entry.getFirst() instanceof ConstantValue) {
catalog_col = (Column) entry.getSecond();
} else if (entry.getSecond() instanceof StmtParameter) {
catalog_param = (StmtParameter) entry.getSecond();
catalog_param = entry.getFirst();
}
else if (entry.getSecond() instanceof StmtParameter || entry.getSecond() instanceof ConstantValue) {
catalog_col = (Column) entry.getFirst();
} else {
catalog_param = entry.getSecond();
}
else {
if (trace.val)
LOG.trace("Skipping entry " + entry + " when examing the update information for " + catalog_tbl);
LOG.trace(String.format("Skipping entry %s when examing the update information for %s",
entry, catalog_tbl));
continue;
}
assert (catalog_col != null);
assert (catalog_param != null);
stmt_cache.put(catalog_col, catalog_param.getIndex(), catalog_tbl);
stmt_cache.put(catalog_col, catalog_param, catalog_tbl);
found = true;
} // FOR
if (trace.val && found)
Expand Down Expand Up @@ -1048,7 +1047,7 @@ public int[] getStatementEstimationParameters(final Statement catalog_stmt) {

int[] all_param_idxs = this.cache_stmtPartitionParameters.get(catalog_stmt);
if (all_param_idxs == null) {
List<Integer> idxs = new ArrayList<Integer>();
List<Integer> param_idxs = new ArrayList<Integer>();

// Assume single-partition
if (catalog_stmt.getHas_singlesited() == false) {
Expand Down Expand Up @@ -1086,12 +1085,14 @@ public int[] getStatementEstimationParameters(final Statement catalog_stmt) {
catalog_frag.fullName(), catalog_tbl.getName(), partition_col.fullName()));
return (null);
} else if (partition_col != null && cache_entry.containsKey(partition_col)) {
for (int idx : cache_entry.get(partition_col)) {
idxs.add(Integer.valueOf(idx));
for (CatalogType param : cache_entry.get(partition_col)) {
if (param instanceof StmtParameter) {
param_idxs.add(((StmtParameter)param).getIndex());
}
} // FOR
}
} // FOR
if (idxs.isEmpty() == false) all_param_idxs = CollectionUtil.toIntArray(idxs);
if (param_idxs.isEmpty() == false) all_param_idxs = CollectionUtil.toIntArray(param_idxs);
} // FOR
this.cache_stmtPartitionParameters.put(catalog_stmt, all_param_idxs);
}
Expand Down Expand Up @@ -1391,13 +1392,13 @@ private void calculatePartitionsForCache(final CacheEntry target,
}
// SINGLE COLUMN PARTITIONING
else {
int param_idxs[] = target.get(catalog_col);
List<CatalogType> param_idxs = target.get(catalog_col);
if (trace.val)
LOG.trace("Param Indexes: " + param_idxs);

// Important: If there is no entry for this partitioning
// column, then we have to broadcast this fragment to all partitions
if (param_idxs == null || param_idxs.length == 0) {
if (param_idxs == null || param_idxs.isEmpty()) {
if (debug.val)
LOG.debug(String.format("No parameter mapping for %s. " +
"Fragment must be broadcast to all partitions",
Expand Down Expand Up @@ -1450,34 +1451,55 @@ private void calculatePartitionsForCache(final CacheEntry target,
private PartitionSet calculatePartitions(final PartitionSet partitions,
final Object params[],
final boolean is_array[],
final int param_idxs[],
final Column catalog_col) {
// Note that we have to go through all of the mappings from the
// partitioning column
// to parameters. This can occur when the partitioning column is
// referenced multiple times
for (int param_idx : param_idxs) {
// IMPORTANT: Check if the parameter is an array. If it is, then we
// have to
// loop through and get the hash of all of the values
if (is_array[param_idx]) {
int num_elements = Array.getLength(params[param_idx]);
if (trace.val)
LOG.trace("Parameter #" + param_idx + " is an array. Calculating multiple partitions...");
for (int i = 0; i < num_elements; i++) {
Object value = Array.get(params[param_idx], i);
int partition_id = this.hasher.hash(value, catalog_col);
final List<CatalogType> param_idxs,
final Column catalog_col) throws Exception {
// Note that we have to go through all of the mappings from the partitioning column
// to parameters. This can occur when the partitioning column is referenced multiple times.
// This allows us to handle complex WHERE clauses and whatnot.
for (CatalogType param : param_idxs) {
// STATEMENT PARAMETER
// This is the common case
if (param instanceof StmtParameter) {
int param_idx = ((StmtParameter)param).getIndex();

// IMPORTANT: Check if the parameter is an array. If it is, then we
// have to loop through and get the hash of all of the values
if (is_array[param_idx]) {
int num_elements = Array.getLength(params[param_idx]);
if (trace.val)
LOG.trace(CatalogUtil.getDisplayName(catalog_col) + " HASHING PARAM ARRAY[" + param_idx + "][" + i + "]: " + value + " -> " + partition_id);
LOG.trace("Parameter #" + param_idx + " is an array. Calculating multiple partitions...");
for (int i = 0; i < num_elements; i++) {
Object value = Array.get(params[param_idx], i);
int partition_id = this.hasher.hash(value, catalog_col);
if (trace.val)
LOG.trace(String.format("%s HASHING PARAM ARRAY[%d][%d]: %s -> %d",
catalog_col.fullName(), param_idx, i, value, partition_id));
partitions.add(partition_id);
} // FOR

}
// Primitive Value
else {
int partition_id = this.hasher.hash(params[param_idx], catalog_col);
if (trace.val)
LOG.trace(String.format("%s HASHING PARAM [%d]: %s -> %d",
catalog_col.fullName(), param_idx, params[param_idx], partition_id));
partitions.add(partition_id);
} // FOR
// Primitive
} else {
int partition_id = this.hasher.hash(params[param_idx], catalog_col);
if (trace.val)
LOG.trace(CatalogUtil.getDisplayName(catalog_col) + " HASHING PARAM[" + param_idx + "]: " + params[param_idx] + " -> " + partition_id);
}
}
// CONSTANT VALUE
// This is more rare
else if (param instanceof ConstantValue) {
ConstantValue const_param = (ConstantValue)param;
VoltType vtype = VoltType.get(const_param.getType());
Object const_value = VoltTypeUtil.getObjectFromString(vtype, const_param.getValue());
int partition_id = this.hasher.hash(const_value);
partitions.add(partition_id);
}
// BUSTED!
else {
throw new RuntimeException("Unexpected parameter type: " + param.fullName());
}
} // FOR
return (partitions);
}
Expand Down
Loading

0 comments on commit 81b0f02

Please sign in to comment.