Refactor RegionStoreClient logic (#989)
birdstorm authored and zhexuany committed Aug 23, 2019
1 parent fdb938e commit 60eec59
Showing 23 changed files with 933 additions and 503 deletions.
@@ -6,6 +6,7 @@ import com.pingcap.tikv.TiSession
 import com.pingcap.tispark.TiConfigConst
 import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{BaseTiSparkTest, DataFrame, Row}

@@ -82,7 +83,7 @@ class BaseDataSourceTest(val table: String,
       sortCol: String = "i",
       selectCol: String = null,
       tableName: String
-  ) = {
+  ): Unit = {
     // check data source result & expected answer
     var df = queryDatasourceTiDBWithTable(sortCol, tableName)
     if (selectCol != null) {
@@ -173,7 +174,14 @@ class BaseDataSourceTest(val table: String,
     val answer = seqRowToList(expectedAnswer, schema)

     val jdbcResult = queryTiDBViaJDBC(sql)
-    val df = queryDatasourceTiDB(sortCol)
+    val df = try {
+      queryDatasourceTiDB(sortCol)
+    } catch {
+      case e: NoSuchTableException =>
+        logger.warn("query via datasource api fails", e)
+        spark.sql("show tables").show
+        throw e
+    }
     val tidbResult = seqRowToList(df.collect(), df.schema)

     // check tidb result & expected answer
@@ -202,13 +210,11 @@ class BaseDataSourceTest(val table: String,
     val df = queryDatasourceTiDBWithTable(sortCol, tableName = tblName)
     val tidbResult = seqRowToList(df.collect(), df.schema)

-    println(s"running test on table $tblName")
-    if (compResult(jdbcResult, tidbResult)) {
-      assert(true)
-    } else {
-      println(s"failed on $tblName")
-      println(tidbResult)
-      assert(false)
+    if (!compResult(jdbcResult, tidbResult)) {
+      logger.error(s"""Failed on $tblName\n
+                      |DataSourceAPI result: ${listToString(jdbcResult)}\n
+                      |TiDB via JDBC result: ${listToString(tidbResult)}""".stripMargin)
+      fail()
     }
   }

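The try/catch added above only adds diagnostics on failure: it logs, dumps the tables the catalog can see, and rethrows. A minimal, self-contained sketch of the same pattern, assuming nothing beyond a SparkSession named spark (the object and method names here are illustrative, not part of TiSpark):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

    object QueryDiagnostics {
      // Run a query; if the table cannot be resolved, print what the catalog
      // can currently see, then rethrow so the caller still fails.
      def queryWithDiagnostics(spark: SparkSession, query: String): DataFrame =
        try {
          spark.sql(query)
        } catch {
          case e: NoSuchTableException =>
            spark.sql("show tables").show()
            throw e
        }
    }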
24 changes: 1 addition & 23 deletions core/src/test/scala/org/apache/spark/sql/BaseTiSparkTest.scala
@@ -124,6 +124,7 @@ class BaseTiSparkTest extends QueryTest with SharedSQLContext {

   protected def loadTestData(databases: Seq[String] = defaultTestDatabases): Unit =
     try {
+      ti.meta.reloadAllMeta()
       tableNames = Seq.empty[String]
       for (dbName <- databases) {
         setCurrentDatabase(dbName)
@@ -395,29 +396,6 @@ class BaseTiSparkTest extends QueryTest with SharedSQLContext {
     }
   }

-  private def listToString(result: List[List[Any]]): String =
-    if (result == null) s"[len: null] = null"
-    else if (result.isEmpty) s"[len: 0] = Empty"
-    else s"[len: ${result.length}] = ${result.map(mapStringList).mkString(",")}"
-
-  private def mapStringList(result: List[Any]): String =
-    if (result == null) "null" else "List(" + result.map(mapString).mkString(",") + ")"
-
-  private def mapString(result: Any): String =
-    if (result == null) "null"
-    else
-      result match {
-        case _: Array[Byte] =>
-          var str = "["
-          for (s <- result.asInstanceOf[Array[Byte]]) {
-            str += " " + s.toString
-          }
-          str += " ]"
-          str
-        case _ =>
-          result.toString
-      }
-
   protected def explainTestAndCollect(sql: String): Unit = {
     val df = spark.sql(sql)
     df.explain
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala
@@ -245,15 +245,15 @@ class IssueTestSuite extends BaseTiSparkTest {
     tidbStmt.execute("insert into t values(1)")
     tidbStmt.execute("insert into t values(2)")
     tidbStmt.execute("insert into t values(4)")
-    ti.meta.reloadAllMeta()
+    loadTestData()
     runTest("select count(c1) from t")
     runTest("select count(c1 + 1) from t")
     runTest("select count(1 + c1) from t")
     tidbStmt.execute("drop table if exists t")
     tidbStmt.execute("create table t(c1 int not null, c2 int not null)")
     tidbStmt.execute("insert into t values(1, 4)")
     tidbStmt.execute("insert into t values(2, 2)")
-    ti.meta.reloadAllMeta()
+    loadTestData()
     runTest("select count(c1 + c2) from t")
   }

23 changes: 23 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -196,6 +196,29 @@ abstract class QueryTest extends SparkFunSuite {
     }
   }

+  def listToString(result: List[List[Any]]): String =
+    if (result == null) s"[len: null] = null"
+    else if (result.isEmpty) s"[len: 0] = Empty"
+    else s"[len: ${result.length}] = ${result.map(mapStringList).mkString(",")}"
+
+  private def mapStringList(result: List[Any]): String =
+    if (result == null) "null" else "List(" + result.map(mapString).mkString(",") + ")"
+
+  private def mapString(result: Any): String =
+    if (result == null) "null"
+    else
+      result match {
+        case _: Array[Byte] =>
+          var str = "["
+          for (s <- result.asInstanceOf[Array[Byte]]) {
+            str += " " + s.toString
+          }
+          str += " ]"
+          str
+        case _ =>
+          result.toString
+      }
+
   protected def toOutput(value: Any, colType: String): Any = value match {
     case _: BigDecimal =>
       value.asInstanceOf[BigDecimal].setScale(2, BigDecimal.RoundingMode.HALF_UP)
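For reference, the output format the relocated helper produces — hypothetical inputs, called from a QueryTest subclass where listToString is now accessible:

    // Expected strings follow from listToString / mapStringList / mapString above.
    listToString(null)                               // "[len: null] = null"
    listToString(List.empty)                         // "[len: 0] = Empty"
    listToString(List(List(1, "a"), List(2, null)))  // "[len: 2] = List(1,a),List(2,null)"
    listToString(List(List(Array[Byte](1, 2))))      // "[len: 1] = List([ 1 2 ])"

Making listToString non-private here (it was private in BaseTiSparkTest) is presumably what lets BaseDataSourceTest include both result sets in the error message shown earlier.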
@@ -35,7 +35,7 @@ trait BaseDataTypeTest extends BaseTiSparkTest {
     setCurrentDatabase(dbName)
     val tblName = generator.getTableNameWithDesc(desc, dataType)
     val query = s"select ${generator.getColumnName(dataType)} from $tblName"
-    println(query)
+    logger.info(query)
     runTest(query)
   }

13 changes: 13 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/AbstractGRPCClient.java
@@ -37,12 +37,25 @@ public abstract class AbstractGRPCClient<
   protected final Logger logger = Logger.getLogger(this.getClass());
   protected TiConfiguration conf;
   protected final ChannelFactory channelFactory;
+  protected BlockingStubT blockingStub;
+  protected StubT asyncStub;

   protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
     this.conf = conf;
     this.channelFactory = channelFactory;
   }

+  protected AbstractGRPCClient(
+      TiConfiguration conf,
+      ChannelFactory channelFactory,
+      BlockingStubT blockingStub,
+      StubT asyncStub) {
+    this.conf = conf;
+    this.channelFactory = channelFactory;
+    this.blockingStub = blockingStub;
+    this.asyncStub = asyncStub;
+  }
+
   public TiConfiguration getConf() {
     return conf;
   }
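The second constructor turns the stubs into injected dependencies instead of something each client builds for itself; a common reason for this shape is to let a client be constructed around stubs and channels created elsewhere. A fully hypothetical Scala sketch of that injection pattern (Config, BlockingStub, AsyncStub, GrpcClientBase and StoreClient are illustrative stand-ins, not tikv-client types):

    // Stand-ins for TiConfiguration and the generated gRPC stubs (ChannelFactory omitted for brevity).
    final case class Config(pdAddress: String)
    final class BlockingStub
    final class AsyncStub

    abstract class GrpcClientBase[B, A](
        protected val conf: Config,
        protected var blockingStub: B,
        protected var asyncStub: A) {
      // Old path: the client starts without stubs and builds them itself later.
      def this(conf: Config) = this(conf, null.asInstanceOf[B], null.asInstanceOf[A])
    }

    // New path: the caller creates the stubs once and hands them in, so a client
    // can reuse stubs (and channels) that already exist.
    class StoreClient(conf: Config, b: BlockingStub, a: AsyncStub)
        extends GrpcClientBase[BlockingStub, AsyncStub](conf, b, a)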
