Skip to content

Commit

Permalink
Snapshotting system integrated completely
Browse files Browse the repository at this point in the history
  • Loading branch information
jarulraj committed Jan 17, 2014
1 parent 23cfb02 commit 2d16006
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 420 deletions.
24 changes: 9 additions & 15 deletions src/frontend/edu/brown/hstore/txns/DependencyTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ protected void startRound(LocalTransaction ts) {
} // FOR
} // FOR

// CHANGE :: Disable assert
// XXX Disable assert
/*assert(batch_size == state.output_order.size()) :
String.format("%s - Expected %d output dependencies but we queued up %d " +
"[outputOrder=%s / numDependencies=%d]",
Expand Down Expand Up @@ -655,18 +655,15 @@ public boolean addWorkFragment(LocalTransaction ts, WorkFragment.Builder fragmen
state.dependency_ctr++;
// this.addResultDependencyStatement(ts, state, partition, output_dep_id, stmtIndex);

// CHANGE ::
/*
if (trace.val)
if (trace.val)
LOG.trace(String.format("%s - Added new %s %s for PlanFragment %d at partition %d " +
"[depCtr=%d, prefetch=%s]\n%s",
ts, dinfo.getClass().getSimpleName(),
TransactionUtil.debugStmtDep(stmtCounter, output_dep_id),
fragment.getFragmentId(i),
partition,
state.dependency_ctr, prefetch,
dinfo.debug()));
*/
"[depCtr=%d, prefetch=%s]\n%s",
ts, dinfo.getClass().getSimpleName(),
TransactionUtil.debugStmtDep(stmtCounter, output_dep_id),
fragment.getFragmentId(i),
partition,
state.dependency_ctr, prefetch,
dinfo.debug()));

// If this query was prefetched, we need to push its results through the
// the tracker so that it can update counters
Expand Down Expand Up @@ -738,12 +735,9 @@ public boolean addWorkFragment(LocalTransaction ts, WorkFragment.Builder fragmen
}
} // FOR

// CHANGE ::
/*
LOG.trace(String.format("%s - Number of Output Dependencies for StmtCounter #%d: " +
"%d out of %d\n%s",
ts, stmtCounter, output_ctr, dep_ctr, StringUtil.formatMaps(m)));
*/
}
// *********************************** DEBUG ***********************************

