From 9289ddc820d980388ae2351be98a75efff1ecb9f Mon Sep 17 00:00:00 2001 From: birdstorm Date: Wed, 7 Aug 2019 18:23:54 +0800 Subject: [PATCH] clean up redundant code --- .ci/build.groovy | 1 - .ci/integration_test.groovy | 11 +- .../com/pingcap/tispark/TiDBRelation.scala | 10 +- .../tispark/statistics/StatisticsHelper.scala | 32 +++--- .../spark/sql/execution/CoprocessorRDD.scala | 106 ++++++------------ .../spark/sql/tispark/TiHandleRDD.scala | 38 +------ .../org/apache/spark/sql/tispark/TiRDD.scala | 55 +-------- .../apache/spark/sql/tispark/TiRowRDD.scala | 86 ++++++++++++++ .../com/pingcap/tikv/statistics/Bucket.java | 1 - .../tikv/types/AbstractDateTimeType.java | 15 +++ .../com/pingcap/tikv/types/DateTimeType.java | 15 --- .../com/pingcap/tikv/types/TimestampType.java | 9 +- 12 files changed, 171 insertions(+), 208 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/sql/tispark/TiRowRDD.scala diff --git a/.ci/build.groovy b/.ci/build.groovy index cbcfd228be..cad2e4409c 100644 --- a/.ci/build.groovy +++ b/.ci/build.groovy @@ -7,7 +7,6 @@ def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPul catchError { node ('build') { - def ws = pwd() deleteDir() container("java") { stage('Checkout') { diff --git a/.ci/integration_test.groovy b/.ci/integration_test.groovy index 2a0e862570..7e18f20a5c 100644 --- a/.ci/integration_test.groovy +++ b/.ci/integration_test.groovy @@ -14,21 +14,18 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb if (m1) { TIDB_BRANCH = "${m1[0][1]}" } - m1 = null println "TIDB_BRANCH=${TIDB_BRANCH}" // parse pd branch def m2 = ghprbCommentBody =~ /pd\s*=\s*([^\s\\]+)(\s|\\|$)/ if (m2) { PD_BRANCH = "${m2[0][1]}" } - m2 = null println "PD_BRANCH=${PD_BRANCH}" // parse tikv branch def m3 = ghprbCommentBody =~ /tikv\s*=\s*([^\s\\]+)(\s|\\|$)/ if (m3) { TIKV_BRANCH = "${m3[0][1]}" } - m3 = null println "TIKV_BRANCH=${TIKV_BRANCH}" // parse mvn profile def m4 = ghprbCommentBody =~ /profile\s*=\s*([^\s\\]+)(\s|\\|$)/ @@ -41,10 +38,6 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb return file.split("\n") as List } - def remove_last_str = { str -> - return str.substring(0, str.length() - 1) - } - def get_mvn_str = { total_chunks -> def mvnStr = " -DwildcardSuites=" for (int i = 0 ; i < total_chunks.size() - 1; i++) { @@ -65,8 +58,7 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb println "${NODE_NAME}" container("golang") { deleteDir() - def ws = pwd() - + // tidb def tidb_sha1 = sh(returnStdout: true, script: "curl ${FILE_SERVER_URL}/download/refs/pingcap/tidb/${TIDB_BRANCH}/sha1").trim() sh "curl ${FILE_SERVER_URL}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz | tar xz" @@ -166,7 +158,6 @@ def call(ghprbActualCommit, ghprbCommentBody, ghprbPullId, ghprbPullTitle, ghprb node("test_java") { println "${NODE_NAME}" container("java") { - def ws = pwd() deleteDir() unstash 'binaries' unstash 'tispark' diff --git a/core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala b/core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala index e4fb457c09..4bacc40d33 100644 --- a/core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala +++ b/core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression} import org.apache.spark.sql.execution._ import 
org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} -import org.apache.spark.sql.tispark.{TiHandleRDD, TiRDD} +import org.apache.spark.sql.tispark.{TiHandleRDD, TiRDD, TiRowRDD} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} @@ -45,13 +45,13 @@ case class TiDBRelation(session: TiSession, override def sizeInBytes: Long = tableRef.sizeInBytes - def logicalPlanToRDD(dagRequest: TiDAGRequest): List[TiRDD] = { + def logicalPlanToRDD(dagRequest: TiDAGRequest): List[TiRowRDD] = { import scala.collection.JavaConverters._ val ids = dagRequest.getIds.asScala - var tiRDDs = new ListBuffer[TiRDD] + var tiRDDs = new ListBuffer[TiRowRDD] ids.foreach( id => { - tiRDDs += new TiRDD( + tiRDDs += new TiRowRDD( dagRequest, id, session.getConf, @@ -65,7 +65,6 @@ case class TiDBRelation(session: TiSession, } def dagRequestToRegionTaskExec(dagRequest: TiDAGRequest, output: Seq[Attribute]): SparkPlan = { - val timestamp = dagRequest.getStartTs import scala.collection.JavaConverters._ val ids = dagRequest.getIds.asScala var tiHandleRDDs = new ListBuffer[TiHandleRDD]() @@ -77,7 +76,6 @@ case class TiDBRelation(session: TiSession, id, session.getConf, tableRef, - timestamp, session, sqlContext.sparkSession ) diff --git a/core/src/main/scala/com/pingcap/tispark/statistics/StatisticsHelper.scala b/core/src/main/scala/com/pingcap/tispark/statistics/StatisticsHelper.scala index f1572c3d5d..faa5d54c03 100644 --- a/core/src/main/scala/com/pingcap/tispark/statistics/StatisticsHelper.scala +++ b/core/src/main/scala/com/pingcap/tispark/statistics/StatisticsHelper.scala @@ -212,38 +212,36 @@ object StatisticsHelper { } } - private[statistics] def buildHistogramsRequest(histTable: TiTableInfo, - targetTblId: Long, - startTs: TiTimestamp): TiDAGRequest = + private def checkColExists(table: TiTableInfo, column: String): Boolean = + table.getColumns.exists { _.matchName(column) } + + private def buildRequest(tableInfo: TiTableInfo, + requiredCols: Seq[String], + targetTblId: Long, + startTs: TiTimestamp): TiDAGRequest = { TiDAGRequest.Builder .newBuilder() - .setFullTableScan(histTable) + .setFullTableScan(tableInfo) .addFilter( ComparisonBinaryExpression .equal(ColumnRef.create("table_id"), Constant.create(targetTblId)) ) .addRequiredCols( - histRequiredCols.filter(checkColExists(histTable, _)) + requiredCols.filter(checkColExists(tableInfo, _)) ) .setStartTs(startTs) .build(PushDownType.NORMAL) + } - private def checkColExists(table: TiTableInfo, column: String): Boolean = - table.getColumns.exists { _.matchName(column) } + private[statistics] def buildHistogramsRequest(histTable: TiTableInfo, + targetTblId: Long, + startTs: TiTimestamp): TiDAGRequest = + buildRequest(histTable, histRequiredCols, targetTblId, startTs) private[statistics] def buildMetaRequest(metaTable: TiTableInfo, targetTblId: Long, startTs: TiTimestamp): TiDAGRequest = - TiDAGRequest.Builder - .newBuilder() - .setFullTableScan(metaTable) - .addFilter( - ComparisonBinaryExpression - .equal(ColumnRef.create("table_id"), Constant.create(targetTblId)) - ) - .addRequiredCols(metaRequiredCols.filter(checkColExists(metaTable, _))) - .setStartTs(startTs) - .build(PushDownType.NORMAL) + buildRequest(metaTable, metaRequiredCols, targetTblId, startTs) private[statistics] def buildBucketRequest(bucketTable: TiTableInfo, targetTblId: Long, diff --git a/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala b/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala 
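
Note on the StatisticsHelper.scala hunk above: the two near-identical DAG-request builders are collapsed into a single buildRequest helper parameterized by the table and its required-column list, so buildHistogramsRequest and buildMetaRequest become one-line delegations. Reflowed from the hunk for readability (it relies on the TiDAGRequest/ColumnRef/Constant imports already at the top of StatisticsHelper and on the histRequiredCols/metaRequiredCols lists defined elsewhere in the same object), the consolidated code is:

```scala
private def checkColExists(table: TiTableInfo, column: String): Boolean =
  table.getColumns.exists { _.matchName(column) }

private def buildRequest(tableInfo: TiTableInfo,
                         requiredCols: Seq[String],
                         targetTblId: Long,
                         startTs: TiTimestamp): TiDAGRequest =
  TiDAGRequest.Builder
    .newBuilder()
    .setFullTableScan(tableInfo)
    // keep only the statistics rows belonging to the requested table
    .addFilter(
      ComparisonBinaryExpression
        .equal(ColumnRef.create("table_id"), Constant.create(targetTblId))
    )
    // request only the columns this stats table actually has
    .addRequiredCols(requiredCols.filter(checkColExists(tableInfo, _)))
    .setStartTs(startTs)
    .build(PushDownType.NORMAL)

// the two former copies reduce to thin wrappers
private[statistics] def buildHistogramsRequest(histTable: TiTableInfo,
                                               targetTblId: Long,
                                               startTs: TiTimestamp): TiDAGRequest =
  buildRequest(histTable, histRequiredCols, targetTblId, startTs)

private[statistics] def buildMetaRequest(metaTable: TiTableInfo,
                                         targetTblId: Long,
                                         startTs: TiTimestamp): TiDAGRequest =
  buildRequest(metaTable, metaRequiredCols, targetTblId, startTs)
```
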
index bfb6cd8037..c71fad2cad 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.tispark.{TiHandleRDD, TiRDD} +import org.apache.spark.sql.tispark.{TiHandleRDD, TiRDD, TiRowRDD} import org.apache.spark.sql.types.{ArrayType, DataType, LongType, Metadata} import org.apache.spark.sql.{Row, SparkSession} import org.tikv.kvproto.Coprocessor.KeyRange @@ -44,21 +44,15 @@ import org.tikv.kvproto.Coprocessor.KeyRange import scala.collection.JavaConversions._ import scala.collection.mutable -case class CoprocessorRDD(output: Seq[Attribute], tiRdds: List[TiRDD]) extends LeafExecNode { - - override lazy val metrics: Map[String, SQLMetric] = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows") - ) - - override val nodeName: String = "CoprocessorRDD" +trait LeafExecRDD extends LeafExecNode { override val outputPartitioning: Partitioning = UnknownPartitioning(0) - override val outputOrdering: Seq[SortOrder] = Nil + private[execution] val tiRDDs: List[TiRDD] - private val internalRDDs: List[RDD[InternalRow]] = - tiRdds.map(rdd => RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))) - private lazy val project = UnsafeProjection.create(schema) + private[execution] val internalRDDs: List[RDD[InternalRow]] = + tiRDDs.map(rdd => RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))) + private[execution] lazy val project = UnsafeProjection.create(schema) - private def internalRowToUnsafeRowWithIndex( + private[execution] def internalRowToUnsafeRowWithIndex( numOutputRows: SQLMetric ): (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] = (index, iter) => { @@ -69,7 +63,34 @@ case class CoprocessorRDD(output: Seq[Attribute], tiRdds: List[TiRDD]) extends L } } - protected override def doExecute(): RDD[InternalRow] = { + def dagRequest: TiDAGRequest = tiRDDs.head.dagRequest + + override def verboseString: String = + if (tiRDDs.size > 1) { + val b = new mutable.StringBuilder() + b.append(s"TiSpark $nodeName on partition table:\n") + tiRDDs.zipWithIndex.map { + case (_, i) => b.append(s"partition p$i") + } + b.append(s"with dag request: $dagRequest") + b.toString() + } else { + s"TiDB $nodeName{$dagRequest}" + + s"${TiUtil.getReqEstCountStr(dagRequest)}" + } + + override def simpleString: String = verboseString +} + +case class CoprocessorRDD(output: Seq[Attribute], tiRDDs: List[TiRowRDD]) extends LeafExecRDD { + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows") + ) + + override val nodeName: String = "CoprocessorRDD" + + override protected def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") internalRDDs @@ -81,26 +102,8 @@ case class CoprocessorRDD(output: Seq[Attribute], tiRdds: List[TiRDD]) extends L ).invoke() ) .reduce(_ union _) - } - def dagRequest: TiDAGRequest = tiRdds.head.dagRequest - - override def verboseString: String = - if (tiRdds.size > 1) { - val b = new StringBuilder - b.append(s"TiSpark $nodeName on partition table:\n") - tiRdds.zipWithIndex.map { - case (_, i) => 
b.append(s"partition p$i") - } - b.append(s"with dag request: $dagRequest") - b.toString() - } else { - s"TiSpark $nodeName{$dagRequest}" + - s"${TiUtil.getReqEstCountStr(dagRequest)}" - - } - override def simpleString: String = verboseString } @@ -108,32 +111,15 @@ case class CoprocessorRDD(output: Seq[Attribute], tiRdds: List[TiRDD]) extends L * HandleRDDExec is used for scanning handles from TiKV as a LeafExecNode in index plan. * Providing handle scan via a TiHandleRDD. * - * @param tiHandleRDDs handle source + * @param tiRDDs handle source */ -case class HandleRDDExec(tiHandleRDDs: List[TiHandleRDD]) extends LeafExecNode { +case class HandleRDDExec(tiRDDs: List[TiHandleRDD]) extends LeafExecRDD { override val nodeName: String = "HandleRDD" override lazy val metrics: Map[String, SQLMetric] = Map( "numOutputRegions" -> SQLMetrics.createMetric(sparkContext, "number of regions") ) - override val outputPartitioning: Partitioning = UnknownPartitioning(0) - - private val internalRDDs: List[RDD[InternalRow]] = - tiHandleRDDs.map(rdd => RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))) - private lazy val project = UnsafeProjection.create(schema) - - private def internalRowToUnsafeRowWithIndex( - numOutputRegions: SQLMetric - ): (Int, Iterator[InternalRow]) => Iterator[UnsafeRow] = - (index, iter) => { - project.initialize(index) - iter.map { r => - numOutputRegions += 1 - project(r) - } - } - override protected def doExecute(): RDD[InternalRow] = { val numOutputRegions = longMetric("numOutputRegions") @@ -159,24 +145,6 @@ case class HandleRDDExec(tiHandleRDDs: List[TiHandleRDD]) extends LeafExecNode { ) override def output: Seq[Attribute] = attributeRef - - def dagRequest: TiDAGRequest = tiHandleRDDs.head.dagRequest - - override def verboseString: String = - if (tiHandleRDDs.size > 1) { - val b = new mutable.StringBuilder() - b.append(s"TiSpark $nodeName on partition table:\n") - tiHandleRDDs.zipWithIndex.map { - case (_, i) => b.append(s"partition p$i") - } - b.append(s"with dag request: $dagRequest") - b.toString() - } else { - s"TiDB $nodeName{$dagRequest}" + - s"${TiUtil.getReqEstCountStr(dagRequest)}" - } - - override def simpleString: String = verboseString } /** diff --git a/core/src/main/scala/org/apache/spark/sql/tispark/TiHandleRDD.scala b/core/src/main/scala/org/apache/spark/sql/tispark/TiHandleRDD.scala index baddbd3c20..8f40f955b3 100644 --- a/core/src/main/scala/org/apache/spark/sql/tispark/TiHandleRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/tispark/TiHandleRDD.scala @@ -39,21 +39,20 @@ import scala.collection.mutable.ListBuffer * is a list of primitive long which represents the handles lie in that region. 
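
Note on the CoprocessorRDD.scala hunk above: CoprocessorRDD and HandleRDDExec previously each carried their own copies of the InternalRow conversion, the UnsafeProjection, and the verboseString/simpleString rendering; the hunk moves that plumbing into a LeafExecRDD trait whose only abstract member is tiRDDs: List[TiRDD], with CoprocessorRDD built over TiRowRDD and HandleRDDExec over TiHandleRDD (the TiRDD/TiRowRDD/TiHandleRDD split appears in the hunks that follow). Below is a self-contained toy model of that shape: the Spark and TiKV types are replaced with stubs so it compiles and runs on its own, and only the inheritance structure and the "one abstract val, shared derived members" pattern mirror the patch.

```scala
object LeafExecRDDShape extends App {
  // stand-ins for TiDAGRequest and the RDD hierarchy introduced by the patch
  case class TiDAGRequest(desc: String)
  abstract class TiRDD(val dagRequest: TiDAGRequest)
  class TiRowRDD(req: TiDAGRequest) extends TiRDD(req)    // row scans
  class TiHandleRDD(req: TiDAGRequest) extends TiRDD(req) // index-handle scans

  // shared plumbing both physical nodes used to duplicate
  trait LeafExecRDD {
    val tiRDDs: List[TiRDD] // the single abstract member
    def nodeName: String
    def dagRequest: TiDAGRequest = tiRDDs.head.dagRequest
    def verboseString: String =
      if (tiRDDs.size > 1)
        s"TiSpark $nodeName on partition table with dag request: $dagRequest"
      else
        s"TiSpark $nodeName{$dagRequest}"
  }

  case class CoprocessorRDD(tiRDDs: List[TiRowRDD]) extends LeafExecRDD {
    val nodeName = "CoprocessorRDD"
  }
  case class HandleRDDExec(tiRDDs: List[TiHandleRDD]) extends LeafExecRDD {
    val nodeName = "HandleRDD"
  }

  val req = TiDAGRequest("select * from t")
  println(CoprocessorRDD(List(new TiRowRDD(req))).verboseString)
  println(HandleRDDExec(List(new TiHandleRDD(req), new TiHandleRDD(req))).verboseString)
}
```

Each concrete node keeps only what is genuinely its own: its metric, its nodeName, and its doExecute.
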
* */ -class TiHandleRDD(val dagRequest: TiDAGRequest, - val physicalId: Long, - val tiConf: TiConfiguration, - val tableRef: TiTableReference, - val ts: TiTimestamp, +class TiHandleRDD(override val dagRequest: TiDAGRequest, + override val physicalId: Long, + override val tiConf: TiConfiguration, + override val tableRef: TiTableReference, @transient private val session: TiSession, @transient private val sparkSession: SparkSession) - extends RDD[Row](sparkSession.sparkContext, Nil) { + extends TiRDD(dagRequest, physicalId, tiConf, tableRef, session, sparkSession) { override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] { dagRequest.resolve() private val tiPartition = split.asInstanceOf[TiPartition] private val session = TiSession.getInstance(tiConf) - private val snapshot = session.createSnapshot(ts) + private val snapshot = session.createSnapshot(dagRequest.getStartTs) private[this] val tasks = tiPartition.tasks private val handleIterator = snapshot.indexHandleRead(dagRequest, tasks) @@ -96,29 +95,4 @@ class TiHandleRDD(val dagRequest: TiDAGRequest, Row.apply(regionId, handleList.toArray()) } } - - override protected def getPartitions: Array[Partition] = { - val keyWithRegionTasks = RangeSplitter - .newSplitter(session.getRegionManager) - .splitRangeByRegion(dagRequest.getRangesByPhysicalId(physicalId)) - - val hostTasksMap = new mutable.HashMap[String, mutable.Set[RegionTask]] - with mutable.MultiMap[String, RegionTask] - - var index = 0 - val result = new ListBuffer[TiPartition] - for (task <- keyWithRegionTasks) { - hostTasksMap.addBinding(task.getHost, task) - val tasks = hostTasksMap(task.getHost) - result.append(new TiPartition(index, tasks.toSeq, sparkContext.applicationId)) - index += 1 - hostTasksMap.remove(task.getHost) - } - // add rest - for (tasks <- hostTasksMap.values) { - result.append(new TiPartition(index, tasks.toSeq, sparkContext.applicationId)) - index += 1 - } - result.toArray - } } diff --git a/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala b/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala index a5b4ab769a..92b8322f23 100644 --- a/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala @@ -33,57 +33,14 @@ import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ListBuffer -class TiRDD(val dagRequest: TiDAGRequest, - val physicalId: Long, - val tiConf: TiConfiguration, - val tableRef: TiTableReference, - @transient private val session: TiSession, - @transient private val sparkSession: SparkSession) +abstract class TiRDD(val dagRequest: TiDAGRequest, + val physicalId: Long, + val tiConf: TiConfiguration, + val tableRef: TiTableReference, + @transient private val session: TiSession, + @transient private val sparkSession: SparkSession) extends RDD[Row](sparkSession.sparkContext, Nil) { - type TiRow = com.pingcap.tikv.row.Row - - @transient lazy val (_: List[DataType], rowTransformer: RowTransformer) = - initializeSchema() - - def initializeSchema(): (List[DataType], RowTransformer) = { - val schemaInferrer: SchemaInfer = SchemaInfer.create(dagRequest) - val rowTransformer: RowTransformer = schemaInferrer.getRowTransformer - (schemaInferrer.getTypes.toList, rowTransformer) - } - - // cache invalidation call back function - // used for driver to update PD cache - private val callBackFunc = CacheInvalidateListener.getInstance() - - override def compute(split: Partition, context: 
TaskContext): Iterator[Row] = new Iterator[Row] { - dagRequest.resolve() - - // bypass, sum return a long type - private val tiPartition = split.asInstanceOf[TiPartition] - private val session = TiSession.getInstance(tiConf) - session.injectCallBackFunc(callBackFunc) - private val snapshot = session.createSnapshot(dagRequest.getStartTs) - private[this] val tasks = tiPartition.tasks - - private val iterator = snapshot.tableRead(dagRequest, tasks) - - override def hasNext: Boolean = { - // Kill the task in case it has been marked as killed. This logic is from - // Interrupted Iterator, but we inline it here instead of wrapping the iterator in order - // to avoid performance overhead. - if (context.isInterrupted()) { - throw new TaskKilledException - } - iterator.hasNext - } - - override def next(): Row = TiConverter.toSparkRow(iterator.next, rowTransformer) - } - - override protected def getPreferredLocations(split: Partition): Seq[String] = - split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil - override protected def getPartitions: Array[Partition] = { val keyWithRegionTasks = RangeSplitter .newSplitter(session.getRegionManager) diff --git a/core/src/main/scala/org/apache/spark/sql/tispark/TiRowRDD.scala b/core/src/main/scala/org/apache/spark/sql/tispark/TiRowRDD.scala new file mode 100644 index 0000000000..c5e13b7f2f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/tispark/TiRowRDD.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2017 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.tispark + +import com.pingcap.tikv._ +import com.pingcap.tikv.meta.TiDAGRequest +import com.pingcap.tikv.operation.SchemaInfer +import com.pingcap.tikv.operation.transformer.RowTransformer +import com.pingcap.tikv.types.DataType +import com.pingcap.tikv.util.RangeSplitter +import com.pingcap.tikv.util.RangeSplitter.RegionTask +import com.pingcap.tispark.listener.CacheInvalidateListener +import com.pingcap.tispark.utils.TiConverter +import com.pingcap.tispark.{TiPartition, TiTableReference} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.{Partition, TaskContext, TaskKilledException} + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +class TiRowRDD(override val dagRequest: TiDAGRequest, + override val physicalId: Long, + override val tiConf: TiConfiguration, + override val tableRef: TiTableReference, + @transient private val session: TiSession, + @transient private val sparkSession: SparkSession) + extends TiRDD(dagRequest, physicalId, tiConf, tableRef, session, sparkSession) { + + type TiRow = com.pingcap.tikv.row.Row + + @transient lazy val (_: List[DataType], rowTransformer: RowTransformer) = + initializeSchema() + + def initializeSchema(): (List[DataType], RowTransformer) = { + val schemaInferrer: SchemaInfer = SchemaInfer.create(dagRequest) + val rowTransformer: RowTransformer = schemaInferrer.getRowTransformer + (schemaInferrer.getTypes.toList, rowTransformer) + } + + // cache invalidation call back function + // used for driver to update PD cache + private val callBackFunc = CacheInvalidateListener.getInstance() + + override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] { + dagRequest.resolve() + + // bypass, sum return a long type + private val tiPartition = split.asInstanceOf[TiPartition] + private val session = TiSession.getInstance(tiConf) + session.injectCallBackFunc(callBackFunc) + private val snapshot = session.createSnapshot(dagRequest.getStartTs) + private[this] val tasks = tiPartition.tasks + + private val iterator = snapshot.tableRead(dagRequest, tasks) + + override def hasNext: Boolean = { + // Kill the task in case it has been marked as killed. This logic is from + // Interrupted Iterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. 
+ if (context.isInterrupted()) { + throw new TaskKilledException + } + iterator.hasNext + } + + override def next(): Row = TiConverter.toSparkRow(iterator.next, rowTransformer) + } + + override protected def getPreferredLocations(split: Partition): Seq[String] = + split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil +} diff --git a/tikv-client/src/main/java/com/pingcap/tikv/statistics/Bucket.java b/tikv-client/src/main/java/com/pingcap/tikv/statistics/Bucket.java index 9448e1b86d..06ba7dc528 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/statistics/Bucket.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/statistics/Bucket.java @@ -56,7 +56,6 @@ public Bucket(Key upperBound) { } @Override - @SuppressWarnings("unchecked") public int compareTo(Bucket b) { return upperBound.compareTo(b.upperBound); } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/AbstractDateTimeType.java b/tikv-client/src/main/java/com/pingcap/tikv/types/AbstractDateTimeType.java index f6b00a3774..1dbc16db68 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/AbstractDateTimeType.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/AbstractDateTimeType.java @@ -7,6 +7,7 @@ import com.pingcap.tikv.codec.Codec.DateTimeCodec; import com.pingcap.tikv.codec.CodecDataInput; import com.pingcap.tikv.codec.CodecDataOutput; +import com.pingcap.tikv.exception.ConvertNotSupportException; import com.pingcap.tikv.exception.InvalidCodecFormatException; import com.pingcap.tikv.meta.TiColumnInfo.InternalTypeHolder; import org.joda.time.DateTimeZone; @@ -79,4 +80,18 @@ protected void encodeProto(CodecDataOutput cdo, Object value) { public ExprType getProtoExprType() { return ExprType.MysqlTime; } + + java.sql.Timestamp convertToMysqlDateTime(Object value) throws ConvertNotSupportException { + java.sql.Timestamp result; + if (value instanceof String) { + result = java.sql.Timestamp.valueOf((String) value); + } else if (value instanceof java.sql.Date) { + result = new java.sql.Timestamp(((java.sql.Date) value).getTime()); + } else if (value instanceof java.sql.Timestamp) { + result = (java.sql.Timestamp) value; + } else { + throw new ConvertNotSupportException(value.getClass().getName(), this.getClass().getName()); + } + return result; + } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/DateTimeType.java b/tikv-client/src/main/java/com/pingcap/tikv/types/DateTimeType.java index 455a034806..5358067bbe 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/DateTimeType.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/DateTimeType.java @@ -53,21 +53,6 @@ protected Object doConvertToTiDBType(Object value) return convertToMysqlDateTime(value); } - private java.sql.Timestamp convertToMysqlDateTime(Object value) - throws ConvertNotSupportException { - java.sql.Timestamp result; - if (value instanceof String) { - result = java.sql.Timestamp.valueOf((String) value); - } else if (value instanceof java.sql.Date) { - result = new java.sql.Timestamp(((java.sql.Date) value).getTime()); - } else if (value instanceof java.sql.Timestamp) { - result = (java.sql.Timestamp) value; - } else { - throw new ConvertNotSupportException(value.getClass().getName(), this.getClass().getName()); - } - return result; - } - /** * Decode timestamp from packed long value In TiDB / MySQL, timestamp type is converted to UTC and * stored diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/TimestampType.java b/tikv-client/src/main/java/com/pingcap/tikv/types/TimestampType.java index 
9dc78e65b0..6fb0d82a7c 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/TimestampType.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/TimestampType.java @@ -72,16 +72,9 @@ private java.sql.Timestamp convertToMysqlLocalTimestamp(Object value) if (value instanceof Long) { throw new ConvertNotSupportException(value.getClass().getName(), this.getClass().getName()); // result = new java.sql.Timestamp((Long) value); - } else if (value instanceof String) { - result = java.sql.Timestamp.valueOf((String) value); - } else if (value instanceof java.sql.Date) { - result = new java.sql.Timestamp(((java.sql.Date) value).getTime()); - } else if (value instanceof java.sql.Timestamp) { - result = (java.sql.Timestamp) value; } else { - throw new ConvertNotSupportException(value.getClass().getName(), this.getClass().getName()); + return convertToMysqlDateTime(value); } - return result; } /**
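
Note on the final three hunks: convertToMysqlDateTime moves from DateTimeType into the shared parent AbstractDateTimeType, DateTimeType's doConvertToTiDBType keeps calling it, and TimestampType's convertToMysqlLocalTimestamp now delegates to it for the String/Date/Timestamp cases instead of repeating them, while still rejecting Long. A minimal Scala rendering of that shape — the real code is Java; ConvertNotSupportException is stubbed here so the sketch runs standalone, and the targetType parameter stands in for the Java this.getClass().getName():

```scala
object DateTimeConversionShape extends App {
  class ConvertNotSupportException(from: String, to: String)
      extends RuntimeException(s"do not support converting $from to $to")

  // the shared helper, now defined once in the common parent type
  def convertToMysqlDateTime(value: Any, targetType: String): java.sql.Timestamp = value match {
    case s: String             => java.sql.Timestamp.valueOf(s) // e.g. "2019-08-07 18:23:54"
    case d: java.sql.Date      => new java.sql.Timestamp(d.getTime)
    case t: java.sql.Timestamp => t
    case other =>
      throw new ConvertNotSupportException(other.getClass.getName, targetType)
  }

  // TimestampType keeps its own guard for Long and otherwise delegates
  def convertToMysqlLocalTimestamp(value: Any): java.sql.Timestamp = value match {
    case _: java.lang.Long =>
      throw new ConvertNotSupportException("java.lang.Long", "TimestampType")
    case other => convertToMysqlDateTime(other, "TimestampType")
  }

  println(convertToMysqlDateTime("2019-08-07 18:23:54", "DateTimeType"))
  println(convertToMysqlLocalTimestamp(java.sql.Timestamp.valueOf("2019-08-07 18:23:54")))
}
```

Keeping the Long rejection in TimestampType preserves its existing behaviour while the cases common to both date/time types live in one place.
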