Skip to content

Commit

Permalink
#475: Use Scala 2.13 compatible collection syntaxes (#572)
Browse files Browse the repository at this point in the history
* Use Scala 2.13 compatibile collection syntaxes

* Enable existential types for Scala 2.11
  • Loading branch information
xerial authored Aug 2, 2019
1 parent f138d74 commit 6f20a16
Show file tree
Hide file tree
Showing 37 changed files with 108 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicLong

import wvlet.log.LogSupport

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

private[canvas] case class MemoryRefHolder(ref: MemoryReference, size: Long)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.sql.{ResultSet, Time, Timestamp, Types}
import wvlet.airframe.codec.PrimitiveCodec._
import wvlet.airframe.msgpack.spi.{MessagePack, Packer, Unpacker, ValueType}
import wvlet.log.LogSupport
import scala.collection.compat._

/**
*
Expand Down Expand Up @@ -45,7 +46,7 @@ object JDBCCodec extends LogSupport {
/**
* Encode the all ResultSet rows as JSON object values
*/
def toJsonSeq: TraversableOnce[String] = {
def toJsonSeq: IterableOnce[String] = {
mapMsgPackMapRows { msgpack =>
JSONCodec.toJson(msgpack)
}
Expand Down Expand Up @@ -93,14 +94,14 @@ object JDBCCodec extends LogSupport {
/**
* Create an interator for reading ResultSet as a sequence of MsgPack Map values
*/
def mapMsgPackMapRows[U](f: Array[Byte] => U): TraversableOnce[U] = {
def mapMsgPackMapRows[U](f: Array[Byte] => U): IterableOnce[U] = {
new RStoMsgPackIterator[U](f, packer = packRowAsMap(_))
}

/**
* Create an interator for reading ResultSet as a sequence of MsgPack array values
*/
def mapMsgPackArrayRows[U](f: Array[Byte] => U): TraversableOnce[U] = {
def mapMsgPackArrayRows[U](f: Array[Byte] => U): IterableOnce[U] = {
new RStoMsgPackIterator[U](f, packer = packRowAsArray(_))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package wvlet.airframe.codec
import wvlet.airframe.msgpack.spi.MessagePack
import wvlet.airframe.surface.Surface

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import wvlet.airframe.AirframeSpec
import wvlet.airframe.codec.JDBCCodec._
import wvlet.airframe.msgpack.spi.MessagePack
import wvlet.log.io.IOUtil.withResource
import scala.collection.compat._

/**
*
Expand Down Expand Up @@ -183,7 +184,7 @@ class JDBCCodecTest extends AirframeSpec {
|(select * from (values (1, 'leo'), (2, 'yui')))
|select * from a order by id asc
|""".stripMargin) { rs =>
val jsonSeq = JDBCCodec(rs).toJsonSeq.toIndexedSeq
val jsonSeq = JDBCCodec(rs).toJsonSeq.iterator.toIndexedSeq
jsonSeq(0) shouldBe """{"id":1,"name":"leo"}"""
jsonSeq(1) shouldBe """{"id":2,"name":"yui"}"""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import wvlet.airframe.surface.{ArraySurface, GenericSurface, Surface}
*
*/
class PrimitiveCodecTest extends CodecSpec with ScalaCheckDrivenPropertyChecks {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

def roundTripTest[T](surface: Surface, dataType: DataType)(implicit impArb: Arbitrary[T]): Unit = {
forAll { (v: T) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import wvlet.airframe.json.JSONParseException
import wvlet.airframe.msgpack.spi.{MessagePack, Packer, Unpacker, ValueType}
import wvlet.airframe.surface.{Surface, Zero}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.mutable

object CollectionCodec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ object PropertiesConfig extends LogSupport {

def overrideWithProperties(config: Config, props: Properties, onUnusedProperties: Properties => Unit): Config = {
val overrides: Seq[ConfigProperty] = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val b = Seq.newBuilder[ConfigProperty]
for ((k, v) <- props.asScala) yield {
val key = configKeyOf(k)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import wvlet.log.io.IOUtil._
import wvlet.airframe.surface.Surface
import wvlet.airframe.surface.reflect.ReflectTypeUtil

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.collection.immutable.ListMap
import scala.reflect.runtime.{universe => ru}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ package object config {
def overrideConfigParamsWithProperties(
props: Properties,
onUnusedProperties: Properties => Unit = REPORT_UNUSED_PROPERTIES): Design = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val m = for (key <- props.propertyNames().asScala) yield {
key.toString -> props.get(key).asInstanceOf[Any]
}
Expand Down
118 changes: 59 additions & 59 deletions airframe-control/src/main/scala/wvlet/airframe/control/Parallel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ object Parallel extends LogSupport {
/**
* Process all elements of the source by the given function then wait for the completion.
*
* @param source Source collection
* @param source Source collection
* @param parallelism Parallelism, the default value is a number of available processors
* @param f Function which processes each element of the source collection
* @param f Function which processes each element of the source collection
* @return Collection of the results
*/
def run[T, R: ClassTag](source: Seq[T], parallelism: Int = Runtime.getRuntime.availableProcessors())(
Expand All @@ -81,7 +81,7 @@ object Parallel extends LogSupport {

try {
// Process all elements of source
val it = source.zipWithIndex.toIterator
val it = source.zipWithIndex.iterator
while (it.hasNext) {
val worker = requestQueue.take()
worker.message.set(it.next())
Expand Down Expand Up @@ -114,9 +114,9 @@ object Parallel extends LogSupport {
* Process all elements of the source by the given function then don't wait completion.
* The result is an iterator which is likely a stream which elements are pushed continuously.
*
* @param source the source iterator
* @param source the source iterator
* @param parallelism Parallelism, the default value is a number of available processors
* @param f Function which processes each element of the source collection
* @param f Function which processes each element of the source collection
* @return Iterator of the results
*/
def iterate[T, R](source: Iterator[T],
Expand Down Expand Up @@ -174,60 +174,60 @@ object Parallel extends LogSupport {
new ResultIterator[R](resultQueue)
}

// /**
// * Run the given function with each element of the source periodically and repeatedly.
// * Execution can be stopped by the returned Stoppable object.
// *
// * @param source Source collection
// * @param interval Interval of execution of an element
// * @param f Function which process each element of the source collection
// * @return Object to stop execution
// */
// def repeat[T](source: Seq[T], interval: Duration, ticker: Ticker = Ticker.systemTicker)(f: T => Unit): Stoppable = {
// val requestQueue = new LinkedBlockingQueue[IndexedWorker[T, Unit]](source.size)
// val resultArray = new Array[Unit](source.size)
// val executor = Executors.newFixedThreadPool(source.size)
// val cancelable = new Stoppable(executor)
//
// Range(0, source.size).foreach { _ =>
// val repeatedFunction = (arg: T) => {
// while (!cancelable.isStopped) {
// // Use nanotime to make it independent from the system clock time
// val startNano = ticker.read
// f(arg)
// val durationNanos = ticker.read - startNano
// val wait = math.max(0, interval.toMillis - TimeUnit.NANOSECONDS.toMillis(durationNanos))
// try {
// Thread.sleep(wait)
// } catch {
// case _: InterruptedException => ()
// }
// }
// }
//
// val worker = new IndexedWorker[T, Unit](requestQueue, resultArray, repeatedFunction)
// requestQueue.put(worker)
// }
//
// source.zipWithIndex.foreach {
// case (e, i) =>
// val worker = requestQueue.take()
// worker.message.set(e, i)
// executor.execute(worker)
// }
//
// cancelable
// }
//
// class Stoppable(executor: ExecutorService) {
// private val cancelled = new AtomicBoolean(false)
// def isStopped: Boolean = cancelled.get()
//
// def stop: Unit = {
// executor.shutdownNow()
// cancelled.set(true)
// }
// }
// /**
// * Run the given function with each element of the source periodically and repeatedly.
// * Execution can be stopped by the returned Stoppable object.
// *
// * @param source Source collection
// * @param interval Interval of execution of an element
// * @param f Function which process each element of the source collection
// * @return Object to stop execution
// */
// def repeat[T](source: Seq[T], interval: Duration, ticker: Ticker = Ticker.systemTicker)(f: T => Unit): Stoppable = {
// val requestQueue = new LinkedBlockingQueue[IndexedWorker[T, Unit]](source.size)
// val resultArray = new Array[Unit](source.size)
// val executor = Executors.newFixedThreadPool(source.size)
// val cancelable = new Stoppable(executor)
//
// Range(0, source.size).foreach { _ =>
// val repeatedFunction = (arg: T) => {
// while (!cancelable.isStopped) {
// // Use nanotime to make it independent from the system clock time
// val startNano = ticker.read
// f(arg)
// val durationNanos = ticker.read - startNano
// val wait = math.max(0, interval.toMillis - TimeUnit.NANOSECONDS.toMillis(durationNanos))
// try {
// Thread.sleep(wait)
// } catch {
// case _: InterruptedException => ()
// }
// }
// }
//
// val worker = new IndexedWorker[T, Unit](requestQueue, resultArray, repeatedFunction)
// requestQueue.put(worker)
// }
//
// source.zipWithIndex.foreach {
// case (e, i) =>
// val worker = requestQueue.take()
// worker.message.set(e, i)
// executor.execute(worker)
// }
//
// cancelable
// }
//
// class Stoppable(executor: ExecutorService) {
// private val cancelled = new AtomicBoolean(false)
// def isStopped: Boolean = cancelled.get()
//
// def stop: Unit = {
// executor.shutdownNow()
// cancelled.set(true)
// }
// }

private[control] class Worker[T, R](executionId: String,
workerId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import wvlet.log.LogSupport

import scala.collection.mutable.WeakHashMap
import scala.sys.process.{Process, ProcessLogger}
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

/**
* Launch UNIX (or cygwin) commands from Scala
Expand Down Expand Up @@ -190,7 +190,7 @@ object Shell extends LogSupport {
def prepareProcessBuilder(cmdLine: String, inheritIO: Boolean): ProcessBuilder = {
trace(s"cmdLine: $cmdLine")
val tokens = Array(Shell.getCommand("sh"), "-c", if (OS.isWindows) quote(cmdLine) else cmdLine)
prepareProcessBuilderFromSeq(tokens, inheritIO)
prepareProcessBuilderFromSeq(tokens.toIndexedSeq, inheritIO)
}

def prepareProcessBuilderFromSeq(tokens: Seq[String], inheritIO: Boolean): ProcessBuilder = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ParallelTest extends AirframeSpec {

val source = Seq(1, 2, 3)
val startTime = Array(Long.MaxValue, Long.MaxValue, Long.MaxValue)
val result = Parallel.iterate(source.toIterator, parallelism = 3) { i =>
val result = Parallel.iterate(source.iterator, parallelism = 3) { i =>
startTime(i - 1) = System.currentTimeMillis()
i * 2
}
Expand Down Expand Up @@ -103,7 +103,7 @@ class ParallelTest extends AirframeSpec {
val exception = new RuntimeException("failure")

val result = Parallel
.iterate(source.toIterator, parallelism = 3) { i =>
.iterate(source.iterator, parallelism = 3) { i =>
Try {
if (i == 2) {
throw exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FluentdLogger(val tagPrefix: Option[String] = None, useExtendedEventTime:
}

private def toJavaMap(event: Map[String, Any]): java.util.Map[String, AnyRef] = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
(for ((k, v) <- event) yield {
k -> v.asInstanceOf[AnyRef]
}).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class MetricLoggerFactory(fluentdClient: MetricLogger,
def getLoggerWithTagPrefix(tagPrefix: String): MetricLogger =
fluentdClient.withTagPrefix(tagPrefix)

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.reflect.runtime.{universe => ru}

private val loggerCache = new ConcurrentHashMap[Surface, TypedMetricLogger[_]]().asScala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import wvlet.airframe.control.ResultClass
import wvlet.airframe.control.ResultClass.{Failed, Succeeded, nonRetryableFailure, retryableFailure}
import wvlet.airframe.control.Retry.RetryContext
import wvlet.log.LogSupport
import scala.language.existentials

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ case class JMXMBean(obj: AnyRef, mBeanInfo: MBeanInfo, attributes: Seq[MBeanPara

override def setAttributes(attributes: AttributeList): AttributeList = {
val l = new AttributeList(attributes.size())
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
for (a <- attributes.asList().asScala) {
l.add(setAttribute(a))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object JMXUtil extends LogSupport {
p.setProperty("com.sun.management.jmxremote.authenticate", "false")
p.setProperty("com.sun.management.jmxremote.ssl", "false")

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
if (isAtLeastJava9) {
// TODO Java9 support
// Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class CommandLauncher(private[launcher] val launcherInfo: LauncherInfo,
val subCommandName = result.unusedArgument.head
findSubCommand(subCommandName) match {
case Some(subCommand) =>
subCommand.execute(launcherConfig, nextStack, result.unusedArgument.tail, showHelpMessage)
subCommand.execute(launcherConfig, nextStack, result.unusedArgument.tail.toIndexedSeq, showHelpMessage)
case None =>
throw new IllegalArgumentException(s"Unknown sub command: ${subCommandName}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class LauncherTest extends AirframeSpec {
f.arg should be(None)

val f2 = Launcher.execute[OptArg]("hello")
f2.arg should be('defined)
f2.arg shouldBe defined
f2.arg.get should be("hello")
}

Expand Down
2 changes: 1 addition & 1 deletion airframe-log/shared/src/main/scala/wvlet/log/Logger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ object Logger {
// Set a custom LogManager to show log messages even in shutdown hooks
System.setProperty("java.util.logging.manager", "wvlet.log.AirframeLogManager")

import collection.JavaConverters._
import scala.jdk.CollectionConverters._

private lazy val loggerCache = new ConcurrentHashMap[String, Logger].asScala

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ import scala.collection.mutable.LinkedHashMap
* @author leo
*/
trait Timer extends Serializable {
import collection.JavaConverters._
import scala.jdk.CollectionConverters._
@transient private[this] val holder =
new ThreadLocal[util.ArrayDeque[TimeReport]] {
override def initialValue() = new util.ArrayDeque[TimeReport]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ case class TimeWindow(start: ZonedDateTime, end: ZonedDateTime) {
object TimeWindow {

def withTimeZone(zoneName: String): TimeWindowBuilder = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
// Add commonly used daylight saving times
val idMap = ZoneId.SHORT_IDS.asScala ++
Map("PDT" -> "-07:00", "EDT" -> "-04:00", "CDT" -> "-05:00", "MDT" -> "-06:00")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ object PackerImpl {
v8.ValueFactory.newExtension(-1, extBytes)
}
case ArrayValue(elems) =>
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
val values = elems.map(x => toMsgPackV8Value(x)).toList.asJava
v8.ValueFactory.newArray(values)
case MapValue(entries) =>
Expand Down
Loading

0 comments on commit 6f20a16

Please sign in to comment.