Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/origin/gg-10229' into gg-6.6.7
Browse files Browse the repository at this point in the history
  • Loading branch information
avinogradov committed May 8, 2015
2 parents 10bf148 + 010fee0 commit 6e7ecb2
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ private void notifyLifecycleBeansEx(GridLifecycleEventType evt) {
catch (Throwable e) {
U.error(log, "Failed to notify lifecycle bean (safely ignored) [evt=" + evt +
", gridName=" + gridName + ']', e);

if (e instanceof Error)
throw (Error)e;
}
Expand Down Expand Up @@ -569,6 +569,7 @@ public void start(final GridConfiguration cfg, @Nullable ExecutorService utility

// Ack various information.
ackAsciiLogo();
ackNodeId();
ackConfigUrl();
ackDaemon();
ackOsInfo();
Expand Down Expand Up @@ -1740,6 +1741,17 @@ private void ackAsciiLogo() {
}
}

/**
*
*/
private void ackNodeId() {
assert log != null;

U.quietAndInfo(log,
"Local node information [id=" + configuration().getNodeId() +
(F.isEmpty(gridName) ? "" : ", grid=" + gridName) + ']');
}

/**
* Prints start info.
*
Expand Down Expand Up @@ -1925,7 +1937,7 @@ else if (state == STARTING)
errOnStop = true;

U.error(log, "Failed to pre-stop processor: " + comp, e);

if (e instanceof Error)
throw e;
}
Expand Down Expand Up @@ -2010,7 +2022,7 @@ else if (state == STARTING)
errOnStop = true;

U.error(log, "Failed to stop component (ignoring): " + comp, e);

if (e instanceof Error)
throw (Error)e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,10 @@ private void cleanup(GridCacheConfiguration cfg, @Nullable Object rsrc, boolean
GridDataLoaderProcessor.SkipStoreBooleanFlagAddedMessageConverter665.class,
GridDataLoaderProcessor.SKIP_STORE_SINCE_VER);

ctx.versionConverter().registerLocal(GridNearTxFinishResponse.class,
GridDhtTransactionalCacheAdapter.NearFinishResponse667Converter.class,
GridDhtTransactionalCacheAdapter.DHT_VERSION_IN_FINISH_RESPONSE_SINCE);

GridDeploymentMode depMode = ctx.config().getDeploymentMode();

if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
import org.gridgain.grid.kernal.processors.version.*;
import org.gridgain.grid.lang.*;
import org.gridgain.grid.product.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.future.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.nio.*;
import java.util.*;

import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
Expand All @@ -40,6 +44,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
/** */
private static final long serialVersionUID = 0L;

/** */
public static final GridProductVersion DHT_VERSION_IN_FINISH_RESPONSE_SINCE = GridProductVersion.fromString("6.6.7");

/**
* Empty constructor required for {@link Externalizable}.
*/
Expand Down Expand Up @@ -312,7 +319,7 @@ public GridFuture<GridCacheTx> rollbackTx(UUID nodeId, GridNearTxFinishRequest<K

// Always send finish response.
GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(), req.futureId(),
req.miniId(), new GridException("Transaction has been already completed."));
req.miniId(), req.version(), new GridException("Transaction has been already completed."));

try {
ctx.io().send(nodeId, res);
Expand All @@ -331,7 +338,7 @@ public GridFuture<GridCacheTx> rollbackTx(UUID nodeId, GridNearTxFinishRequest<K
catch (Throwable e) {
U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " + "res=" + res + ']', e);

throw e;
throw e;
}

return null;
Expand Down Expand Up @@ -414,7 +421,7 @@ public GridFuture<GridCacheTx> rollbackTx(UUID nodeId, GridNearTxFinishRequest<K
else {
// Always send finish response.
GridCacheMessage<K, V> res = new GridNearTxFinishResponse<>(req.version(), req.threadId(),
req.futureId(), req.miniId(), null);
req.futureId(), req.miniId(), req.version(), null);

try {
ctx.io().send(nodeId, res);
Expand All @@ -431,7 +438,7 @@ public GridFuture<GridCacheTx> rollbackTx(UUID nodeId, GridNearTxFinishRequest<K
"res=" + res + ']', e);
}
catch (Throwable e) {
U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
"res=" + res + ']', e);

throw e;
Expand Down Expand Up @@ -541,7 +548,7 @@ else if (log.isDebugEnabled())
catch (GridException ex) {
U.error(log, "Failed to invalidate transaction: " + tx, ex);
}

if (e instanceof Error)
throw (Error)e;
}
Expand Down Expand Up @@ -1433,8 +1440,8 @@ protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req) {
}
catch (Throwable e) {
U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
throw e;

throw e;
}
}
}
Expand Down Expand Up @@ -2410,4 +2417,38 @@ private void obsoleteNearEntry(K key, GridCacheVersion ver) {
if (nearEntry != null)
nearEntry.markObsolete(ver);
}

