From dfee9170c4e39d165a591824fbf71d8a3996f249 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 2 Jul 2018 13:00:09 -0700 Subject: [PATCH 1/5] fix error when stream has EOF'd --- lib/sse_client/sse_client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index e31b8607..f8c756f0 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -84,7 +84,7 @@ def run_stream @logger.error { "Unexpected error from event source: #{e.inspect}" } @logger.debug { "Exception trace: #{e.backtrace}" } end - @cxn.close if !cxn.nil? + @cxn.close if !@cxn.nil? end end From 5f777cfa0f2edf0e7ee7d074f19ea46f1c2a4816 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 2 Jul 2018 13:06:08 -0700 Subject: [PATCH 2/5] fix race conditions in closing socket --- lib/sse_client/sse_client.rb | 1 + lib/sse_client/streaming_http.rb | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index f8c756f0..23fded9f 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -61,6 +61,7 @@ def on_error(&action) def close if @stopped.make_true @cxn.close if !@cxn.nil? + @cxn = nil end end diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 1c0ed52b..eeb80e82 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -1,3 +1,4 @@ +require "concurrent/atomics" require "http_tools" require "socketry" @@ -15,11 +16,14 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout) @reader = HTTPResponseReader.new(@socket, read_timeout) @status = @reader.status @headers = @reader.headers + @closed = Concurrent::AtomicBoolean.new(false) end def close - @socket.close if @socket - @socket = nil + if @closed.make_true + @socket.close if @socket + @socket = nil + end end # Generator that returns one line of the response body at a time (delimited by \r, \n, From 6a74cbed007d1aafdec3ef289dd0df95a1d78cf6 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 2 Jul 2018 13:08:21 -0700 Subject: [PATCH 3/5] more exception handling --- lib/sse_client/sse_client.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index 23fded9f..ccce3f0a 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -85,7 +85,12 @@ def run_stream @logger.error { "Unexpected error from event source: #{e.inspect}" } @logger.debug { "Exception trace: #{e.backtrace}" } end - @cxn.close if !@cxn.nil? + begin + @cxn.close if !@cxn.nil? + rescue StandardError => e + @logger.error { "Unexpected error while closing stream: #{e.inspect}" } + @logger.debug { "Exception trace: #{e.backtrace}" } + end end end From e9d395abcc537297f7964fc0743f2b0f6130628b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 2 Jul 2018 13:39:28 -0700 Subject: [PATCH 4/5] slightly better race condition avoidance --- lib/sse_client/sse_client.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index ccce3f0a..9f285360 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -78,6 +78,9 @@ def run_stream @cxn = nil begin @cxn = connect + # There's a potential race if close was called in the middle of the previous line, i.e. after we + # connected but before @cxn was set. Checking the variable again is a bit clunky but avoids that. + return if @stopped.value read_stream(@cxn) if !@cxn.nil? rescue Errno::EBADF # don't log this - it probably means we closed our own connection deliberately From ea78edfb175739f5910890502cc698d49f6712cf Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 2 Jul 2018 17:00:38 -0700 Subject: [PATCH 5/5] unit test for reconnecting after stream EOF --- spec/sse_client/sse_client_spec.rb | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb index ecb7c4e4..54f1f5c7 100644 --- a/spec/sse_client/sse_client_spec.rb +++ b/spec/sse_client/sse_client_spec.rb @@ -136,4 +136,42 @@ def with_client(client) end end end + + it "reconnects if stream returns EOF" do + events_body_1 = <<-EOT +event: go +data: foo + +EOT + events_body_2 = <<-EOT +event: go +data: bar + +EOT + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + if attempt == 1 + res.body = events_body_1 + else + res.body = events_body_2 + end + res.content_type = "text/event-stream" + res.status = 200 + end + + event_sink = Queue.new + client = subject.new(server.base_uri, + reconnect_time: 0.25, read_timeout: 0.25) do |c| + c.on_event { |event| event_sink << event } + end + + with_client(client) do |client| + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "bar", nil)) + expect(attempt).to be >= 2 + end + end + end end