Skip to content

Commit

Permalink
Remove redundant pre-write/commit logic in LockResolverTest (#1062)
Browse files Browse the repository at this point in the history
  • Loading branch information
birdstorm authored and zhexuany committed Aug 23, 2019
1 parent 420279a commit b72a7b2
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 232 deletions.
1 change: 0 additions & 1 deletion tikv-client/src/main/java/com/pingcap/tikv/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ public Future<Store> getStoreAsync(BackOffer backOffer, long storeId) {
public void close() throws InterruptedException {
if (service != null) {
service.shutdownNow();
service.awaitTermination(1, TimeUnit.SECONDS);
}
if (channelFactory != null) {
channelFactory.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,4 @@ public interface ReadOnlyPDClient {
Store getStore(BackOffer backOffer, long storeId);

Future<Store> getStoreAsync(BackOffer backOffer, long storeId);

/** Close underlining resources */
void close() throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvStub;

// Note that RegionStoreClient itself is not thread-safe
// TODO:
// 1. RegionStoreClient will be inaccessible directly.
// 2. All apis of RegionStoreClient would not provide retry aside from callWithRetry,
// if a request needs to be retried because of an un-retryable cause, e.g., keys
// need to be re-split across regions/stores, region info outdated, e.t.c., you should
// retry it in an upper client logic (KVClient, TxnClient, e.t.c.)
// 1. RegionStoreClient will be inaccessible directly.
// 2. All apis of RegionStoreClient would not provide retry aside from callWithRetry,
// if a request needs to be retried because of an un-retryable cause, e.g., keys
// need to be re-split across regions/stores, region info outdated, e.t.c., you
// should retry it in an upper client logic (KVClient, TxnClient, e.t.c.)
/** Note that RegionStoreClient itself is not thread-safe */
public class RegionStoreClient extends AbstractRegionStoreClient {
public enum RequestTypes {
REQ_TYPE_SELECT(101),
Expand Down Expand Up @@ -85,8 +85,8 @@ public int getValue() {
* @param key key to fetch
* @param version key version
* @return value
* @throws TiClientInternalException
* @throws KeyException
* @throws TiClientInternalException TiSpark Client exception, unexpected
* @throws KeyException Key may be locked
*/
public ByteString get(BackOffer backOffer, ByteString key, long version)
throws TiClientInternalException, KeyException {
Expand All @@ -109,21 +109,16 @@ public ByteString get(BackOffer backOffer, ByteString key, long version)

GetResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_GET, factory, handler);

handleGetResponse(backOffer, resp);
handleGetResponse(resp);
return resp.getValue();
}

/**
* @param backOffer
* @param resp
* @return Return true means the rpc call success. Return false means the rpc call fail,
* RegionStoreClient should retry. Throw an Exception means the rpc call fail,
* RegionStoreClient cannot handle this kind of error.
* @throws TiClientInternalException
* @throws KeyException
* @param resp GetResponse
* @throws TiClientInternalException TiSpark Client exception, unexpected
* @throws KeyException Key may be locked
*/
private void handleGetResponse(BackOffer backOffer, GetResponse resp)
throws TiClientInternalException, KeyException {
private void handleGetResponse(GetResponse resp) throws TiClientInternalException, KeyException {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("GetResponse failed without a cause");
Expand Down Expand Up @@ -157,7 +152,7 @@ public List<KvPair> batchGet(BackOffer backOffer, Iterable<ByteString> keys, lon
return handleBatchGetResponse(backOffer, resp);
}

private List<KvPair> handleBatchGetResponse(BackOffer bo, BatchGetResponse resp) {
private List<KvPair> handleBatchGetResponse(BackOffer backOffer, BatchGetResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("BatchGetResponse failed without a cause");
Expand All @@ -179,7 +174,7 @@ private List<KvPair> handleBatchGetResponse(BackOffer bo, BatchGetResponse resp)
}

if (!locks.isEmpty()) {
boolean ok = lockResolverClient.resolveLocks(bo, locks);
boolean ok = lockResolverClient.resolveLocks(backOffer, locks);
if (!ok) {
// resolveLocks already retried, just throw error to upper logic.
throw new TiKVException("locks not resolved, retry");
Expand Down Expand Up @@ -261,14 +256,14 @@ public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version)
/**
* Prewrite batch keys
*
* @param backOffer
* @param primary
* @param mutations
* @param startTs
* @param lockTTL
* @throws TiClientInternalException
* @throws KeyException
* @throws RegionException
* @param backOffer backOffer
* @param primary primary lock of keys
* @param mutations batch key-values as mutations
* @param startTs startTs of prewrite
* @param lockTTL lock ttl
* @throws TiClientInternalException TiSpark Client exception, unexpected
* @throws KeyException Key may be locked
* @throws RegionException region error occurs
*/
public void prewrite(
BackOffer backOffer,
Expand All @@ -283,21 +278,13 @@ public void prewrite(
/**
* Prewrite batch keys
*
* @param bo
* @param primaryLock
* @param mutations
* @param startVersion
* @param ttl
* @param skipConstraintCheck
* @throws TiClientInternalException
* @throws KeyException
* @throws RegionException
* @param skipConstraintCheck whether to skip constraint check
*/
public void prewrite(
BackOffer bo,
ByteString primaryLock,
Iterable<Mutation> mutations,
long startVersion,
long startTs,
long ttl,
boolean skipConstraintCheck)
throws TiClientInternalException, KeyException, RegionException {
Expand All @@ -306,7 +293,7 @@ public void prewrite(
() ->
PrewriteRequest.newBuilder()
.setContext(region.getContext())
.setStartVersion(startVersion)
.setStartVersion(startTs)
.setPrimaryLock(primaryLock)
.addAllMutations(mutations)
.setLockTtl(ttl)
Expand All @@ -328,8 +315,8 @@ public void prewrite(
}

/**
* @param backOffer
* @param resp
* @param backOffer backOffer
* @param resp response
* @return Return true means the rpc call success. Return false means the rpc call fail,
* RegionStoreClient should retry. Throw an Exception means the rpc call fail,
* RegionStoreClient cannot handle this kind of error
Expand All @@ -347,40 +334,42 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp)
throw new RegionException(resp.getRegionError());
}

boolean result = true;
boolean isSuccess = true;
List<Lock> locks = new ArrayList<>();
for (KeyError err : resp.getErrorsList()) {
if (err.hasLocked()) {
result = false;
isSuccess = false;
Lock lock = new Lock(err.getLocked());
locks.add(lock);
} else {
throw new KeyException(err.toString());
}
}
if (isSuccess) {
return true;
}

if (!lockResolverClient.resolveLocks(backOffer, locks)) {
backOffer.doBackOff(BoTxnLock, new KeyException(resp.getErrorsList().get(0)));
}
return result;
return false;
}

/**
* Commit batch keys
*
* @param backOffer
* @param keys
* @param startVersion
* @param commitVersion
* @param backOffer backOffer
* @param keys keys to commit
* @param startTs start version
* @param commitTs commit version
*/
public void commit(
BackOffer backOffer, Iterable<ByteString> keys, long startVersion, long commitVersion)
public void commit(BackOffer backOffer, Iterable<ByteString> keys, long startTs, long commitTs)
throws KeyException {
Supplier<CommitRequest> factory =
() ->
CommitRequest.newBuilder()
.setStartVersion(startVersion)
.setCommitVersion(commitVersion)
.setStartVersion(startTs)
.setCommitVersion(commitTs)
.addAllKeys(keys)
.setContext(region.getContext())
.build();
Expand All @@ -393,20 +382,16 @@ public void commit(
resp -> resp.hasRegionError() ? resp.getRegionError() : null,
resp -> resp.hasError() ? resp.getError() : null);
CommitResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler);
handleCommitResponse(backOffer, resp);
handleCommitResponse(resp);
}

/**
* @param backOffer
* @param resp
* @return Return true means the rpc call success. Return false means the rpc call fail,
* RegionStoreClient should retry. Throw an Exception means the rpc call fail,
* RegionStoreClient cannot handle this kind of error
* @param resp CommitResponse
* @throws TiClientInternalException
* @throws RegionException
* @throws KeyException
*/
private void handleCommitResponse(BackOffer backOffer, CommitResponse resp)
private void handleCommitResponse(CommitResponse resp)
throws TiClientInternalException, RegionException, KeyException {
if (resp == null) {
this.regionManager.onRequestFail(region);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.pingcap.tikv.meta.TiTimestamp;
import com.pingcap.tikv.region.RegionStoreClient;
import com.pingcap.tikv.region.TiRegion;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import java.util.Collections;
import org.apache.log4j.Logger;
import org.junit.Before;
Expand All @@ -39,7 +41,6 @@ public void setUp() {
conf.setIsolationLevel(IsolationLevel.RC);
try {
session = TiSession.getInstance(conf);
pdClient = session.getPDClient();
this.builder = session.getRegionStoreClientBuilder();
init = true;
} catch (Exception e) {
Expand Down Expand Up @@ -67,38 +68,39 @@ public void RCTest() {
skipTest();
return;
}
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
TiTimestamp startTs = session.getTimestamp();
TiTimestamp endTs = session.getTimestamp();

// Put <a, a> into kv
putKV("a", "a", startTs.getVersion(), endTs.getVersion());

startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
startTs = session.getTimestamp();
endTs = session.getTimestamp();

// Prewrite <a, aa> as primary without committing it
assertTrue(lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion()));

TiRegion tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a"));
RegionStoreClient client = builder.build(tiRegion);
// In RC mode, lock will not be read. <a, a> is retrieved.
ByteString v =
client.get(
backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), "a");

{
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
// In RC mode, lock will not be read. <a, a> is retrieved.
ByteString v =
client.get(backOffer, ByteString.copyFromUtf8("a"), session.getTimestamp().getVersion());
assertEquals(v.toStringUtf8(), "a");
}

try {
// After committing <a, aa>, we can read it.
assertTrue(
commit(
Collections.singletonList(ByteString.copyFromUtf8("a")),
startTs.getVersion(),
endTs.getVersion(),
Collections.singletonList(ByteString.copyFromUtf8("a"))));
v =
client.get(
backOffer,
ByteString.copyFromUtf8("a"),
pdClient.getTimestamp(backOffer).getVersion());
endTs.getVersion()));
BackOffer backOffer = ConcreteBackOffer.newGetBackOff();
ByteString v =
client.get(backOffer, ByteString.copyFromUtf8("a"), session.getTimestamp().getVersion());
assertEquals(v.toStringUtf8(), "aa");
} catch (KeyException e) {
fail();
Expand Down
Loading

0 comments on commit b72a7b2

Please sign in to comment.