Skip to content

Commit

Permalink
Update producer example for 1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 31, 2020
1 parent 0c8c568 commit 6b898b2
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,13 @@ The single recommended way to move off a 0.x dependency that uses the older styl
open Confluent.Kafka
open FsKafka
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All)
let producer = KafkaProducer.Create(Serilog.LoggerConfiguration().CreateLogger(), producerConfig, "MyTopic")
let log = Serilog.LoggerConfiguration().CreateLogger()
let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
let key = Guid.NewGuid().ToString()
let deliveryReport = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
```

## Minimal batched consumer example
Expand All @@ -104,10 +107,17 @@ let deliveryReport = producer.ProduceAsync(key, "Hello World!") |> Async.RunSync
open Confluent.Kafka
open FsKafka
let handler (messages : ConsumeResult<string,string> []) = async { for m in messages do printfn "Received: %s" m.Message.Value }
let consumerConfig = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["topic"], "MyGroupId", AutoOffsetReset.Earliest)
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
for m in messages do
printfn "Received: %s" m.Message.Value
}
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
use consumer = BatchedConsumer.Start(Serilog.LoggerConfiguration().CreateLogger(), consumerConfig, handler)
use consumer = BatchedConsumer.Start(log, cfg, handler)
return! consumer.AwaitCompletion()
} |> Async.RunSynchronously
```
Expand All @@ -119,12 +129,18 @@ async {
open Confluent.Kafka
open FsKafka
let handler (messages : ConsumeResult<string,string> []) = async { for m in messages do printfn "Received: %s" m.Message.Value }
let consumerConfig = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["topic"], "MyGroupId", AutoOffsetReset.Earliest)
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
for m in messages do
printfn "Received: %s" m.Message.Value
}
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
let logger = Serilog.LoggerConfiguration().CreateLogger()
use consumer = BatchedConsumer.Start(logger, consumerConfig, handler)
do! KafkaMonitor(logger).StartAsChild(consumer.Inner, consumerConfig.Inner.GroupId)
use consumer = BatchedConsumer.Start(log, cfg, handler)
do! KafkaMonitor(log).StartAsChild(consumer.Inner, cfg.Inner.GroupId)
return! consumer.AwaitCompletion()
} |> Async.RunSynchronously
```

0 comments on commit 6b898b2

Please sign in to comment.