Skip to content

Commit

Permalink
distributed txn queueing for AntiCacheManager. first cut. wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Atreyee committed Apr 22, 2014
1 parent a144bb6 commit d3c5ee1
Show file tree
Hide file tree
Showing 8 changed files with 1,216 additions and 65 deletions.
43 changes: 31 additions & 12 deletions src/frontend/edu/brown/hstore/AntiCacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.protobuf.RpcCallback;

import edu.brown.catalog.CatalogUtil;
import edu.brown.hstore.Hstoreservice.HStoreService;
import edu.brown.hstore.Hstoreservice.Status;
import edu.brown.hstore.conf.HStoreConf;
import edu.brown.hstore.internal.UtilityWorkMessage.TableStatsRequestMessage;
Expand Down Expand Up @@ -263,9 +264,15 @@ public Runnable getMemoryMonitorThread() {
protected void processingCallback(QueueEntry next) {
assert(next.ts.isInitialized()) :
String.format("Unexpected uninitialized transaction handle: %s", next);
assert(next.partition == next.ts.getBasePartition()) :
String.format("The base partition for %s is %d but we want to fetch a block for partition %d: %s",
next.ts, next.ts.getBasePartition(), next.partition, next);
if(next.partition != next.ts.getBasePartition()) { // distributed txn
LOG.debug(String.format("The base partition for %s is %d but we want to fetch a block for partition %d: %s",
next.ts, next.ts.getBasePartition(), next.partition, next));
System.out.println(String.format("The base partition for %s is %d but we want to fetch a block for partition %d: %s",
next.ts, next.ts.getBasePartition(), next.partition, next));
// if we are the remote site then we should go ahead and continue processing
// if no then we should simply requeue the entry?

}
LOG.debug("Processing " + next);

// We need to get the EE handle for the partition that this txn
Expand Down Expand Up @@ -339,15 +346,27 @@ protected void removeCallback(QueueEntry next) {
* - The list of blockIds that need to be read in for the table
*/
public boolean queue(LocalTransaction ts, int partition, Table catalog_tbl, short block_ids[], int tuple_offsets[]) {

// if (hstore_conf.site.anticache_profiling) {
assert(ts.getPendingError() != null) :
String.format("Missing original %s for %s", EvictedTupleAccessException.class.getSimpleName(), ts);
assert(ts.getPendingError() instanceof EvictedTupleAccessException) :
String.format("Unexpected error for %s: %s", ts, ts.getPendingError().getClass().getSimpleName());
this.profilers[partition].restarted_txns++;
this.profilers[partition].addEvictedAccess(ts, (EvictedTupleAccessException)ts.getPendingError());
// }
System.out.println(ts.getBasePartition()+"*********"+partition);
if(ts.getBasePartition()!=partition){ // different partition generated the exception
int site_id = hstore_site.getCatalogContext().getSiteIdForPartitionId(partition);
return hstore_site.getCoordinator().sendUnevictDataMessage(site_id);
// should we enqueue the transaction on our side?
// if yes then we need to prevent the queue item from being picked up
// and prevent it from bombing the partition error
// if no then simply return?

// how to take care of LRU?

}

if (hstore_conf.site.anticache_profiling) {
assert(ts.getPendingError() != null) :
String.format("Missing original %s for %s", EvictedTupleAccessException.class.getSimpleName(), ts);
assert(ts.getPendingError() instanceof EvictedTupleAccessException) :
String.format("Unexpected error for %s: %s", ts, ts.getPendingError().getClass().getSimpleName());
this.profilers[partition].restarted_txns++;
this.profilers[partition].addEvictedAccess(ts, (EvictedTupleAccessException)ts.getPendingError());
}

QueueEntry e = new QueueEntry(ts, partition, catalog_tbl, block_ids, tuple_offsets);

Expand Down
65 changes: 65 additions & 0 deletions src/frontend/edu/brown/hstore/HStoreCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import edu.brown.hstore.Hstoreservice.TransactionReduceResponse;
import edu.brown.hstore.Hstoreservice.TransactionWorkRequest;
import edu.brown.hstore.Hstoreservice.TransactionWorkResponse;
import edu.brown.hstore.Hstoreservice.UnevictDataRequest;
import edu.brown.hstore.Hstoreservice.UnevictDataResponse;
import edu.brown.hstore.Hstoreservice.WorkFragment;
import edu.brown.hstore.callbacks.ShutdownPrepareCallback;
import edu.brown.hstore.callbacks.LocalFinishCallback;
Expand Down Expand Up @@ -245,6 +247,25 @@ public void run(HeartbeatResponse response) {
}
};

// ----------------------------------------------------------------------------
// UNEVICT CALLBACK
// ----------------------------------------------------------------------------

private final RpcCallback<UnevictDataResponse> unevictCallback = new RpcCallback<UnevictDataResponse>() {
@Override
public void run(UnevictDataResponse response) {
if (response.getStatus() == Status.OK) {
if (trace.val)
LOG.trace(String.format("%s %s -> %s [%s]",
response.getClass().getSimpleName(),
HStoreThreadManager.formatSiteName(response.getSenderSite()),
HStoreThreadManager.formatSiteName(local_site_id),
response.getStatus()));
assert(response.getSenderSite() != local_site_id);
}
}
};

// ----------------------------------------------------------------------------
// INITIALIZATION
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -795,6 +816,21 @@ public void transactionDebug(RpcController controller, TransactionDebugRequest r
done.run(response);
}

@Override
public void unevictData(RpcController controller,
UnevictDataRequest request,
RpcCallback<UnevictDataResponse> done) {
if (debug.val)
LOG.debug(String.format("Received %s from HStoreSite %s",
request.getClass().getSimpleName(),
HStoreThreadManager.formatSiteName(request.getSenderSite())));
UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder()
.setSenderSite(local_site_id)
.setStatus(Status.OK);
done.run(builder.build());

}

} // END CLASS


Expand Down Expand Up @@ -1273,6 +1309,35 @@ public void sendHeartbeat() {
}
} // FOR
}

// ----------------------------------------------------------------------------
// UNEVICT DATA
// ----------------------------------------------------------------------------

/**
* Send a message to a remote site to unevict data
* @return
*/
public boolean sendUnevictDataMessage(int remote_site_id) {
UnevictDataRequest request = UnevictDataRequest.newBuilder()
.setSenderSite(this.local_site_id)
.setLastTransactionId(-1) // FIXME
.build();
try {
this.channels[remote_site_id].unevictData(new ProtoRpcController(), request, this.unevictCallback);
if (trace.val)
LOG.trace(String.format("Sent %s to %s",
request.getClass().getSimpleName(),
HStoreThreadManager.formatSiteName(remote_site_id)));
return true;
} catch (RuntimeException ex) {
// Silently ignore these errors...
ex.printStackTrace();
System.out.println("&&&&&&&&&&");
return false;
}

}

// ----------------------------------------------------------------------------
// TIME SYNCHRONZIATION
Expand Down
Loading

0 comments on commit d3c5ee1

Please sign in to comment.