diff --git a/core/src/test/scala/com/pingcap/tispark/datasource/BaseDataSourceTest.scala b/core/src/test/scala/com/pingcap/tispark/datasource/BaseDataSourceTest.scala index d9730352cf..4a154ac475 100644 --- a/core/src/test/scala/com/pingcap/tispark/datasource/BaseDataSourceTest.scala +++ b/core/src/test/scala/com/pingcap/tispark/datasource/BaseDataSourceTest.scala @@ -6,6 +6,7 @@ import com.pingcap.tikv.TiSession import com.pingcap.tispark.TiConfigConst import org.apache.spark.SparkException import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{BaseTiSparkTest, DataFrame, Row} @@ -82,7 +83,7 @@ class BaseDataSourceTest(val table: String, sortCol: String = "i", selectCol: String = null, tableName: String - ) = { + ): Unit = { // check data source result & expected answer var df = queryDatasourceTiDBWithTable(sortCol, tableName) if (selectCol != null) { @@ -173,7 +174,14 @@ class BaseDataSourceTest(val table: String, val answer = seqRowToList(expectedAnswer, schema) val jdbcResult = queryTiDBViaJDBC(sql) - val df = queryDatasourceTiDB(sortCol) + val df = try { + queryDatasourceTiDB(sortCol) + } catch { + case e: NoSuchTableException => + logger.warn("query via datasource api fails", e) + spark.sql("show tables").show + throw e + } val tidbResult = seqRowToList(df.collect(), df.schema) // check tidb result & expected answer @@ -202,13 +210,11 @@ class BaseDataSourceTest(val table: String, val df = queryDatasourceTiDBWithTable(sortCol, tableName = tblName) val tidbResult = seqRowToList(df.collect(), df.schema) - println(s"running test on table $tblName") - if (compResult(jdbcResult, tidbResult)) { - assert(true) - } else { - println(s"failed on $tblName") - println(tidbResult) - assert(false) + if (!compResult(jdbcResult, tidbResult)) { + logger.error(s"""Failed on $tblName\n + |DataSourceAPI result: ${listToString(jdbcResult)}\n + |TiDB via JDBC result: ${listToString(tidbResult)}""".stripMargin) + fail() } } diff --git a/core/src/test/scala/org/apache/spark/sql/BaseTiSparkTest.scala b/core/src/test/scala/org/apache/spark/sql/BaseTiSparkTest.scala index d53bf756cf..3b4b2656d7 100644 --- a/core/src/test/scala/org/apache/spark/sql/BaseTiSparkTest.scala +++ b/core/src/test/scala/org/apache/spark/sql/BaseTiSparkTest.scala @@ -124,6 +124,7 @@ class BaseTiSparkTest extends QueryTest with SharedSQLContext { protected def loadTestData(databases: Seq[String] = defaultTestDatabases): Unit = try { + ti.meta.reloadAllMeta() tableNames = Seq.empty[String] for (dbName <- databases) { setCurrentDatabase(dbName) @@ -395,29 +396,6 @@ class BaseTiSparkTest extends QueryTest with SharedSQLContext { } } - private def listToString(result: List[List[Any]]): String = - if (result == null) s"[len: null] = null" - else if (result.isEmpty) s"[len: 0] = Empty" - else s"[len: ${result.length}] = ${result.map(mapStringList).mkString(",")}" - - private def mapStringList(result: List[Any]): String = - if (result == null) "null" else "List(" + result.map(mapString).mkString(",") + ")" - - private def mapString(result: Any): String = - if (result == null) "null" - else - result match { - case _: Array[Byte] => - var str = "[" - for (s <- result.asInstanceOf[Array[Byte]]) { - str += " " + s.toString - } - str += " ]" - str - case _ => - result.toString - } - protected def explainTestAndCollect(sql: String): Unit = { val df = spark.sql(sql) df.explain diff --git a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala index 1c4c99fb7d..d1d138cf44 100644 --- a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala @@ -245,7 +245,7 @@ class IssueTestSuite extends BaseTiSparkTest { tidbStmt.execute("insert into t values(1)") tidbStmt.execute("insert into t values(2)") tidbStmt.execute("insert into t values(4)") - ti.meta.reloadAllMeta() + loadTestData() runTest("select count(c1) from t") runTest("select count(c1 + 1) from t") runTest("select count(1 + c1) from t") @@ -253,7 +253,7 @@ class IssueTestSuite extends BaseTiSparkTest { tidbStmt.execute("create table t(c1 int not null, c2 int not null)") tidbStmt.execute("insert into t values(1, 4)") tidbStmt.execute("insert into t values(2, 2)") - ti.meta.reloadAllMeta() + loadTestData() runTest("select count(c1 + c2) from t") } diff --git a/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 8b139a0ca8..12663189f2 100644 --- a/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -196,6 +196,29 @@ abstract class QueryTest extends SparkFunSuite { } } + def listToString(result: List[List[Any]]): String = + if (result == null) s"[len: null] = null" + else if (result.isEmpty) s"[len: 0] = Empty" + else s"[len: ${result.length}] = ${result.map(mapStringList).mkString(",")}" + + private def mapStringList(result: List[Any]): String = + if (result == null) "null" else "List(" + result.map(mapString).mkString(",") + ")" + + private def mapString(result: Any): String = + if (result == null) "null" + else + result match { + case _: Array[Byte] => + var str = "[" + for (s <- result.asInstanceOf[Array[Byte]]) { + str += " " + s.toString + } + str += " ]" + str + case _ => + result.toString + } + protected def toOutput(value: Any, colType: String): Any = value match { case _: BigDecimal => value.asInstanceOf[BigDecimal].setScale(2, BigDecimal.RoundingMode.HALF_UP) diff --git a/core/src/test/scala/org/apache/spark/sql/types/BaseDataTypeTest.scala b/core/src/test/scala/org/apache/spark/sql/types/BaseDataTypeTest.scala index cca945b84f..14ab2ad745 100644 --- a/core/src/test/scala/org/apache/spark/sql/types/BaseDataTypeTest.scala +++ b/core/src/test/scala/org/apache/spark/sql/types/BaseDataTypeTest.scala @@ -35,7 +35,7 @@ trait BaseDataTypeTest extends BaseTiSparkTest { setCurrentDatabase(dbName) val tblName = generator.getTableNameWithDesc(desc, dataType) val query = s"select ${generator.getColumnName(dataType)} from $tblName" - println(query) + logger.info(query) runTest(query) } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/AbstractGRPCClient.java b/tikv-client/src/main/java/com/pingcap/tikv/AbstractGRPCClient.java index a56716010f..2e27778600 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/AbstractGRPCClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/AbstractGRPCClient.java @@ -37,12 +37,25 @@ public abstract class AbstractGRPCClient< protected final Logger logger = Logger.getLogger(this.getClass()); protected TiConfiguration conf; protected final ChannelFactory channelFactory; + protected BlockingStubT blockingStub; + protected StubT asyncStub; protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) { this.conf = conf; this.channelFactory = channelFactory; } + protected AbstractGRPCClient( + TiConfiguration conf, + ChannelFactory channelFactory, + BlockingStubT blockingStub, + StubT asyncStub) { + this.conf = conf; + this.channelFactory = channelFactory; + this.blockingStub = blockingStub; + this.asyncStub = asyncStub; + } + public TiConfiguration getConf() { return conf; } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/KVClient.java b/tikv-client/src/main/java/com/pingcap/tikv/KVClient.java new file mode 100644 index 0000000000..9abb1a61f2 --- /dev/null +++ b/tikv-client/src/main/java/com/pingcap/tikv/KVClient.java @@ -0,0 +1,244 @@ +/* + * + * Copyright 2019 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.pingcap.tikv; + +import com.google.protobuf.ByteString; +import com.pingcap.tikv.exception.GrpcException; +import com.pingcap.tikv.exception.TiKVException; +import com.pingcap.tikv.operation.iterator.ConcreteScanIterator; +import com.pingcap.tikv.region.RegionStoreClient; +import com.pingcap.tikv.region.RegionStoreClient.*; +import com.pingcap.tikv.region.TiRegion; +import com.pingcap.tikv.util.BackOffFunction; +import com.pingcap.tikv.util.BackOffer; +import com.pingcap.tikv.util.ConcreteBackOffer; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import org.apache.log4j.Logger; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.kvproto.Kvrpcpb.KvPair; + +public class KVClient implements AutoCloseable { + private final RegionStoreClientBuilder clientBuilder; + private final TiConfiguration conf; + private final ExecutorService executorService; + private static final Logger logger = Logger.getLogger(KVClient.class); + + private static final int BATCH_GET_SIZE = 16 * 1024; + + public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) { + Objects.requireNonNull(conf, "conf is null"); + Objects.requireNonNull(clientBuilder, "clientBuilder is null"); + this.conf = conf; + this.clientBuilder = clientBuilder; + // TODO: ExecutorService executors = + // Executors.newFixedThreadPool(conf.getKVClientConcurrency()); + executorService = Executors.newFixedThreadPool(20); + } + + @Override + public void close() { + if (executorService != null) { + executorService.shutdownNow(); + } + } + + /** + * Get a key-value pair from TiKV if key exists + * + * @param key key + * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist + */ + public ByteString get(ByteString key, long version) throws GrpcException { + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + while (true) { + RegionStoreClient client = clientBuilder.build(key); + try { + return client.get(backOffer, key, version); + } catch (final TiKVException e) { + backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + } + } + } + + /** + * Get a set of key-value pair by keys from TiKV + * + * @param keys keys + */ + public List batchGet(List keys, long version) throws GrpcException { + return batchGet(ConcreteBackOffer.newBatchGetMaxBackOff(), keys, version); + } + + private List batchGet(BackOffer backOffer, List keys, long version) { + Set set = new HashSet<>(keys); + return batchGet(backOffer, set, version); + } + + private List batchGet(BackOffer backOffer, Set keys, long version) { + Map> groupKeys = groupKeysByRegion(keys); + List batches = new ArrayList<>(); + + for (Map.Entry> entry : groupKeys.entrySet()) { + appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE); + } + return sendBatchGet(backOffer, batches, version); + } + + /** + * Scan key-value pairs from TiKV in range [startKey, endKey) + * + * @param startKey start key, inclusive + * @param endKey end key, exclusive + * @return list of key-value pairs in range + */ + public List scan(ByteString startKey, ByteString endKey, long version) + throws GrpcException { + Iterator iterator = + scanIterator(conf, clientBuilder, startKey, endKey, version); + List result = new ArrayList<>(); + iterator.forEachRemaining(result::add); + return result; + } + + /** + * Scan key-value pairs from TiKV in range [startKey, ♾), maximum to `limit` pairs + * + * @param startKey start key, inclusive + * @param limit limit of kv pairs + * @return list of key-value pairs in range + */ + public List scan(ByteString startKey, long version, int limit) + throws GrpcException { + Iterator iterator = scanIterator(conf, clientBuilder, startKey, version, limit); + List result = new ArrayList<>(); + iterator.forEachRemaining(result::add); + return result; + } + + public List scan(ByteString startKey, long version) throws GrpcException { + return scan(startKey, version, Integer.MAX_VALUE); + } + + /** A Batch containing the region and a list of keys to send */ + private static final class Batch { + private final TiRegion region; + private final List keys; + + Batch(TiRegion region, List keys) { + this.region = region; + this.keys = keys; + } + } + + /** + * Append batch to list and split them according to batch limit + * + * @param batches a grouped batch + * @param region region + * @param keys keys + * @param limit batch max limit + */ + private void appendBatches( + List batches, TiRegion region, List keys, int limit) { + List tmpKeys = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + if (i >= limit) { + batches.add(new Batch(region, tmpKeys)); + tmpKeys.clear(); + } + tmpKeys.add(keys.get(i)); + } + if (!tmpKeys.isEmpty()) { + batches.add(new Batch(region, tmpKeys)); + } + } + + /** + * Group by list of keys according to its region + * + * @param keys keys + * @return a mapping of keys and their region + */ + private Map> groupKeysByRegion(Set keys) { + return keys.stream() + .collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey)); + } + + /** + * Send batchPut request concurrently + * + * @param backOffer current backOffer + * @param batches list of batch to send + */ + private List sendBatchGet(BackOffer backOffer, List batches, long version) { + ExecutorCompletionService> completionService = + new ExecutorCompletionService<>(executorService); + for (Batch batch : batches) { + completionService.submit( + () -> { + RegionStoreClient client = clientBuilder.build(batch.region); + BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer); + List keys = batch.keys; + try { + return client.batchGet(singleBatchBackOffer, keys, version); + } catch (final TiKVException e) { + // TODO: any elegant way to re-split the ranges if fails? + singleBatchBackOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); + logger.warn("ReSplitting ranges for BatchPutRequest"); + // recursive calls + return batchGet(singleBatchBackOffer, batch.keys, version); + } + }); + } + try { + List result = new ArrayList<>(); + for (int i = 0; i < batches.size(); i++) { + result.addAll( + completionService.take().get(BackOffer.BATCH_GET_MAX_BACKOFF, TimeUnit.SECONDS)); + } + return result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TiKVException("Current thread interrupted.", e); + } catch (TimeoutException e) { + throw new TiKVException("TimeOut Exceeded for current operation. ", e); + } catch (ExecutionException e) { + throw new TiKVException("Execution exception met.", e); + } + } + + private Iterator scanIterator( + TiConfiguration conf, + RegionStoreClientBuilder builder, + ByteString startKey, + ByteString endKey, + long version) { + return new ConcreteScanIterator(conf, builder, startKey, endKey, version); + } + + private Iterator scanIterator( + TiConfiguration conf, + RegionStoreClientBuilder builder, + ByteString startKey, + long version, + int limit) { + return new ConcreteScanIterator(conf, builder, startKey, version, limit); + } +} diff --git a/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java b/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java index 2b85f8f6cd..c17de7ad4e 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/PDClient.java @@ -199,7 +199,7 @@ LeaderWrapper getLeaderWrapper() { return leaderWrapper; } - class LeaderWrapper { + static class LeaderWrapper { private final String leaderInfo; private final PDBlockingStub blockingStub; private final PDStub asyncStub; @@ -329,6 +329,9 @@ protected PDStub getAsyncStub() { private PDClient(TiConfiguration conf, ChannelFactory channelFactory) { super(conf, channelFactory); + initCluster(); + this.blockingStub = getBlockingStub(); + this.asyncStub = getAsyncStub(); } private void initCluster() { @@ -364,19 +367,6 @@ private void initCluster() { } static PDClient createRaw(TiConfiguration conf, ChannelFactory channelFactory) { - PDClient client = null; - try { - client = new PDClient(conf, channelFactory); - client.initCluster(); - } catch (Exception e) { - if (client != null) { - try { - client.close(); - } catch (InterruptedException ignore) { - } - } - throw e; - } - return client; + return new PDClient(conf, channelFactory); } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java b/tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java index c2d5934413..a51a9c1f27 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java @@ -19,13 +19,12 @@ import static com.pingcap.tikv.operation.iterator.CoprocessIterator.getRowIterator; import com.google.protobuf.ByteString; +import com.pingcap.tikv.key.Key; import com.pingcap.tikv.meta.TiDAGRequest; import com.pingcap.tikv.meta.TiTimestamp; +import com.pingcap.tikv.operation.iterator.ConcreteScanIterator; import com.pingcap.tikv.operation.iterator.IndexScanIterator; -import com.pingcap.tikv.operation.iterator.ScanIterator; -import com.pingcap.tikv.region.RegionStoreClient; import com.pingcap.tikv.row.Row; -import com.pingcap.tikv.util.ConcreteBackOffer; import com.pingcap.tikv.util.RangeSplitter; import com.pingcap.tikv.util.RangeSplitter.RegionTask; import java.util.Iterator; @@ -63,9 +62,8 @@ public byte[] get(byte[] key) { } public ByteString get(ByteString key) { - RegionStoreClient client = session.getRegionStoreClientBuilder().build(key); - // TODO: Need to deal with lock error after grpc stable - return client.get(ConcreteBackOffer.newGetBackOff(), key, timestamp.getVersion()); + return new KVClient(session.getConf(), session.getRegionStoreClientBuilder()) + .get(key, timestamp.getVersion()); } /** @@ -110,8 +108,35 @@ public Iterator indexHandleRead(TiDAGRequest dagRequest, List return getHandleIterator(dagRequest, tasks, session); } + /** + * scan all keys after startKey, inclusive + * + * @param startKey start of keys + * @return iterator of kvPair + */ public Iterator scan(ByteString startKey) { - return new ScanIterator(startKey, session, timestamp.getVersion()); + return new ConcreteScanIterator( + session.getConf(), + session.getRegionStoreClientBuilder(), + startKey, + timestamp.getVersion(), + Integer.MAX_VALUE); + } + + /** + * scan all keys with prefix + * + * @param prefix prefix of keys + * @return iterator of kvPair + */ + public Iterator scanPrefix(ByteString prefix) { + ByteString nextPrefix = Key.toRawKey(prefix).nextPrefix().toByteString(); + return new ConcreteScanIterator( + session.getConf(), + session.getRegionStoreClientBuilder(), + prefix, + nextPrefix, + timestamp.getVersion()); } public TiConfiguration getConf() { diff --git a/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java b/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java index 0d9f4c963f..48f3c94c9b 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/codec/MetaCodec.java @@ -98,16 +98,13 @@ public static List> hashGetFields( MetaCodec.encodeHashDataKeyPrefix(cdo, key.toByteArray()); ByteString encodedKey = cdo.toByteString(); - Iterator iterator = snapshot.scan(encodedKey); + Iterator iterator = snapshot.scanPrefix(encodedKey); List> fields = new ArrayList<>(); while (iterator.hasNext()) { Kvrpcpb.KvPair kv = iterator.next(); if (kv == null || kv.getKey() == null) { continue; } - if (!KeyUtils.hasPrefix(kv.getKey(), encodedKey)) { - break; - } fields.add(Pair.create(MetaCodec.decodeHashDataKey(kv.getKey()).second, kv.getValue())); } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/exception/KeyException.java b/tikv-client/src/main/java/com/pingcap/tikv/exception/KeyException.java index 789fc7c2c2..dc33c08425 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/exception/KeyException.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/exception/KeyException.java @@ -20,6 +20,7 @@ public class KeyException extends TiKVException { private static final long serialVersionUID = 6649195220216182286L; + private Kvrpcpb.KeyError keyError; public KeyException(String errMsg) { super(errMsg); @@ -27,5 +28,10 @@ public KeyException(String errMsg) { public KeyException(Kvrpcpb.KeyError keyErr) { super(String.format("Key exception occurred and the reason is %s", keyErr.toString())); + this.keyError = keyErr; + } + + public Kvrpcpb.KeyError getKeyError() { + return keyError; } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java b/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java index fbc9caaa84..8f00ac48dd 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java @@ -17,41 +17,57 @@ package com.pingcap.tikv.operation; +import static com.pingcap.tikv.txn.LockResolverClient.extractLockFromKeyErr; +import static com.pingcap.tikv.util.BackOffFunction.BackOffFuncType.BoTxnLockFast; + import com.google.protobuf.ByteString; import com.pingcap.tikv.codec.KeyUtils; import com.pingcap.tikv.event.CacheInvalidateEvent; import com.pingcap.tikv.exception.GrpcException; +import com.pingcap.tikv.exception.KeyException; import com.pingcap.tikv.region.RegionErrorReceiver; import com.pingcap.tikv.region.RegionManager; import com.pingcap.tikv.region.TiRegion; +import com.pingcap.tikv.txn.Lock; +import com.pingcap.tikv.txn.LockResolverClient; import com.pingcap.tikv.util.BackOffFunction; import com.pingcap.tikv.util.BackOffer; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.util.ArrayList; +import java.util.Collections; import java.util.function.Function; import org.apache.log4j.Logger; import org.tikv.kvproto.Errorpb; +import org.tikv.kvproto.Kvrpcpb; // TODO: consider refactor to Builder mode +// TODO: KVErrorHandler should resolve locks if it could. public class KVErrorHandler implements ErrorHandler { private static final Logger logger = Logger.getLogger(KVErrorHandler.class); - private static final int NO_LEADER_STORE_ID = - 0; // if there's currently no leader of a store, store id is set to 0 + // if a store does not have leader currently, store id is set to 0 + private static final int NO_LEADER_STORE_ID = 0; private final Function getRegionError; + private final Function getKeyError; private final Function cacheInvalidateCallBack; private final RegionManager regionManager; private final RegionErrorReceiver recv; + private final LockResolverClient lockResolverClient; private final TiRegion ctxRegion; public KVErrorHandler( RegionManager regionManager, RegionErrorReceiver recv, + LockResolverClient lockResolverClient, TiRegion ctxRegion, - Function getRegionError) { + Function getRegionError, + Function getKeyError) { this.ctxRegion = ctxRegion; this.recv = recv; + this.lockResolverClient = lockResolverClient; this.regionManager = regionManager; this.getRegionError = getRegionError; + this.getKeyError = getKeyError; this.cacheInvalidateCallBack = regionManager != null ? regionManager.getCacheInvalidateCallback() : null; } @@ -117,6 +133,19 @@ private void notifyStoreCacheInvalidate(long storeId) { } } + private boolean checkLockError(BackOffer backOffer, Lock lock) { + logger.warn("resolving lock"); + boolean ok = + lockResolverClient.resolveLocks( + backOffer, new ArrayList<>(Collections.singletonList(lock))); + if (!ok) { + // if not resolve all locks, we wait and retry + backOffer.doBackOff(BoTxnLockFast, new KeyException(lock.toString())); + return true; + } + return false; + } + // Referenced from TiDB // store/tikv/region_request.go - onRegionError @Override @@ -233,6 +262,16 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { invalidateRegionStoreCache(ctxRegion); } + // Key error handling logic + Kvrpcpb.KeyError keyError = getKeyError.apply(resp); + if (keyError != null) { + try { + Lock lock = extractLockFromKeyErr(keyError); + checkLockError(backOffer, lock); + } catch (KeyException e) { + logger.warn("Unable to handle KeyExceptions other than LockException", e); + } + } return false; } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ConcreteScanIterator.java b/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ConcreteScanIterator.java new file mode 100644 index 0000000000..36988f15d8 --- /dev/null +++ b/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ConcreteScanIterator.java @@ -0,0 +1,144 @@ +/* + * + * Copyright 2019 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.pingcap.tikv.operation.iterator; + +import static java.util.Objects.requireNonNull; + +import com.google.protobuf.ByteString; +import com.pingcap.tikv.TiConfiguration; +import com.pingcap.tikv.exception.GrpcException; +import com.pingcap.tikv.exception.KeyException; +import com.pingcap.tikv.key.Key; +import com.pingcap.tikv.region.RegionStoreClient; +import com.pingcap.tikv.region.RegionStoreClient.RegionStoreClientBuilder; +import com.pingcap.tikv.region.TiRegion; +import com.pingcap.tikv.util.BackOffer; +import com.pingcap.tikv.util.ConcreteBackOffer; +import com.pingcap.tikv.util.Pair; +import org.apache.log4j.Logger; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.kvproto.Kvrpcpb.KvPair; +import org.tikv.kvproto.Metapb; + +public class ConcreteScanIterator extends ScanIterator { + private final long version; + private final Logger logger = Logger.getLogger(ConcreteScanIterator.class); + + public ConcreteScanIterator( + TiConfiguration conf, + RegionStoreClientBuilder builder, + ByteString startKey, + long version, + int limit) { + // Passing endKey as ByteString.EMPTY means that endKey is +INF by default, + this(conf, builder, startKey, ByteString.EMPTY, version, limit); + } + + public ConcreteScanIterator( + TiConfiguration conf, + RegionStoreClientBuilder builder, + ByteString startKey, + ByteString endKey, + long version) { + // Passing endKey as ByteString.EMPTY means that endKey is +INF by default, + this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE); + } + + private ConcreteScanIterator( + TiConfiguration conf, + RegionStoreClientBuilder builder, + ByteString startKey, + ByteString endKey, + long version, + int limit) { + super(conf, builder, startKey, endKey, limit); + this.version = version; + } + + TiRegion loadCurrentRegionToCache() throws GrpcException { + TiRegion region; + try (RegionStoreClient client = builder.build(startKey)) { + region = client.getRegion(); + BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff(); + currentCache = client.scan(backOffer, startKey, version); + return region; + } + } + + private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) { + logger.warn(String.format("resolve current key error %s", current.getError().toString())); + Pair pair = + builder.getRegionManager().getRegionStorePairByKey(startKey); + TiRegion region = pair.first; + Metapb.Store store = pair.second; + BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); + try (RegionStoreClient client = builder.build(region, store)) { + return client.get(backOffer, current.getKey(), version); + } catch (Exception e) { + throw new KeyException(current.getError()); + } + } + + @Override + public boolean hasNext() { + Kvrpcpb.KvPair current; + // continue when cache is empty but not null + do { + current = getCurrent(); + if (isCacheDrained() && cacheLoadFails()) { + endOfScan = true; + return false; + } + } while (currentCache != null && current == null); + // for last batch to be processed, we have to check if + return !processingLastBatch + || current == null + || (hasEndKey && Key.toRawKey(current.getKey()).compareTo(endKey) < 0); + } + + @Override + public KvPair next() { + --limit; + KvPair current = currentCache.get(index++); + + requireNonNull(current, "current kv pair cannot be null"); + if (current.hasError()) { + ByteString val = resolveCurrentLock(current); + current = KvPair.newBuilder().setKey(current.getKey()).setValue(val).build(); + } + + return current; + } + + /** + * Cache is drained when - no data extracted - scan limit was not defined - have read the last + * index of cache - index not initialized + * + * @return whether cache is drained + */ + private boolean isCacheDrained() { + return currentCache == null || limit <= 0 || index >= currentCache.size() || index == -1; + } + + private Kvrpcpb.KvPair getCurrent() { + if (isCacheDrained()) { + return null; + } + return currentCache.get(index); + } +} diff --git a/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ScanIterator.java b/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ScanIterator.java index d60b431d84..1851e3608c 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ScanIterator.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/ScanIterator.java @@ -18,133 +18,98 @@ import static java.util.Objects.requireNonNull; import com.google.protobuf.ByteString; -import com.pingcap.tikv.TiSession; -import com.pingcap.tikv.exception.KeyException; +import com.pingcap.tikv.TiConfiguration; +import com.pingcap.tikv.exception.GrpcException; import com.pingcap.tikv.exception.TiClientInternalException; import com.pingcap.tikv.key.Key; -import com.pingcap.tikv.region.RegionManager; -import com.pingcap.tikv.region.RegionStoreClient; +import com.pingcap.tikv.region.RegionStoreClient.RegionStoreClientBuilder; import com.pingcap.tikv.region.TiRegion; -import com.pingcap.tikv.util.BackOffer; -import com.pingcap.tikv.util.ConcreteBackOffer; -import com.pingcap.tikv.util.Pair; import java.util.Iterator; import java.util.List; -import java.util.Objects; -import org.apache.log4j.Logger; import org.tikv.kvproto.Kvrpcpb; -import org.tikv.kvproto.Kvrpcpb.KvPair; -import org.tikv.kvproto.Metapb; -public class ScanIterator implements Iterator { - private final Logger logger = Logger.getLogger(ScanIterator.class); - protected final TiSession session; - private final RegionManager regionCache; - protected final long version; - - private List currentCache; +public abstract class ScanIterator implements Iterator { + protected final TiConfiguration conf; + protected final RegionStoreClientBuilder builder; + protected List currentCache; protected ByteString startKey; protected int index = -1; - private boolean endOfScan = false; + protected int limit; + protected boolean endOfScan = false; + + protected Key endKey; + protected boolean hasEndKey; + protected boolean processingLastBatch = false; - public ScanIterator(ByteString startKey, TiSession session, long version) { + ScanIterator( + TiConfiguration conf, + RegionStoreClientBuilder builder, + ByteString startKey, + ByteString endKey, + int limit) { this.startKey = requireNonNull(startKey, "start key is null"); if (startKey.isEmpty()) { throw new IllegalArgumentException("start key cannot be empty"); } - this.session = session; - this.regionCache = session.getRegionManager(); - this.version = version; + this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null")); + this.hasEndKey = !endKey.equals(ByteString.EMPTY); + this.limit = limit; + this.conf = conf; + this.builder = builder; } - // return false if current cache is not loaded or empty - private boolean loadCache() { - if (endOfScan) { - return false; + /** + * Load current region to cache, returns the region if loaded. + * + * @return TiRegion of current data loaded to cache + * @throws GrpcException if scan still fails after backoff + */ + abstract TiRegion loadCurrentRegionToCache() throws GrpcException; + + // return true if current cache is not loaded or empty + boolean cacheLoadFails() { + if (endOfScan || processingLastBatch) { + return true; } - if (startKey.isEmpty()) { - return false; + if (startKey == null || startKey.isEmpty()) { + return true; } - Pair pair = regionCache.getRegionStorePairByKey(startKey); - TiRegion region = pair.first; - Metapb.Store store = pair.second; - try (RegionStoreClient client = session.getRegionStoreClientBuilder().build(region, store)) { - BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff(); - currentCache = client.scan(backOffer, startKey, version); + try { + TiRegion region = loadCurrentRegionToCache(); + ByteString curRegionEndKey = region.getEndKey(); // currentCache is null means no keys found, whereas currentCache is empty means no values // found. The difference lies in whether to continue scanning, because chances are that // an empty region exists due to deletion, region split, e.t.c. // See https://github.com/pingcap/tispark/issues/393 for details if (currentCache == null) { - return false; + return true; } index = 0; + Key lastKey = Key.EMPTY; // Session should be single-threaded itself // so that we don't worry about conf change in the middle // of a transaction. Otherwise below code might lose data - if (currentCache.size() < session.getConf().getScanBatchSize()) { - // Current region done, start new batch from next region - startKey = region.getEndKey(); + if (currentCache.size() < conf.getScanBatchSize()) { + startKey = curRegionEndKey; + } else if (currentCache.size() > conf.getScanBatchSize()) { + throw new IndexOutOfBoundsException( + "current cache size = " + + currentCache.size() + + ", larger than " + + conf.getScanBatchSize()); } else { // Start new scan from exact next key in current region - Key lastKey = Key.toRawKey(currentCache.get(currentCache.size() - 1).getKey()); + lastKey = Key.toRawKey(currentCache.get(currentCache.size() - 1).getKey()); startKey = lastKey.next().toByteString(); } + // notify last batch if lastKey is greater than or equal to endKey + if (hasEndKey && lastKey.compareTo(endKey) >= 0) { + processingLastBatch = true; + startKey = null; + } } catch (Exception e) { throw new TiClientInternalException("Error scanning data from region.", e); } - return true; - } - - private boolean isCacheDrained() { - return currentCache == null || index >= currentCache.size() || index == -1; - } - - @Override - public boolean hasNext() { - if (isCacheDrained() && !loadCache()) { - endOfScan = true; - return false; - } - return true; - } - - private Kvrpcpb.KvPair getCurrent() { - if (isCacheDrained()) { - return null; - } - return currentCache.get(index++); - } - - private ByteString resolveCurrentLock(Kvrpcpb.KvPair current) { - logger.warn(String.format("resolve current key error %s", current.getError().toString())); - Pair pair = regionCache.getRegionStorePairByKey(startKey); - TiRegion region = pair.first; - Metapb.Store store = pair.second; - BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); - try (RegionStoreClient client = session.getRegionStoreClientBuilder().build(region, store)) { - return client.get(backOffer, current.getKey(), version); - } catch (Exception e) { - throw new KeyException(current.getError()); - } - } - - @Override - public Kvrpcpb.KvPair next() { - Kvrpcpb.KvPair current; - // continue when cache is empty but not null - for (current = getCurrent(); currentCache != null && current == null; current = getCurrent()) { - if (!loadCache()) { - return null; - } - } - - Objects.requireNonNull(current, "current kv pair cannot be null"); - if (current.hasError()) { - ByteString val = resolveCurrentLock(current); - current = KvPair.newBuilder().setKey(current.getKey()).setValue(val).build(); - } - - return current; + return false; } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/AbstractRegionStoreClient.java b/tikv-client/src/main/java/com/pingcap/tikv/region/AbstractRegionStoreClient.java new file mode 100644 index 0000000000..b366ec8bd0 --- /dev/null +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/AbstractRegionStoreClient.java @@ -0,0 +1,112 @@ +/* + * + * Copyright 2019 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.pingcap.tikv.region; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.pingcap.tikv.AbstractGRPCClient; +import com.pingcap.tikv.TiConfiguration; +import com.pingcap.tikv.exception.GrpcException; +import com.pingcap.tikv.util.ChannelFactory; +import io.grpc.ManagedChannel; +import org.tikv.kvproto.Metapb; +import org.tikv.kvproto.TikvGrpc; + +public abstract class AbstractRegionStoreClient + extends AbstractGRPCClient + implements RegionErrorReceiver { + + protected TiRegion region; + protected final RegionManager regionManager; + + protected AbstractRegionStoreClient( + TiConfiguration conf, + TiRegion region, + ChannelFactory channelFactory, + TikvGrpc.TikvBlockingStub blockingStub, + TikvGrpc.TikvStub asyncStub, + RegionManager regionManager) { + super(conf, channelFactory, blockingStub, asyncStub); + checkNotNull(region, "Region is empty"); + checkNotNull(region.getLeader(), "Leader Peer is null"); + checkArgument(region.getLeader() != null, "Leader Peer is null"); + this.region = region; + this.regionManager = regionManager; + } + + public TiRegion getRegion() { + return region; + } + + @Override + protected TikvGrpc.TikvBlockingStub getBlockingStub() { + return blockingStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit()); + } + + @Override + protected TikvGrpc.TikvStub getAsyncStub() { + return asyncStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit()); + } + + @Override + public void close() throws GrpcException {} + + /** + * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed + * + * @param newStore the new store presented by NotLeader Error + * @return false when re-split is needed. + */ + @Override + public boolean onNotLeader(Metapb.Store newStore) { + if (logger.isDebugEnabled()) { + logger.debug(region + ", new leader = " + newStore.getId()); + } + TiRegion cachedRegion = regionManager.getRegionById(region.getId()); + // When switch leader fails or the region changed its key range, + // it would be necessary to re-split task's key range for new region. + if (!region.getStartKey().equals(cachedRegion.getStartKey()) + || !region.getEndKey().equals(cachedRegion.getEndKey())) { + return false; + } + region = cachedRegion; + String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress(); + ManagedChannel channel = channelFactory.getChannel(addressStr); + blockingStub = TikvGrpc.newBlockingStub(channel); + asyncStub = TikvGrpc.newStub(channel); + return true; + } + + @Override + public void onStoreNotMatch(Metapb.Store store) { + String addressStr = store.getAddress(); + ManagedChannel channel = channelFactory.getChannel(addressStr); + blockingStub = TikvGrpc.newBlockingStub(channel); + asyncStub = TikvGrpc.newStub(channel); + if (logger.isDebugEnabled() && region.getLeader().getStoreId() != store.getId()) { + logger.debug( + "store_not_match may occur? " + + region + + ", original store = " + + store.getId() + + " address = " + + addressStr); + } + } +} diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java index 612e387236..a432bba1eb 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java @@ -17,9 +17,8 @@ package com.pingcap.tikv.region; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.pingcap.tikv.region.RegionStoreClient.RequestTypes.REQ_TYPE_DAG; +import static com.pingcap.tikv.txn.LockResolverClient.extractLockFromKeyErr; import static com.pingcap.tikv.util.BackOffFunction.BackOffFuncType.*; import com.google.common.annotations.VisibleForTesting; @@ -27,7 +26,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.pingcap.tidb.tipb.DAGRequest; import com.pingcap.tidb.tipb.SelectResponse; -import com.pingcap.tikv.AbstractGRPCClient; import com.pingcap.tikv.TiConfiguration; import com.pingcap.tikv.exception.*; import com.pingcap.tikv.operation.KVErrorHandler; @@ -38,7 +36,6 @@ import io.grpc.ManagedChannel; import java.util.*; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.log4j.Logger; import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Coprocessor.KeyRange; @@ -51,9 +48,14 @@ import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvStub; -// RegionStore itself is not thread-safe -public class RegionStoreClient extends AbstractGRPCClient - implements RegionErrorReceiver { +// Note that RegionStoreClient itself is not thread-safe +// TODO: +// 1. RegionStoreClient will be inaccessible directly. +// 2. All apis of RegionStoreClient would not provide retry aside from callWithRetry, +// if a request needs to be retried because of an un-retryable cause, e.g., keys +// need to be re-split across regions/stores, region info outdated, e.t.c., you should +// retry it in an upper client logic (KVClient, TxnClient, e.t.c.) +public class RegionStoreClient extends AbstractRegionStoreClient { public enum RequestTypes { REQ_TYPE_SELECT(101), REQ_TYPE_INDEX(102), @@ -73,73 +75,42 @@ public int getValue() { } private static final Logger logger = Logger.getLogger(RegionStoreClient.class); - private TiRegion region; - private final RegionManager regionManager; @VisibleForTesting public final LockResolverClient lockResolverClient; - private TikvBlockingStub blockingStub; - private TikvStub asyncStub; - - public TiRegion getRegion() { - return region; - } - - private boolean checkLockError(BackOffer backOffer, KeyError error) { - if (error.hasLocked()) { - Lock lock = new Lock(error.getLocked()); - logger.warn("resolving lock"); - boolean ok = - lockResolverClient.resolveLocks( - backOffer, new ArrayList<>(Collections.singletonList(lock))); - if (!ok) { - // if not resolve all locks, we wait and retry - backOffer.doBackOff(BoTxnLockFast, new KeyException((error.getLocked().toString()))); - } - return false; - } else { - // retry or abort - // this should trigger Spark to retry the txn - throw new KeyException(error); - } - } /** * Fetch a value according to a key * - * @param backOffer - * @param key - * @param version - * @return + * @param backOffer backOffer + * @param key key to fetch + * @param version key version + * @return value * @throws TiClientInternalException * @throws KeyException */ public ByteString get(BackOffer backOffer, ByteString key, long version) throws TiClientInternalException, KeyException { - while (true) { - // we should refresh region - region = regionManager.getRegionByKey(key); - - Supplier factory = - () -> - GetRequest.newBuilder() - .setContext(region.getContext()) - .setKey(key) - .setVersion(version) - .build(); + Supplier factory = + () -> + GetRequest.newBuilder() + .setContext(region.getContext()) + .setKey(key) + .setVersion(version) + .build(); - KVErrorHandler handler = - new KVErrorHandler<>( - regionManager, - this, - region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); + KVErrorHandler handler = + new KVErrorHandler<>( + regionManager, + this, + lockResolverClient, + region, + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> resp.hasError() ? resp.getError() : null); - GetResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_GET, factory, handler); + GetResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_GET, factory, handler); - if (isGetSuccess(backOffer, resp)) { - return resp.getValue(); - } - } + handleGetResponse(backOffer, resp); + return resp.getValue(); } /** @@ -151,67 +122,42 @@ public ByteString get(BackOffer backOffer, ByteString key, long version) * @throws TiClientInternalException * @throws KeyException */ - private boolean isGetSuccess(BackOffer backOffer, GetResponse resp) + private void handleGetResponse(BackOffer backOffer, GetResponse resp) throws TiClientInternalException, KeyException { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("GetResponse failed without a cause"); } if (resp.hasRegionError()) { - backOffer.doBackOff(BoRegionMiss, new RegionException(resp.getRegionError())); - return false; + throw new RegionException(resp.getRegionError()); } - if (resp.hasError()) { - return checkLockError(backOffer, resp.getError()); + throw new KeyException(resp.getError()); } - return true; } - public List batchGet(BackOffer backOffer, List keys, long version) { - List result = new ArrayList<>(); - while (true) { - // re-split keys - Map> map = - keys.stream().collect(Collectors.groupingBy(regionManager::getRegionByKey)); - boolean ok = true; - for (Map.Entry> entry : map.entrySet()) { - TiRegion newRegion = entry.getKey(); - if (!newRegion.equals(region)) { - RegionStoreClient newRegionStoreClient = - new RegionStoreClientBuilder(conf, this.channelFactory, this.regionManager) - .build(newRegion); - result.addAll(newRegionStoreClient.batchGet(backOffer, entry.getValue(), version)); - } else { - Supplier request = - () -> - BatchGetRequest.newBuilder() - .setContext(region.getContext()) - .addAllKeys(entry.getValue()) - .setVersion(version) - .build(); - KVErrorHandler handler = - new KVErrorHandler<>( - regionManager, - this, - region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); - BatchGetResponse resp = - callWithRetry(backOffer, TikvGrpc.METHOD_KV_BATCH_GET, request, handler); - if (isBatchGetSuccess(backOffer, resp)) { - result.addAll(resp.getPairsList()); - } else { - ok = false; - } - } - } - if (ok) { - return result; - } - } + public List batchGet(BackOffer backOffer, Iterable keys, long version) { + Supplier request = + () -> + BatchGetRequest.newBuilder() + .setContext(region.getContext()) + .addAllKeys(keys) + .setVersion(version) + .build(); + KVErrorHandler handler = + new KVErrorHandler<>( + regionManager, + this, + lockResolverClient, + region, + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> null); + BatchGetResponse resp = + callWithRetry(backOffer, TikvGrpc.METHOD_KV_BATCH_GET, request, handler); + return handleBatchGetResponse(backOffer, resp); } - private boolean isBatchGetSuccess(BackOffer bo, BatchGetResponse resp) { + private List handleBatchGetResponse(BackOffer bo, BatchGetResponse resp) { if (resp == null) { this.regionManager.onRequestFail(region); throw new TiClientInternalException("BatchGetResponse failed without a cause"); @@ -235,12 +181,13 @@ private boolean isBatchGetSuccess(BackOffer bo, BatchGetResponse resp) { if (!locks.isEmpty()) { boolean ok = lockResolverClient.resolveLocks(bo, locks); if (!ok) { - // if not resolve all locks, we wait and retry - bo.doBackOff(BoTxnLockFast, new KeyException((resp.getPairsList().get(0).getError()))); - return false; + // resolveLocks already retried, just throw error to upper logic. + throw new TiKVException("locks not resolved, retry"); } + + // FIXME: we should retry } - return true; + return resp.getPairsList(); } public List scan( @@ -263,8 +210,10 @@ public List scan( new KVErrorHandler<>( regionManager, this, + lockResolverClient, region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> null); ScanResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_SCAN, request, handler); if (isScanSuccess(backOffer, resp)) { return doScan(resp); @@ -305,33 +254,6 @@ private List doScan(ScanResponse resp) { return Collections.unmodifiableList(newKvPairs); } - private Lock extractLockFromKeyErr(KeyError keyError) { - if (keyError.hasLocked()) { - return new Lock(keyError.getLocked()); - } - - if (keyError.hasConflict()) { - WriteConflict conflict = keyError.getConflict(); - throw new KeyException( - String.format( - "scan meet key conflict on primary key %s at commit ts %s", - conflict.getPrimary(), conflict.getConflictTs())); - } - - if (!keyError.getRetryable().isEmpty()) { - throw new KeyException( - String.format("tikv restart txn %s", keyError.getRetryableBytes().toStringUtf8())); - } - - if (!keyError.getAbort().isEmpty()) { - throw new KeyException( - String.format("tikv abort txn %s", keyError.getAbortBytes().toStringUtf8())); - } - - throw new KeyException( - String.format("unexpected key error meets and it is %s", keyError.toString())); - } - public List scan(BackOffer backOffer, ByteString startKey, long version) { return scan(backOffer, startKey, version, false); } @@ -394,8 +316,10 @@ public void prewrite( new KVErrorHandler<>( regionManager, this, + lockResolverClient, region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> null); PrewriteResponse resp = callWithRetry(bo, TikvGrpc.METHOD_KV_PREWRITE, factory, handler); if (isPrewriteSuccess(bo, resp)) { return; @@ -452,26 +376,24 @@ private boolean isPrewriteSuccess(BackOffer backOffer, PrewriteResponse resp) public void commit( BackOffer backOffer, Iterable keys, long startVersion, long commitVersion) throws KeyException { - while (true) { - Supplier factory = - () -> - CommitRequest.newBuilder() - .setStartVersion(startVersion) - .setCommitVersion(commitVersion) - .addAllKeys(keys) - .setContext(region.getContext()) - .build(); - KVErrorHandler handler = - new KVErrorHandler<>( - regionManager, - this, - region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); - CommitResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler); - if (isCommitSuccess(backOffer, resp)) { - break; - } - } + Supplier factory = + () -> + CommitRequest.newBuilder() + .setStartVersion(startVersion) + .setCommitVersion(commitVersion) + .addAllKeys(keys) + .setContext(region.getContext()) + .build(); + KVErrorHandler handler = + new KVErrorHandler<>( + regionManager, + this, + lockResolverClient, + region, + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> resp.hasError() ? resp.getError() : null); + CommitResponse resp = callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler); + handleCommitResponse(backOffer, resp); } /** @@ -484,7 +406,7 @@ public void commit( * @throws RegionException * @throws KeyException */ - private boolean isCommitSuccess(BackOffer backOffer, CommitResponse resp) + private void handleCommitResponse(BackOffer backOffer, CommitResponse resp) throws TiClientInternalException, RegionException, KeyException { if (resp == null) { this.regionManager.onRequestFail(region); @@ -498,9 +420,8 @@ private boolean isCommitSuccess(BackOffer backOffer, CommitResponse resp) } // If we find locks, we first resolve and let its caller retry. if (resp.hasError()) { - return checkLockError(backOffer, resp.getError()); + throw new KeyException(resp.getError()); } - return true; } /** @@ -533,8 +454,10 @@ public List coprocess( new KVErrorHandler<>( regionManager, this, + lockResolverClient, region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> null); Coprocessor.Response resp = callWithRetry(backOffer, TikvGrpc.METHOD_COPROCESSOR, reqToSend, handler); return handleCopResponse(backOffer, resp, ranges, responseQueue); @@ -633,9 +556,10 @@ public Iterator coprocessStreaming(DAGRequest req, List( regionManager, this, + lockResolverClient, region, - StreamingResponse::getFirstRegionError // TODO: handle all errors in streaming response - ); + StreamingResponse::getFirstRegionError, // TODO: handle all errors in streaming response + resp -> null); StreamingResponse responseIterator = this.callServerStreamingWithRetry( @@ -661,7 +585,7 @@ public RegionStoreClientBuilder( this.regionManager = regionManager; } - public RegionStoreClient build(TiRegion region, Store store) { + public RegionStoreClient build(TiRegion region, Store store) throws GrpcException { Objects.requireNonNull(region, "region is null"); Objects.requireNonNull(store, "store is null"); @@ -678,12 +602,12 @@ public RegionStoreClient build(TiRegion region, Store store) { conf, region, channelFactory, blockingStub, asyncStub, regionManager); } - public RegionStoreClient build(ByteString key) { + public RegionStoreClient build(ByteString key) throws GrpcException { Pair pair = regionManager.getRegionStorePairByKey(key); return build(pair.first, pair.second); } - public RegionStoreClient build(TiRegion region) { + public RegionStoreClient build(TiRegion region) throws GrpcException { Store store = regionManager.getStoreById(region.getLeader().getStoreId()); return build(region, store); } @@ -700,72 +624,9 @@ private RegionStoreClient( TikvBlockingStub blockingStub, TikvStub asyncStub, RegionManager regionManager) { - super(conf, channelFactory); - checkNotNull(region, "Region is empty"); - checkNotNull(region.getLeader(), "Leader Peer is null"); - checkArgument(region.getLeader() != null, "Leader Peer is null"); - this.regionManager = regionManager; - this.region = region; - this.blockingStub = blockingStub; - this.asyncStub = asyncStub; + super(conf, region, channelFactory, blockingStub, asyncStub, regionManager); this.lockResolverClient = new LockResolverClient( - conf, this.blockingStub, this.asyncStub, channelFactory, regionManager); - } - - @Override - protected TikvBlockingStub getBlockingStub() { - return blockingStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit()); - } - - @Override - protected TikvStub getAsyncStub() { - return asyncStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit()); - } - - @Override - public void close() throws Exception {} - - /** - * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed - * - * @param newStore the new store presented by NotLeader Error - * @return false when re-split is needed. - */ - @Override - public boolean onNotLeader(Store newStore) { - if (logger.isDebugEnabled()) { - logger.debug(region + ", new leader = " + newStore.getId()); - } - TiRegion cachedRegion = regionManager.getRegionById(region.getId()); - // When switch leader fails or the region changed its key range, - // it would be necessary to re-split task's key range for new region. - if (!region.getStartKey().equals(cachedRegion.getStartKey()) - || !region.getEndKey().equals(cachedRegion.getEndKey())) { - return false; - } - region = cachedRegion; - String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress(); - ManagedChannel channel = channelFactory.getChannel(addressStr); - blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); - return true; - } - - @Override - public void onStoreNotMatch(Store store) { - String addressStr = store.getAddress(); - ManagedChannel channel = channelFactory.getChannel(addressStr); - blockingStub = TikvGrpc.newBlockingStub(channel); - asyncStub = TikvGrpc.newStub(channel); - if (logger.isDebugEnabled() && region.getLeader().getStoreId() != store.getId()) { - logger.debug( - "store_not_match may occur? " - + region - + ", original store = " - + store.getId() - + " address = " - + addressStr); - } + conf, region, this.blockingStub, this.asyncStub, channelFactory, regionManager); } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/streaming/StreamingResponse.java b/tikv-client/src/main/java/com/pingcap/tikv/streaming/StreamingResponse.java index bf2af25bf4..e791afdccf 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/streaming/StreamingResponse.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/streaming/StreamingResponse.java @@ -58,7 +58,6 @@ public Errorpb.Error getFirstRegionError() { return response.getRegionError(); } } - return null; } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/txn/LockResolverClient.java b/tikv-client/src/main/java/com/pingcap/tikv/txn/LockResolverClient.java index a03f62d1e8..af439c4095 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/txn/LockResolverClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/txn/LockResolverClient.java @@ -20,36 +20,33 @@ import static com.pingcap.tikv.util.BackOffFunction.BackOffFuncType.BoRegionMiss; import com.google.protobuf.ByteString; -import com.pingcap.tikv.AbstractGRPCClient; import com.pingcap.tikv.TiConfiguration; import com.pingcap.tikv.exception.KeyException; import com.pingcap.tikv.exception.RegionException; import com.pingcap.tikv.operation.KVErrorHandler; -import com.pingcap.tikv.region.RegionErrorReceiver; +import com.pingcap.tikv.region.AbstractRegionStoreClient; import com.pingcap.tikv.region.RegionManager; import com.pingcap.tikv.region.TiRegion; import com.pingcap.tikv.region.TiRegion.RegionVerID; import com.pingcap.tikv.util.BackOffer; import com.pingcap.tikv.util.ChannelFactory; import com.pingcap.tikv.util.TsoUtils; -import io.grpc.ManagedChannel; import java.util.*; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.apache.log4j.Logger; +import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb.CleanupRequest; import org.tikv.kvproto.Kvrpcpb.CleanupResponse; import org.tikv.kvproto.Kvrpcpb.ResolveLockRequest; import org.tikv.kvproto.Kvrpcpb.ResolveLockResponse; -import org.tikv.kvproto.Metapb.Store; import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvStub; // LockResolver resolves locks and also caches resolved txn status. -public class LockResolverClient extends AbstractGRPCClient - implements RegionErrorReceiver { +public class LockResolverClient extends AbstractRegionStoreClient { // ResolvedCacheSize is max number of cached txn status. private static final long RESOLVED_TXN_CACHE_SIZE = 2048; // By default, locks after 3000ms is considered unusual (the client created the @@ -70,24 +67,18 @@ public class LockResolverClient extends AbstractGRPCClient resolved; // the list is chain of txn for O(1) lru cache private final Queue recentResolved; - private TikvBlockingStub blockingStub; - private TikvStub asyncStub; - private TiRegion region; - private final RegionManager regionManager; public LockResolverClient( TiConfiguration conf, + TiRegion region, TikvBlockingStub blockingStub, TikvStub asyncStub, ChannelFactory channelFactory, RegionManager regionManager) { - super(conf, channelFactory); + super(conf, region, channelFactory, blockingStub, asyncStub, regionManager); resolved = new HashMap<>(); recentResolved = new LinkedList<>(); readWriteLock = new ReentrantReadWriteLock(); - this.blockingStub = blockingStub; - this.regionManager = regionManager; - this.asyncStub = asyncStub; } private void saveResolved(long txnID, long status) { @@ -139,9 +130,10 @@ public Long getTxnStatus(BackOffer bo, Long txnID, ByteString primary) { new KVErrorHandler<>( regionManager, this, + this, region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); - + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> resp.hasError() ? resp.getError() : null); CleanupResponse resp = callWithRetry(bo, TikvGrpc.METHOD_KV_CLEANUP, factory, handler); status = 0L; @@ -164,6 +156,33 @@ public Long getTxnStatus(BackOffer bo, Long txnID, ByteString primary) { } } + public static Lock extractLockFromKeyErr(Kvrpcpb.KeyError keyError) { + if (keyError.hasLocked()) { + return new Lock(keyError.getLocked()); + } + + if (keyError.hasConflict()) { + Kvrpcpb.WriteConflict conflict = keyError.getConflict(); + throw new KeyException( + String.format( + "scan meet key conflict on primary key %s at commit ts %s", + conflict.getPrimary(), conflict.getConflictTs())); + } + + if (!keyError.getRetryable().isEmpty()) { + throw new KeyException( + String.format("tikv restart txn %s", keyError.getRetryableBytes().toStringUtf8())); + } + + if (!keyError.getAbort().isEmpty()) { + throw new KeyException( + String.format("tikv abort txn %s", keyError.getAbortBytes().toStringUtf8())); + } + + throw new KeyException( + String.format("unexpected key error meets and it is %s", keyError.toString())); + } + // ResolveLocks tries to resolve Locks. The resolving process is in 3 steps: // 1) Use the `lockTTL` to pick up all expired locks. Only locks that are old // enough are considered orphan locks and will be handled later. If all locks @@ -236,9 +255,10 @@ private void resolveLock(BackOffer bo, Lock lock, long txnStatus, Set( regionManager, this, + this, region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); - + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> resp.hasError() ? resp.getError() : null); ResolveLockResponse resp = callWithRetry(bo, TikvGrpc.METHOD_KV_RESOLVE_LOCK, factory, handler); @@ -257,51 +277,4 @@ private void resolveLock(BackOffer bo, Lock lock, long txnStatus, Set(); } + private ConcreteBackOffer(ConcreteBackOffer source) { + this.maxSleep = source.maxSleep; + this.totalSleep = source.totalSleep; + this.errors = source.errors; + this.backOffFunctionMap = source.backOffFunctionMap; + } + /** * Creates a back off func which implements exponential back off with optional jitters according * to different back off strategies. See http://www.awsarchitectureblog.com/2015/03/backoff.html diff --git a/tikv-client/src/test/java/com/pingcap/tikv/PDClientTest.java b/tikv-client/src/test/java/com/pingcap/tikv/PDClientTest.java index f67f5710ac..a290b0b053 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/PDClientTest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/PDClientTest.java @@ -258,7 +258,7 @@ public void testRetryPolicy() throws Exception { () -> client.getStore(ConcreteBackOffer.newCustomBackOff(5000), 0); Future storeFuture = service.submit(storeCallable); try { - Store r = storeFuture.get(5, TimeUnit.SECONDS); + Store r = storeFuture.get(50, TimeUnit.SECONDS); assertEquals(r.getId(), storeId); } catch (TimeoutException e) { fail(); diff --git a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java index e763592b26..1238db9322 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverRCTest.java @@ -54,6 +54,7 @@ public void getRCTest() { skipTest(); return; } + session.getConf().setIsolationLevel(IsolationLevel.RC); putAlphabet(); prepareAlphabetLocks(); @@ -69,28 +70,36 @@ public void RCTest() { TiTimestamp startTs = pdClient.getTimestamp(backOffer); TiTimestamp endTs = pdClient.getTimestamp(backOffer); + // Put into kv putKV("a", "a", startTs.getVersion(), endTs.getVersion()); startTs = pdClient.getTimestamp(backOffer); endTs = pdClient.getTimestamp(backOffer); - lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion()); + // Prewrite as primary without committing it + assertTrue(lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion())); - TiRegion tiRegion = - session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8(String.valueOf('a'))); + TiRegion tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); RegionStoreClient client = builder.build(tiRegion); + // In RC mode, lock will not be read. is retrieved. ByteString v = client.get( - backOffer, - ByteString.copyFromUtf8(String.valueOf('a')), - pdClient.getTimestamp(backOffer).getVersion()); - assertEquals(v.toStringUtf8(), String.valueOf('a')); + backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion()); + assertEquals(v.toStringUtf8(), "a"); try { - commit( - startTs.getVersion(), - endTs.getVersion(), - Collections.singletonList(ByteString.copyFromUtf8("a"))); + // After committing , we can read it. + assertTrue( + commit( + startTs.getVersion(), + endTs.getVersion(), + Collections.singletonList(ByteString.copyFromUtf8("a")))); + v = + client.get( + backOffer, + ByteString.copyFromUtf8("a"), + pdClient.getTimestamp(backOffer).getVersion()); + assertEquals(v.toStringUtf8(), "aa"); } catch (KeyException e) { fail(); } diff --git a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java index 5b684c3d8c..661ab4183b 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverSITest.java @@ -57,10 +57,11 @@ public void getSITest() { skipTest(); return; } + session.getConf().setIsolationLevel(IsolationLevel.SI); putAlphabet(); prepareAlphabetLocks(); - versionTest(); + versionTest(true); } @Test @@ -73,7 +74,7 @@ public void cleanLockTest() { String k = String.valueOf((char) ('a' + i)); TiTimestamp startTs = pdClient.getTimestamp(backOffer); TiTimestamp endTs = pdClient.getTimestamp(backOffer); - lockKey(k, k, k, k, false, startTs.getVersion(), endTs.getVersion()); + assertTrue(lockKey(k, k, k, k, false, startTs.getVersion(), endTs.getVersion())); } List mutations = new ArrayList<>(); @@ -124,36 +125,33 @@ public void txnStatusTest() { TiTimestamp endTs = pdClient.getTimestamp(backOffer); putKV("a", "a", startTs.getVersion(), endTs.getVersion()); - TiRegion tiRegion = - session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8(String.valueOf('a'))); + TiRegion tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); RegionStoreClient client = builder.build(tiRegion); long status = client.lockResolverClient.getTxnStatus( - backOffer, startTs.getVersion(), ByteString.copyFromUtf8(String.valueOf('a'))); + backOffer, startTs.getVersion(), ByteString.copyFromUtf8("a")); assertEquals(status, endTs.getVersion()); startTs = pdClient.getTimestamp(backOffer); endTs = pdClient.getTimestamp(backOffer); - lockKey("a", "a", "a", "a", true, startTs.getVersion(), endTs.getVersion()); - tiRegion = - session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8(String.valueOf('a'))); + assertTrue(lockKey("a", "a", "a", "a", true, startTs.getVersion(), endTs.getVersion())); + tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); client = builder.build(tiRegion); status = client.lockResolverClient.getTxnStatus( - backOffer, startTs.getVersion(), ByteString.copyFromUtf8(String.valueOf('a'))); + backOffer, startTs.getVersion(), ByteString.copyFromUtf8("a")); assertEquals(status, endTs.getVersion()); startTs = pdClient.getTimestamp(backOffer); endTs = pdClient.getTimestamp(backOffer); - lockKey("a", "a", "a", "a", false, startTs.getVersion(), endTs.getVersion()); - tiRegion = - session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8(String.valueOf('a'))); + assertTrue(lockKey("a", "a", "a", "a", false, startTs.getVersion(), endTs.getVersion())); + tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); client = builder.build(tiRegion); status = client.lockResolverClient.getTxnStatus( - backOffer, startTs.getVersion(), ByteString.copyFromUtf8(String.valueOf('a'))); + backOffer, startTs.getVersion(), ByteString.copyFromUtf8("a")); assertNotSame(status, endTs.getVersion()); } @@ -166,30 +164,43 @@ public void SITest() { TiTimestamp startTs = pdClient.getTimestamp(backOffer); TiTimestamp endTs = pdClient.getTimestamp(backOffer); + // Put into kv putKV("a", "a", startTs.getVersion(), endTs.getVersion()); startTs = pdClient.getTimestamp(backOffer); endTs = pdClient.getTimestamp(backOffer); - lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion()); + // Prewrite as primary without committing it + assertTrue(lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion())); - TiRegion tiRegion = - session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8(String.valueOf('a'))); + TiRegion tiRegion = session.getRegionManager().getRegionByKey(ByteString.copyFromUtf8("a")); RegionStoreClient client = builder.build(tiRegion); + + try { + // In SI mode, a lock is read. Try resolve it if expires TTL. + client.get( + backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion()); + fail(); + } catch (KeyException e) { + assertEquals(ByteString.copyFromUtf8("a"), e.getKeyError().getLocked().getKey()); + } + + // With TTL set to 10, after 10 milliseconds is resolved. + // We should be able to read instead. ByteString v = client.get( - backOffer, - ByteString.copyFromUtf8(String.valueOf('a')), - pdClient.getTimestamp(backOffer).getVersion()); + backOffer, ByteString.copyFromUtf8("a"), pdClient.getTimestamp(backOffer).getVersion()); assertEquals(v.toStringUtf8(), String.valueOf('a')); try { + // Trying to continue the commit phase of will fail because TxnLockNotFound commit( startTs.getVersion(), endTs.getVersion(), Collections.singletonList(ByteString.copyFromUtf8("a"))); fail(); } catch (KeyException e) { + assertFalse(e.getKeyError().getRetryable().isEmpty()); } } } diff --git a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java index 296bd2ee41..24e64209b4 100644 --- a/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverTest.java @@ -69,6 +69,18 @@ void putKV(String key, String value, long startTS, long commitTS) { boolean prewrite(List mutations, long startTS, Mutation primary) { if (mutations.size() == 0) return true; + /*for (Mutation m : mutations) { + while (true) { + try { + TiRegion region = session.getRegionManager().getRegionByKey(m.getKey()); + RegionStoreClient client = builder.build(region); + client.prewrite(backOffer, primary.getKey(), mutations, startTS, DefaultTTL); + break; + } catch (Exception e) { + logger.warn(e.getMessage()); + } + } + }*/ for (Mutation m : mutations) { TiRegion region = session.getRegionManager().getRegionByKey(m.getKey()); RegionStoreClient client = builder.build(region); @@ -87,9 +99,10 @@ boolean prewrite(List mutations, long startTS, Mutation primary) { new KVErrorHandler<>( session.getRegionManager(), client, + client.lockResolverClient, region, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); - + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> null); PrewriteResponse resp = client.callWithRetry(backOffer, TikvGrpc.METHOD_KV_PREWRITE, factory, handler); @@ -193,9 +206,10 @@ boolean commit(long startTS, long commitTS, List keys) { new KVErrorHandler<>( session.getRegionManager(), client, + client.lockResolverClient, tiRegion, - resp -> resp.hasRegionError() ? resp.getRegionError() : null); - + resp -> resp.hasRegionError() ? resp.getRegionError() : null, + resp -> resp.hasError() ? resp.getError() : null); CommitResponse resp = client.callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler); @@ -249,18 +263,28 @@ void skipTest() { } void versionTest() { + versionTest(false); + } + + void versionTest(boolean hasLock) { for (int i = 0; i < 26; i++) { - TiRegion tiRegion = - session - .getRegionManager() - .getRegionByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a' + i)))); + ByteString key = ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))); + TiRegion tiRegion = session.getRegionManager().getRegionByKey(key); RegionStoreClient client = builder.build(tiRegion); - ByteString v = - client.get( - backOffer, - ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))), - pdClient.getTimestamp(backOffer).getVersion()); - assertEquals(v.toStringUtf8(), String.valueOf((char) ('a' + i))); + try { + ByteString v = client.get(backOffer, key, pdClient.getTimestamp(backOffer).getVersion()); + if (hasLock && i == 3) { + // key "d" should be locked + fail(); + } else { + assertEquals(String.valueOf((char) ('a' + i)), v.toStringUtf8()); + } + } catch (KeyException e) { + assertEquals(ByteString.copyFromUtf8("d"), key); + LockInfo lock = e.getKeyError().getLocked(); + assertEquals(key, lock.getKey()); + assertEquals(ByteString.copyFromUtf8("z2"), lock.getPrimaryLock()); + } } } }