diff --git a/CHANGELOG.md b/CHANGELOG.md index c4a542d..f323ba7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `FsKafka0`: (Moved from [Propulsion.Kafka0](https://github.com/jet/propulsion/tree/ddbcf41072627b26c39ecc915cb747a71ce2a91d/src/Propulsion.Kafka0)) - Implementation of same API as FsKafka based on `Confluent.Kafka` v `0.11.3` [#51](https://github.com/jet/FsKafka/pull/51) - `KafkaMonitor` based on [Burrow](https://github.com/linkedin/Burrow) (Moved from [Propulsion.Kafka](https://github.com/jet/propulsion/tree/ddbcf41072627b26c39ecc915cb747a71ce2a91d/src/Propulsion.Kafka) [jet/Propulsion #51](https://github.com/jet/FsKafka/pull/51) [#12](https://github.com/jet/propulsion/pull/12) :pray: [@jgardella](https://github.com/jgardella) +- `KafkaProducerConfig.Create`: `requestTimeout` [#52](https://github.com/jet/FsKafka/pull/52) ### Changed @@ -27,7 +28,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Removed -- Removed `Config.validateBrokerUri` [#51](https://github.com/jet/FsKafka/pull/51) +- `Config.validateBrokerUri` [#51](https://github.com/jet/FsKafka/pull/51) ### Fixed diff --git a/src/FsKafka/FsKafka.fs b/src/FsKafka/FsKafka.fs index 6a688f4..4cc61fd 100644 --- a/src/FsKafka/FsKafka.fs +++ b/src/FsKafka/FsKafka.fs @@ -37,6 +37,8 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) = ?retryBackoff, /// Statistics Interval. Default: no stats. ?statisticsInterval, + /// Ack timeout (assuming Acks != Acks.0). Confluent.Kafka default: 5s. + ?requestTimeout, /// Confluent.Kafka default: false. Defaults to true. ?socketKeepAlive, /// Partition algorithm. Default: `ConsistentRandom`. 
@@ -60,6 +62,7 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) = linger |> Option.iter (fun x -> c.LingerMs <- Nullable x.TotalMilliseconds) // default 0 partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x) compression |> Option.iter (fun x -> c.CompressionType <- Nullable x) + requestTimeout |> Option.iter (fun x -> c.RequestTimeoutMs <- Nullable (int x.TotalMilliseconds)) statisticsInterval |> Option.iter (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds)) custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v)) customize |> Option.iter (fun f -> f c) diff --git a/src/FsKafka0/ConfluentKafkaShims.fs b/src/FsKafka0/ConfluentKafkaShims.fs index 0724b3d..89a050e 100644 --- a/src/FsKafka0/ConfluentKafkaShims.fs +++ b/src/FsKafka0/ConfluentKafkaShims.fs @@ -52,6 +52,7 @@ module Config = let linger = mkKey "linger.ms" id let messageSendRetries = mkKey "message.send.max.retries" id let partitioner = mkKey "partitioner" (function Partitioner.Random -> "random" | Partitioner.Consistent -> "consistent" | Partitioner.ConsistentRandom -> "consistent_random") + let requestTimeoutMs = mkKey "request.timeout.ms" id /// Config keys applying to Consumers module Consumer = @@ -86,6 +87,7 @@ type ProducerConfig() = member val LingerMs = Nullable() with get, set member val Partitioner = Nullable() with get, set member val CompressionType = Nullable() with get, set + member val RequestTimeoutMs = Nullable() with get, set member val StatisticsIntervalMs = Nullable() with get, set member __.Render() : KeyValuePair[] = @@ -100,6 +102,7 @@ type ProducerConfig() = match __.LingerMs with Null -> () | HasValue v -> yield Config.Producer.linger ==> v match __.Partitioner with Null -> () | HasValue v -> yield Config.Producer.partitioner ==> v match __.CompressionType with Null -> () | HasValue v -> yield Config.Producer.compression ==> v + match __.RequestTimeoutMs with Null -> () | HasValue v -> yield 
Config.Producer.requestTimeoutMs ==> v match __.StatisticsIntervalMs with Null -> () | HasValue v -> yield Config.statisticsInterval ==> v yield! values |] diff --git a/src/FsKafka0/FsKafka.fs b/src/FsKafka0/FsKafka.fs index 2e234a5..7478354 100644 --- a/src/FsKafka0/FsKafka.fs +++ b/src/FsKafka0/FsKafka.fs @@ -35,6 +35,8 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) = ?retries, /// Backoff interval. Confluent.Kafka default: 100ms. Default: 1s. ?retryBackoff, + /// Ack timeout (assuming Acks != Acks.0). Confluent.Kafka default: 5s. + ?requestTimeout, /// Statistics Interval. Default: no stats. ?statisticsInterval, /// Confluent.Kafka default: false. Defaults to true. @@ -60,6 +62,7 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) = linger |> Option.iter (fun x -> c.LingerMs <- Nullable (int x.TotalMilliseconds)) // default 0 partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x) compression |> Option.iter (fun x -> c.CompressionType <- Nullable x) + requestTimeout |> Option.iter (fun x -> c.RequestTimeoutMs <- Nullable (int x.TotalMilliseconds)) statisticsInterval |> Option.iter (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds)) custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v)) customize |> Option.iter (fun f -> f c)