Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak logging #57

Merged
merged 9 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# OS files

## MacOS

.DS_Store

# Test results

*.trx
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- Add `.Verbose` log for handler invocations [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@wantastic84](https://github.com/wantastic84)
- FsKafka0: Add ConsumerError logging [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@svairagade](https://github.com/svairagade)

### Changed

- FsKafka: Distinguish Fatal Errors from by non-fatal by reducing level to Warning [#57](https://github.com/jet/FsKafka/pull/57) :pray: [@svairagade](https://github.com/svairagade)

### Removed
### Fixed

- Remove incorrect Producer logging (it logs before the wait hence generally reports 0), as spotted by [@wantastic84](https://github.com/wantastic84) [#57](https://github.com/jet/FsKafka/pull/57)

<a name="1.4.3"></a>
## [1.4.3] - 2020-05-20

Expand Down
43 changes: 23 additions & 20 deletions src/FsKafka/FsKafka.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ open System.Collections.Concurrent
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
open Serilog.Events

module Binding =

Expand Down Expand Up @@ -61,13 +62,13 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
let c =
let customPropsDictionary = match config with Some x -> x | None -> Dictionary<string,string>() :> IDictionary<string,string>
ProducerConfig(customPropsDictionary, // CK 1.2 and later has a default ctor and an IDictionary<string,string> overload
ClientId = clientId, BootstrapServers = bootstrapServers,
RetryBackoffMs = Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
MessageSendMaxRetries = Nullable (defaultArg retries 60), // default 2
Acks = Nullable acks,
SocketKeepaliveEnable = Nullable (defaultArg socketKeepAlive true), // default: false
LogConnectionClose = Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
MaxInFlight = Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
ClientId=clientId, BootstrapServers=bootstrapServers,
RetryBackoffMs=Nullable (match retryBackoff with Some (t : TimeSpan) -> int t.TotalMilliseconds | None -> 1000), // CK default 100ms
MessageSendMaxRetries=Nullable (defaultArg retries 60), // default 2
Acks=Nullable acks,
SocketKeepaliveEnable=Nullable (defaultArg socketKeepAlive true), // default: false
LogConnectionClose=Nullable false, // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
MaxInFlight=Nullable (defaultArg maxInFlight 1_000_000)) // default 1_000_000
linger |> Option.iter<TimeSpan> (fun x -> c.LingerMs <- Nullable x.TotalMilliseconds) // default 0
partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
Expand Down Expand Up @@ -151,9 +152,8 @@ type BatchedProducer private (log: ILogger, inner : IProducer<string, string>, t
results.[i - 1] <- m
if i = numMessages then tcs.TrySetResult results |> ignore
for key,value in keyValueBatch do
inner.Produce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler = handler)
inner.Produce(topic, Message<_,_>(Key=key, Value=value), deliveryHandler=handler)
inner.Flush(ct)
log.Debug("Produced {count}",!numCompleted)
return! Async.AwaitTaskCorrect tcs.Task }

/// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode.
Expand Down Expand Up @@ -252,12 +252,12 @@ type KafkaConsumerConfig = private { inner: ConsumerConfig; topics: string list;
let customPropsDictionary = match config with Some x -> x | None -> Dictionary<string,string>() :> IDictionary<string,string>
ConsumerConfig(customPropsDictionary, // CK 1.2 and later has a default ctor and an IDictionary<string,string> overload
ClientId=clientId, BootstrapServers=bootstrapServers, GroupId=groupId,
AutoOffsetReset = Nullable autoOffsetReset, // default: latest
FetchMaxBytes = Nullable fetchMaxBytes, // default: 524_288_000
MessageMaxBytes = Nullable (defaultArg messageMaxBytes fetchMaxBytes), // default 1_000_000
EnableAutoCommit = Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
EnableAutoOffsetStore = Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets
LogConnectionClose = Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
AutoOffsetReset=Nullable autoOffsetReset, // default: latest
FetchMaxBytes=Nullable fetchMaxBytes, // default: 524_288_000
MessageMaxBytes=Nullable (defaultArg messageMaxBytes fetchMaxBytes), // default 1_000_000
EnableAutoCommit=Nullable true, // at AutoCommitIntervalMs interval, write value supplied by StoreOffset call
EnableAutoOffsetStore=Nullable false, // explicit calls to StoreOffset are the only things that effect progression in offsets
LogConnectionClose=Nullable false) // https://github.com/confluentinc/confluent-kafka-dotnet/issues/124#issuecomment-289727017
fetchMinBytes |> Option.iter (fun x -> c.FetchMinBytes <- x) // Fetch waits for this amount of data for up to FetchWaitMaxMs (100)
autoCommitInterval |> Option.iter<TimeSpan> (fun x -> c.AutoCommitIntervalMs <- Nullable <| int x.TotalMilliseconds)
statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable <| int x.TotalMilliseconds)
Expand Down Expand Up @@ -304,7 +304,9 @@ type ConsumerBuilder =
static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
ConsumerBuilder<_,_>(config)
.SetLogHandler(fun _c m -> log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
.SetErrorHandler(fun _c e -> log.Error("Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
.SetErrorHandler(fun _c e ->
let level = if e.IsFatal then LogEventLevel.Error else LogEventLevel.Warning
log.Write(level, "Consuming... Error reason={reason} code={code} isBrokerError={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
.SetStatisticsHandler(fun c json ->
// Stats format: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
let stats = JToken.Parse json
Expand Down Expand Up @@ -365,7 +367,7 @@ module private ConsumerImpl =
let createCollection() =
match perPartitionCapacity with
| None -> new BlockingCollection<'Message>()
| Some c -> new BlockingCollection<'Message>(boundedCapacity = c)
| Some c -> new BlockingCollection<'Message>(boundedCapacity=c)

[<CLIEvent>]
member __.OnPartitionAdded = onPartitionAdded.Publish
Expand Down Expand Up @@ -411,6 +413,7 @@ module private ConsumerImpl =
try match nextBatch() with
| [||] -> ()
| batch ->
log.Verbose("Dispatching {count} message(s) to handler", batch.Length)
// run the handler function
do! handler batch

Expand Down Expand Up @@ -458,7 +461,7 @@ type BatchedConsumer private (inner : IConsumer<string, string>, task : Task<uni
member __.Stop() = triggerStop ()
/// Inspects current status of processing task
member __.Status = task.Status
member __.RanToCompletion = task.Status = System.Threading.Tasks.TaskStatus.RanToCompletion
member __.RanToCompletion = task.Status = TaskStatus.RanToCompletion
/// Asynchronously awaits until consumer stops or is faulted
member __.AwaitCompletion() = Async.AwaitTaskCorrect task

Expand All @@ -474,7 +477,7 @@ type BatchedConsumer private (inner : IConsumer<string, string>, task : Task<uni
let onRevoke (xs : seq<TopicPartitionOffset>) =
for x in xs do
partitionedCollection.Revoke(x.TopicPartition)
let consumer : IConsumer<string,string> = ConsumerBuilder.WithLogging(log, config.inner, onRevoke = onRevoke)
let consumer : IConsumer<string,string> = ConsumerBuilder.WithLogging(log, config.inner, onRevoke=onRevoke)
let cts = new CancellationTokenSource()
let triggerStop () =
log.Information("Consuming... Stopping {name:l}", consumer.Name)
Expand All @@ -501,4 +504,4 @@ type BatchedConsumer private (inner : IConsumer<string, string>, task : Task<uni
|> Async.Ignore
}

BatchedConsumer.Start(log, config, partitionHandler)
BatchedConsumer.Start(log, config, partitionHandler)
8 changes: 5 additions & 3 deletions src/FsKafka0/FsKafka.fs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ type BatchedProducer private (log: ILogger, inner : Producer<string, string>, to
member __.HandleDeliveryReport m = handler m }
for key,value in keyValueBatch do
inner.ProduceAsync(topic, key, value, blockIfQueueFull = true, deliveryHandler = handler')
log.Debug("Produced {count}",!numCompleted)
return! Async.AwaitTaskCorrect tcs.Task }

/// Creates and wraps a Confluent.Kafka Producer that affords a batched production mode.
Expand Down Expand Up @@ -297,7 +296,7 @@ type ConsumerBuilder =
let d1 = c.OnLog.Subscribe(fun m ->
log.Information("Consuming... {message} level={level} name={name} facility={facility}", m.Message, m.Level, m.Name, m.Facility))
let d2 = c.OnError.Subscribe(fun e ->
log.Error("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
log.Warning("Consuming... Error reason={reason} code={code} broker={isBrokerError}", e.Reason, e.Code, e.IsBrokerError))
let d3 = c.OnPartitionsAssigned.Subscribe(fun tps ->
for topic,partitions in tps |> Seq.groupBy (fun p -> p.Topic) |> Seq.map (fun (t,ps) -> t, [| for p in ps -> p.Partition |]) do
log.Information("Consuming... Assigned {topic:l} {partitions}", topic, partitions)
Expand Down Expand Up @@ -328,7 +327,9 @@ type ConsumerBuilder =
yield kpm |]
let totalLag = metrics |> Array.sumBy (fun x -> x.consumerLag)
log.Information("Consuming... Stats {topic:l} totalLag {totalLag} {@stats}", topic, totalLag, metrics))
fun () -> for d in [d1;d2;d3;d4;d5;d6;d7] do d.Dispose()
let d8 = c.OnConsumeError.Subscribe (fun msg ->
log.Error("Consuming... Error reason={reason} topic={topic} partition={partition} offset={offset}", msg.Error.Reason, msg.Topic, msg.Partition, msg.Offset))
fun () -> for d in [d1;d2;d3;d4;d5;d6;d7;d8] do d.Dispose()
static member WithLogging(log : ILogger, config : ConsumerConfig, ?onRevoke) =
let consumer = new Consumer<_,_>(config.Render(), mkDeserializer(), mkDeserializer())
let unsubLog = ConsumerBuilder.WithLogging(log, consumer, ?onRevoke = onRevoke)
Expand Down Expand Up @@ -433,6 +434,7 @@ module private ConsumerImpl =
try match nextBatch() with
| [||] -> ()
| batch ->
log.Verbose("Dispatching {count} message(s) to handler", batch.Length)
// run the handler function
do! handler batch

Expand Down