diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 29b9fdf9dfb9a..9e10fac8bb552 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -327,7 +327,7 @@ private[sql] class AvroDeserializer(
         if (nonNullTypes.length == 1) {
           newWriter(nonNullTypes.head, catalystType, avroPath, catalystPath)
         } else {
-          nonNullTypes.map(_.getType).toSeq match {
+          nonNullTypes.map(_.getType) match {
            case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
              (updater, ordinal, value) => value match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 115cedfe1128d..c9ceef969e297 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -158,7 +158,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
      Long.MaxValue
    } else {
      val confSize =
-        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION)
      if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
    }
@@ -167,7 +167,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
      Long.MaxValue
    } else {
      val confSize =
-        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE).toLong
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE)
      if (confSize > 0) confSize else Long.MaxValue
    }
    var sentResponsesSize: Long = 0
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index b5844486b73aa..a7877503f4611 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -102,7 +102,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
   * value greater than 0 will buffer the response from getResponse.
   */
  private val retryBufferSize = if (executeHolder.reattachable) {
-    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE).toLong
+    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
  } else {
    0
  }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index d8d9cee3dad43..c90f53ac07df3 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -188,13 +188,13 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
    scheduledExecutor match {
      case Some(_) => // Already running.
      case None =>
-        val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong
+        val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL)
        logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms")
        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
        scheduledExecutor.get.scheduleAtFixedRate(
          () => {
            try {
-              val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong
+              val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT)
              periodicMaintenance(timeout)
            } catch {
              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 7d12af3256f1f..4835e9de086c4 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -230,9 +230,9 @@ object KinesisWordProducerASL {
    // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
    for (i <- 1 to 10) {
      // Generate recordsPerSec records to put onto the stream
-      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+      val records = (1 to recordsPerSecond).foreach { recordNum =>
        // Randomly generate wordsPerRecord number of words
-        val data = (1 to wordsPerRecord.toInt).map(x => {
+        val data = (1 to wordsPerRecord).map(x => {
          // Get a random index to a word
          val randomWordIdx = Random.nextInt(randomWords.size)
          val randomWord = randomWords(randomWordIdx)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index d0eee9c83c20f..406c19be9bff9 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -130,7 +130,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
    val producer = getProducer(aggregate)
    val shardIdToSeqNumbers = producer.sendData(streamName, testData)
    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers
  }
  /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 7a9c0263631f3..2d72b4dd6bf2e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -104,7 +104,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
        }
        val left = num - results.size
-        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
        val buf = new Array[Array[T]](p.size)
        self.context.setCallSite(callSite)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index fe10e140f82de..9518433a7f694 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1486,7 +1486,7 @@ abstract class RDD[T: ClassTag](
          }
        }
-        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
        res.foreach(buf ++= _.take(num - buf.size))
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index c1c36d7a9f046..d50b8f935d561 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -371,7 +371,7 @@ private[spark] class AppStatusStore(
          Double.NaN
        }
      }
-    }.toIndexedSeq
+    }
  }
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
index 5e2b8943ed842..9e085c7078e63 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
@@ -37,7 +37,7 @@ object MLUtil {
    val destFSPath = new FSPath(destPath)
    val fs = destFSPath.getFileSystem(hadoopConf)
-    fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
+    fs.copyFromLocalFile(false, true, new FSPath(localPath), destFSPath)
  }
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index dcb80221b0e03..17be8cfa12b53 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -139,7 +139,7 @@ object Metadata {
        case (key, JInt(value)) =>
          builder.putLong(key, value.toLong)
        case (key, JLong(value)) =>
-          builder.putLong(key, value.toLong)
+          builder.putLong(key, value)
        case (key, JDouble(value)) =>
          builder.putDouble(key, value)
        case (key, JBool(value)) =>
@@ -157,7 +157,7 @@ object Metadata {
          case _: JInt =>
            builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
          case _: JLong =>
-            builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num.toLong).toArray)
+            builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num).toArray)
          case _: JDouble =>
            builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
          case _: JBool =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3fd1fe04aed67..bb2b7e7ae0662 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -105,7 +105,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
      functionName: String,
      argumentName: String,
      candidates: Seq[String]): Throwable = {
    import org.apache.spark.sql.catalyst.util.StringUtils.orderSuggestedIdentifiersBySimilarity
-    val inputs = candidates.map(candidate => Seq(candidate)).toSeq
+    val inputs = candidates.map(candidate => Seq(candidate))
    val recommendations = orderSuggestedIdentifiersBySimilarity(argumentName, inputs)
      .take(3)
    new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a3dc976647be5..31e1495db7e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1435,7 +1435,7 @@ class Dataset[T] private[sql](
      case s: Symbol => Column(s.name).expr
      case e: Expression => e
      case literal => Literal(literal)
-    }.toSeq
+    }
    UnresolvedHint(name, exprs, logicalPlan)
  }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c65d1931dd1ba..7bc770a0c9e33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -518,7 +518,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
        }
      }
-      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
      val partsToScan = if (takeFromEnd) {
        // Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
        // it becomes [198, 197, 196].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 89e9de8b0843f..88954d6f822d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -285,7 +285,7 @@ case class TableCacheQueryStageExec(
      sparkContext.submitJob(
        rdd,
        (_: Iterator[CachedBatch]) => (),
-        (0 until rdd.getNumPartitions).toSeq,
+        (0 until rdd.getNumPartitions),
        (_: Int, _: Unit) => (),
        ()
      )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 98a851f19f054..4efd94e442e4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -65,7 +65,7 @@ case class AnalyzePartitionCommand(
      if (filteredSpec.isEmpty) {
        None
      } else {
-        Some(filteredSpec.toMap)
+        Some(filteredSpec)
      }
    }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index c4e12d5c4ae03..2cf299f87c898 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -83,7 +83,7 @@ trait OrcFiltersBase {
        .groupBy(_._1.toLowerCase(Locale.ROOT))
        .filter(_._2.size == 1)
        .transform((_, v) => v.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+      CaseInsensitiveMap(dedupPrimitiveFields)
    }
  }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c33a7c4728427..101a9e6b91998 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -547,13 +547,13 @@ class RocksDB(
      readerMemUsage + memTableMemUsage + blockCacheUsage,
      pinnedBlocksMemUsage,
      totalSSTFilesBytes,
-      nativeOpsLatencyMicros.toMap,
+      nativeOpsLatencyMicros,
      commitLatencyMs,
      bytesCopied = fileManagerMetrics.bytesCopied,
      filesCopied = fileManagerMetrics.filesCopied,
      filesReused = fileManagerMetrics.filesReused,
      zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
-      nativeOpsMetrics = nativeOpsMetrics.toMap)
+      nativeOpsMetrics = nativeOpsMetrics)
  }
  /**
@@ -861,7 +861,7 @@ object RocksDBConf {
  }
  def getStringConf(conf: ConfEntry): String = {
-    Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
+    Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default) } getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value for '${conf.fullName}', must be a string"
      )
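Every hunk in this patch removes the same kind of redundancy: a trailing conversion call (`.toLong`, `.toInt`, `.toSeq`, `.toMap`, `.toString`) applied to an expression that no longer needs it, most often because the typed config entry or collection it reads from already returns the target type. The sketch below is a minimal, self-contained Scala illustration of the config case only; `ConfEntry`, `Conf`, and the "demo.*" key are simplified stand-ins invented for this example, not Spark's actual `ConfigEntry`/`SparkConf` API.

// Hypothetical stand-ins, not Spark code: once the entry carries its value type,
// the getter already returns a Long, so a trailing `.toLong` is a no-op.
object TypedConfSketch {
  // A typed configuration entry: key, default value, and a parser for string settings.
  final case class ConfEntry[T](key: String, default: T, parse: String => T)

  // A configuration store whose getter is typed by the entry it is given.
  final class Conf(settings: Map[String, String]) {
    def get[T](entry: ConfEntry[T]): T =
      settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
  }

  def main(args: Array[String]): Unit = {
    val maxStreamDuration = ConfEntry("demo.sender.maxStreamDuration", 0L, _.toLong)
    val conf = new Conf(Map("demo.sender.maxStreamDuration" -> "120000"))

    // Already a Long; writing `conf.get(maxStreamDuration).toLong` would be redundant,
    // which is the pattern the hunks above clean up.
    val confSize: Long = conf.get(maxStreamDuration)
    val deadlineTimeMillis =
      if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
    println(s"deadlineTimeMillis = $deadlineTimeMillis")
  }
}

The `.toSeq`/`.toMap`/`.toString` removals rest on the same observation: the expression already has the type the call site expects, so the extra conversion adds nothing.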