From 6b898b28bb94d30c1d93536d017d8d5816d97598 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 31 Jul 2020 12:24:27 +0100 Subject: [PATCH] Update producer example for 1.5.0 --- README.md | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 5235388..d1dff51 100644 --- a/README.md +++ b/README.md @@ -91,10 +91,13 @@ The single recommended way to move off a 0.x dependency that uses the older styl open Confluent.Kafka open FsKafka -let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All) -let producer = KafkaProducer.Create(Serilog.LoggerConfiguration().CreateLogger(), producerConfig, "MyTopic") +let log = Serilog.LoggerConfiguration().CreateLogger() + +let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.) +let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching) +let producer = KafkaProducer.Create(log, producerConfig, "MyTopic") let key = Guid.NewGuid().ToString() -let deliveryReport = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously +let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously ``` ## Minimal batched consumer example @@ -104,10 +107,17 @@ let deliveryReport = producer.ProduceAsync(key, "Hello World!") |> Async.RunSync open Confluent.Kafka open FsKafka -let handler (messages : ConsumeResult []) = async { for m in messages do printfn "Received: %s" m.Message.Value } -let consumerConfig = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["topic"], "MyGroupId", AutoOffsetReset.Earliest) +let log = Serilog.LoggerConfiguration().CreateLogger() + +let handler (messages : ConsumeResult []) = async { + for m in messages do + printfn "Received: %s" m.Message.Value +} + +let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest) + async { - use consumer = BatchedConsumer.Start(Serilog.LoggerConfiguration().CreateLogger(), consumerConfig, handler) + use consumer = BatchedConsumer.Start(log, cfg, handler) return! consumer.AwaitCompletion() } |> Async.RunSynchronously ``` @@ -119,12 +129,18 @@ async { open Confluent.Kafka open FsKafka -let handler (messages : ConsumeResult []) = async { for m in messages do printfn "Received: %s" m.Message.Value } -let consumerConfig = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["topic"], "MyGroupId", AutoOffsetReset.Earliest) +let log = Serilog.LoggerConfiguration().CreateLogger() + +let handler (messages : ConsumeResult []) = async { + for m in messages do + printfn "Received: %s" m.Message.Value +} + +let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest) + async { - let logger = Serilog.LoggerConfiguration().CreateLogger() - use consumer = BatchedConsumer.Start(logger, consumerConfig, handler) - do! KafkaMonitor(logger).StartAsChild(consumer.Inner, consumerConfig.Inner.GroupId) + use consumer = BatchedConsumer.Start(log, cfg, handler) + do! KafkaMonitor(log).StartAsChild(consumer.Inner, cfg.Inner.GroupId) return! consumer.AwaitCompletion() } |> Async.RunSynchronously ```