Add ProducerConfig.Create requestTimeoutMs
bartelink committed May 13, 2020
1 parent 4af8456 commit a6beaed
Showing 4 changed files with 11 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -19,6 +19,7 @@ The `Unreleased` section name is replaced by the expected version of next release

- `FsKafka0`: (Moved from [Propulsion.Kafka0](https://github.com/jet/propulsion/tree/ddbcf41072627b26c39ecc915cb747a71ce2a91d/src/Propulsion.Kafka0)) - Implementation of same API as FsKafka based on `Confluent.Kafka` v `0.11.3` [#51](https://github.com/jet/FsKafka/pull/51)
- `KafkaMonitor` based on [Burrow](https://github.com/linkedin/Burrow) (Moved from [Propulsion.Kafka](https://github.com/jet/propulsion/tree/ddbcf41072627b26c39ecc915cb747a71ce2a91d/src/Propulsion.Kafka) [jet/Propulsion #51](https://github.com/jet/FsKafka/pull/51) [#12](https://github.com/jet/propulsion/pull/12)) :pray: [@jgardella](https://github.com/jgardella)
- `KafkaProducerConfig.Create`: `requestTimeoutMs` [#52](https://github.com/jet/FsKafka/pull/52)

### Changed

@@ -27,7 +28,7 @@ The `Unreleased` section name is replaced by the expected version of next release

### Removed

- Removed `Config.validateBrokerUri` [#51](https://github.com/jet/FsKafka/pull/51)
- `Config.validateBrokerUri` [#51](https://github.com/jet/FsKafka/pull/51)

### Fixed

3 changes: 3 additions & 0 deletions src/FsKafka/FsKafka.fs
@@ -37,6 +37,8 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
?retryBackoff,
/// Statistics Interval. Default: no stats.
?statisticsInterval,
/// Ack timeout (assuming Acks != Acks.0). Confluent.Kafka default: 5s.
?requestTimeout,
/// Confluent.Kafka default: false. Defaults to true.
?socketKeepAlive,
/// Partition algorithm. Default: `ConsistentRandom`.
@@ -60,6 +62,7 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
linger |> Option.iter<TimeSpan> (fun x -> c.LingerMs <- Nullable x.TotalMilliseconds) // default 0
partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
requestTimeout |> Option.iter<TimeSpan> (fun x -> c.RequestTimeoutMs <- Nullable (int x.TotalMilliseconds))
statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds))
custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v))
customize |> Option.iter (fun f -> f c)
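As a usage note: a minimal sketch of calling the factory with the new optional argument. The required parameters shown here (client id, bootstrap servers, acks) and the opens are assumptions based on the surrounding API, not taken from this hunk.

```fsharp
open System
open Confluent.Kafka
open FsKafka

// Hypothetical wiring: only requestTimeout comes from this diff; the rest is assumed.
// requestTimeout maps to librdkafka's request.timeout.ms (Confluent.Kafka default: 5s).
let producerConfig =
    KafkaProducerConfig.Create(
        "my-app",                                   // clientId (assumed required parameter)
        "localhost:9092",                           // bootstrapServers
        Acks.All,                                   // acks
        requestTimeout = TimeSpan.FromSeconds 10.)  // overrides the 5s default
```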
3 changes: 3 additions & 0 deletions src/FsKafka0/ConfluentKafkaShims.fs
@@ -52,6 +52,7 @@ module Config =
let linger = mkKey "linger.ms" id<int>
let messageSendRetries = mkKey "message.send.max.retries" id<int>
let partitioner = mkKey "partitioner" (function Partitioner.Random -> "random" | Partitioner.Consistent -> "consistent" | Partitioner.ConsistentRandom -> "consistent_random")
let requestTimeoutMs = mkKey "request.timeout.ms" id<int>

/// Config keys applying to Consumers
module Consumer =
@@ -86,6 +87,7 @@ type ProducerConfig() =
member val LingerMs = Nullable() with get, set
member val Partitioner = Nullable() with get, set
member val CompressionType = Nullable() with get, set
member val RequestTimeoutMs = Nullable() with get, set
member val StatisticsIntervalMs = Nullable() with get, set

member __.Render() : KeyValuePair<string,obj>[] =
@@ -100,6 +102,7 @@ type ProducerConfig() =
match __.LingerMs with Null -> () | HasValue v -> yield Config.Producer.linger ==> v
match __.Partitioner with Null -> () | HasValue v -> yield Config.Producer.partitioner ==> v
match __.CompressionType with Null -> () | HasValue v -> yield Config.Producer.compression ==> v
match __.RequestTimeoutMs with Null -> () | HasValue v -> yield Config.Producer.requestTimeoutMs ==> v
match __.StatisticsIntervalMs with Null -> () | HasValue v -> yield Config.statisticsInterval ==> v
yield! values |]

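The shim mirrors Confluent.Kafka's typed config by rendering each property that has been set to its librdkafka string key. A self-contained sketch of that pattern for the new key (the real file's `mkKey` and `==>` helpers are replaced here with plain constructions):

```fsharp
open System
open System.Collections.Generic

// Sketch of the render step: a typed, optional setting becomes a
// ("request.timeout.ms", value) pair only when it has been assigned.
let renderRequestTimeout (requestTimeoutMs : Nullable<int>) : KeyValuePair<string, obj>[] =
    [|  if requestTimeoutMs.HasValue then
            yield KeyValuePair("request.timeout.ms", box requestTimeoutMs.Value) |]

// renderRequestTimeout (Nullable 10000) -> [| KeyValuePair("request.timeout.ms", 10000) |]
// renderRequestTimeout (Nullable())     -> [||]
```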
3 changes: 3 additions & 0 deletions src/FsKafka0/FsKafka.fs
@@ -35,6 +35,8 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
?retries,
/// Backoff interval. Confluent.Kafka default: 100ms. Default: 1s.
?retryBackoff,
/// Ack timeout (assuming Acks != Acks.0). Confluent.Kafka default: 5s.
?requestTimeout,
/// Statistics Interval. Default: no stats.
?statisticsInterval,
/// Confluent.Kafka default: false. Defaults to true.
@@ -60,6 +62,7 @@ type KafkaProducerConfig private (inner, bootstrapServers : string) =
linger |> Option.iter<TimeSpan> (fun x -> c.LingerMs <- Nullable (int x.TotalMilliseconds)) // default 0
partitioner |> Option.iter (fun x -> c.Partitioner <- Nullable x)
compression |> Option.iter (fun x -> c.CompressionType <- Nullable x)
requestTimeout |> Option.iter<TimeSpan> (fun x -> c.RequestTimeoutMs <- Nullable (int x.TotalMilliseconds))
statisticsInterval |> Option.iter<TimeSpan> (fun x -> c.StatisticsIntervalMs <- Nullable (int x.TotalMilliseconds))
custom |> Option.iter (fun xs -> for KeyValue (k,v) in xs do c.Set(k,v))
customize |> Option.iter (fun f -> f c)
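In both packages the caller supplies a `TimeSpan`, and the wiring converts it to whole milliseconds on the mutable config object. A simplified sketch of that `Option.iter` pattern (the config type here is a stand-in, not the real one):

```fsharp
open System

// Stand-in for the producer config's mutable, nullable property.
type ConfigSketch() =
    member val RequestTimeoutMs : Nullable<int> = Nullable() with get, set

let applyRequestTimeout (requestTimeout : TimeSpan option) (c : ConfigSketch) =
    // Only touch the property when the caller actually supplied a value.
    requestTimeout |> Option.iter (fun x -> c.RequestTimeoutMs <- Nullable(int x.TotalMilliseconds))

// applyRequestTimeout (Some (TimeSpan.FromSeconds 10.)) cfg  // sets cfg.RequestTimeoutMs to 10000
```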
