Skip to content

Commit

Permalink
Update scalafmt-core to 3.7.17 in series/3.x (#1278)
Browse files Browse the repository at this point in the history
* Update scalafmt-core to 3.7.17 in series/3.x

* Reformat with scalafmt 3.7.17

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.7.17' to .git-blame-ignore-revs
  • Loading branch information
scala-steward authored Dec 9, 2023
1 parent 803a377 commit 25ba73c
Show file tree
Hide file tree
Showing 18 changed files with 378 additions and 280 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Scala Steward: Reformat with scalafmt 2.2.2
db0ea0ffd3d0956211a680941426f73ba7ec581b

# Scala Steward: Reformat with scalafmt 3.7.17
3dfecc9738dfc09b1bbd9a21b7f791f6ee7890b9
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.7.15"
version = "3.7.17"

# Scala 2 with -Xsource:3 compiler option
runner.dialect = scala213source3
Expand Down
60 changes: 28 additions & 32 deletions docs/src/main/mdoc/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ The following imports are assumed throughout this page.
```scala mdoc:silent
import cats.effect._
import cats.syntax.all._
import fs2.Stream
import fs2.kafka._
import fs2.Stream
```

## Settings
Expand Down Expand Up @@ -58,12 +58,12 @@ def topicOperations[F[_]: Async]: F[Unit] =
kafkaAdminClientResource[F]("localhost:9092").use { client =>
for {
topicNames <- client.listTopics.names
_ <- client.describeTopics(topicNames.toList)
_ <- client.createTopic(new NewTopic("new-topic", 1, 1.toShort))
_ <- client.createTopics(new NewTopic("newer-topic", 1, 1.toShort) :: Nil)
_ <- client.createPartitions(Map("new-topic" -> NewPartitions.increaseTo(4)))
_ <- client.deleteTopic("new-topic")
_ <- client.deleteTopics("newer-topic" :: Nil)
_ <- client.describeTopics(topicNames.toList)
_ <- client.createTopic(new NewTopic("new-topic", 1, 1.toShort))
_ <- client.createTopics(new NewTopic("newer-topic", 1, 1.toShort) :: Nil)
_ <- client.createPartitions(Map("new-topic" -> NewPartitions.increaseTo(4)))
_ <- client.deleteTopic("new-topic")
_ <- client.deleteTopics("newer-topic" :: Nil)
} yield ()
}
```
Expand All @@ -73,8 +73,8 @@ def topicOperations[F[_]: Async]: F[Unit] =
We can edit the configuration of different resources, like topics and nodes.

```scala mdoc:silent
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

def configOperations[F[_]: Async]: F[Unit] =
kafkaAdminClientResource[F]("localhost:9092").use { client =>
Expand All @@ -83,13 +83,15 @@ def configOperations[F[_]: Async]: F[Unit] =
for {
_ <- client.describeConfigs(topic :: Nil)
_ <- client.alterConfigs {
Map(topic -> List(
new AlterConfigOp(
new ConfigEntry("cleanup.policy", "delete"),
AlterConfigOp.OpType.SET
)
))
}
Map(
topic -> List(
new AlterConfigOp(
new ConfigEntry("cleanup.policy", "delete"),
AlterConfigOp.OpType.SET
)
)
)
}
} yield ()
}
```
Expand All @@ -114,12 +116,10 @@ def consumerGroupOperations[F[_]: Async: cats.Parallel]: F[Unit] =
kafkaAdminClientResource[F]("localhost:9092").use { client =>
for {
consumerGroupIds <- client.listConsumerGroups.groupIds
_ <- client.describeConsumerGroups(consumerGroupIds)
_ <- client.describeConsumerGroups(consumerGroupIds)
_ <- consumerGroupIds.parTraverse { groupId =>
client
.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata
}
client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata
}
} yield ()
}
```
Expand All @@ -130,26 +130,22 @@ There are ACL management functions to describe, create and delete ACL entries.

```scala mdoc:silent
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{
PatternType,
ResourcePattern,
ResourceType
}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}

def aclOperations[F[_]: Async]: F[Unit] =
kafkaAdminClientResource[F]("localhost:9092").use { client =>
for {
describedAcls <- client.describeAcls(AclBindingFilter.ANY)

aclEntry = new AccessControlEntry(
"User:ANONYMOUS",
"*",
AclOperation.DESCRIBE,
AclPermissionType.ALLOW
)
"User:ANONYMOUS",
"*",
AclOperation.DESCRIBE,
AclPermissionType.ALLOW
)
pattern = new ResourcePattern(ResourceType.TOPIC, "topic1", PatternType.LITERAL)
acl = new AclBinding(pattern, aclEntry)
_ <- client.createAcls(List(acl))
acl = new AclBinding(pattern, aclEntry)
_ <- client.createAcls(List(acl))

_ <- client.deleteAcls(List(AclBindingFilter.ANY))
} yield ()
Expand Down
13 changes: 6 additions & 7 deletions docs/src/main/mdoc/certificates.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ def createKafkaProducerUsingPem[F[_]: Sync, K, V](
accessKey: String,
accessCertificate: String
)(implicit keySer: Serializer[F, K], valSer: Serializer[F, V]): ProducerSettings[F, K, V] =
ProducerSettings[F, K, V]
.withCredentials(
KafkaCredentialStore.fromPemStrings(
caCertificate,
accessKey,
accessCertificate
)
ProducerSettings[F, K, V].withCredentials(
KafkaCredentialStore.fromPemStrings(
caCertificate,
accessKey,
accessCertificate
)
)
```
Loading

0 comments on commit 25ba73c

Please sign in to comment.