Target CK/librdkafka 1.9.0 (#91)
* Target CK/librdkafka 1.9.0
* Remove Destructurama.FSharp dep
bartelink authored Jun 18, 2022
1 parent 156d8ab commit 2ba166d
Showing 8 changed files with 115 additions and 139 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next release
### Changed

- Update global.json to use SDK version `6.0.300`, use `DotNet.ReproducibleBuilds` [#90](https://github.com/jet/FsKafka/pull/90) [#93](https://github.com/jet/FsKafka/pull/93)
- Target [`Confluent.Kafka [1.9.0]`](https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/CHANGELOG.md#190), [`librdkafka.redist [1.9.0]`](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0) [#91](https://github.com/jet/FsKafka/pull/91)

### Removed
### Fixed
1 change: 0 additions & 1 deletion Directory.Build.props
@@ -14,7 +14,6 @@

<WarningLevel>5</WarningLevel>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>

2 changes: 1 addition & 1 deletion README.md
@@ -2,7 +2,7 @@

F# friendly wrapper for `Confluent.Kafka`, with minimal dependencies or additional abstractions (but see [related repos](#related-repos)).

`FsKafka` wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. [Depends](https://www.fuget.org/packages/FsKafka) on `Confluent.Kafka [1.7.0]`, `librdkafka [1.7.0]` (pinned to ensure we use a tested pairing), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).
`FsKafka` wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. [Depends](https://www.fuget.org/packages/FsKafka) on [`Confluent.Kafka [1.9.0]`](https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/CHANGELOG.md#190), [`librdkafka.redist [1.9.0]`](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0) (pinned to ensure we use a tested pairing), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).
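To make the shape of the API concrete, here is a minimal consumer sketch pieced together from the integration tests later in this commit; the exact `KafkaConsumerConfig.Create` overload is taken from those tests, but the broker argument's `Uri` form and the broker address are assumptions, not the README's own example:

```fsharp
open System
open Confluent.Kafka
open FsKafka
open Serilog

// Requires a concrete Serilog sink package, e.g. Serilog.Sinks.Console
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()

// Overload mirrors the tests below; the broker type/address is a hypothetical placeholder
let config = KafkaConsumerConfig.Create("myClient", Uri "kafka://localhost:9092", ["myTopic"], "myGroup", AutoOffsetReset.Earliest)

// The batch handler receives the accumulated messages for a single partition
let handler (batch : ConsumeResult<string, string>[]) = async {
    for r in batch do log.Information("Consumed {key}", r.Message.Key) }

let run () =
    use consumer = BatchedConsumer.Start(log, config, handler)
    consumer.StopAfter(TimeSpan.FromSeconds 20.)
    consumer.AwaitShutdown() |> Async.RunSynchronously
```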

## Usage

146 changes: 73 additions & 73 deletions src/FsKafka/FsKafka.fs

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/FsKafka/FsKafka.fsproj
@@ -11,13 +11,13 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MinVer" Version="3.1.0" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="4.0.0" PrivateAssets="All" />

<PackageReference Include="FSharp.Core" Version="4.3.4" />

<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Confluent.Kafka" Version="[1.7.0]" />
<PackageReference Include="librdkafka.redist" Version="[1.7.0]" />
<PackageReference Include="Confluent.Kafka" Version="[1.9.0]" />
<PackageReference Include="librdkafka.redist" Version="[1.9.0]" />
<PackageReference Include="Serilog" Version="2.7.1" />
</ItemGroup>

38 changes: 16 additions & 22 deletions src/FsKafka/Monitor.fs
@@ -5,7 +5,6 @@ namespace FsKafka
open Confluent.Kafka
open Serilog
open System
open System.Diagnostics
open System.Threading

type PartitionResult =
@@ -15,11 +14,6 @@
| Healthy

module MonitorImpl =
#if NET461
module Array =
let head xs = Seq.head xs
let last xs = Seq.last xs
#endif
module private Map =
let mergeChoice (f : 'a -> Choice<'b * 'c, 'b, 'c> -> 'd) (map1 : Map<'a, 'b>) (map2 : Map<'a, 'c>) : Map<'a, 'd> =
Set.union (map1 |> Seq.map (fun k -> k.Key) |> set) (map2 |> Seq.map (fun k -> k.Key) |> set)
@@ -147,25 +141,25 @@ module MonitorImpl =
let buffer : 'A [] = Array.zeroCreate capacity
let mutable head,tail,size = 0,-1,0

member __.TryCopyFull() =
member _.TryCopyFull() =
if size <> capacity then None
else
let arr = Array.zeroCreate size
let mutable i = head
for x = 0 to size - 1 do
arr.[x] <- buffer.[i % capacity]
arr[x] <- buffer[i % capacity]
i <- i + 1
Some arr

member __.Add(x : 'A) =
member _.Add(x : 'A) =
tail <- (tail + 1) % capacity
buffer.[tail] <- x
buffer[tail] <- x
if (size < capacity) then
size <- size + 1
else
head <- (head + 1) % capacity

member __.Clear() =
member _.Clear() =
head <- 0
tail <- -1
size <- 0
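The members above implement a fixed-capacity ring buffer that only yields a snapshot once a full window of readings has accumulated: `TryCopyFull` returns `None` until every slot is filled, then the window oldest-first. The enclosing type's name and constructor fall outside this hunk, so `RingBuffer` below is an assumed name, for illustration only:

```fsharp
let rb = RingBuffer<int> 3   // hypothetical constructor taking the capacity
rb.Add 1; rb.Add 2
rb.TryCopyFull()             // None: only 2 of 3 slots are filled
rb.Add 3
rb.TryCopyFull()             // Some [|1; 2; 3|]
rb.Add 4                     // overwrites the oldest reading
rb.TryCopyFull()             // Some [|2; 3; 4|]
```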
@@ -269,7 +263,7 @@ module MonitorImpl =
onStatus topic group states }

let rec loop failCount = async {
let sw = Stopwatch.StartNew()
let sw = System.Diagnostics.Stopwatch.StartNew()
let! failCount = async {
try if validateAssignments () then
do! checkConsumerProgress()
@@ -304,7 +298,7 @@ module MonitorImpl =
match res with
| Choice1Of3 (), _ -> ()
| Choice2Of3 (), errs ->
let lag = function (partitionId, ErrorPartitionStalled lag) -> Some (partitionId,lag) | x -> failwithf "mis-mapped %A" x
let lag = function partitionId, ErrorPartitionStalled lag -> Some (partitionId,lag) | x -> failwithf "mis-mapped %A" x
log.Error("Monitoring... {topic}/{group} Stalled with backlogs on {@stalled} [(partition,lag)]", topic, group, errs |> Seq.choose lag)
| Choice3Of3 (), warns ->
log.Warning("Monitoring... {topic}/{group} Growing lags on {@partitionIds}", topic, group, warns |> Seq.map fst)
@@ -327,11 +321,11 @@
/// and then map that to a per-partition status for each partition that the consumer being observed has been assigned
type KafkaMonitor<'k,'v>
( log : ILogger,
/// Interval between checks of high/low watermarks. Default 30s
// Interval between checks of high/low watermarks. Default 30s
?interval,
/// Number of readings per partition to use in order to make inferences. Default 10 (at default interval of 30s, implies a 5m window).
// Number of readings per partition to use in order to make inferences. Default 10 (at default interval of 30s, implies a 5m window).
?windowSize,
/// Number of failed calls to broker that should trigger discarding of buffered readings in order to avoid false positives. Default 3.
// Number of failed calls to broker that should trigger discarding of buffered readings in order to avoid false positives. Default 3.
?failResetCount) =
let failResetCount = defaultArg failResetCount 3
let interval = defaultArg interval (TimeSpan.FromSeconds 30.)
@@ -340,14 +334,14 @@ type KafkaMonitor<'k,'v>

/// Periodically supplies the status for all assigned partitions (whenever we've gathered `windowSize` of readings)
/// Subscriber can e.g. use this to force a consumer restart if no progress is being made
[<CLIEvent>] member __.OnStatus = onStatus.Publish
[<CLIEvent>] member _.OnStatus = onStatus.Publish

/// Raised whenever call to broker to ascertain watermarks has failed
/// Subscriber can e.g. raise an alert if enough consecutive failures have occurred
[<CLIEvent>] member __.OnCheckFailed = onCheckFailed.Publish
[<CLIEvent>] member _.OnCheckFailed = onCheckFailed.Publish

// One of these runs per topic
member private __.Pump(consumer, topic, group) = async {
member private _.Pump(consumer, topic, group) = async {
let! ct = Async.CancellationToken
let onQuery res =
MonitorImpl.Logging.logLatest log topic group res
@@ -361,8 +355,8 @@ type KafkaMonitor<'k,'v>
return! MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus)
}
/// Commences a monitoring task per subscribed topic
member __.Start(target : IConsumer<'k,'v>, group) =
member m.Start(target : IConsumer<'k,'v>, group) =
let cts = new CancellationTokenSource()
for topic in target.Subscription do
Async.Start(__.Pump(target, topic, group), cts.Token)
{ new IDisposable with member __.Dispose() = cts.Cancel() }
Async.Start(m.Pump(target, topic, group), cts.Token)
{ new IDisposable with member _.Dispose() = cts.Cancel() }
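Read together with the `T4` test at the bottom of this diff, wiring the monitor up looks roughly as follows; the event payload shapes are not visible in the hunks above, so the handlers below just log them generically:

```fsharp
open System

// Sketch assuming a running BatchedConsumer `consumer` and its `groupId` are in scope
let monitor = KafkaMonitor<string, string>(log, interval = TimeSpan.FromSeconds 30., windowSize = 10, failResetCount = 3)
monitor.OnStatus.Add(fun status -> log.Information("Monitor status {@status}", status))
monitor.OnCheckFailed.Add(fun failure -> log.Warning("Watermark check failed {@failure}", failure))
// Start returns an IDisposable that cancels the per-topic monitoring tasks
use _monitoring = monitor.Start(consumer.Inner, groupId)
```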
4 changes: 2 additions & 2 deletions tests/FsKafka.Integration/FsKafka.Integration.fsproj
@@ -2,7 +2,6 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<DisableImplicitFSharpCoreReference>false</DisableImplicitFSharpCoreReference>
</PropertyGroup>

<ItemGroup>
@@ -16,7 +15,8 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.2.0" />
<PackageReference Include="Destructurama.FSharp.NetCore" Version="1.0.14" />
<!-- NOTE must be >= 6.0.5 to ensure no exposure to https://github.com/dotnet/fsharp/issues/13165 -->
<PackageReference Include="FSharp.Core" Version="6.0.5" />
<PackageReference Include="unquote" Version="6.1.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
56 changes: 19 additions & 37 deletions tests/FsKafka.Integration/Integration.fs
@@ -9,7 +9,6 @@ open System
open System.Collections.Concurrent
open System.ComponentModel
open System.Threading
open System.Threading.Tasks
open Xunit

module Config =
@@ -35,11 +34,10 @@ module Helpers =
formatter.Format(logEvent, writer);
writer |> string |> testOutput.WriteLine
writer |> string |> System.Diagnostics.Debug.WriteLine
interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent
interface Serilog.Core.ILogEventSink with member _.Emit logEvent = writeSerilogEvent logEvent

let createLogger sink =
LoggerConfiguration()
.Destructure.FSharpTypes()
.WriteTo.Sink(sink)
.WriteTo.Seq("http://localhost:5341")
.CreateLogger()
@@ -49,15 +47,11 @@
| x when String.IsNullOrEmpty x -> invalidOp "missing environment variable 'TEST_KAFKA_BROKER'"
| x -> Uri x |> Config.validateBrokerUri

let newId () = let g = System.Guid.NewGuid() in g.ToString("N")
let newId () = let g = Guid.NewGuid() in g.ToString("N")

type Async with

// Unsafe in the name re https://github.com/dotnet/fsharp/issues/13165
// Reasonable in the context of this codebase as we do not exceed 1200 computations
// When there's an in-the-box ParallelThrottled, or this codebase is known to target an FSharp.Core that does not suffer from this issue,
// the Unsafe part of the name (and this note) can be removed
static member ParallelThrottledUnsafe dop computations =
static member ParallelThrottled dop computations =
Async.Parallel(computations, maxDegreeOfParallelism = dop)
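The `Unsafe` suffix (and the comment deleted here) flagged exposure to dotnet/fsharp#13165, where `Async.Parallel` with `maxDegreeOfParallelism` could misbehave on affected `FSharp.Core` versions once the computation count grew large; with the test project now pinned to `FSharp.Core` `6.0.5` (per the fsproj NOTE above), the plain name suffices. A sketch of the throttled fan-out pattern as the tests use it:

```fsharp
let work (i : int) = async { return i * i }

let squares =
    Seq.init 100 work
    |> Async.ParallelThrottled 7   // at most 7 computations in flight at a time
    |> Async.RunSynchronously
```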

type BatchedConsumer with
@@ -76,17 +70,13 @@
consumerId : int
result : ConsumeResult<string,string>
payload : TestMessage
headers: MessageHeaders
headers : MessageHeaders
}

type ConsumerCallback = BatchedConsumer -> ConsumedTestMessage [] -> Async<unit>

let headers =
#if !KAFKA0
seq ["kafka", [| 0xDEuy; 0xADuy; 0xBEuy; 0xEFuy |]]
#else
seq []
#endif

let runProducers log broker (topic : string) (numProducers : int) (messagesPerProducer : int) = async {
let runProducer (producerId : int) = async {
@@ -99,15 +89,11 @@
|> Seq.map (fun msgId ->
let key = string msgId
let value = JsonConvert.SerializeObject { producerId = producerId ; messageId = msgId }
#if KAFKA0
key, value
#else
key, value, headers
#endif
)
|> Seq.chunkBySize 100
|> Seq.map producer.ProduceBatch
|> Async.ParallelThrottledUnsafe 7
|> Async.ParallelThrottled 7

return Array.concat results
}
@@ -120,13 +106,9 @@
let deserialize result =
let message = Binding.message result
let headers =
#if !KAFKA0
match message.Headers with
| null -> Seq.empty
| h -> h |> Seq.map(fun h -> h.Key, h.GetValueBytes())
#else
Seq.empty
#endif

{
consumerId = consumerId
@@ -139,14 +121,14 @@
let consumerCell = ref None
let rec getConsumer() =
// avoid potential race conditions by polling
match !consumerCell with
match consumerCell.Value with
| None -> Thread.SpinWait 20; getConsumer()
| Some c -> c

let partitionHandler batch = handler (getConsumer()) (Array.map deserialize batch)
use consumer = BatchedConsumer.Start(log, config, partitionHandler)

consumerCell := Some consumer
consumerCell.Value <- Some consumer

timeout |> Option.iter consumer.StopAfter

@@ -158,7 +140,7 @@

type FactIfBroker() =
inherit FactAttribute()
override __.Skip = if null <> Environment.GetEnvironmentVariable "TEST_KAFKA_BROKER" then null else "Skipping as no TEST_KAFKA_BROKER supplied"
override _.Skip = if null <> Environment.GetEnvironmentVariable "TEST_KAFKA_BROKER" then null else "Skipping as no TEST_KAFKA_BROKER supplied"

type T1(testOutputHelper) =
let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker ()
@@ -258,15 +240,15 @@ type T2(testOutputHelper) =
do! runConsumers log config 1 None
(fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() })

test <@ numMessages = !messageCount @>
test <@ numMessages = messageCount.Value @>

let messageCount = ref 0
let groupId2 = newId()
let config = KafkaConsumerConfig.Create("panther", broker, [topic], groupId2, AutoOffsetReset.Earliest)
do! runConsumers log config 1 None
(fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() })

test <@ numMessages = !messageCount @>
test <@ numMessages = messageCount.Value @>
}
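The recurring `!messageCount` → `messageCount.Value` edits in these tests track F# 6, which deprecates the `!` and `:=` ref-cell operators in favour of the `.Value` property; side by side:

```fsharp
let count = ref 0
// F# 5 and earlier (now triggers deprecation warnings):
//   count := !count + 1
// F# 6 idiom, as adopted throughout this diff:
count.Value <- count.Value + 1
```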

let [<FactIfBroker>] ``Spawning a new consumer with same consumer group id should not receive new messages`` () = async {
Expand All @@ -284,15 +266,15 @@ type T2(testOutputHelper) =
if Interlocked.Add(messageCount, b.Length) >= numMessages then
c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be stored

test <@ numMessages = !messageCount @>
test <@ numMessages = messageCount.Value @>

// expected to read no messages from the subsequent consumer
let messageCount = ref 0
do! runConsumers log config 1 (Some (TimeSpan.FromSeconds 10.))
(fun c b -> async {
if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() })

test <@ 0 = !messageCount @>
test <@ 0 = messageCount.Value @>
}

// separated test type to allow the tests to run in parallel
@@ -314,7 +296,7 @@ type T3(testOutputHelper) =
if Interlocked.Add(messageCount, b.Length) >= numMessages then
c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed)

test <@ numMessages = !messageCount @>
test <@ numMessages = messageCount.Value @>

let! _ = runProducers log broker topic 1 numMessages // produce more messages

Expand All @@ -326,7 +308,7 @@ type T3(testOutputHelper) =
if Interlocked.Add(messageCount, b.Length) >= numMessages then
c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed)

test <@ numMessages = !messageCount @>
test <@ numMessages = messageCount.Value @>
}

let [<FactIfBroker>] ``Consumers should never schedule two batches of the same partition concurrently`` () = async {
@@ -355,7 +337,7 @@ type T3(testOutputHelper) =

do! runConsumers log config 1 None
(fun c b -> async {
let partition = Binding.partitionValue b.[0].result.Partition
let partition = Binding.partitionValue b[0].result.Partition

// check batch sizes are bounded by maxBatchSize
test <@ b.Length <= maxBatchSize @> // "batch sizes should never exceed maxBatchSize")
@@ -368,16 +350,16 @@
// check for message monotonicity
let offset = getPartitionOffset partition
for msg in b do
Assert.True((let o = msg.result.Offset in o.Value) > !offset, "offset for partition should be monotonic")
offset := let o = msg.result.Offset in o.Value
Assert.True((let o = msg.result.Offset in o.Value) > offset.Value, "offset for partition should be monotonic")
offset.Value <- let o = msg.result.Offset in o.Value

do! Async.Sleep 100

let _ = Interlocked.Decrement concurrentBatchCell

if Interlocked.Add(globalMessageCount, b.Length) >= numMessages then c.Stop() })

test <@ numMessages = !globalMessageCount @>
test <@ numMessages = globalMessageCount.Value @>
}

// separated test type to allow the tests to run in parallel
@@ -450,7 +432,7 @@ type T4(testOutputHelper) =
let! res = async {
use consumer = BatchedConsumer.Start(log, consumerCfg, handle)
consumer.StopAfter (TimeSpan.FromSeconds 20.)
use _ = FsKafka.KafkaMonitor(log).Start(consumer.Inner, consumerCfg.Inner.GroupId)
use _ = KafkaMonitor(log).Start(consumer.Inner, consumerCfg.Inner.GroupId)
return! consumer.AwaitShutdown() |> Async.Catch
}

