Skip to content

Commit

Permalink
Merge pull request #262 from dblock/async-disconnect
Browse files Browse the repository at this point in the history
Closes #257
  • Loading branch information
dblock authored Apr 8, 2019
2 parents a9dabc5 + cf6d601 commit 8e2df24
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 71 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ Gemfile.lock
.DS_Store
.bundle
.idea
.rspec_status
12 changes: 10 additions & 2 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
# This configuration was generated by
# `rubocop --auto-gen-config`
# on 2019-01-19 17:37:31 -0500 using RuboCop version 0.61.1.
# on 2019-04-07 11:03:29 -0400 using RuboCop version 0.61.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.

# Offense count: 2
# Configuration parameters: AllowSafeAssignment.
Lint/AssignmentInCondition:
Exclude:
- 'lib/slack/real_time/concurrency/async.rb'
- 'lib/slack/real_time/socket.rb'

# Offense count: 4
Lint/HandleExceptions:
Exclude:
Expand Down Expand Up @@ -45,10 +52,11 @@ Style/AccessModifierDeclarations:
Style/GlobalVars:
Enabled: false

# Offense count: 2
# Offense count: 3
# Configuration parameters: MinBodyLength.
Style/GuardClause:
Exclude:
- 'lib/slack/real_time/socket.rb'
- 'lib/slack/real_time/stores/store.rb'
- 'lib/slack/web/faraday/response/raise_error.rb'

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
### 0.14.2 (Next)

