From b72a7b2038c42186c72f00e7a04b117bd5e874c8 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Fri, 23 Aug 2019 20:58:54 +0800 Subject: [PATCH] Remove redundant pre-write/commit logic in LockResolverTest (#1062) --- .../main/java/com/pingcap/tikv/PDClient.java | 1 - .../com/pingcap/tikv/ReadOnlyPDClient.java | 3 - .../tikv/region/RegionStoreClient.java | 103 +++++------ .../pingcap/tikv/txn/LockResolverRCTest.java | 36 ++-- .../pingcap/tikv/txn/LockResolverSITest.java | 59 ++++--- .../pingcap/tikv/txn/LockResolverTest.java | 165 +++++------------- 6 files changed, 135 insertions(+), 232 deletions(-) diff --git a/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java b/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java index c17de7ad4e..330e34923c 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java @@ -178,7 +178,6 @@ public Future getStoreAsync(BackOffer backOffer, long storeId) { public void close() throws InterruptedException { if (service != null) { service.shutdownNow(); - service.awaitTermination(1, TimeUnit.SECONDS); } if (channelFactory != null) { channelFactory.close(); diff --git a/tikv-client/src/main/java/com/pingcap/tikv/ReadOnlyPDClient.java b/tikv-client/src/main/java/com/pingcap/tikv/ReadOnlyPDClient.java index 6996e2212d..1be1bef691 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/ReadOnlyPDClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/ReadOnlyPDClient.java @@ -60,7 +60,4 @@ public interface ReadOnlyPDClient { Store getStore(BackOffer backOffer, long storeId); Future getStoreAsync(BackOffer backOffer, long storeId); - - /** Close underlining resources */ - void close() throws InterruptedException; } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java index a432bba1eb..5d520db708 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java @@ -48,13 +48,13 @@ import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvStub; -// Note that RegionStoreClient itself is not thread-safe // TODO: -// 1. RegionStoreClient will be inaccessible directly. -// 2. All apis of RegionStoreClient would not provide retry aside from callWithRetry, -// if a request needs to be retried because of an un-retryable cause, e.g., keys -// need to be re-split across regions/stores, region info outdated, e.t.c., you should -// retry it in an upper client logic (KVClient, TxnClient, e.t.c.) +// 1. RegionStoreClient will be inaccessible directly. +// 2. All apis of RegionStoreClient would not provide retry aside from callWithRetry, +// if a request needs to be retried because of an un-retryable cause, e.g., keys +// need to be re-split across regions/stores, region info outdated, e.t.c., you +// should retry it in an upper client logic (KVClient, TxnClient, e.t.c.) +/** Note that RegionStoreClient itself is not thread-safe */ public class RegionStoreClient extends AbstractRegionStoreClient { public enum RequestTypes { REQ_TYPE_SELECT(101), @@ -85,8 +85,8 @@ public int getValue() { * @param key key to fetch * @param version key version * @return value - * @throws TiClientInternalException - * @throws KeyException + * @throws TiClientInternalException TiSpark Client exception, unexpected + * @throws KeyException Key may be locked */ public ByteString get(BackOffer backOffer, ByteString key, long version) throws TiClientInternalException, KeyException { @@ -109,21 +109,16 @@ public ByteString get(BackOffer backOffer, ByteString key, long version) GetResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_GET, factory, handler); - handleGetResponse(backOffer, resp); + handleGetResponse(resp); return resp.getValue(); } /** - * @param backOffer - * @param resp - * @return Return true means the rpc call success. Return false means the rpc call fail, - * RegionStoreClient should retry. Throw an Exception means the rpc call fail, - * RegionStoreClient cannot handle this kind of error. - * @throws TiClientInternalException - * @throws KeyException + * @param resp GetResponse + * @throws TiClientInternalException TiSpark Client exception, unexpected + * @throws KeyException Key may be locked */ - private void handleGetResponse(BackOffer backOffer, GetResponse resp) - throws TiClientInternalException, KeyException { + private void handleGetResponse(GetResponse resp) throws TiClientInternalException, KeyException { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("GetResponse failed without a cause"); @@ -157,7 +152,7 @@ public List batchGet(BackOffer backOffer, Iterable keys, lon return handleBatchGetResponse(backOffer, resp); } - private List handleBatchGetResponse(BackOffer bo, BatchGetResponse resp) { + private List handleBatchGetResponse(BackOffer backOffer, BatchGetResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("BatchGetResponse failed without a cause"); @@ -179,7 +174,7 @@ private List handleBatchGetResponse(BackOffer bo, BatchGetResponse resp) } if (!locks.isEmpty()) { - boolean ok = lockResolverClient.resolveLocks(bo, locks); + boolean ok = lockResolverClient.resolveLocks(backOffer, locks); if (!ok) { // resolveLocks already retried, just throw error to upper logic. throw new TiKVException("locks not resolved, retry"); @@ -261,14 +256,14 @@ public List scan(BackOffer backOffer, ByteString startKey, long version) /** * Prewrite batch keys * - * @param backOffer - * @param primary - * @param mutations - * @param startTs - * @param lockTTL - * @throws TiClientInternalException - * @throws KeyException - * @throws RegionException + * @param backOffer backOffer + * @param primary primary lock of keys + * @param mutations batch key-values as mutations + * @param startTs startTs of prewrite + * @param lockTTL lock ttl + * @throws TiClientInternalException TiSpark Client exception, unexpected + * @throws KeyException Key may be locked + * @throws RegionException region error occurs */ public void prewrite( BackOffer backOffer, @@ -283,21 +278,13 @@ public void prewrite( /** * Prewrite batch keys * - * @param bo - * @param primaryLock - * @param mutations - * @param startVersion - * @param ttl - * @param skipConstraintCheck - * @throws TiClientInternalException - * @throws KeyException - * @throws RegionException + * @param skipConstraintCheck whether to skip constraint check */ public void prewrite( BackOffer bo, ByteString primaryLock, Iterable mutations, - long startVersion, + long startTs, long ttl, boolean skipConstraintCheck) throws TiClientInternalException, KeyException, RegionException { @@ -306,7 +293,7 @@ public void prewrite( () -> PrewriteRequest.newBuilder() .setContext(region.getContext()) - .setStartVersion(startVersion) + .setStartVersion(startTs) .setPrimaryLock(primaryLock) .addAllMutations(mutations) .setLockTtl(ttl) @@ -328,8 +315,8 @@ public void prewrite( } /** - * @param backOffer - * @param resp + * @param backOffer backOffer + * @param resp response * @return Return true means the rpc call success. Return false means the rpc call fail, * RegionStoreClient should retry. Throw an Exception means the rpc call fail, * RegionStoreClient cannot handle this kind of error @@ -347,40 +334,42 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp) throw new RegionException(resp.getRegionError()); } - boolean result = true; + boolean isSuccess = true; List locks = new ArrayList<>(); for (KeyError err : resp.getErrorsList()) { if (err.hasLocked()) { - result = false; + isSuccess = false; Lock lock = new Lock(err.getLocked()); locks.add(lock); } else { throw new KeyException(err.toString()); } } + if (isSuccess) { + return true; + } if (!lockResolverClient.resolveLocks(backOffer, locks)) { backOffer.doBackOff(BoTxnLock, new KeyException(resp.getErrorsList().get(0))); } - return result; + return false; } /** * Commit batch keys * - * @param backOffer - * @param keys - * @param startVersion - * @param commitVersion + * @param backOffer backOffer + * @param keys keys to commit + * @param startTs start version + * @param commitTs commit version */ - public void commit( - BackOffer backOffer, Iterable keys, long startVersion, long commitVersion) + public void commit(BackOffer backOffer, Iterable keys, long startTs, long commitTs) throws KeyException { Supplier factory = () -> CommitRequest.newBuilder() - .setStartVersion(startVersion) - .setCommitVersion(commitVersion) + .setStartVersion(startTs) + .setCommitVersion(commitTs) .addAllKeys(keys) .setContext(region.getContext()) .build(); @@ -393,20 +382,16 @@ public void commit( resp -> resp.hasRegionError() ? resp.getRegionError() : null, resp -> resp.hasError() ? resp.getError() : null); CommitResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler); - handleCommitResponse(backOffer, resp); + handleCommitResponse(resp); } /** - * @param backOffer - * @param resp - * @return Return true means the rpc call success. Return false means the rpc call fail, - * RegionStoreClient should retry. Throw an Exception means the rpc call fail, - * RegionStoreClient cannot handle this kind of error + * @param resp CommitResponse * @throws TiClientInternalException * @throws RegionException * @throws KeyException */ - private void handleCommitResponse(BackOffer backOffer, CommitResponse resp) + private void handleCommitResponse(CommitResponse resp) throws TiClientInternalException, RegionException, KeyException { if (resp == null) { this.regionManager.onRequestFail(region); diff --git a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java index 1238db9322..408e699194 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java @@ -24,6 +24,8 @@ import com.pingcap.tikv.meta.TiTimestamp; import com.pingcap.tikv.region.RegionStoreClient; import com.pingcap.tikv.region.TiRegion; +import com.pingcap.tikv.util.BackOffer; +import com.pingcap.tikv.util.ConcreteBackOffer; import java.util.Collections; import org.apache.log4j.Logger; import org.junit.Before; @@ -39,7 +41,6 @@ public void setUp() { conf.setIsolationLevel(IsolationLevel.RC); try { session = TiSession.getInstance(conf); - pdClient = session.getPDClient(); this.builder = session.getRegionStoreClientBuilder(); init = true; } catch (Exception e) { @@ -67,38 +68,39 @@ public void RCTest() { skipTest(); return; } - TiTimestamp startTs = pdClient.getTimestamp(backOffer); - TiTimestamp endTs = pdClient.getTimestamp(backOffer); + TiTimestamp startTs = session.getTimestamp(); + TiTimestamp endTs = session.getTimestamp(); // Put into kv putKV("a", "a", startTs.getVersion(), endTs.getVersion()); - startTs = pdClient.getTimestamp(backOffer); - endTs = pdClient.getTimestamp(backOffer); + startTs = session.getTimestamp(); + endTs = session.getTimestamp(); // Prewrite as primary without committing it assertTrue(lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion())); TiRegion tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); RegionStoreClient client = builder.build(tiRegion); - // In RC mode, lock will not be read. is retrieved. - ByteString v = - client.get( - backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion()); - assertEquals(v.toStringUtf8(), "a"); + + { + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + // In RC mode, lock will not be read. is retrieved. + ByteString v = + client.get(backOffer, ByteString.copyFromUtf8("a"), session.getTimestamp().getVersion()); + assertEquals(v.toStringUtf8(), "a"); + } try { // After committing , we can read it. assertTrue( commit( + Collections.singletonList(ByteString.copyFromUtf8("a")), startTs.getVersion(), - endTs.getVersion(), - Collections.singletonList(ByteString.copyFromUtf8("a")))); - v = - client.get( - backOffer, - ByteString.copyFromUtf8("a"), - pdClient.getTimestamp(backOffer).getVersion()); + endTs.getVersion())); + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + ByteString v = + client.get(backOffer, ByteString.copyFromUtf8("a"), session.getTimestamp().getVersion()); assertEquals(v.toStringUtf8(), "aa"); } catch (KeyException e) { fail(); diff --git a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java index 661ab4183b..43b545ac0e 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java @@ -24,6 +24,8 @@ import com.pingcap.tikv.meta.TiTimestamp; import com.pingcap.tikv.region.RegionStoreClient; import com.pingcap.tikv.region.TiRegion; +import com.pingcap.tikv.util.BackOffer; +import com.pingcap.tikv.util.ConcreteBackOffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -42,7 +44,6 @@ public void setUp() { TiSession.clearCache(); try { session = TiSession.getInstance(conf); - pdClient = session.getPDClient(); this.builder = session.getRegionStoreClientBuilder(); init = true; } catch (Exception e) { @@ -72,8 +73,8 @@ public void cleanLockTest() { } for (int i = 0; i < 26; i++) { String k = String.valueOf((char) ('a' + i)); - TiTimestamp startTs = pdClient.getTimestamp(backOffer); - TiTimestamp endTs = pdClient.getTimestamp(backOffer); + TiTimestamp startTs = session.getTimestamp(); + TiTimestamp endTs = session.getTimestamp(); assertTrue(lockKey(k, k, k, k, false, startTs.getVersion(), endTs.getVersion())); } @@ -92,12 +93,12 @@ public void cleanLockTest() { keys.add(ByteString.copyFromUtf8(k)); } - TiTimestamp startTs = pdClient.getTimestamp(backOffer); - TiTimestamp endTs = pdClient.getTimestamp(backOffer); + TiTimestamp startTs = session.getTimestamp(); + TiTimestamp endTs = session.getTimestamp(); boolean res = prewrite(mutations, startTs.getVersion(), mutations.get(0)); assertTrue(res); - res = commit(startTs.getVersion(), endTs.getVersion(), keys); + res = commit(keys, startTs.getVersion(), endTs.getVersion()); assertTrue(res); for (int i = 0; i < 26; i++) { @@ -106,11 +107,12 @@ public void cleanLockTest() { .getRegionManager() .getRegionByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a' + i)))); RegionStoreClient client = builder.build(tiRegion); + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); ByteString v = client.get( backOffer, ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))), - pdClient.getTimestamp(backOffer).getVersion()); + session.getTimestamp().getVersion()); assertEquals(v.toStringUtf8(), String.valueOf((char) ('a' + i + 1))); } } @@ -121,19 +123,20 @@ public void txnStatusTest() { skipTest(); return; } - TiTimestamp startTs = pdClient.getTimestamp(backOffer); - TiTimestamp endTs = pdClient.getTimestamp(backOffer); + TiTimestamp startTs = session.getTimestamp(); + TiTimestamp endTs = session.getTimestamp(); putKV("a", "a", startTs.getVersion(), endTs.getVersion()); TiRegion tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); RegionStoreClient client = builder.build(tiRegion); + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.CLEANUP_MAX_BACKOFF); long status = client.lockResolverClient.getTxnStatus( backOffer, startTs.getVersion(), ByteString.copyFromUtf8("a")); assertEquals(status, endTs.getVersion()); - startTs = pdClient.getTimestamp(backOffer); - endTs = pdClient.getTimestamp(backOffer); + startTs = session.getTimestamp(); + endTs = session.getTimestamp(); assertTrue(lockKey("a", "a", "a", "a", true, startTs.getVersion(), endTs.getVersion())); tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); @@ -143,8 +146,8 @@ public void txnStatusTest() { backOffer, startTs.getVersion(), ByteString.copyFromUtf8("a")); assertEquals(status, endTs.getVersion()); - startTs = pdClient.getTimestamp(backOffer); - endTs = pdClient.getTimestamp(backOffer); + startTs = session.getTimestamp(); + endTs = session.getTimestamp(); assertTrue(lockKey("a", "a", "a", "a", false, startTs.getVersion(), endTs.getVersion())); tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); @@ -161,14 +164,14 @@ public void SITest() { skipTest(); return; } - TiTimestamp startTs = pdClient.getTimestamp(backOffer); - TiTimestamp endTs = pdClient.getTimestamp(backOffer); + TiTimestamp startTs = session.getTimestamp(); + TiTimestamp endTs = session.getTimestamp(); // Put into kv putKV("a", "a", startTs.getVersion(), endTs.getVersion()); - startTs = pdClient.getTimestamp(backOffer); - endTs = pdClient.getTimestamp(backOffer); + startTs = session.getTimestamp(); + endTs = session.getTimestamp(); // Prewrite as primary without committing it assertTrue(lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion())); @@ -177,27 +180,29 @@ public void SITest() { RegionStoreClient client = builder.build(tiRegion); try { + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); // In SI mode, a lock is read. Try resolve it if expires TTL. - client.get( - backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion()); + client.get(backOffer, ByteString.copyFromUtf8("a"), session.getTimestamp().getVersion()); fail(); } catch (KeyException e) { assertEquals(ByteString.copyFromUtf8("a"), e.getKeyError().getLocked().getKey()); } - // With TTL set to 10, after 10 milliseconds is resolved. - // We should be able to read instead. - ByteString v = - client.get( - backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion()); - assertEquals(v.toStringUtf8(), String.valueOf('a')); + { + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + // With TTL set to 10, after 10 milliseconds is resolved. + // We should be able to read instead. + ByteString v = + client.get(backOffer, ByteString.copyFromUtf8("a"), session.getTimestamp().getVersion()); + assertEquals(v.toStringUtf8(), String.valueOf('a')); + } try { // Trying to continue the commit phase of will fail because TxnLockNotFound commit( + Collections.singletonList(ByteString.copyFromUtf8("a")), startTs.getVersion(), - endTs.getVersion(), - Collections.singletonList(ByteString.copyFromUtf8("a"))); + endTs.getVersion()); fail(); } catch (KeyException e) { assertFalse(e.getKeyError().getRetryable().isEmpty()); diff --git a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java index 24e64209b4..5d69e093e9 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java @@ -15,37 +15,30 @@ package com.pingcap.tikv.txn; -import static com.pingcap.tikv.util.BackOffFunction.BackOffFuncType.BoTxnLock; import static junit.framework.TestCase.*; import com.google.protobuf.ByteString; -import com.pingcap.tikv.ReadOnlyPDClient; import com.pingcap.tikv.TiSession; import com.pingcap.tikv.exception.KeyException; import com.pingcap.tikv.exception.RegionException; import com.pingcap.tikv.meta.TiTimestamp; -import com.pingcap.tikv.operation.KVErrorHandler; import com.pingcap.tikv.region.RegionStoreClient; import com.pingcap.tikv.region.TiRegion; +import com.pingcap.tikv.util.BackOffFunction; import com.pingcap.tikv.util.BackOffer; import com.pingcap.tikv.util.ConcreteBackOffer; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.Supplier; import org.apache.log4j.Logger; import org.junit.Before; import org.tikv.kvproto.Kvrpcpb.*; -import org.tikv.kvproto.TikvGrpc; public abstract class LockResolverTest { private final Logger logger = Logger.getLogger(this.getClass()); TiSession session; private static final int DefaultTTL = 10; - BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); - ReadOnlyPDClient pdClient; RegionStoreClient.RegionStoreClientBuilder builder; boolean init; @@ -62,87 +55,46 @@ void putKV(String key, String value, long startTS, long commitTS) { boolean res = prewrite(Collections.singletonList(m), startTS, m); assertTrue(res); - res = commit(startTS, commitTS, Collections.singletonList(ByteString.copyFromUtf8(key))); + res = commit(Collections.singletonList(ByteString.copyFromUtf8(key)), startTS, commitTS); assertTrue(res); } boolean prewrite(List mutations, long startTS, Mutation primary) { if (mutations.size() == 0) return true; + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); - /*for (Mutation m : mutations) { + for (Mutation m : mutations) { while (true) { try { TiRegion region = session.getRegionManager().getRegionByKey(m.getKey()); RegionStoreClient client = builder.build(region); - client.prewrite(backOffer, primary.getKey(), mutations, startTS, DefaultTTL); + client.prewrite( + backOffer, primary.getKey(), Collections.singletonList(m), startTS, DefaultTTL); break; - } catch (Exception e) { - logger.warn(e.getMessage()); + } catch (RegionException e) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); } } - }*/ - for (Mutation m : mutations) { - TiRegion region = session.getRegionManager().getRegionByKey(m.getKey()); - RegionStoreClient client = builder.build(region); - - Supplier factory = - () -> - PrewriteRequest.newBuilder() - .addAllMutations(Collections.singletonList(m)) - .setPrimaryLock(primary.getKey()) - .setStartVersion(startTS) - .setLockTtl(DefaultTTL) - .setContext(region.getContext()) - .build(); - - KVErrorHandler handler = - new KVErrorHandler<>( - session.getRegionManager(), - client, - client.lockResolverClient, - region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null, - resp -> null); - PrewriteResponse resp = - client.callWithRetry(backOffer, TikvGrpc.METHOD_KV_PREWRITE, factory, handler); - - if (resp.hasRegionError()) { - throw new RegionException(resp.getRegionError()); - } + } + return true; + } - if (resp.getErrorsCount() == 0) { - continue; - } + boolean commit(List keys, long startTS, long commitTS) { + if (keys.size() == 0) return true; + BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000); - List locks = new ArrayList<>(); - for (KeyError err : resp.getErrorsList()) { - if (err.hasLocked()) { - Lock lock = new Lock(err.getLocked()); - locks.add(lock); - } else { - throw new KeyException(err); + for (ByteString k : keys) { + while (true) { + try { + TiRegion tiRegion = session.getRegionManager().getRegionByKey(k); + RegionStoreClient client = builder.build(tiRegion); + client.commit(backOffer, Collections.singletonList(k), startTS, commitTS); + break; + } catch (RegionException e) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); } } - - LockResolverClient resolver = null; - try { - Field field = RegionStoreClient.class.getDeclaredField("lockResolverClient"); - assert (field != null); - field.setAccessible(true); - resolver = (LockResolverClient) (field.get(client)); - } catch (Exception e) { - fail(); - } - - assertNotNull(resolver); - - if (!resolver.resolveLocks(backOffer, locks)) { - backOffer.doBackOff(BoTxnLock, new KeyException(resp.getErrorsList().get(0))); - } - - prewrite(Collections.singletonList(m), startTS, primary); } - return true; } @@ -174,62 +126,24 @@ boolean lockKey( if (commitPrimary) { if (!key.equals(primaryKey)) { return commit( + Arrays.asList(ByteString.copyFromUtf8(primaryKey), ByteString.copyFromUtf8(key)), startTs, - commitTS, - Arrays.asList(ByteString.copyFromUtf8(primaryKey), ByteString.copyFromUtf8(key))); + commitTS); } else { return commit( - startTs, commitTS, Collections.singletonList(ByteString.copyFromUtf8(primaryKey))); + Collections.singletonList(ByteString.copyFromUtf8(primaryKey)), startTs, commitTS); } } return true; } - boolean commit(long startTS, long commitTS, List keys) { - if (keys.size() == 0) return true; - - for (ByteString k : keys) { - TiRegion tiRegion = session.getRegionManager().getRegionByKey(k); - - RegionStoreClient client = builder.build(tiRegion); - Supplier factory = - () -> - CommitRequest.newBuilder() - .setStartVersion(startTS) - .setCommitVersion(commitTS) - .addAllKeys(Collections.singletonList(k)) - .setContext(tiRegion.getContext()) - .build(); - - KVErrorHandler handler = - new KVErrorHandler<>( - session.getRegionManager(), - client, - client.lockResolverClient, - tiRegion, - resp -> resp.hasRegionError() ? resp.getRegionError() : null, - resp -> resp.hasError() ? resp.getError() : null); - CommitResponse resp = - client.callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler); - - if (resp.hasRegionError()) { - throw new RegionException(resp.getRegionError()); - } - - if (resp.hasError()) { - throw new KeyException(resp.getError()); - } - } - return true; - } - void putAlphabet() { for (int i = 0; i < 26; i++) { - long startTs = pdClient.getTimestamp(backOffer).getVersion(); - long endTs = pdClient.getTimestamp(backOffer).getVersion(); + long startTs = session.getTimestamp().getVersion(); + long endTs = session.getTimestamp().getVersion(); while (startTs == endTs) { - endTs = pdClient.getTimestamp(backOffer).getVersion(); + endTs = session.getTimestamp().getVersion(); } putKV(String.valueOf((char) ('a' + i)), String.valueOf((char) ('a' + i)), startTs, endTs); } @@ -237,23 +151,23 @@ void putAlphabet() { } void prepareAlphabetLocks() { - TiTimestamp startTs = pdClient.getTimestamp(backOffer); - TiTimestamp endTs = pdClient.getTimestamp(backOffer); + TiTimestamp startTs = session.getTimestamp(); + TiTimestamp endTs = session.getTimestamp(); while (startTs == endTs) { - endTs = pdClient.getTimestamp(backOffer); + endTs = session.getTimestamp(); } putKV("c", "cc", startTs.getVersion(), endTs.getVersion()); - startTs = pdClient.getTimestamp(backOffer); - endTs = pdClient.getTimestamp(backOffer); + startTs = session.getTimestamp(); + endTs = session.getTimestamp(); while (startTs == endTs) { - endTs = pdClient.getTimestamp(backOffer); + endTs = session.getTimestamp(); } assertTrue(lockKey("c", "c", "z1", "z1", true, startTs.getVersion(), endTs.getVersion())); - startTs = pdClient.getTimestamp(backOffer); - endTs = pdClient.getTimestamp(backOffer); + startTs = session.getTimestamp(); + endTs = session.getTimestamp(); while (startTs == endTs) { - endTs = pdClient.getTimestamp(backOffer); + endTs = session.getTimestamp(); } assertTrue(lockKey("d", "dd", "z2", "z2", false, startTs.getVersion(), endTs.getVersion())); } @@ -271,8 +185,9 @@ void versionTest(boolean hasLock) { ByteString key = ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))); TiRegion tiRegion = session.getRegionManager().getRegionByKey(key); RegionStoreClient client = builder.build(tiRegion); + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); try { - ByteString v = client.get(backOffer, key, pdClient.getTimestamp(backOffer).getVersion()); + ByteString v = client.get(backOffer, key, session.getTimestamp().getVersion()); if (hasLock && i == 3) { // key "d" should be locked fail();