
Adding Slack Socket level ping/pong #226

Merged · 20 commits · Oct 17, 2018

Conversation

RodneyU215
Collaborator

Summary

This PR attempts to add websocket-level ping monitoring (#223):

  • I've added a check to validate that the socket is connected before sending any outgoing messages.
  • I've implemented a heartbeat following Slack's RTM guidelines. See "Ping and Pong".
  • The hard-coded argument in heartbeat 30 should likely be configurable (a rough sketch of these pieces follows below).

It's not quite "shippable" yet, but I'd love some feedback on this.
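A minimal sketch of those pieces (hypothetical names such as next_ping_id; not the actual diff):

def send_data(message)
  # refuse to write to a socket we already know is down
  raise SocketNotConnectedError unless connected?
  driver.text(message)
end

def heartbeat(interval = 30) # the hard-coded 30 that should become configurable
  loop do
    send_data({ type: 'ping', id: next_ping_id }.to_json)
    sleep interval
  end
end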

Requirements

@dblock
Collaborator

dblock commented Sep 25, 2018

Looks like a solid start. Thanks!

  • We need this for all RTM support (EventMachine, Celluloid and Async); we actually mostly care about Async since it's supported and recommended as the default today.
  • Refactor parts that can be shared, such as storing ping/pong state for the above; there's a lot of looping and some OO could help :)
  • There's a thread, and a hash being updated from multiple threads if I'm not mistaken; is that thread safe?
  • Can we avoid a thread altogether and use asynchronous constructs of EM/Celluloid/Async?
  • Is the ping thread dying on any kind of send error?

@RodneyU215
Collaborator Author

RodneyU215 commented Oct 3, 2018

Thanks for the feedback @dblock!

I've been thinking of ways to refactor this and chatting with a few other engineers at Slack, and I wanted to get your perspective on a new approach.

Taking your feedback into consideration, what do you think about implementing an async-specific ping method in Slack::RealTime::Concurrency::Async::Socket that recursively sends out ping messages and checks to ensure the connection is still “alive”?

          def ping(delay, task, ping_id=0)
            unless @alive
              disconnect! if connected?
              return close
            end

            ping_data = { type: 'ping', id: ping_id }
            send_data(ping_data.to_json)
            @alive = false

            task.sleep delay

            ping(delay, task, ping_id + 1)
          end

This would require us to update the connect! method so that by default each pong event marks the socket as alive.

          def connect!
            super do |driver|
              driver.on :message do |event|
                event_data = JSON.parse(event.data)
                @alive = true if event_data['type'] == 'pong'
              end
              yield driver
            end
            run_loop
          end

Lastly, we’d ensure that this runs in a different block from the client’s run_loop, following the example here.

          def start_async(client)
            Thread.new do
              ::Async::Reactor.run do
                ::Async::Reactor.run do
                  client.run_loop
                end
                ::Async::Reactor.run do |task|
                  @alive = true
                  ping(task)
                end
              end
            end
          end

What do you think?

@dblock
Collaborator

dblock commented Oct 3, 2018

Looks reasonable and I would merge support for just one of the async implementations for this.

I don't see it being difficult to generalize this to all three (EM, Celluloid, Async), so I encourage you to implement this all the way for one, then expand from there.

sleep delay
end

ping(delay, ping_id + 1, &block)
Collaborator

This is going to blow up the stack eventually, won't it?

@dblock
Collaborator

dblock commented Oct 5, 2018

@RodneyU215 This is a lot better.

  • Make the ping loop iterative, not recursive (a rough sketch follows below).
  • The ping loop isn't resilient; I'm pretty sure that sending data over a dead socket will abort the ping loop while the connection may still be very much alive and just keep going.
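A rough sketch of the iterative shape, reusing the names from the recursive proposal above (illustration only, not the final diff):

def run_ping(delay, task)
  ping_id = 0
  while @alive
    send_data({ type: 'ping', id: ping_id }.to_json)
    @alive = false # reset; the :message handler flips it back when a pong arrives
    ping_id += 1
    task.sleep delay
  end
  # no pong arrived within a full delay: treat the connection as dead
  disconnect! if connected?
  close
end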

client.run_loop
end
::Async::Reactor.run do |task|
ping do |delay|
Collaborator

Maybe model this after run_loop: a run_ping that pings every once in a while and recovers from errors.

Collaborator Author

Can you expand on the types of errors you'd like to recover from, as well as what the expected recovery steps would be?

Collaborator

It can really be anything, but from looking at the code, if send_data fails in any way the ping worker dies.
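A hedged sketch of what recovering could look like, so a single failed send doesn't kill the worker (method and helper names here are assumptions):

def run_ping(delay)
  ping_id = 0
  loop do
    begin
      send_data({ type: 'ping', id: ping_id += 1 }.to_json)
    rescue StandardError => e
      # log and keep looping; a later iteration (or a reconnect) can recover
      logger.warn("ping failed: #{e.message}") if respond_to?(:logger)
    end
    yield delay # the caller decides how to wait (task.sleep, an EM timer, etc.)
  end
end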

@dblock
Collaborator

dblock commented Oct 6, 2018

I think you're getting close @RodneyU215, nice work.

  • Error handling is an important one. It also looks pretty trivial to make it work for all async drivers from here.
  • Do we need a way to disable this in configuration? Configure frequency of ping?
  • README should be talking about pings

@RodneyU215
Collaborator Author

RodneyU215 commented Oct 7, 2018

Thanks for all of the feedback you've given!

Configure frequency of ping?

Upon closer inspection of the configuration, it appears there's already an option, websocket_ping, which defaults to 30. I've switched the implementation to use this.

Do we need a way to disable this in configuration?

I've added a check to see if websocket_ping was set. If users set this to 0 or nil then the code will not run.

README should be talking about pings

The websocket_ping option is already documented in #configuring-slackrealtimeclient as "The number of seconds that indicates how often the WebSocket should send ping frames, default is 30.". To me this implies it was supposed to behave this way the whole time. Is there any other detail you wanted added? It seems like a solid description.
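For example, with the documented configuration block (values here are just illustrative):

Slack::RealTime::Client.configure do |config|
  config.websocket_ping = 42    # send a ping roughly every 42 seconds
  # config.websocket_ping = nil # nil (or 0) turns the ping loop off entirely
end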

@RodneyU215
Collaborator Author

I'd like to talk a bit more about error handling. If the driver is unable to communicate with the server, I can only really see reestablishing the connection or exiting the application as the best next steps.

In the current implementation, on subsequent unanswered pings we disconnect and close the socket connection, so that when send_data is called immediately afterward it throws a Slack::RealTime::Socket::SocketNotConnectedError.

To keep the application running and to reestablish the connection, what do you think about catching that error, running connect again and retrying the ping task?

::Async::Reactor.run do |task|
  begin
    client.run_ping do |delay|
      task.sleep delay
    end
  rescue Slack::RealTime::Socket::SocketNotConnectedError
    connect
    retry
  end
end

@dblock
Collaborator

dblock commented Oct 8, 2018

@RodneyU215 Sounds like a good approach. Since this will be the same for every driver, see how you can sink this implementation into a shared class.

@dblock
Collaborator

dblock commented Oct 8, 2018

The websocket ping description is good enough. I guess before we were documenting it, but it was actually never run :) I would change "should send ping frames" somewhere to describe what actually happens now with this code.

@alive = true

driver.on :message do |event|
event_data = JSON.parse(event.data)
Collaborator

I am not sure we do, but do we need to handle garbage from the server here?

Collaborator Author

It's a good idea, though I'd recommend handling that in a separate PR: one where Slack::Messages::Message is used and updated so that it takes care of any parsing errors that may occur.
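For the record, handling it inline here might look roughly like this (a hypothetical sketch; as noted above, the preference is to defer it to a separate PR):

driver.on :message do |event|
  begin
    event_data = JSON.parse(event.data)
    @alive = true if event_data['type'] == 'pong'
  rescue JSON::ParserError
    # ignore unparseable frames instead of letting the callback raise
  end
end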

@RodneyU215
Collaborator Author

Okay, so I've rebuilt things a bit to give the client the ability to automatically reconnect on ping failures. It felt a bit like scope creep, but overall I think it's a better implementation. I've tested things manually and it worked; I'll add unit tests tomorrow. Let me know what you think.

@dblock
Collaborator

dblock commented Oct 9, 2018

I think that's what we want: taking away from the user any worry about the connection going away. Try making it work for EventMachine and Celluloid too; the retry pattern probably belongs inside a generic class.

ping_data = { type: 'ping', time: Time.now, id: "p#{ping_int += 1}" }
ping(ping_data)
end
end
Contributor

Where do you call Thread#join? If two threads try to write to the socket at the same time, what happens when the data becomes interleaved?

Collaborator

The current PR version is no longer using a thread @ioquatix.

Contributor

Sorry my bad.

Collaborator

No worries. Please do make suggestions about this implementation! There's a lot of looping around in that calling code :)

@ioquatix
Contributor

Using a thread to implement this is very risky.

@@ -15,7 +15,22 @@ class Socket < Slack::RealTime::Socket
def start_async(client)
Thread.new do
::Async::Reactor.run do
Contributor

You can simplify this code with something like:

::Async::Reactor.run do |task|
  task.async do
    # nested task
  end
end

Collaborator Author

Great point! I'll fix this.
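Applied to the start_async above, that might look roughly like this (a sketch, not the final diff):

def start_async(client)
  Thread.new do
    ::Async::Reactor.run do |task|
      task.async do
        client.run_loop
      end

      task.async do |subtask|
        client.run_ping do |delay|
          subtask.sleep delay
        end
      end
    end
  end
end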

@ioquatix
Contributor

Okay, I looked at what you are doing. I think there is a fundamental issue with how this code is structured, but that being said, here is a rough outline of what I'd do:

class Client
  attr :last_message_at

  def time_since_last_message
    Async::Clock.now - @last_message_at
  end

  def run_loop
    on(:message) do
      @last_message_at = Async::Clock.now
    end
  end
end

class Socket
  def start_async
    ::Async::Reactor.run do |task|
      task.async do
        while @running # or true, some way to control this loop for graceful shutdown
          # Probably make this one method?
          client.build_socket
          begin
            client.run_loop
          rescue EOFError, Errno::EPIPE # , etc.
            # Log and loop around again
          end
        end
      end

      task.async do |subtask|
        while true
          subtask.sleep delay

          client.ping # Will eventually trigger some kind of "pong".

          if client.time_since_last_message > (delay * 2)
            # Connection failed
            client.close
          end
        end
      end
    end
  end
end

@ioquatix
Contributor

As a side note, you don't need to send the ping if some other message was received recently; you'd just dynamically adjust the delay, and then when you wake up, don't send the ping if something else was received in the meantime. That avoids sending unnecessary junk.
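For example, assuming the time_since_last_message helper from the outline above (a rough sketch):

task.async do |subtask|
  loop do
    remaining = delay - client.time_since_last_message
    if remaining > 0
      # something arrived recently; wait out the remainder, no ping needed
      subtask.sleep remaining
    else
      client.ping # the connection has been quiet for a full delay
      subtask.sleep delay
    end
  end
end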

@dblock
Collaborator

dblock commented Oct 10, 2018

@ioquatix Care to explain the fundamental issue? For my own education?

I like avoiding unnecessary pings and just keeping the timestamp of the last message.

@ioquatix
Contributor

Okay, so the fundamental issue is the callback structure of the code; it basically makes it very hard to reason about. The main issue I have when reading the code is understanding the call flow and the sequence of events that lead up to a particular line being executed. The solution that async brings to the table is ensuring your call graph stays essentially linear even with blocking operations. You still need to write sane, linear, readable code, but it's much harder to make mistakes, and those mistakes are generally more obvious.

The next step is to consider invariants and how they are maintained. For example, are your connections stateful or stateless, and where do you maintain that state? How is it affected by reconnections: is it modelled explicitly or implicitly (e.g. some kind of on(:connect))?

You propose reconnecting to the same URL, but is this valid? Is it possible after 24 hours, that URL changes? Where in your API do you need to implement the "timeout/reconnect" loop? That loop probably needs to be far up the call tree.

I see you try to restart the run loop in the exception handler, but then after that call retry. That code already seems wrong, do you want to retry the event loop after it's been gracefully shut down?

Ideally, you have some way to signal a disconnect from any part of the code and take appropriate action. If you are not careful, tracking state (which naturally includes which event handlers are registered) becomes difficult and buggy. Think about the following flow:

  • Create client
  • Register some events
  • Loop connect/reconnect

After the first reconnect, the initial events would be lost because the entire web socket is thrown away, no? If things are stateful, it's even more tricky to handle correctly.

@RodneyU215
Collaborator Author

RodneyU215 commented Oct 10, 2018

FYI, I pushed that last commit before I had a chance to see all of this new feedback.

@ioquatix Thanks for the added insight! Dynamically adjusting the delay (before sending out a ping), based on the last message, is a great suggestion. I also agree that using task.async is a lot cleaner. I'll definitely go back and update these.

In the code you suggested I noticed that you've chosen not to check for a pong event. Was this to minimize the amount of state needed to update time_since_last_message in your example? It feels a little odd to skip over any sort of validation that a pong message was sent back given that it validates the ping was actually received.

Other than that, it sounds like the only thing left is the logic to rebuild the web socket connection.

I see you try to restart the run loop in the exception handler, but then after that call retry. That code already seems wrong, do you want to retry the event loop after it's been gracefully shut down?

In the latest version I've pushed this logic into a method called restart_async. The code, however, always rebuilds the socket connection before running the event loop again in a new async task. Can you elaborate on why this seems wrong? I don't mean to be dense; I'm having a hard time seeing the problems this would cause.

You propose reconnecting to the same URL, but is this valid? Is it possible after 24 hours, that URL changes? Where in your API do you need to implement the "timeout/reconnect" loop? That loop probably needs to be far up the call tree.

WebSocket Message Server URLs are only valid for 30 seconds, but because we call build_socket when attempting to reconnect, we retrieve and set a new URL.

After the first reconnect, the initial events would be lost because the entire web socket is thrown away, no? If things are stateful, it's even more tricky to handle correctly.

If I'm understanding this correctly the events registered by users are already stored in @callbacks in the client. We'd still maintain that state and push them back to the socket class during the rebuild.

end

def current_time
Time.now
Contributor

Time.now is not monotonic.

Collaborator

https://blog.dnsimple.com/2018/03/elapsed-time-with-ruby-the-right-way/

Running to work to fix all the instances of Time.now that I have all over the place :)

Contributor

ioquatix commented Oct 12, 2018

Haha, yeah, it's a common problem :p - common enough that I decided to add it to async. I might end up using hitime to do it which is supposedly more accurate, but for now the Process.clock method is okay.

Collaborator Author

Solid feedback! Thanks for the link dblock! I'll change that.
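The change being discussed would look roughly like this (a sketch using Ruby's built-in monotonic clock):

def current_time
  # unaffected by NTP/DST wall-clock adjustments, unlike Time.now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started = current_time
# ... wait for a pong / run the ping loop ...
elapsed = current_time - started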

@ioquatix
Contributor

In the code you suggested I noticed that you've chosen not to check for a pong event. Was this to minimize the amount of state needed to update time_since_last_message in your example? It feels a little odd to skip over any sort of validation that a pong message was sent back given that it validates the ping was actually received.

It's really not necessary to confirm it's received. If no message is received within a certain timeframe, then the connection is considered dead. You don't care what kind of message it is, it doesn't change the outcome or behaviour.

In the latest version I've pushed this logic into a method called restart_async. The code, however, always rebuilds the socket connection before running the event loop again in a new async task. Can you elaborate on why this seems wrong? I don't mean to be dense; I'm having a hard time seeing the problems this would cause.

It looks like that code path has been removed. I was referring to 88ea7a5#diff-7de932a86b84fcf2a3e218dd5515f726L31

@ioquatix
Contributor

Ultimately, I think you just want to ensure all state is cleaned up correctly and that an entirely new instance is re-created. Because you are supporting different concurrency models, this is tricky. You are using a Thread as your single high-level abstraction, but keep in mind you still want to ensure that any run-loop-specific concerns are taken care of correctly and that you aren't leaking threads, file descriptors, etc.

@RodneyU215
Collaborator Author

@dblock I've addressed the outstanding feedback and added a few unit tests. The only thing I see to expand on is updating the Celluloid and EventMachine sockets so that their implementations of start_async kick off the run_ping loop and implement the restart_async method.

While I'd be happy to help with that, I believe it may be best left for a future PR. This one has been open for a bit, and it should help out a lot of people who may currently be experiencing disconnects with the client's default Async socket.

Would this work for you?

@ioquatix
Contributor

You might like to add this to your specs: https://github.com/socketry/async-rspec/blob/master/lib/async/rspec/leaks.rb

It detects leaks. It's not async specific, it works with any spec that opens/closes IOs.
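Usage might look roughly like this (a hypothetical spec, assuming the shared context name from that file):

require 'async/rspec'

RSpec.describe Slack::RealTime::Concurrency::Async::Socket do
  include_context Async::RSpec::Leaks

  it 'closes every IO it opens' do
    # the shared context fails the example if any IOs opened
    # during the example are still open when it finishes
  end
end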

Collaborator

dblock left a comment

Code looks great to me. I am ready to merge this with either:

  1. Document in README/CHANGELOG that ping was added for Async only, not for EM/Celluloid, open an issue to implement those
  2. Implement EM/Celluloid

I would vote for 2, cause I have faith in you @RodneyU215, but I'll take 1) too ;)

CHANGELOG.md Outdated

* Your contribution here.

### 0.13.2 (2018/10/16)
Collaborator

This hasn't shipped, so please put this back, we do this during release.

Collaborator Author

Whoops! I'll fix it now.

@dblock
Collaborator

dblock commented Oct 16, 2018

You might like to add this to your specs: https://github.com/socketry/async-rspec/blob/master/lib/async/rspec/leaks.rb

It detects leaks. It's not async specific, it works with any spec that opens/closes IOs.

Let's def add this in another PR! #230

@dblock
Collaborator

dblock commented Oct 16, 2018

@ioquatix Appreciate another code review of the state of things here when you get a chance. Thank you!

@RodneyU215
Collaborator Author

@dblock I'm not going anywhere! 😊 I'd like to help get EM/Celluloid implemented too. We'll have more time to jump on it after this tour around Europe.

@dblock dblock merged commit fc9e4bb into slack-ruby:master Oct 17, 2018
@dblock
Collaborator

dblock commented Oct 17, 2018

I've merged this, thanks for your time @RodneyU215. Let's try to get EM/Celluloid in soon; I'll spend some time upgrading one of my bots to this and see if it works in production, too.

@dblock
Collaborator

dblock commented Oct 17, 2018

@RodneyU215 Take a look at the broken build in https://travis-ci.org/slack-ruby/slack-ruby-client/jobs/442629093?

We run integration tests with a SLACK_API_TOKEN on master only, so this wasn't surfaced in this PR.

@RodneyU215
Collaborator Author

I'm looking into this now!

@ioquatix
Contributor

It looks like at least one aspect is that you are missing require 'async/clock'.

@dblock
Collaborator

dblock commented Feb 25, 2019

I've released:

  • slack-ruby-client 0.14.0
  • slack-ruby-bot 0.12.0
  • slack-ruby-bot-server 0.9.0

Let's see how this puppy works and be attentive to fixing new bugs. Appreciate everyone's hard work!

