[SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion

### What changes were proposed in this pull request?
This PR aims to clean up redundant type conversions in Spark production code.
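
Most of the removed calls are no-op numeric conversions such as `.toLong` on values read from typed config entries that already return `Long`. A minimal sketch of that pattern, using hypothetical `ConfigEntry`/`Conf` stand-ins rather than Spark's actual config classes:

```scala
// Hypothetical stand-ins for a typed config entry and its accessor;
// Spark's real ConfigEntry/SparkConf API is richer than this sketch.
final case class ConfigEntry[T](key: String, default: T)

final class Conf(values: Map[String, Any]) {
  def get[T](entry: ConfigEntry[T]): T =
    values.getOrElse(entry.key, entry.default).asInstanceOf[T]
}

object RedundantLongConversion {
  val MAX_STREAM_SIZE: ConfigEntry[Long] = ConfigEntry("max.stream.size", 1024L)

  def main(args: Array[String]): Unit = {
    val conf = new Conf(Map("max.stream.size" -> 4096L))

    // Before: the accessor already returns Long, so .toLong is a no-op.
    val before: Long = conf.get(MAX_STREAM_SIZE).toLong
    // After: same value, no redundant conversion.
    val after: Long = conf.get(MAX_STREAM_SIZE)

    assert(before == after)
  }
}
```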

### Why are the changes needed?
Code cleanup.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#44412 from LuciferYang/cleanup-redundant-conversion.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
LuciferYang authored and dongjoon-hyun committed Dec 24, 2023
1 parent 53c7d16 commit 5db7beb
Showing 18 changed files with 24 additions and 24 deletions.
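
The remaining changes drop collection conversions (`.toSeq`, `.toMap`, `.toIndexedSeq`) on values that already have the target type. A minimal, generic sketch of that pattern (not the actual Spark call sites):

```scala
// Generic illustration: when a value already has the requested collection type,
// the explicit conversion returns an equal (often the very same) collection.
object RedundantCollectionConversion {
  def main(args: Array[String]): Unit = {
    val metrics: Map[String, Long] = Map("get" -> 10L, "put" -> 20L)
    val exprs: Seq[Int] = List(1, 2, 3)

    // Redundant: metrics is already an immutable Map, exprs already a Seq.
    val sameMap: Map[String, Long] = metrics.toMap
    val sameSeq: Seq[Int] = exprs.toSeq

    assert(sameMap == metrics)
    assert(sameSeq == exprs)
  }
}
```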
@@ -327,7 +327,7 @@ private[sql] class AvroDeserializer(
    if (nonNullTypes.length == 1) {
      newWriter(nonNullTypes.head, catalystType, avroPath, catalystPath)
    } else {
-     nonNullTypes.map(_.getType).toSeq match {
+     nonNullTypes.map(_.getType) match {
        case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
          (updater, ordinal, value) =>
            value match {
@@ -158,7 +158,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
      Long.MaxValue
    } else {
      val confSize =
-       SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+       SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION)
      if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
    }

@@ -167,7 +167,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
      Long.MaxValue
    } else {
      val confSize =
-       SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE).toLong
+       SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE)
      if (confSize > 0) confSize else Long.MaxValue
    }
    var sentResponsesSize: Long = 0
@@ -102,7 +102,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
  * value greater than 0 will buffer the response from getResponse.
  */
  private val retryBufferSize = if (executeHolder.reattachable) {
-   SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE).toLong
+   SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
  } else {
    0
  }
@@ -188,13 +188,13 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
    scheduledExecutor match {
      case Some(_) => // Already running.
      case None =>
-       val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong
+       val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL)
        logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms")
        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
        scheduledExecutor.get.scheduleAtFixedRate(
          () => {
            try {
-             val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong
+             val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT)
              periodicMaintenance(timeout)
            } catch {
              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
@@ -230,9 +230,9 @@ object KinesisWordProducerASL {
    // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
    for (i <- 1 to 10) {
      // Generate recordsPerSec records to put onto the stream
-     val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+     val records = (1 to recordsPerSecond).foreach { recordNum =>
        // Randomly generate wordsPerRecord number of words
-       val data = (1 to wordsPerRecord.toInt).map(x => {
+       val data = (1 to wordsPerRecord).map(x => {
          // Get a random index to a word
          val randomWordIdx = Random.nextInt(randomWords.size)
          val randomWord = randomWords(randomWordIdx)
@@ -130,7 +130,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
    val producer = getProducer(aggregate)
    val shardIdToSeqNumbers = producer.sendData(streamName, testData)
    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-   shardIdToSeqNumbers.toMap
+   shardIdToSeqNumbers
  }

  /**
@@ -104,7 +104,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
      }

      val left = num - results.size
-     val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+     val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))

      val buf = new Array[Array[T]](p.size)
      self.context.setCallSite(callSite)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1486,7 +1486,7 @@ abstract class RDD[T: ClassTag](
      }
    }

-   val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+   val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

    res.foreach(buf ++= _.take(num - buf.size))
@@ -371,7 +371,7 @@ private[spark] class AppStatusStore(
          Double.NaN
        }
      }
-   }.toIndexedSeq
+   }
  }
}

@@ -37,7 +37,7 @@ object MLUtil {
    val destFSPath = new FSPath(destPath)
    val fs = destFSPath.getFileSystem(hadoopConf)

-   fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
+   fs.copyFromLocalFile(false, true, new FSPath(localPath), destFSPath)
  }

}
@@ -139,7 +139,7 @@ object Metadata {
      case (key, JInt(value)) =>
        builder.putLong(key, value.toLong)
      case (key, JLong(value)) =>
-       builder.putLong(key, value.toLong)
+       builder.putLong(key, value)
      case (key, JDouble(value)) =>
        builder.putDouble(key, value)
      case (key, JBool(value)) =>
@@ -157,7 +157,7 @@
      case _: JInt =>
        builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
      case _: JLong =>
-       builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num.toLong).toArray)
+       builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num).toArray)
      case _: JDouble =>
        builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
      case _: JBool =>
@@ -105,7 +105,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
      functionName: String, argumentName: String, candidates: Seq[String]): Throwable = {
    import org.apache.spark.sql.catalyst.util.StringUtils.orderSuggestedIdentifiersBySimilarity

-   val inputs = candidates.map(candidate => Seq(candidate)).toSeq
+   val inputs = candidates.map(candidate => Seq(candidate))
    val recommendations = orderSuggestedIdentifiersBySimilarity(argumentName, inputs)
      .take(3)
    new AnalysisException(
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1435,7 +1435,7 @@ class Dataset[T] private[sql](
      case s: Symbol => Column(s.name).expr
      case e: Expression => e
      case literal => Literal(literal)
-   }.toSeq
+   }
    UnresolvedHint(name, exprs, logicalPlan)
  }
}
@@ -518,7 +518,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
      }
    }

-   val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+   val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
    val partsToScan = if (takeFromEnd) {
      // Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
      // it becomes [198, 197, 196].
@@ -285,7 +285,7 @@ case class TableCacheQueryStageExec(
    sparkContext.submitJob(
      rdd,
      (_: Iterator[CachedBatch]) => (),
-     (0 until rdd.getNumPartitions).toSeq,
+     (0 until rdd.getNumPartitions),
      (_: Int, _: Unit) => (),
      ()
    )
@@ -65,7 +65,7 @@ case class AnalyzePartitionCommand(
    if (filteredSpec.isEmpty) {
      None
    } else {
-     Some(filteredSpec.toMap)
+     Some(filteredSpec)
    }
  }

@@ -83,7 +83,7 @@ trait OrcFiltersBase {
        .groupBy(_._1.toLowerCase(Locale.ROOT))
        .filter(_._2.size == 1)
        .transform((_, v) => v.head._2)
-     CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+     CaseInsensitiveMap(dedupPrimitiveFields)
    }
  }
}
@@ -547,13 +547,13 @@ class RocksDB(
      readerMemUsage + memTableMemUsage + blockCacheUsage,
      pinnedBlocksMemUsage,
      totalSSTFilesBytes,
-     nativeOpsLatencyMicros.toMap,
+     nativeOpsLatencyMicros,
      commitLatencyMs,
      bytesCopied = fileManagerMetrics.bytesCopied,
      filesCopied = fileManagerMetrics.filesCopied,
      filesReused = fileManagerMetrics.filesReused,
      zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
-     nativeOpsMetrics = nativeOpsMetrics.toMap)
+     nativeOpsMetrics = nativeOpsMetrics)
  }

  /**
@@ -861,7 +861,7 @@ object RocksDBConf {
  }

  def getStringConf(conf: ConfEntry): String = {
-   Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
+   Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default) } getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value for '${conf.fullName}', must be a string"
      )
