diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java index 6f4dcf55e..fcbb0a62f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java @@ -503,7 +503,7 @@ private void notifyLifecycleBeansEx(GridLifecycleEventType evt) { catch (Throwable e) { U.error(log, "Failed to notify lifecycle bean (safely ignored) [evt=" + evt + ", gridName=" + gridName + ']', e); - + if (e instanceof Error) throw (Error)e; } @@ -569,6 +569,7 @@ public void start(final GridConfiguration cfg, @Nullable ExecutorService utility // Ack various information. ackAsciiLogo(); + ackNodeId(); ackConfigUrl(); ackDaemon(); ackOsInfo(); @@ -1740,6 +1741,17 @@ private void ackAsciiLogo() { } } + /** + * + */ + private void ackNodeId() { + assert log != null; + + U.quietAndInfo(log, + "Local node information [id=" + configuration().getNodeId() + + (F.isEmpty(gridName) ? "" : ", grid=" + gridName) + ']'); + } + /** * Prints start info. * @@ -1925,7 +1937,7 @@ else if (state == STARTING) errOnStop = true; U.error(log, "Failed to pre-stop processor: " + comp, e); - + if (e instanceof Error) throw e; } @@ -2010,7 +2022,7 @@ else if (state == STARTING) errOnStop = true; U.error(log, "Failed to stop component (ignoring): " + comp, e); - + if (e instanceof Error) throw (Error)e; } diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 0fc44fb96..aa8eb05e7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -692,6 +692,10 @@ private void cleanup(GridCacheConfiguration cfg, @Nullable Object rsrc, boolean GridDataLoaderProcessor.SkipStoreBooleanFlagAddedMessageConverter665.class, GridDataLoaderProcessor.SKIP_STORE_SINCE_VER); + ctx.versionConverter().registerLocal(GridNearTxFinishResponse.class, + GridDhtTransactionalCacheAdapter.NearFinishResponse667Converter.class, + GridDhtTransactionalCacheAdapter.DHT_VERSION_IN_FINISH_RESPONSE_SINCE); + GridDeploymentMode depMode = ctx.config().getDeploymentMode(); if (!F.isEmpty(ctx.config().getCacheConfiguration())) { diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index dd68c7b2c..744b9bd76 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -15,8 +15,11 @@ import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.grid.kernal.processors.version.*; import org.gridgain.grid.lang.*; +import org.gridgain.grid.product.*; import org.gridgain.grid.util.*; +import org.gridgain.grid.util.direct.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; @@ -24,6 +27,7 @@ import org.jetbrains.annotations.*; import java.io.*; +import java.nio.*; import java.util.*; import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; @@ -40,6 +44,9 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach /** */ private static final long serialVersionUID = 0L; + /** */ + public static final GridProductVersion DHT_VERSION_IN_FINISH_RESPONSE_SINCE = GridProductVersion.fromString("6.6.7"); + /** * Empty constructor required for {@link Externalizable}. */ @@ -312,7 +319,7 @@ public GridFuture rollbackTx(UUID nodeId, GridNearTxFinishRequest res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(), - req.miniId(), new GridException("Transaction has been already completed.")); + req.miniId(), req.version(), new GridException("Transaction has been already completed.")); try { ctx.io().send(nodeId, res); @@ -331,7 +338,7 @@ public GridFuture rollbackTx(UUID nodeId, GridNearTxFinishRequest req) { } catch (Throwable e) { U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e); - - throw e; + + throw e; } } } @@ -2410,4 +2417,38 @@ private void obsoleteNearEntry(K key, GridCacheVersion ver) { if (nearEntry != null) nearEntry.markObsolete(ver); } + + /** + * Converter for DHT version added in near tx finish response. + */ + public static class NearFinishResponse667Converter extends GridVersionConverter { + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: { + if (!commState.putCacheVersion(null)) + return false; + + commState.idx++; + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + if (commState.getCacheVersion() == GridTcpCommunicationMessageAdapter.CACHE_VER_NOT_READ) + return false; + } + + return true; + } + } } diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java index 0fb6146ee..aeff7479d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -616,7 +616,7 @@ public GridFuture> prepareAsync(@Nullable Iterable res = new GridNearTxFinishResponse<>(nearXidVer, threadId, nearFinFutId, - nearFinMiniId, err); + nearFinMiniId, xidVer, err); try { cctx.io().send(nearNodeId, res); diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index b7986619e..59456777a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -865,6 +865,9 @@ private void processPrepareResponse(UUID nodeId, GridNearTxPrepareResponse * @param res Response. */ private void processFinishResponse(UUID nodeId, GridNearTxFinishResponse res) { + if (res.dhtVersion() != null) + ctx.versions().onReceived(nodeId, res.dhtVersion()); + ctx.tm().onFinishedRemote(nodeId, res.threadId()); GridDhtColocatedTxFinishFuture fut = (GridDhtColocatedTxFinishFuture)ctx.mvcc() diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java index 9cc5aed11..1f5681af3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -618,6 +618,9 @@ private void processPrepareResponse(UUID nodeId, GridNearTxPrepareResponse * @param res Response. */ private void processFinishResponse(UUID nodeId, GridNearTxFinishResponse res) { + if (res.dhtVersion() != null) + ctx.versions().onReceived(nodeId, res.dhtVersion()); + ctx.tm().onFinishedRemote(nodeId, res.threadId()); GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future( diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishResponse.java index 5f27d4369..86fede3bc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishResponse.java @@ -40,6 +40,10 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishRespo /** Near tx thread ID. */ private long nearThreadId; + /** DHT Version. */ + @GridDirectVersion(1) + private GridCacheVersion dhtVer; + /** * Empty constructor required by {@link Externalizable}. */ @@ -55,7 +59,7 @@ public GridNearTxFinishResponse() { * @param err Error. */ public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, GridUuid futId, GridUuid miniId, - @Nullable Throwable err) { + GridCacheVersion dhtVer, @Nullable Throwable err) { super(xid, futId); assert miniId != null; @@ -63,6 +67,7 @@ public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, GridUui this.nearThreadId = nearThreadId; this.miniId = miniId; this.err = err; + this.dhtVer = dhtVer; } /** @@ -86,6 +91,13 @@ public long threadId() { return nearThreadId; } + /** + * @return DHT version. + */ + public GridCacheVersion dhtVersion() { + return dhtVer; + } + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheContext ctx) throws GridException { super.prepareMarshal(ctx); @@ -122,6 +134,7 @@ public long threadId() { _clone.errBytes = errBytes; _clone.miniId = miniId; _clone.nearThreadId = nearThreadId; + _clone.dhtVer = dhtVer; } /** {@inheritDoc} */ @@ -158,6 +171,12 @@ public long threadId() { commState.idx++; + case 7: + if (!commState.putCacheVersion(dhtVer)) + return false; + + commState.idx++; + } return true; @@ -200,6 +219,16 @@ public long threadId() { commState.idx++; + case 7: + GridCacheVersion dhtVer0 = commState.getCacheVersion(); + + if (dhtVer0 == CACHE_VER_NOT_READ) + return false; + + dhtVer = dhtVer0; + + commState.idx++; + } return true;