From d0fe10e098033d8cc4be6932d7bcc389d25aac6d Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 13 Jun 2018 18:02:44 -0700 Subject: [PATCH 01/50] refactor to use an Event in update processor init --- ldclient-rb.gemspec | 2 -- lib/ldclient-rb/ldclient.rb | 39 +++++++++++++++++++++++++------------ lib/ldclient-rb/polling.rb | 5 ++++- lib/ldclient-rb/stream.rb | 6 +++++- spec/ldclient_spec.rb | 11 +---------- 5 files changed, 37 insertions(+), 26 deletions(-) diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 4c6e8eeb..5eb6ad18 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -56,6 +56,4 @@ Gem::Specification.new do |spec| else spec.add_runtime_dependency "nio4r", "~> 1.1" # for maximum ruby version compatibility. end - - spec.add_runtime_dependency "waitutil", "0.2" end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index ece7c4ec..68ca73d6 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -1,7 +1,7 @@ +require "concurrent/atomics" require "digest/sha1" require "logger" require "benchmark" -require "waitutil" require "json" require "openssl" @@ -41,7 +41,9 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) requestor = Requestor.new(sdk_key, config) - if !@config.offline? + if @config.offline? + @update_processor = NullUpdateProcessor.new + else if @config.update_processor.nil? if @config.stream? @update_processor = StreamProcessor.new(sdk_key, config, requestor) @@ -53,15 +55,12 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) else @update_processor = @config.update_processor end - @update_processor.start end - if !@config.offline? && wait_for_sec > 0 - begin - WaitUtil.wait_for_condition("LaunchDarkly client initialization", timeout_sec: wait_for_sec, delay_sec: 0.1) do - initialized? - end - rescue WaitUtil::TimeoutError + ready = @update_processor.start + if wait_for_sec > 0 + ok = ready.wait(wait_for_sec) + if !ok @config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" } end end @@ -220,9 +219,7 @@ def all_flags(user) # @return [void] def close @config.logger.info { "[LDClient] Closing LaunchDarkly client..." } - if not @config.offline? - @update_processor.stop - end + @update_processor.stop @event_processor.stop @store.stop end @@ -255,4 +252,22 @@ def make_feature_event(flag, user, variation, value, default) private :evaluate, :log_exception, :sanitize_user, :make_feature_event end + + # + # Used internally when the client is offline. + # + class NullUpdateProcessor + def start + e = Concurrent::Event.new + e.set + e + end + + def initialized? + true + end + + def stop + end + end end diff --git a/lib/ldclient-rb/polling.rb b/lib/ldclient-rb/polling.rb index cc391bca..32494421 100644 --- a/lib/ldclient-rb/polling.rb +++ b/lib/ldclient-rb/polling.rb @@ -9,6 +9,7 @@ def initialize(config, requestor) @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) + @ready = Concurrent::Event.new end def initialized? @@ -16,9 +17,10 @@ def initialized? 
end def start - return unless @started.make_true + return @ready unless @started.make_true @config.logger.info { "[LDClient] Initializing polling connection" } create_worker + @ready end def stop @@ -39,6 +41,7 @@ def poll }) if @initialized.make_true @config.logger.info { "[LDClient] Polling connection initialized" } + @ready.set end end end diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 4ec1052a..0420fef6 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -24,6 +24,7 @@ def initialize(sdk_key, config, requestor) @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) + @ready = Concurrent::Event.new end def initialized? @@ -31,7 +32,7 @@ def initialized? end def start - return unless @started.make_true + return @ready unless @started.make_true @config.logger.info { "[LDClient] Initializing stream connection" } @@ -55,6 +56,8 @@ def start end } end + + @ready end def stop @@ -83,6 +86,7 @@ def process_message(message, method) }) @initialized.make_true @config.logger.info { "[LDClient] Stream initialized" } + @ready.set elsif method == PATCH message = JSON.parse(message.data, symbolize_names: true) for kind in [FEATURES, SEGMENTS] diff --git a/spec/ldclient_spec.rb b/spec/ldclient_spec.rb index 405e0d53..8e4b5eb5 100644 --- a/spec/ldclient_spec.rb +++ b/spec/ldclient_spec.rb @@ -7,7 +7,7 @@ let(:offline_client) do subject.new("secret", offline_config) end - let(:update_processor) { NullUpdateProcessor.new } + let(:update_processor) { LaunchDarkly::NullUpdateProcessor.new } let(:config) { LaunchDarkly::Config.new({send_events: false, update_processor: update_processor}) } let(:client) do subject.new("secret", config) @@ -160,13 +160,4 @@ def event_processor expect(ep).not_to be_a(LaunchDarkly::NullEventProcessor) end end - - class NullUpdateProcessor - def start - end - - def initialized? - true - end - end end \ No newline at end of file From 3755d2b0ed32192a87ef3a261c86810a47f46892 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 13 Jun 2018 18:29:54 -0700 Subject: [PATCH 02/50] allow update processors to signal immediate failure --- lib/ldclient-rb/ldclient.rb | 2 ++ lib/ldclient-rb/polling.rb | 1 + lib/ldclient-rb/stream.rb | 1 + 3 files changed, 4 insertions(+) diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index 68ca73d6..5c0e872d 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -62,6 +62,8 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) ok = ready.wait(wait_for_sec) if !ok @config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" } + elsif !@update_processor.initialized? 
+ @config.logger.error { "[LDClient] LaunchDarkly client initialization failed" } end end end diff --git a/lib/ldclient-rb/polling.rb b/lib/ldclient-rb/polling.rb index 32494421..18cbeded 100644 --- a/lib/ldclient-rb/polling.rb +++ b/lib/ldclient-rb/polling.rb @@ -59,6 +59,7 @@ def create_worker end rescue InvalidSDKKeyError @config.logger.error { "[LDClient] Received 401 error, no further polling requests will be made since SDK key is invalid" }; + @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set stop rescue StandardError => exn @config.logger.error { "[LDClient] Exception while polling: #{exn.inspect}" } diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 0420fef6..e5ff0e17 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -52,6 +52,7 @@ def start @config.logger.error { "[LDClient] Unexpected status code #{err[:status_code]} from streaming connection" } if err[:status_code] == 401 @config.logger.error { "[LDClient] Received 401 error, no further streaming connection will be made since SDK key is invalid" } + @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set stop end } From adf1e0c0aa1f2f525c09634b4eae8527504e3c92 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 15 Jun 2018 12:50:22 -0700 Subject: [PATCH 03/50] fail permanently on most 4xx errors --- lib/ldclient-rb.rb | 1 + lib/ldclient-rb/events.rb | 20 +++++--- lib/ldclient-rb/polling.rb | 19 ++++---- lib/ldclient-rb/requestor.rb | 24 ++++------ lib/ldclient-rb/stream.rb | 7 +-- lib/ldclient-rb/util.rb | 18 ++++++++ spec/events_spec.rb | 33 +++++++++++-- spec/polling_spec.rb | 89 ++++++++++++++++++++++++++++++++++++ 8 files changed, 176 insertions(+), 35 deletions(-) create mode 100644 lib/ldclient-rb/util.rb create mode 100644 spec/polling_spec.rb diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index 541cf4d7..ce9d0307 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -1,4 +1,5 @@ require "ldclient-rb/version" +require "ldclient-rb/util" require "ldclient-rb/evaluation" require "ldclient-rb/ldclient" require "ldclient-rb/cache_store" diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb index 96db3f46..0c9a0ece 100644 --- a/lib/ldclient-rb/events.rb +++ b/lib/ldclient-rb/events.rb @@ -222,17 +222,24 @@ def trigger_flush(buffer, flush_workers) if !payload.events.empty? || !payload.summary.counters.empty? # If all available worker threads are busy, success will be false and no job will be queued. success = flush_workers.post do - resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) - handle_response(resp) if !resp.nil? + begin + resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) + handle_response(resp) if !resp.nil? + rescue => e + @config.logger.warn { "[LDClient] Unexpected error in event processor: #{e.inspect}. \nTrace: #{e.backtrace}" } + end end buffer.clear if success # Reset our internal state, these events now belong to the flush worker end end def handle_response(res) - if res.status == 401 - @config.logger.error { "[LDClient] Received 401 error, no further events will be posted since SDK key is invalid" } - @disabled.value = true + if res.status >= 400 + message = Util.http_error_message(res.status, "event delivery", "some events were dropped") + @config.logger.error { "[LDClient] #{message}" } + if !Util.http_error_recoverable?(res.status) + @disabled.value = true + end else if !res.headers.nil? 
&& res.headers.has_key?("Date") begin @@ -309,8 +316,7 @@ def run(sdk_key, config, client, payload, formatter) next end if res.status < 200 || res.status >= 300 - config.logger.error { "[LDClient] Unexpected status code while processing events: #{res.status}" } - if res.status >= 500 + if Util.http_error_recoverable?(res.status) next end end diff --git a/lib/ldclient-rb/polling.rb b/lib/ldclient-rb/polling.rb index 18cbeded..15965201 100644 --- a/lib/ldclient-rb/polling.rb +++ b/lib/ldclient-rb/polling.rb @@ -50,21 +50,24 @@ def create_worker @worker = Thread.new do @config.logger.debug { "[LDClient] Starting polling worker" } while !@stopped.value do + started_at = Time.now begin - started_at = Time.now poll - delta = @config.poll_interval - (Time.now - started_at) - if delta > 0 - sleep(delta) + rescue UnexpectedResponseError => e + message = Util.http_error_message(e.status, "polling request", "will retry") + @config.logger.error { "[LDClient] #{message}" }; + if !Util.http_error_recoverable?(e.status) + @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set + stop end - rescue InvalidSDKKeyError - @config.logger.error { "[LDClient] Received 401 error, no further polling requests will be made since SDK key is invalid" }; - @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set - stop rescue StandardError => exn @config.logger.error { "[LDClient] Exception while polling: #{exn.inspect}" } # TODO: log_exception(__method__.to_s, exn) end + delta = @config.poll_interval - (Time.now - started_at) + if delta > 0 + sleep(delta) + end end end end diff --git a/lib/ldclient-rb/requestor.rb b/lib/ldclient-rb/requestor.rb index abaab854..25cce121 100644 --- a/lib/ldclient-rb/requestor.rb +++ b/lib/ldclient-rb/requestor.rb @@ -4,7 +4,14 @@ module LaunchDarkly - class InvalidSDKKeyError < StandardError + class UnexpectedResponseError < StandardError + def initialize(status) + @status = status + end + + def status + @status + end end class Requestor @@ -13,7 +20,7 @@ def initialize(sdk_key, config) @config = config @client = Faraday.new do |builder| builder.use :http_cache, store: @config.cache_store - + builder.adapter :net_http_persistent end end @@ -44,19 +51,8 @@ def make_request(path) @config.logger.debug { "[LDClient] Got response from uri: #{uri}\n\tstatus code: #{res.status}\n\theaders: #{res.headers}\n\tbody: #{res.body}" } - if res.status == 401 - @config.logger.error { "[LDClient] Invalid SDK key" } - raise InvalidSDKKeyError - end - - if res.status == 404 - @config.logger.error { "[LDClient] Resource not found" } - return nil - end - if res.status < 200 || res.status >= 300 - @config.logger.error { "[LDClient] Unexpected status code #{res.status}" } - return nil + raise UnexpectedResponseError.new(res.status) end JSON.parse(res.body, symbolize_names: true) diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index e5ff0e17..33496e9b 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -49,9 +49,10 @@ def start conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) } conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) } conn.on_error { |err| - @config.logger.error { "[LDClient] Unexpected status code #{err[:status_code]} from streaming connection" } - if err[:status_code] == 401 - @config.logger.error { "[LDClient] Received 401 error, no further streaming connection will be made since SDK key is invalid" } + status = err[:status_code] + 
message = Util.http_error_message(status, "streaming connection", "will retry") + @config.logger.error { "[LDClient] #{message}" } + if !Util.http_error_recoverable?(status) @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set stop end diff --git a/lib/ldclient-rb/util.rb b/lib/ldclient-rb/util.rb new file mode 100644 index 00000000..6ba70dbc --- /dev/null +++ b/lib/ldclient-rb/util.rb @@ -0,0 +1,18 @@ + +module LaunchDarkly + module Util + def self.http_error_recoverable?(status) + if status >= 400 && status < 500 + status == 400 || status == 408 || status == 429 + else + true + end + end + + def self.http_error_message(status, context, recoverable_message) + desc = (status == 401 || status == 403) ? " (invalid SDK key)" : "" + message = Util.http_error_recoverable?(status) ? recoverable_message : "giving up permanently" + "HTTP error #{status}#{desc} for #{context} - #{message}" + end + end +end diff --git a/spec/events_spec.rb b/spec/events_spec.rb index cbce1fbe..56bd14a2 100644 --- a/spec/events_spec.rb +++ b/spec/events_spec.rb @@ -351,12 +351,12 @@ expect(hc.get_request.headers["Authorization"]).to eq "sdk_key" end - it "stops posting events after getting a 401 error" do + def verify_unrecoverable_http_error(status) @ep = subject.new("sdk_key", default_config, hc) e = { kind: "identify", user: user } @ep.add_event(e) - hc.set_response_status(401) + hc.set_response_status(status) @ep.flush @ep.wait_until_inactive expect(hc.get_request).not_to be_nil @@ -368,7 +368,7 @@ expect(hc.get_request).to be_nil end - it "retries flush once after 5xx error" do + def verify_recoverable_http_error(status) @ep = subject.new("sdk_key", default_config, hc) e = { kind: "identify", user: user } @ep.add_event(e) @@ -380,6 +380,33 @@ expect(hc.get_request).not_to be_nil expect(hc.get_request).not_to be_nil expect(hc.get_request).to be_nil # no 3rd request + + # now verify that a subsequent flush still generates a request + hc.reset + @ep.add_event(e) + @ep.flush + @ep.wait_until_inactive + expect(hc.get_request).not_to be_nil + end + + it "stops posting events after getting a 401 error" do + verify_unrecoverable_http_error(401) + end + + it "stops posting events after getting a 403 error" do + verify_unrecoverable_http_error(403) + end + + it "retries after 408 error" do + verify_recoverable_http_error(408) + end + + it "retries after 429 error" do + verify_recoverable_http_error(429) + end + + it "retries after 503 error" do + verify_recoverable_http_error(503) end it "retries flush once after connection error" do diff --git a/spec/polling_spec.rb b/spec/polling_spec.rb new file mode 100644 index 00000000..8183b8c3 --- /dev/null +++ b/spec/polling_spec.rb @@ -0,0 +1,89 @@ +require "spec_helper" +require 'ostruct' + +describe LaunchDarkly::PollingProcessor do + subject { LaunchDarkly::PollingProcessor } + let(:store) { LaunchDarkly::InMemoryFeatureStore.new } + let(:config) { LaunchDarkly::Config.new(feature_store: store) } + let(:requestor) { double() } + let(:processor) { subject.new(config, requestor) } + + describe 'successful request' do + flag = { key: 'flagkey', version: 1 } + segment = { key: 'segkey', version: 1 } + all_data = { + flags: { + flagkey: flag + }, + segments: { + segkey: segment + } + } + + it 'puts feature data in store' do + allow(requestor).to receive(:request_all_data).and_return(all_data) + ready = processor.start + ready.wait + expect(store.get(LaunchDarkly::FEATURES, "flagkey")).to eq(flag) + expect(store.get(LaunchDarkly::SEGMENTS, 
"segkey")).to eq(segment) + end + + it 'sets initialized to true' do + allow(requestor).to receive(:request_all_data).and_return(all_data) + ready = processor.start + ready.wait + expect(processor.initialized?).to be true + expect(store.initialized?).to be true + end + end + + describe 'connection error' do + it 'does not cause immediate failure, does not set initialized' do + allow(requestor).to receive(:request_all_data).and_raise(StandardError.new("test error")) + ready = processor.start + finished = ready.wait(0.2) + expect(finished).to be false + expect(processor.initialized?).to be false + expect(store.initialized?).to be false + end + end + + describe 'HTTP errors' do + def verify_unrecoverable_http_error(status) + allow(requestor).to receive(:request_all_data).and_raise(LaunchDarkly::UnexpectedResponseError.new(status)) + ready = processor.start + finished = ready.wait(0.2) + expect(finished).to be true + expect(processor.initialized?).to be false + end + + def verify_recoverable_http_error(status) + allow(requestor).to receive(:request_all_data).and_raise(LaunchDarkly::UnexpectedResponseError.new(status)) + ready = processor.start + finished = ready.wait(0.2) + expect(finished).to be false + expect(processor.initialized?).to be false + end + + it 'stops immediately for error 401' do + verify_unrecoverable_http_error(401) + end + + it 'stops immediately for error 403' do + verify_unrecoverable_http_error(403) + end + + it 'does not stop immediately for error 408' do + verify_recoverable_http_error(408) + end + + it 'does not stop immediately for error 429' do + verify_recoverable_http_error(429) + end + + it 'does not stop immediately for error 503' do + verify_recoverable_http_error(503) + end + end +end + From c281bac154d59fcf9d2d7acd95102e496a0a9b5b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 13:19:22 -0700 Subject: [PATCH 04/50] reimplement SSE client without Celluloid --- ldclient-rb.gemspec | 4 +- lib/ldclient-rb.rb | 2 + lib/ldclient-rb/backoff.rb | 38 ++++ lib/ldclient-rb/sse_client.rb | 307 +++++++++++++++++++++++++++++++++ lib/ldclient-rb/stream.rb | 38 ++-- lib/ldclient-rb/user_filter.rb | 1 + 6 files changed, 365 insertions(+), 25 deletions(-) create mode 100644 lib/ldclient-rb/backoff.rb create mode 100644 lib/ldclient-rb/sse_client.rb diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 5eb6ad18..e34fd912 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -48,8 +48,8 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "net-http-persistent", "~> 2.9" spec.add_runtime_dependency "concurrent-ruby", "~> 1.0.4" spec.add_runtime_dependency "hashdiff", "~> 0.2" - spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.11.0" - spec.add_runtime_dependency "celluloid", "~> 0.18.0.pre" # transitive dep; specified here for more control + spec.add_runtime_dependency "http_tools", '~> 0.4.5' + spec.add_runtime_dependency "socketry", "~> 0.5.1" if RUBY_VERSION >= "2.2.2" spec.add_runtime_dependency "nio4r", "< 3" # for maximum ruby version compatibility. 
diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index ce9d0307..e7762982 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -8,6 +8,8 @@ require "ldclient-rb/in_memory_store" require "ldclient-rb/config" require "ldclient-rb/newrelic" +require "ldclient-rb/backoff" +require "ldclient-rb/sse_client" require "ldclient-rb/stream" require "ldclient-rb/polling" require "ldclient-rb/user_filter" diff --git a/lib/ldclient-rb/backoff.rb b/lib/ldclient-rb/backoff.rb new file mode 100644 index 00000000..d46592c1 --- /dev/null +++ b/lib/ldclient-rb/backoff.rb @@ -0,0 +1,38 @@ + +module LaunchDarkly + # + # A simple backoff algorithm that can be reset at any time, or reset itself after a given + # interval has passed without errors. + # + class Backoff + def initialize(base_interval, max_interval, auto_reset_interval = 60) + @base_interval = base_interval + @max_interval = max_interval + @auto_reset_interval = auto_reset_interval + @attempts = 0 + @last_good_time = nil + @jitter_rand = Random.new + end + + attr_accessor :base_interval + + def next_interval + if !@last_good_time.nil? && (Time.now.to_i - @last_good_time) >= @auto_reset_interval + @attempts = 0 + end + @last_good_time = nil + if @attempts == 0 + @attempts += 1 + return 0 + end + @last_good_time = nil + target = ([@base_interval * (2 ** @attempts), @max_interval].min).to_f + @attempts += 1 + (target / 2) + @jitter_rand.rand(target / 2) + end + + def mark_success + @last_good_time = Time.now.to_i if @last_good_time.nil? + end + end +end diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb new file mode 100644 index 00000000..c94cd38f --- /dev/null +++ b/lib/ldclient-rb/sse_client.rb @@ -0,0 +1,307 @@ +require "concurrent/atomics" +require "http_tools" +require "logger" +require "socketry" +require "thread" +require "uri" + +module LaunchDarkly + # + # A lightweight Server-Sent Events implementation, relying on two gems: socketry for sockets with + # read timeouts, and http_tools for HTTP response parsing. The overall logic is based on + # [https://github.com/Tonkpils/celluloid-eventsource]. + # + class SSEClient + DEFAULT_CONNECT_TIMEOUT = 10 + DEFAULT_READ_TIMEOUT = 300 + DEFAULT_CHUNK_SIZE = 10000 + DEFAULT_RECONNECT_TIME = 1 + MAX_RECONNECT_TIME = 30 + + def initialize(uri, options = {}) + @uri = URI(uri) + @stopped = Concurrent::AtomicBoolean.new(false) + + @headers = options[:headers].clone || {} + @connect_timeout = options[:connect_timeout] || DEFAULT_CONNECT_TIMEOUT + @read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT + @chunk_size = options[:chunk_size] || DEFAULT_CHUNK_SIZE + @logger = options[:logger] || default_logger + + proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy] + if proxy + proxyUri = URI(proxy) + if proxyUri.scheme == 'http' || proxyUri.scheme == 'https' + @proxy = proxyUri + end + end + + reconnect_time = options[:reconnect_time] || DEFAULT_RECONNECT_TIME + @backoff = Backoff.new(reconnect_time, MAX_RECONNECT_TIME) + + @on = { event: ->(_) {}, error: ->(_) {} } + @last_id = nil + + yield self if block_given? 
+ + @worker = Thread.new do + run_stream + end + end + + def on(event_name, &action) + @on[event_name.to_sym] = action + end + + def on_event(&action) + @on[:event] = action + end + + def on_error(&action) + @on[:error] = action + end + + def close + if @stopped.make_true + @worker.raise ShutdownSignal.new + end + end + + private + + def default_logger + log = ::Logger.new($stdout) + log.level = ::Logger::WARN + log + end + + def run_stream + while !@stopped.value + begin + connect + read_stream + rescue ShutdownSignal + return + rescue StandardError => e + @logger.error("Unexpected error from event source: #{e.inspect}") + end + end + end + + def connect + loop do + interval = @backoff.next_interval + if interval > 0 + @logger.warn("Will retry connection after #{'%.3f' % interval} seconds") + sleep(interval) + end + begin + if @proxy + @socket = open_socket(@proxy) + @socket.write(build_proxy_request) + else + @socket = open_socket(@uri) + end + @socket.write(build_request) + + @parser = ResponseParser.new + while !@parser.headers? && read_chunk(@parser) + end + + if @parser.status != 200 + # Consume response body if present + while !@parser.done? && read_chunk(@parser) + end + close + @on[:error].call({status_code: @parser.status, body: @parser.buffer}) + return false + end + if @parser.headers["content-type"] && @parser.headers["content-type"].start_with?("text/event-stream") + return + end + @logger.error("Event source returned unexpected content type '#{@parser.headers["content-type"]}'") + rescue StandardError => e + @logger.error("Unexpected error from event source: #{e.inspect}") + end + end + end + + def disconnect + @socket.close if @socket + @socket = nil + end + + def read_stream + event_parser = EventParser.new + event_parser.on(:event) do |event| + dispatch_event(event) + end + event_parser.on(:retry) do |interval| + @backoff.base_interval = interval / 1000 + end + loop do + line = @parser.readline + if line.nil? + return if !read_chunk(@parser) + else + event_parser << line + end + end + end + + def dispatch_event(event) + @last_id = event.id + @backoff.mark_success + @on[:event].call(event) + end + + def open_socket(uri) + if uri.scheme == 'https' + Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: @connect_timeout) + else + Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: @connect_timeout) + end + end + + def build_request + buf = "GET #{@uri.request_uri} HTTP/1.1\r\n" + all_headers = @headers.merge(base_headers) + all_headers.each { |k, v| + buf << "#{k}: #{v}\r\n" + } + buf + "\r\n" + end + + def base_headers + h = { + 'Accept' => 'text/event-stream', + 'Cache-Control' => 'no-cache', + 'Host' => @uri.host + } + h['Last-Event-Id'] = @last_id if !@last_id.nil? 
+ h + end + + def build_proxy_request + buf = "CONNECT #{@uri.host}:#{@uri.port} HTTP/1.1\r\n" + buf << "Host: #{@uri.host}:#{@uri.port}\r\n" + if @proxy.user || @proxy.password + encoded_credentials = Base64.strict_encode64([@proxy.user || '', @proxy.password || ''].join(":")) + buf << "Proxy-Authorization: Basic #{encoded_credentials}\r\n" + end + buf << "\r\n" + buf + end + + def read_chunk(sink) + data = @socket.readpartial(@chunk_size, timeout: @read_timeout) + return false if data == :eof + sink << data + true + end + end + + class ShutdownSignal < StandardError + end + + class ResponseParser + def initialize + @parser = HTTPTools::Parser.new + @done = false + @buffer = "" + @lock = Mutex.new + @parser.on(:header) do + @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] + end + @parser.on(:stream) do |data| + @lock.synchronize do + @buffer << data + end + end + @parser.on(:finish) do + @done = true + end + end + + attr_reader :headers + attr_reader :buffer + + def <<(data) + @parser << data + end + + def done? + @done + end + + def headers? + !!@headers + end + + def status + @parser.status_code + end + + def readline() + @lock.synchronize do + i = @buffer.index(/[\r\n]/) + return nil if i.nil? + i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") + @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) + end + end + end + + SSEEvent = Struct.new(:type, :data, :id) + + class EventParser + def initialize + @on = { event: ->(_) {}, retry: ->(_) {} } + reset + end + + def on(event_name, &action) + @on[event_name] = action + end + + def <<(line) + line.chomp! + if line.empty? + return if @data.empty? + event = SSEEvent.new(@type || :message, @data, @id) + reset + @on[:event].call(event) + else + case line + when /^:.*$/ + when /^(\w+): ?(.*)$/ + process_field($1, $2) + end + end + end + + private + + def reset + @id = nil + @type = nil + @data = "" + end + + def process_field(name, value) + case name + when "event" + @type = value.to_sym + when "data" + @data << "\n" if !@data.empty? 
+ @data << value + when "id" + @id = field_value + when "retry" + if /^(?\d+)$/ =~ value + @on_retry.call(num.to_i) + end + end + end + end +end diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 33496e9b..4f3f2652 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -1,6 +1,5 @@ require "concurrent/atomics" require "json" -require "celluloid/eventsource" module LaunchDarkly PUT = :put @@ -36,18 +35,18 @@ def start @config.logger.info { "[LDClient] Initializing stream connection" } - headers = - { + headers = { 'Authorization' => @sdk_key, 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION } - opts = {:headers => headers, :with_credentials => true, :proxy => @config.proxy, :read_timeout => READ_TIMEOUT_SECONDS} - @es = Celluloid::EventSource.new(@config.stream_uri + "/all", opts) do |conn| - conn.on(PUT) { |message| process_message(message, PUT) } - conn.on(PATCH) { |message| process_message(message, PATCH) } - conn.on(DELETE) { |message| process_message(message, DELETE) } - conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) } - conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) } + opts = { + headers: headers, + proxy: @config.proxy, + read_timeout: READ_TIMEOUT_SECONDS, + logger: @config.logger + } + @es = SSEClient.new(@config.stream_uri + "/all", opts) do |conn| + conn.on_event { |event| process_message(event, event.type) } conn.on_error { |err| status = err[:status_code] message = Util.http_error_message(status, "streaming connection", "will retry") @@ -69,13 +68,6 @@ def stop end end - def stop - if @stopped.make_true - @es.close - @config.logger.info { "[LDClient] Stream connection stopped" } - end - end - private def process_message(message, method) @@ -90,20 +82,20 @@ def process_message(message, method) @config.logger.info { "[LDClient] Stream initialized" } @ready.set elsif method == PATCH - message = JSON.parse(message.data, symbolize_names: true) + data = JSON.parse(message.data, symbolize_names: true) for kind in [FEATURES, SEGMENTS] - key = key_for_path(kind, message[:path]) + key = key_for_path(kind, data[:path]) if key - @feature_store.upsert(kind, message[:data]) + @feature_store.upsert(kind, data[:data]) break end end elsif method == DELETE - message = JSON.parse(message.data, symbolize_names: true) + data = JSON.parse(message.data, symbolize_names: true) for kind in [FEATURES, SEGMENTS] - key = key_for_path(kind, message[:path]) + key = key_for_path(kind, data[:path]) if key - @feature_store.delete(kind, key, message[:version]) + @feature_store.delete(kind, key, data[:version]) break end end diff --git a/lib/ldclient-rb/user_filter.rb b/lib/ldclient-rb/user_filter.rb index 9f4bce82..449d8d2e 100644 --- a/lib/ldclient-rb/user_filter.rb +++ b/lib/ldclient-rb/user_filter.rb @@ -1,4 +1,5 @@ require "json" +require "set" module LaunchDarkly class UserFilter From 58e78bb7a7bd55f70d3457e15b5515688dfa0965 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 13:36:59 -0700 Subject: [PATCH 05/50] rm nio4r dependency --- ldclient-rb.gemspec | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index e34fd912..3f9d2638 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -50,10 +50,4 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "hashdiff", "~> 0.2" spec.add_runtime_dependency "http_tools", '~> 0.4.5' spec.add_runtime_dependency "socketry", "~> 0.5.1" - - if RUBY_VERSION >= "2.2.2" - 
spec.add_runtime_dependency "nio4r", "< 3" # for maximum ruby version compatibility. - else - spec.add_runtime_dependency "nio4r", "~> 1.1" # for maximum ruby version compatibility. - end end From bffde7fe0897241d7b5f17fbc30eec60319053f3 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:21:36 -0700 Subject: [PATCH 06/50] refactor for better encapsulation of HTTP logic --- lib/ldclient-rb/sse_client.rb | 209 +++++++++++++++++----------------- 1 file changed, 107 insertions(+), 102 deletions(-) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index c94cd38f..b921d9b2 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -14,7 +14,6 @@ module LaunchDarkly class SSEClient DEFAULT_CONNECT_TIMEOUT = 10 DEFAULT_READ_TIMEOUT = 300 - DEFAULT_CHUNK_SIZE = 10000 DEFAULT_RECONNECT_TIME = 1 MAX_RECONNECT_TIME = 30 @@ -25,7 +24,6 @@ def initialize(uri, options = {}) @headers = options[:headers].clone || {} @connect_timeout = options[:connect_timeout] || DEFAULT_CONNECT_TIMEOUT @read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT - @chunk_size = options[:chunk_size] || DEFAULT_CHUNK_SIZE @logger = options[:logger] || default_logger proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy] @@ -77,13 +75,15 @@ def default_logger def run_stream while !@stopped.value + cxn = nil begin - connect - read_stream + cxn = connect + read_stream(cxn) rescue ShutdownSignal return rescue StandardError => e - @logger.error("Unexpected error from event source: #{e.inspect}") + @logger.error("Unexpected error from event source: #{e.inspect} #{e.backtrace}") + cxn.close if !cxn.nil? end end end @@ -96,42 +96,24 @@ def connect sleep(interval) end begin - if @proxy - @socket = open_socket(@proxy) - @socket.write(build_proxy_request) - else - @socket = open_socket(@uri) - end - @socket.write(build_request) - - @parser = ResponseParser.new - while !@parser.headers? && read_chunk(@parser) - end - - if @parser.status != 200 - # Consume response body if present - while !@parser.done? && read_chunk(@parser) - end - close - @on[:error].call({status_code: @parser.status, body: @parser.buffer}) - return false - end - if @parser.headers["content-type"] && @parser.headers["content-type"].start_with?("text/event-stream") - return + cxn = StreamingHTTPConnection.new(@uri, @proxy, build_headers, @connect_timeout, @read_timeout) + resp_status, resp_headers = cxn.read_headers + if resp_status != 200 + body = cxn.consume_body + cxn.close + @on[:error].call({status_code: resp_status, body: body}) + elsif resp_headers["content-type"] && resp_headers["content-type"].start_with?("text/event-stream") + return cxn end - @logger.error("Event source returned unexpected content type '#{@parser.headers["content-type"]}'") + @logger.error("Event source returned unexpected content type '#{resp_headers["content-type"]}'") rescue StandardError => e - @logger.error("Unexpected error from event source: #{e.inspect}") + @logger.error("Unexpected error from event source: #{e.inspect} #{e.backtrace}") + cxn.close if !cxn.nil? end end end - def disconnect - @socket.close if @socket - @socket = nil - end - - def read_stream + def read_stream(cxn) event_parser = EventParser.new event_parser.on(:event) do |event| dispatch_event(event) @@ -139,13 +121,8 @@ def read_stream event_parser.on(:retry) do |interval| @backoff.base_interval = interval / 1000 end - loop do - line = @parser.readline - if line.nil? 
- return if !read_chunk(@parser) - else - event_parser << line - end + cxn.read_lines.each do |line| + event_parser << line end end @@ -155,99 +132,127 @@ def dispatch_event(event) @on[:event].call(event) end - def open_socket(uri) - if uri.scheme == 'https' - Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: @connect_timeout) - else - Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: @connect_timeout) - end - end - - def build_request - buf = "GET #{@uri.request_uri} HTTP/1.1\r\n" - all_headers = @headers.merge(base_headers) - all_headers.each { |k, v| - buf << "#{k}: #{v}\r\n" - } - buf + "\r\n" - end - - def base_headers + def build_headers h = { 'Accept' => 'text/event-stream', 'Cache-Control' => 'no-cache', 'Host' => @uri.host } h['Last-Event-Id'] = @last_id if !@last_id.nil? - h - end - - def build_proxy_request - buf = "CONNECT #{@uri.host}:#{@uri.port} HTTP/1.1\r\n" - buf << "Host: #{@uri.host}:#{@uri.port}\r\n" - if @proxy.user || @proxy.password - encoded_credentials = Base64.strict_encode64([@proxy.user || '', @proxy.password || ''].join(":")) - buf << "Proxy-Authorization: Basic #{encoded_credentials}\r\n" - end - buf << "\r\n" - buf - end - - def read_chunk(sink) - data = @socket.readpartial(@chunk_size, timeout: @read_timeout) - return false if data == :eof - sink << data - true + h.merge(@headers) end end class ShutdownSignal < StandardError end - class ResponseParser - def initialize + class StreamingHTTPConnection + DEFAULT_CHUNK_SIZE = 10000 + + def initialize(uri, proxy, headers, connect_timeout, read_timeout) @parser = HTTPTools::Parser.new - @done = false + @headers = nil @buffer = "" + @read_timeout = read_timeout + @done = false @lock = Mutex.new + @parser.on(:header) do @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] end @parser.on(:stream) do |data| - @lock.synchronize do - @buffer << data - end + @lock.synchronize { @buffer << data } end @parser.on(:finish) do - @done = true + @lock.synchronize { @done = true } end + + if proxy + @socket = open_socket(proxy, connect_timeout) + @socket.write(build_proxy_request(uri, proxy)) + else + @socket = open_socket(uri, connect_timeout) + end + + @socket.write(build_request(uri, headers)) end - attr_reader :headers - attr_reader :buffer + def close + @socket.close if @socket + @socket = nil + end - def <<(data) - @parser << data + def read_headers + while @headers.nil? && read_chunk + end + [@parser.status_code, @headers] + end + + def read_lines + Enumerator.new do |gen| + loop do + line = read_line + break if line.nil? + gen.yield line + end + end + end + + def consume_body + loop do + @lock.synchronize { break if @done } + break if !read_chunk + end + @buffer end - def done? - @done + private + + def open_socket(uri, connect_timeout) + if uri.scheme == 'https' + Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + else + Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + end end - def headers? 
- !!@headers + def build_request(uri, headers) + ret = "GET #{uri.request_uri} HTTP/1.1\r\n" + headers.each { |k, v| + ret << "#{k}: #{v}\r\n" + } + ret + "\r\n" end - def status - @parser.status_code + def build_proxy_request(uri, proxy) + ret = "CONNECT #{uri.host}:#{uri.port} HTTP/1.1\r\n" + ret << "Host: #{uri.host}:#{uri.port}\r\n" + if proxy.user || proxy.password + encoded_credentials = Base64.strict_encode64([proxy.user || '', proxy.password || ''].join(":")) + ret << "Proxy-Authorization: Basic #{encoded_credentials}\r\n" + end + ret << "\r\n" + ret end - def readline() - @lock.synchronize do - i = @buffer.index(/[\r\n]/) - return nil if i.nil? - i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") - @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) + def read_chunk + data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) + return false if data == :eof + @parser << data + true + end + + def read_line + loop do + @lock.synchronize do + return nil if @done + i = @buffer.index(/[\r\n]/) + if !i.nil? + i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") + return @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) + end + end + return nil if !read_chunk end end end From 38ac599cb06d02baf940ea0f6a72c29cc38541d1 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:34:22 -0700 Subject: [PATCH 07/50] rm stacktraces --- lib/ldclient-rb/sse_client.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index b921d9b2..6a77df5b 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -82,7 +82,7 @@ def run_stream rescue ShutdownSignal return rescue StandardError => e - @logger.error("Unexpected error from event source: #{e.inspect} #{e.backtrace}") + @logger.error("Unexpected error from event source: #{e.inspect}") cxn.close if !cxn.nil? end end @@ -107,7 +107,7 @@ def connect end @logger.error("Event source returned unexpected content type '#{resp_headers["content-type"]}'") rescue StandardError => e - @logger.error("Unexpected error from event source: #{e.inspect} #{e.backtrace}") + @logger.error("Unexpected error from event source: #{e.inspect}") cxn.close if !cxn.nil? end end From f87db975c7da6706cc9d4e059a3ef4e73552e344 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:35:34 -0700 Subject: [PATCH 08/50] better logging --- lib/ldclient-rb/sse_client.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index 6a77df5b..ea5a260f 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -82,7 +82,8 @@ def run_stream rescue ShutdownSignal return rescue StandardError => e - @logger.error("Unexpected error from event source: #{e.inspect}") + @logger.error { "Unexpected error from event source: #{e.inspect}" } + @logger.debug { "Exception trace: #{e.backtrace}" } cxn.close if !cxn.nil? 
end end @@ -92,7 +93,7 @@ def connect loop do interval = @backoff.next_interval if interval > 0 - @logger.warn("Will retry connection after #{'%.3f' % interval} seconds") + @logger.warn { "Will retry connection after #{'%.3f' % interval} seconds" } sleep(interval) end begin @@ -105,9 +106,10 @@ def connect elsif resp_headers["content-type"] && resp_headers["content-type"].start_with?("text/event-stream") return cxn end - @logger.error("Event source returned unexpected content type '#{resp_headers["content-type"]}'") + @logger.error { "Event source returned unexpected content type '#{resp_headers["content-type"]}'" } rescue StandardError => e - @logger.error("Unexpected error from event source: #{e.inspect}") + @logger.error { "Unexpected error from event source: #{e.inspect}" } + @logger.debug { "Exception trace: #{e.backtrace}" } cxn.close if !cxn.nil? end end From a2cde5e5a9b3dda8cc83af145b0882db7501a35b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:36:30 -0700 Subject: [PATCH 09/50] ensure connection is closed --- lib/ldclient-rb/sse_client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index ea5a260f..c905a31a 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -84,8 +84,8 @@ def run_stream rescue StandardError => e @logger.error { "Unexpected error from event source: #{e.inspect}" } @logger.debug { "Exception trace: #{e.backtrace}" } - cxn.close if !cxn.nil? end + cxn.close if !cxn.nil? end end From 42d561035ec609622716c760ce35aa8747d4d7ce Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:38:52 -0700 Subject: [PATCH 10/50] comments --- lib/ldclient-rb/sse_client.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index c905a31a..a7bcc5d7 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -184,12 +184,15 @@ def close @socket = nil end + # Blocks until status code and headers have been read, and returns them. def read_headers while @headers.nil? && read_chunk end [@parser.status_code, @headers] end + # Generator that returns one line at a time (delimited by \r, \n, or \r\n) until the + # response is fully consumed or the socket is closed. def read_lines Enumerator.new do |gen| loop do @@ -200,6 +203,7 @@ def read_lines end end + # Consumes the entire response body and returns it. 
def consume_body loop do @lock.synchronize { break if @done } From 4c9cc042434ce11e819b643d8b1008bed7f14b51 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:39:04 -0700 Subject: [PATCH 11/50] don't build on unsupported Ruby versions --- .circleci/config.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c94d18e4..6433b4d3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -54,7 +54,8 @@ jobs: machine: image: circleci/classic:latest environment: - - RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" + #- RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" + - RUBIES: "jruby-9.0.5.0" # minimum Ruby version is 2.2.6 steps: - run: sudo apt-get -q update - run: sudo apt-get -qy install redis-server From a06ebc1a8ff33213d8586f7a89e7ee376c7728f1 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 14:43:47 -0700 Subject: [PATCH 12/50] don't build for JRuby 9.0 either --- .circleci/config.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6433b4d3..5fe3f4a0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ workflows: version: 2 test: jobs: - - test-misc-rubies + # - test-misc-rubies # none of these older Ruby versions are supported on this branch - test-2.2 - test-2.3 - test-2.4 @@ -54,8 +54,7 @@ jobs: machine: image: circleci/classic:latest environment: - #- RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" - - RUBIES: "jruby-9.0.5.0" # minimum Ruby version is 2.2.6 + - RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" steps: - run: sudo apt-get -q update - run: sudo apt-get -qy install redis-server From a2aafd922d792123cf9bde29dafd9523f77efc79 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 15:56:00 -0700 Subject: [PATCH 13/50] read status code & headers in constructor; add comments --- lib/ldclient-rb/sse_client.rb | 63 +++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index a7bcc5d7..c1b1c72f 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -89,6 +89,7 @@ def run_stream end end + # Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful. 
def connect loop do interval = @backoff.next_interval @@ -98,23 +99,24 @@ def connect end begin cxn = StreamingHTTPConnection.new(@uri, @proxy, build_headers, @connect_timeout, @read_timeout) - resp_status, resp_headers = cxn.read_headers - if resp_status != 200 - body = cxn.consume_body + if cxn.status != 200 + body = cxn.consume_body # grab the whole response body in case it has error details cxn.close - @on[:error].call({status_code: resp_status, body: body}) - elsif resp_headers["content-type"] && resp_headers["content-type"].start_with?("text/event-stream") - return cxn + @on[:error].call({status_code: cxn.status, body: body}) + elsif cxn.headers["content-type"] && cxn.headers["content-type"].start_with?("text/event-stream") + return cxn # we're good to proceed end - @logger.error { "Event source returned unexpected content type '#{resp_headers["content-type"]}'" } + @logger.error { "Event source returned unexpected content type '#{cxn.headers["content-type"]}'" } rescue StandardError => e @logger.error { "Unexpected error from event source: #{e.inspect}" } @logger.debug { "Exception trace: #{e.backtrace}" } cxn.close if !cxn.nil? end + # if unsuccessful, continue the loop to connect again end end + # Read lines one at a time from the StreamingHTTPConnection, and parse them into events. def read_stream(cxn) event_parser = EventParser.new event_parser.on(:event) do |event| @@ -130,7 +132,12 @@ def read_stream(cxn) def dispatch_event(event) @last_id = event.id + + # Tell the Backoff object that as of the current time, we have succeeded in getting some data. It + # uses that information so it can automatically reset itself if enough time passes between failures. @backoff.mark_success + + # Pass the event to the caller @on[:event].call(event) end @@ -145,22 +152,36 @@ def build_headers end end + # Custom exception that we use to tell the worker thread to stop class ShutdownSignal < StandardError end + # + # Wrapper around a socket allowing us to read an HTTP response incrementally, line by line, + # or to consume the entire response body. + # + # The socket is managed by Socketry, which implements the read timeout. + # + # Incoming data is fed into an instance of HTTPTools::Parser, which gives us the header and + # chunks of the body via callbacks. + # class StreamingHTTPConnection DEFAULT_CHUNK_SIZE = 10000 + attr_reader :status + attr_reader :headers + def initialize(uri, proxy, headers, connect_timeout, read_timeout) @parser = HTTPTools::Parser.new - @headers = nil @buffer = "" @read_timeout = read_timeout @done = false @lock = Mutex.new + # Provide callbacks for the Parser to give us the headers and body + have_headers = false @parser.on(:header) do - @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] + have_headers = true end @parser.on(:stream) do |data| @lock.synchronize { @buffer << data } @@ -177,6 +198,12 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout) end @socket.write(build_request(uri, headers)) + + # Block until the status code and headers have been successfully read. + while !have_headers && read_chunk + end + @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] + @status = @parser.status_code end def close @@ -184,15 +211,8 @@ def close @socket = nil end - # Blocks until status code and headers have been read, and returns them. - def read_headers - while @headers.nil? 
&& read_chunk - end - [@parser.status_code, @headers] - end - - # Generator that returns one line at a time (delimited by \r, \n, or \r\n) until the - # response is fully consumed or the socket is closed. + # Generator that returns one line of the response body at a time (delimited by \r, \n, + # or \r\n) until the response is fully consumed or the socket is closed. def read_lines Enumerator.new do |gen| loop do @@ -222,6 +242,7 @@ def open_socket(uri, connect_timeout) end end + # Build an HTTP request line and headers. def build_request(uri, headers) ret = "GET #{uri.request_uri} HTTP/1.1\r\n" headers.each { |k, v| @@ -230,6 +251,7 @@ def build_request(uri, headers) ret + "\r\n" end + # Build a proxy connection header. def build_proxy_request(uri, proxy) ret = "CONNECT #{uri.host}:#{uri.port} HTTP/1.1\r\n" ret << "Host: #{uri.host}:#{uri.port}\r\n" @@ -241,6 +263,7 @@ def build_proxy_request(uri, proxy) ret end + # Attempt to read some more data from the socket. Return true if successful, false if EOF. def read_chunk data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) return false if data == :eof @@ -248,6 +271,7 @@ def read_chunk true end + # Extract the next line of text from the read buffer, refilling the buffer as needed. def read_line loop do @lock.synchronize do @@ -265,6 +289,9 @@ def read_line SSEEvent = Struct.new(:type, :data, :id) + # + # Accepts lines of text and parses them into SSE messages, which it emits via a callback. + # class EventParser def initialize @on = { event: ->(_) {}, retry: ->(_) {} } From 89251f44ab4c4d661e650572457132ee15b489de Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 16:02:32 -0700 Subject: [PATCH 14/50] break out streaming support classes into separate files --- lib/ldclient-rb.rb | 2 + lib/ldclient-rb/sse_client.rb | 187 ------------------------------ lib/ldclient-rb/sse_events.rb | 59 ++++++++++ lib/ldclient-rb/streaming_http.rb | 135 +++++++++++++++++++++ 4 files changed, 196 insertions(+), 187 deletions(-) create mode 100644 lib/ldclient-rb/sse_events.rb create mode 100644 lib/ldclient-rb/streaming_http.rb diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index e7762982..530d15e9 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -9,6 +9,8 @@ require "ldclient-rb/config" require "ldclient-rb/newrelic" require "ldclient-rb/backoff" +require "ldclient-rb/streaming_http" +require "ldclient-rb/sse_events" require "ldclient-rb/sse_client" require "ldclient-rb/stream" require "ldclient-rb/polling" diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index c1b1c72f..238dffd2 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -155,191 +155,4 @@ def build_headers # Custom exception that we use to tell the worker thread to stop class ShutdownSignal < StandardError end - - # - # Wrapper around a socket allowing us to read an HTTP response incrementally, line by line, - # or to consume the entire response body. - # - # The socket is managed by Socketry, which implements the read timeout. - # - # Incoming data is fed into an instance of HTTPTools::Parser, which gives us the header and - # chunks of the body via callbacks. 
- # - class StreamingHTTPConnection - DEFAULT_CHUNK_SIZE = 10000 - - attr_reader :status - attr_reader :headers - - def initialize(uri, proxy, headers, connect_timeout, read_timeout) - @parser = HTTPTools::Parser.new - @buffer = "" - @read_timeout = read_timeout - @done = false - @lock = Mutex.new - - # Provide callbacks for the Parser to give us the headers and body - have_headers = false - @parser.on(:header) do - have_headers = true - end - @parser.on(:stream) do |data| - @lock.synchronize { @buffer << data } - end - @parser.on(:finish) do - @lock.synchronize { @done = true } - end - - if proxy - @socket = open_socket(proxy, connect_timeout) - @socket.write(build_proxy_request(uri, proxy)) - else - @socket = open_socket(uri, connect_timeout) - end - - @socket.write(build_request(uri, headers)) - - # Block until the status code and headers have been successfully read. - while !have_headers && read_chunk - end - @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] - @status = @parser.status_code - end - - def close - @socket.close if @socket - @socket = nil - end - - # Generator that returns one line of the response body at a time (delimited by \r, \n, - # or \r\n) until the response is fully consumed or the socket is closed. - def read_lines - Enumerator.new do |gen| - loop do - line = read_line - break if line.nil? - gen.yield line - end - end - end - - # Consumes the entire response body and returns it. - def consume_body - loop do - @lock.synchronize { break if @done } - break if !read_chunk - end - @buffer - end - - private - - def open_socket(uri, connect_timeout) - if uri.scheme == 'https' - Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) - else - Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) - end - end - - # Build an HTTP request line and headers. - def build_request(uri, headers) - ret = "GET #{uri.request_uri} HTTP/1.1\r\n" - headers.each { |k, v| - ret << "#{k}: #{v}\r\n" - } - ret + "\r\n" - end - - # Build a proxy connection header. - def build_proxy_request(uri, proxy) - ret = "CONNECT #{uri.host}:#{uri.port} HTTP/1.1\r\n" - ret << "Host: #{uri.host}:#{uri.port}\r\n" - if proxy.user || proxy.password - encoded_credentials = Base64.strict_encode64([proxy.user || '', proxy.password || ''].join(":")) - ret << "Proxy-Authorization: Basic #{encoded_credentials}\r\n" - end - ret << "\r\n" - ret - end - - # Attempt to read some more data from the socket. Return true if successful, false if EOF. - def read_chunk - data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) - return false if data == :eof - @parser << data - true - end - - # Extract the next line of text from the read buffer, refilling the buffer as needed. - def read_line - loop do - @lock.synchronize do - return nil if @done - i = @buffer.index(/[\r\n]/) - if !i.nil? - i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") - return @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) - end - end - return nil if !read_chunk - end - end - end - - SSEEvent = Struct.new(:type, :data, :id) - - # - # Accepts lines of text and parses them into SSE messages, which it emits via a callback. - # - class EventParser - def initialize - @on = { event: ->(_) {}, retry: ->(_) {} } - reset - end - - def on(event_name, &action) - @on[event_name] = action - end - - def <<(line) - line.chomp! - if line.empty? - return if @data.empty? 
- event = SSEEvent.new(@type || :message, @data, @id) - reset - @on[:event].call(event) - else - case line - when /^:.*$/ - when /^(\w+): ?(.*)$/ - process_field($1, $2) - end - end - end - - private - - def reset - @id = nil - @type = nil - @data = "" - end - - def process_field(name, value) - case name - when "event" - @type = value.to_sym - when "data" - @data << "\n" if !@data.empty? - @data << value - when "id" - @id = field_value - when "retry" - if /^(?\d+)$/ =~ value - @on_retry.call(num.to_i) - end - end - end - end end diff --git a/lib/ldclient-rb/sse_events.rb b/lib/ldclient-rb/sse_events.rb new file mode 100644 index 00000000..89c68d25 --- /dev/null +++ b/lib/ldclient-rb/sse_events.rb @@ -0,0 +1,59 @@ + +module LaunchDarkly + # Server-Sent Event type used by SSEClient and EventParser. + SSEEvent = Struct.new(:type, :data, :id) + + # + # Accepts lines of text and parses them into SSE messages, which it emits via a callback. + # + class EventParser + def initialize + @on = { event: ->(_) {}, retry: ->(_) {} } + reset + end + + def on(event_name, &action) + @on[event_name] = action + end + + def <<(line) + line.chomp! + if line.empty? + return if @data.empty? + event = SSEEvent.new(@type || :message, @data, @id) + reset + @on[:event].call(event) + else + case line + when /^:.*$/ + when /^(\w+): ?(.*)$/ + process_field($1, $2) + end + end + end + + private + + def reset + @id = nil + @type = nil + @data = "" + end + + def process_field(name, value) + case name + when "event" + @type = value.to_sym + when "data" + @data << "\n" if !@data.empty? + @data << value + when "id" + @id = field_value + when "retry" + if /^(?\d+)$/ =~ value + @on_retry.call(num.to_i) + end + end + end + end +end diff --git a/lib/ldclient-rb/streaming_http.rb b/lib/ldclient-rb/streaming_http.rb new file mode 100644 index 00000000..4400da28 --- /dev/null +++ b/lib/ldclient-rb/streaming_http.rb @@ -0,0 +1,135 @@ +require "http_tools" +require "socketry" + +module LaunchDarkly + # + # Wrapper around a socket allowing us to read an HTTP response incrementally, line by line, + # or to consume the entire response body. + # + # The socket is managed by Socketry, which implements the read timeout. + # + # Incoming data is fed into an instance of HTTPTools::Parser, which gives us the header and + # chunks of the body via callbacks. + # + class StreamingHTTPConnection + DEFAULT_CHUNK_SIZE = 10000 + + attr_reader :status + attr_reader :headers + + def initialize(uri, proxy, headers, connect_timeout, read_timeout) + @parser = HTTPTools::Parser.new + @buffer = "" + @read_timeout = read_timeout + @done = false + @lock = Mutex.new + + # Provide callbacks for the Parser to give us the headers and body + have_headers = false + @parser.on(:header) do + have_headers = true + end + @parser.on(:stream) do |data| + @lock.synchronize { @buffer << data } + end + @parser.on(:finish) do + @lock.synchronize { @done = true } + end + + if proxy + @socket = open_socket(proxy, connect_timeout) + @socket.write(build_proxy_request(uri, proxy)) + else + @socket = open_socket(uri, connect_timeout) + end + + @socket.write(build_request(uri, headers)) + + # Block until the status code and headers have been successfully read. 
+ while !have_headers && read_chunk + end + @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] + @status = @parser.status_code + end + + def close + @socket.close if @socket + @socket = nil + end + + # Generator that returns one line of the response body at a time (delimited by \r, \n, + # or \r\n) until the response is fully consumed or the socket is closed. + def read_lines + Enumerator.new do |gen| + loop do + line = read_line + break if line.nil? + gen.yield line + end + end + end + + # Consumes the entire response body and returns it. + def consume_body + loop do + @lock.synchronize { break if @done } + break if !read_chunk + end + @buffer + end + + private + + def open_socket(uri, connect_timeout) + if uri.scheme == 'https' + Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + else + Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + end + end + + # Build an HTTP request line and headers. + def build_request(uri, headers) + ret = "GET #{uri.request_uri} HTTP/1.1\r\n" + headers.each { |k, v| + ret << "#{k}: #{v}\r\n" + } + ret + "\r\n" + end + + # Build a proxy connection header. + def build_proxy_request(uri, proxy) + ret = "CONNECT #{uri.host}:#{uri.port} HTTP/1.1\r\n" + ret << "Host: #{uri.host}:#{uri.port}\r\n" + if proxy.user || proxy.password + encoded_credentials = Base64.strict_encode64([proxy.user || '', proxy.password || ''].join(":")) + ret << "Proxy-Authorization: Basic #{encoded_credentials}\r\n" + end + ret << "\r\n" + ret + end + + # Attempt to read some more data from the socket. Return true if successful, false if EOF. + def read_chunk + data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) + return false if data == :eof + @parser << data + true + end + + # Extract the next line of text from the read buffer, refilling the buffer as needed. + def read_line + loop do + @lock.synchronize do + return nil if @done + i = @buffer.index(/[\r\n]/) + if !i.nil? + i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") + return @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) + end + end + return nil if !read_chunk + end + end + end +end From 61f83ce8d8f7b9701c807f83b3edc870f22172ca Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 16:28:53 -0700 Subject: [PATCH 15/50] revise EventParser to return parsed items via a generator --- lib/ldclient-rb/sse_client.rb | 20 ++++++------- lib/ldclient-rb/sse_events.rb | 54 ++++++++++++++++++++--------------- 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/lib/ldclient-rb/sse_client.rb b/lib/ldclient-rb/sse_client.rb index 238dffd2..4378961c 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/ldclient-rb/sse_client.rb @@ -116,17 +116,17 @@ def connect end end - # Read lines one at a time from the StreamingHTTPConnection, and parse them into events. + # Pipe the output of the StreamingHTTPConnection into the EventParser, and dispatch events as + # they arrive. 
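  # Illustrative sketch (not part of the original commit): the generator interface this
  # change introduces, fed from a plain array of lines instead of a live connection. The
  # sample lines are invented; EventParser and SSEEvent are the classes defined in
  # sse_events.rb further down in this patch.
  #
  #   parser = EventParser.new(["event: put\r\n", "data: {}\r\n", "\r\n"])
  #   parser.items.to_a   # => one SSEEvent with type :put, data "{}", and no id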
     def read_stream(cxn)
-      event_parser = EventParser.new
-      event_parser.on(:event) do |event|
-        dispatch_event(event)
-      end
-      event_parser.on(:retry) do |interval|
-        @backoff.base_interval = interval / 1000
-      end
-      cxn.read_lines.each do |line|
-        event_parser << line
+      event_parser = EventParser.new(cxn.read_lines)
+      event_parser.items.each do |item|
+        case item
+          when SSEEvent
+            dispatch_event(item)
+          when SSESetRetryInterval
+            @backoff.base_interval = event.milliseconds.to_f / 1000
+        end
       end
     end
 
diff --git a/lib/ldclient-rb/sse_events.rb b/lib/ldclient-rb/sse_events.rb
index 89c68d25..8cacc985 100644
--- a/lib/ldclient-rb/sse_events.rb
+++ b/lib/ldclient-rb/sse_events.rb
@@ -2,39 +2,41 @@
 module LaunchDarkly
   # Server-Sent Event type used by SSEClient and EventParser.
   SSEEvent = Struct.new(:type, :data, :id)
-
+
+  SSESetRetryInterval = Struct.new(:milliseconds)
+
   #
-  # Accepts lines of text and parses them into SSE messages, which it emits via a callback.
+  # Accepts lines of text via an iterator, and parses them into SSE messages.
   #
   class EventParser
-    def initialize
-      @on = { event: ->(_) {}, retry: ->(_) {} }
-      reset
-    end
-
-    def on(event_name, &action)
-      @on[event_name] = action
+    def initialize(lines)
+      @lines = lines
+      reset_buffers
     end
 
-    def <<(line)
-      line.chomp!
-      if line.empty?
-        return if @data.empty?
-        event = SSEEvent.new(@type || :message, @data, @id)
-        reset
-        @on[:event].call(event)
-      else
-        case line
-          when /^:.*$/
-          when /^(\w+): ?(.*)$/
-            process_field($1, $2)
+    # Generator that parses the input interator and returns instances of SSEEvent or SSERetryInterval.
+    def items
+      Enumerator.new do |gen|
+        @lines.each do |line|
+          line.chomp!
+          if line.empty?
+            event = maybe_create_event
+            reset_buffers
+            gen.yield event if !event.nil?
+          else
+            case line
+              when /^(\w+): ?(.*)$/
+                item = process_field($1, $2)
+                gen.yield item if !item.nil?
+            end
+          end
         end
       end
     end
 
     private
 
-    def reset
+    def reset_buffers
       @id = nil
       @type = nil
       @data = ""
@@ -51,9 +53,15 @@ def process_field(name, value)
           @id = field_value
         when "retry"
           if /^(?<num>\d+)$/ =~ value
-            @on_retry.call(num.to_i)
+            return SSESetRetryInterval(num.to_i)
           end
       end
+      nil
+    end
+
+    def maybe_create_event
+      return nil if @data.empty?
+      SSEEvent.new(@type || :message, @data, @id)
     end
   end
 end

From 9170f8a24a312c82816eb30d2bf67ea48da84f92 Mon Sep 17 00:00:00 2001
From: Eli Bishop
Date: Wed, 20 Jun 2018 16:40:49 -0700
Subject: [PATCH 16/50] throw exception if we lose the connection before we
 have response headers

---
 lib/ldclient-rb/streaming_http.rb | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/ldclient-rb/streaming_http.rb b/lib/ldclient-rb/streaming_http.rb
index 4400da28..0ea82966 100644
--- a/lib/ldclient-rb/streaming_http.rb
+++ b/lib/ldclient-rb/streaming_http.rb
@@ -46,7 +46,8 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout)
       @socket.write(build_request(uri, headers))
 
       # Block until the status code and headers have been successfully read.
- while !have_headers && read_chunk + while !have_headers + raise EOFError if !read_chunk_into_buffer end @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] @status = @parser.status_code From fec18dfc6561395270a69f4f39c217c640ddb886 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 16:41:05 -0700 Subject: [PATCH 17/50] comments & better method name --- lib/ldclient-rb/streaming_http.rb | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/ldclient-rb/streaming_http.rb b/lib/ldclient-rb/streaming_http.rb index 0ea82966..034589da 100644 --- a/lib/ldclient-rb/streaming_http.rb +++ b/lib/ldclient-rb/streaming_http.rb @@ -24,7 +24,8 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout) @done = false @lock = Mutex.new - # Provide callbacks for the Parser to give us the headers and body + # Provide callbacks for the Parser to give us the headers and body. This has to be done + # before we start piping any data into the parser. have_headers = false @parser.on(:header) do have_headers = true @@ -74,7 +75,7 @@ def read_lines def consume_body loop do @lock.synchronize { break if @done } - break if !read_chunk + break if !read_chunk_into_buffer end @buffer end @@ -111,10 +112,13 @@ def build_proxy_request(uri, proxy) end # Attempt to read some more data from the socket. Return true if successful, false if EOF. - def read_chunk + # A read timeout will result in an exception. + def read_chunk_into_buffer data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) return false if data == :eof @parser << data + # We are piping the content through the parser so that it can handle things like chunked + # encoding for us. The content ends up being appended to @buffer via our callback. true end @@ -129,7 +133,7 @@ def read_line return @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) end end - return nil if !read_chunk + return nil if !read_chunk_into_buffer end end end From 291b08ebee0c7a77a583c06ea8e12236d16902ef Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 17:04:44 -0700 Subject: [PATCH 18/50] move SSE classes into separate module --- lib/ldclient-rb.rb | 4 ---- lib/ldclient-rb/stream.rb | 3 ++- lib/sse_client.rb | 4 ++++ lib/{ldclient-rb => sse_client}/backoff.rb | 2 +- lib/{ldclient-rb => sse_client}/sse_client.rb | 4 +--- lib/{ldclient-rb => sse_client}/sse_events.rb | 4 ++-- lib/{ldclient-rb => sse_client}/streaming_http.rb | 2 +- 7 files changed, 11 insertions(+), 12 deletions(-) create mode 100644 lib/sse_client.rb rename lib/{ldclient-rb => sse_client}/backoff.rb (98%) rename lib/{ldclient-rb => sse_client}/sse_client.rb (98%) rename lib/{ldclient-rb => sse_client}/sse_events.rb (96%) rename lib/{ldclient-rb => sse_client}/streaming_http.rb (99%) diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index 530d15e9..ce9d0307 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -8,10 +8,6 @@ require "ldclient-rb/in_memory_store" require "ldclient-rb/config" require "ldclient-rb/newrelic" -require "ldclient-rb/backoff" -require "ldclient-rb/streaming_http" -require "ldclient-rb/sse_events" -require "ldclient-rb/sse_client" require "ldclient-rb/stream" require "ldclient-rb/polling" require "ldclient-rb/user_filter" diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 4f3f2652..2151e945 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -1,5 +1,6 @@ require "concurrent/atomics" require "json" +require "sse_client" module LaunchDarkly PUT = :put @@ 
-45,7 +46,7 @@ def start read_timeout: READ_TIMEOUT_SECONDS, logger: @config.logger } - @es = SSEClient.new(@config.stream_uri + "/all", opts) do |conn| + @es = SSE::SSEClient.new(@config.stream_uri + "/all", opts) do |conn| conn.on_event { |event| process_message(event, event.type) } conn.on_error { |err| status = err[:status_code] diff --git a/lib/sse_client.rb b/lib/sse_client.rb new file mode 100644 index 00000000..dd24c3a6 --- /dev/null +++ b/lib/sse_client.rb @@ -0,0 +1,4 @@ +require "sse_client/streaming_http" +require "sse_client/sse_events" +require "sse_client/backoff" +require "sse_client/sse_client" diff --git a/lib/ldclient-rb/backoff.rb b/lib/sse_client/backoff.rb similarity index 98% rename from lib/ldclient-rb/backoff.rb rename to lib/sse_client/backoff.rb index d46592c1..73e0754f 100644 --- a/lib/ldclient-rb/backoff.rb +++ b/lib/sse_client/backoff.rb @@ -1,5 +1,5 @@ -module LaunchDarkly +module SSE # # A simple backoff algorithm that can be reset at any time, or reset itself after a given # interval has passed without errors. diff --git a/lib/ldclient-rb/sse_client.rb b/lib/sse_client/sse_client.rb similarity index 98% rename from lib/ldclient-rb/sse_client.rb rename to lib/sse_client/sse_client.rb index 4378961c..7e0d9a6e 100644 --- a/lib/ldclient-rb/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -1,11 +1,9 @@ require "concurrent/atomics" -require "http_tools" require "logger" -require "socketry" require "thread" require "uri" -module LaunchDarkly +module SSE # # A lightweight Server-Sent Events implementation, relying on two gems: socketry for sockets with # read timeouts, and http_tools for HTTP response parsing. The overall logic is based on diff --git a/lib/ldclient-rb/sse_events.rb b/lib/sse_client/sse_events.rb similarity index 96% rename from lib/ldclient-rb/sse_events.rb rename to lib/sse_client/sse_events.rb index 8cacc985..d56f4e8c 100644 --- a/lib/ldclient-rb/sse_events.rb +++ b/lib/sse_client/sse_events.rb @@ -1,5 +1,5 @@ -module LaunchDarkly +module SSE # Server-Sent Event type used by SSEClient and EventParser. SSEEvent = Struct.new(:type, :data, :id) @@ -50,7 +50,7 @@ def process_field(name, value) @data << "\n" if !@data.empty? @data << value when "id" - @id = field_value + @id = value when "retry" if /^(?\d+)$/ =~ value return SSESetRetryInterval(num.to_i) diff --git a/lib/ldclient-rb/streaming_http.rb b/lib/sse_client/streaming_http.rb similarity index 99% rename from lib/ldclient-rb/streaming_http.rb rename to lib/sse_client/streaming_http.rb index 034589da..888c49aa 100644 --- a/lib/ldclient-rb/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -1,7 +1,7 @@ require "http_tools" require "socketry" -module LaunchDarkly +module SSE # # Wrapper around a socket allowing us to read an HTTP response incrementally, line by line, # or to consume the entire response body. 
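
With the SSE classes gathered into their own module behind lib/sse_client.rb, the client can
be exercised on its own. The following is an illustrative sketch rather than part of the
patch series: the stream URL and header value are invented, lib/ is assumed to be on the
load path, and the options and callbacks mirror the way stream.rb uses SSE::SSEClient in
the hunk above.

    require "sse_client"

    client = SSE::SSEClient.new("https://stream.example.com/all",
                                headers: { "Authorization" => "my-sdk-key" },
                                read_timeout: 300) do |c|
      c.on_event { |event| puts "#{event.type}: #{event.data}" }
      c.on_error { |err| puts "stream error: #{err[:status_code]}" }
    end

    # ... later, when shutting down:
    client.close
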
From ee4fa7c3fed3e602879835fc26668f3253631ce4 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 17:14:01 -0700 Subject: [PATCH 19/50] unit tests for event parser --- lib/sse_client/sse_events.rb | 2 +- spec/sse_events_spec.rb | 100 +++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 spec/sse_events_spec.rb diff --git a/lib/sse_client/sse_events.rb b/lib/sse_client/sse_events.rb index d56f4e8c..762cc2b0 100644 --- a/lib/sse_client/sse_events.rb +++ b/lib/sse_client/sse_events.rb @@ -53,7 +53,7 @@ def process_field(name, value) @id = value when "retry" if /^(?\d+)$/ =~ value - return SSESetRetryInterval(num.to_i) + return SSESetRetryInterval.new(num.to_i) end end nil diff --git a/spec/sse_events_spec.rb b/spec/sse_events_spec.rb new file mode 100644 index 00000000..438cfa7a --- /dev/null +++ b/spec/sse_events_spec.rb @@ -0,0 +1,100 @@ +require "spec_helper" + +describe SSE::EventParser do + subject { SSE::EventParser } + + it "parses an event with all fields" do + lines = [ + "event: abc\r\n", + "data: def\r\n", + "id: 1\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:abc, "def", "1") + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "parses an event with only data" do + lines = [ + "data: def\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:message, "def", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "parses an event with multi-line data" do + lines = [ + "data: def\r\n", + "data: ghi\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:message, "def\nghi", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "ignores comments" do + lines = [ + ":", + "data: def\r\n", + ":", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:message, "def", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "parses reconnect interval" do + lines = [ + "retry: 2500\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_item = SSE::SSESetRetryInterval.new(2500) + output = ep.items.to_a + expect(output).to eq([ expected_item ]) + end + + it "parses multiple events" do + lines = [ + "event: abc\r\n", + "data: def\r\n", + "id: 1\r\n", + "\r\n", + "data: ghi\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event_1 = SSE::SSEEvent.new(:abc, "def", "1") + expected_event_2 = SSE::SSEEvent.new(:message, "ghi", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event_1, expected_event_2 ]) + end + + it "ignores events with no data" do + lines = [ + "event: nothing\r\n", + "\r\n", + "event: nada\r\n", + "\r\n" + ] + ep = subject.new(lines) + + output = ep.items.to_a + expect(output).to eq([]) + end +end From 36a4ca642ecf6fb73b6cfb64e866caeffd821d75 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 21:22:27 -0700 Subject: [PATCH 20/50] move test file --- spec/{ => sse_client}/sse_events_spec.rb | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename spec/{ => sse_client}/sse_events_spec.rb (100%) diff --git a/spec/sse_events_spec.rb b/spec/sse_client/sse_events_spec.rb similarity index 100% rename from spec/sse_events_spec.rb rename to spec/sse_client/sse_events_spec.rb From ef536271fb77cf2f11f1716d201186558d6580a9 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 20 Jun 2018 21:22:45 -0700 Subject: [PATCH 21/50] break out method for 
testability --- lib/sse_client/sse_client.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index 7e0d9a6e..c4587aed 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -96,7 +96,7 @@ def connect sleep(interval) end begin - cxn = StreamingHTTPConnection.new(@uri, @proxy, build_headers, @connect_timeout, @read_timeout) + cxn = open_connection(build_headers) if cxn.status != 200 body = cxn.consume_body # grab the whole response body in case it has error details cxn.close @@ -114,6 +114,11 @@ def connect end end + # Just calls the StreamingHTTPConnection constructor - factored out for test purposes + def open_connection(headers) + StreamingHTTPConnection.new(@uri, @proxy, headers, @connect_timeout, @read_timeout) + end + # Pipe the output of the StreamingHTTPConnection into the EventParser, and dispatch events as # they arrive. def read_stream(cxn) From d0b257b29f3471a235ef809fd6f10420560946ce Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 15:43:04 -0700 Subject: [PATCH 22/50] break out HTTP response reader into its own class --- lib/sse_client/streaming_http.rb | 121 ++++++++++++++++++------------- 1 file changed, 69 insertions(+), 52 deletions(-) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 888c49aa..7d3e90eb 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -3,40 +3,11 @@ module SSE # - # Wrapper around a socket allowing us to read an HTTP response incrementally, line by line, - # or to consume the entire response body. - # - # The socket is managed by Socketry, which implements the read timeout. - # - # Incoming data is fed into an instance of HTTPTools::Parser, which gives us the header and - # chunks of the body via callbacks. + # Wrapper around a socket providing a simplified HTTP request-response cycle including streaming. + # The socket is created and managed by Socketry, which we use so that we can have a read timeout. # class StreamingHTTPConnection - DEFAULT_CHUNK_SIZE = 10000 - - attr_reader :status - attr_reader :headers - def initialize(uri, proxy, headers, connect_timeout, read_timeout) - @parser = HTTPTools::Parser.new - @buffer = "" - @read_timeout = read_timeout - @done = false - @lock = Mutex.new - - # Provide callbacks for the Parser to give us the headers and body. This has to be done - # before we start piping any data into the parser. - have_headers = false - @parser.on(:header) do - have_headers = true - end - @parser.on(:stream) do |data| - @lock.synchronize { @buffer << data } - end - @parser.on(:finish) do - @lock.synchronize { @done = true } - end - if proxy @socket = open_socket(proxy, connect_timeout) @socket.write(build_proxy_request(uri, proxy)) @@ -46,12 +17,7 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout) @socket.write(build_request(uri, headers)) - # Block until the status code and headers have been successfully read. 
- while !have_headers - raise EOFError if !read_chunk_into_buffer - end - @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] - @status = @parser.status_code + @reader = HTTPResponseReader.new(@socket, read_timeout) end def close @@ -59,25 +25,23 @@ def close @socket = nil end + def status + @reader.status + end + + def headers + @reader.headers + end + # Generator that returns one line of the response body at a time (delimited by \r, \n, # or \r\n) until the response is fully consumed or the socket is closed. def read_lines - Enumerator.new do |gen| - loop do - line = read_line - break if line.nil? - gen.yield line - end - end + @reader.read_lines end # Consumes the entire response body and returns it. - def consume_body - loop do - @lock.synchronize { break if @done } - break if !read_chunk_into_buffer - end - @buffer + def read_all + @reader.read_all end private @@ -110,9 +74,63 @@ def build_proxy_request(uri, proxy) ret << "\r\n" ret end + end + + # + # Used internally to read the HTTP response, either all at once or as a stream of text lines. + # Incoming data is fed into an instance of HTTPTools::Parser, which gives us the header and + # chunks of the body via callbacks. + # + class HTTPResponseReader + DEFAULT_CHUNK_SIZE = 10000 + + attr_reader :status, :headers + + def initialize(socket, read_timeout) + @socket = socket + @read_timeout = read_timeout + @parser = HTTPTools::Parser.new + @buffer = "" + @lock = Mutex.new + + # Provide callbacks for the Parser to give us the headers and body. This has to be done + # before we start piping any data into the parser. + have_headers = false + @parser.on(:header) do + have_headers = true + end + @parser.on(:stream) do |data| + @lock.synchronize { @buffer << data } # synchronize because we're called from another thread in Socketry + end + + # Block until the status code and headers have been successfully read. + while !have_headers + raise EOFError if !read_chunk_into_buffer + end + @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] + @status = @parser.status_code + end + + def read_lines + Enumerator.new do |gen| + loop do + line = read_line + break if line.nil? + gen.yield line + end + end + end + + def read_all + while read_chunk_into_buffer + end + @buffer + end + + private # Attempt to read some more data from the socket. Return true if successful, false if EOF. - # A read timeout will result in an exception. + # A read timeout will result in an exception from Socketry's readpartial method. def read_chunk_into_buffer data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) return false if data == :eof @@ -126,7 +144,6 @@ def read_chunk_into_buffer def read_line loop do @lock.synchronize do - return nil if @done i = @buffer.index(/[\r\n]/) if !i.nil? 
i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") From 51bd8a72b4354ff30c4b9e3484a3d9357e3d3e33 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 16:23:02 -0700 Subject: [PATCH 23/50] fix method name --- lib/sse_client/sse_client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index c4587aed..76758c55 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -98,7 +98,7 @@ def connect begin cxn = open_connection(build_headers) if cxn.status != 200 - body = cxn.consume_body # grab the whole response body in case it has error details + body = cxn.read_all # grab the whole response body in case it has error details cxn.close @on[:error].call({status_code: cxn.status, body: body}) elsif cxn.headers["content-type"] && cxn.headers["content-type"].start_with?("text/event-stream") From f071b9f524660f53c853e227827a6ea2ab78c598 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 16:41:48 -0700 Subject: [PATCH 24/50] add tests for HTTPResponseReader --- spec/sse_client/streaming_http_spec.rb | 94 ++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 spec/sse_client/streaming_http_spec.rb diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb new file mode 100644 index 00000000..84046461 --- /dev/null +++ b/spec/sse_client/streaming_http_spec.rb @@ -0,0 +1,94 @@ +require "spec_helper" +require "socketry" + +describe SSE::HTTPResponseReader do + subject { SSE::HTTPResponseReader } + + let(:simple_response) { <<-EOT +HTTP/1.1 200 OK +Cache-Control: no-cache +Content-Type: text/event-stream + +line1\r +line2 +\r +EOT + } + + let(:malformed_response) { <<-EOT +HTTP/1.1 200 OK +Cache-Control: no-cache +EOT + } + + def make_chunks(str) + # arbitrarily split content into 5-character blocks + str.scan(/.{1,5}/m).to_enum + end + + def mock_socket_without_timeout(chunks) + mock_socket(chunks) { :eof } + end + + def mock_socket_with_timeout(chunks) + mock_socket(chunks) { raise Socketry::TimeoutError } + end + + def mock_socket(chunks) + sock = double + allow(sock).to receive(:readpartial) do + begin + chunks.next + rescue StopIteration + yield + end + end + sock + end + + it "parses status code" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.status).to eq(200) + end + + it "parses headers" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.headers).to eq({ + 'cache-control' => 'no-cache', + 'content-type' => 'text/event-stream' + }) + end + + it "can read entire response body" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.read_all).to eq("line1\r\nline2\n\r\n") + end + + it "can read response body as lines" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.read_lines.to_a).to eq([ + "line1\r\n", + "line2\n", + "\r\n" + ]) + end + + it "raises error if response ends without complete headers" do + socket = mock_socket_without_timeout(make_chunks(malformed_response)) + expect { subject.new(socket, 0) }.to raise_error(EOFError) + end + + it "throws timeout if thrown by socket read" do + socket = mock_socket_with_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + lines = 
reader.read_lines + lines.next + lines.next + lines.next + expect { lines.next }.to raise_error(Socketry::TimeoutError) + end +end From 95a9d8ec1aeaf0ed1009648518e3cdd57b3fd6e1 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 18:52:32 -0700 Subject: [PATCH 25/50] end-to-end tests for new SSE client [2 of 2] --- lib/sse_client/sse_client.rb | 5 +- spec/sse_client/sse_client_spec.rb | 174 +++++++++++++++++++++++++ spec/sse_client/sse_shared.rb | 51 ++++++++ spec/sse_client/streaming_http_spec.rb | 86 ++++++++++++ 4 files changed, 315 insertions(+), 1 deletion(-) create mode 100644 spec/sse_client/sse_client_spec.rb create mode 100644 spec/sse_client/sse_shared.rb diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index 76758c55..bb693d81 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -19,7 +19,7 @@ def initialize(uri, options = {}) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) - @headers = options[:headers].clone || {} + @headers = options[:headers] ? options[:headers].clone : {} @connect_timeout = options[:connect_timeout] || DEFAULT_CONNECT_TIMEOUT @read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT @logger = options[:logger] || default_logger @@ -101,10 +101,13 @@ def connect body = cxn.read_all # grab the whole response body in case it has error details cxn.close @on[:error].call({status_code: cxn.status, body: body}) + next elsif cxn.headers["content-type"] && cxn.headers["content-type"].start_with?("text/event-stream") return cxn # we're good to proceed end @logger.error { "Event source returned unexpected content type '#{cxn.headers["content-type"]}'" } + rescue ShutdownSignal + raise rescue StandardError => e @logger.error { "Unexpected error from event source: #{e.inspect}" } @logger.debug { "Exception trace: #{e.backtrace}" } diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb new file mode 100644 index 00000000..001e1203 --- /dev/null +++ b/spec/sse_client/sse_client_spec.rb @@ -0,0 +1,174 @@ +require "spec_helper" +require "concurrent/atomics" +require "socketry" +require "sse_client/sse_shared" + +# +# End-to-end tests of SSEClient against a real server +# +describe SSE::SSEClient do + subject { SSE::SSEClient } + + def with_client(client) + begin + yield client + ensure + client.close + end + end + + class ObjectSink + def initialize(expected_count) + @expected_count = expected_count + @semaphore = Concurrent::Semaphore.new(expected_count) + @semaphore.acquire(expected_count) + @received = [] + end + + def <<(value) + @received << value + @semaphore.release(1) + end + + def await_values + @semaphore.acquire(@expected_count) + @received + end + end + + it "sends expected headers" do + with_server do |server| + connected = Concurrent::Event.new + received_req = nil + server.setup_response("/") do |req,res| + received_req = req + res.content_type = "text/event-stream" + res.status = 200 + connected.set + end + + headers = { + "Authorization" => "secret" + } + + with_client(subject.new(server.base_uri, headers: headers, logger: NullLogger.new)) do |client| + connected.wait + expect(received_req).not_to be_nil + expect(received_req.header).to eq({ + "accept" => ["text/event-stream"], + "cache-control" => ["no-cache"], + "host" => ["127.0.0.1"], + "authorization" => ["secret"] + }) + end + end + end + + it "receives messages" do + events_body = <<-EOT +event: go +data: foo +id: 1 + +event: stop +data: bar + +EOT + + with_server do |server| + 
server.setup_response("/") do |req,res| + res.content_type = "text/event-stream" + res.status = 200 + res.body = events_body + end + + event_sink = ObjectSink.new(2) + client = subject.new(server.base_uri, logger: NullLogger.new) do |c| + c.on_event { |event| event_sink << event } + end + + with_client(client) do |client| + expect(event_sink.await_values).to eq([ + SSE::SSEEvent.new(:go, "foo", "1"), + SSE::SSEEvent.new(:stop, "bar", nil) + ]) + end + end + end + + it "reconnects after error response" do + events_body = <<-EOT +event: go +data: foo + +EOT + + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + if attempt == 1 + res.status = 500 + res.body = "sorry" + res.keep_alive = false + else + res.content_type = "text/event-stream" + res.status = 200 + res.body = events_body + end + end + + event_sink = ObjectSink.new(1) + error_sink = ObjectSink.new(1) + client = subject.new(server.base_uri, + reconnect_time: 0.25, logger: NullLogger.new) do |c| + c.on_event { |event| event_sink << event } + c.on_error { |error| error_sink << error } + end + + with_client(client) do |client| + expect(event_sink.await_values).to eq([ + SSE::SSEEvent.new(:go, "foo", nil) + ]) + expect(error_sink.await_values).to eq([ + { status_code: 500, body: "sorry" } + ]) + expect(attempt).to eq(2) + end + end + end + + it "reconnects after read timeout" do + events_body = <<-EOT +event: go +data: foo + +EOT + + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + if attempt == 1 + sleep(2) + end + res.content_type = "text/event-stream" + res.status = 200 + res.body = events_body + end + + event_sink = ObjectSink.new(1) + client = subject.new(server.base_uri, + reconnect_time: 0.25, read_timeout: 0.25, logger: NullLogger.new) do |c| + c.on_event { |event| event_sink << event } + end + + with_client(client) do |client| + expect(event_sink.await_values).to eq([ + SSE::SSEEvent.new(:go, "foo", nil) + ]) + expect(attempt).to eq(2) + end + end + end +end diff --git a/spec/sse_client/sse_shared.rb b/spec/sse_client/sse_shared.rb new file mode 100644 index 00000000..9112387b --- /dev/null +++ b/spec/sse_client/sse_shared.rb @@ -0,0 +1,51 @@ +require "spec_helper" +require "webrick" + +class StubHTTPServer + def initialize + @port = 50000 + begin + @server = WEBrick::HTTPServer.new( + BindAddress: '127.0.0.1', + Port: @port, + AccessLog: [], + Logger: NullLogger.new + ) + rescue Errno::EADDRINUSE + @port += 1 + retry + end + end + + def start + Thread.new { @server.start } + end + + def stop + @server.shutdown + end + + def base_uri + URI("http://127.0.0.1:#{@port}") + end + + def setup_response(uri_path, &action) + @server.mount_proc(uri_path, action) + end +end + +class NullLogger + def method_missing(*) + self + end +end + +def with_server + server = StubHTTPServer.new + begin + server.start + yield server + ensure + server.stop + end +end diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 84046461..552b4deb 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -1,6 +1,92 @@ require "spec_helper" require "socketry" +require "sse_client/sse_shared" +# +# End-to-end tests of HTTP requests against a real server +# +describe SSE::StreamingHTTPConnection do + subject { SSE::StreamingHTTPConnection } + + def with_connection(cxn) + begin + yield cxn + ensure + cxn.close + end + end + + it "makes HTTP connection and sends request" do + with_server 
do |server| + received_req = nil + server.setup_response("/foo") do |req,res| + received_req = req + res.status = 200 + end + headers = { + "Accept" => "text/plain" + } + with_connection(subject.new(server.base_uri.merge("/foo?bar"), nil, headers, 30, 30)) do + expect(received_req).not_to be_nil + expect(received_req.unparsed_uri).to eq("/foo?bar") + expect(received_req.header).to eq({ "accept" => ["text/plain"] }) + end + end + end + + it "receives response status" do + with_server do |server| + server.setup_response("/foo") do |req,res| + res.status = 204 + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + expect(cxn.status).to eq(204) + end + end + end + + it "receives response headers" do + with_server do |server| + server.setup_response("/foo") do |req,res| + res["Content-Type"] = "application/json" + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + expect(cxn.headers["content-type"]).to eq("application/json") + end + end + end + + it "can read response as lines" do + body = <<-EOT +This is +a response +EOT + with_server do |server| + server.setup_response("/foo") do |req,res| + res.body = body + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + lines = cxn.read_lines + expect(lines.next).to eq("This is\n") + expect(lines.next).to eq("a response\n") + end + end + end + + it "enforces read timeout" do + with_server do |server| + server.setup_response("/") do |req,res| + sleep(2) + res.status = 200 + end + expect { subject.new(server.base_uri, nil, {}, 30, 0.25) }.to raise_error(Socketry::TimeoutError) + end + end +end + +# +# Tests of response parsing functionality without a real HTTP request +# describe SSE::HTTPResponseReader do subject { SSE::HTTPResponseReader } From bfdfdc87488fee2ad6b3a8d2b7b1f040abba3c7f Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 19:08:40 -0700 Subject: [PATCH 26/50] don't use Thread.raise --- lib/sse_client/sse_client.rb | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index bb693d81..290c4b7e 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -40,7 +40,7 @@ def initialize(uri, options = {}) yield self if block_given? - @worker = Thread.new do + Thread.new do run_stream end end @@ -59,7 +59,7 @@ def on_error(&action) def close if @stopped.make_true - @worker.raise ShutdownSignal.new + @cxn.close if !@cxn.nil? end end @@ -73,23 +73,24 @@ def default_logger def run_stream while !@stopped.value - cxn = nil + @cxn = nil begin - cxn = connect - read_stream(cxn) - rescue ShutdownSignal - return + @cxn = connect + read_stream(@cxn) + rescue Errno::EBADF + # don't log this - it probably means we closed our own connection deliberately rescue StandardError => e @logger.error { "Unexpected error from event source: #{e.inspect}" } @logger.debug { "Exception trace: #{e.backtrace}" } end - cxn.close if !cxn.nil? + @cxn.close if !cxn.nil? end end # Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful. 
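  # Illustrative sketch (not part of this commit): the shutdown technique the commit switches
  # to, in miniature with plain Ruby sockets. Closing an IO from one thread makes a read that
  # is blocked on it in another thread fail (IOError or Errno::EBADF), which the worker can
  # treat as a normal stop signal instead of receiving an injected ShutdownSignal exception.
  #
  #   require "socket"
  #   server = TCPServer.new("127.0.0.1", 0)
  #   sock   = TCPSocket.new("127.0.0.1", server.addr[1])
  #   reader = Thread.new do
  #     begin
  #       sock.read              # blocks; nothing is ever written
  #     rescue IOError, Errno::EBADF
  #       :shut_down             # closing the socket elsewhere lands here
  #     end
  #   end
  #   sleep 0.1
  #   sock.close                 # the analogue of SSEClient#close above
  #   reader.value               # => :shut_down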
def connect loop do + return if @stopped.value interval = @backoff.next_interval if interval > 0 @logger.warn { "Will retry connection after #{'%.3f' % interval} seconds" } @@ -106,7 +107,7 @@ def connect return cxn # we're good to proceed end @logger.error { "Event source returned unexpected content type '#{cxn.headers["content-type"]}'" } - rescue ShutdownSignal + rescue Errno::EBADF raise rescue StandardError => e @logger.error { "Unexpected error from event source: #{e.inspect}" } @@ -127,6 +128,7 @@ def open_connection(headers) def read_stream(cxn) event_parser = EventParser.new(cxn.read_lines) event_parser.items.each do |item| + return if @stopped.value case item when SSEEvent dispatch_event(item) @@ -157,8 +159,4 @@ def build_headers h.merge(@headers) end end - - # Custom exception that we use to tell the worker thread to stop - class ShutdownSignal < StandardError - end end From d19c5893c7de8f0a0431c14cafe6b4357c08710b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 19:15:04 -0700 Subject: [PATCH 27/50] rm unnecessary class --- spec/sse_client/sse_client_spec.rb | 45 ++++++------------------------ 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb index 001e1203..be4bbbaf 100644 --- a/spec/sse_client/sse_client_spec.rb +++ b/spec/sse_client/sse_client_spec.rb @@ -17,25 +17,6 @@ def with_client(client) end end - class ObjectSink - def initialize(expected_count) - @expected_count = expected_count - @semaphore = Concurrent::Semaphore.new(expected_count) - @semaphore.acquire(expected_count) - @received = [] - end - - def <<(value) - @received << value - @semaphore.release(1) - end - - def await_values - @semaphore.acquire(@expected_count) - @received - end - end - it "sends expected headers" do with_server do |server| connected = Concurrent::Event.new @@ -82,16 +63,14 @@ def await_values res.body = events_body end - event_sink = ObjectSink.new(2) + event_sink = Queue.new client = subject.new(server.base_uri, logger: NullLogger.new) do |c| c.on_event { |event| event_sink << event } end with_client(client) do |client| - expect(event_sink.await_values).to eq([ - SSE::SSEEvent.new(:go, "foo", "1"), - SSE::SSEEvent.new(:stop, "bar", nil) - ]) + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", "1")) + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:stop, "bar", nil)) end end end @@ -118,8 +97,8 @@ def await_values end end - event_sink = ObjectSink.new(1) - error_sink = ObjectSink.new(1) + event_sink = Queue.new + error_sink = Queue.new client = subject.new(server.base_uri, reconnect_time: 0.25, logger: NullLogger.new) do |c| c.on_event { |event| event_sink << event } @@ -127,12 +106,8 @@ def await_values end with_client(client) do |client| - expect(event_sink.await_values).to eq([ - SSE::SSEEvent.new(:go, "foo", nil) - ]) - expect(error_sink.await_values).to eq([ - { status_code: 500, body: "sorry" } - ]) + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) + expect(error_sink.pop).to eq({ status_code: 500, body: "sorry" }) expect(attempt).to eq(2) end end @@ -157,16 +132,14 @@ def await_values res.body = events_body end - event_sink = ObjectSink.new(1) + event_sink = Queue.new client = subject.new(server.base_uri, reconnect_time: 0.25, read_timeout: 0.25, logger: NullLogger.new) do |c| c.on_event { |event| event_sink << event } end with_client(client) do |client| - expect(event_sink.await_values).to eq([ - SSE::SSEEvent.new(:go, "foo", nil) - ]) + 
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) expect(attempt).to eq(2) end end From a0b314cd4979293eb8eeabaa77dfb6d7e0716fb1 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 19:26:54 -0700 Subject: [PATCH 28/50] add unit test for chunked encoding --- spec/sse_client/streaming_http_spec.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 552b4deb..321b86e7 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -163,6 +163,24 @@ def mock_socket(chunks) ]) end + it "handles chunked encoding" do + chunked_response = <<-EOT +HTTP/1.1 200 OK +Content-Type: text/plain +Transfer-Encoding: chunked + +6\r +things\r +A\r + and stuff\r +0\r +\r +EOT + socket = mock_socket_without_timeout(make_chunks(chunked_response)) + reader = subject.new(socket, 0) + expect(reader.read_all).to eq("things and stuff") + end + it "raises error if response ends without complete headers" do socket = mock_socket_without_timeout(make_chunks(malformed_response)) expect { subject.new(socket, 0) }.to raise_error(EOFError) From 3b4ca92e8c098c2f83a38ddfea3b18efcf70e8ff Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 19:33:00 -0700 Subject: [PATCH 29/50] misc cleanup --- spec/sse_client/sse_client_spec.rb | 13 +++++-------- spec/sse_client/streaming_http_spec.rb | 6 +++--- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb index be4bbbaf..b76c84dd 100644 --- a/spec/sse_client/sse_client_spec.rb +++ b/spec/sse_client/sse_client_spec.rb @@ -1,5 +1,4 @@ require "spec_helper" -require "concurrent/atomics" require "socketry" require "sse_client/sse_shared" @@ -19,10 +18,9 @@ def with_client(client) it "sends expected headers" do with_server do |server| - connected = Concurrent::Event.new - received_req = nil + requests = Queue.new server.setup_response("/") do |req,res| - received_req = req + requests << req res.content_type = "text/event-stream" res.status = 200 connected.set @@ -33,8 +31,7 @@ def with_client(client) } with_client(subject.new(server.base_uri, headers: headers, logger: NullLogger.new)) do |client| - connected.wait - expect(received_req).not_to be_nil + received_req = requests.pop expect(received_req.header).to eq({ "accept" => ["text/event-stream"], "cache-control" => ["no-cache"], @@ -108,7 +105,7 @@ def with_client(client) with_client(client) do |client| expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) expect(error_sink.pop).to eq({ status_code: 500, body: "sorry" }) - expect(attempt).to eq(2) + expect(attempt).to be >= 2 end end end @@ -140,7 +137,7 @@ def with_client(client) with_client(client) do |client| expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) - expect(attempt).to eq(2) + expect(attempt).to be >= 2 end end end diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 321b86e7..19b5e7f9 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -18,16 +18,16 @@ def with_connection(cxn) it "makes HTTP connection and sends request" do with_server do |server| - received_req = nil + requests = Queue.new server.setup_response("/foo") do |req,res| - received_req = req + requests << req res.status = 200 end headers = { "Accept" => "text/plain" } with_connection(subject.new(server.base_uri.merge("/foo?bar"), nil, 
headers, 30, 30)) do - expect(received_req).not_to be_nil + received_req = requests.pop expect(received_req.unparsed_uri).to eq("/foo?bar") expect(received_req.header).to eq({ "accept" => ["text/plain"] }) end From 86cc3bc8e86c439dfb4cd07bb44c58aaabd4eb92 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 21 Jun 2018 19:34:46 -0700 Subject: [PATCH 30/50] misc cleanup --- spec/sse_client/sse_client_spec.rb | 3 --- spec/sse_client/streaming_http_spec.rb | 10 ++++------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb index b76c84dd..65eb776f 100644 --- a/spec/sse_client/sse_client_spec.rb +++ b/spec/sse_client/sse_client_spec.rb @@ -52,7 +52,6 @@ def with_client(client) data: bar EOT - with_server do |server| server.setup_response("/") do |req,res| res.content_type = "text/event-stream" @@ -78,7 +77,6 @@ def with_client(client) data: foo EOT - with_server do |server| attempt = 0 server.setup_response("/") do |req,res| @@ -116,7 +114,6 @@ def with_client(client) data: foo EOT - with_server do |server| attempt = 0 server.setup_response("/") do |req,res| diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 19b5e7f9..819a8932 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -101,12 +101,6 @@ def with_connection(cxn) EOT } - let(:malformed_response) { <<-EOT -HTTP/1.1 200 OK -Cache-Control: no-cache -EOT - } - def make_chunks(str) # arbitrarily split content into 5-character blocks str.scan(/.{1,5}/m).to_enum @@ -182,6 +176,10 @@ def mock_socket(chunks) end it "raises error if response ends without complete headers" do + malformed_response = <<-EOT +HTTP/1.1 200 OK +Cache-Control: no-cache +EOT socket = mock_socket_without_timeout(make_chunks(malformed_response)) expect { subject.new(socket, 0) }.to raise_error(EOFError) end From 253c0cd4c0a09878670a7685256d5440b3c766b6 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 09:50:35 -0700 Subject: [PATCH 31/50] rm unused var --- spec/sse_client/sse_client_spec.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb index 65eb776f..1e9a99f9 100644 --- a/spec/sse_client/sse_client_spec.rb +++ b/spec/sse_client/sse_client_spec.rb @@ -23,7 +23,6 @@ def with_client(client) requests << req res.content_type = "text/event-stream" res.status = 200 - connected.set end headers = { From e983700b7858611ecaf80b0c8e41e39e0d89605f Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 10:17:27 -0700 Subject: [PATCH 32/50] turn logging back on to diagnose test problems --- spec/sse_client/sse_client_spec.rb | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb index 1e9a99f9..ecb7c4e4 100644 --- a/spec/sse_client/sse_client_spec.rb +++ b/spec/sse_client/sse_client_spec.rb @@ -29,7 +29,7 @@ def with_client(client) "Authorization" => "secret" } - with_client(subject.new(server.base_uri, headers: headers, logger: NullLogger.new)) do |client| + with_client(subject.new(server.base_uri, headers: headers)) do |client| received_req = requests.pop expect(received_req.header).to eq({ "accept" => ["text/event-stream"], @@ -59,7 +59,7 @@ def with_client(client) end event_sink = Queue.new - client = subject.new(server.base_uri, logger: NullLogger.new) do |c| + client = subject.new(server.base_uri) do |c| c.on_event { 
|event| event_sink << event } end @@ -93,8 +93,7 @@ def with_client(client) event_sink = Queue.new error_sink = Queue.new - client = subject.new(server.base_uri, - reconnect_time: 0.25, logger: NullLogger.new) do |c| + client = subject.new(server.base_uri, reconnect_time: 0.25) do |c| c.on_event { |event| event_sink << event } c.on_error { |error| error_sink << error } end @@ -127,7 +126,7 @@ def with_client(client) event_sink = Queue.new client = subject.new(server.base_uri, - reconnect_time: 0.25, read_timeout: 0.25, logger: NullLogger.new) do |c| + reconnect_time: 0.25, read_timeout: 0.25) do |c| c.on_event { |event| event_sink << event } end From dcb7af84dc7f3afe303ea3fe323d5fa3befe5a10 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 12:26:43 -0700 Subject: [PATCH 33/50] make sure we don't try to read past end of response body --- lib/sse_client/streaming_http.rb | 6 ++++++ spec/sse_client/streaming_http_spec.rb | 16 ++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 7d3e90eb..6f6a1fc2 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -91,6 +91,7 @@ def initialize(socket, read_timeout) @read_timeout = read_timeout @parser = HTTPTools::Parser.new @buffer = "" + @done = false @lock = Mutex.new # Provide callbacks for the Parser to give us the headers and body. This has to be done @@ -102,6 +103,9 @@ def initialize(socket, read_timeout) @parser.on(:stream) do |data| @lock.synchronize { @buffer << data } # synchronize because we're called from another thread in Socketry end + @parser.on(:finish) do + @lock.synchronize { @done = true } + end # Block until the status code and headers have been successfully read. while !have_headers @@ -132,6 +136,8 @@ def read_all # Attempt to read some more data from the socket. Return true if successful, false if EOF. # A read timeout will result in an exception from Socketry's readpartial method. 
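  # Illustrative sketch (not part of this commit) of the parser wiring this fix relies on,
  # using only the HTTPTools::Parser calls already present in this file: once the full body
  # has been consumed (per Content-Length or chunked encoding), the :finish callback fires,
  # and read_chunk_into_buffer below uses that @done flag to stop instead of issuing another
  # readpartial that could only end in a timeout.
  #
  #   parser = HTTPTools::Parser.new
  #   body, done = "", false
  #   parser.on(:stream) { |data| body << data }
  #   parser.on(:finish) { done = true }
  #   parser << "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nhi"
  #   parser.status_code  # => 200
  #   body                # => "hi"
  #   done                # => true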
def read_chunk_into_buffer + # If @done is set, it means the Parser has signaled end of response body + @lock.synchronize { return false if @done } data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) return false if data == :eof @parser << data diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 819a8932..f68f3764 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -73,6 +73,22 @@ def with_connection(cxn) end end + it "can read entire response body" do + body = <<-EOT +This is +a response +EOT + with_server do |server| + server.setup_response("/foo") do |req,res| + res.body = body + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(read_body).to eq("This is\na response\n") + end + end + end + it "enforces read timeout" do with_server do |server| server.setup_response("/") do |req,res| From 9243993122aa8a9754bb72c4856888da87b50e9e Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 12:51:08 -0700 Subject: [PATCH 34/50] add Ruby 2.5 and JRuby 9.1 to build --- .circleci/config.yml | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5fe3f4a0..df9dac51 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,11 +4,12 @@ workflows: version: 2 test: jobs: - # - test-misc-rubies # none of these older Ruby versions are supported on this branch + - test-misc-rubies - test-2.2 - test-2.3 - test-2.4 - - test-jruby-9.1 + - test-2.5 + - test-jruby-9.2 ruby-docker-template: &ruby-docker-template steps: @@ -17,6 +18,7 @@ ruby-docker-template: &ruby-docker-template if [[ $CIRCLE_JOB == test-jruby* ]]; then gem install jruby-openssl; # required by bundler, no effect on Ruby MRI fi + - run: ruby -v - run: gem install bundler - run: bundle install - run: mkdir ./rspec @@ -40,9 +42,14 @@ jobs: test-2.4: <<: *ruby-docker-template docker: - - image: circleci/ruby:2.4.3-jessie + - image: circleci/ruby:2.4.4-stretch - image: redis - test-jruby-9.1: + test-2.5: + <<: *ruby-docker-template + docker: + - image: circleci/ruby:2.5.1-stretch + - image: redis + test-jruby-9.2: <<: *ruby-docker-template docker: - image: circleci/jruby:9-jdk @@ -54,7 +61,7 @@ jobs: machine: image: circleci/classic:latest environment: - - RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" + - RUBIES: "jruby-9.1.17.0" steps: - run: sudo apt-get -q update - run: sudo apt-get -qy install redis-server From fc86c6ddb852fea1bc41c25963b84d4bcd9575b2 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 12:55:39 -0700 Subject: [PATCH 35/50] update readme --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 69908087..33bfbc77 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,11 @@ LaunchDarkly SDK for Ruby [![Test Coverage](https://codeclimate.com/github/launchdarkly/ruby-client/badges/coverage.svg)](https://codeclimate.com/github/launchdarkly/ruby-client/coverage) [![security](https://hakiri.io/github/launchdarkly/ruby-client/master.svg)](https://hakiri.io/github/launchdarkly/ruby-client/master) +Supported Ruby versions +----------------------- + +This version of the LaunchDarkly SDK has a minimum Ruby version of 2.2.6, or 9.1.6 for JRuby. 
+ Quick setup ----------- From 7357bebc3f5989a765ff011176e79b92e3dae774 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 13:53:43 -0700 Subject: [PATCH 36/50] remove mention of Celluloid from readme --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 33bfbc77..f8735d4d 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,8 @@ Quick setup 0. Install the Ruby SDK with `gem` ```shell -gem install ldclient-rb --prerelease +gem install ldclient-rb ``` -Note: The `--prerelease` flag is there to satisfy the dependency of celluloid 0.18pre which we have tested extensively and have found stable in our use case. Unfortunately, the upstream provider has not promoted this version to stable yet. See [here](https://github.com/celluloid/celluloid/issues/762) This is not required for use in a Gemfile. 1. Require the LaunchDarkly client: From c66703d062c67a13748e1e03de58c4c69f1b25e6 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 14:22:45 -0700 Subject: [PATCH 37/50] nil guard to avoid spurious error when shutting down --- lib/sse_client/sse_client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index 290c4b7e..b0eee8d7 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -76,7 +76,7 @@ def run_stream @cxn = nil begin @cxn = connect - read_stream(@cxn) + read_stream(@cxn) if !@cxn.nil? rescue Errno::EBADF # don't log this - it probably means we closed our own connection deliberately rescue StandardError => e From 718d1a4a795c036d1bb8a16703f914cab706955a Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 16:03:58 -0700 Subject: [PATCH 38/50] fix proxy implementation so it actually works --- lib/sse_client/streaming_http.rb | 44 +++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 6f6a1fc2..87bcdd64 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -7,32 +7,41 @@ module SSE # The socket is created and managed by Socketry, which we use so that we can have a read timeout. # class StreamingHTTPConnection + attr_reader :status, :headers + def initialize(uri, proxy, headers, connect_timeout, read_timeout) if proxy @socket = open_socket(proxy, connect_timeout) - @socket.write(build_proxy_request(uri, proxy)) + + write(build_proxy_request(uri, proxy)) + + # temporarily create a reader just for the proxy connect dialogue + proxy_reader = HTTPResponseReader.new(@socket, read_timeout) + + # if proxy connect failed, return immediately with error status + if proxy_reader.status != 200 + @status = proxy_reader.status + return + end + + # start using TLS at this point if appropriate + switch_socket_to_ssl if uri.scheme.downcase == 'https' else @socket = open_socket(uri, connect_timeout) end - @socket.write(build_request(uri, headers)) + write(build_request(uri, headers)) @reader = HTTPResponseReader.new(@socket, read_timeout) + @status = @reader.status + @headers = @reader.headers end def close @socket.close if @socket @socket = nil end - - def status - @reader.status - end - - def headers - @reader.headers - end - + # Generator that returns one line of the response body at a time (delimited by \r, \n, # or \r\n) until the response is fully consumed or the socket is closed. 
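  # Illustrative note (not part of this commit): the proxy handshake that the rewritten
  # initialize above performs, spelled out with an invented host. For an HTTPS target the
  # client first sends, in clear text to the proxy:
  #
  #   CONNECT stream.example.com:443 HTTP/1.1
  #   Host: stream.example.com:443
  #   Proxy-Authorization: Basic ...   (only if the proxy URI carries credentials)
  #
  # and only after the proxy answers 200 does it wrap the same socket in TLS
  # (switch_socket_to_ssl) and send the real GET request through the tunnel.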
def read_lines @@ -47,13 +56,24 @@ def read_all private def open_socket(uri, connect_timeout) - if uri.scheme == 'https' + if uri.scheme.downcase == 'https' Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) else Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) end end + def write(data) + @socket.write(data) + end + + def switch_socket_to_ssl + io = IO.try_convert(@socket) + ssl_sock = OpenSSL::SSL::SSLSocket.new(io, OpenSSL::SSL::SSLContext.new) + ssl_sock.connect + @socket = Socketry::SSL::Socket.new.from_socket(ssl_sock) + end + # Build an HTTP request line and headers. def build_request(uri, headers) ret = "GET #{uri.request_uri} HTTP/1.1\r\n" From 776bc982cd85437523052d4496f8a939dec0b7e3 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 16:57:47 -0700 Subject: [PATCH 39/50] add unit tests for http/https proxy --- lib/sse_client/streaming_http.rb | 2 +- spec/sse_client/sse_shared.rb | 60 ++++++++++++++++++++++---- spec/sse_client/streaming_http_spec.rb | 32 ++++++++++++++ 3 files changed, 85 insertions(+), 9 deletions(-) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 87bcdd64..d0123723 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -50,7 +50,7 @@ def read_lines # Consumes the entire response body and returns it. def read_all - @reader.read_all + @reader.read_all if !@reader.nil? end private diff --git a/spec/sse_client/sse_shared.rb b/spec/sse_client/sse_shared.rb index 9112387b..0d59c865 100644 --- a/spec/sse_client/sse_shared.rb +++ b/spec/sse_client/sse_shared.rb @@ -1,22 +1,28 @@ require "spec_helper" require "webrick" +require "webrick/httpproxy" +require "webrick/https" class StubHTTPServer def initialize @port = 50000 begin - @server = WEBrick::HTTPServer.new( - BindAddress: '127.0.0.1', - Port: @port, - AccessLog: [], - Logger: NullLogger.new - ) + @server = create_server(@port) rescue Errno::EADDRINUSE @port += 1 retry end end + def create_server(port) + WEBrick::HTTPServer.new( + BindAddress: '127.0.0.1', + Port: port, + AccessLog: [], + Logger: NullLogger.new + ) + end + def start Thread.new { @server.start } end @@ -34,14 +40,52 @@ def setup_response(uri_path, &action) end end +class StubSecureHTTPServer < StubHTTPServer + def create_server(port) + WEBrick::HTTPServer.new( + BindAddress: '127.0.0.1', + Port: port, + SSLEnable: true, + SSLCertName: [['CN', 'localhost', OpenSSL::ASN1::PRINTABLESTRING]], # self-signed cert + AccessLog: [], + Logger: NullLogger.new + ) + end + + def base_uri + URI("https://127.0.0.1:#{@port}") + end +end + +class StubProxyServer < StubHTTPServer + attr_reader :request_count + + def initialize + super + @request_count = 0 + end + + def create_server(port) + WEBrick::HTTPProxyServer.new( + BindAddress: '127.0.0.1', + Port: port, + AccessLog: [], + Logger: NullLogger.new, + ProxyContentHandler: proc do |req,res| + @request_count += 1 + end + ) + end +end + class NullLogger def method_missing(*) self end end -def with_server - server = StubHTTPServer.new +def with_server(server = nil) + server = StubHTTPServer.new if server.nil? 
begin server.start yield server diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index f68f3764..c0e86fbe 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -98,6 +98,38 @@ def with_connection(cxn) expect { subject.new(server.base_uri, nil, {}, 30, 0.25) }.to raise_error(Socketry::TimeoutError) end end + + it "connects to HTTP server through proxy" do + body = "hi" + with_server do |server| + server.setup_response("/foo") do |req,res| + res.body = body + end + with_server(StubProxyServer.new) do |proxy| + with_connection(subject.new(server.base_uri.merge("/foo"), proxy.base_uri, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(read_body).to eq("hi") + expect(proxy.request_count).to eq(1) + end + end + end + end + + it "connects to HTTPS server through proxy" do + body = "hi" + with_server(StubSecureHTTPServer.new) do |server| + server.setup_response("/foo") do |req,res| + res.body = body + end + with_server(StubProxyServer.new) do |proxy| + with_connection(subject.new(server.base_uri.merge("/foo"), proxy.base_uri, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(read_body).to eq("hi") + expect(proxy.request_count).to eq(1) + end + end + end + end end # From 1113c04b860358d3e50456e9099cbf13d2813426 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 17:04:59 -0700 Subject: [PATCH 40/50] add basic HTTPS request test --- spec/sse_client/sse_shared.rb | 2 +- spec/sse_client/streaming_http_spec.rb | 21 +++++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/spec/sse_client/sse_shared.rb b/spec/sse_client/sse_shared.rb index 0d59c865..9b9c8b53 100644 --- a/spec/sse_client/sse_shared.rb +++ b/spec/sse_client/sse_shared.rb @@ -46,7 +46,7 @@ def create_server(port) BindAddress: '127.0.0.1', Port: port, SSLEnable: true, - SSLCertName: [['CN', 'localhost', OpenSSL::ASN1::PRINTABLESTRING]], # self-signed cert + SSLCertName: [['CN', '127.0.0.1', OpenSSL::ASN1::PRINTABLESTRING]], # self-signed cert AccessLog: [], Logger: NullLogger.new ) diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index c0e86fbe..219bd6fa 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -99,14 +99,27 @@ def with_connection(cxn) end end + it "connects to HTTPS server" do + body = "hi" + with_server(StubSecureHTTPServer.new) do |server| + server.setup_response("/") do |req,res| + res.body = body + end + with_connection(subject.new(server.base_uri, nil, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(read_body).to eq("hi") + end + end + end + it "connects to HTTP server through proxy" do body = "hi" with_server do |server| - server.setup_response("/foo") do |req,res| + server.setup_response("/") do |req,res| res.body = body end with_server(StubProxyServer.new) do |proxy| - with_connection(subject.new(server.base_uri.merge("/foo"), proxy.base_uri, {}, 30, 30)) do |cxn| + with_connection(subject.new(server.base_uri, proxy.base_uri, {}, 30, 30)) do |cxn| read_body = cxn.read_all expect(read_body).to eq("hi") expect(proxy.request_count).to eq(1) @@ -118,11 +131,11 @@ def with_connection(cxn) it "connects to HTTPS server through proxy" do body = "hi" with_server(StubSecureHTTPServer.new) do |server| - server.setup_response("/foo") do |req,res| + server.setup_response("/") do |req,res| res.body = body end with_server(StubProxyServer.new) do |proxy| - 
with_connection(subject.new(server.base_uri.merge("/foo"), proxy.base_uri, {}, 30, 30)) do |cxn| + with_connection(subject.new(server.base_uri, proxy.base_uri, {}, 30, 30)) do |cxn| read_body = cxn.read_all expect(read_body).to eq("hi") expect(proxy.request_count).to eq(1) From f6b8c755d06d1472339aa8430508b078760c8d4d Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 18:10:34 -0700 Subject: [PATCH 41/50] rm obsolete dependencies --- ldclient-rb.gemspec | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 3f9d2638..a9bbfb23 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -26,23 +26,13 @@ Gem::Specification.new do |spec| spec.add_development_dependency "codeclimate-test-reporter", "~> 0" spec.add_development_dependency "redis", "~> 3.3.5" spec.add_development_dependency "connection_pool", ">= 2.1.2" - if RUBY_VERSION >= "2.0.0" - spec.add_development_dependency "rake", "~> 10.0" - spec.add_development_dependency "rspec_junit_formatter", "~> 0.3.0" - else - spec.add_development_dependency "rake", "12.1.0" - # higher versions of rake fail to install in JRuby 1.7 - end + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "rspec_junit_formatter", "~> 0.3.0" spec.add_development_dependency "timecop", "~> 0.9.1" spec.add_runtime_dependency "json", [">= 1.8", "< 3"] - if RUBY_VERSION >= "2.1.0" - spec.add_runtime_dependency "faraday", [">= 0.9", "< 2"] - spec.add_runtime_dependency "faraday-http-cache", [">= 1.3.0", "< 3"] - else - spec.add_runtime_dependency "faraday", [">= 0.9", "< 0.14.0"] - spec.add_runtime_dependency "faraday-http-cache", [">= 1.3.0", "< 2"] - end + spec.add_runtime_dependency "faraday", [">= 0.9", "< 2"] + spec.add_runtime_dependency "faraday-http-cache", [">= 1.3.0", "< 3"] spec.add_runtime_dependency "semantic", "~> 1.6.0" spec.add_runtime_dependency "thread_safe", "~> 0.3" spec.add_runtime_dependency "net-http-persistent", "~> 2.9" From e4ac0efcc8a417ca3e4baf255ddef555170f5816 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 18:33:30 -0700 Subject: [PATCH 42/50] always send Host header --- lib/sse_client/streaming_http.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index d0123723..5d8a052f 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -77,6 +77,7 @@ def switch_socket_to_ssl # Build an HTTP request line and headers. 
def build_request(uri, headers) ret = "GET #{uri.request_uri} HTTP/1.1\r\n" + ret << "Host: #{uri.host}\r\n" headers.each { |k, v| ret << "#{k}: #{v}\r\n" } From 3796a2d590521388f51553464b7ae9075dd6f981 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 18:34:46 -0700 Subject: [PATCH 43/50] don't try to create an embedded HTTPS server, it doesn't work in JRuby 9.1 --- spec/sse_client/sse_shared.rb | 17 ---------- spec/sse_client/streaming_http_spec.rb | 45 ++++++++++++-------------- 2 files changed, 20 insertions(+), 42 deletions(-) diff --git a/spec/sse_client/sse_shared.rb b/spec/sse_client/sse_shared.rb index 9b9c8b53..66f79742 100644 --- a/spec/sse_client/sse_shared.rb +++ b/spec/sse_client/sse_shared.rb @@ -40,23 +40,6 @@ def setup_response(uri_path, &action) end end -class StubSecureHTTPServer < StubHTTPServer - def create_server(port) - WEBrick::HTTPServer.new( - BindAddress: '127.0.0.1', - Port: port, - SSLEnable: true, - SSLCertName: [['CN', '127.0.0.1', OpenSSL::ASN1::PRINTABLESTRING]], # self-signed cert - AccessLog: [], - Logger: NullLogger.new - ) - end - - def base_uri - URI("https://127.0.0.1:#{@port}") - end -end - class StubProxyServer < StubHTTPServer attr_reader :request_count diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 219bd6fa..2842ed7d 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -29,7 +29,10 @@ def with_connection(cxn) with_connection(subject.new(server.base_uri.merge("/foo?bar"), nil, headers, 30, 30)) do received_req = requests.pop expect(received_req.unparsed_uri).to eq("/foo?bar") - expect(received_req.header).to eq({ "accept" => ["text/plain"] }) + expect(received_req.header).to eq({ + "accept" => ["text/plain"], + "host" => [server.base_uri.host] + }) end end end @@ -99,19 +102,6 @@ def with_connection(cxn) end end - it "connects to HTTPS server" do - body = "hi" - with_server(StubSecureHTTPServer.new) do |server| - server.setup_response("/") do |req,res| - res.body = body - end - with_connection(subject.new(server.base_uri, nil, {}, 30, 30)) do |cxn| - read_body = cxn.read_all - expect(read_body).to eq("hi") - end - end - end - it "connects to HTTP server through proxy" do body = "hi" with_server do |server| @@ -128,18 +118,23 @@ def with_connection(cxn) end end + # The following 2 tests were originally written to connect to an embedded HTTPS server made with + # WEBrick. Unfortunately, some unknown problem prevents WEBrick's self-signed certificate feature + # from working in JRuby 9.1 (but not in any other Ruby version). Therefore these tests currently + # hit an external URL. 
+ + it "connects to HTTPS server" do + with_connection(subject.new(URI("https://app.launchdarkly.com"), nil, {}, 30, 30)) do |cxn| + expect(cxn.status).to eq 200 + end + end + it "connects to HTTPS server through proxy" do - body = "hi" - with_server(StubSecureHTTPServer.new) do |server| - server.setup_response("/") do |req,res| - res.body = body - end - with_server(StubProxyServer.new) do |proxy| - with_connection(subject.new(server.base_uri, proxy.base_uri, {}, 30, 30)) do |cxn| - read_body = cxn.read_all - expect(read_body).to eq("hi") - expect(proxy.request_count).to eq(1) - end + with_server(StubProxyServer.new) do |proxy| + with_connection(subject.new(URI("https://app.launchdarkly.com"), proxy.base_uri, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(cxn.status).to eq 200 + expect(proxy.request_count).to eq(1) end end end From e8bc6b7eebeae35baee0efc02f84210f8d432afa Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 18:37:14 -0700 Subject: [PATCH 44/50] don't send Host header twice --- lib/sse_client/sse_client.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index b0eee8d7..e6a342a5 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -152,8 +152,7 @@ def dispatch_event(event) def build_headers h = { 'Accept' => 'text/event-stream', - 'Cache-Control' => 'no-cache', - 'Host' => @uri.host + 'Cache-Control' => 'no-cache' } h['Last-Event-Id'] = @last_id if !@last_id.nil? h.merge(@headers) From 58a92ca58e3e467afbb10236d24e21ab81459011 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 18:45:37 -0700 Subject: [PATCH 45/50] don't need to read body --- spec/sse_client/streaming_http_spec.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index 2842ed7d..cb5afd40 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -132,7 +132,6 @@ def with_connection(cxn) it "connects to HTTPS server through proxy" do with_server(StubProxyServer.new) do |proxy| with_connection(subject.new(URI("https://app.launchdarkly.com"), proxy.base_uri, {}, 30, 30)) do |cxn| - read_body = cxn.read_all expect(cxn.status).to eq 200 expect(proxy.request_count).to eq(1) end From 74b87384dd8a3788c0f85b932ec2e6f21e06363d Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 18:55:38 -0700 Subject: [PATCH 46/50] more correct logic for getting proxy from env vars --- lib/sse_client/sse_client.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb index e6a342a5..e31b8607 100644 --- a/lib/sse_client/sse_client.rb +++ b/lib/sse_client/sse_client.rb @@ -24,10 +24,11 @@ def initialize(uri, options = {}) @read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT @logger = options[:logger] || default_logger - proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy] - if proxy - proxyUri = URI(proxy) - if proxyUri.scheme == 'http' || proxyUri.scheme == 'https' + if options[:proxy] + @proxy = options[:proxy] + else + proxyUri = @uri.find_proxy + if !proxyUri.nil? 
&& (proxyUri.scheme == 'http' || proxyUri.scheme == 'https') @proxy = proxyUri end end From c34de47de821105f0cc951a2f9f822f9a9185ff5 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Fri, 22 Jun 2018 19:16:23 -0700 Subject: [PATCH 47/50] update readme to mention Socketry --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f8735d4d..1790b2d4 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ Note that this gem will automatically switch to using the Rails logger it is det HTTPS proxy ------------ -The Ruby SDK uses Faraday to handle all of its network traffic. Faraday provides built-in support for the use of an HTTPS proxy. If the HTTPS_PROXY environment variable is present then the SDK will proxy all network requests through the URL provided. +The Ruby SDK uses Faraday and Socketry to handle its network traffic. Both of these provide built-in support for the use of an HTTPS proxy. If the HTTPS_PROXY environment variable is present then the SDK will proxy all network requests through the URL provided. How to set the HTTPS_PROXY environment variable on Mac/Linux systems: ``` From 16c45e5ac6bbc671dcf480fefbe6b48c74ffc547 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 25 Jun 2018 10:39:34 -0700 Subject: [PATCH 48/50] ditch mutative method, misc cleanup --- lib/sse_client/streaming_http.rb | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 5d8a052f..6f2745e0 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -11,12 +11,12 @@ class StreamingHTTPConnection def initialize(uri, proxy, headers, connect_timeout, read_timeout) if proxy - @socket = open_socket(proxy, connect_timeout) + socket = open_socket(proxy, connect_timeout) - write(build_proxy_request(uri, proxy)) + socket.write(build_proxy_request(uri, proxy)) # temporarily create a reader just for the proxy connect dialogue - proxy_reader = HTTPResponseReader.new(@socket, read_timeout) + proxy_reader = HTTPResponseReader.new(socket, read_timeout) # if proxy connect failed, return immediately with error status if proxy_reader.status != 200 @@ -25,12 +25,16 @@ def initialize(uri, proxy, headers, connect_timeout, read_timeout) end # start using TLS at this point if appropriate - switch_socket_to_ssl if uri.scheme.downcase == 'https' + if uri.scheme.downcase == 'https' + @socket = wrap_socket_in_ssl_socket(socket) + else + @socket = socket + end else @socket = open_socket(uri, connect_timeout) end - write(build_request(uri, headers)) + @socket.write(build_request(uri, headers)) @reader = HTTPResponseReader.new(@socket, read_timeout) @status = @reader.status @@ -62,16 +66,12 @@ def open_socket(uri, connect_timeout) Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) end end - - def write(data) - @socket.write(data) - end - - def switch_socket_to_ssl - io = IO.try_convert(@socket) + + def wrap_socket_in_ssl_socket(socket) + io = IO.try_convert(socket) ssl_sock = OpenSSL::SSL::SSLSocket.new(io, OpenSSL::SSL::SSLContext.new) ssl_sock.connect - @socket = Socketry::SSL::Socket.new.from_socket(ssl_sock) + Socketry::SSL::Socket.new.from_socket(ssl_sock) end # Build an HTTP request line and headers. 
From 228f3aa4688420a4ec69e39d629cf59ff21e1648 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 25 Jun 2018 10:46:28 -0700 Subject: [PATCH 49/50] factor out socket creation/proxy logic --- lib/sse_client/streaming_http.rb | 95 ++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 42 deletions(-) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index 6f2745e0..bd2f128c 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -10,32 +10,8 @@ class StreamingHTTPConnection attr_reader :status, :headers def initialize(uri, proxy, headers, connect_timeout, read_timeout) - if proxy - socket = open_socket(proxy, connect_timeout) - - socket.write(build_proxy_request(uri, proxy)) - - # temporarily create a reader just for the proxy connect dialogue - proxy_reader = HTTPResponseReader.new(socket, read_timeout) - - # if proxy connect failed, return immediately with error status - if proxy_reader.status != 200 - @status = proxy_reader.status - return - end - - # start using TLS at this point if appropriate - if uri.scheme.downcase == 'https' - @socket = wrap_socket_in_ssl_socket(socket) - else - @socket = socket - end - else - @socket = open_socket(uri, connect_timeout) - end - + @socket = HTTPConnectionFactory.connect(uri, proxy, connect_timeout, read_timeout) @socket.write(build_request(uri, headers)) - @reader = HTTPResponseReader.new(@socket, read_timeout) @status = @reader.status @headers = @reader.headers @@ -54,26 +30,11 @@ def read_lines # Consumes the entire response body and returns it. def read_all - @reader.read_all if !@reader.nil? + @reader.read_all end private - def open_socket(uri, connect_timeout) - if uri.scheme.downcase == 'https' - Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) - else - Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) - end - end - - def wrap_socket_in_ssl_socket(socket) - io = IO.try_convert(socket) - ssl_sock = OpenSSL::SSL::SSLSocket.new(io, OpenSSL::SSL::SSLContext.new) - ssl_sock.connect - Socketry::SSL::Socket.new.from_socket(ssl_sock) - end - # Build an HTTP request line and headers. def build_request(uri, headers) ret = "GET #{uri.request_uri} HTTP/1.1\r\n" @@ -83,9 +44,46 @@ def build_request(uri, headers) } ret + "\r\n" end + end + + # + # Used internally to send the HTTP request, including the proxy dialogue if necessary. + # + class HTTPConnectionFactory + def self.connect(uri, proxy, connect_timeout, read_timeout) + if !proxy + return open_socket(uri, connect_timeout) + end + + socket = open_socket(proxy, connect_timeout) + socket.write(build_proxy_request(uri, proxy)) + + # temporarily create a reader just for the proxy connect response + proxy_reader = HTTPResponseReader.new(socket, read_timeout) + if proxy_reader.status != 200 + throw ProxyError, "proxy connection refused, status #{proxy_reader.status}" + end + + # start using TLS at this point if appropriate + if uri.scheme.downcase == 'https' + wrap_socket_in_ssl_socket(socket) + else + socket + end + end + + private + + def self.open_socket(uri, connect_timeout) + if uri.scheme.downcase == 'https' + Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + else + Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + end + end # Build a proxy connection header. 
- def build_proxy_request(uri, proxy) + def self.build_proxy_request(uri, proxy) ret = "CONNECT #{uri.host}:#{uri.port} HTTP/1.1\r\n" ret << "Host: #{uri.host}:#{uri.port}\r\n" if proxy.user || proxy.password @@ -95,6 +93,19 @@ def build_proxy_request(uri, proxy) ret << "\r\n" ret end + + def self.wrap_socket_in_ssl_socket(socket) + io = IO.try_convert(socket) + ssl_sock = OpenSSL::SSL::SSLSocket.new(io, OpenSSL::SSL::SSLContext.new) + ssl_sock.connect + Socketry::SSL::Socket.new.from_socket(ssl_sock) + end + end + + class ProxyError < StandardError + def initialize(message) + super + end end # From 06c3214bf57f37acbc635aece68606fe960d4540 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 25 Jun 2018 12:21:10 -0700 Subject: [PATCH 50/50] fix raising of ProxyError, add unit test --- lib/sse_client/streaming_http.rb | 2 +- spec/sse_client/sse_shared.rb | 4 ++++ spec/sse_client/streaming_http_spec.rb | 12 ++++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb index bd2f128c..1c0ed52b 100644 --- a/lib/sse_client/streaming_http.rb +++ b/lib/sse_client/streaming_http.rb @@ -61,7 +61,7 @@ def self.connect(uri, proxy, connect_timeout, read_timeout) # temporarily create a reader just for the proxy connect response proxy_reader = HTTPResponseReader.new(socket, read_timeout) if proxy_reader.status != 200 - throw ProxyError, "proxy connection refused, status #{proxy_reader.status}" + raise ProxyError, "proxy connection refused, status #{proxy_reader.status}" end # start using TLS at this point if appropriate diff --git a/spec/sse_client/sse_shared.rb b/spec/sse_client/sse_shared.rb index 66f79742..3ecabb57 100644 --- a/spec/sse_client/sse_shared.rb +++ b/spec/sse_client/sse_shared.rb @@ -42,6 +42,7 @@ def setup_response(uri_path, &action) class StubProxyServer < StubHTTPServer attr_reader :request_count + attr_accessor :connect_status def initialize super @@ -55,6 +56,9 @@ def create_server(port) AccessLog: [], Logger: NullLogger.new, ProxyContentHandler: proc do |req,res| + if !@connect_status.nil? + res.status = @connect_status + end @request_count += 1 end ) diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb index cb5afd40..7dfac9bd 100644 --- a/spec/sse_client/streaming_http_spec.rb +++ b/spec/sse_client/streaming_http_spec.rb @@ -118,6 +118,18 @@ def with_connection(cxn) end end + it "throws error if proxy responds with error status" do + with_server do |server| + server.setup_response("/") do |req,res| + res.body = body + end + with_server(StubProxyServer.new) do |proxy| + proxy.connect_status = 403 + expect { subject.new(server.base_uri, proxy.base_uri, {}, 30, 30) }.to raise_error(SSE::ProxyError) + end + end + end + # The following 2 tests were originally written to connect to an embedded HTTPS server made with # WEBrick. Unfortunately, some unknown problem prevents WEBrick's self-signed certificate feature # from working in JRuby 9.1 (but not in any other Ruby version). Therefore these tests currently