From 76a318f61930f96f82b93e8577bf6070ac18d879 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Tue, 9 Jun 2020 19:28:35 +0100
Subject: [PATCH 1/9] Remove erroneous logging

---
 src/FsKafka/FsKafka.fs  | 33 ++++++++++++++++-----------------
 src/FsKafka0/FsKafka.fs |  1 -
 2 files changed, 16 insertions(+), 18 deletions(-)

diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs
index e391eef..ea2cf45 100644
--- a/src/FsKafka/FsKafka.fs
+++ b/src/FsKafka/FsKafka.fs
@@ -61,13 +61,13 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
         let c =
             let customPropsDictionary = match config with Some x -> x | None -> Dictionary<string,string>() :> IDictionary<string,string>
             ProducerConfig(customPropsDictionary, // CK 1.2 and later has a default ctor and an IDictionary<string,string> overload
-                ClientId = clientId, BootstrapServers = bootstrapServers,
-                RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
-                MessageSendMaxRetries = Nullable (defaultArg retries 60), // default 2
-                Acks = Nullable acks,
-                SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), // default: false
-                LogConnectionClose = Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
-                MaxInFlight = Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
+                ClientId=clientId, BootstrapServers=bootstrapServers,
+                RetryBackoffMs=Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
+                MessageSendMaxRetries=Nullable (defaultArg retries 60), // default 2
+                Acks=Nullable acks,
+                SocketKeepaliveEnable=Nullable (defaultArg socketKeepAlive true), // default: false
+                LogConnectionClose=Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+                MaxInFlight=Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
         linger |> Option.iter (fun x -> c.LingerMs <- Nullable x.TotalMilliseconds) // default 0
         partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
         compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
@@ -151,9 +151,8 @@ type BatchedProducer private (log: ILogger, inner : IProducer<string,string>, t
                     results.[i - 1] <- m
                     if i = numMessages then tcs.TrySetResult results |> ignore
         for key,value in keyValueBatch do
-            inner.Produce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler = handler)
+            inner.Produce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler=handler)
         inner.Flush(ct)
-        log.Debug("Produced {count}",!numCompleted)
         return! Async.AwaitTaskCorrect tcs.Task }
 
     /// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode.
@@ -252,12 +251,12 @@ type KafkaConsumerConfig = private { inner: ConsumerConfig; topics: string list;
             let customPropsDictionary = match config with Some x -> x | None -> Dictionary<string,string>() :> IDictionary<string,string>
             ConsumerConfig(customPropsDictionary, // CK 1.2 and later has a default ctor and an IDictionary<string,string> overload
                 ClientId=clientId, BootstrapServers=bootstrapServers, GroupId=groupId,
-                AutoOffsetReset = Nullable autoOffsetReset, // default: latest
-                FetchMaxBytes = Nullable fetchMaxBytes, // default: 524_288_000
-                MessageMaxBytes = Nullable (defaultArg messageMaxBytes fetchMaxBytes), // default 1_000_000
-                EnableAutoCommit = Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
-                EnableAutoOffsetStore = Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets
-                LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+                AutoOffsetReset=Nullable autoOffsetReset, // default: latest
+                FetchMaxBytes=Nullable fetchMaxBytes, // default: 524_288_000
+                MessageMaxBytes=Nullable (defaultArg messageMaxBytes fetchMaxBytes), // default 1_000_000
+                EnableAutoCommit=Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
+                EnableAutoOffsetStore=Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets
+                LogConnectionClose=Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
         fetchMinBytes |> Option.iter (fun x -> c.FetchMinBytes <- x) // Fetch waits for this amount of data for up to FetchWaitMaxMs (100)
         autoCommitInterval |> Option.iter (fun x -> c.AutoCommitIntervalMs <- Nullable <| int x.TotalMilliseconds)
         statisticsInterval |> Option.iter (fun x -> c.StatisticsIntervalMs <- Nullable <| int x.TotalMilliseconds)
@@ -365,7 +364,7 @@ module private ConsumerImpl =
         let createCollection() =
             match perPartitionCapacity with
             | None -> new BlockingCollection<'Message>()
-            | Some c -> new BlockingCollection<'Message>(boundedCapacity = c)
+            | Some c -> new BlockingCollection<'Message>(boundedCapacity=c)
 
         [<CLIEvent>]
         member __.OnPartitionAdded = onPartitionAdded.Publish
@@ -474,7 +473,7 @@ type BatchedConsumer private (inner : IConsumer<string,string>, task : Task<unit>) =
             for x in xs do
                 partitionedCollection.Revoke(x.TopicPartition)
-        let consumer : IConsumer<string,string> = ConsumerBuilder.WithLogging(log, config.inner, onRevoke = onRevoke)
+        let consumer : IConsumer<string,string> = ConsumerBuilder.WithLogging(log, config.inner, onRevoke=onRevoke)
         let cts = new CancellationTokenSource()
         let triggerStop () =
             log.Information("Consuming... Stopping {name:l}", consumer.Name)
diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index f6180d1..f357e40 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -148,7 +148,6 @@ type BatchedProducer private (log: ILogger, inner : Producer<string,string>, to
               member __.HandleDeliveryReport m = handler m }
         for key,value in keyValueBatch do
             inner.ProduceAsync(topic, key, value, blockIfQueueFull = true, deliveryHandler = handler')
-        log.Debug("Produced {count}",!numCompleted)
         return! Async.AwaitTaskCorrect tcs.Task }
 
     /// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode.
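A note on why the log lines removed in PATCH 1 were erroneous: `numCompleted` is a `ref` cell incremented by the per-message delivery handler, which Confluent.Kafka invokes asynchronously (typically on a librdkafka callback thread) as reports arrive. Reading it on the dispatching thread before the batch's `TaskCompletionSource` completes races those callbacks, and in the FsKafka0 path, where `ProduceAsync` returns as soon as a message is queued, the counter is typically still 0 when the log runs. The following self-contained F# sketch uses illustrative names (`produceBatchSketch`, `dispatch`, `onDelivery` are stand-ins, not FsKafka's exact internals) to show the counting pattern and the point at which the count becomes reliable:

    open System.Threading
    open System.Threading.Tasks

    let produceBatchSketch (dispatch : (unit -> unit) -> unit) (numMessages : int) = async {
        let numCompleted = ref 0
        let tcs = TaskCompletionSource<int>()
        let onDelivery () =
            // invoked once per delivery report, on a callback thread
            if Interlocked.Increment(&numCompleted.contents) = numMessages then
                tcs.TrySetResult numCompleted.contents |> ignore
        for _ in 1 .. numMessages do dispatch onDelivery
        // logging !numCompleted at this point races the callbacks and would
        // usually report 0; the count is only meaningful after awaiting the TCS
        return! Async.AwaitTask tcs.Task }

    // e.g. simulating delivery reports arriving asynchronously:
    //     produceBatchSketch (fun cb -> Task.Run(fun () -> cb ()) |> ignore) 100
    //     |> Async.RunSynchronously  // = 100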
From 56db3e850fc48f8aaff1f91ec8b2598d5836702e Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Tue, 9 Jun 2020 19:29:49 +0100
Subject: [PATCH 2/9] gitignore MacOS

---
 .gitignore | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/.gitignore b/.gitignore
index 84f58e2..d4d62e3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,9 @@
+# OS files
+
+## MacOS
+
+.DS_Store
+
 # Test results
 *.trx
 

From 9c3e1f3da3359bf2e40eec8915f7237f642b6acd Mon Sep 17 00:00:00 2001
From: Wan Lee
Date: Tue, 9 Jun 2020 19:06:40 -0400
Subject: [PATCH 3/9] Adding debugging logs to show batch count dispatched to
 handler

---
 src/FsKafka/FsKafka.fs  | 1 +
 src/FsKafka0/FsKafka.fs | 1 +
 2 files changed, 2 insertions(+)

diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs
index ea2cf45..4532052 100644
--- a/src/FsKafka/FsKafka.fs
+++ b/src/FsKafka/FsKafka.fs
@@ -410,6 +410,7 @@ module private ConsumerImpl =
                     try match nextBatch() with
                         | [||] -> ()
                         | batch ->
+                            log.Verbose("Dispatching {count} message(s) to handler", batch.Length)
                             // run the handler function
                             do! handler batch
 
diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index f357e40..15399f9 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -432,6 +432,7 @@ module private ConsumerImpl =
                     try match nextBatch() with
                         | [||] -> ()
                         | batch ->
+                            log.Verbose("Dispatching {count} message(s) to handler", batch.Length)
                             // run the handler function
                             do! handler batch
 

From 11e5e68dd36a97ccddf5e0eefb5b5c4cc4f1766f Mon Sep 17 00:00:00 2001
From: svairagade
Date: Tue, 9 Jun 2020 18:16:31 -0400
Subject: [PATCH 4/9] Updated Logging

- FSKafka: Filter to log error only when it is Fatal
- FSKafka0: Add listener for ConsumeError

---
 src/FsKafka/FsKafka.fs  | 6 +++++-
 src/FsKafka0/FsKafka.fs | 7 +++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs
index 4532052..70340eb 100644
--- a/src/FsKafka/FsKafka.fs
+++ b/src/FsKafka/FsKafka.fs
@@ -303,7 +303,11 @@ type ConsumerBuilder =
     static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
         ConsumerBuilder<_,_>(config)
             .SetLogHandler(fun _c m -> log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
-            .SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+            .SetErrorHandler(fun _c e ->
+                if e.IsFatal then
+                    log.Error("Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)
+                else
+                    log.Warning("Consuming... Warning reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
             .SetStatisticsHandler(fun c json ->
                 // Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
                 let stats = JToken.Parse json
diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index 15399f9..291eb54 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -296,7 +296,7 @@ type ConsumerBuilder =
         let d1 = c.OnLog.Subscribe(fun m ->
             log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
         let d2 = c.OnError.Subscribe(fun e ->
-            log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+            log.Warning("Consuming... Warning reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
         let d3 = c.OnPartitionsAssigned.Subscribe(fun tps ->
             for topic,partitions in tps |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> p.Partition |]) do
                 log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions)
@@ -327,7 +327,10 @@ type ConsumerBuilder =
                     yield kpm |]
             let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
             log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
-        fun () -> for d in [d1;d2;d3;d4;d5;d6;d7] do d.Dispose()
+        let d8 = c.OnConsumeError.Subscribe (fun msg ->
+            if msg.Error.HasError then
+                log.Error ("Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
+        fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
     static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
         let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer())
         let unsubLog = ConsumerBuilder.WithLogging(log, consumer, ?onRevoke = onRevoke)

From d851c9d343bb9292f929379031e5a1f36fc18efc Mon Sep 17 00:00:00 2001
From: svairagade
Date: Tue, 9 Jun 2020 18:35:52 -0400
Subject: [PATCH 5/9] Made Log format consistent

---
 src/FsKafka0/FsKafka.fs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index 291eb54..c5449f8 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -329,7 +329,7 @@ type ConsumerBuilder =
             log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
         let d8 = c.OnConsumeError.Subscribe (fun msg ->
             if msg.Error.HasError then
-                log.Error ("Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
+                log.Error ("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
         fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
     static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
         let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer())

From 803dfc2363254bd38a39d99affa1451b9e3d3363 Mon Sep 17 00:00:00 2001
From: svairagade
Date: Wed, 10 Jun 2020 11:04:53 -0400
Subject: [PATCH 6/9] Addressed review comments

---
 src/FsKafka/FsKafka.fs  | 8 ++++----
 src/FsKafka0/FsKafka.fs | 5 ++---
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs
index 70340eb..a3328cb 100644
--- a/src/FsKafka/FsKafka.fs
+++ b/src/FsKafka/FsKafka.fs
@@ -9,6 +9,7 @@ open System.Collections.Concurrent
 open System.Collections.Generic
 open System.Threading
 open System.Threading.Tasks
+open Serilog.Events
 
 module Binding =
 
@@ -304,10 +305,9 @@ type ConsumerBuilder =
         ConsumerBuilder<_,_>(config)
             .SetLogHandler(fun _c m -> log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
             .SetErrorHandler(fun _c e ->
-                if e.IsFatal then
-                    log.Error("Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError)
-                else
-                    log.Warning("Consuming... Warning reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+                log.Write((if e.IsFatal then LogEventLevel.Error else LogEventLevel.Warning),
+                    "Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}",
+                    e.Reason, e.Code, e.IsBrokerError))
             .SetStatisticsHandler(fun c json ->
                 // Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
                 let stats = JToken.Parse json
diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index c5449f8..16755dc 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -296,7 +296,7 @@ type ConsumerBuilder =
         let d1 = c.OnLog.Subscribe(fun m ->
             log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
         let d2 = c.OnError.Subscribe(fun e ->
-            log.Warning("Consuming... Warning reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+            log.Warning("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
         let d3 = c.OnPartitionsAssigned.Subscribe(fun tps ->
             for topic,partitions in tps |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> p.Partition |]) do
                 log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions)
@@ -327,8 +327,7 @@ type ConsumerBuilder =
                     yield kpm |]
             let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
             log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
-        let d8 = c.OnConsumeError.Subscribe (fun msg ->
-            if msg.Error.HasError then
+        let d8 = c.OnConsumeError.Subscribe (fun msg -
                 log.Error ("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
         fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
     static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =

From ede081f06804b390f0935ae7bd1ab01376c4e178 Mon Sep 17 00:00:00 2001
From: svairagade
Date: Wed, 10 Jun 2020 11:30:35 -0400
Subject: [PATCH 7/9] Fixed compilation issue

---
 src/FsKafka0/FsKafka.fs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index 16755dc..cce5b2b 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -327,7 +327,7 @@ type ConsumerBuilder =
                     yield kpm |]
             let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
             log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
-        let d8 = c.OnConsumeError.Subscribe (fun msg -
+        let d8 = c.OnConsumeError.Subscribe (fun msg ->
                 log.Error ("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
         fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
     static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =

From bd5b0908a3b8b6383ab74a713d13cabbc1574002 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Wed, 10 Jun 2020 18:52:45 +0100
Subject: [PATCH 8/9] Formatting

---
 src/FsKafka/FsKafka.fs  | 9 ++++-----
 src/FsKafka0/FsKafka.fs | 2 +-
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs
index a3328cb..8ce3c21 100644
--- a/src/FsKafka/FsKafka.fs
+++ b/src/FsKafka/FsKafka.fs
@@ -305,9 +305,8 @@ type ConsumerBuilder =
         ConsumerBuilder<_,_>(config)
             .SetLogHandler(fun _c m -> log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
             .SetErrorHandler(fun _c e ->
-                log.Write((if e.IsFatal then LogEventLevel.Error else LogEventLevel.Warning),
-                    "Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}",
-                    e.Reason, e.Code, e.IsBrokerError))
+                let level = if e.IsFatal then LogEventLevel.Error else LogEventLevel.Warning
+                log.Write(level, "Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
             .SetStatisticsHandler(fun c json ->
                 // Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
                 let stats = JToken.Parse json
@@ -462,7 +461,7 @@ type BatchedConsumer private (inner : IConsumer<string,string>, task : Task<unit>) =
                 |> Async.Ignore }
-        BatchedConsumer.Start(log, config, partitionHandler)
\ No newline at end of file
+        BatchedConsumer.Start(log, config, partitionHandler)
diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs
index cce5b2b..16f16a0 100644
--- a/src/FsKafka0/FsKafka.fs
+++ b/src/FsKafka0/FsKafka.fs
@@ -328,7 +328,7 @@ type ConsumerBuilder =
             let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
             log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
         let d8 = c.OnConsumeError.Subscribe (fun msg ->
-                log.Error ("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
+                log.Error("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
         fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
     static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
         let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer())
From e16654a9cf9e9544fbe9e0db69cff52e03a7b66d Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Wed, 10 Jun 2020 19:05:24 +0100
Subject: [PATCH 9/9] Changelog

---
 CHANGELOG.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e511aad..c5d5902 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,10 +9,19 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ## [Unreleased]
 
 ### Added
+
+- Add `.Verbose` log for handler invocations [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@wantastic84](https://github.com/wantastic84)
+- FsKafka0: Add ConsumeError logging [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@svairagade](https://github.com/svairagade)
+
 ### Changed
+
+- FsKafka: Distinguish Fatal Errors from non-fatal by reducing level to Warning [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@svairagade](https://github.com/svairagade)
+
 ### Removed
 ### Fixed
 
+- Remove incorrect Producer logging (it logs before the wait, hence generally reports 0), as spotted by [@wantastic84](https://github.com/wantastic84) [#57](https://github.com/jet/FsKafka/pull/57)
+
 ## [1.4.3] - 2020-05-20
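A closing note on the error-level handling that PATCH 6 and PATCH 8 converge on: computing the LogEventLevel up front and emitting a single message template keeps the structured properties (`reason`, `code`, `isBrokerError`) identical across Warning and Error events, so structured-log queries keyed on the template keep working regardless of severity. A minimal standalone sketch of the pattern against Serilog follows; the `logConsumeError` helper and its `isFatal` parameter are illustrative stand-ins (FsKafka takes the flag from Confluent.Kafka's error object), not FsKafka's actual API:

    open Serilog
    open Serilog.Events

    let logConsumeError (log : ILogger) (isFatal : bool) (reason : string) (code : string) (isBrokerError : bool) =
        // fatal errors surface at Error; everything else drops to Warning,
        // but the message template (and hence the properties) stays identical
        let level = if isFatal then LogEventLevel.Error else LogEventLevel.Warning
        log.Write(level, "Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}",
                  reason, code, isBrokerError)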