Skip to content

Commit

Permalink
Merge pull request #1246 from aartigao/bump-ce-fs2
Browse files Browse the repository at this point in the history
Bump CE and fs2 to the latest version
  • Loading branch information
aartigao authored Sep 27, 2023
2 parents caa0411 + 0dceadd commit 7b230df
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 40 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
scala: [2.12.17, 2.13.10, 3.2.2]
scala: [2.12.17, 2.13.10, 3.3.1]
java: [temurin@8, temurin@17]
exclude:
- scala: 2.12.17
java: temurin@17
- scala: 3.2.2
- scala: 3.3.1
java: temurin@17
runs-on: ${{ matrix.os }}
steps:
Expand Down
12 changes: 0 additions & 12 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
pullRequests.frequency = "14 days"

updates.ignore = [{
groupId = "org.typelevel",
artifactId="cats-effect"
},{
groupId = "org.typelevel",
artifactId="cats-effect-laws"
},{
groupId = "org.typelevel",
artifactId="cats-effect-testkit"
},{
groupId = "co.fs2",
artifactId="fs2-core"
},{
groupId = "com.dimafeng"
},{
groupId = "org.scalameta",
Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
val catsEffectVersion = "3.4.9"
val catsEffectVersion = "3.5.1"

val catsVersion = "2.6.1"

val confluentVersion = "7.3.4"

val fs2Version = "3.6.1"
val fs2Version = "3.9.2"

val kafkaVersion = "3.4.1"

Expand All @@ -18,7 +18,7 @@ val scala212 = "2.12.17"

val scala213 = "2.13.10"

val scala3 = "3.2.2"
val scala3 = "3.3.1"

ThisBuild / tlBaseVersion := "3.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object CommittableProducerRecords {
records: G[ProducerRecord[K, V]],
offset: CommittableOffset[F]
)(implicit G: Foldable[G]): CommittableProducerRecords[F, K, V] =
chunk(Chunk.iterable(Foldable[G].toIterable(records)), offset)
chunk(Chunk.from(Foldable[G].toIterable(records)), offset)

/**
* Creates a new [[CommittableProducerRecords]] for producing exactly
Expand Down
17 changes: 6 additions & 11 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,17 +197,12 @@ object KafkaProducer {
else promise.failure(exception)
}
)
}.as {
F.delay(promise.future).flatMap { fut =>
F.executionContext.flatMap { implicit ec =>
F.async[(ProducerRecord[K, V], RecordMetadata)] { cb =>
F.delay(fut.onComplete(t => cb(t.toEither))).as(Some(F.unit))
}
}
}
// TODO: replace the above with the following once CE3.5.0 is out
// F.fromFutureCancelable(F.delay(promise.future))
}
}.map(
javaFuture =>
F.fromFutureCancelable(
F.delay((promise.future, F.delay(javaFuture.cancel(true)).void))
)
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](

val action = st.fetches.filterKeysStrictList(withRecords).traverse {
case (partition, partitionFetches) =>
val records = Chunk.vector(st.records(partition).toVector)
val records = Chunk.from(st.records(partition).toVector)
partitionFetches.values.toList.traverse(_.completeRevoked(records))
} >> logging.log(
RevokedFetchesWithRecords(st.records.filterKeysStrict(withRecords), newState)
Expand Down Expand Up @@ -317,7 +317,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
def completeFetches: F[Unit] =
state.fetches.filterKeysStrictList(canBeCompleted).traverse_ {
case (partition, fetches) =>
val records = Chunk.vector(allRecords(partition).toVector)
val records = Chunk.from(allRecords(partition).toVector)
fetches.values.toList.traverse_(_.completeRecords(records))
}

Expand Down
2 changes: 1 addition & 1 deletion modules/core/src/main/scala/fs2/kafka/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ package kafka {
records: F[ProducerRecord[K, V]]
)(
implicit F: Traverse[F]
): ProducerRecords[K, V] = Chunk.iterable(Foldable[F].toIterable(records))
): ProducerRecords[K, V] = Chunk.from(Foldable[F].toIterable(records))

def one[K, V](record: ProducerRecord[K, V]): ProducerRecords[K, V] =
Chunk.singleton(record)
Expand Down
4 changes: 2 additions & 2 deletions modules/core/src/test/scala/fs2/kafka/KafkaProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final class KafkaProducerSpec extends BaseKafkaSpec {
(for {
producer <- KafkaProducer.stream(producerSettings[IO])
_ <- Stream.eval(IO(producer.toString should startWith("KafkaProducer$")))
(records, passthrough) <- Stream.chunk(Chunk.seq(toProduce).map {
(records, passthrough) <- Stream.chunk(Chunk.from(toProduce).map {
case passthrough @ (key, value) =>
(ProducerRecords.one(ProducerRecord(topic, key, value)), passthrough)
})
Expand All @@ -63,7 +63,7 @@ final class KafkaProducerSpec extends BaseKafkaSpec {

(for {
producer <- KafkaProducer[IO].stream(producerSettings[IO])
records <- Stream.chunk(Chunk.seq(toProduce).map {
records <- Stream.chunk(Chunk.from(toProduce).map {
case (key, value) =>
ProducerRecords.one(ProducerRecord(topic, key, value))
})
Expand Down
2 changes: 1 addition & 1 deletion modules/core/src/test/scala/fs2/kafka/KafkaSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class KafkaSpec extends BaseAsyncSpec {
(for {
ref <- Stream.eval(Ref[IO].of(Option.empty[Map[TopicPartition, OffsetAndMetadata]]))
commit = (offsets: Map[TopicPartition, OffsetAndMetadata]) => ref.set(Some(offsets))
offsets = Chunk.seq(exampleOffsets(commit))
offsets = Chunk.from(exampleOffsets(commit))
_ <- Stream
.chunk(offsets)
.covary[IO]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
)
)
_ <- Stream.eval(IO(producer.toString should startWith("TransactionalKafkaProducer$")))
(records, passthrough) <- Stream.chunk(Chunk.seq(toProduce)).zipWithIndex.map {
(records, passthrough) <- Stream.chunk(Chunk.from(toProduce)).zipWithIndex.map {
case ((key, value), i) =>
val record = ProducerRecord(topic, key, value)

Expand Down Expand Up @@ -185,7 +185,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
_ => IO.unit
)

records = Chunk.seq(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i)))
records = Chunk.from(0 to 100).map(i => CommittableProducerRecords(Chunk.empty, offsets(i)))

results <- Stream.eval(producer.produce(records))
} yield {
Expand All @@ -198,7 +198,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
private def testMultiple(topic: String, makeOffset: Option[Int => CommittableOffset[IO]]) = {
createCustomTopic(topic, partitions = 3)
val toProduce =
Chunk.seq((0 to 100).toList.map(n => s"key-$n" -> s"value-$n"))
Chunk.from((0 to 100).toList.map(n => s"key-$n" -> s"value-$n"))

val toPassthrough = "passthrough"

Expand Down Expand Up @@ -262,7 +262,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
val toProduce =
Chunk.seq((0 to 1000000).toList.map(n => s"key-$n" -> s"value-$n"))
Chunk.from((0 to 1000000).toList.map(n => s"key-$n" -> s"value-$n"))

val result =
(for {
Expand Down Expand Up @@ -367,7 +367,7 @@ class TransactionalKafkaProducerSpec extends BaseKafkaSpec with EitherValues {
_ => IO.unit
)
}
records = Chunk.seq(recordsToProduce.zip(offsets)).map {
records = Chunk.from(recordsToProduce.zip(offsets)).map {
case (record, offset) =>
CommittableProducerRecords.chunk(
Chunk.singleton(record),
Expand Down

0 comments on commit 7b230df

Please sign in to comment.