* [#256](https://github.com/slack-ruby/slack-ruby-client/pull/256): Added support for specifying signing secrets on a per-request basis via optional parameters to the `Slack::Events::Request` constructor - [@gabrielmdeal](https://github.com/gabrielmdeal).
* [#257](https://github.com/slack-ruby/slack-ruby-client/pull/257), [#262](https://github.com/slack-ruby/slack-ruby-client/pull/262): Fixed occasional failures to reconnect - [@ioquatix](https://github.com/ioquatix), [@dblock](https://github.com/dblock).
* Your contribution here.

### 0.14.1 (2019/2/26)
Expand Down
2 changes: 1 addition & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ end
require 'rubocop/rake_task'
RuboCop::RakeTask.new

task default: %i[rubocop spec]
task default: %i[spec rubocop]

load 'tasks/git.rake'
load 'tasks/web.rake'
Expand Down
29 changes: 23 additions & 6 deletions lib/slack/real_time/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ def config

def run_loop
@socket.connect! do |driver|
@callback.call(driver) if @callback

driver.on :open do |event|
logger.debug("#{self.class}##{__method__}") { event.class.name }
open(event)
Expand All @@ -100,16 +98,35 @@ def run_loop
close(event)
callback(event, :closed)
end

# This must be called last to ensure any events are registered before invoking user code.
@callback.call(driver) if @callback
end
end

def run_ping!
# Ensure the server is running, and ping the remote server if no other messages were sent.
def keep_alive?
# We can't ping the remote server if we aren't connected.
return false if @socket.nil? || !@socket.connected?

time_since_last_message = @socket.time_since_last_message
return if time_since_last_message < websocket_ping
raise Slack::RealTime::Client::ClientNotStartedError if !@socket.connected? || time_since_last_message > (websocket_ping * 2)

# If the server responded within the specified time, we are okay:
return true if time_since_last_message < websocket_ping

# If the server has not responded for a while:
return false if time_since_last_message > (websocket_ping * 2)

# Kick off the next ping message:
ping
rescue Slack::RealTime::Client::ClientNotStartedError

true
end

# Check if the remote server is responsive, and if not, restart the connection.
def run_ping!
return if keep_alive?

restart_async
end

Expand Down
78 changes: 55 additions & 23 deletions lib/slack/real_time/concurrency/async.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
require 'async/websocket'
require 'async/notification'
require 'async/clock'

module Slack
module RealTime
module Concurrency
module Async
class Reactor < ::Async::Reactor
def_delegators :@timers, :cancel
end

class Client < ::Async::WebSocket::Client
extend ::Forwardable
def_delegators :@driver, :on, :text, :binary, :emit
Expand All @@ -17,35 +14,57 @@ class Client < ::Async::WebSocket::Client
class Socket < Slack::RealTime::Socket
attr_reader :client

def start_sync(client)
start_reactor(client).wait
end

def start_async(client)
@reactor = Reactor.new
Thread.new do
start_reactor(client)
end
end

def start_reactor(client)
Async do |task|
@restart = ::Async::Notification.new

if client.run_ping?
@reactor.every(client.websocket_ping) do
client.run_ping!
@ping_task = task.async do |subtask|
subtask.annotate 'client keep-alive'

# The timer task will naturally exit after the driver is set to nil.
while @restart
subtask.sleep client.websocket_ping
client.run_ping! if @restart
end
end
end
@reactor.run do |task|
task.async do
client.run_loop

while @restart
@client_task.stop if @client_task

@client_task = task.async do |subtask|
begin
subtask.annotate 'client run-loop'
client.run_loop
rescue ::Async::Wrapper::Cancelled => e
# Will get restarted by ping worker.
client.logger.warn(subtask.to_s) { e.message }
end
end

@restart.wait
end

@ping_task.stop if @ping_task
end
end

def restart_async(client, new_url)
def restart_async(_client, new_url)
@url = new_url
@last_message_at = current_time
return unless @reactor

@reactor.async do
client.run_loop
end
end

def disconnect!
super
@reactor.cancel
@restart.signal if @restart
end

def current_time
Expand All @@ -57,14 +76,27 @@ def connect!
run_loop
end

# Kill the restart/ping loop.
def disconnect!
super
ensure
if restart = @restart
@restart = nil
restart.signal
end
end

# Close the socket.
def close
@closing = true
@driver.close if @driver
super
ensure
if @socket
@socket.close
@socket = nil
end
end

def run_loop
@closing = false
while @driver && @driver.next_event
# $stderr.puts event.inspect
end
Expand Down
1 change: 0 additions & 1 deletion lib/slack/real_time/concurrency/celluloid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def disconnect!

def close
@closing = true
driver.close if driver
super
end

Expand Down
5 changes: 0 additions & 5 deletions lib/slack/real_time/concurrency/eventmachine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ def disconnect!
@thread = nil
end

def close
driver.close if driver
super
end

def send_data(message)
logger.debug("#{self.class}##{__method__}") { message }
driver.send(message)
Expand Down
25 changes: 16 additions & 9 deletions lib/slack/real_time/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ def initialize(url, options = {})
def send_data(message)
logger.debug("#{self.class}##{__method__}") { message }
case message
when Numeric then driver.text(message.to_s)
when String then driver.text(message)
when Array then driver.binary(message)
when Numeric then @driver.text(message.to_s)
when String then @driver.text(message)
when Array then @driver.binary(message)
else false
end
end
Expand All @@ -29,21 +29,24 @@ def connect!
return if connected?

connect
logger.debug("#{self.class}##{__method__}") { driver.class }
logger.debug("#{self.class}##{__method__}") { @driver.class }

driver.on :message do
@driver.on :message do
@last_message_at = current_time
end

yield driver if block_given?
yield @driver if block_given?
end

# Gracefully shut down the connection.
def disconnect!
driver.close
@driver.close
ensure
close
end

def connected?
!driver.nil?
!@driver.nil?
end

def start_sync(client)
Expand Down Expand Up @@ -73,7 +76,11 @@ def current_time
end

def close
@driver = nil
# When you call `driver.emit(:close)`, it will typically end up calling `client.close` which will call `@socket.close` and end up back here. In order to break this infinite recursion, we check and set `@driver = nil` before invoking `client.close`.
if driver = @driver
@driver = nil
driver.emit(:close)
end
end

protected
Expand Down
Loading

0 comments on commit 8e2df24

Please sign in to comment.