Skip to content

Commit

Permalink
Move ParallelPropulsion to Core
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 1, 2019
1 parent 04fb569 commit 7f86ba7
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added
### Changed

- Split reusable components of `ParallelConsumer` out into independent `Projection` and `Projection.Kafka` libraries [#34](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/34)
- Split reusable components of `ParallelConsumer` out into independent `Propulsion` and `Propulsion.Kafka` libraries [#34](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/34)

### Removed
### Fixed
Expand Down
2 changes: 2 additions & 0 deletions build.proj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

<Target Name="Pack">
<Exec Command="dotnet pack src/Jet.ConfluentKafka.FSharp $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion $(Cfg) $(PackOptions)" />
<Exec Command="dotnet pack src/Propulsion.Kafka $(Cfg) $(PackOptions)" />
</Target>

<Target Name="VSTest">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ open System.Threading
open System.Threading.Tasks

[<AutoOpen>]
module private Impl =
module private Helpers =

/// Maintains a Stopwatch used to drive a periodic loop, computing the remaining portion of the period per invocation
/// - `Some remainder` if the interval has time remaining
Expand Down Expand Up @@ -115,7 +115,7 @@ module KafkaIngestion =
/// Consumption pipeline that attempts to maximize concurrency of `handle` invocations (up to `dop` concurrently).
/// Consumes according to the `config` supplied to `Start`, until `Stop()` is requested or `handle` yields a fault.
/// Conclusion of processing can be awaited by via `AwaitCompletion()`.
type PipelinedConsumer private (inner : IConsumer<string, string>, task : Task<unit>, triggerStop) =
type ConsumerPipeline private (inner : IConsumer<string, string>, task : Task<unit>, triggerStop) =

interface IDisposable with member __.Dispose() = __.Stop()

Expand Down Expand Up @@ -176,7 +176,7 @@ type PipelinedConsumer private (inner : IConsumer<string, string>, task : Task<u
do! Async.AwaitTaskCorrect tcs.Task
}
let task = Async.StartAsTask machine
new PipelinedConsumer(consumer, task, triggerStop)
new ConsumerPipeline(consumer, task, triggerStop)

[<AbstractClass; Sealed>]
type ParallelConsumer private () =
Expand All @@ -198,7 +198,7 @@ type ParallelConsumer private () =
scheduler.Submit x
x.messages.Length
let submitter = Submission.SubmissionEngine(log, maxSubmissionsPerPartition, mapBatch, submitBatch, statsInterval, pumpInterval)
PipelinedConsumer.Start(log, config, mapResult, submitter.Ingest, submitter.Pump(), scheduler.Pump, dispatcher.Pump(), statsInterval)
ConsumerPipeline.Start(log, config, mapResult, submitter.Ingest, submitter.Pump(), scheduler.Pump, dispatcher.Pump(), statsInterval)

/// Builds a processing pipeline per the `config` running up to `dop` instances of `handle` concurrently to maximize global throughput across partitions.
/// Processor pumps until `handle` yields a `Choice2Of2` or `Stop()` is requested.
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Kafka/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ module private AsyncHelpers =
elif t.IsCanceled then ek (TaskCanceledException("Task wrapped with Async has been cancelled."))
elif t.IsCompleted then k ()
else ek(Exception "invalid Task state!"))
|> ignore
|> ignore
3 changes: 1 addition & 2 deletions src/Propulsion.Kafka/Propulsion.Kafka.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="ParallelPropulsion.fs" />
<Compile Include="PropulsionConfluentKafka.fs" />
<Compile Include="Consumers.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions src/Propulsion/Propulsion.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Propulsion.fs" />
<Compile Include="Parallel.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ module Helpers =
|> Async.Parallel
}

type PipelinedConsumer with
type ConsumerPipeline with
member c.StopAfter(delay : TimeSpan) =
Task.Delay(delay).ContinueWith(fun (_:Task) -> c.Stop()) |> ignore

type TestMessage = { producerId : int ; messageId : int }
[<NoComparison; NoEquality>]
type ConsumedTestMessage = { consumerId : int ; raw : ConsumeResult<string,string> ; payload : TestMessage }
type ConsumerCallback = PipelinedConsumer -> ConsumedTestMessage -> Async<unit>
type ConsumerCallback = ConsumerPipeline -> ConsumedTestMessage -> Async<unit>

let runProducers log (broker : Uri) (topic : string) (numProducers : int) (messagesPerProducer : int) = async {
let runProducer (producerId : int) = async {
Expand Down Expand Up @@ -128,7 +128,7 @@ type T1(testOutputHelper) =
let groupId = newId()

let consumedBatches = new ConcurrentBag<ConsumedTestMessage>()
let consumerCallback (consumer:PipelinedConsumer) msg = async {
let consumerCallback (consumer:ConsumerPipeline) msg = async {
do consumedBatches.Add msg
let messageCount = consumedBatches |> Seq.length
// signal cancellation if consumed items reaches expected size
Expand Down

0 comments on commit 7f86ba7

Please sign in to comment.