diff --git a/lib/shoryuken/cli.rb b/lib/shoryuken/cli.rb index 7a53d1a1..fb48f698 100644 --- a/lib/shoryuken/cli.rb +++ b/lib/shoryuken/cli.rb @@ -43,7 +43,7 @@ def run(args) @launcher = Shoryuken::Launcher.new - if callback = Shoryuken.start_callback + if (callback = Shoryuken.start_callback) logger.info { 'Calling Shoryuken.on_start block' } callback.call end @@ -103,11 +103,9 @@ def daemonize(options) end def write_pid(options) - if (path = options[:pidfile]) - File.open(path, 'w') do |f| - f.puts Process.pid - end - end + return unless (path = options[:pidfile]) + + File.open(path, 'w') { |f| f.puts(Process.pid) } end def parse_cli_args(argv) diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 6027f1c0..243e15fc 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -3,27 +3,17 @@ class Launcher include Util def initialize - count = Shoryuken.options.fetch(:concurrency, 25) - - raise(ArgumentError, "Concurrency value #{count} is invalid, it needs to be a positive number") unless count > 0 - - @managers = Array.new(count) do - Shoryuken::Manager.new(1, - Shoryuken::Fetcher.new, - Shoryuken.options[:polling_strategy].new(Shoryuken.queues)) - end + @manager = Shoryuken::Manager.new(Shoryuken::Fetcher.new, + Shoryuken.options[:polling_strategy].new(Shoryuken.queues)) end def stop(options = {}) - @managers.map do |manager| - Thread.new { manager.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout]) } - end.each(&:join) + @manager.stop(shutdown: !options[:shutdown].nil?, + timeout: Shoryuken.options[:timeout]) end def run - @managers.map do |manager| - Thread.new { manager.start } - end.each(&:join) + @manager.start end end end diff --git a/lib/shoryuken/manager.rb b/lib/shoryuken/manager.rb index e6dfb488..e5544a30 100644 --- a/lib/shoryuken/manager.rb +++ b/lib/shoryuken/manager.rb @@ -4,8 +4,10 @@ class Manager BATCH_LIMIT = 10 - def initialize(count, fetcher, polling_strategy) - @count = count + def initialize(fetcher, polling_strategy) + @count = Shoryuken.options.fetch(:concurrency, 25) + + raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0 @queues = Shoryuken.queues.dup.uniq @@ -15,7 +17,7 @@ def initialize(count, fetcher, polling_strategy) @fetcher = fetcher @polling_strategy = polling_strategy - # @heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch } + @heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch } @pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count) end @@ -23,8 +25,7 @@ def initialize(count, fetcher, polling_strategy) def start logger.info { 'Starting' } - # @heartbeat.execute - dispatch + @heartbeat.execute end def stop(options = {}) @@ -37,9 +38,9 @@ def stop(options = {}) fire_event(:shutdown, true) - logger.info { "Shutting down workers" } + logger.info { 'Shutting down workers' } - # @heartbeat.kill + @heartbeat.kill if options[:shutdown] hard_shutdown_in(options[:timeout]) @@ -50,8 +51,6 @@ def stop(options = {}) def processor_done(queue) logger.debug { "Process done for '#{queue}'" } - - dispatch end private @@ -66,7 +65,7 @@ def dispatch return logger.debug { 'Pausing fetcher, because all processors are busy' } end - unless queue = @polling_strategy.next_queue + unless (queue = @polling_strategy.next_queue) return logger.debug { 'Pausing fetcher, because all queues are paused' } end @@ -117,11 +116,10 @@ def hard_shutdown_in(delay) @pool.shutdown - unless @pool.wait_for_termination(delay) - logger.info { "Hard shutting down #{busy} busy workers" } + return if @pool.wait_for_termination(delay) - @pool.kill - end + logger.info { "Hard shutting down #{busy} busy workers" } + @pool.kill end def patch_batch!(sqs_msgs)