Skip to content

Commit

Permalink
💄
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Feb 13, 2017
1 parent f75a943 commit fed9d81
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 35 deletions.
10 changes: 4 additions & 6 deletions lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def run(args)

@launcher = Shoryuken::Launcher.new

if callback = Shoryuken.start_callback
if (callback = Shoryuken.start_callback)
logger.info { 'Calling Shoryuken.on_start block' }
callback.call
end
Expand Down Expand Up @@ -103,11 +103,9 @@ def daemonize(options)
end

def write_pid(options)
if (path = options[:pidfile])
File.open(path, 'w') do |f|
f.puts Process.pid
end
end
return unless (path = options[:pidfile])

File.open(path, 'w') { |f| f.puts(Process.pid) }
end

def parse_cli_args(argv)
Expand Down
20 changes: 5 additions & 15 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,17 @@ class Launcher
include Util

def initialize
count = Shoryuken.options.fetch(:concurrency, 25)

raise(ArgumentError, "Concurrency value #{count} is invalid, it needs to be a positive number") unless count > 0

@managers = Array.new(count) do
Shoryuken::Manager.new(1,
Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
end
@manager = Shoryuken::Manager.new(Shoryuken::Fetcher.new,
Shoryuken.options[:polling_strategy].new(Shoryuken.queues))
end

def stop(options = {})
@managers.map do |manager|
Thread.new { manager.stop(shutdown: !!options[:shutdown], timeout: Shoryuken.options[:timeout]) }
end.each(&:join)
@manager.stop(shutdown: !options[:shutdown].nil?,
timeout: Shoryuken.options[:timeout])
end

def run
@managers.map do |manager|
Thread.new { manager.start }
end.each(&:join)
@manager.start
end
end
end
26 changes: 12 additions & 14 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ class Manager

BATCH_LIMIT = 10

def initialize(count, fetcher, polling_strategy)
@count = count
def initialize(fetcher, polling_strategy)
@count = Shoryuken.options.fetch(:concurrency, 25)

raise(ArgumentError, "Concurrency value #{@count} is invalid, it needs to be a positive number") unless @count > 0

@queues = Shoryuken.queues.dup.uniq

Expand All @@ -15,16 +17,15 @@ def initialize(count, fetcher, polling_strategy)
@fetcher = fetcher
@polling_strategy = polling_strategy

# @heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch }
@heartbeat = Concurrent::TimerTask.new(run_now: true, execution_interval: 0.1, timeout_interval: 60) { dispatch }

@pool = Concurrent::FixedThreadPool.new(@count, max_queue: @count)
end

def start
logger.info { 'Starting' }

# @heartbeat.execute
dispatch
@heartbeat.execute
end

def stop(options = {})
Expand All @@ -37,9 +38,9 @@ def stop(options = {})

fire_event(:shutdown, true)

logger.info { "Shutting down workers" }
logger.info { 'Shutting down workers' }

# @heartbeat.kill
@heartbeat.kill

if options[:shutdown]
hard_shutdown_in(options[:timeout])
Expand All @@ -50,8 +51,6 @@ def stop(options = {})

def processor_done(queue)
logger.debug { "Process done for '#{queue}'" }

dispatch
end

private
Expand All @@ -66,7 +65,7 @@ def dispatch
return logger.debug { 'Pausing fetcher, because all processors are busy' }
end

unless queue = @polling_strategy.next_queue
unless (queue = @polling_strategy.next_queue)
return logger.debug { 'Pausing fetcher, because all queues are paused' }
end

Expand Down Expand Up @@ -117,11 +116,10 @@ def hard_shutdown_in(delay)

@pool.shutdown

unless @pool.wait_for_termination(delay)
logger.info { "Hard shutting down #{busy} busy workers" }
return if @pool.wait_for_termination(delay)

@pool.kill
end
logger.info { "Hard shutting down #{busy} busy workers" }
@pool.kill
end

def patch_batch!(sqs_msgs)
Expand Down

0 comments on commit fed9d81

Please sign in to comment.