/**
* Converter for DHT version added in near tx finish response.
*/
public static class NearFinishResponse667Converter extends GridVersionConverter {
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf) {
commState.setBuffer(buf);

switch (commState.idx) {
case 0: {
if (!commState.putCacheVersion(null))
return false;

commState.idx++;
}
}

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf) {
commState.setBuffer(buf);

switch (commState.idx) {
case 0:
if (commState.getCacheVersion() == GridTcpCommunicationMessageAdapter.CACHE_VER_NOT_READ)
return false;
}

return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ public GridFuture<GridCacheTxEx<K, V>> prepareAsync(@Nullable Iterable<GridCache
}

GridNearTxFinishResponse<K, V> res = new GridNearTxFinishResponse<>(nearXidVer, threadId, nearFinFutId,
nearFinMiniId, err);
nearFinMiniId, xidVer, err);

try {
cctx.io().send(nearNodeId, res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,9 @@ private void processPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V>
* @param res Response.
*/
private void processFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) {
if (res.dhtVersion() != null)
ctx.versions().onReceived(nodeId, res.dhtVersion());

ctx.tm().onFinishedRemote(nodeId, res.threadId());

GridDhtColocatedTxFinishFuture<K, V> fut = (GridDhtColocatedTxFinishFuture<K, V>)ctx.mvcc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,9 @@ private void processPrepareResponse(UUID nodeId, GridNearTxPrepareResponse<K, V>
* @param res Response.
*/
private void processFinishResponse(UUID nodeId, GridNearTxFinishResponse<K, V> res) {
if (res.dhtVersion() != null)
ctx.versions().onReceived(nodeId, res.dhtVersion());

ctx.tm().onFinishedRemote(nodeId, res.threadId());

GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, V>)ctx.mvcc().<GridCacheTx>future(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class GridNearTxFinishResponse<K, V> extends GridDistributedTxFinishRespo
/** Near tx thread ID. */
private long nearThreadId;

/** DHT Version. */
@GridDirectVersion(1)
private GridCacheVersion dhtVer;

/**
* Empty constructor required by {@link Externalizable}.
*/
Expand All @@ -55,14 +59,15 @@ public GridNearTxFinishResponse() {
* @param err Error.
*/
public GridNearTxFinishResponse(GridCacheVersion xid, long nearThreadId, GridUuid futId, GridUuid miniId,
@Nullable Throwable err) {
GridCacheVersion dhtVer, @Nullable Throwable err) {
super(xid, futId);

assert miniId != null;

this.nearThreadId = nearThreadId;
this.miniId = miniId;
this.err = err;
this.dhtVer = dhtVer;
}

/**
Expand All @@ -86,6 +91,13 @@ public long threadId() {
return nearThreadId;
}

/**
* @return DHT version.
*/
public GridCacheVersion dhtVersion() {
return dhtVer;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheContext<K, V> ctx) throws GridException {
super.prepareMarshal(ctx);
Expand Down Expand Up @@ -122,6 +134,7 @@ public long threadId() {
_clone.errBytes = errBytes;
_clone.miniId = miniId;
_clone.nearThreadId = nearThreadId;
_clone.dhtVer = dhtVer;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -158,6 +171,12 @@ public long threadId() {

commState.idx++;

case 7:
if (!commState.putCacheVersion(dhtVer))
return false;

commState.idx++;

}

return true;
Expand Down Expand Up @@ -200,6 +219,16 @@ public long threadId() {

commState.idx++;

case 7:
GridCacheVersion dhtVer0 = commState.getCacheVersion();

if (dhtVer0 == CACHE_VER_NOT_READ)
return false;

dhtVer = dhtVer0;

commState.idx++;

}

return true;
Expand Down

0 comments on commit 6e7ecb2

Please sign in to comment.