Skip to content

Commit

Permalink
The DependencyTracker makes sure that prefetched DependencyInfos have…
Browse files Browse the repository at this point in the history
… the right partition id when we try to match up query invocations at run time.
  • Loading branch information
apavlo committed May 30, 2013
1 parent 1906946 commit 3bc4fbe
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ public void run(int partition) {
if (trace.val)
LOG.trace(String.format("%s - Prefetch=%s / HasPrefetchFragments=%s",
this.ts, this.prefetch, this.ts.hasPrefetchFragments()));
if (this.prefetch && partition != this.ts.getBasePartition() && this.ts.hasPrefetchFragments()) {
if (this.prefetch &&
this.ts.hasPrefetchFragments() &&
partition != this.ts.getBasePartition() &&
hstore_site.isLocalPartition(partition)) {
if (debug.val)
LOG.debug(String.format("%s - Checking for prefetch queries at partition %d",
this.ts, partition));
Expand Down
21 changes: 16 additions & 5 deletions src/frontend/edu/brown/hstore/txns/DependencyTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ private DependencyInfo getOrCreateDependencyInfo(LocalTransaction ts,
* @param state
* @param round
* @param stmtCounter
* @param partitionId TODO
* @param paramsHash
* @param fragmentId
* @param dependencyId
Expand All @@ -433,6 +434,7 @@ private DependencyInfo getPrefetchDependencyInfo(TransactionState state,
int round,
int stmtCounter,
int stmtIndex,
int partitionId,
int paramsHash,
int fragmentId,
int dependencyId) {
Expand Down Expand Up @@ -462,6 +464,14 @@ private DependencyInfo getPrefetchDependencyInfo(TransactionState state,
dinfo.getParameterSetHash(), paramsHash));
return (null);
}
else if (dinfo.getExpectedPartitions().contains(partitionId) == false) {
if (trace.val)
LOG.trace(String.format("%s - Invalid prefetch query for %s. " +
"Partition mismatch [%d != %d]",
state, TransactionUtil.debugStmtDep(stmtCounter, dependencyId),
partitionId, dinfo.getExpectedPartitions()));
return (null);
}

// IMPORTANT: We have to update this DependencyInfo's output id
// so that the blocked WorkFragment can retrieve it properly when it
Expand Down Expand Up @@ -611,6 +621,7 @@ public boolean addWorkFragment(LocalTransaction ts, WorkFragment.Builder fragmen
int output_dep_id, input_dep_id;
int ignore_ctr = 0;
for (int i = 0; i < num_fragments; i++) {
int partitionId = fragment.getPartitionId();
int fragmentId = fragment.getFragmentId(i);
int stmtCounter = fragment.getStmtCounter(i);
int stmtIndex = fragment.getStmtIndex(i);
Expand All @@ -626,8 +637,8 @@ public boolean addWorkFragment(LocalTransaction ts, WorkFragment.Builder fragmen
// this same query invocation.
if (state.prefetch_ctr > 0) {
dinfo = this.getPrefetchDependencyInfo(state, currentRound,
stmtCounter, stmtIndex, paramsHash,
fragmentId, output_dep_id);
stmtCounter, stmtIndex, partitionId,
paramsHash, fragmentId, output_dep_id);
prefetch = (dinfo != null);

}
Expand Down Expand Up @@ -690,8 +701,8 @@ public boolean addWorkFragment(LocalTransaction ts, WorkFragment.Builder fragmen
// generate this result for us.
if (state.prefetch_ctr > 0) {
dinfo = this.getPrefetchDependencyInfo(state, currentRound,
stmtCounter, stmtIndex, paramsHash,
fragmentId, input_dep_id);
stmtCounter, stmtIndex, partitionId,
paramsHash, fragmentId, input_dep_id);
}
if (dinfo == null) {
dinfo = this.getOrCreateDependencyInfo(ts, state, currentRound,
Expand Down Expand Up @@ -1038,7 +1049,7 @@ public void addPrefetchWorkFragment(LocalTransaction ts, WorkFragment.Builder fr

if (debug.val) {
String msg = String.format("%s - Adding prefetch %s %s at partition %d for %s",
ts, dinfo.getClass().getSimpleName(),
ts, dinfo,
TransactionUtil.debugStmtDep(stmtCounter, output_dep_id), partition,
CatalogUtil.getPlanFragment(catalogContext.catalog, fragment.getFragmentId(i)).fullName());
if (trace.val)
Expand Down

0 comments on commit 3bc4fbe

Please sign in to comment.