From 2ba166d1ced36550291cc87ed4f6af47b18e56e3 Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Sat, 18 Jun 2022 10:20:59 +0100
Subject: [PATCH] Target CK/librdkafka 1.9.0 (#91)

* Target CK/librdk 1.9.0

* Remove Destructurama.FSharp dep
---
 CHANGELOG.md                             |   1 +
 Directory.Build.props                    |   1 -
 README.md                                |   2 +-
 src/FsKafka/FsKafka.fs                   | 146 +++++++++---------
 src/FsKafka/FsKafka.fsproj               |   6 +-
 src/FsKafka/Monitor.fs                   |  38 ++---
 .../FsKafka.Integration.fsproj           |   4 +-
 tests/FsKafka.Integration/Integration.fs |  56 +++----
 8 files changed, 115 insertions(+), 139 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7754cd6..e196f2b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ### Changed
 
 - Update global.json to use SDK version `6.0.300`, use `DotNet.ReproducibleBuilds` [#90](https://github.com/jet/FsKafka/pull/90) [#93](https://github.com/jet/FsKafka/pull/93)
+- Target [`Confluent.Kafka [1.9.0]`](https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/CHANGELOG.md#190), [`librdkafka.redist [1.9.0]`](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0) [#91](https://github.com/jet/FsKafka/pull/91)
 ### Removed
 ### Fixed
diff --git a/Directory.Build.props b/Directory.Build.props
index 954b77e..ae608d8 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -14,7 +14,6 @@
5 true - true true
diff --git a/README.md b/README.md
index 287581e..20ca252 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 
 F# friendly wrapper for `Confluent.Kafka`, with minimal dependencies or additional abstractions (but see [related repos](#related-repos)).
 
-`FsKafka` wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. [Depends](https://www.fuget.org/packages/FsKafka) on `Confluent.Kafka [1.7.0]`, `librdkafka [1.7.0]` (pinned to ensure we use a tested pairing), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).
+`FsKafka` wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. [Depends](https://www.fuget.org/packages/FsKafka) on [`Confluent.Kafka [1.9.0]`](https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/CHANGELOG.md#190), [`librdkafka.redist [1.9.0]`](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0) (pinned to ensure we use a tested pairing), `Serilog` (but no specific Serilog sinks, i.e. you configure to emit to `NLog` etc) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).
## Usage diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs index e00c256..7c1d4de 100644 --- a/src/FsKafka/FsKafka.fs +++ b/src/FsKafka/FsKafka.fs @@ -12,7 +12,7 @@ open System.Threading.Tasks module Binding = - let message (x : Confluent.Kafka.ConsumeResult) = x.Message + let message (x : ConsumeResult) = x.Message let offsetValue (x : Offset) : int64 = x.Value let partitionValue (x : Partition) : int = x.Value let internal makeTopicPartition (topic : string) (partition : int) = TopicPartition(topic, Partition partition) @@ -36,39 +36,39 @@ type Batching = /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings [] type KafkaProducerConfig private (inner, bootstrapServers : string) = - member __.Inner : ProducerConfig = inner - member __.BootstrapServers = bootstrapServers - member __.Acks = let v = inner.Acks in v.Value - member __.Linger = let v = inner.LingerMs in v.Value - member __.MaxInFlight = let v = inner.MaxInFlight in v.Value - member __.Compression = let v = inner.CompressionType in v.GetValueOrDefault(CompressionType.None) + member _.Inner : ProducerConfig = inner + member _.BootstrapServers = bootstrapServers + member _.Acks = let v = inner.Acks in v.Value + member _.Linger = let v = inner.LingerMs in v.Value + member _.MaxInFlight = let v = inner.MaxInFlight in v.Value + member _.Compression = let v = inner.CompressionType in v.GetValueOrDefault(CompressionType.None) /// Creates and wraps a Confluent.Kafka ProducerConfig with the specified settings static member Create ( clientId : string, bootstrapServers : string, acks, - /// Defines combination of linger/maxInFlight settings to effect desired batching semantics + // Defines combination of linger/maxInFlight settings to effect desired batching semantics batching : Batching, - /// Message compression. Default: None. + // Message compression. Default: None. ?compression, - /// Number of retries. Confluent.Kafka default: 2. Default: 60. + // Number of retries. Confluent.Kafka default: 2. Default: 60. ?retries, - /// Backoff interval. Confluent.Kafka default: 100ms. Default: 1s. + // Backoff interval. Confluent.Kafka default: 100ms. Default: 1s. ?retryBackoff, - /// Statistics Interval. Default: no stats. + // Statistics Interval. Default: no stats. ?statisticsInterval, - /// Ack timeout (assuming Acks != Acks.Zero). Confluent.Kafka default: 5s. + // Ack timeout (assuming Acks != Acks.Zero). Confluent.Kafka default: 5s. ?requestTimeout, - /// Confluent.Kafka default: false. Defaults to true. + // Confluent.Kafka default: false. Defaults to true. ?socketKeepAlive, - /// Partition algorithm. Default: `ConsistentRandom`. + // Partition algorithm. Default: `ConsistentRandom`. ?partitioner, - /// Confluent.Kafka default: 1mB + // Confluent.Kafka default: 1mB ?messageMaxBytes, - /// Miscellaneous configuration parameters to be passed to the underlying Confluent.Kafka producer configuration. Same as constructor argument for Confluent.Kafka >=1.2. + // Miscellaneous configuration parameters to be passed to the underlying Confluent.Kafka producer configuration. Same as constructor argument for Confluent.Kafka >=1.2. ?config : IDictionary, - /// Miscellaneous configuration parameters to be passed to the underlying Confluent.Kafka producer configuration. + // Miscellaneous configuration parameters to be passed to the underlying Confluent.Kafka producer configuration. 
?custom : #seq>, - /// Postprocesses the ProducerConfig after the rest of the rules have been applied + // Postprocesses the ProducerConfig after the rest of the rules have been applied ?customize : ProducerConfig -> unit) = let linger, maxInFlight = match batching with @@ -108,8 +108,8 @@ module private Message = /// Creates and wraps a Confluent.Kafka Producer with the supplied configuration type KafkaProducer private (inner : IProducer, topic : string) = - member __.Inner = inner - member __.Topic = topic + member _.Inner = inner + member _.Topic = topic interface IDisposable with member _.Dispose() = inner.Dispose() @@ -119,7 +119,7 @@ type KafkaProducer private (inner : IProducer, topic : string) = /// Thus its critical to ensure you don't submit another message for the same key until you've had a success / failure /// response from the call. /// - member __.ProduceAsync(message : Message) : Async> = async { + member _.ProduceAsync(message : Message) : Async> = async { let! ct = Async.CancellationToken return! inner.ProduceAsync(topic, message, ct) |> Async.AwaitTaskCorrect } @@ -129,9 +129,9 @@ type KafkaProducer private (inner : IProducer, topic : string) = /// Thus its critical to ensure you don't submit another message for the same key until you've had a success / failure /// response from the call. /// - member __.ProduceAsync(key : string, value: string, headers : seq) : Async> = + member p.ProduceAsync(key : string, value: string, headers : seq) : Async> = let message = Message.createWithHeaders(key, value, headers) - __.ProduceAsync(message) + p.ProduceAsync(message) /// Produces a single message, yielding a response upon completion/failure of the ack (>3ms to complete) /// @@ -139,8 +139,8 @@ type KafkaProducer private (inner : IProducer, topic : string) = /// Thus its critical to ensure you don't submit another message for the same key until you've had a success / failure /// response from the call. /// - member __.ProduceAsync(key : string, value : string) : Async> = - __.ProduceAsync(Message.create (key, value)) + member p.ProduceAsync(key : string, value : string) : Async> = + p.ProduceAsync(Message.create (key, value)) static member Create(log : ILogger, config : KafkaProducerConfig, topic : string): KafkaProducer = if String.IsNullOrEmpty topic then nullArg "topic" @@ -154,10 +154,10 @@ type KafkaProducer private (inner : IProducer, topic : string) = new KafkaProducer(p, topic) type BatchedProducer private (inner : IProducer, topic : string) = - member __.Inner = inner - member __.Topic = topic + member _.Inner = inner + member _.Topic = topic - interface IDisposable with member __.Dispose() = inner.Dispose() + interface IDisposable with member _.Dispose() = inner.Dispose() /// /// Produces a batch of supplied key/value messages. Results are returned in order of writing (which may vary from order of submission). @@ -169,7 +169,7 @@ type BatchedProducer private (inner : IProducer, topic : string) /// Note that the delivery and/or write order may vary from the supplied order unless `maxInFlight` is 1 (which massively constrains throughput). /// Thus it's important to note that supplying >1 item into the queue bearing the same key without maxInFlight=1 risks them being written out of order onto the topic. /// - member __.ProduceBatch(messageBatch : Message<_, _>[]) : Async[]> = async { + member _.ProduceBatch(messageBatch : Message<_, _>[]) : Async[]> = async { if Array.isEmpty messageBatch then return [||] else let! 
ct = Async.CancellationToken @@ -187,7 +187,7 @@ type BatchedProducer private (inner : IProducer, topic : string) tcs.TrySetException errorMsg |> ignore else let i = Interlocked.Increment numCompleted - results.[i - 1] <- m + results[i - 1] <- m if i = numMessages then tcs.TrySetResult results |> ignore for message in messageBatch do @@ -200,15 +200,15 @@ type BatchedProducer private (inner : IProducer, topic : string) /// Produces a batch of supplied key/value messages. /// See the other overload. /// - member __.ProduceBatch(messageBatch : seq) : Async[]> = - __.ProduceBatch([| for pair in messageBatch -> Message.create pair |]) + member p.ProduceBatch(messageBatch : seq) : Async[]> = + p.ProduceBatch([| for pair in messageBatch -> Message.create pair |]) /// /// Produces a batch of messages with supplied key/value/headers. /// See the other overload. /// - member __.ProduceBatch(messageBatch : seq>) : Async[]> = - __.ProduceBatch([| for pair in messageBatch -> Message.createWithHeaders pair |]) + member p.ProduceBatch(messageBatch : seq>) : Async[]> = + p.ProduceBatch([| for pair in messageBatch -> Message.createWithHeaders pair |]) /// Creates and wraps a Confluent.Kafka Producer that affords a best effort batched production mode. /// NB See caveats on the `ProduceBatch` API for further detail as to the semantics @@ -237,10 +237,10 @@ module Core = let mutable inFlightBytes = 0L - member __.InFlightMb = float inFlightBytes / 1024. / 1024. - member __.Delta(numBytes : int64) = Interlocked.Add(&inFlightBytes, numBytes) |> ignore - member __.IsOverLimitNow() = Volatile.Read(&inFlightBytes) > maxInFlightBytes - member __.AwaitThreshold(ct : CancellationToken, consumer : IConsumer<_,_>, ?busyWork) = + member _.InFlightMb = float inFlightBytes / 1024. / 1024. + member _.Delta(numBytes : int64) = Interlocked.Add(&inFlightBytes, numBytes) |> ignore + member _.IsOverLimitNow() = Volatile.Read(&inFlightBytes) > maxInFlightBytes + member c.AwaitThreshold(ct : CancellationToken, consumer : IConsumer<_,_>, ?busyWork) = // Avoid having our assignments revoked due to MAXPOLL (exceeding max.poll.interval.ms between calls to .Consume) let showConsumerWeAreStillAlive () = let tps = consumer.Assignment @@ -248,7 +248,7 @@ module Core = match busyWork with Some f -> f () | None -> () let _ = consumer.Consume(1) consumer.Resume(tps) - if __.IsOverLimitNow() then + if c.IsOverLimitNow() then log.ForContext("maxB", maxInFlightBytes).Information("Consuming... breached in-flight message threshold (now ~{currentB:n0}B), quiescing until it drops to < ~{minMb:n1}MiB", inFlightBytes, float minInFlightBytes / 1024. / 1024.) while Volatile.Read(&inFlightBytes) > minInFlightBytes && not ct.IsCancellationRequested do @@ -258,46 +258,46 @@ module Core = /// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for documentation on the implications of specific settings [] type KafkaConsumerConfig = private { inner: ConsumerConfig; topics: string list; buffering: Core.ConsumerBufferingConfig } with - member __.Buffering = __.buffering - member __.Inner = __.inner - member __.Topics = __.topics + member x.Buffering = x.buffering + member x.Inner = x.inner + member x.Topics = x.topics /// Builds a Kafka Consumer Config suitable for KafkaConsumer.Start* static member Create - ( /// Identify this consumer in logs etc + ( // Identify this consumer in logs etc clientId : string, bootstrapServers : string, topics, - /// Consumer group identifier. + // Consumer group identifier. 
groupId, - /// Specifies handling when Consumer Group does not yet have an offset recorded. Confluent.Kafka default: start from Latest. + // Specifies handling when Consumer Group does not yet have an offset recorded. Confluent.Kafka default: start from Latest. autoOffsetReset, - /// Default 100kB. Confluent.Kafka default: 500MB + // Default 100kB. Confluent.Kafka default: 500MB ?fetchMaxBytes, - /// Default: use `fetchMaxBytes` value (or its default, 100kB). Confluent.Kafka default: 1mB + // Default: use `fetchMaxBytes` value (or its default, 100kB). Confluent.Kafka default: 1mB ?messageMaxBytes, - /// Minimum number of bytes to wait for (subject to timeout with default of 100ms). Default 1B. + // Minimum number of bytes to wait for (subject to timeout with default of 100ms). Default 1B. ?fetchMinBytes, - /// Stats reporting interval for the consumer. Default: no reporting. + // Stats reporting interval for the consumer. Default: no reporting. ?statisticsInterval, - /// Consumed offsets commit interval. Default 5s. + // Consumed offsets commit interval. Default 5s. ?autoCommitInterval, - /// Override default policy wrt auto-creating topics. Confluent.Kafka < 1.5 default: true; Confluent.Kafka >= 1.5 default: false + // Override default policy wrt auto-creating topics. Confluent.Kafka < 1.5 default: true; Confluent.Kafka >= 1.5 default: false ?allowAutoCreateTopics, - /// Misc configuration parameters to be passed to the underlying CK consumer. Same as constructor argument for Confluent.Kafka >=1.2. + // Misc configuration parameters to be passed to the underlying CK consumer. Same as constructor argument for Confluent.Kafka >=1.2. ?config : IDictionary, - /// Misc configuration parameter to be passed to the underlying CK consumer. + // Misc configuration parameter to be passed to the underlying CK consumer. ?custom : #seq>, - /// Postprocesses the ConsumerConfig after the rest of the rules have been applied + // Postprocesses the ConsumerConfig after the rest of the rules have been applied ?customize : ConsumerConfig -> unit, (* Client-side batching / limiting of reading ahead to constrain memory consumption *) - /// Minimum total size of consumed messages in-memory for the consumer to attempt to fill. Default 2/3 of maxInFlightBytes. + // Minimum total size of consumed messages in-memory for the consumer to attempt to fill. Default 2/3 of maxInFlightBytes. ?minInFlightBytes, - /// Maximum total size of consumed messages in-memory before broker polling is throttled. Default 24MiB. + // Maximum total size of consumed messages in-memory before broker polling is throttled. Default 24MiB. ?maxInFlightBytes, - /// Message batch linger time. Default 500ms. + // Message batch linger time. Default 500ms. ?maxBatchDelay, - /// Maximum number of messages to group per batch on consumer callbacks for BatchedConsumer. Default 1000. + // Maximum number of messages to group per batch on consumer callbacks for BatchedConsumer. Default 1000. 
?maxBatchSize) = let maxInFlightBytes = defaultArg maxInFlightBytes (16L * 1024L * 1024L) let minInFlightBytes = defaultArg minInFlightBytes (maxInFlightBytes * 2L / 3L) @@ -410,9 +410,9 @@ module private ConsumerImpl = while i < n && not cts.IsCancellationRequested do if bc.TryTake(&t, 5 (* ms *)) then - buffer.[i] <- t ; i <- i + 1 + buffer[i] <- t ; i <- i + 1 while i < n && not cts.IsCancellationRequested && bc.TryTake(&t) do - buffer.[i] <- t ; i <- i + 1 + buffer[i] <- t ; i <- i + 1 i type PartitionedBlockingCollection<'Key, 'Message when 'Key : equality>(?perPartitionCapacity : int) = @@ -425,9 +425,9 @@ module private ConsumerImpl = | Some c -> new BlockingCollection<'Message>(boundedCapacity=c) [] - member __.OnPartitionAdded = onPartitionAdded.Publish + member _.OnPartitionAdded = onPartitionAdded.Publish - member __.Add (key : 'Key, message : 'Message) = + member _.Add (key : 'Key, message : 'Message) = let factory key = lazy( let coll = createCollection() onPartitionAdded.Trigger(key, coll) @@ -436,7 +436,7 @@ module private ConsumerImpl = let buffer = collections.GetOrAdd(key, factory) buffer.Value.Add message - member __.Revoke(key : 'Key) = + member _.Revoke(key : 'Key) = match collections.TryRemove key with | true, coll -> Task.Delay(10000).ContinueWith(fun _ -> coll.Value.CompleteAdding()) |> ignore | _ -> () @@ -458,7 +458,7 @@ module private ConsumerImpl = let buffer = Array.zeroCreate buf.maxBatchSize let nextBatch () = let count = collection.FillBuffer(buffer, buf.maxBatchDelay) - let batch = Array.init count (fun i -> buffer.[i]) + let batch = Array.init count (fun i -> buffer[i]) Array.Clear(buffer, 0, count) batch @@ -505,7 +505,7 @@ module private ConsumerImpl = counter.Delta(+approximateMessageBytes result) partitionedCollection.Add(result.TopicPartition, result) with| :? ConsumeException as e -> log.Warning(e, "Consuming... exception {name}", consumer.Name) - | :? System.OperationCanceledException -> log.Warning("Consuming... cancelled {name}", consumer.Name) + | :? OperationCanceledException -> log.Warning("Consuming... cancelled {name}", consumer.Name) finally consumer.Close() @@ -517,17 +517,17 @@ module private ConsumerImpl = /// (parallel across partitions, sequenced/monotonic within) batch of processing carried out by the `partitionHandler` /// Conclusion of the processing (when a `partitionHandler` throws and/or `Stop()` is called) can be awaited via AwaitShutdown or AwaitWithStopOnCancellation. type BatchedConsumer private (inner : IConsumer, task : Task, triggerStop) = - member __.Inner = inner + member _.Inner = inner - interface IDisposable with member __.Dispose() = __.Stop() + interface IDisposable with member x.Dispose() = x.Stop() /// Request cancellation of processing - member __.Stop() = triggerStop () + member _.Stop() = triggerStop () /// Inspects current status of processing task - member __.Status = task.Status - member __.RanToCompletion = task.Status = TaskStatus.RanToCompletion + member _.Status = task.Status + member _.RanToCompletion = task.Status = TaskStatus.RanToCompletion /// Asynchronously awaits until consume loop stops or is faulted.
     /// NOTE: does not Stop the consumer in response to Cancellation; see AwaitWithStopOnCancellation for such a mechanism
-    member __.AwaitShutdown() =
+    member _.AwaitShutdown() =
         // NOTE NOT Async.AwaitTask task, or we'd hang in the case of Cancellation via `Stop()`
         Async.AwaitTaskCorrect task
     /// Asynchronously awaits until this consumer stops or is faulted.
@@ -544,7 +544,7 @@ type BatchedConsumer private (inner : IConsumer, task : Task[] -> Async) = if List.isEmpty config.topics then invalidArg "config" "must specify at least one topic" log.Information("Consuming... {bootstrapServers} {topics} {groupId} autoOffsetReset={autoOffsetReset} fetchMaxBytes={fetchMaxB} maxInFlight={maxInFlightMb:n1}MiB maxBatchDelay={maxBatchDelay}s maxBatchSize={maxBatchSize}", - config.inner.BootstrapServers, config.topics, config.inner.GroupId, (let x = config.inner.AutoOffsetReset in x.Value), config.inner.FetchMaxBytes, + config.inner.BootstrapServers, Seq.ofList config.topics, config.inner.GroupId, (let x = config.inner.AutoOffsetReset in x.Value), config.inner.FetchMaxBytes, float config.buffering.maxInFlightBytes / 1024. / 1024., (let t = config.buffering.maxBatchDelay in t.TotalSeconds), config.buffering.maxBatchSize) let partitionedCollection = ConsumerImpl.PartitionedBlockingCollection>() let onRevoke (xs : seq) = diff --git a/src/FsKafka/FsKafka.fsproj b/src/FsKafka/FsKafka.fsproj index ef88dd3..811cd06 100644 --- a/src/FsKafka/FsKafka.fsproj +++ b/src/FsKafka/FsKafka.fsproj @@ -11,13 +11,13 @@ - + - - + + diff --git a/src/FsKafka/Monitor.fs b/src/FsKafka/Monitor.fs index 70fbf15..7be6996 100644 --- a/src/FsKafka/Monitor.fs +++ b/src/FsKafka/Monitor.fs @@ -5,7 +5,6 @@ namespace FsKafka open Confluent.Kafka open Serilog open System -open System.Diagnostics open System.Threading type PartitionResult = @@ -15,11 +14,6 @@ type PartitionResult = | Healthy module MonitorImpl = -#if NET461 - module Array = - let head xs = Seq.head xs - let last xs = Seq.last xs -#endif module private Map = let mergeChoice (f : 'a -> Choice<'b * 'c, 'b, 'c> -> 'd) (map1 : Map<'a, 'b>) (map2 : Map<'a, 'c>) : Map<'a, 'd> = Set.union (map1 |> Seq.map (fun k -> k.Key) |> set) (map2 |> Seq.map (fun k -> k.Key) |> set) @@ -147,25 +141,25 @@ module MonitorImpl = let buffer : 'A [] = Array.zeroCreate capacity let mutable head,tail,size = 0,-1,0 - member __.TryCopyFull() = + member _.TryCopyFull() = if size <> capacity then None else let arr = Array.zeroCreate size let mutable i = head for x = 0 to size - 1 do - arr.[x] <- buffer.[i % capacity] + arr[x] <- buffer[i % capacity] i <- i + 1 Some arr - member __.Add(x : 'A) = + member _.Add(x : 'A) = tail <- (tail + 1) % capacity - buffer.[tail] <- x + buffer[tail] <- x if (size < capacity) then size <- size + 1 else head <- (head + 1) % capacity - member __.Clear() = + member _.Clear() = head <- 0 tail <- -1 size <- 0 @@ -269,7 +263,7 @@ module MonitorImpl = onStatus topic group states } let rec loop failCount = async { - let sw = Stopwatch.StartNew() + let sw = System.Diagnostics.Stopwatch.StartNew() let! failCount = async { try if validateAssignments () then do! checkConsumerProgress() @@ -304,7 +298,7 @@ module MonitorImpl = match res with | Choice1Of3 (), _ -> () | Choice2Of3 (), errs -> - let lag = function (partitionId, ErrorPartitionStalled lag) -> Some (partitionId,lag) | x -> failwithf "mis-mapped %A" x + let lag = function partitionId, ErrorPartitionStalled lag -> Some (partitionId,lag) | x -> failwithf "mis-mapped %A" x log.Error("Monitoring... {topic}/{group} Stalled with backlogs on {@stalled} [(partition,lag)]", topic, group, errs |> Seq.choose lag) | Choice3Of3 (), warns -> log.Warning("Monitoring... 
{topic}/{group} Growing lags on {@partitionIds}", topic, group, warns |> Seq.map fst) @@ -327,11 +321,11 @@ module MonitorImpl = /// and then map that to a per-partition status for each partition that the consumer being observed has been assigned type KafkaMonitor<'k,'v> ( log : ILogger, - /// Interval between checks of high/low watermarks. Default 30s + // Interval between checks of high/low watermarks. Default 30s ?interval, - /// Number if readings per partition to use in order to make inferences. Default 10 (at default interval of 30s, implies a 5m window). + // Number if readings per partition to use in order to make inferences. Default 10 (at default interval of 30s, implies a 5m window). ?windowSize, - /// Number of failed calls to broker that should trigger discarding of buffered readings in order to avoid false positives. Default 3. + // Number of failed calls to broker that should trigger discarding of buffered readings in order to avoid false positives. Default 3. ?failResetCount) = let failResetCount = defaultArg failResetCount 3 let interval = defaultArg interval (TimeSpan.FromSeconds 30.) @@ -340,14 +334,14 @@ type KafkaMonitor<'k,'v> /// Periodically supplies the status for all assigned partitions (whenever we've gathered `windowSize` of readings) /// Subscriber can e.g. use this to force a consumer restart if no progress is being made - [] member __.OnStatus = onStatus.Publish + [] member _.OnStatus = onStatus.Publish /// Raised whenever call to broker to ascertain watermarks has failed /// Subscriber can e.g. raise an alert if enough consecutive failures have occurred - [] member __.OnCheckFailed = onCheckFailed.Publish + [] member _.OnCheckFailed = onCheckFailed.Publish // One of these runs per topic - member private __.Pump(consumer, topic, group) = async { + member private _.Pump(consumer, topic, group) = async { let! ct = Async.CancellationToken let onQuery res = MonitorImpl.Logging.logLatest log topic group res @@ -361,8 +355,8 @@ type KafkaMonitor<'k,'v> return! 
MonitorImpl.run consumer (interval,windowSize,failResetCount) topic group (onQuery,onCheckFailed,onStatus) } /// Commences a monitoring task per subscribed topic - member __.Start(target : IConsumer<'k,'v>, group) = + member m.Start(target : IConsumer<'k,'v>, group) = let cts = new CancellationTokenSource() for topic in target.Subscription do - Async.Start(__.Pump(target, topic, group), cts.Token) - { new IDisposable with member __.Dispose() = cts.Cancel() } + Async.Start(m.Pump(target, topic, group), cts.Token) + { new IDisposable with member _.Dispose() = cts.Cancel() } diff --git a/tests/FsKafka.Integration/FsKafka.Integration.fsproj b/tests/FsKafka.Integration/FsKafka.Integration.fsproj index 3e4a0db..b225128 100644 --- a/tests/FsKafka.Integration/FsKafka.Integration.fsproj +++ b/tests/FsKafka.Integration/FsKafka.Integration.fsproj @@ -2,7 +2,6 @@ net6.0 - false @@ -16,7 +15,8 @@ - + + diff --git a/tests/FsKafka.Integration/Integration.fs b/tests/FsKafka.Integration/Integration.fs index 7146366..215143b 100644 --- a/tests/FsKafka.Integration/Integration.fs +++ b/tests/FsKafka.Integration/Integration.fs @@ -9,7 +9,6 @@ open System open System.Collections.Concurrent open System.ComponentModel open System.Threading -open System.Threading.Tasks open Xunit module Config = @@ -35,11 +34,10 @@ module Helpers = formatter.Format(logEvent, writer); writer |> string |> testOutput.WriteLine writer |> string |> System.Diagnostics.Debug.WriteLine - interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent + interface Serilog.Core.ILogEventSink with member _.Emit logEvent = writeSerilogEvent logEvent let createLogger sink = LoggerConfiguration() - .Destructure.FSharpTypes() .WriteTo.Sink(sink) .WriteTo.Seq("http://localhost:5341") .CreateLogger() @@ -49,15 +47,11 @@ module Helpers = | x when String.IsNullOrEmpty x -> invalidOp "missing environment variable 'TEST_KAFKA_BROKER'" | x -> Uri x |> Config.validateBrokerUri - let newId () = let g = System.Guid.NewGuid() in g.ToString("N") + let newId () = let g = Guid.NewGuid() in g.ToString("N") type Async with - // Unsafe in the name re https://github.com/dotnet/fsharp/issues/13165 - // Reasonable in the context of this codebase as we do not exceed 1200 computations - // When there's an in-the-box ParallelThrottled, or this codebase is known to target an FSharp.Core that does not suffer from this issue, - // the Unsafe part of the name (and this note) can be removed - static member ParallelThrottledUnsafe dop computations = + static member ParallelThrottled dop computations = Async.Parallel(computations, maxDegreeOfParallelism = dop) type BatchedConsumer with @@ -76,17 +70,13 @@ module Helpers = consumerId : int result : ConsumeResult payload : TestMessage - headers: MessageHeaders + headers : MessageHeaders } type ConsumerCallback = BatchedConsumer -> ConsumedTestMessage [] -> Async let headers = - #if !KAFKA0 seq ["kafka", [| 0xDEuy; 0xADuy; 0xBEuy; 0xEFuy |]] - #else - seq [] - #endif let runProducers log broker (topic : string) (numProducers : int) (messagesPerProducer : int) = async { let runProducer (producerId : int) = async { @@ -99,15 +89,11 @@ module Helpers = |> Seq.map (fun msgId -> let key = string msgId let value = JsonConvert.SerializeObject { producerId = producerId ; messageId = msgId } - #if KAFKA0 - key, value - #else key, value, headers - #endif ) |> Seq.chunkBySize 100 |> Seq.map producer.ProduceBatch - |> Async.ParallelThrottledUnsafe 7 + |> Async.ParallelThrottled 7 return Array.concat results } 
@@ -120,13 +106,9 @@ module Helpers = let deserialize result = let message = Binding.message result let headers = - #if !KAFKA0 match message.Headers with | null -> Seq.empty | h -> h |> Seq.map(fun h -> h.Key, h.GetValueBytes()) - #else - Seq.empty - #endif { consumerId = consumerId @@ -139,14 +121,14 @@ module Helpers = let consumerCell = ref None let rec getConsumer() = // avoid potential race conditions by polling - match !consumerCell with + match consumerCell.Value with | None -> Thread.SpinWait 20; getConsumer() | Some c -> c let partitionHandler batch = handler (getConsumer()) (Array.map deserialize batch) use consumer = BatchedConsumer.Start(log, config, partitionHandler) - consumerCell := Some consumer + consumerCell.Value <- Some consumer timeout |> Option.iter consumer.StopAfter @@ -158,7 +140,7 @@ module Helpers = type FactIfBroker() = inherit FactAttribute() - override __.Skip = if null <> Environment.GetEnvironmentVariable "TEST_KAFKA_BROKER" then null else "Skipping as no TEST_KAFKA_BROKER supplied" + override _.Skip = if null <> Environment.GetEnvironmentVariable "TEST_KAFKA_BROKER" then null else "Skipping as no TEST_KAFKA_BROKER supplied" type T1(testOutputHelper) = let log, broker = createLogger (TestOutputAdapter testOutputHelper), getTestBroker () @@ -258,7 +240,7 @@ type T2(testOutputHelper) = do! runConsumers log config 1 None (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() }) - test <@ numMessages = !messageCount @> + test <@ numMessages = messageCount.Value @> let messageCount = ref 0 let groupId2 = newId() @@ -266,7 +248,7 @@ type T2(testOutputHelper) = do! runConsumers log config 1 None (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() }) - test <@ numMessages = !messageCount @> + test <@ numMessages = messageCount.Value @> } let [] ``Spawning a new consumer with same consumer group id should not receive new messages`` () = async { @@ -284,7 +266,7 @@ type T2(testOutputHelper) = if Interlocked.Add(messageCount, b.Length) >= numMessages then c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be stored - test <@ numMessages = !messageCount @> + test <@ numMessages = messageCount.Value @> // expected to read no messages from the subsequent consumer let messageCount = ref 0 @@ -292,7 +274,7 @@ type T2(testOutputHelper) = (fun c b -> async { if Interlocked.Add(messageCount, b.Length) >= numMessages then c.Stop() }) - test <@ 0 = !messageCount @> + test <@ 0 = messageCount.Value @> } // separated test type to allow the tests to run in parallel @@ -314,7 +296,7 @@ type T3(testOutputHelper) = if Interlocked.Add(messageCount, b.Length) >= numMessages then c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed) - test <@ numMessages = !messageCount @> + test <@ numMessages = messageCount.Value @> let! _ = runProducers log broker topic 1 numMessages // produce more messages @@ -326,7 +308,7 @@ type T3(testOutputHelper) = if Interlocked.Add(messageCount, b.Length) >= numMessages then c.StopAfter(TimeSpan.FromSeconds 1.) }) // cancel after 1 second to allow offsets to be committed) - test <@ numMessages = !messageCount @> + test <@ numMessages = messageCount.Value @> } let [] ``Consumers should never schedule two batches of the same partition concurrently`` () = async { @@ -355,7 +337,7 @@ type T3(testOutputHelper) = do! 
runConsumers log config 1 None (fun c b -> async { - let partition = Binding.partitionValue b.[0].result.Partition + let partition = Binding.partitionValue b[0].result.Partition // check batch sizes are bounded by maxBatchSize test <@ b.Length <= maxBatchSize @> // "batch sizes should never exceed maxBatchSize") @@ -368,8 +350,8 @@ type T3(testOutputHelper) = // check for message monotonicity let offset = getPartitionOffset partition for msg in b do - Assert.True((let o = msg.result.Offset in o.Value) > !offset, "offset for partition should be monotonic") - offset := let o = msg.result.Offset in o.Value + Assert.True((let o = msg.result.Offset in o.Value) > offset.Value, "offset for partition should be monotonic") + offset.Value <- let o = msg.result.Offset in o.Value do! Async.Sleep 100 @@ -377,7 +359,7 @@ type T3(testOutputHelper) = if Interlocked.Add(globalMessageCount, b.Length) >= numMessages then c.Stop() }) - test <@ numMessages = !globalMessageCount @> + test <@ numMessages = globalMessageCount.Value @> } // separated test type to allow the tests to run in parallel @@ -450,7 +432,7 @@ type T4(testOutputHelper) = let! res = async { use consumer = BatchedConsumer.Start(log, consumerCfg, handle) consumer.StopAfter (TimeSpan.FromSeconds 20.) - use _ = FsKafka.KafkaMonitor(log).Start(consumer.Inner, consumerCfg.Inner.GroupId) + use _ = KafkaMonitor(log).Start(consumer.Inner, consumerCfg.Inner.GroupId) return! consumer.AwaitShutdown() |> Async.Catch }
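The producer surface touched above reduces to: build a `KafkaProducerConfig`, wrap it in a `KafkaProducer` for a topic, and await `ProduceAsync` per message, honouring the one-in-flight-message-per-key caveat in the doc comments. A minimal sketch, assuming a local broker and the `Batching.Linger` case (broker address, topic and client id are illustrative):

```fsharp
open System
open Confluent.Kafka
open FsKafka
open Serilog

let produceOne () = async {
    // sink-less Serilog logger keeps the sketch self-contained; add sinks as needed
    let log = LoggerConfiguration().CreateLogger()
    // Batching.Linger is assumed here; pick the case that matches the desired batching semantics
    let config =
        KafkaProducerConfig.Create(
            "example-client", "localhost:9092", Acks.All,
            Batching.Linger (TimeSpan.FromMilliseconds 10.))
    use producer = KafkaProducer.Create(log, config, "example-topic")
    // per the ProduceAsync caveat, don't submit another message for this key until this completes
    let! result = producer.ProduceAsync("key-1", "value-1")
    log.Information("Acked {partition}/{offset}", result.Partition, result.Offset) }
```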
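`BatchedProducer.ProduceBatch` acks a whole batch in one `Async`, but, as its doc comment warns, write order can diverge from submission order unless `maxInFlight` is 1. A sketch of that trade-off, assuming the `Batching.Custom (linger, maxInFlight)` case and the `BatchedProducer.Create(log, config, topic)` factory (neither is changed by this patch, so neither appears in the diff):

```fsharp
open System
open Confluent.Kafka
open FsKafka
open Serilog

let produceBatch () = async {
    let log = LoggerConfiguration().CreateLogger()
    // maxInFlight = 1 preserves write order at a significant throughput cost (see caveat above)
    let config =
        KafkaProducerConfig.Create(
            "example-client", "localhost:9092", Acks.All,
            Batching.Custom (TimeSpan.FromMilliseconds 100., 1))
    use producer = BatchedProducer.Create(log, config, "example-topic")
    // the result array arrives only after every message in the batch has been acked
    let! results = producer.ProduceBatch(seq { for i in 1 .. 100 -> string i, sprintf "value-%d" i })
    log.Information("Wrote {count} messages", results.Length) }
```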
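On the consuming side, `KafkaConsumerConfig.Create` plus `BatchedConsumer.Start` yields one callback per partition batch, with read-ahead limited by the in-flight byte counter shown above; this mirrors the `runConsumers` helper in the integration tests. Broker address, topic, group and the `maxBatchSize` value below are illustrative:

```fsharp
open Confluent.Kafka
open FsKafka
open Serilog

let consume () = async {
    let log = LoggerConfiguration().CreateLogger()
    let config =
        KafkaConsumerConfig.Create(
            "example-client", "localhost:9092", ["example-topic"], "example-group",
            AutoOffsetReset.Earliest,
            maxBatchSize = 500)
    // called with one batch per partition; batches for a given partition never run concurrently
    let handlePartitionBatch (batch : ConsumeResult<string, string>[]) = async {
        for res in batch do
            log.Information("Consumed {key} @ {offset}", res.Message.Key, Binding.offsetValue res.Offset) }
    use consumer = BatchedConsumer.Start(log, config, handlePartitionBatch)
    // runs until the handler throws or Stop()/StopAfter is invoked
    do! consumer.AwaitShutdown() }
```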
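`KafkaMonitor` (exercised by the `T4` test above) attaches to a running consumer's `Inner` handle and raises `OnStatus` once `windowSize` watermark readings have been gathered per partition. The payload shape of `OnStatus` is not visible in this diff, so the handler in this sketch only logs it structurally; `Start` returns an `IDisposable` that cancels the per-topic monitoring loops:

```fsharp
open System
open FsKafka
open Serilog

let monitorConsumer (log : ILogger) (consumer : BatchedConsumer) (groupId : string) : IDisposable =
    let monitor = KafkaMonitor(log, interval = TimeSpan.FromSeconds 30., windowSize = 10)
    monitor.OnStatus.Add(fun status -> log.Information("Monitor status {@status}", status))
    // disposing the returned handle stops monitoring; it does not stop the consumer itself
    monitor.Start(consumer.Inner, groupId)
```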