Merge pull request #528 from fd4s/redundant-braces
Scalafmt rule to remove redundant braces
bplommer authored Feb 14, 2021
2 parents 105635c + e73c9f0 commit ffe6741
Showing 13 changed files with 45 additions and 86 deletions.
2 changes: 2 additions & 0 deletions .scalafmt.conf
@@ -3,3 +3,5 @@ style = default
 maxColumn = 100
 project.git = true
 continuationIndent.defnSite = 2
+
+rewrite.rules = [RedundantBraces]
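
For context, RedundantBraces is a scalafmt rewrite rule that strips braces around method bodies consisting of a single expression, which is exactly the change applied throughout the file diffs below. A minimal sketch of the effect on a hypothetical method (the identifier double is illustrative, not taken from this commit):

// Before the rewrite: a single-expression body wrapped in redundant braces.
def double(x: Int): Int = {
  x * 2
}

// After scalafmt applies RedundantBraces: the braces are dropped.
def double(x: Int): Int =
  x * 2

Running scalafmt with this configuration, for example via the sbt-scalafmt plugin's scalafmtAll task if that plugin is part of the build, produces mechanical rewrites like the ones in the diffs that follow.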
modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala
@@ -79,12 +79,11 @@ object CommittableConsumerRecord {
   }
 
   implicit def committableConsumerRecordEq[F[_], K: Eq, V: Eq]
-    : Eq[CommittableConsumerRecord[F, K, V]] = {
+    : Eq[CommittableConsumerRecord[F, K, V]] =
     Eq.instance {
       case (l, r) =>
        l.record === r.record && l.offset === r.offset
     }
-  }
 
   implicit def committableConsumerRecordBitraverse[F[_]]
     : Bitraverse[CommittableConsumerRecord[F, *, *]] =
@@ -93,49 +92,43 @@ object CommittableConsumerRecord {
         fab: CommittableConsumerRecord[F, A, B]
       )(f: A => G[C], g: B => G[D])(
         implicit G: Applicative[G]
-      ): G[CommittableConsumerRecord[F, C, D]] = {
+      ): G[CommittableConsumerRecord[F, C, D]] =
         fab.record.bitraverse(f, g).map { (cd: ConsumerRecord[C, D]) =>
           CommittableConsumerRecord(cd, fab.offset)
         }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: CommittableConsumerRecord[F, A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         fab.record.bifoldLeft(c)(f, g)
-      }
 
       override def bifoldRight[A, B, C](
         fab: CommittableConsumerRecord[F, A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
         fab.record.bifoldRight(c)(f, g)
-      }
     }
 
   implicit def committableConsumerRecordTraverse[F[_], K]
     : Traverse[CommittableConsumerRecord[F, K, *]] =
     new Traverse[CommittableConsumerRecord[F, K, *]] {
       override def traverse[G[_], A, B](
         fa: CommittableConsumerRecord[F, K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableConsumerRecord[F, K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableConsumerRecord[F, K, B]] =
         fa.record.traverse(f).map { (b: ConsumerRecord[K, B]) =>
           CommittableConsumerRecord(b, fa.offset)
         }
-      }
 
       override def foldLeft[A, B](fa: CommittableConsumerRecord[F, K, A], b: B)(
         f: (B, A) => B
-      ): B = {
+      ): B =
         fa.record.foldLeft(b)(f)
-      }
 
       override def foldRight[A, B](
         fa: CommittableConsumerRecord[F, K, A],
         lb: Eval[B]
-      )(f: (A, Eval[B]) => Eval[B]): Eval[B] = {
+      )(f: (A, Eval[B]) => Eval[B]): Eval[B] =
         fa.record.foldRight(lb)(f)
-      }
     }
 }
modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala
@@ -45,10 +45,9 @@ object CommittableProducerRecords {
     override val records: Chunk[ProducerRecord[K, V]],
     override val offset: CommittableOffset[F]
   ) extends CommittableProducerRecords[F, K, V] {
-    override def toString: String = {
+    override def toString: String =
       if (records.isEmpty) s"CommittableProducerRecords(<empty>, $offset)"
       else records.mkString("CommittableProducerRecords(", ", ", s", $offset)")
-    }
   }
 
   /**
@@ -120,69 +119,63 @@ object CommittableProducerRecords {
         fab: CommittableProducerRecords[F, A, B]
       )(f: A => G[C], g: B => G[D])(
         implicit G: Applicative[G]
-      ): G[CommittableProducerRecords[F, C, D]] = {
+      ): G[CommittableProducerRecords[F, C, D]] =
         fab.records
           .traverse { record =>
             record.bitraverse(f, g)
           }
           .map { (cd: Chunk[ProducerRecord[C, D]]) =>
             CommittableProducerRecords(cd, fab.offset)
           }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: CommittableProducerRecords[F, A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         fab.records.foldLeft(c) {
           case (acc, record) =>
             record.bifoldLeft(acc)(f, g)
         }
-      }
 
       override def bifoldRight[A, B, C](
         fab: CommittableProducerRecords[F, A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
         fab.records.foldRight(c) {
           case (record, acc) =>
             record.bifoldRight(acc)(f, g)
         }
-      }
     }
 
   implicit def committableProducerRecordsTraverse[F[_], K]
     : Traverse[CommittableProducerRecords[F, K, *]] =
     new Traverse[CommittableProducerRecords[F, K, *]] {
       override def traverse[G[_], A, B](
         fa: CommittableProducerRecords[F, K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableProducerRecords[F, K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableProducerRecords[F, K, B]] =
         fa.records
           .traverse { record =>
             record.traverse(f)
           }
           .map { (b: Chunk[ProducerRecord[K, B]]) =>
             CommittableProducerRecords(b, fa.offset)
           }
-      }
 
       override def foldLeft[A, B](fa: CommittableProducerRecords[F, K, A], b: B)(
         f: (B, A) => B
-      ): B = {
+      ): B =
         fa.records.foldLeft(b) {
           case (acc, record) =>
             record.foldLeft(acc)(f)
         }
-      }
 
       override def foldRight[A, B](
         fa: CommittableProducerRecords[F, K, A],
         lb: Eval[B]
-      )(f: (A, Eval[B]) => Eval[B]): Eval[B] = {
+      )(f: (A, Eval[B]) => Eval[B]): Eval[B] =
         fa.records.foldRight(lb) {
           case (record, acc) =>
             record.foldRight(acc)(f)
         }
-      }
     }
 }
18 changes: 6 additions & 12 deletions modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala
@@ -260,46 +260,40 @@ object ConsumerRecord {
     new Bitraverse[ConsumerRecord] {
       override def bitraverse[G[_], A, B, C, D](
         fab: ConsumerRecord[A, B]
-      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ConsumerRecord[C, D]] = {
+      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ConsumerRecord[C, D]] =
         G.product(f(fab.key), g(fab.value)).map {
           case (c, d) =>
             fab.withKeyValue(c, d)
         }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: ConsumerRecord[A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         g(f(c, fab.key), fab.value)
-      }
 
       override def bifoldRight[A, B, C](
         fab: ConsumerRecord[A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
         g(fab.value, f(fab.key, c))
-      }
     }
 
   implicit def consumerRecordTraverse[K]: Traverse[ConsumerRecord[K, *]] =
     new Traverse[ConsumerRecord[K, *]] {
       override def traverse[G[_], A, B](
         fa: ConsumerRecord[K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[ConsumerRecord[K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[ConsumerRecord[K, B]] =
         f(fa.value).map { b =>
           fa.withValue(b)
         }
-      }
 
-      override def foldLeft[A, B](fa: ConsumerRecord[K, A], b: B)(f: (B, A) => B): B = {
+      override def foldLeft[A, B](fa: ConsumerRecord[K, A], b: B)(f: (B, A) => B): B =
         f(b, fa.value)
-      }
 
       override def foldRight[A, B](fa: ConsumerRecord[K, A], lb: Eval[B])(
         f: (A, Eval[B]) => Eval[B]
-      ): Eval[B] = {
+      ): Eval[B] =
         f(fa.value, lb)
-      }
     }
 }
3 changes: 1 addition & 2 deletions modules/core/src/main/scala/fs2/kafka/Headers.scala
@@ -89,10 +89,9 @@ object Headers {
     override def exists(key: String): Boolean =
       headers.exists(_.key == key)
 
-    override def concat(that: Headers): Headers = {
+    override def concat(that: Headers): Headers =
       if (that.isEmpty) this
       else new HeadersImpl(headers.appendChain(that.toChain))
-    }
 
     override def toChain: Chain[Header] =
       headers.toChain
18 changes: 6 additions & 12 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -159,7 +159,7 @@ object KafkaConsumer {
           streamId: StreamId,
           partitionStreamId: PartitionStreamId,
           partition: TopicPartition
-        ): F[Stream[F, CommittableConsumerRecord[F, K, V]]] = {
+        ): F[Stream[F, CommittableConsumerRecord[F, K, V]]] =
           for {
             chunks <- chunkQueue
             dequeueDone <- Deferred[F, Unit]
@@ -217,7 +217,6 @@
                 .onFinalize(dequeueDone.complete(()))
             }
           }.flatten
-        }
 
         def enqueueAssignment(
           streamId: StreamId,
@@ -274,7 +273,7 @@
           streamId: StreamId,
           partitionStreamIdRef: Ref[F, PartitionStreamId],
           partitionsMapQueue: PartitionsMapQueue
-        ): F[SortedSet[TopicPartition]] = {
+        ): F[SortedSet[TopicPartition]] =
          Deferred[F, Either[Throwable, SortedSet[TopicPartition]]].flatMap { deferred =>
            val request =
              Request.Assignment[F, K, V](
@@ -287,7 +286,7 @@
              case Right(assigned) => assigned
            }
          }
-        }
 
        def initialEnqueue(
          streamId: StreamId,
@@ -318,35 +316,32 @@
        }
      }
 
-      override def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] = {
+      override def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
        partitionsMapStream.flatMap { partitionsMap =>
          Stream.emits(partitionsMap.toVector.map {
            case (_, partitionStream) =>
              partitionStream
          })
        }
-      }
 
      override def stream: Stream[F, CommittableConsumerRecord[F, K, V]] =
        partitionedStream.parJoinUnbounded
 
-      override def commitAsync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] = {
+      override def commitAsync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
        request { callback =>
          Request.ManualCommitAsync(
            callback = callback,
            offsets = offsets
          )
        }
-      }
 
-      override def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] = {
+      override def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
        request { callback =>
          Request.ManualCommitSync(
            callback = callback,
            offsets = offsets
          )
        }
-      }
 
      private[this] def request[A](
        request: (Either[Throwable, A] => F[Unit]) => Request[F, K, V]
@@ -493,7 +488,7 @@
      override def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit] =
        assign(partitions.map(new TopicPartition(topic, _)))
 
-      override def assign(topic: String): F[Unit] = {
+      override def assign(topic: String): F[Unit] =
        for {
          partitions <- partitionsFor(topic)
            .map { partitionInfo =>
@@ -503,7 +498,6 @@ }
            }
          _ <- partitions.fold(F.unit)(assign(topic, _))
        } yield ()
-      }
 
      override def beginningOffsets(
        partitions: Set[TopicPartition]
3 changes: 1 addition & 2 deletions modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -97,13 +97,12 @@ object KafkaProducer {
     new KafkaProducer.Metrics[F, K, V] {
       override def produce[P](
         records: ProducerRecords[K, V, P]
-      ): F[F[ProducerResult[K, V, P]]] = {
+      ): F[F[ProducerResult[K, V, P]]] =
         withProducer { (producer, _) =>
           records.records
             .traverse(produceRecord(keySerializer, valueSerializer, producer))
             .map(_.sequence.map(ProducerResult(_, records.passthrough)))
         }
-      }
 
       override def metrics: F[Map[MetricName, Metric]] =
         withProducer.blocking { _.metrics().asScala.toMap }
18 changes: 6 additions & 12 deletions modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala
@@ -152,46 +152,40 @@ object ProducerRecord {
     new Bitraverse[ProducerRecord] {
       override def bitraverse[G[_], A, B, C, D](
         fab: ProducerRecord[A, B]
-      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ProducerRecord[C, D]] = {
+      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ProducerRecord[C, D]] =
         G.product(f(fab.key), g(fab.value)).map {
           case (c, d) =>
             fab.withKeyValue(c, d)
         }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: ProducerRecord[A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         g(f(c, fab.key), fab.value)
-      }
 
       override def bifoldRight[A, B, C](
         fab: ProducerRecord[A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
         g(fab.value, f(fab.key, c))
-      }
     }
 
   implicit def producerRecordTraverse[K]: Traverse[ProducerRecord[K, *]] =
     new Traverse[ProducerRecord[K, *]] {
       override def traverse[G[_], A, B](
         fa: ProducerRecord[K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[ProducerRecord[K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[ProducerRecord[K, B]] =
         f(fa.value).map { b =>
           fa.withValue(b)
         }
-      }
 
-      override def foldLeft[A, B](fa: ProducerRecord[K, A], b: B)(f: (B, A) => B): B = {
+      override def foldLeft[A, B](fa: ProducerRecord[K, A], b: B)(f: (B, A) => B): B =
         f(b, fa.value)
-      }
 
       override def foldRight[A, B](fa: ProducerRecord[K, A], lb: Eval[B])(
         f: (A, Eval[B]) => Eval[B]
-      ): Eval[B] = {
+      ): Eval[B] =
         f(fa.value, lb)
-      }
     }
 }
3 changes: 1 addition & 2 deletions modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala
@@ -43,10 +43,9 @@ object ProducerRecords {
     override val records: Chunk[ProducerRecord[K, V]],
     override val passthrough: P
   ) extends ProducerRecords[K, V, P] {
-    override def toString: String = {
+    override def toString: String =
      if (records.isEmpty) s"ProducerRecords(<empty>, $passthrough)"
      else records.mkString("ProducerRecords(", ", ", s", $passthrough)")
-    }
   }
 
   /**
(Diffs for the remaining changed files were not loaded in this view.)
