Logging enhancements (#57)
bartelink authored Jun 10, 2020
1 parent a84ed0f commit 096cf3b
Showing 4 changed files with 43 additions and 23 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -1,3 +1,9 @@
+# OS files
+
+## MacOS
+
+.DS_Store
+
 # Test results
 
 *.trx
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -9,10 +9,19 @@ The `Unreleased` section name is replaced by the expected version of next release
 ## [Unreleased]
 
 ### Added
+
+- Add `.Verbose` log for handler invocations [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@wantastic84](https://github.com/wantastic84)
+- FsKafka0: Add ConsumerError logging [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@svairagade](https://github.com/svairagade)
+
 ### Changed
+
+- FsKafka: Distinguish Fatal Errors from non-fatal by reducing level to Warning [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@svairagade](https://github.com/svairagade)
+
 ### Removed
 ### Fixed
 
+- Remove incorrect Producer logging (it logs before the wait, hence generally reports 0), as spotted by [@wantastic84](https://github.com/wantastic84) [#57](https://github.com/jet/FsKafka/pull/57)
+
 <a name="1.4.3"></a>
 ## [1.4.3] - 2020-05-20
 
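For context on the Fixed entry: the removed `log.Debug("Produced {count}", !numCompleted)` line read a counter that only delivery callbacks increment, yet it executed before the code awaited the task those callbacks complete, so it generally printed 0. A minimal, self-contained sketch of that race (illustrative names, not FsKafka's internals):

```fsharp
open System.Threading
open System.Threading.Tasks

let demo () =
    let numCompleted = ref 0
    let tcs = TaskCompletionSource<unit>()
    // simulate a delivery report arriving later, as librdkafka's background poller would
    ThreadPool.QueueUserWorkItem(fun _ ->
        Thread.Sleep 50
        incr numCompleted
        tcs.TrySetResult () |> ignore) |> ignore
    printfn "Produced %d" !numCompleted // logs before the wait: almost always reports 0
    tcs.Task.Wait()
    printfn "Produced %d" !numCompleted // only after the wait is the count meaningful
```

The commit simply drops the misleading line rather than re-emitting it after the wait.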
43 changes: 23 additions & 20 deletions src/FsKafka/FsKafka.fs
@@ -9,6 +9,7 @@ open System.Collections.Concurrent
 open System.Collections.Generic
 open System.Threading
 open System.Threading.Tasks
+open Serilog.Events
 
 module Binding =
 
@@ -61,13 +62,13 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
 let c =
 let customPropsDictionary = match config with Some x -> x | None -> Dictionary<string,string>() :> IDictionary<string,string>
 ProducerConfig(customPropsDictionary, // CK 1.2 and later has a default ctor and an IDictionary<string,string> overload
-ClientId = clientId, BootstrapServers = bootstrapServers,
-RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
-MessageSendMaxRetries = Nullable (defaultArg retries 60), // default 2
-Acks = Nullable acks,
-SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), // default: false
-LogConnectionClose = Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
-MaxInFlight = Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
+ClientId=clientId, BootstrapServers=bootstrapServers,
+RetryBackoffMs=Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
+MessageSendMaxRetries=Nullable (defaultArg retries 60), // default 2
+Acks=Nullable acks,
+SocketKeepaliveEnable=Nullable (defaultArg socketKeepAlive true), // default: false
+LogConnectionClose=Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+MaxInFlight=Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
 linger |> Option.iter<TimeSpan> (fun x -> c.LingerMs <- Nullable x.TotalMilliseconds) // default 0
 partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
 compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
@@ -151,9 +152,8 @@ type BatchedProducer private (log: ILogger, inner : IProducer<string, string>, t
 results.[i - 1] <- m
 if i = numMessages then tcs.TrySetResult results |> ignore
 for key,value in keyValueBatch do
-inner.Produce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler = handler)
+inner.Produce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler=handler)
 inner.Flush(ct)
-log.Debug("Produced {count}",!numCompleted)
 return! Async.AwaitTaskCorrect tcs.Task }
 
 /// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode.
@@ -252,12 +252,12 @@ type KafkaConsumerConfig = private { inner: ConsumerConfig; topics: string list;
 let customPropsDictionary = match config with Some x -> x | None -> Dictionary<string,string>() :> IDictionary<string,string>
 ConsumerConfig(customPropsDictionary, // CK 1.2 and later has a default ctor and an IDictionary<string,string> overload
 ClientId=clientId, BootstrapServers=bootstrapServers, GroupId=groupId,
-AutoOffsetReset = Nullable autoOffsetReset, // default: latest
-FetchMaxBytes = Nullable fetchMaxBytes, // default: 524_288_000
-MessageMaxBytes = Nullable (defaultArg messageMaxBytes fetchMaxBytes), // default 1_000_000
-EnableAutoCommit = Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
-EnableAutoOffsetStore = Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets
-LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
+AutoOffsetReset=Nullable autoOffsetReset, // default: latest
+FetchMaxBytes=Nullable fetchMaxBytes, // default: 524_288_000
+MessageMaxBytes=Nullable (defaultArg messageMaxBytes fetchMaxBytes), // default 1_000_000
+EnableAutoCommit=Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
+EnableAutoOffsetStore=Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets
+LogConnectionClose=Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
 fetchMinBytes |> Option.iter (fun x -> c.FetchMinBytes <- x) // Fetch waits for this amount of data for up to FetchWaitMaxMs (100)
 autoCommitInterval |> Option.iter<TimeSpan> (fun x -> c.AutoCommitIntervalMs <- Nullable <| int x.TotalMilliseconds)
 statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable <| int x.TotalMilliseconds)
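The `EnableAutoCommit=true` / `EnableAutoOffsetStore=false` pairing above is the at-least-once idiom the comments describe: librdkafka commits on the `AutoCommitIntervalMs` timer, but only commits offsets the application has explicitly stored. A sketch of the consuming side of that contract, using the Confluent.Kafka 1.x API directly (the loop shape is illustrative):

```fsharp
open System
open Confluent.Kafka

// Only offsets passed to StoreOffset become eligible for the timed auto-commit,
// so a crash mid-batch replays messages rather than silently skipping them.
let consumeLoop (consumer : IConsumer<string, string>) (handle : ConsumeResult<string, string> -> unit) =
    while true do
        let cr = consumer.Consume(TimeSpan.FromSeconds 1.) // null on timeout
        if not (isNull cr) then
            handle cr               // process first...
            consumer.StoreOffset cr // ...then mark the offset as safe to commit
```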
@@ -304,7 +304,9 @@ type ConsumerBuilder =
 static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
 ConsumerBuilder<_,_>(config)
 .SetLogHandler(fun _c m -> log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
-.SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+.SetErrorHandler(fun _c e ->
+    let level = if e.IsFatal then LogEventLevel.Error else LogEventLevel.Warning
+    log.Write(level, "Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
 .SetStatisticsHandler(fun c json ->
 // Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
 let stats = JToken.Parse json
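The `SetErrorHandler` change above is the CHANGELOG's "Distinguish Fatal Errors" item: librdkafka routes transient conditions (broker connection churn, retriable transport errors) through the same callback as unrecoverable ones, and `e.IsFatal` is the discriminator. A sketch of the downstream effect, assuming the Serilog console sink package (the sink choice is illustrative):

```fsharp
open Serilog
open Serilog.Events

// A logger admitting Warning and above records both kinds, distinguishably; raising
// the minimum level to Error would suppress transient noise while keeping fatal events.
let log = LoggerConfiguration().MinimumLevel.Warning().WriteTo.Console().CreateLogger()

let report (isFatal : bool) (reason : string) =
    let level = if isFatal then LogEventLevel.Error else LogEventLevel.Warning
    log.Write(level, "Consuming... Error reason={reason} fatal={fatal}", reason, isFatal)

report false "broker transport failure"   // Warning: librdkafka will typically retry
report true "all broker connections down" // Error: requires operator attention
```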
@@ -365,7 +367,7 @@ module private ConsumerImpl =
 let createCollection() =
 match perPartitionCapacity with
 | None -> new BlockingCollection<'Message>()
-| Some c -> new BlockingCollection<'Message>(boundedCapacity = c)
+| Some c -> new BlockingCollection<'Message>(boundedCapacity=c)
 
 [<CLIEvent>]
 member __.OnPartitionAdded = onPartitionAdded.Publish
@@ -411,6 +413,7 @@ module private ConsumerImpl =
 try match nextBatch() with
 | [||] -> ()
 | batch ->
+log.Verbose("Dispatching {count} message(s) to handler", batch.Length)
 // run the handler function
 do! handler batch

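A note on the new `.Verbose` dispatch trace added above: Serilog's default minimum level is Information, so this event is dropped unless a logger opts in. A minimal configuration that surfaces it (console sink chosen for illustration):

```fsharp
open Serilog

let log =
    LoggerConfiguration()
        .MinimumLevel.Verbose() // the default, Information, would hide Verbose events
        .WriteTo.Console()
        .CreateLogger()

log.Verbose("Dispatching {count} message(s) to handler", 42)
```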
@@ -458,7 +461,7 @@ type BatchedConsumer private (inner : IConsumer<string, string>, task : Task<uni
 member __.Stop() = triggerStop ()
 /// Inspects current status of processing task
 member __.Status = task.Status
-member __.RanToCompletion = task.Status = System.Threading.Tasks.TaskStatus.RanToCompletion
+member __.RanToCompletion = task.Status = TaskStatus.RanToCompletion
 /// Asynchronously awaits until consumer stops or is faulted
 member __.AwaitCompletion() = Async.AwaitTaskCorrect task
 
@@ -474,7 +477,7 @@ type BatchedConsumer private (inner : IConsumer<string, string>, task : Task<uni
 let onRevoke (xs : seq<TopicPartitionOffset>) =
 for x in xs do
 partitionedCollection.Revoke(x.TopicPartition)
-let consumer : IConsumer<string,string> = ConsumerBuilder.WithLogging(log, config.inner, onRevoke = onRevoke)
+let consumer : IConsumer<string,string> = ConsumerBuilder.WithLogging(log, config.inner, onRevoke=onRevoke)
 let cts = new CancellationTokenSource()
 let triggerStop () =
 log.Information("Consuming... Stopping {name:l}", consumer.Name)
@@ -501,4 +504,4 @@ type BatchedConsumer private (inner : IConsumer<string, string>, task : Task<uni
 |> Async.Ignore
 }
 
-BatchedConsumer.Start(log, config, partitionHandler)
\ No newline at end of file
+BatchedConsumer.Start(log, config, partitionHandler)
8 changes: 5 additions & 3 deletions src/FsKafka0/FsKafka.fs
@@ -148,7 +148,6 @@ type BatchedProducer private (log: ILogger, inner : Producer<string, string>, to
 member __.HandleDeliveryReport m = handler m }
 for key,value in keyValueBatch do
 inner.ProduceAsync(topic, key, value, blockIfQueueFull = true, deliveryHandler = handler')
-log.Debug("Produced {count}",!numCompleted)
 return! Async.AwaitTaskCorrect tcs.Task }
 
 /// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode.
@@ -297,7 +296,7 @@ type ConsumerBuilder =
 let d1 = c.OnLog.Subscribe(fun m ->
 log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
 let d2 = c.OnError.Subscribe(fun e ->
-log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
+log.Warning("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
 let d3 = c.OnPartitionsAssigned.Subscribe(fun tps ->
 for topic,partitions in tps |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> p.Partition |]) do
 log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions)
@@ -328,7 +327,9 @@ type ConsumerBuilder =
 yield kpm |]
 let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
 log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
-fun () -> for d in [d1;d2;d3;d4;d5;d6;d7] do d.Dispose()
+let d8 = c.OnConsumeError.Subscribe (fun msg ->
+    log.Error("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
+fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
 static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
 let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer())
 let unsubLog = ConsumerBuilder.WithLogging(log, consumer, ?onRevoke = onRevoke)
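The FsKafka0 hunk above subscribes `OnConsumeError`, the Confluent.Kafka 0.x event for per-message consume failures, and widens the teardown closure to cover the new `d8` subscription. The idiom deserves a note: each `Subscribe` returns an `IDisposable`, and handing back a function that disposes the whole set gives callers single-call detachment of every log handler. A self-contained sketch of the same pattern (the event source is a stand-in, not the Confluent API):

```fsharp
type Source() =
    let evt = Event<string>()
    member __.OnMessage = evt.Publish
    member __.Fire m = evt.Trigger m

let withLogging (log : string -> unit) (src : Source) : unit -> unit =
    let d1 = src.OnMessage.Subscribe(fun m -> log ("observed: " + m))
    let d2 = src.OnMessage.Subscribe(fun m -> log (sprintf "length=%d" m.Length))
    // one teardown detaches every handler, mirroring the d1..d8 dispose loop above
    fun () -> for d in [d1; d2] do d.Dispose()

let source = Source()
let detach = withLogging (printfn "%s") source
source.Fire "hello" // both handlers log
detach ()
source.Fire "bye"   // no handlers remain
```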
@@ -433,6 +434,7 @@ module private ConsumerImpl =
 try match nextBatch() with
 | [||] -> ()
 | batch ->
+log.Verbose("Dispatching {count} message(s) to handler", batch.Length)
 // run the handler function
 do! handler batch
 
