Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove legacy Kiba::Runner #96

Merged
merged 5 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
HEAD
----

- Breaking: if your jobs use Kiba's "legacy runner" via `config :kiba, runner: Kiba::Runner`, be aware that this legacy runner has been removed in [#96](https://github.com/thbar/kiba/pull/96). The upgrade path is to remove this config line and let Kiba use the more modern `Kiba::StreamingRunner`, which is the default anyway since Kiba v3.0.0 (see [#83](https://github.com/thbar/kiba/pull/83) for context) and is normally fully backward-compatible.

3.6.0
-----

Expand Down
1 change: 0 additions & 1 deletion lib/kiba.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require 'kiba/control'
require 'kiba/context'
require 'kiba/parser'
require 'kiba/runner'
require 'kiba/streaming_runner'
require 'kiba/dsl_extensions/config'

Expand Down
74 changes: 0 additions & 74 deletions lib/kiba/runner.rb

This file was deleted.

57 changes: 55 additions & 2 deletions lib/kiba/streaming_runner.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,37 @@
module Kiba
module StreamingRunner
include Runner
extend self


# allow to handle a block form just like a regular transform
class AliasingProc < Proc
alias_method :process, :call
end

def run(control)
run_pre_processes(control)
process_rows(
to_instances(control.sources),
to_instances(control.transforms, true),
destinations = to_instances(control.destinations)
)
close_destinations(destinations)
run_post_processes(control)
end

def run_pre_processes(control)
to_instances(control.pre_processes, true, false).each(&:call)
end

def run_post_processes(control)
to_instances(control.post_processes, true, false).each(&:call)
end

def close_destinations(destinations)
destinations
.find_all { |d| d.respond_to?(:close) }
.each(&:close)
end

def transform_stream(stream, t)
Enumerator.new do |y|
stream.each do |input_row|
Expand Down Expand Up @@ -34,5 +63,29 @@ def process_rows(sources, transforms, destinations)
destinations.each { |d| d.write(r) }
end
end

# not using keyword args because JRuby defaults to 1.9 syntax currently
def to_instances(definitions, allow_block = false, allow_class = true)
definitions.map do |definition|
to_instance(
*definition.values_at(:klass, :args, :block),
allow_block, allow_class
)
end
end

def to_instance(klass, args, block, allow_block, allow_class)
if klass && block
fail 'Class and block form cannot be used together at the moment'
elsif klass
fail 'Class form is not allowed here' unless allow_class
klass.new(*args)
elsif block
fail 'Block form is not allowed here' unless allow_block
AliasingProc.new(&block)
else
fail 'Nil parameters not allowed here'
end
end
end
end
11 changes: 0 additions & 11 deletions test/test_runner.rb

This file was deleted.