diff --git a/Changes.md b/Changes.md index d995d27..4d401f2 100644 --- a/Changes.md +++ b/Changes.md @@ -1,6 +1,8 @@ HEAD ---- +- Breaking: if your jobs use Kiba's "legacy runner" via `config :kiba, runner: Kiba::Runner`, be aware that this legacy runner has been removed in [#96](https://github.com/thbar/kiba/pull/96). The upgrade path is to remove this config line and let Kiba use the more modern `Kiba::StreamingRunner`, which is the default anyway since Kiba v3.0.0 (see [#83](https://github.com/thbar/kiba/pull/83) for context) and is normally fully backward-compatible. + 3.6.0 ----- diff --git a/lib/kiba.rb b/lib/kiba.rb index 6671ecf..a9a619d 100644 --- a/lib/kiba.rb +++ b/lib/kiba.rb @@ -4,7 +4,6 @@ require 'kiba/control' require 'kiba/context' require 'kiba/parser' -require 'kiba/runner' require 'kiba/streaming_runner' require 'kiba/dsl_extensions/config' diff --git a/lib/kiba/runner.rb b/lib/kiba/runner.rb deleted file mode 100644 index 4c9000a..0000000 --- a/lib/kiba/runner.rb +++ /dev/null @@ -1,74 +0,0 @@ -module Kiba - module Runner - extend self - - # allow to handle a block form just like a regular transform - class AliasingProc < Proc - alias_method :process, :call - end - - def run(control) - run_pre_processes(control) - process_rows( - to_instances(control.sources), - to_instances(control.transforms, true), - destinations = to_instances(control.destinations) - ) - close_destinations(destinations) - run_post_processes(control) - end - - def run_pre_processes(control) - to_instances(control.pre_processes, true, false).each(&:call) - end - - def run_post_processes(control) - to_instances(control.post_processes, true, false).each(&:call) - end - - def close_destinations(destinations) - destinations - .find_all { |d| d.respond_to?(:close) } - .each(&:close) - end - - def process_rows(sources, transforms, destinations) - sources.each do |source| - source.each do |row| - transforms.each do |transform| - row = transform.process(row) - break unless row - end - next unless row - destinations.each do |destination| - destination.write(row) - end - end - end - end - - # not using keyword args because JRuby defaults to 1.9 syntax currently - def to_instances(definitions, allow_block = false, allow_class = true) - definitions.map do |definition| - to_instance( - *definition.values_at(:klass, :args, :block), - allow_block, allow_class - ) - end - end - - def to_instance(klass, args, block, allow_block, allow_class) - if klass && block - fail 'Class and block form cannot be used together at the moment' - elsif klass - fail 'Class form is not allowed here' unless allow_class - klass.new(*args) - elsif block - fail 'Block form is not allowed here' unless allow_block - AliasingProc.new(&block) - else - fail 'Nil parameters not allowed here' - end - end - end -end diff --git a/lib/kiba/streaming_runner.rb b/lib/kiba/streaming_runner.rb index 943fbc9..b574804 100644 --- a/lib/kiba/streaming_runner.rb +++ b/lib/kiba/streaming_runner.rb @@ -1,8 +1,37 @@ module Kiba module StreamingRunner - include Runner extend self - + + # allow to handle a block form just like a regular transform + class AliasingProc < Proc + alias_method :process, :call + end + + def run(control) + run_pre_processes(control) + process_rows( + to_instances(control.sources), + to_instances(control.transforms, true), + destinations = to_instances(control.destinations) + ) + close_destinations(destinations) + run_post_processes(control) + end + + def run_pre_processes(control) + to_instances(control.pre_processes, true, false).each(&:call) + end + + def run_post_processes(control) + to_instances(control.post_processes, true, false).each(&:call) + end + + def close_destinations(destinations) + destinations + .find_all { |d| d.respond_to?(:close) } + .each(&:close) + end + def transform_stream(stream, t) Enumerator.new do |y| stream.each do |input_row| @@ -34,5 +63,29 @@ def process_rows(sources, transforms, destinations) destinations.each { |d| d.write(r) } end end + + # not using keyword args because JRuby defaults to 1.9 syntax currently + def to_instances(definitions, allow_block = false, allow_class = true) + definitions.map do |definition| + to_instance( + *definition.values_at(:klass, :args, :block), + allow_block, allow_class + ) + end + end + + def to_instance(klass, args, block, allow_block, allow_class) + if klass && block + fail 'Class and block form cannot be used together at the moment' + elsif klass + fail 'Class form is not allowed here' unless allow_class + klass.new(*args) + elsif block + fail 'Block form is not allowed here' unless allow_block + AliasingProc.new(&block) + else + fail 'Nil parameters not allowed here' + end + end end end \ No newline at end of file diff --git a/test/test_runner.rb b/test/test_runner.rb deleted file mode 100644 index 33fb03e..0000000 --- a/test/test_runner.rb +++ /dev/null @@ -1,11 +0,0 @@ -require_relative 'helper' -require_relative 'shared_runner_tests' - -class TestRunner < Kiba::Test - def kiba_run(job) - job.config[:kiba] = {runner: Kiba::Runner} - Kiba.run(job) - end - - include SharedRunnerTests -end