Expand Down
7 changes: 4 additions & 3 deletions src/frontend/org/voltdb/SnapshotSaveAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public VoltTable startSnapshotting(String file_path, String file_nonce, byte blo
LOG.trace("Stage 1 : at partition : "+partition_id);

synchronized (SnapshotSiteProcessor.m_taskListsForSites) {
// CHANGE :: Fetch work for this partition
// Fetch work for this partition
int index = partition_id - lowest_partition_id;
final Deque<SnapshotTableTask> m_taskList = SnapshotSiteProcessor.m_taskListsForSites.get(index);

Expand Down Expand Up @@ -331,15 +331,16 @@ public SnapshotRegistry.Snapshot.Table update(
int partition_id = context.getPartitionExecutor().getPartitionId();
int index = partition_id - lowest_partition_id;

// CHANGE :: Each partition gets a task
// Each partition gets a partitioned task
for (SnapshotTableTask t : partitionedSnapshotTasks) {
SnapshotSiteProcessor.m_taskListsForSites.get(index).offer(t);
}

//for (int ii = 0; ii < numLocalSites && !partitionedSnapshotTasks.isEmpty(); ii++) {
// SnapshotSiteProcessor.m_taskListsForSites.get(ii).addAll(partitionedSnapshotTasks);
//}


// Each partition gets a replicated task
//int siteIndex = 0;
for (SnapshotTableTask t : replicatedSnapshotTasks) {
//SnapshotSiteProcessor.m_taskListsForSites.get(siteIndex++ % numLocalSites).offer(t);
Expand Down
2 changes: 0 additions & 2 deletions src/frontend/org/voltdb/SnapshotSiteProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ public class SnapshotSiteProcessor {
* and does any potential snapshot work with that buffer
*/
private final Runnable m_onPotentialSnapshotWork;


// CHANGE :: Stuff needed for multiple partitions on same site
/**
* finish only after digest written
*/
Expand Down
9 changes: 2 additions & 7 deletions src/frontend/org/voltdb/sysprocs/SnapshotRestore.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ private static synchronized void initializeTableSaveFiles(String filePath, Strin
for (int originalHostId : originalHostIds) {
final File f = getSaveFileForPartitionedTable(filePath, fileNonce, tableName, originalHostId, siteId, partitionId);

// CHANGE :: num local sites
Host catalog_host = context.getHost();
Collection<Site> catalog_sites = CatalogUtil.getSitesForHost(catalog_host);
System.out.println("---");


m_saveFiles.offer(getTableSaveFile(f, catalog_sites.size() * 4, relevantPartitionIds));
assert (m_saveFiles.peekLast().getCompleted());
}
Expand Down Expand Up @@ -408,7 +406,7 @@ public DependencySet executePlanFragment(Long txn_id, Map<Integer, List<VoltTabl
int dependency_id = (Integer) paramsA[3];
int allowExport = (Integer) paramsA[4];

// CHANGE :: Localized Version
// Using Localized Version
VoltTable result = performLoadPartitionedTable(table_name, originalHosts, relevantPartitions, context, allowExport, ts);

// Distributed Version - Invokes another round of plan fragments
Expand Down Expand Up @@ -492,7 +490,6 @@ public VoltTable[] run(String path, String nonce, long allowExport) throws VoltA

ClusterSaveFileState savefile_state = null;
try {
// CHANGE :: Need ExecutionContext
savefile_state = new ClusterSaveFileState(savefile_data[0], execution_context, (int) allowExport);
} catch (IOException e) {
throw new VoltAbortException(e.getMessage());
Expand Down Expand Up @@ -751,7 +748,6 @@ private VoltTable performDistributeReplicatedTable(String tableName, int siteId,
int result_dependency_id = TableSaveFileState.getNextDependencyId();
pfs[0] = new SynthesizedPlanFragment();
pfs[0].fragmentId = SysProcFragmentId.PF_restoreSendReplicatedTable;
// CHANGE ::
// XXX pfs[0].siteId = siteId;
pfs[0].destPartitionId = siteId;
pfs[0].outputDependencyIds = new int[] { result_dependency_id };
Expand Down Expand Up @@ -974,7 +970,6 @@ private VoltTable performDistributePartitionedTable(String tableName, int origin
dependencyIds[pfs_index] = TableSaveFileState.getNextDependencyId();
pfs[pfs_index] = new SynthesizedPlanFragment();
pfs[pfs_index].fragmentId = SysProcFragmentId.PF_restoreSendPartitionedTable;
// CHANGE ::
// XXX pfs[pfs_index].siteId = site_id;
pfs[pfs_index].destPartitionId = site_id;
pfs[pfs_index].multipartition = false;
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/org/voltdb/sysprocs/SnapshotRestoreLocal.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private SynthesizedPlanFragment[] createNonReplicatedPlan(LocalTransaction ts, T
pf.fragmentId = SysProcFragmentId.PF_SRLloadDistribute;
pf.inputDependencyIds = new int[] {};
pf.outputDependencyIds = new int[] { (int) DEP_SRLdistribute };
// CHANGE :: Spread the task across all partitions
// Spread the task across all partitions
pf.multipartition = true;
pf.nonExecSites = false;
pf.destPartitionId = partition; // partitionsToSites[i - 1];
Expand Down Expand Up @@ -442,7 +442,7 @@ public DependencySet executePlanFragment(Long txn_id, Map<Integer, List<VoltTabl
LOG.trace("RelevantPartitions :"+pset.size());
LOG.trace("loadDistribute at host :" + catalog_host.getId());

// CHANGE :: Construct table again if it is not replicated
// Construct table again if it is not replicated
// As by default, it gets constructed only at one partition
if (is_replicated == false) {
// BBContainer is not backed by an array (uses allocateDirect),
Expand Down Expand Up @@ -528,7 +528,6 @@ public VoltTable[] run(String path, String nonce, long allowExport) throws VoltA

ClusterSaveFileState savefile_state = null;
try {
// CHANGE :: Need ExecutionContext
savefile_state = new ClusterSaveFileState(savefile_data[0], execution_context, (int) allowExport);
} catch (IOException e) {
throw new VoltAbortException(e.getMessage());
Expand Down
Loading

0 comments on commit 2d16006

Please sign in to comment.