From 602c5e6dc3b41f2dbae982913a39255d90e9101d Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 10 Mar 2020 13:01:45 -0700 Subject: [PATCH] implement diagnostic events in Ruby (#130) --- lib/ldclient-rb/config.rb | 64 ++ lib/ldclient-rb/events.rb | 180 ++-- lib/ldclient-rb/flags_state.rb | 2 +- lib/ldclient-rb/impl/diagnostic_events.rb | 130 +++ lib/ldclient-rb/impl/event_sender.rb | 72 ++ lib/ldclient-rb/impl/util.rb | 19 + lib/ldclient-rb/ldclient.rb | 21 +- lib/ldclient-rb/requestor.rb | 3 +- lib/ldclient-rb/stream.rb | 23 +- spec/diagnostic_events_spec.rb | 163 +++ spec/evaluation_spec.rb | 2 +- spec/event_sender_spec.rb | 179 ++++ spec/events_spec.rb | 961 ++++++++---------- spec/file_data_source_spec.rb | 2 +- spec/http_util.rb | 17 +- .../integrations/consul_feature_store_spec.rb | 2 - .../dynamodb_feature_store_spec.rb | 2 - spec/ldclient_spec.rb | 2 +- spec/polling_spec.rb | 2 +- spec/redis_feature_store_spec.rb | 3 - spec/requestor_spec.rb | 24 +- spec/spec_helper.rb | 3 + 22 files changed, 1237 insertions(+), 639 deletions(-) create mode 100644 lib/ldclient-rb/impl/diagnostic_events.rb create mode 100644 lib/ldclient-rb/impl/event_sender.rb create mode 100644 lib/ldclient-rb/impl/util.rb create mode 100644 spec/diagnostic_events_spec.rb create mode 100644 spec/event_sender_spec.rb diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index c7c42e56..f3612756 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -37,6 +37,10 @@ class Config # @option opts [Object] :data_source See {#data_source}. # @option opts [Object] :update_processor Obsolete synonym for `data_source`. # @option opts [Object] :update_processor_factory Obsolete synonym for `data_source`. + # @option opts [Boolean] :diagnostic_opt_out (false) See {#diagnostic_opt_out?}. + # @option opts [Float] :diagnostic_recording_interval (900) See {#diagnostic_recording_interval}. + # @option opts [String] :wrapper_name See {#wrapper_name}. + # @option opts [String] :wrapper_version See {#wrapper_version}. # def initialize(opts = {}) @base_uri = (opts[:base_uri] || Config.default_base_uri).chomp("/") @@ -62,6 +66,11 @@ def initialize(opts = {}) @data_source = opts[:data_source] || opts[:update_processor] || opts[:update_processor_factory] @update_processor = opts[:update_processor] @update_processor_factory = opts[:update_processor_factory] + @diagnostic_opt_out = opts.has_key?(:diagnostic_opt_out) && opts[:diagnostic_opt_out] + @diagnostic_recording_interval = opts.has_key?(:diagnostic_recording_interval) && opts[:diagnostic_recording_interval] > Config.minimum_diagnostic_recording_interval ? + opts[:diagnostic_recording_interval] : Config.default_diagnostic_recording_interval + @wrapper_name = opts[:wrapper_name] + @wrapper_version = opts[:wrapper_version] end # @@ -257,6 +266,45 @@ def offline? # @deprecated This is replaced by {#data_source}. attr_reader :update_processor_factory + # + # Set to true to opt out of sending diagnostics data. + # + # Unless `diagnostic_opt_out` is set to true, the client will send some diagnostics data to the LaunchDarkly servers + # in order to assist in the development of future SDK improvements. These diagnostics consist of an initial payload + # containing some details of the SDK in use, the SDK's configuration, and the platform the SDK is being run on, as + # well as periodic information on irregular occurrences such as dropped events. + # @return [Boolean] + # + def diagnostic_opt_out? + @diagnostic_opt_out + end + + # + # The interval at which periodic diagnostic data is sent, in seconds. + # + # The default is 900 (every 15 minutes) and the minimum value is 60 (every minute). + # @return [Float] + # + attr_reader :diagnostic_recording_interval + + # + # For use by wrapper libraries to set an identifying name for the wrapper being used. + # + # This will be sent in User-Agent headers during requests to the LaunchDarkly servers to allow recording + # metrics on the usage of these wrapper libraries. + # @return [String] + # + attr_reader :wrapper_name + + # + # For use by wrapper libraries to report the version of the library in use. + # + # If `wrapper_name` is not set, this field will be ignored. Otherwise the version string will be included in + # the User-Agent headers along with the `wrapper_name` during requests to the LaunchDarkly servers. + # @return [String] + # + attr_reader :wrapper_version + # # The default LaunchDarkly client configuration. This configuration sets # reasonable defaults for most users. @@ -407,5 +455,21 @@ def self.default_user_keys_capacity def self.default_user_keys_flush_interval 300 end + + # + # The default value for {#diagnostic_recording_interval}. + # @return [Float] 900 + # + def self.default_diagnostic_recording_interval + 900 + end + + # + # The minimum value for {#diagnostic_recording_interval}. + # @return [Float] 60 + # + def self.minimum_diagnostic_recording_interval + 60 + end end end diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb index bb12f6ec..9313b670 100644 --- a/lib/ldclient-rb/events.rb +++ b/lib/ldclient-rb/events.rb @@ -1,7 +1,10 @@ +require "ldclient-rb/impl/diagnostic_events" +require "ldclient-rb/impl/event_sender" +require "ldclient-rb/impl/util" + require "concurrent" require "concurrent/atomics" require "concurrent/executors" -require "securerandom" require "thread" require "time" @@ -24,12 +27,10 @@ module LaunchDarkly MAX_FLUSH_WORKERS = 5 - CURRENT_SCHEMA_VERSION = 3 USER_ATTRS_TO_STRINGIFY_FOR_EVENTS = [ :key, :secondary, :ip, :country, :email, :firstName, :lastName, :avatar, :name ] private_constant :MAX_FLUSH_WORKERS - private_constant :CURRENT_SCHEMA_VERSION private_constant :USER_ATTRS_TO_STRINGIFY_FOR_EVENTS # @private @@ -60,6 +61,10 @@ class FlushMessage class FlushUsersMessage end + # @private + class DiagnosticEventMessage + end + # @private class SynchronousMessage def initialize @@ -85,9 +90,9 @@ class StopMessage < SynchronousMessage # @private class EventProcessor - def initialize(sdk_key, config, client = nil) + def initialize(sdk_key, config, client = nil, diagnostic_accumulator = nil, test_properties = nil) @logger = config.logger - @inbox = SizedQueue.new(config.capacity) + @inbox = SizedQueue.new(config.capacity < 100 ? 100 : config.capacity) @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do post_to_inbox(FlushMessage.new) end @@ -96,14 +101,29 @@ def initialize(sdk_key, config, client = nil) post_to_inbox(FlushUsersMessage.new) end @users_flush_task.execute + if !diagnostic_accumulator.nil? + interval = test_properties && test_properties.has_key?(:diagnostic_recording_interval) ? + test_properties[:diagnostic_recording_interval] : + config.diagnostic_recording_interval + @diagnostic_event_task = Concurrent::TimerTask.new(execution_interval: interval) do + post_to_inbox(DiagnosticEventMessage.new) + end + @diagnostic_event_task.execute + else + @diagnostic_event_task = nil + end @stopped = Concurrent::AtomicBoolean.new(false) @inbox_full = Concurrent::AtomicBoolean.new(false) - EventDispatcher.new(@inbox, sdk_key, config, client) + event_sender = test_properties && test_properties.has_key?(:event_sender) ? + test_properties[:event_sender] : + Impl::EventSender.new(sdk_key, config, client ? client : Util.new_http_client(config.events_uri, config)) + + EventDispatcher.new(@inbox, sdk_key, config, diagnostic_accumulator, event_sender) end def add_event(event) - event[:creationDate] = (Time.now.to_f * 1000).to_i + event[:creationDate] = Impl::Util.current_time_millis post_to_inbox(EventMessage.new(event)) end @@ -117,6 +137,7 @@ def stop if @stopped.make_true @flush_task.shutdown @users_flush_task.shutdown + @diagnostic_event_task.shutdown if !@diagnostic_event_task.nil? # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox # is full; an orderly shutdown can't happen unless these messages are received. @inbox << FlushMessage.new @@ -152,34 +173,36 @@ def post_to_inbox(message) # @private class EventDispatcher - def initialize(inbox, sdk_key, config, client) + def initialize(inbox, sdk_key, config, diagnostic_accumulator, event_sender) @sdk_key = sdk_key @config = config - - if client - @client = client - else - @client = Util.new_http_client(@config.events_uri, @config) - end + @diagnostic_accumulator = config.diagnostic_opt_out? ? nil : diagnostic_accumulator + @event_sender = event_sender @user_keys = SimpleLRUCacheSet.new(config.user_keys_capacity) @formatter = EventOutputFormatter.new(config) @disabled = Concurrent::AtomicBoolean.new(false) @last_known_past_time = Concurrent::AtomicReference.new(0) - + @deduplicated_users = 0 + @events_in_last_batch = 0 + outbox = EventBuffer.new(config.capacity, config.logger) flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS) - Thread.new { main_loop(inbox, outbox, flush_workers) } + if !@diagnostic_accumulator.nil? + diagnostic_event_workers = NonBlockingThreadPool.new(1) + init_event = @diagnostic_accumulator.create_init_event(config) + send_diagnostic_event(init_event, diagnostic_event_workers) + else + diagnostic_event_workers = nil + end + + Thread.new { main_loop(inbox, outbox, flush_workers, diagnostic_event_workers) } end private - def now_millis() - (Time.now.to_f * 1000).to_i - end - - def main_loop(inbox, outbox, flush_workers) + def main_loop(inbox, outbox, flush_workers, diagnostic_event_workers) running = true while running do begin @@ -191,11 +214,13 @@ def main_loop(inbox, outbox, flush_workers) trigger_flush(outbox, flush_workers) when FlushUsersMessage @user_keys.clear + when DiagnosticEventMessage + send_and_reset_diagnostics(outbox, diagnostic_event_workers) when TestSyncMessage - synchronize_for_testing(flush_workers) + synchronize_for_testing(flush_workers, diagnostic_event_workers) message.completed when StopMessage - do_shutdown(flush_workers) + do_shutdown(flush_workers, diagnostic_event_workers) running = false message.completed end @@ -205,18 +230,23 @@ def main_loop(inbox, outbox, flush_workers) end end - def do_shutdown(flush_workers) + def do_shutdown(flush_workers, diagnostic_event_workers) flush_workers.shutdown flush_workers.wait_for_termination + if !diagnostic_event_workers.nil? + diagnostic_event_workers.shutdown + diagnostic_event_workers.wait_for_termination + end begin @client.finish rescue end end - def synchronize_for_testing(flush_workers) + def synchronize_for_testing(flush_workers, diagnostic_event_workers) # Used only by unit tests. Wait until all active flush workers have finished. flush_workers.wait_all + diagnostic_event_workers.wait_all if !diagnostic_event_workers.nil? end def dispatch_event(event, outbox) @@ -260,7 +290,9 @@ def notice_user(user) if user.nil? || !user.has_key?(:key) true else - @user_keys.add(user[:key].to_s) + known = @user_keys.add(user[:key].to_s) + @deduplicated_users += 1 if known + known end end @@ -268,7 +300,7 @@ def should_debug_event(event) debug_until = event[:debugEventsUntilDate] if !debug_until.nil? last_past = @last_known_past_time.value - debug_until > last_past && debug_until > now_millis + debug_until > last_past && debug_until > Impl::Util.current_time_millis else false end @@ -281,34 +313,44 @@ def trigger_flush(outbox, flush_workers) payload = outbox.get_payload if !payload.events.empty? || !payload.summary.counters.empty? + count = payload.events.length + (payload.summary.counters.empty? ? 0 : 1) + @events_in_last_batch = count # If all available worker threads are busy, success will be false and no job will be queued. success = flush_workers.post do begin - resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) - handle_response(resp) if !resp.nil? + events_out = @formatter.make_output_events(payload.events, payload.summary) + result = @event_sender.send_event_data(events_out.to_json, false) + @disabled.value = true if result.must_shutdown + if !result.time_from_server.nil? + @last_known_past_time.value = (result.time_from_server.to_f * 1000).to_i + end rescue => e Util.log_exception(@config.logger, "Unexpected error in event processor", e) end end outbox.clear if success # Reset our internal state, these events now belong to the flush worker + else + @events_in_last_batch = 0 end end - def handle_response(res) - status = res.code.to_i - if status >= 400 - message = Util.http_error_message(status, "event delivery", "some events were dropped") - @config.logger.error { "[LDClient] #{message}" } - if !Util.http_error_recoverable?(status) - @disabled.value = true - end - else - if !res["date"].nil? - begin - res_time = (Time.httpdate(res["date"]).to_f * 1000).to_i - @last_known_past_time.value = res_time - rescue ArgumentError - end + def send_and_reset_diagnostics(outbox, diagnostic_event_workers) + return if @diagnostic_accumulator.nil? + dropped_count = outbox.get_and_clear_dropped_count + event = @diagnostic_accumulator.create_periodic_event_and_reset(dropped_count, @deduplicated_users, @events_in_last_batch) + @deduplicated_users = 0 + @events_in_last_batch = 0 + send_diagnostic_event(event, diagnostic_event_workers) + end + + def send_diagnostic_event(event, diagnostic_event_workers) + return if diagnostic_event_workers.nil? + uri = URI(@config.events_uri + "/diagnostic") + diagnostic_event_workers.post do + begin + @event_sender.send_event_data(event.to_json, true) + rescue => e + Util.log_exception(@config.logger, "Unexpected error in event processor", e) end end end @@ -323,6 +365,7 @@ def initialize(capacity, logger) @capacity = capacity @logger = logger @capacity_exceeded = false + @dropped_events = 0 @events = [] @summarizer = EventSummarizer.new end @@ -333,6 +376,7 @@ def add_event(event) @events.push(event) @capacity_exceeded = false else + @dropped_events += 1 if !@capacity_exceeded @capacity_exceeded = true @logger.warn { "[LDClient] Exceeded event queue capacity. Increase capacity to avoid dropping events." } @@ -348,54 +392,18 @@ def get_payload return FlushPayload.new(@events, @summarizer.snapshot) end + def get_and_clear_dropped_count + ret = @dropped_events + @dropped_events = 0 + ret + end + def clear @events = [] @summarizer.clear end end - # @private - class EventPayloadSendTask - def run(sdk_key, config, client, payload, formatter) - events_out = formatter.make_output_events(payload.events, payload.summary) - res = nil - body = events_out.to_json - payload_id = SecureRandom.uuid - (0..1).each do |attempt| - if attempt > 0 - config.logger.warn { "[LDClient] Will retry posting events after 1 second" } - sleep(1) - end - begin - client.start if !client.started? - config.logger.debug { "[LDClient] sending #{events_out.length} events: #{body}" } - uri = URI(config.events_uri + "/bulk") - req = Net::HTTP::Post.new(uri) - req.content_type = "application/json" - req.body = body - req["Authorization"] = sdk_key - req["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION - req["X-LaunchDarkly-Event-Schema"] = CURRENT_SCHEMA_VERSION.to_s - req["X-LaunchDarkly-Payload-ID"] = payload_id - req["Connection"] = "keep-alive" - res = client.request(req) - rescue StandardError => exn - config.logger.warn { "[LDClient] Error flushing events: #{exn.inspect}." } - next - end - status = res.code.to_i - if status < 200 || status >= 300 - if Util.http_error_recoverable?(status) - next - end - end - break - end - # used up our retries, return the last response if any - res - end - end - # @private class EventOutputFormatter def initialize(config) diff --git a/lib/ldclient-rb/flags_state.rb b/lib/ldclient-rb/flags_state.rb index 4efe1404..496ad61b 100644 --- a/lib/ldclient-rb/flags_state.rb +++ b/lib/ldclient-rb/flags_state.rb @@ -22,7 +22,7 @@ def add_flag(flag, value, variation, reason = nil, details_only_if_tracked = fal meta = {} with_details = !details_only_if_tracked || flag[:trackEvents] if !with_details && flag[:debugEventsUntilDate] - with_details = flag[:debugEventsUntilDate] > (Time.now.to_f * 1000).to_i + with_details = flag[:debugEventsUntilDate] > Impl::Util::current_time_millis end if with_details meta[:version] = flag[:version] diff --git a/lib/ldclient-rb/impl/diagnostic_events.rb b/lib/ldclient-rb/impl/diagnostic_events.rb new file mode 100644 index 00000000..4c61a905 --- /dev/null +++ b/lib/ldclient-rb/impl/diagnostic_events.rb @@ -0,0 +1,130 @@ +require "ldclient-rb/impl/util" + +require "rbconfig" +require "securerandom" + +module LaunchDarkly + module Impl + class DiagnosticAccumulator + def self.create_diagnostic_id(sdk_key) + { + diagnosticId: SecureRandom.uuid, + sdkKeySuffix: sdk_key[-6..-1] || sdk_key + } + end + + def initialize(diagnostic_id) + @id = diagnostic_id + @lock = Mutex.new + self.reset(Util.current_time_millis) + end + + def reset(time) + @data_since_date = time + @stream_inits = [] + end + + def create_init_event(config) + return { + kind: 'diagnostic-init', + creationDate: Util.current_time_millis, + id: @id, + configuration: DiagnosticAccumulator.make_config_data(config), + sdk: DiagnosticAccumulator.make_sdk_data(config), + platform: DiagnosticAccumulator.make_platform_data + } + end + + def record_stream_init(timestamp, failed, duration_millis) + @lock.synchronize do + @stream_inits.push({ timestamp: timestamp, failed: failed, durationMillis: duration_millis }) + end + end + + def create_periodic_event_and_reset(dropped_events, deduplicated_users, events_in_last_batch) + previous_stream_inits = @lock.synchronize do + si = @stream_inits + @stream_inits = [] + si + end + + current_time = Util.current_time_millis + event = { + kind: 'diagnostic', + creationDate: current_time, + id: @id, + dataSinceDate: @data_since_date, + droppedEvents: dropped_events, + deduplicatedUsers: deduplicated_users, + eventsInLastBatch: events_in_last_batch, + streamInits: previous_stream_inits + } + @data_since_date = current_time + event + end + + def self.make_config_data(config) + ret = { + allAttributesPrivate: config.all_attributes_private, + connectTimeoutMillis: self.seconds_to_millis(config.connect_timeout), + customBaseURI: config.base_uri != Config.default_base_uri, + customEventsURI: config.events_uri != Config.default_events_uri, + customStreamURI: config.stream_uri != Config.default_stream_uri, + diagnosticRecordingIntervalMillis: self.seconds_to_millis(config.diagnostic_recording_interval), + eventsCapacity: config.capacity, + eventsFlushIntervalMillis: self.seconds_to_millis(config.flush_interval), + inlineUsersInEvents: config.inline_users_in_events, + pollingIntervalMillis: self.seconds_to_millis(config.poll_interval), + socketTimeoutMillis: self.seconds_to_millis(config.read_timeout), + streamingDisabled: !config.stream?, + userKeysCapacity: config.user_keys_capacity, + userKeysFlushIntervalMillis: self.seconds_to_millis(config.user_keys_flush_interval), + usingProxy: ENV.has_key?('http_proxy') || ENV.has_key?('https_proxy') || ENV.has_key?('HTTP_PROXY'), + usingRelayDaemon: config.use_ldd?, + } + ret + end + + def self.make_sdk_data(config) + ret = { + name: 'ruby-server-sdk', + version: LaunchDarkly::VERSION + } + if config.wrapper_name + ret[:wrapperName] = config.wrapper_name + ret[:wrapperVersion] = config.wrapper_version + end + ret + end + + def self.make_platform_data + conf = RbConfig::CONFIG + { + name: 'ruby', + osArch: conf['host_cpu'], + osName: self.normalize_os_name(conf['host_os']), + osVersion: 'unknown', # there seems to be no portable way to detect this in Ruby + rubyVersion: conf['ruby_version'], + rubyImplementation: Object.constants.include?(:RUBY_ENGINE) ? RUBY_ENGINE : 'unknown' + } + end + + def self.normalize_os_name(name) + case name + when /linux|arch/i + 'Linux' + when /darwin/i + 'MacOS' + when /mswin|windows/i + 'Windows' + else + name + end + end + + def self.seconds_to_millis(s) + (s * 1000).to_i + end + end + end +end diff --git a/lib/ldclient-rb/impl/event_sender.rb b/lib/ldclient-rb/impl/event_sender.rb new file mode 100644 index 00000000..834cd3a3 --- /dev/null +++ b/lib/ldclient-rb/impl/event_sender.rb @@ -0,0 +1,72 @@ +require "securerandom" + +module LaunchDarkly + module Impl + EventSenderResult = Struct.new(:success, :must_shutdown, :time_from_server) + + class EventSender + CURRENT_SCHEMA_VERSION = 3 + DEFAULT_RETRY_INTERVAL = 1 + + def initialize(sdk_key, config, http_client = nil, retry_interval = DEFAULT_RETRY_INTERVAL) + @client = http_client ? http_client : LaunchDarkly::Util.new_http_client(config.events_uri, config) + @sdk_key = sdk_key + @config = config + @events_uri = config.events_uri + "/bulk" + @diagnostic_uri = config.events_uri + "/diagnostic" + @logger = config.logger + @retry_interval = retry_interval + end + + def send_event_data(event_data, is_diagnostic) + uri = is_diagnostic ? @diagnostic_uri : @events_uri + payload_id = is_diagnostic ? nil : SecureRandom.uuid + description = is_diagnostic ? 'diagnostic event' : "#{event_data.length} events" + res = nil + (0..1).each do |attempt| + if attempt > 0 + @logger.warn { "[LDClient] Will retry posting events after #{@retry_interval} second" } + sleep(@retry_interval) + end + begin + @client.start if !@client.started? + @logger.debug { "[LDClient] sending #{description}: #{body}" } + req = Net::HTTP::Post.new(uri) + req.content_type = "application/json" + req.body = event_data + Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| req[k] = v } + if !is_diagnostic + req["X-LaunchDarkly-Event-Schema"] = CURRENT_SCHEMA_VERSION.to_s + req["X-LaunchDarkly-Payload-ID"] = payload_id + end + req["Connection"] = "keep-alive" + res = @client.request(req) + rescue StandardError => exn + @logger.warn { "[LDClient] Error sending events: #{exn.inspect}." } + next + end + status = res.code.to_i + if status >= 200 && status < 300 + res_time = nil + if !res["date"].nil? + begin + res_time = Time.httpdate(res["date"]) + rescue ArgumentError + end + end + return EventSenderResult.new(true, false, res_time) + end + must_shutdown = !LaunchDarkly::Util.http_error_recoverable?(status) + can_retry = !must_shutdown && attempt == 0 + message = LaunchDarkly::Util.http_error_message(status, "event delivery", can_retry ? "will retry" : "some events were dropped") + @logger.error { "[LDClient] #{message}" } + if must_shutdown + return EventSenderResult.new(false, true, nil) + end + end + # used up our retries + return EventSenderResult.new(false, false, nil) + end + end + end +end diff --git a/lib/ldclient-rb/impl/util.rb b/lib/ldclient-rb/impl/util.rb new file mode 100644 index 00000000..d1197afe --- /dev/null +++ b/lib/ldclient-rb/impl/util.rb @@ -0,0 +1,19 @@ + +module LaunchDarkly + module Impl + module Util + def self.current_time_millis + (Time.now.to_f * 1000).to_i + end + + def self.default_http_headers(sdk_key, config) + ret = { "Authorization" => sdk_key, "User-Agent" => "RubyClient/" + LaunchDarkly::VERSION } + if config.wrapper_name + ret["X-LaunchDarkly-Wrapper"] = config.wrapper_name + + (config.wrapper_version ? "/" + config.wrapper_version : "") + end + ret + end + end + end +end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index b7c2ee85..06db4f00 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -1,3 +1,4 @@ +require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/event_factory" require "ldclient-rb/impl/store_client_wrapper" require "concurrent/atomics" @@ -46,10 +47,16 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) updated_config.instance_variable_set(:@feature_store, @store) @config = updated_config + if !@config.offline? && @config.send_events && !@config.diagnostic_opt_out? + diagnostic_accumulator = Impl::DiagnosticAccumulator.new(Impl::DiagnosticAccumulator.create_diagnostic_id(sdk_key)) + else + diagnostic_accumulator = nil + end + if @config.offline? || !@config.send_events @event_processor = NullEventProcessor.new else - @event_processor = EventProcessor.new(sdk_key, config) + @event_processor = EventProcessor.new(sdk_key, config, diagnostic_accumulator) end if @config.use_ldd? @@ -59,7 +66,13 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) data_source_or_factory = @config.data_source || self.method(:create_default_data_source) if data_source_or_factory.respond_to? :call - @data_source = data_source_or_factory.call(sdk_key, @config) + # Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in + # which case they take three parameters. This will be changed in the future to use a less awkware mechanism. + if data_source_or_factory.arity == 3 + @data_source = data_source_or_factory.call(sdk_key, @config, diagnostic_accumulator) + else + @data_source = data_source_or_factory.call(sdk_key, @config) + end else @data_source = data_source_or_factory end @@ -335,13 +348,13 @@ def close private - def create_default_data_source(sdk_key, config) + def create_default_data_source(sdk_key, config, diagnostic_accumulator) if config.offline? return NullUpdateProcessor.new end requestor = Requestor.new(sdk_key, config) if config.stream? - StreamProcessor.new(sdk_key, config, requestor) + StreamProcessor.new(sdk_key, config, requestor, diagnostic_accumulator) else config.logger.info { "Disabling streaming API" } config.logger.warn { "You should only disable the streaming API if instructed to do so by LaunchDarkly support" } diff --git a/lib/ldclient-rb/requestor.rb b/lib/ldclient-rb/requestor.rb index f7174787..eae0a193 100644 --- a/lib/ldclient-rb/requestor.rb +++ b/lib/ldclient-rb/requestor.rb @@ -51,8 +51,7 @@ def make_request(path) @client.start if !@client.started? uri = URI(@config.base_uri + path) req = Net::HTTP::Get.new(uri) - req["Authorization"] = @sdk_key - req["User-Agent"] = "RubyClient/" + LaunchDarkly::VERSION + Impl::Util.default_http_headers(@sdk_key, @config).each { |k, v| req[k] = v } req["Connection"] = "keep-alive" cached = @cache.read(uri) if !cached.nil? diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index ddb7f669..e27fad32 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -24,7 +24,7 @@ module LaunchDarkly # @private class StreamProcessor - def initialize(sdk_key, config, requestor) + def initialize(sdk_key, config, requestor, diagnostic_accumulator = nil) @sdk_key = sdk_key @config = config @feature_store = config.feature_store @@ -33,6 +33,7 @@ def initialize(sdk_key, config, requestor) @started = Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) @ready = Concurrent::Event.new + @connection_attempt_start_time = 0 end def initialized? @@ -44,18 +45,17 @@ def start @config.logger.info { "[LDClient] Initializing stream connection" } - headers = { - 'Authorization' => @sdk_key, - 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION - } + headers = Impl::Util.default_http_headers(@sdk_key, @config) opts = { headers: headers, read_timeout: READ_TIMEOUT_SECONDS, logger: @config.logger } + log_connection_started @es = SSE::Client.new(@config.stream_uri + "/all", **opts) do |conn| conn.on_event { |event| process_message(event) } conn.on_error { |err| + log_connection_result(false) case err when SSE::Errors::HTTPStatusError status = err.status @@ -82,6 +82,7 @@ def stop private def process_message(message) + log_connection_result(true) method = message.type @config.logger.debug { "[LDClient] Stream received #{method} message: #{message.data}" } if method == PUT @@ -137,5 +138,17 @@ def process_message(message) def key_for_path(kind, path) path.start_with?(KEY_PATHS[kind]) ? path[KEY_PATHS[kind].length..-1] : nil end + + def log_connection_started + @connection_attempt_start_time = Impl::Util::current_time_millis + end + + def log_connection_result(is_success) + if !@diagnostic_accumulator.nil? && @connection_attempt_start_time > 0 + @diagnostic_accumulator.record_stream_init(@connection_attempt_start_time, !is_success, + Impl::Util::current_time_millis - @connection_attempt_start_time) + @connection_attempt_start_time = 0 + end + end end end diff --git a/spec/diagnostic_events_spec.rb b/spec/diagnostic_events_spec.rb new file mode 100644 index 00000000..0c4ef058 --- /dev/null +++ b/spec/diagnostic_events_spec.rb @@ -0,0 +1,163 @@ +require "ldclient-rb/impl/diagnostic_events" + +require "spec_helper" + +module LaunchDarkly + module Impl + describe DiagnosticAccumulator do + subject { DiagnosticAccumulator } + + let(:sdk_key) { "sdk_key" } + let(:default_id) { subject.create_diagnostic_id("my-key") } + let(:default_acc) { subject.new(default_id) } + + it "creates unique ID with SDK key suffix" do + id1 = subject.create_diagnostic_id("1234567890") + expect(id1[:sdkKeySuffix]).to eq "567890" + expect(id1[:diagnosticId]).not_to be_nil + + id2 = subject.create_diagnostic_id("1234567890") + expect(id2[:diagnosticId]).not_to eq id1[:diagnosticId] + end + + describe "init event" do + def expected_default_config + { + allAttributesPrivate: false, + connectTimeoutMillis: Config.default_connect_timeout * 1000, + customBaseURI: false, + customEventsURI: false, + customStreamURI: false, + diagnosticRecordingIntervalMillis: Config.default_diagnostic_recording_interval * 1000, + eventsCapacity: Config.default_capacity, + eventsFlushIntervalMillis: Config.default_flush_interval * 1000, + inlineUsersInEvents: false, + pollingIntervalMillis: Config.default_poll_interval * 1000, + socketTimeoutMillis: Config.default_read_timeout * 1000, + streamingDisabled: false, + userKeysCapacity: Config.default_user_keys_capacity, + userKeysFlushIntervalMillis: Config.default_user_keys_flush_interval * 1000, + usingProxy: false, + usingRelayDaemon: false + } + end + + it "has basic fields" do + event = default_acc.create_init_event(Config.new) + expect(event[:kind]).to eq 'diagnostic-init' + expect(event[:creationDate]).not_to be_nil + expect(event[:id]).to eq default_id + end + + it "can have default config data" do + event = default_acc.create_init_event(Config.new) + expect(event[:configuration]).to eq expected_default_config + end + + it "can have custom config data" do + changes_and_expected = [ + [ { all_attributes_private: true }, { allAttributesPrivate: true } ], + [ { connect_timeout: 46 }, { connectTimeoutMillis: 46000 } ], + [ { base_uri: 'http://custom' }, { customBaseURI: true } ], + [ { events_uri: 'http://custom' }, { customEventsURI: true } ], + [ { stream_uri: 'http://custom' }, { customStreamURI: true } ], + [ { diagnostic_recording_interval: 9999 }, { diagnosticRecordingIntervalMillis: 9999000 } ], + [ { capacity: 4000 }, { eventsCapacity: 4000 } ], + [ { flush_interval: 46 }, { eventsFlushIntervalMillis: 46000 } ], + [ { inline_users_in_events: true }, { inlineUsersInEvents: true } ], + [ { poll_interval: 999 }, { pollingIntervalMillis: 999000 } ], + [ { read_timeout: 46 }, { socketTimeoutMillis: 46000 } ], + [ { stream: false }, { streamingDisabled: true } ], + [ { user_keys_capacity: 999 }, { userKeysCapacity: 999 } ], + [ { user_keys_flush_interval: 999 }, { userKeysFlushIntervalMillis: 999000 } ], + [ { use_ldd: true }, { usingRelayDaemon: true } ] + ] + changes_and_expected.each do |config_values, expected_values| + config = Config.new(config_values) + event = default_acc.create_init_event(config) + expect(event[:configuration]).to eq expected_default_config.merge(expected_values) + end + end + + it "detects proxy" do + begin + ENV["http_proxy"] = 'http://my-proxy' + event = default_acc.create_init_event(Config.new) + expect(event[:configuration][:usingProxy]).to be true + ensure + ENV["http_proxy"] = nil + end + end + + it "has expected SDK data" do + event = default_acc.create_init_event(Config.new) + expect(event[:sdk]).to eq ({ + name: 'ruby-server-sdk', + version: LaunchDarkly::VERSION + }) + end + + it "has expected SDK data with wrapper" do + event = default_acc.create_init_event(Config.new(wrapper_name: 'my-wrapper', wrapper_version: '2.0')) + expect(event[:sdk]).to eq ({ + name: 'ruby-server-sdk', + version: LaunchDarkly::VERSION, + wrapperName: 'my-wrapper', + wrapperVersion: '2.0' + }) + end + + it "has expected platform data" do + event = default_acc.create_init_event(Config.new) + expect(event[:platform]).to include ({ + name: 'ruby' + }) + end + end + + describe "periodic event" do + it "has correct default values" do + acc = subject.new(default_id) + event = acc.create_periodic_event_and_reset(2, 3, 4) + expect(event).to include({ + kind: 'diagnostic', + id: default_id, + droppedEvents: 2, + deduplicatedUsers: 3, + eventsInLastBatch: 4, + streamInits: [] + }) + expect(event[:creationDate]).not_to be_nil + expect(event[:dataSinceDate]).not_to be_nil + end + + it "can add stream init" do + acc = subject.new(default_id) + acc.record_stream_init(1000, false, 2000) + event = acc.create_periodic_event_and_reset(0, 0, 0) + expect(event[:streamInits]).to eq [{ timestamp: 1000, failed: false, durationMillis: 2000 }] + end + + it "resets fields after creating event" do + acc = subject.new(default_id) + acc.record_stream_init(1000, false, 2000) + event1 = acc.create_periodic_event_and_reset(2, 3, 4) + event2 = acc.create_periodic_event_and_reset(5, 6, 7) + expect(event1).to include ({ + droppedEvents: 2, + deduplicatedUsers: 3, + eventsInLastBatch: 4, + streamInits: [{ timestamp: 1000, failed: false, durationMillis: 2000 }] + }) + expect(event2).to include ({ + dataSinceDate: event1[:creationDate], + droppedEvents: 5, + deduplicatedUsers: 6, + eventsInLastBatch: 7, + streamInits: [] + }) + end + end + end + end +end diff --git a/spec/evaluation_spec.rb b/spec/evaluation_spec.rb index 2efbd745..14d5ed80 100644 --- a/spec/evaluation_spec.rb +++ b/spec/evaluation_spec.rb @@ -17,7 +17,7 @@ } } - let(:logger) { LaunchDarkly::Config.default_logger } + let(:logger) { $null_log } def boolean_flag_with_rules(rules) { key: 'feature', on: true, rules: rules, fallthrough: { variation: 0 }, variations: [ false, true ] } diff --git a/spec/event_sender_spec.rb b/spec/event_sender_spec.rb new file mode 100644 index 00000000..e99761b7 --- /dev/null +++ b/spec/event_sender_spec.rb @@ -0,0 +1,179 @@ +require "ldclient-rb/impl/event_sender" + +require "http_util" +require "spec_helper" + +require "time" + +module LaunchDarkly + module Impl + describe EventSender do + subject { EventSender } + + let(:sdk_key) { "sdk_key" } + let(:fake_data) { '{"things":[]}' } + + def make_sender(server) + subject.new(sdk_key, Config.new(events_uri: server.base_uri.to_s, logger: $null_log), nil, 0.1) + end + + def with_sender_and_server + with_server do |server| + yield make_sender(server), server + end + end + + it "sends analytics event data" do + with_sender_and_server do |es, server| + server.setup_ok_response("/bulk", "") + + result = es.send_event_data(fake_data, false) + + expect(result.success).to be true + expect(result.must_shutdown).to be false + expect(result.time_from_server).not_to be_nil + + req = server.await_request + expect(req.body).to eq fake_data + expect(req.header).to include({ + "authorization" => [ sdk_key ], + "content-type" => [ "application/json" ], + "user-agent" => [ "RubyClient/" + LaunchDarkly::VERSION ], + "x-launchdarkly-event-schema" => [ "3" ] + }) + expect(req.header['x-launchdarkly-payload-id']).not_to eq [] + end + end + + it "generates a new payload ID for each payload" do + with_sender_and_server do |es, server| + server.setup_ok_response("/bulk", "") + + result1 = es.send_event_data(fake_data, false) + result2 = es.send_event_data(fake_data, false) + expect(result1.success).to be true + expect(result2.success).to be true + + req1, body1 = server.await_request_with_body + req2, body2 = server.await_request_with_body + expect(body1).to eq fake_data + expect(body2).to eq fake_data + expect(req1.header['x-launchdarkly-payload-id']).not_to eq req2.header['x-launchdarkly-payload-id'] + end + end + + it "sends diagnostic event data" do + with_sender_and_server do |es, server| + server.setup_ok_response("/diagnostic", "") + + result = es.send_event_data(fake_data, true) + + expect(result.success).to be true + expect(result.must_shutdown).to be false + expect(result.time_from_server).not_to be_nil + + req, body = server.await_request_with_body + expect(body).to eq fake_data + expect(req.header).to include({ + "authorization" => [ sdk_key ], + "content-type" => [ "application/json" ], + "user-agent" => [ "RubyClient/" + LaunchDarkly::VERSION ], + }) + expect(req.header['x-launchdarkly-event-schema']).to eq [] + expect(req.header['x-launchdarkly-payload-id']).to eq [] + end + end + + it "can use a proxy server" do + with_server do |server| + server.setup_ok_response("/bulk", "") + + with_server(StubProxyServer.new) do |proxy| + begin + ENV["http_proxy"] = proxy.base_uri.to_s + + es = make_sender(server) + + result = es.send_event_data(fake_data, false) + + expect(result.success).to be true + + req, body = server.await_request_with_body + expect(body).to eq fake_data + ensure + ENV["http_proxy"] = nil + end + end + end + end + + [400, 408, 429, 500].each do |status| + it "handles recoverable error #{status}" do + with_sender_and_server do |es, server| + req_count = 0 + server.setup_response("/bulk") do |req, res| + req_count = req_count + 1 + res.status = req_count == 2 ? 200 : status + end + + result = es.send_event_data(fake_data, false) + + expect(result.success).to be true + expect(result.must_shutdown).to be false + expect(result.time_from_server).not_to be_nil + + expect(server.requests.count).to eq 2 + req1, body1 = server.await_request_with_body + req2, body2 = server.await_request_with_body + expect(body1).to eq fake_data + expect(body2).to eq fake_data + expect(req1.header['x-launchdarkly-payload-id']).to eq req2.header['x-launchdarkly-payload-id'] + end + end + end + + [400, 408, 429, 500].each do |status| + it "only retries error #{status} once" do + with_sender_and_server do |es, server| + req_count = 0 + server.setup_response("/bulk") do |req, res| + req_count = req_count + 1 + res.status = req_count == 3 ? 200 : status + end + + result = es.send_event_data(fake_data, false) + + expect(result.success).to be false + expect(result.must_shutdown).to be false + expect(result.time_from_server).to be_nil + + expect(server.requests.count).to eq 2 + req1, body1 = server.await_request_with_body + req2, body2 = server.await_request_with_body + expect(body1).to eq fake_data + expect(body2).to eq fake_data + expect(req1.header['x-launchdarkly-payload-id']).to eq req2.header['x-launchdarkly-payload-id'] + end + end + end + + [401, 403].each do |status| + it "gives up after unrecoverable error #{status}" do + with_sender_and_server do |es, server| + server.setup_response("/bulk") do |req, res| + res.status = status + end + + result = es.send_event_data(fake_data, false) + + expect(result.success).to be false + expect(result.must_shutdown).to be true + expect(result.time_from_server).to be_nil + + expect(server.requests.count).to eq 1 + end + end + end + end + end +end diff --git a/spec/events_spec.rb b/spec/events_spec.rb index 1108a3ac..a36fa95f 100644 --- a/spec/events_spec.rb +++ b/spec/events_spec.rb @@ -5,8 +5,8 @@ describe LaunchDarkly::EventProcessor do subject { LaunchDarkly::EventProcessor } - let(:default_config) { LaunchDarkly::Config.new } - let(:hc) { FakeHttpClient.new } + let(:default_config_opts) { { diagnostic_opt_out: true, logger: $null_log } } + let(:default_config) { LaunchDarkly::Config.new(default_config_opts) } let(:user) { { key: "userkey", name: "Red" } } let(:filtered_user) { { key: "userkey", privateAttrs: [ "name" ] } } let(:numeric_user) { { key: 1, secondary: 2, ip: 3, country: 4, email: 5, firstName: 6, lastName: 7, @@ -14,546 +14,508 @@ let(:stringified_numeric_user) { { key: '1', secondary: '2', ip: '3', country: '4', email: '5', firstName: '6', lastName: '7', avatar: '8', name: '9', anonymous: false, custom: { age: 99 } } } - after(:each) do - if !@ep.nil? - @ep.stop + def with_processor_and_sender(config) + sender = FakeEventSender.new + ep = subject.new("sdk_key", config, nil, nil, { event_sender: sender }) + begin + yield ep, sender + ensure + ep.stop end end it "queues identify event" do - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", key: user[:key], user: user } - @ep.add_event(e) + with_processor_and_sender(default_config) do |ep, sender| + e = { kind: "identify", key: user[:key], user: user } + ep.add_event(e) - output = flush_and_get_events - expect(output).to contain_exactly(e) + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly(e) + end end it "filters user in identify event" do - config = LaunchDarkly::Config.new(all_attributes_private: true) - @ep = subject.new("sdk_key", config, hc) - e = { kind: "identify", key: user[:key], user: user } - @ep.add_event(e) - - output = flush_and_get_events - expect(output).to contain_exactly({ - kind: "identify", - key: user[:key], - creationDate: e[:creationDate], - user: filtered_user - }) + config = LaunchDarkly::Config.new(default_config_opts.merge(all_attributes_private: true)) + with_processor_and_sender(config) do |ep, sender| + e = { kind: "identify", key: user[:key], user: user } + ep.add_event(e) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly({ + kind: "identify", + key: user[:key], + creationDate: e[:creationDate], + user: filtered_user + }) + end end it "stringifies built-in user attributes in identify event" do - @ep = subject.new("sdk_key", default_config, hc) - flag = { key: "flagkey", version: 11 } - e = { kind: "identify", key: numeric_user[:key], user: numeric_user } - @ep.add_event(e) - - output = flush_and_get_events - expect(output).to contain_exactly( - kind: "identify", - key: numeric_user[:key].to_s, - creationDate: e[:creationDate], - user: stringified_numeric_user - ) + with_processor_and_sender(default_config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + e = { kind: "identify", key: numeric_user[:key], user: numeric_user } + ep.add_event(e) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + kind: "identify", + key: numeric_user[:key].to_s, + creationDate: e[:creationDate], + user: stringified_numeric_user + ) + end end it "queues individual feature event with index event" do - @ep = subject.new("sdk_key", default_config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, user)), - eq(feature_event(fe, flag, false, nil)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe, user)), + eq(feature_event(fe, flag, false, nil)), + include(:kind => "summary") + ) + end end it "filters user in index event" do - config = LaunchDarkly::Config.new(all_attributes_private: true) - @ep = subject.new("sdk_key", config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, filtered_user)), - eq(feature_event(fe, flag, false, nil)), - include(:kind => "summary") - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(all_attributes_private: true)) + with_processor_and_sender(config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe, filtered_user)), + eq(feature_event(fe, flag, false, nil)), + include(:kind => "summary") + ) + end end it "stringifies built-in user attributes in index event" do - @ep = subject.new("sdk_key", default_config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: numeric_user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, stringified_numeric_user)), - eq(feature_event(fe, flag, false, nil)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: numeric_user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe, stringified_numeric_user)), + eq(feature_event(fe, flag, false, nil)), + include(:kind => "summary") + ) + end end it "can include inline user in feature event" do - config = LaunchDarkly::Config.new(inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(feature_event(fe, flag, false, user)), - include(:kind => "summary") - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(feature_event(fe, flag, false, user)), + include(:kind => "summary") + ) + end end it "stringifies built-in user attributes in feature event" do - config = LaunchDarkly::Config.new(inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: numeric_user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(feature_event(fe, flag, false, stringified_numeric_user)), - include(:kind => "summary") - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: numeric_user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(feature_event(fe, flag, false, stringified_numeric_user)), + include(:kind => "summary") + ) + end end it "filters user in feature event" do - config = LaunchDarkly::Config.new(all_attributes_private: true, inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(feature_event(fe, flag, false, filtered_user)), - include(:kind => "summary") - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(all_attributes_private: true, inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(feature_event(fe, flag, false, filtered_user)), + include(:kind => "summary") + ) + end end it "still generates index event if inline_users is true but feature event was not tracked" do - config = LaunchDarkly::Config.new(inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - flag = { key: "flagkey", version: 11 } - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: false - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, user)), - include(:kind => "summary") - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe, user)), + include(:kind => "summary") + ) + end end it "sets event kind to debug if flag is temporarily in debug mode" do - @ep = subject.new("sdk_key", default_config, hc) - flag = { key: "flagkey", version: 11 } - future_time = (Time.now.to_f * 1000).to_i + 1000000 - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: future_time - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, user)), - eq(feature_event(fe, flag, true, user)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: future_time + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe, user)), + eq(feature_event(fe, flag, true, user)), + include(:kind => "summary") + ) + end end it "can be both debugging and tracking an event" do - @ep = subject.new("sdk_key", default_config, hc) - flag = { key: "flagkey", version: 11 } - future_time = (Time.now.to_f * 1000).to_i + 1000000 - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: true, debugEventsUntilDate: future_time - } - @ep.add_event(fe) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, user)), - eq(feature_event(fe, flag, false, nil)), - eq(feature_event(fe, flag, true, user)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + flag = { key: "flagkey", version: 11 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: true, debugEventsUntilDate: future_time + } + ep.add_event(fe) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe, user)), + eq(feature_event(fe, flag, false, nil)), + eq(feature_event(fe, flag, true, user)), + include(:kind => "summary") + ) + end end it "ends debug mode based on client time if client time is later than server time" do - @ep = subject.new("sdk_key", default_config, hc) - - # Pick a server time that is somewhat behind the client time - server_time = (Time.now.to_f * 1000).to_i - 20000 - - # Send and flush an event we don't care about, just to set the last server time - hc.set_server_time(server_time) - @ep.add_event({ kind: "identify", user: { key: "otherUser" }}) - flush_and_get_events - - # Now send an event with debug mode on, with a "debug until" time that is further in - # the future than the server time, but in the past compared to the client. - flag = { key: "flagkey", version: 11 } - debug_until = server_time + 1000 - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: debug_until - } - @ep.add_event(fe) - - # Should get a summary event only, not a full feature event - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, user)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + # Pick a server time that is somewhat behind the client time + server_time = Time.now - 20 + + # Send and flush an event we don't care about, just to set the last server time + sender.result = LaunchDarkly::Impl::EventSenderResult.new(true, false, server_time) + ep.add_event({ kind: "identify", user: user }) + flush_and_get_events(ep, sender) + + # Now send an event with debug mode on, with a "debug until" time that is further in + # the future than the server time, but in the past compared to the client. + flag = { key: "flagkey", version: 11 } + debug_until = (server_time.to_f * 1000).to_i + 1000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: debug_until + } + ep.add_event(fe) + + # Should get a summary event only, not a full feature event + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + include(:kind => "summary") + ) + end end it "ends debug mode based on server time if server time is later than client time" do - @ep = subject.new("sdk_key", default_config, hc) - - # Pick a server time that is somewhat ahead of the client time - server_time = (Time.now.to_f * 1000).to_i + 20000 - - # Send and flush an event we don't care about, just to set the last server time - hc.set_server_time(server_time) - @ep.add_event({ kind: "identify", user: { key: "otherUser" }}) - flush_and_get_events - - # Now send an event with debug mode on, with a "debug until" time that is further in - # the future than the server time, but in the past compared to the client. - flag = { key: "flagkey", version: 11 } - debug_until = server_time - 1000 - fe = { - kind: "feature", key: "flagkey", version: 11, user: user, - variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: debug_until - } - @ep.add_event(fe) - - # Should get a summary event only, not a full feature event - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe, user)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + # Pick a server time that is somewhat ahead of the client time + server_time = Time.now + 20 + + # Send and flush an event we don't care about, just to set the last server time + sender.result = LaunchDarkly::Impl::EventSenderResult.new(true, false, server_time) + ep.add_event({ kind: "identify", user: user }) + flush_and_get_events(ep, sender) + + # Now send an event with debug mode on, with a "debug until" time that is further in + # the future than the server time, but in the past compared to the client. + flag = { key: "flagkey", version: 11 } + debug_until = (server_time.to_f * 1000).to_i - 1000 + fe = { + kind: "feature", key: "flagkey", version: 11, user: user, + variation: 1, value: "value", trackEvents: false, debugEventsUntilDate: debug_until + } + ep.add_event(fe) + + # Should get a summary event only, not a full feature event + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + include(:kind => "summary") + ) + end end it "generates only one index event for multiple events with same user" do - @ep = subject.new("sdk_key", default_config, hc) - flag1 = { key: "flagkey1", version: 11 } - flag2 = { key: "flagkey2", version: 22 } - future_time = (Time.now.to_f * 1000).to_i + 1000000 - fe1 = { - kind: "feature", key: "flagkey1", version: 11, user: user, - variation: 1, value: "value", trackEvents: true - } - fe2 = { - kind: "feature", key: "flagkey2", version: 22, user: user, - variation: 1, value: "value", trackEvents: true - } - @ep.add_event(fe1) - @ep.add_event(fe2) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe1, user)), - eq(feature_event(fe1, flag1, false, nil)), - eq(feature_event(fe2, flag2, false, nil)), - include(:kind => "summary") - ) + with_processor_and_sender(default_config) do |ep, sender| + flag1 = { key: "flagkey1", version: 11 } + flag2 = { key: "flagkey2", version: 22 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe1 = { + kind: "feature", key: "flagkey1", version: 11, user: user, + variation: 1, value: "value", trackEvents: true + } + fe2 = { + kind: "feature", key: "flagkey2", version: 22, user: user, + variation: 1, value: "value", trackEvents: true + } + ep.add_event(fe1) + ep.add_event(fe2) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe1, user)), + eq(feature_event(fe1, flag1, false, nil)), + eq(feature_event(fe2, flag2, false, nil)), + include(:kind => "summary") + ) + end end it "summarizes non-tracked events" do - @ep = subject.new("sdk_key", default_config, hc) - flag1 = { key: "flagkey1", version: 11 } - flag2 = { key: "flagkey2", version: 22 } - future_time = (Time.now.to_f * 1000).to_i + 1000000 - fe1 = { - kind: "feature", key: "flagkey1", version: 11, user: user, - variation: 1, value: "value1", default: "default1" - } - fe2 = { - kind: "feature", key: "flagkey2", version: 22, user: user, - variation: 2, value: "value2", default: "default2" - } - @ep.add_event(fe1) - @ep.add_event(fe2) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(fe1, user)), - eq({ - kind: "summary", - startDate: fe1[:creationDate], - endDate: fe2[:creationDate], - features: { - flagkey1: { - default: "default1", - counters: [ - { version: 11, variation: 1, value: "value1", count: 1 } - ] - }, - flagkey2: { - default: "default2", - counters: [ - { version: 22, variation: 2, value: "value2", count: 1 } - ] + with_processor_and_sender(default_config) do |ep, sender| + flag1 = { key: "flagkey1", version: 11 } + flag2 = { key: "flagkey2", version: 22 } + future_time = (Time.now.to_f * 1000).to_i + 1000000 + fe1 = { + kind: "feature", key: "flagkey1", version: 11, user: user, + variation: 1, value: "value1", default: "default1" + } + fe2 = { + kind: "feature", key: "flagkey2", version: 22, user: user, + variation: 2, value: "value2", default: "default2" + } + ep.add_event(fe1) + ep.add_event(fe2) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(fe1, user)), + eq({ + kind: "summary", + startDate: fe1[:creationDate], + endDate: fe2[:creationDate], + features: { + flagkey1: { + default: "default1", + counters: [ + { version: 11, variation: 1, value: "value1", count: 1 } + ] + }, + flagkey2: { + default: "default2", + counters: [ + { version: 22, variation: 2, value: "value2", count: 1 } + ] + } } - } - }) - ) + }) + ) + end end it "queues custom event with user" do - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" }, metricValue: 1.5 } - @ep.add_event(e) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(index_event(e, user)), - eq(custom_event(e, nil)) - ) + with_processor_and_sender(default_config) do |ep, sender| + e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" }, metricValue: 1.5 } + ep.add_event(e) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(index_event(e, user)), + eq(custom_event(e, nil)) + ) + end end it "can include inline user in custom event" do - config = LaunchDarkly::Config.new(inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } - @ep.add_event(e) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(custom_event(e, user)) - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } + ep.add_event(e) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(custom_event(e, user)) + ) + end end it "filters user in custom event" do - config = LaunchDarkly::Config.new(all_attributes_private: true, inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } - @ep.add_event(e) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(custom_event(e, filtered_user)) - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(all_attributes_private: true, inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + e = { kind: "custom", key: "eventkey", user: user, data: { thing: "stuff" } } + ep.add_event(e) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(custom_event(e, filtered_user)) + ) + end end it "stringifies built-in user attributes in custom event" do - config = LaunchDarkly::Config.new(inline_users_in_events: true) - @ep = subject.new("sdk_key", config, hc) - e = { kind: "custom", key: "eventkey", user: numeric_user } - @ep.add_event(e) - - output = flush_and_get_events - expect(output).to contain_exactly( - eq(custom_event(e, stringified_numeric_user)) - ) + config = LaunchDarkly::Config.new(default_config_opts.merge(inline_users_in_events: true)) + with_processor_and_sender(config) do |ep, sender| + e = { kind: "custom", key: "eventkey", user: numeric_user } + ep.add_event(e) + + output = flush_and_get_events(ep, sender) + expect(output).to contain_exactly( + eq(custom_event(e, stringified_numeric_user)) + ) + end end it "does a final flush when shutting down" do - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", key: user[:key], user: user } - @ep.add_event(e) - - @ep.stop - - output = get_events_from_last_request - expect(output).to contain_exactly(e) + with_processor_and_sender(default_config) do |ep, sender| + e = { kind: "identify", key: user[:key], user: user } + ep.add_event(e) + + ep.stop + + output = sender.analytics_payloads.pop + expect(output).to contain_exactly(e) + end end it "sends nothing if there are no events" do - @ep = subject.new("sdk_key", default_config, hc) - @ep.flush - expect(hc.get_request).to be nil - end - - it "sends SDK key" do - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", user: user } - @ep.add_event(e) - - @ep.flush - @ep.wait_until_inactive - - expect(hc.get_request["authorization"]).to eq "sdk_key" - end - - it "sends unique payload IDs" do - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", user: user } - - @ep.add_event(e) - @ep.flush - @ep.wait_until_inactive - req0 = hc.get_request - - @ep.add_event(e) - @ep.flush - @ep.wait_until_inactive - req1 = hc.get_request - - id0 = req0["x-launchdarkly-payload-id"] - id1 = req1["x-launchdarkly-payload-id"] - expect(id0).not_to be_nil - expect(id0).not_to eq "" - expect(id1).not_to be nil - expect(id1).not_to eq "" - expect(id1).not_to eq id0 - end - - def verify_unrecoverable_http_error(status) - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", user: user } - @ep.add_event(e) - - hc.set_response_status(status) - @ep.flush - @ep.wait_until_inactive - expect(hc.get_request).not_to be_nil - hc.reset - - @ep.add_event(e) - @ep.flush - @ep.wait_until_inactive - expect(hc.get_request).to be_nil - end - - def verify_recoverable_http_error(status) - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", user: user } - @ep.add_event(e) - - hc.set_response_status(503) - @ep.flush - @ep.wait_until_inactive - - req0 = hc.get_request - expect(req0).not_to be_nil - req1 = hc.get_request - expect(req1).not_to be_nil - id0 = req0["x-launchdarkly-payload-id"] - expect(id0).not_to be_nil - expect(id0).not_to eq "" - expect(req1["x-launchdarkly-payload-id"]).to eq id0 - - expect(hc.get_request).to be_nil # no 3rd request - - # now verify that a subsequent flush still generates a request - hc.reset - @ep.add_event(e) - @ep.flush - @ep.wait_until_inactive - expect(hc.get_request).not_to be_nil + with_processor_and_sender(default_config) do |ep, sender| + ep.flush + ep.wait_until_inactive + expect(sender.analytics_payloads.empty?).to be true + end end - it "stops posting events after getting a 401 error" do - verify_unrecoverable_http_error(401) - end + it "stops posting events after unrecoverable error" do + with_processor_and_sender(default_config) do |ep, sender| + sender.result = LaunchDarkly::Impl::EventSenderResult.new(false, true, nil) + e = { kind: "identify", key: user[:key], user: user } + ep.add_event(e) + flush_and_get_events(ep, sender) - it "stops posting events after getting a 403 error" do - verify_unrecoverable_http_error(403) + e = { kind: "identify", key: user[:key], user: user } + ep.add_event(e) + ep.flush + ep.wait_until_inactive + expect(sender.analytics_payloads.empty?).to be true + end end - it "retries after 408 error" do - verify_recoverable_http_error(408) - end + describe "diagnostic events" do + let(:default_id) { LaunchDarkly::Impl::DiagnosticAccumulator.create_diagnostic_id('sdk_key') } + let(:diagnostic_config) { LaunchDarkly::Config.new(diagnostic_opt_out: false, logger: $null_log) } - it "retries after 429 error" do - verify_recoverable_http_error(429) - end + def with_diagnostic_processor_and_sender(config) + sender = FakeEventSender.new + acc = LaunchDarkly::Impl::DiagnosticAccumulator.new(default_id) + ep = subject.new("sdk_key", config, nil, acc, + { diagnostic_recording_interval: 0.2, event_sender: sender }) + begin + yield ep, sender + ensure + ep.stop + end + end - it "retries after 503 error" do - verify_recoverable_http_error(503) - end + it "sends init event" do + with_diagnostic_processor_and_sender(diagnostic_config) do |ep, sender| + event = sender.diagnostic_payloads.pop + expect(event).to include({ + kind: 'diagnostic-init', + id: default_id + }) + end + end - it "retries flush once after connection error" do - @ep = subject.new("sdk_key", default_config, hc) - e = { kind: "identify", user: user } - @ep.add_event(e) + it "sends periodic event" do + with_diagnostic_processor_and_sender(diagnostic_config) do |ep, sender| + init_event = sender.diagnostic_payloads.pop + periodic_event = sender.diagnostic_payloads.pop + expect(periodic_event).to include({ + kind: 'diagnostic', + id: default_id, + droppedEvents: 0, + deduplicatedUsers: 0, + eventsInLastBatch: 0, + streamInits: [] + }) + end + end - hc.set_exception(IOError.new("deliberate error")) - @ep.flush - @ep.wait_until_inactive + it "counts events in queue from last flush and dropped events" do + config = LaunchDarkly::Config.new(diagnostic_opt_out: false, capacity: 2, logger: $null_log) + with_diagnostic_processor_and_sender(config) do |ep, sender| + init_event = sender.diagnostic_payloads.pop + + ep.add_event({ kind: 'identify', user: user }) + ep.add_event({ kind: 'identify', user: user }) + ep.add_event({ kind: 'identify', user: user }) + flush_and_get_events(ep, sender) + + periodic_event = sender.diagnostic_payloads.pop + expect(periodic_event).to include({ + kind: 'diagnostic', + droppedEvents: 1, + eventsInLastBatch: 2 + }) + end + end - expect(hc.get_request).not_to be_nil - expect(hc.get_request).not_to be_nil - expect(hc.get_request).to be_nil # no 3rd request - end + it "counts deduplicated users" do + with_diagnostic_processor_and_sender(diagnostic_config) do |ep, sender| + init_event = sender.diagnostic_payloads.pop - it "makes actual HTTP request with correct headers" do - e = { kind: "identify", key: user[:key], user: user } - with_server do |server| - server.setup_ok_response("/bulk", "") - - @ep = subject.new("sdk_key", LaunchDarkly::Config.new(events_uri: server.base_uri.to_s)) - @ep.add_event(e) - @ep.flush - - req = server.await_request - expect(req.header).to include({ - "authorization" => [ "sdk_key" ], - "content-type" => [ "application/json" ], - "user-agent" => [ "RubyClient/" + LaunchDarkly::VERSION ], - "x-launchdarkly-event-schema" => [ "3" ] - }) - end - end + ep.add_event({ kind: 'custom', key: 'event1', user: user }) + ep.add_event({ kind: 'custom', key: 'event2', user: user }) + events = flush_and_get_events(ep, sender) - it "can use a proxy server" do - e = { kind: "identify", key: user[:key], user: user } - with_server do |server| - server.setup_ok_response("/bulk", "") - - with_server(StubProxyServer.new) do |proxy| - begin - ENV["http_proxy"] = proxy.base_uri.to_s - @ep = subject.new("sdk_key", LaunchDarkly::Config.new(events_uri: server.base_uri.to_s)) - @ep.add_event(e) - @ep.flush - - req = server.await_request - expect(req["content-type"]).to eq("application/json") - ensure - ENV["http_proxy"] = nil - end + periodic_event = sender.diagnostic_payloads.pop + expect(periodic_event).to include({ + kind: 'diagnostic', + deduplicatedUsers: 1 + }) end end end @@ -599,75 +561,26 @@ def custom_event(e, inline_user) out end - def flush_and_get_events - @ep.flush - @ep.wait_until_inactive - get_events_from_last_request + def flush_and_get_events(ep, sender) + ep.flush + ep.wait_until_inactive + sender.analytics_payloads.pop end - def get_events_from_last_request - req = hc.get_request - JSON.parse(req.body, symbolize_names: true) - end + class FakeEventSender + attr_accessor :result + attr_reader :analytics_payloads + attr_reader :diagnostic_payloads - class FakeHttpClient def initialize - reset - end - - def set_response_status(status) - @status = status - end - - def set_server_time(time_millis) - @server_time = Time.at(time_millis.to_f / 1000) - end - - def set_exception(e) - @exception = e - end - - def reset - @requests = [] - @status = 200 - end - - def request(req) - @requests.push(req) - if @exception - raise @exception - else - headers = {} - if @server_time - headers["Date"] = @server_time.httpdate - end - FakeResponse.new(@status ? @status : 200, headers) - end + @result = LaunchDarkly::Impl::EventSenderResult.new(true, false, nil) + @analytics_payloads = Queue.new + @diagnostic_payloads = Queue.new end - def start - end - - def started? - false - end - - def finish - end - - def get_request - @requests.shift - end - end - - class FakeResponse - include Net::HTTPHeader - - attr_reader :code - - def initialize(status, headers) - @code = status.to_s - initialize_http_header(headers) + def send_event_data(data, is_diagnostic) + (is_diagnostic ? @diagnostic_payloads : @analytics_payloads).push(JSON.parse(data, symbolize_names: true)) + @result end end end diff --git a/spec/file_data_source_spec.rb b/spec/file_data_source_spec.rb index 837b775d..212d057b 100644 --- a/spec/file_data_source_spec.rb +++ b/spec/file_data_source_spec.rb @@ -95,7 +95,7 @@ def []=(key, value) let(:bad_file_path) { "no-such-file" } before do - @config = LaunchDarkly::Config.new + @config = LaunchDarkly::Config.new(logger: $null_log) @store = @config.feature_store @tmp_dir = Dir.mktmpdir end diff --git a/spec/http_util.rb b/spec/http_util.rb index e43e2ded..27032589 100644 --- a/spec/http_util.rb +++ b/spec/http_util.rb @@ -52,6 +52,13 @@ def setup_response(uri_path, &action) @server.mount_proc(uri_path, action) end + def setup_status_response(uri_path, status, headers={}) + setup_response(uri_path) do |req, res| + res.status = status + headers.each { |n, v| res[n] = v } + end + end + def setup_ok_response(uri_path, body, content_type=nil, headers={}) setup_response(uri_path) do |req, res| res.status = 200 @@ -63,11 +70,17 @@ def setup_ok_response(uri_path, body, content_type=nil, headers={}) def record_request(req, res) @requests.push(req) - @requests_queue << req + @requests_queue << [req, req.body] end def await_request - @requests_queue.pop + r = @requests_queue.pop + r[0] + end + + def await_request_with_body + r = @requests_queue.pop + return r[0], r[1] end end diff --git a/spec/integrations/consul_feature_store_spec.rb b/spec/integrations/consul_feature_store_spec.rb index e74d0f0d..bad1e736 100644 --- a/spec/integrations/consul_feature_store_spec.rb +++ b/spec/integrations/consul_feature_store_spec.rb @@ -4,8 +4,6 @@ $my_prefix = 'testprefix' -$null_log = ::Logger.new($stdout) -$null_log.level = ::Logger::FATAL $consul_base_opts = { prefix: $my_prefix, diff --git a/spec/integrations/dynamodb_feature_store_spec.rb b/spec/integrations/dynamodb_feature_store_spec.rb index 7734670e..3b95edc8 100644 --- a/spec/integrations/dynamodb_feature_store_spec.rb +++ b/spec/integrations/dynamodb_feature_store_spec.rb @@ -6,8 +6,6 @@ $table_name = 'LD_DYNAMODB_TEST_TABLE' $endpoint = 'http://localhost:8000' $my_prefix = 'testprefix' -$null_log = ::Logger.new($stdout) -$null_log.level = ::Logger::FATAL $dynamodb_opts = { credentials: Aws::Credentials.new("key", "secret"), diff --git a/spec/ldclient_spec.rb b/spec/ldclient_spec.rb index 4672a662..1d3bb506 100644 --- a/spec/ldclient_spec.rb +++ b/spec/ldclient_spec.rb @@ -461,7 +461,7 @@ def event_processor end describe 'with send_events: true' do - let(:config_with_events) { LaunchDarkly::Config.new({offline: false, send_events: true, data_source: null_data}) } + let(:config_with_events) { LaunchDarkly::Config.new({offline: false, send_events: true, diagnostic_opt_out: true, data_source: null_data}) } let(:client_with_events) { subject.new("secret", config_with_events) } it "does not use a NullEventProcessor" do diff --git a/spec/polling_spec.rb b/spec/polling_spec.rb index 690147d0..b0eb46c5 100644 --- a/spec/polling_spec.rb +++ b/spec/polling_spec.rb @@ -6,7 +6,7 @@ let(:requestor) { double() } def with_processor(store) - config = LaunchDarkly::Config.new(feature_store: store) + config = LaunchDarkly::Config.new(feature_store: store, logger: $null_log) processor = subject.new(config, requestor) begin yield processor diff --git a/spec/redis_feature_store_spec.rb b/spec/redis_feature_store_spec.rb index 5aec6658..cf69f334 100644 --- a/spec/redis_feature_store_spec.rb +++ b/spec/redis_feature_store_spec.rb @@ -4,10 +4,7 @@ require "spec_helper" - $my_prefix = 'testprefix' -$null_log = ::Logger.new($stdout) -$null_log.level = ::Logger::FATAL $base_opts = { prefix: $my_prefix, diff --git a/spec/requestor_spec.rb b/spec/requestor_spec.rb index 502f6d86..6833ea1f 100644 --- a/spec/requestor_spec.rb +++ b/spec/requestor_spec.rb @@ -4,10 +4,13 @@ $sdk_key = "secret" describe LaunchDarkly::Requestor do - def with_requestor(base_uri) - r = LaunchDarkly::Requestor.new($sdk_key, LaunchDarkly::Config.new(base_uri: base_uri)) - yield r - r.stop + def with_requestor(base_uri, opts = {}) + r = LaunchDarkly::Requestor.new($sdk_key, LaunchDarkly::Config.new({ base_uri: base_uri }.merge(opts))) + begin + yield r + ensure + r.stop + end end describe "request_all_flags" do @@ -56,6 +59,19 @@ def with_requestor(base_uri) end end + it "sends wrapper header if configured" do + with_server do |server| + with_requestor(server.base_uri.to_s, { wrapper_name: 'MyWrapper', wrapper_version: '1.0' }) do |requestor| + server.setup_ok_response("/", "{}") + requestor.request_all_data() + expect(server.requests.count).to eq 1 + expect(server.requests[0].header).to include({ + "x-launchdarkly-wrapper" => [ "MyWrapper/1.0" ] + }) + end + end + end + it "can reuse cached data" do etag = "xyz" expected_data = { flags: { x: { key: "x" } } } diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index cc5e312b..52926ac1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,6 +3,9 @@ require "ldclient-rb" +$null_log = ::Logger.new($stdout) +$null_log.level = ::Logger::FATAL + RSpec.configure do |config| config.before(:each) do end