Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add way to filter InFlightMessageCounter re #31 #32

Merged
merged 1 commit into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added

- split batching behaviors out into `BatchedProducer`/`BatchedConsumer` [#30](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/30)
- mechanism to remove logging regarding polling backoff [#32](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/32) HT [@szer](https://github.com/Szer) re [#31](https://github.com/jet/Jet.ConfluentKafka.FSharp/issues/31)

### Changed

- split batching behaviors out into `BatchedProducer`/`BatchedConsumer` [#30](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/30)
- default auto-commit interval dropped from 10s to 5s (which is the `Confluent.Kafka` default) [#30](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/30)
- removed curried `member` Method arguments in `Start` methods

Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ See [the Equinox QuickStart](https://github.com/jet/equinox#quickstart) for exam
The components within this repository are delivered as (presently single) multi-targeted Nuget package targeting `net461` (F# 3.1+) and `netstandard2.0` (F# 4.5+) profiles

- [![NuGet](https://img.shields.io/nuget/vpre/Jet.ConfluentKafka.FSharp.svg)](https://www.nuget.org/packages/Jet.ConfluentKafka.FSharp/) `Jet.ConfluentKafka.FSharp`: Wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations, with basic logging instrumentation.
[Depends](https://www.fuget.org/packages/Jet.ConfluentKafka.FSharp) on `Confluent.Kafka [1.0.0]`, `librdkafka [1.0.0]` (pinned to ensure we use a tested pairing enterprise wide), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Statistics for logging purposes).
[Depends](https://www.fuget.org/packages/Jet.ConfluentKafka.FSharp) on `Confluent.Kafka [1.0.0]`, `librdkafka [1.0.0]` (pinned to ensure we use a tested pairing enterprise wide), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).

## CONTRIBUTING

Expand Down Expand Up @@ -51,6 +51,12 @@ dotnet build build.proj -v n

## FAQ

### How do I get rid of all the `breaking off polling` ... `resuming polling` spam?

- The `BatchedConsumer` implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of [#32](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/32), such messages are tagged with the type `Jet.ConfluentKafka.FSharp.InFlightMessageCounter`, and as such can be silenced by including the following in one's `LoggerConfiguration()`:

`.MinimumLevel.Override(Jet.ConfluentKafka.FSharp.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)`

### What is this, why does it exist, where did it come from, is anyone using it ?

This code results from building out an end-to-end batteries-included set of libraries and templates as part of the [Equinox](https://github.com/jet/equinox) project.
Expand Down
10 changes: 7 additions & 3 deletions src/Jet.ConfluentKafka.FSharp/ConfluentKafka.fs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ type BatchedProducer private (log: ILogger, inner : IProducer<string, string>, t

type ConsumerBufferingConfig = { minInFlightBytes : int64; maxInFlightBytes : int64; maxBatchSize : int; maxBatchDelay : TimeSpan }

module Constants =
let messageCounterSourceContext = "Jet.ConfluentKafka.FSharp.InFlightMessageCounter"

module private ConsumerImpl =
/// guesstimate approximate message size in bytes
let approximateMessageBytes (message : ConsumeResult<string, string>) =
Expand Down Expand Up @@ -217,9 +220,9 @@ module private ConsumerImpl =

member __.AwaitThreshold() =
if inFlightBytes > maxInFlightBytes then
log.Warning("Consumer reached in-flight message threshold, breaking off polling, bytes={max}", inFlightBytes)
log.Information("Consumer reached in-flight message threshold, breaking off polling, bytes={max}", inFlightBytes)
while inFlightBytes > minInFlightBytes do Thread.Sleep 5
log.Information "Consumer resuming polling"
log.Verbose "Consumer resuming polling"

let mkBatchedMessageConsumer (log: ILogger) (buf : ConsumerBufferingConfig) (ct : CancellationToken) (consumer : IConsumer<string, string>)
(partitionedCollection: PartitionedBlockingCollection<TopicPartition, ConsumeResult<string, string>>)
Expand All @@ -230,7 +233,8 @@ module private ConsumerImpl =

use _ = consumer

let counter = new InFlightMessageCounter(log, buf.minInFlightBytes, buf.maxInFlightBytes)
let mcLog = log.ForContext(Serilog.Core.Constants.SourceContextPropertyName, Constants.messageCounterSourceContext)
let counter = new InFlightMessageCounter(mcLog, buf.minInFlightBytes, buf.maxInFlightBytes)

// starts a tail recursive loop that dequeues batches for a given partition buffer and schedules the user callback
let consumePartition (collection : BlockingCollection<ConsumeResult<string, string>>) =
Expand Down