From 0dceaddf9ca733fd16ab129ccd8e03fbe7f3e292 Mon Sep 17 00:00:00 2001 From: "a.artigao" Date: Sun, 24 Sep 2023 10:15:19 +0200 Subject: [PATCH] Bump CE and fs2 to the latest version --- .github/workflows/ci.yml | 4 ++-- .scala-steward.conf | 12 ------------ build.sbt | 6 +++--- .../fs2/kafka/CommittableProducerRecords.scala | 2 +- .../main/scala/fs2/kafka/KafkaProducer.scala | 17 ++++++----------- .../fs2/kafka/internal/KafkaConsumerActor.scala | 4 ++-- .../core/src/main/scala/fs2/kafka/package.scala | 2 +- .../scala/fs2/kafka/KafkaProducerSpec.scala | 4 ++-- .../src/test/scala/fs2/kafka/KafkaSpec.scala | 2 +- .../kafka/TransactionalKafkaProducerSpec.scala | 10 +++++----- 10 files changed, 23 insertions(+), 40 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ebb4faf5..e19843d28 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,12 +28,12 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.12.17, 2.13.10, 3.2.2] + scala: [2.12.17, 2.13.10, 3.3.1] java: [temurin@8, temurin@17] exclude: - scala: 2.12.17 java: temurin@17 - - scala: 3.2.2 + - scala: 3.3.1 java: temurin@17 runs-on: ${{ matrix.os }} steps: diff --git a/.scala-steward.conf b/.scala-steward.conf index 8e3e92e57..d6ef8e254 100644 --- a/.scala-steward.conf +++ b/.scala-steward.conf @@ -1,18 +1,6 @@ pullRequests.frequency = "14 days" updates.ignore = [{ - groupId = "org.typelevel", - artifactId="cats-effect" -},{ - groupId = "org.typelevel", - artifactId="cats-effect-laws" -},{ - groupId = "org.typelevel", - artifactId="cats-effect-testkit" -},{ - groupId = "co.fs2", - artifactId="fs2-core" -},{ groupId = "com.dimafeng" },{ groupId = "org.scalameta", diff --git a/build.sbt b/build.sbt index 60a973bc8..88b6ff55d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,10 @@ -val catsEffectVersion = "3.4.9" +val catsEffectVersion = "3.5.1" val catsVersion = "2.6.1" val confluentVersion = "7.3.4" -val fs2Version = "3.6.1" +val fs2Version = "3.9.2" val kafkaVersion = "3.4.1" @@ -18,7 +18,7 @@ val scala212 = "2.12.17" val scala213 = "2.13.10" -val scala3 = "3.2.2" +val scala3 = "3.3.1" ThisBuild / tlBaseVersion := "3.0" diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala b/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala index 652aefc18..28226d34c 100644 --- a/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala +++ b/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala @@ -58,7 +58,7 @@ object CommittableProducerRecords { records: G[ProducerRecord[K, V]], offset: CommittableOffset[F] )(implicit G: Foldable[G]): CommittableProducerRecords[F, K, V] = - chunk(Chunk.iterable(Foldable[G].toIterable(records)), offset) + chunk(Chunk.from(Foldable[G].toIterable(records)), offset) /** * Creates a new [[CommittableProducerRecords]] for producing exactly diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala index d62fce43c..01b769a9a 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala @@ -197,17 +197,12 @@ object KafkaProducer { else promise.failure(exception) } ) - }.as { - F.delay(promise.future).flatMap { fut => - F.executionContext.flatMap { implicit ec => - F.async[(ProducerRecord[K, V], RecordMetadata)] { cb => - F.delay(fut.onComplete(t => cb(t.toEither))).as(Some(F.unit)) - } - } - } - // TODO: replace the above with the following once CE3.5.0 is out - // F.fromFutureCancelable(F.delay(promise.future)) - } + }.map( + javaFuture => + F.fromFutureCancelable( + F.delay((promise.future, F.delay(javaFuture.cancel(true)).void)) + ) + ) } } diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 30e51ae1b..183f885c8 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -152,7 +152,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( val action = st.fetches.filterKeysStrictList(withRecords).traverse { case (partition, partitionFetches) => - val records = Chunk.vector(st.records(partition).toVector) + val records = Chunk.from(st.records(partition).toVector) partitionFetches.values.toList.traverse(_.completeRevoked(records)) } >> logging.log( RevokedFetchesWithRecords(st.records.filterKeysStrict(withRecords), newState) @@ -317,7 +317,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( def completeFetches: F[Unit] = state.fetches.filterKeysStrictList(canBeCompleted).traverse_ { case (partition, fetches) => - val records = Chunk.vector(allRecords(partition).toVector) + val records = Chunk.from(allRecords(partition).toVector) fetches.values.toList.traverse_(_.completeRecords(records)) } diff --git a/modules/core/src/main/scala/fs2/kafka/package.scala b/modules/core/src/main/scala/fs2/kafka/package.scala index 6f5999b8e..f2b94436c 100644 --- a/modules/core/src/main/scala/fs2/kafka/package.scala +++ b/modules/core/src/main/scala/fs2/kafka/package.scala @@ -94,7 +94,7 @@ package kafka { records: F[ProducerRecord[K, V]] )( implicit F: Traverse[F] - ): ProducerRecords[K, V] = Chunk.iterable(Foldable[F].toIterable(records)) + ): ProducerRecords[K, V] = Chunk.from(Foldable[F].toIterable(records)) def one[K, V](record: ProducerRecord[K, V]): ProducerRecords[K, V] = Chunk.singleton(record) diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala index 4aefa5fc4..caeb3384f 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala @@ -36,7 +36,7 @@ final class KafkaProducerSpec extends BaseKafkaSpec { (for { producer <- KafkaProducer.stream(producerSettings[IO]) _ <- Stream.eval(IO(producer.toString should startWith("KafkaProducer$"))) - (records, passthrough) <- Stream.chunk(Chunk.seq(toProduce).map { + (records, passthrough) <- Stream.chunk(Chunk.from(toProduce).map { case passthrough @ (key, value) => (ProducerRecords.one(ProducerRecord(topic, key, value)), passthrough) }) @@ -63,7 +63,7 @@ final class KafkaProducerSpec extends BaseKafkaSpec { (for { producer <- KafkaProducer[IO].stream(producerSettings[IO]) - records <- Stream.chunk(Chunk.seq(toProduce).map { + records <- Stream.chunk(Chunk.from(toProduce).map { case (key, value) => ProducerRecords.one(ProducerRecord(topic, key, value)) }) diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaSpec.scala index d67ea5b0b..0bfe1a00c 100644 --- a/modules/core/src/test/scala/fs2/kafka/KafkaSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/KafkaSpec.scala @@ -23,7 +23,7 @@ final class KafkaSpec extends BaseAsyncSpec { (for { ref <- Stream.eval(Ref[IO].of(Option.empty[Map[TopicPartition, OffsetAndMetadata]])) commit = (offsets: Map[TopicPartition, OffsetAndMetadata]) => ref.set(Some(offsets)) - offsets = Chunk.seq(exampleOffsets(commit)) + offsets = Chunk.from(exampleOffsets(commit)) _ <- Stream .chunk(offsets) .covary[IO] diff --git a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala index 1d1c8a1ed..db93f3dc2 100644 --- a/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/TransactionalKafkaProducerSpec.scala @@ -79,7 +79,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues { ) ) _ <- Stream.eval(IO(producer.toString should startWith("TransactionalKafkaProducer$"))) - (records, passthrough) <- Stream.chunk(Chunk.seq(toProduce)).zipWithIndex.map { + (records, passthrough) <- Stream.chunk(Chunk.from(toProduce)).zipWithIndex.map { case ((key, value), i) => val record = ProducerRecord(topic, key, value) @@ -185,7 +185,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues { _ => IO.unit ) - records = Chunk.seq(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i))) + records = Chunk.from(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i))) results <- Stream.eval(producer.produce(records)) } yield { @@ -198,7 +198,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues { private def testMultiple(topic: String, makeOffset: Option[Int => CommittableOffset[IO]]) = { createCustomTopic(topic, partitions = 3) val toProduce = - Chunk.seq((0 to 100).toList.map(n => s"key-$n" -> s"value-$n")) + Chunk.from((0 to 100).toList.map(n => s"key-$n" -> s"value-$n")) val toPassthrough = "passthrough" @@ -262,7 +262,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues { withTopic { topic => createCustomTopic(topic, partitions = 3) val toProduce = - Chunk.seq((0 to 1000000).toList.map(n => s"key-$n" -> s"value-$n")) + Chunk.from((0 to 1000000).toList.map(n => s"key-$n" -> s"value-$n")) val result = (for { @@ -367,7 +367,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues { _ => IO.unit ) } - records = Chunk.seq(recordsToProduce.zip(offsets)).map { + records = Chunk.from(recordsToProduce.zip(offsets)).map { case (record, offset) => CommittableProducerRecords.chunk( Chunk.singleton(record),