Add StreamsProducer + StreamsConsumer #35

Merged · 1 commit · Jun 1, 2019
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next release
## [Unreleased]

### Added

- Add `StreamsConsumer` and `StreamsProducer` [#35](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/35)

### Changed

- Split reusable components of `ParallelConsumer` out into independent `Propulsion` and `Propulsion.Kafka` libraries [#34](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/34)
7 changes: 7 additions & 0 deletions Jet.ConfluentKafka.FSharp.sln
@@ -37,6 +37,8 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion", "src\Propulsio
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Kafka", "src\Propulsion.Kafka\Propulsion.Kafka.fsproj", "{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Tests", "tests\Propulsion.Tests\Propulsion.Tests.fsproj", "{BBD50DA2-7932-4D24-888D-C168F4788705}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -59,6 +61,10 @@ Global
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}.Release|Any CPU.Build.0 = Release|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -68,6 +74,7 @@ Global
{76802BE3-00C2-4B1D-96A2-95A3E2136DBE} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{0F72360F-1C14-46E3-9A60-B6BF87BD726D} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{BBD50DA2-7932-4D24-888D-C168F4788705} = {302B09C4-7F38-4CF7-93B9-1B7A6035386E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DF04AF73-7412-46E5-9CC8-15CB48E3139A}
1 change: 1 addition & 0 deletions build.proj
@@ -22,6 +22,7 @@

<Target Name="VSTest">
<Exec Command="dotnet test tests/Jet.ConfluentKafka.FSharp.Integration $(Cfg) $(TestOptions)" />
<Exec Command="dotnet test tests/Propulsion.Tests $(Cfg) $(TestOptions)" />
</Target>

<Target Name="Build" DependsOnTargets="Pack;VSTest" />
35 changes: 34 additions & 1 deletion src/Propulsion.Kafka/Consumers.fs
@@ -207,4 +207,37 @@ type ParallelConsumer private () =
?maxSubmissionsPerPartition, ?pumpInterval, ?statsInterval, ?logExternalStats) =
let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Key, x.Value)
ParallelConsumer.Start<KeyValuePair<string,string>>(log, config, maxDop, mapConsumeResult, handle >> Async.Catch,
?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, ?statsInterval=statsInterval, ?logExternalStats=logExternalStats)

type StreamsConsumer =
/// Builds a processing pipeline per the `config`, running up to `maxDop` instances of `handle` concurrently to maximize global throughput across partitions.
/// Processor pumps until `handle` yields a `Choice2Of2` or `Stop()` is requested.
static member Start<'M>
( log : ILogger, config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig, maxDop, enumStreamItems, handle, categorize,
?maxSubmissionsPerPartition, ?pumpInterval, ?statsInterval, ?stateInterval, ?logExternalStats) =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let pumpInterval = defaultArg pumpInterval (TimeSpan.FromMilliseconds 5.)
let maxSubmissionsPerPartition = defaultArg maxSubmissionsPerPartition 5

let dispatcher = Streams.Scheduling.Dispatcher<_> maxDop
let stats = Streams.Scheduling.Stats(log, statsInterval, stateInterval)
let dumpStreams (streams : Streams.Scheduling.StreamStates<_>) log =
logExternalStats |> Option.iter (fun f -> f log)
streams.Dump(log, Streams.Buffering.StreamState.eventsSize, categorize)
let streamsScheduler = Streams.Scheduling.StreamSchedulingEngine.Create(dispatcher, stats, handle, dumpStreams)
let mapConsumedMessagesToStreamsBatch onCompletion (x : Submission.SubmissionBatch<KeyValuePair<string,string>>) : Streams.Scheduling.StreamsBatch<_> =
let onCompletion () = x.onCompletion(); onCompletion()
Streams.Scheduling.StreamsBatch.Create(onCompletion, Seq.collect enumStreamItems x.messages) |> fst
let tryCompactQueue (queue : Queue<Streams.Scheduling.StreamsBatch<_>>) =
let mutable acc, worked = None, false
for x in queue do
match acc with
| None -> acc <- Some x
| Some a -> if a.TryMerge x then worked <- true
worked
let submitStreamsBatch (x : Streams.Scheduling.StreamsBatch<_>) : int =
streamsScheduler.Submit x
x.RemainingStreamsCount
let submitter = Submission.SubmissionEngine(log, maxSubmissionsPerPartition, mapConsumedMessagesToStreamsBatch, submitStreamsBatch, statsInterval, pumpInterval, tryCompactQueue)
let mapResult (x : Confluent.Kafka.ConsumeResult<string,string>) = KeyValuePair(x.Key,x.Value)
ConsumerPipeline.Start(log, config, mapResult, submitter.Ingest, submitter.Pump(), streamsScheduler.Pump, dispatcher.Pump(), statsInterval)
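
For orientation, here is a minimal sketch of wiring up the new `StreamsConsumer.Start` API. The shapes of `enumStreamItems` and `handle` are inferred from their usage above and are illustrative assumptions, as is the way `config` gets built; none of this is part of the diff itself:

open System
open System.Collections.Generic

let startStreamsConsumer (log : Serilog.ILogger) (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) =
    // Assumption: each consumed KeyValuePair<string,string> yields a single item for the
    // scheduler; a real mapping would explode the value into per-stream events
    let enumStreamItems (kv : KeyValuePair<string, string>) = Seq.singleton kv
    // Assumption: the handler mirrors attemptWrite in Producers.fs below, processing a
    // stream's buffered span and signalling progress (Choice1Of2) or failure (Choice2Of2)
    let handle span = async { return Choice1Of2 span }
    let categorize (streamName : string) = streamName
    StreamsConsumer.Start(
        log, config, maxDop = 64, enumStreamItems = enumStreamItems, handle = handle,
        categorize = categorize, statsInterval = TimeSpan.FromMinutes 1.)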
31 changes: 31 additions & 0 deletions src/Propulsion.Kafka/Producers.fs
@@ -0,0 +1,31 @@
namespace Propulsion.Kafka

open Confluent.Kafka
open Jet.ConfluentKafka.FSharp
open Serilog
open System

type StreamsProducer =
static member Start(log : ILogger, maxReadAhead, maxConcurrentStreams, clientId, broker, topic, render, categorize, ?statsInterval, ?stateInterval)
: Propulsion.ProjectorPipeline<_> =
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let projectionAndKafkaStats = Propulsion.Streams.Projector.Stats(log.ForContext<Propulsion.Streams.Projector.Stats>(), categorize, statsInterval, stateInterval)
let producerConfig = KafkaProducerConfig.Create(clientId, broker, Acks.Leader, compression = CompressionType.Lz4, maxInFlight = 1_000_000, linger = TimeSpan.Zero)
let producer = KafkaProducer.Create(log, producerConfig, topic)
let attemptWrite (_writePos,stream,fullBuffer : Propulsion.Streams.StreamSpan<_>) = async {
let maxEvents, maxBytes = 16384, 1_000_000 - (*fudge*)4096
let ((eventCount,_) as stats), span = Propulsion.Streams.Buffering.Span.slice (maxEvents,maxBytes) fullBuffer
let spanJson = render (stream, span)
try let! _res = producer.ProduceAsync(stream,spanJson)
let res = ()
return Choice1Of2 (span.index + int64 eventCount,stats,res)
with e -> return Choice2Of2 (stats,e) }
let interpretWriteResultProgress _streams _stream = function
| Choice1Of2 (i',_, _) -> Some i'
| Choice2Of2 (_,_) -> None
let dispatcher = Propulsion.Streams.Scheduling.Dispatcher<_>(maxConcurrentStreams)
let streamScheduler =
Propulsion.Streams.Scheduling.StreamSchedulingEngine<_,_>(
dispatcher, projectionAndKafkaStats, attemptWrite, interpretWriteResultProgress,
fun s l -> s.Dump(l, Propulsion.Streams.Buffering.StreamState.eventsSize, categorize))
Propulsion.Streams.Projector.StreamsProjectorPipeline.Start(log, dispatcher.Pump(), streamScheduler.Pump, maxReadAhead, streamScheduler.Submit, statsInterval)
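
As a usage sketch for the producer side: `render` receives the stream name and the sliced span per `attemptWrite` above, and must return the payload string handed to `ProduceAsync`. The broker representation and the serialization below are assumptions for illustration:

open System

let startStreamsProducer (log : Serilog.ILogger) =
    // Assumption: broker is modelled as a Uri because it is forwarded verbatim to
    // KafkaProducerConfig.Create; check that API for the exact expected representation
    let broker = Uri "tcp://broker:9092"
    // Illustrative serialization only; a real render would emit the span's events
    let render (stream : string, span : Propulsion.Streams.StreamSpan<_>) =
        sprintf "{\"stream\":\"%s\",\"index\":%d}" stream span.index
    StreamsProducer.Start(
        log, maxReadAhead = 4096, maxConcurrentStreams = 64,
        clientId = "myApp", broker = broker, topic = "output-topic",
        render = render, categorize = id)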
1 change: 1 addition & 0 deletions src/Propulsion.Kafka/Propulsion.Kafka.fsproj
@@ -13,6 +13,7 @@
<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Consumers.fs" />
<Compile Include="Producers.fs" />
</ItemGroup>

<ItemGroup>
3 changes: 3 additions & 0 deletions src/Propulsion/Infrastructure.fs
@@ -7,6 +7,9 @@ open System.Threading.Tasks
module Array =
let takeWhile p = Seq.takeWhile p >> Array.ofSeq
let head = Seq.head

module Option =
let toNullable option = match option with None -> System.Nullable() | Some v -> System.Nullable(v)
#endif

[<AutoOpen>]
157 changes: 157 additions & 0 deletions src/Propulsion/Ingestion.fs
@@ -0,0 +1,157 @@
module Propulsion.Ingestion

open Serilog
open System
open System.Collections.Concurrent
open System.Threading

/// Manages writing of progress
/// - Each write attempt is always of the newest token (each update is assumed to also count for all preceding ones)
/// - retries until success or a new item is posted
type ProgressWriter<'Res when 'Res: equality>(?period,?sleep) =
let sleepSpan = defaultArg sleep (TimeSpan.FromMilliseconds 100.)
let writeInterval,sleepPeriod = defaultArg period (TimeSpan.FromSeconds 5.), int sleepSpan.TotalMilliseconds
let due = intervalCheck writeInterval
let mutable committedEpoch = None
let mutable validatedPos = None
let result = Event<Choice<'Res,exn>>()
[<CLIEvent>] member __.Result = result.Publish
member __.Post(version,f) =
Volatile.Write(&validatedPos,Some (version,f))
member __.CommittedEpoch = Volatile.Read(&committedEpoch)
member __.Pump() = async {
let! ct = Async.CancellationToken
while not ct.IsCancellationRequested do
match Volatile.Read &validatedPos with
| Some (v,f) when Volatile.Read(&committedEpoch) <> Some v && due () ->
try do! f
Volatile.Write(&committedEpoch, Some v)
result.Trigger (Choice1Of2 v)
with e -> result.Trigger (Choice2Of2 e)
| _ -> do! Async.Sleep sleepPeriod }

[<NoComparison; NoEquality>]
type private InternalMessage =
/// Confirmed completion of a batch
| Validated of epoch: int64
/// Result from updating of Progress to backing store - processed up to nominated `epoch` or threw `exn`
| ProgressResult of Choice<int64,exn>
/// Internal message for stats purposes
| Added of streams: int * events: int

type private Stats(log : ILogger, statsInterval : TimeSpan) =
let mutable validatedEpoch, comittedEpoch : int64 option * int64 option = None, None
let progCommitFails, progCommits = ref 0, ref 0
let cycles, batchesPended, streamsPended, eventsPended = ref 0, ref 0, ref 0, ref 0
let statsDue = intervalCheck statsInterval
let dumpStats (activeReads, maxReads) =
log.Information("Buffering Cycles {cycles} Ingested {batches} ({streams:n0}s {events:n0}e)", !cycles, !batchesPended, !streamsPended, !eventsPended)
cycles := 0; batchesPended := 0; streamsPended := 0; eventsPended := 0
if !progCommitFails <> 0 || !progCommits <> 0 then
match comittedEpoch with
| None ->
log.Error("Uncommitted {activeReads}/{maxReads} @ {validated}; writing failing: {failures} failures ({commits} successful commits)",
activeReads, maxReads, Option.toNullable validatedEpoch, !progCommitFails, !progCommits)
| Some committed when !progCommitFails <> 0 ->
log.Warning("Uncommitted {activeReads}/{maxReads} @ {validated} (committed: {committed}, {commits} commits, {failures} failures)",
activeReads, maxReads, Option.toNullable validatedEpoch, committed, !progCommits, !progCommitFails)
| Some committed ->
log.Information("Uncommitted {activeReads}/{maxReads} @ {validated} (committed: {committed}, {commits} commits)",
activeReads, maxReads, Option.toNullable validatedEpoch, committed, !progCommits)
progCommits := 0; progCommitFails := 0
else
log.Information("Uncommitted {activeReads}/{maxReads} @ {validated} (committed: {committed})",
activeReads, maxReads, Option.toNullable validatedEpoch, Option.toNullable comittedEpoch)
member __.Handle : InternalMessage -> unit = function
| Validated epoch ->
validatedEpoch <- Some epoch
| ProgressResult (Choice1Of2 epoch) ->
incr progCommits
comittedEpoch <- Some epoch
| ProgressResult (Choice2Of2 (_exn : exn)) ->
incr progCommitFails
| Added (streams,events) ->
incr batchesPended
streamsPended := !streamsPended + streams
eventsPended := !eventsPended + events
member __.TryDump(readState) =
incr cycles
let due = statsDue ()
if due then dumpStats readState
due

type Sem(max) =
let inner = new SemaphoreSlim(max)
member __.Await(ct : CancellationToken) = inner.WaitAsync(ct) |> Async.AwaitTaskCorrect
member __.Release() = inner.Release() |> ignore
member __.State = max-inner.CurrentCount,max

/// Buffers items read from a range, unpacking them out of band from the reading so that unpacking can overlap with reading
/// On completion of the unpacking, they get submitted onward to the Submitter, which will buffer them for us
type Ingester<'Items,'Batch> private
( log : ILogger, stats : Stats, maxRead, sleepInterval : TimeSpan,
makeBatch : (unit->unit) -> 'Items -> ('Batch * (int * int)),
submit : 'Batch -> unit,
cts : CancellationTokenSource) =
let sleepInterval = int sleepInterval.TotalMilliseconds
let maxRead = Sem maxRead
let incoming = new ConcurrentQueue<_>()
let messages = new ConcurrentQueue<InternalMessage>()
let tryDequeue (x : ConcurrentQueue<_>) =
let mutable tmp = Unchecked.defaultof<_>
if x.TryDequeue &tmp then Some tmp
else None
let progressWriter = ProgressWriter<_>()
let rec tryIncoming () =
match tryDequeue incoming with
| None -> false
| Some (epoch,checkpoint,items) ->
let markCompleted () =
maxRead.Release()
messages.Enqueue (Validated epoch)
progressWriter.Post(epoch,checkpoint)
let batch,(streamCount, itemCount) = makeBatch markCompleted items
submit batch
messages.Enqueue(Added (streamCount,itemCount))
true
let rec tryHandle () =
match tryDequeue messages with
| None -> false
| Some x ->
stats.Handle x
true

member private __.Pump() = async {
let! ct = Async.CancellationToken
use _ = progressWriter.Result.Subscribe(ProgressResult >> messages.Enqueue)
Async.Start(progressWriter.Pump(), ct)
while not ct.IsCancellationRequested do
// arguably the impl should be submitting while unpacking but
// - maintaining consistency between incoming order and submit order is required
// - in general maxRead will be double maxSubmit so this will only be relevant in catchup situations
try let worked = tryHandle () || tryIncoming () || stats.TryDump(maxRead.State)
if not worked then do! Async.Sleep sleepInterval
with e -> log.Error(e, "Ingester exception") }

/// Starts an independent Task which handles
/// a) `unpack`ing of `incoming` items
b) `submit`ting them onward (assuming there is capacity within the `maxRead` limit)
static member Start<'Item>(log, maxRead, makeBatch, submit, ?statsInterval, ?sleepInterval) =
let maxWait, statsInterval = defaultArg sleepInterval (TimeSpan.FromMilliseconds 5.), defaultArg statsInterval (TimeSpan.FromMinutes 5.)
let cts = new CancellationTokenSource()
let stats = Stats(log, statsInterval)
let instance = Ingester<_,_>(log, stats, maxRead, maxWait, makeBatch, submit, cts)
Async.Start(instance.Pump(), cts.Token)
instance

/// Submits a batch as read for unpacking and submission; will only return after the number of in-flight reads drops below the limit
/// Returns (reads in flight,maximum reads in flight)
member __.Submit(epoch, checkpoint, items) = async {
// If we've read it, feed it into the queue for unpacking
incoming.Enqueue (epoch, checkpoint, items)
// ... but we might hold off on yielding if we're at capacity
do! maxRead.Await(cts.Token)
return maxRead.State }

/// As range assignments get revoked, a user is expected to `Stop` the active processing thread for the Ingester before releasing references to it
member __.Stop() = cts.Cancel()
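
To make the moving parts concrete, here is a minimal sketch of driving an Ingester against the signatures above; the batch type and handlers are placeholders invented for illustration:

// Hypothetical batch shape: the raw items plus the completion callback that releases read
// capacity and routes the epoch to the internal ProgressWriter
type ExampleBatch = { items : string[]; markCompleted : unit -> unit }

let startIngestion (log : Serilog.ILogger) = async {
    let makeBatch markCompleted (items : string[]) =
        { items = items; markCompleted = markCompleted }, (1, items.Length) // (streams, events)
    let submit (batch : ExampleBatch) =
        // hand off to downstream processing; invoke batch.markCompleted () when done
        batch.markCompleted ()
    let ingester = Ingester<string[], ExampleBatch>.Start(log, 8, makeBatch, submit)
    // the checkpoint Async is what the internal ProgressWriter runs to persist this epoch
    let checkpoint = async { (* e.g. commit the source offset *) () }
    let! inFlight, maxInFlight = ingester.Submit(42L, checkpoint, [| "event-0"; "event-1" |])
    ignore (inFlight, maxInFlight) }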
55 changes: 55 additions & 0 deletions src/Propulsion/Projector.fs
@@ -0,0 +1,55 @@
namespace Propulsion

open Serilog
open System
open System.Threading
open System.Threading.Tasks

type ProjectorPipeline<'Ingester> private (task : Task<unit>, triggerStop, startIngester) =

interface IDisposable with member __.Dispose() = __.Stop()

member __.StartIngester(rangeLog : ILogger) : 'Ingester = startIngester rangeLog

/// Inspects current status of processing task
member __.Status = task.Status
/// After AwaitCompletion, can be used to infer whether exit was clean
member __.RanToCompletion = task.Status = TaskStatus.RanToCompletion

/// Request cancellation of processing
member __.Stop() = triggerStop ()

/// Asynchronously awaits until consumer stops or a `handle` invocation yields a fault
member __.AwaitCompletion() = Async.AwaitTaskCorrect task

static member Start(log : Serilog.ILogger, pumpDispatcher, pumpScheduler, pumpSubmitter, startIngester) =
let cts = new CancellationTokenSource()
let ct = cts.Token
let tcs = new TaskCompletionSource<unit>()
let start name f =
let wrap (name : string) computation = async {
try do! computation
log.Information("Exiting {name}", name)
with e -> log.Fatal(e, "Abend from {name}", name) }
Async.Start(wrap name f, ct)
// if scheduler encounters a faulted handler, we propagate that as the consumer's Result
let abend (exns : AggregateException) =
if tcs.TrySetException(exns) then log.Warning(exns, "Cancelling processing due to {count} faulted handlers", exns.InnerExceptions.Count)
else log.Information("Failed setting {count} exceptions", exns.InnerExceptions.Count)
// NB cancel needs to be after TSE or the Register(TSE) will win
cts.Cancel()
let machine = async {
// external cancellation should yield a success result
use _ = ct.Register(fun _ -> tcs.TrySetResult () |> ignore)
start "dispatcher" <| pumpDispatcher
// ... fault results from dispatched tasks result in the `machine` concluding with an exception
start "scheduler" <| pumpScheduler abend
start "submitter" <| pumpSubmitter

// await for either handler-driven abend or external cancellation via Stop()
do! Async.AwaitTaskCorrect tcs.Task }
let task = Async.StartAsTask machine
let triggerStop () =
log.Information("Stopping")
cts.Cancel();
new ProjectorPipeline<_>(task, triggerStop, startIngester)
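
The lifecycle a host application sees from this type is deliberately small; a sketch of typical usage (the surrounding host loop is assumed):

// Sketch: drive a pipeline to completion, distinguishing a clean stop from a faulted handler
let runPipeline (pipeline : ProjectorPipeline<_>) = async {
    // per-range ingesters are created as the source assigns work, e.g.:
    // let ingester = pipeline.StartIngester rangeLog
    do! pipeline.AwaitCompletion() // returns once Stop() is called or a handler faults
    if not pipeline.RanToCompletion then failwith "pipeline terminated abnormally" }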
4 changes: 4 additions & 0 deletions src/Propulsion/Propulsion.fsproj
@@ -13,7 +13,11 @@
<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Propulsion.fs" />
<Compile Include="Ingestion.fs" />
<Compile Include="Projector.fs" />
<Compile Include="Parallel.fs" />
<Compile Include="Parallel.fs" />
<Compile Include="Streams.fs" />
</ItemGroup>

<ItemGroup>