Skip to content

Commit

Permalink
#68 - rewritten timer logic on the session - we're using 3 timers now…
Browse files Browse the repository at this point in the history
… instead of 1.
  • Loading branch information
majek committed May 31, 2012
1 parent 8f5bc2b commit 9364173
Showing 1 changed file with 111 additions and 37 deletions.
148 changes: 111 additions & 37 deletions src/transport.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# ***** END LICENSE BLOCK *****

stream = require('stream')
events = require('events')
uuid = require('node-uuid')
utils = require('./utils')

Expand Down Expand Up @@ -60,23 +61,97 @@ SockJSConnection.prototype.__defineGetter__ 'readyState', ->

MAP = {}

class Timeout
constructor: (callback, @delay) ->
@missed = 0
@doTimeout = =>
@missed += 1
# 1. reschedule
if @delay
@tref = setTimeout(@doTimeout, @delay)
# 2. call
if callback
callback(@missed)

start: () ->
@missed = 0
if @tref
clearTimeout(@tref)
if @delay
@tref = setTimeout(@doTimeout, @delay)

stop: () ->
if @tref
clearTimeout(@tref)
@tref = null

poke: () ->
@stop()
@start()


class SessionTimer extends events.EventEmitter
constructor: (session, disconnect_delay, send_delay, recv_delay) ->
doPollTimeout = (m) =>
console.log('doPollTimeout',m);
session.doPollTimeout(m)
doSendTimeout = (m) =>
console.log('doSendTimeout',m);
session.doSendTimeout(m)
doRecvTimeout = (m) =>
console.log('doRecvTimeout',m);
session.doRecvTimeout(m)
@no_poll_tref = new Timeout(doPollTimeout, disconnect_delay)
@send_tref = new Timeout(doSendTimeout, send_delay)
@recv_tref = new Timeout(doRecvTimeout, recv_delay)
@no_poll_tref.start()
@recv_tref.start()
console.log('timer: start')

poll_start: () ->
console.log('timer: poll start')
@no_poll_tref.stop()
@send_tref.start()
@recv_tref.poke()

poll_end: () ->
console.log('timer: poll end')
@no_poll_tref.start()
@send_tref.stop()

send: () ->
console.log('timer: send')
@send_tref.poke()

recv: () ->
console.log('timer: recv')
@recv_tref.poke()

close: () ->
console.log('timer: close')
@no_poll_tref.stop()
@send_tref.stop()
@recv_tref.stop()


class Session
constructor: (@session_id, server) ->
@server_heartbeat_interval = server.options.server_heartbeat_interval
@client_heartbeat_reply = server.options.client_heartbeat_reply
@client_heartbeat_count = false
@disconnect_delay = server.options.disconnect_delay
@client_heartbeat_reply = server.options.client_heartbeat_reply
@prefix = server.options.prefix
@send_buffer = []
@is_closing = false
@readyState = Transport.CONNECTING
if @session_id
MAP[@session_id] = @
@timeout_cb = =>
@didTimeout()
@heartbeat_cb = =>
@doHeartbeat()
@to_tref = setTimeout(@timeout_cb, @disconnect_delay)
if server.options.client_heartbeat_reply
# We want the 'h' frame to fire just before the empty data
# "a[]" frame - to save bandwidth. After "h", "a[]" is
# spurious.
recv_delay = server.options.server_heartbeat_interval - 2
@timer = new SessionTimer(@,
server.options.disconnect_delay,
server.options.server_heartbeat_interval,
recv_delay)
@connection = new SockJSConnection(@)
@emit_open = =>
@emit_open = null
Expand All @@ -87,19 +162,16 @@ class Session
recv.doSendFrame(closeFrame(2010, "Another connection still open"))
recv.didClose()
return
if @to_tref
clearTimeout(@to_tref)
@to_tref = null
@connection.emit('poll')
@timer.poll_start()
if @readyState is Transport.CLOSING
recv.doSendFrame(@close_frame)
recv.didClose()
@to_tref = setTimeout(@timeout_cb, @disconnect_delay)
@timer.poll_end()
return

@client_heartbeat_count = 0
@connection.emit('poll')
# Registering. From now on 'unregister' is responsible for
# setting the timer.
# calling poll_end.
@recv = recv
@recv.session = @

Expand Down Expand Up @@ -142,48 +214,50 @@ class Session
@connection.headers = headers

unregister: ->
@recv.session = null
@recv = null
if @to_tref
clearTimeout(@to_tref)
@to_tref = setTimeout(@timeout_cb, @disconnect_delay)
@recv = @recv.session = null
@timer.poll_end()

tryFlush: ->
if @send_buffer.length > 0
[sb, @send_buffer] = [@send_buffer, []]
@recv.doSendBulk(sb)

if @to_tref
clearTimeout(@to_tref)
@to_tref = setTimeout(@heartbeat_cb, @server_heartbeat_interval)
@timer.send()
return

doHeartbeat: ->
@client_heartbeat_count += 1
console.log('heartbeat', @client_heartbeat_count)
if @client_heartbeat_reply and @client_heartbeat_count is 2
doSendTimeout: ->
@sendEmptyFrame()

doRecvTimeout: (missed) ->
if not @client_heartbeat_reply
return
if missed is 1
@sendHeartbeat()
else if missed is 2
console.log("Heartbeat missed")
@close(1006, "Heartbeat missed")
else if @sendHeartbeat()
@to_tref = setTimeout(@heartbeat_cb, @server_heartbeat_interval)

sendHeartbeat: ->
if not @recv
return false
@recv.doSendFrame("h")
@timer.send()
return true

sendEmptyFrame: ->
if not @recv
return false
@recv.doSendFrame("a[]")
@timer.send()
return true

didTimeout: ->
if @to_tref
clearTimeout(@to_tref)
@to_tref = null
doPollTimeout: ->
if @readyState isnt Transport.CONNECTING and
@readyState isnt Transport.OPEN and
@readyState isnt Transport.CLOSING
throw Error('INVALID_STATE_ERR')
if @recv
throw Error('RECV_STILL_THERE')
@timer.close()
@readyState = Transport.CLOSED
# Node streaming API is broken. Reader defines 'close' and 'end'
# but Writer defines only 'close'. 'End' isn't optional though.
Expand All @@ -202,7 +276,7 @@ class Session
@connection.emit('data', msg)
else
@connection.emit('heartbeat')
@client_heartbeat_count = 0
@timer.recv()
return

send: (payload) ->
Expand Down Expand Up @@ -263,7 +337,7 @@ class GenericReceiver
session = @session
@didClose(status, reason)
if session
session.didTimeout()
session.doPollTimeout()

didClose: (status, reason) ->
if @thingy
Expand Down

0 comments on commit 9364173

Please sign in to comment.