From ba759a5c2beeb45bf8e372d47fe7db82a471be13 Mon Sep 17 00:00:00 2001 From: Atreyee Maiti Date: Tue, 22 Apr 2014 14:56:53 -0400 Subject: [PATCH] multi partition on same site works now. had to do the merge before hand --- .../edu/brown/hstore/AntiCacheManager.java | 19 +++++++++++-------- .../edu/brown/hstore/HStoreCoordinator.java | 9 +++++++++ src/frontend/edu/brown/hstore/HStoreSite.java | 2 ++ .../edu/brown/hstore/PartitionExecutor.java | 5 +++++ src/frontend/org/voltdb/VoltProcedure.java | 3 ++- .../org/voltdb/client/ClientImpl.java | 1 + .../brown/hstore/MockHStoreCoordinator.java | 4 ++-- .../regressionsuites/TestUsersSuite.java | 17 +++++++++++------ 8 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/frontend/edu/brown/hstore/AntiCacheManager.java b/src/frontend/edu/brown/hstore/AntiCacheManager.java index 3f37295b06..145dfe7d6d 100644 --- a/src/frontend/edu/brown/hstore/AntiCacheManager.java +++ b/src/frontend/edu/brown/hstore/AntiCacheManager.java @@ -326,19 +326,22 @@ protected void processingCallback(QueueEntry next) { // HACK HACK HACK HACK HACK HACK // We need to get a new txnId for ourselves, since the one that we // were given before is now probably too far in the past + if(next.partition != next.ts.getBasePartition()){ + ee.antiCacheMergeBlocks(next.catalog_tbl); + } Long newTxnId = this.hstore_site.getTransactionInitializer().resetTransactionId(next.ts, next.partition); LOG.info("restartin on local"); this.hstore_site.transactionInit(next.ts); }else{ - RemoteTransaction ts = (RemoteTransaction) next.ts; - RpcCallback callback = ts.getUnevictCallback(); - UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder() - .setSenderSite(this.hstore_site.getSiteId()) - .setTransactionId(oldTxnId) - .setPartitionId(next.partition) - .setStatus(Status.OK); - callback.run(builder.build()); +// RemoteTransaction ts = (RemoteTransaction) next.ts; +// RpcCallback callback = ts.getUnevictCallback(); +// UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder() +// .setSenderSite(this.hstore_site.getSiteId()) +// .setTransactionId(oldTxnId) +// .setPartitionId(next.partition) +// .setStatus(Status.OK); +// callback.run(builder.build()); } diff --git a/src/frontend/edu/brown/hstore/HStoreCoordinator.java b/src/frontend/edu/brown/hstore/HStoreCoordinator.java index ac7c23e0a4..4287224785 100644 --- a/src/frontend/edu/brown/hstore/HStoreCoordinator.java +++ b/src/frontend/edu/brown/hstore/HStoreCoordinator.java @@ -845,6 +845,14 @@ public void unevictData(RpcController controller, for(int i = 0; i < request.getTupleOffsetsList().size(); i++) tuple_offsets[i] = request.getTupleOffsets(i); hstore_site.getAntiCacheManager().queue(ts, partition, catalog_tbl, block_ids, tuple_offsets); + // TRIAL + RpcCallback callback = ts.getUnevictCallback(); + UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder() + .setSenderSite(hstore_site.getSiteId()) + .setTransactionId(request.getTransactionId()) + .setPartitionId(partition) + .setStatus(Status.OK); + done.run(builder.build()); } } // END CLASS @@ -1357,6 +1365,7 @@ public boolean sendUnevictDataMessage(int remote_site_id, LocalTransaction txn, try { System.out.println(this.channels[remote_site_id]); this.channels[remote_site_id].unevictData(new ProtoRpcController(), request, this.unevictCallback); + LOG.info("sent message to remote hstore site"); if (trace.val) LOG.trace(String.format("Sent %s to %s", request.getClass().getSimpleName(), diff --git a/src/frontend/edu/brown/hstore/HStoreSite.java b/src/frontend/edu/brown/hstore/HStoreSite.java index 1ab7c0fb34..8f52f8881b 100644 --- a/src/frontend/edu/brown/hstore/HStoreSite.java +++ b/src/frontend/edu/brown/hstore/HStoreSite.java @@ -1917,6 +1917,7 @@ public void transactionStart(LocalTransaction ts) { } // We will want to delete this transaction after we reject it if it is a single-partition txn // Otherwise we will let the normal distributed transaction process clean things up + LOG.info("the reject happened here!!!"); this.transactionReject(ts, status); if (singlePartitioned) this.queueDeleteTransaction(ts.getTransactionId(), status); } @@ -2192,6 +2193,7 @@ public Status transactionRestart(LocalTransaction orig_ts, Status status) { orig_ts, orig_ts.getRestartCounter()); throw new RuntimeException(msg); } else { + LOG.info("the reject happened coz of too many restarts"); this.transactionReject(orig_ts, Status.ABORT_REJECT); return (Status.ABORT_REJECT); } diff --git a/src/frontend/edu/brown/hstore/PartitionExecutor.java b/src/frontend/edu/brown/hstore/PartitionExecutor.java index c53cb33f9f..f08ef0fc78 100644 --- a/src/frontend/edu/brown/hstore/PartitionExecutor.java +++ b/src/frontend/edu/brown/hstore/PartitionExecutor.java @@ -2563,6 +2563,11 @@ private void processWorkFragment(AbstractTransaction ts, WorkFragment fragment, } finally { if (error != null) { // error.printStackTrace(); + if(error instanceof EvictedTupleAccessException){ + EvictedTupleAccessException ex = (EvictedTupleAccessException) error; + System.out.println(ex.tuple_offsets[0]+"tuple offsets##########"); + } + LOG.warn(String.format("%s - Unexpected %s on partition %d", ts, error.getClass().getSimpleName(), this.partitionId), error); // (debug.val ? error : null)); diff --git a/src/frontend/org/voltdb/VoltProcedure.java b/src/frontend/org/voltdb/VoltProcedure.java index 9764a62515..86d5f9e261 100644 --- a/src/frontend/org/voltdb/VoltProcedure.java +++ b/src/frontend/org/voltdb/VoltProcedure.java @@ -564,7 +564,7 @@ public final ClientResponseImpl call(LocalTransaction txnState, Object... paramL try { // ANTI-CACHE TABLE MERGE if (hstore_conf.site.anticache_enable && txnState.hasAntiCacheMergeTable()) { - LOG.debug("Merging blocks for anticache table."); + LOG.info("Merging blocks for anticache table."); if (hstore_conf.site.anticache_profiling) { this.hstore_site.getAntiCacheManager() @@ -576,6 +576,7 @@ public final ClientResponseImpl call(LocalTransaction txnState, Object... paramL // have the logic down below for handling various errors from the EE try { Table catalog_tbl = txnState.getAntiCacheMergeTable(); + System.out.println(catalog_tbl.fullName()+this.partitionId); this.executor.getExecutionEngine().antiCacheMergeBlocks(catalog_tbl); } finally { if (hstore_conf.site.anticache_profiling) { diff --git a/src/frontend/org/voltdb/client/ClientImpl.java b/src/frontend/org/voltdb/client/ClientImpl.java index 014b9a2914..9069162f18 100644 --- a/src/frontend/org/voltdb/client/ClientImpl.java +++ b/src/frontend/org/voltdb/client/ClientImpl.java @@ -245,6 +245,7 @@ else if (m_catalog != null && procName.startsWith("@") == false) { throw new java.io.InterruptedIOException("Interrupted while waiting for response"); } if (cb.getResponse().getStatus() != Status.OK) { + LOG.info("the response failed!!!"); throw new ProcCallException(cb.getResponse(), cb.getResponse().getStatusString(), cb.getResponse().getException()); } // cb.result() throws ProcCallException if procedure failed diff --git a/tests/frontend/edu/brown/hstore/MockHStoreCoordinator.java b/tests/frontend/edu/brown/hstore/MockHStoreCoordinator.java index 5e16576496..d5c24cc0cf 100644 --- a/tests/frontend/edu/brown/hstore/MockHStoreCoordinator.java +++ b/tests/frontend/edu/brown/hstore/MockHStoreCoordinator.java @@ -227,8 +227,8 @@ public void unevictData(RpcController controller, Long oldTxnId = request.getTransactionId(); UnevictDataResponse.Builder builder = UnevictDataResponse.newBuilder() .setSenderSite(hstore_site.getSiteId()) - .setOldTransactionId(oldTxnId) - .setNewTransactionId(oldTxnId+1) // some new id + .setTransactionId(oldTxnId) + .setPartitionId(1) // some id .setStatus(Status.OK); done.run(builder.build()); diff --git a/tests/frontend/org/voltdb/regressionsuites/TestUsersSuite.java b/tests/frontend/org/voltdb/regressionsuites/TestUsersSuite.java index be544a629f..7fdc0da7a3 100644 --- a/tests/frontend/org/voltdb/regressionsuites/TestUsersSuite.java +++ b/tests/frontend/org/voltdb/regressionsuites/TestUsersSuite.java @@ -92,7 +92,7 @@ public void testReadRecords() throws Exception { System.err.println("-------------------------------"); String procName = GetUsers.class.getSimpleName(); - long user1 = 9999; + long user1 = 99999; long user2 = 1; Object params[] = { user1, user2 }; ClientResponse cresponse = client.callProcedure(procName, params); @@ -145,9 +145,14 @@ public static Test suite() { MultiConfigSuiteBuilder builder = new MultiConfigSuiteBuilder(TestUsersSuite.class); builder.setGlobalConfParameter("site.exec_voltdb_procinfo", true); builder.setGlobalConfParameter("site.anticache_enable", true); - builder.setGlobalConfParameter("site.anticache_profiling", true); + builder.setGlobalConfParameter("site.anticache_profiling", false); builder.setGlobalConfParameter("site.anticache_reset", true); builder.setGlobalConfParameter("site.anticache_check_interval", Integer.MAX_VALUE); + builder.setGlobalConfParameter("site.network_startup_wait", 60000); + builder.setGlobalConfParameter("site.coordinator_sync_time", false); + builder.setGlobalConfParameter("site.status_enable", false); + builder.setGlobalConfParameter("site.txn_partition_id_managers", true); + UsersProjectBuilder project = new UsersProjectBuilder(); project.addAllDefaults(); @@ -158,19 +163,19 @@ public static Test suite() { ///////////////////////////////////////////////////////////// // CONFIG #1: 1 Local Site/Partition running on JNI backend ///////////////////////////////////////////////////////////// -/* config = new LocalSingleProcessServer(PREFIX+"-1part.jar", 2, BackendTarget.NATIVE_EE_JNI); + config = new LocalSingleProcessServer(PREFIX+"-1part.jar", 2, BackendTarget.NATIVE_EE_JNI); success = config.compile(project); assert(success); builder.addServerConfig(config); -*/ + //////////////////////////////////////////////////////////// // CONFIG #2: cluster of 2 nodes running 2 site each, one replica //////////////////////////////////////////////////////////// - config = new LocalCluster(PREFIX+"-cluster.jar", 2, 1, 0, BackendTarget.NATIVE_EE_JNI); +/* config = new LocalCluster(PREFIX+"-cluster.jar", 2, 1, 0, BackendTarget.NATIVE_EE_JNI); success = config.compile(project); assert(success); builder.addServerConfig(config); - +*/ return builder; }