From 01dc51dfb36f7aca1369bcfcb53629a97a016689 Mon Sep 17 00:00:00 2001 From: LaunchDarklyReleaseBot <86431345+LaunchDarklyReleaseBot@users.noreply.github.com> Date: Thu, 4 May 2023 08:43:16 -0400 Subject: [PATCH] prepare 7.2.0 release (#218) * add event payload ID header * (6.0) drop support for old Ruby versions * add Ruby version constraint to gemspec * remove Rake dependency * update ld-eventsource to 1.0.2 which doesn't have Rake dependency * implement diagnostic events in Ruby (#130) * update ruby-eventsource to 1.0.3 for backoff bug * fix incorrect initialization of EventProcessor * remove install-time openssl check that breaks if you don't have rake * treat comparison with wrong data type as a non-match, not an exception (#134) * fail fast for nil SDK key when appropriate * tolerate nil value for user.custom (#137) * Only shutdown the Redis pool if it is owned by the SDK (#158) * Only shutdown a Redis pool created by SDK * Make pool shutdown behavior an option * improve doc comment * remove support for indirect/patch and indirect/put (#138) * update to json 2.3.1 (#139) * update json dep to 2.3.x to fix CVE * add publication of API docs on GitHub Pages (#143) * try fixing release metadata * update the default base url (#144) * revert renames of feature_store & update_processor * [ch92483] Use http gem and add socket factory support (#142) * update dependencies and add CI for ruby 3 (#141) * reference eventsource 2.0 in gemspec * add 5.x releasable branch for releaser * use Ruby 2.6.6 in releases * Removed the guides link * [ch99757] add alias method (#147) * don't send event for nil user evaluation * remove lockfile (#148) * rm redundant nil check * Experiment Allocation Changes (#150) * WIP - from sam's pairing session * starting sdk changes * adding tests and making sure everything works * adding more tests * removing the singleton for fallthrough * Revert "removing the singleton for fallthrough" This reverts commit dff7adbb809ecc63118d0fbff9742a88a039c679. * taking a different approach to keep things immutable * adding tests for untracked * remove unnecessary comment * making sure to return two values in all code paths Co-authored-by: pellyg-ld * Use camelCase for JSON property names (#151) The in_experiment attribute was added to reasons as part of #150 but it doesn't appear to be received in events. I think that's because it's sending it in JSON as "in_experiment" rather than "inExperiment" as we expect to parse it. 
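To make the #150/#151 behavior above concrete, here is a minimal sketch of how an application would observe the experiment marker through the public evaluation API. It assumes the released `variation_detail` method, `EvaluationDetail#reason`, and a `to_json` on the reason object; the SDK key, flag key, and context are placeholders.

```ruby
require "ldclient-rb"

client = LaunchDarkly::LDClient.new("sdk-key-placeholder")
context = LaunchDarkly::LDContext.create({key: "user-key", kind: "user"})

detail = client.variation_detail("my-flag-key", context, false)

# If this evaluation was bucketed into an experiment (#150), the reason carries
# that marker, and its JSON form is expected to use the camelCase property name
# ("inExperiment") that event ingestion parses (#151).
puts detail.reason.to_json
```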
* fixing ruby logic causing ih failures (#152) * fixing ruby logic * adding missing spec * Apply suggestions from code review Co-authored-by: Sam Stokes * pr tweaks * making spec language consistent Co-authored-by: Sam Stokes * add log warning for missing user key (#153) * add log warnings for nil/empty user key * rm warning for empty string key * fix test * diagnostic events should respect HTTPS_PROXY (#154) * minor test simplification (#155) * allow higher minor versions of json and http gems * allow v5.x of http gem (#157) * use Bundler 2.2.10 + modernize CI config (#158) * enable verbose rspec output * fix socket factory tests * restore log suppression * Replacing deprecated circleci image usage (#159) * use Releaser v2 config (#161) * Updates docs URLs * Update lib/ldclient-rb/ldclient.rb Co-authored-by: Louis Chan <91093020+louis-launchdarkly@users.noreply.github.com> * remove reliance on git in gemspec (#163) * use ruby-eventsource 2.1.1 for fix of sc-123850 and sc-125504 (#164) * use ruby-eventsource 2.1.1 for fix of sc-123850 and sc-125504 * comment phrasing * Start work on flag builder. * Add user targeting and rule builder * Add datasource implementation * Convert the current_flags hash to use symbols instead of strings as keys * Fix typo on FlagRuleBuilder copy constructor * minor refactoring of impl; Added use of new Clause struct instead of Hash in FlagRuleBuilder; Moved TestData.factory out of Impl namespace and renamed Impl to TestDataImpl * Add the doc comments * (big segments 1) add public config/interface/reason types (#167) * Cleanup docstrings to be YARD docs * Added Util.is_bool helper function to clean up the check for whether an object is a boolean; Removed the DeepCopyHash/DeepCopyArray objects in favor of deep_copy_hash and deep_copy_array functions * Move public classes out of Impl namespace. Most of it is in public namespace except for the data source now. * Move require of concurrent/atomics to the correct module * (big segments 2) implement Big Segments evaluation & status APIs (#168) * improve CONTRIBUTING.md with notes on code organization * add note about doc comments * Cleanup YARD warnings and cleanup docs * Address PR feedback: Move is_bool back to Impl namespace to avoid confusion; Remove unnecessary nil check on variations in build function; fixup comments * (big segments 3) implement Redis & DynamoDB big segment stores (#169) * add missing import * fix stale calculation * fix big segments user hash algorithm to use SHA256 * improve & refactor client/evaluation tests * more cleanup/DRY * add use_preconfigured_flag and use_preconfigured_segment to TestData (#173) * always cache big segment query result even if it's nil * comments * add test for cache expiration * use TestData in our own tests (#174) * use TestData in our own tests * fix test * replace LaunchDarkly::FileDataSource with LaunchDarkly::Integrations::FileData * update ruby-eventsource version for recent SSE fixes * Bump bundler version (#184) * Add ability to set initial reconnect delay (#183) * Treat secondary as a built-in attribute (#180) * all_flags_state is invalid if store isn't initialized (#182) * identify should not emit events if user key is "" (#181) * Account for traffic allocation on all flags (#185) * Add contract tests (#178) * Fix string interpolation in log message (#187) * Default opts to empty hash when creating persistent feature store (#186) * Remove Hakiri badge from README (#188) Hakiri was sunset on January 31st, 2022, at which time our badge stopped working. 
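As a concrete illustration of the TestData fixture referenced above (#173/#174), here is a rough usage sketch. The factory name (`LaunchDarkly::Integrations::TestData.data_source`) and config option names reflect the released integration as I understand it, and the flag hash shape is an assumption based on the SDK's data model, not a definitive reference.

```ruby
require "ldclient-rb"

td = LaunchDarkly::Integrations::TestData.data_source

# use_preconfigured_flag (#173) injects a full flag configuration verbatim,
# which is handy when a test needs more control than the flag builder offers.
td.use_preconfigured_flag({
  key: "my-test-flag",
  on: true,
  variations: [true, false],
  fallthrough: {variation: 0},
})

config = LaunchDarkly::Config.new(data_source: td, send_events: false)
client = LaunchDarkly::LDClient.new("fake-sdk-key", config)
```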
* detect http/https proxy env vars when creating HTTP clients * revert accidental change * fix nil safety in test service config * master -> main (#190) * master -> main * update ruby-eventsource version for parsing efficiency fix * miscellaneous optimizations for event processing (#193) * Drop support for EOL ruby versions (#196) Ruby 2.5 was EOL 2021-04-05. As of June 27th, 2022, the latest jRuby is Ruby 2.6 compatible. * Remove alias support (#195) * Add polling support for contract test service (#198) * Update rubocop and enable in CI (#197) Several of the Rubocop cop definitions have been renamed or moved to entirely other gems. This brings the configuration up to date with the latest naming conventions. * Add windows tests in circleci (#199) At some point in the past, we were experimenting with using Azure to verify Windows builds. Now that CircleCI supports Windows, we should keep everything on a single CI provider. * Add application info support (#194) * reuse EvaluationDetail instances by precomputing results * rubocop reformatting * add super constructor calls * disable rubocop Rails rules and fix some remaining syntax offenses * fix super calls * Add big segment support to contract tests (#201) * Initial creation of LDContext (#206) This introduces the initial structure and usage of the LDContext class. Instances of this class are expected to be created through static factory methods: ```ruby LaunchDarkly::LDContext.create({...}) LaunchDarkly::LDContext.create_multi([...]) ``` This class is not completed yet. Rather, this initial commit is focused on the creation patterns and the most basic operations. Subsequent commits will continue fleshing out this class and its operation. The `get_value` method will see significant changes as we introduce attribute reference support. Its current more simplistic implementation exists only to serve some interim unit tests. * Add reference based value retrieval (#207) This commit introduces the References type used for targeting complex attributes in the new LDContexts. References are expected to be created through static factory methods: ```ruby LaunchDarkly::Reference.create("/a/b") LaunchDarkly::Reference.create_literal("/a/b") ``` These references can be used to retrieve values from an existing LDContext: ```ruby ref = LaunchDarkly::Reference.create("/a/b") result = context.get_value_for_reference(ref) ``` * Basic changes to use contexts in evaluations instead of users (#208) This commit follows the general approach of the [equivalent PHP SDK PR][pr]. This replaces `LDUser` with `LDContext` in the parameters for evaluations, and makes the minimum necessary adjustments to allow evaluations to keep working as before as long as the context kind is "user". None of the new behavior defined in the U2C spec is implemented yet. Generation of evaluation events is temporarily disabled because the event logic hasn't been updated yet. U2C contract tests for evaluations are partially enabled; a lot of functionality is still missing, but all the tests that only cover previously-existing evaluation behavior are passing. [pr]: https://github.com/launchdarkly/php-server-sdk-private/pull/103 * Support ContextKind in Clauses (#209) This commit follows the general approach of the [equivalent PHP SDK PR][pr]. 
The main features of this commit are: - introduction of `individual_context` and `individual_context_count` methods - context kind matching in clauses [pr]: https://github.com/launchdarkly/php-server-sdk-private/pull/108 * Support included / excluded contexts in segments (#210) This commit follows the general approach of the [equivalent PHP SDK PR][pr]. Segments are now able to provide `includedContexts` and `excludedContexts` properties which can target values within a specific context kind. ```json { "includedContexts": [ { "contextKind": "org", "values": ["orgkey1", "orgkey2"] } ] } ``` [pr]: https://github.com/launchdarkly/php-server-sdk-private/pull/111 * Add contextKind support for rollouts & experiments (#211) This commit follows the general approach of the [equivalent PHP SDK PR][pr]. [pr]: https://github.com/launchdarkly/php-server-sdk-private/pull/110 * Style and test matcher improvements (#212) This commit enables several rubocop rules that were previously disabled. Once enabled, `rubocop -A` was run to automatically apply these fixes. There are a couple of additional changes that were made by hand: - I added the rubocop and rubocop-performance gems as dev packages. This should help address the original installation issue we ran into when I introduced these tools. - By default, new rubocop rules are disabled. This was the default before, but if you don't explicitly set this value, each run generates a ton of warning noise. This quiets that down. - Updates some LDContext tests to be more strict in their expectations of truth. * Remove support for secondary attribute (#213) As decided in the [spec], we are removing the special behavior of the secondary attribute. Going forward, secondary will be treated like any other attribute, and will no longer be included when determining the bucket for a context. [spec]: https://launchdarkly.atlassian.net/wiki/spaces/ENG/pages/2165212563/Consistent+and+Transparent+Rollout+Behavior+Unifying+Percent+Rollout+and+Traffic+Allocation * Remove deprecated APIs (#214) Since the users to context change requires a version break, this is the perfect time to remove previously deprecated bits of functionality. This includes: - Removing `update_processor*` config entries - `FileDataSource` entry point - `RedisFeatureStore` entry point - `Redis::sadd?` warning in unit tests * store data model with classes that aren't Hash * lint * remove [] override methods in places where we don't need them * comments * migrate some more of the model to be non-hash classes * lint * Anonymous cannot be nil in new context format (#216) The legacy user format allowed anonymous to be missing or explicitly provided but set to nil. The new context format requires anonymous to either not be set, or if it is explicitly set, it must be a boolean value. * Tweak error message language and style (#217) Our previous error messages suffered from a couple of drawbacks: - The messages were complete sentences, limiting our ability to compose error messages - The messages were overly broad in many cases - The messages unnecessarily required string interpolation that rarely provided much value These new messages are more succinct and are written as small clauses which can be used in conjunction with other error messages more easily. * copyedit Co-authored-by: Matthew M. Keeler * Implement prerequisite cycle detection (#219) * Support attribute reference lookups (#215) This adds support for slash-delimited paths in clause attributes, bucketBy, etc. 
It does not do anything related to private attribute redaction because none of the U2C event logic is implemented yet. * Implement segment recursion and cycle detection (#220) Clauses in segment rules are now allowed to reference segments. To prevent an infinite recursion edge case, we implement a similar cycle detection mechanism as used on prerequisites. * Update event logic to support users to context change (#221) * Add legacy user-type support to the contract tests (#222) * Remove inline user configuration option (#223) * Add context_ configuration options (#224) These new context_ configuration options are meant to replace the historic user_ options. If both are provided, the context_ variant will take precedence. * Add support for flag context targets (#225) * Bump diplomat * Bump redis * Remove oga * Bump connection_pool * Favor set for faster target lookups (#228) A few of our internal models maintain arrays of values. These arrays can frequently be checked to see if they contain specific values. Since set lookups are much faster than array lookups, this commit changes the internal structure to a set for the values stored in Target and SegmentTarget. * Add secure mode hash to contract tests (#229) * Update big segment support for users to context (#226) To support the users to context change for big segments, this commit makes the following changes: - Introduces a new `Segment.unboundedContextKind` attribute. This will default to `LDContext::KIND_DEFAULT` and is only referenced when `Segment.unbounded` is true. - With the creation of multi-kind contexts, a single evaluation may result in multiple queries to the big segment store. This is reflected in the changes to the `EvalResult` processing. * Drop support for ruby 2.6 (#227) Ruby 2.6 went EOL in March 2022. We originally didn't drop support for it as doing so would require dropping support for jRuby as well. However, jRuby recently released 9.4 which is Ruby 2.7+ compatible. * Update remaining references from user to contexts (#231) There are multiple places throughout the code where we are still referencing users. I have tried to update all the places where a rename seems reasonable or appropriate. There is still some work to do in the test flag builders, but that will be done in a subsequent commit. * Remove new relic integration (#233) The new relic integration was removed many versions ago but a small trace remained behind. * Rename config option private_attribute_names (#234) Co-authored-by: Eli Bishop * Update test data integration to support contexts (#232) * improve data model validation logging; allow missing/empty attribute for segmentMatch (#236) * Fix JSON serialization failure (#237) When we introduced models for the flag and segment data, we added to_json methods which proxy to the underlying to_json method provided by the `json` gem. We defined the method with a variadic parameter, but we failed to unpack it when passing it on to the underlying implementation. This resulted in a serialization failure which prevented the redis data store from initializing. * fix: Bump eventsource to resolve header parsing (#239) The underlying event source library had an issue where, in certain environments, content-type header detection was failing. This was resolved in v2.2.2 of the event source gem. 
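The serialization bug described for #237 comes down to how Ruby's `json` gem hands its generator state to a nested object's `to_json` method. A self-contained illustration follows; the class name is invented for the example and is not an SDK class.

```ruby
require "json"

class HypotheticalModel
  def initialize(data)
    @data = data
  end

  # Broken shape (the #237 bug pattern): `args` is an Array, so passing it
  # whole gives the underlying Hash#to_json an Array where it expects the
  # generator state, and nested serialization fails.
  #
  #   def to_json(*args)
  #     @data.to_json(args)
  #   end

  # Fixed shape: splat the captured arguments straight through.
  def to_json(*args)
    @data.to_json(*args)
  end
end

puts JSON.generate({flag: HypotheticalModel.new({key: "example-flag", version: 1})})
# => {"flag":{"key":"example-flag","version":1}}
```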
* Add key to error log for invalid context during variation call for easier debugging (#214) * feat: Add support for payload filtering (#238) * feat: Add data source status provider support (#240) The client instance will now provide access to a `data_source_status_provider`. This provider allows developers to retrieve the status of the SDK on demand, or asynchronously by registering listeners. * test: Skip database integration tests by default (#241) The full unit test suite includes tests relying on external instances of Redis, Consul, and DynamoDB. By default, we do not want to run these tests. Rather, they are opt-in: set the environment variable `LD_SKIP_DATABASE_TESTS=0` to run them. * feat: Introduce flag change tracker API (#242) The client instance will now provide access to a `flag_tracker`. This tracker allows developers to be notified when a flag configuration changes (or optionally when the /value/ of a flag changes for a particular context). * ci: Add code coverage generation (#244) * feat: Add support for data store status monitoring (#243) The client instance will now provide access to a `data_store_status_provider`. This provider allows developers to retrieve the data store status of the SDK on demand, or asynchronously by registering listeners. * Fix monitoring_enabled? access --------- Co-authored-by: Eli Bishop Co-authored-by: LaunchDarklyCI Co-authored-by: Jacob Smith Co-authored-by: Elliot <35050275+Apache-HB@users.noreply.github.com> Co-authored-by: Ben Woskow Co-authored-by: Ben Woskow <48036130+bwoskow-ld@users.noreply.github.com> Co-authored-by: hroederld Co-authored-by: Kerrie Martinez Co-authored-by: pellyg-ld Co-authored-by: Sam Stokes Co-authored-by: LaunchDarklyReleaseBot Co-authored-by: Ember Stevens Co-authored-by: ember-stevens <79482775+ember-stevens@users.noreply.github.com> Co-authored-by: Louis Chan <91093020+louis-launchdarkly@users.noreply.github.com> Co-authored-by: Matthew M. Keeler Co-authored-by: Ben Levy Co-authored-by: Ben Levy Co-authored-by: Matthew M. 
Keeler Co-authored-by: Louis Chan Co-authored-by: Matt Hooks <46452201+matt-dutchie@users.noreply.github.com> --- .circleci/config.yml | 10 + .ldrelease/config.yml | 2 - CONTRIBUTING.md | 2 +- launchdarkly-server-sdk.gemspec | 1 + lib/ldclient-rb/config.rb | 15 + lib/ldclient-rb/impl/broadcaster.rb | 78 +++ lib/ldclient-rb/impl/data_source.rb | 188 +++++++ lib/ldclient-rb/impl/data_store.rb | 59 +++ lib/ldclient-rb/impl/dependency_tracker.rb | 102 ++++ lib/ldclient-rb/impl/flag_tracker.rb | 58 +++ .../impl/integrations/consul_impl.rb | 12 + .../impl/integrations/dynamodb_impl.rb | 8 + .../impl/integrations/file_data_source.rb | 18 +- .../impl/integrations/redis_impl.rb | 16 + lib/ldclient-rb/impl/repeating_task.rb | 5 +- lib/ldclient-rb/impl/store_client_wrapper.rb | 110 +++- lib/ldclient-rb/in_memory_store.rb | 7 + lib/ldclient-rb/integrations/file_data.rb | 2 +- .../integrations/util/store_wrapper.rb | 11 + lib/ldclient-rb/interfaces.rb | 489 ++++++++++++++++++ lib/ldclient-rb/ldclient.rb | 68 ++- lib/ldclient-rb/polling.rb | 56 +- lib/ldclient-rb/stream.rb | 116 ++++- spec/impl/data_source_spec.rb | 339 ++++++++++++ spec/impl/flag_tracker_spec.rb | 77 +++ spec/impl/store_client_wrapper_spec.rb | 84 +++ spec/in_memory_feature_store_spec.rb | 6 + .../integrations/consul_feature_store_spec.rb | 32 +- spec/integrations/dynamodb_stores_spec.rb | 31 +- spec/integrations/file_data_source_spec.rb | 39 +- spec/integrations/redis_stores_spec.rb | 24 +- spec/integrations/store_wrapper_spec.rb | 19 + spec/polling_spec.rb | 72 ++- spec/spec_helper.rb | 31 ++ spec/stream_spec.rb | 25 +- 35 files changed, 2138 insertions(+), 74 deletions(-) create mode 100644 lib/ldclient-rb/impl/broadcaster.rb create mode 100644 lib/ldclient-rb/impl/data_source.rb create mode 100644 lib/ldclient-rb/impl/data_store.rb create mode 100644 lib/ldclient-rb/impl/dependency_tracker.rb create mode 100644 lib/ldclient-rb/impl/flag_tracker.rb create mode 100644 spec/impl/data_source_spec.rb create mode 100644 spec/impl/flag_tracker_spec.rb create mode 100644 spec/impl/store_client_wrapper_spec.rb diff --git a/.circleci/config.yml b/.circleci/config.yml index 6b6de30d..5c446bb1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -30,6 +30,10 @@ jobs: build-test-windows: executor: win/default + environment: + LD_SKIP_DATABASE_TESTS: 0 + LD_ENABLE_CODE_COVERAGE: 1 + steps: - checkout @@ -79,6 +83,7 @@ jobs: - run: bundle _2.2.33_ install - run: mkdir /tmp/circle-artifacts - run: bundle _2.2.33_ exec rspec --format documentation --format RspecJunitFormatter -o /tmp/circle-artifacts/rspec.xml spec + - run: mv coverage /tmp/circle-artifacts/ - store_test_results: path: /tmp/circle-artifacts @@ -99,6 +104,10 @@ jobs: - image: redis - image: amazon/dynamodb-local + environment: + LD_SKIP_DATABASE_TESTS: 0 + LD_ENABLE_CODE_COVERAGE: 1 + steps: - checkout - when: @@ -116,6 +125,7 @@ jobs: - run: bundle _2.2.33_ install - run: mkdir /tmp/circle-artifacts - run: bundle _2.2.33_ exec rspec --format documentation --format RspecJunitFormatter -o /tmp/circle-artifacts/rspec.xml spec + - run: mv coverage /tmp/circle-artifacts/ - when: condition: diff --git a/.ldrelease/config.yml b/.ldrelease/config.yml index a62ec91f..b4f60bc0 100644 --- a/.ldrelease/config.yml +++ b/.ldrelease/config.yml @@ -20,8 +20,6 @@ jobs: image: ruby:2.7-buster template: name: ruby - env: - LD_SKIP_DATABASE_TESTS: "1" # Don't run Redis/Consul/DynamoDB tests in release; they are run in CI documentation: gitHubPages: true diff --git a/CONTRIBUTING.md 
b/CONTRIBUTING.md index edaa9a64..b458e834 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -30,7 +30,7 @@ To run all unit tests: bundle exec rspec spec ``` -By default, the full unit test suite includes live tests of the integrations for Consul, DynamoDB, and Redis. Those tests expect you to have instances of all of those databases running locally. To skip them, set the environment variable `LD_SKIP_DATABASE_TESTS=1` before running the tests. +The full unit test suite includes live tests of the integrations for Consul, DynamoDB, and Redis. Those tests expect you to have instances of all of those databases running locally. By default, these tests will be skipped. To run them, set the environment variable `LD_SKIP_DATABASE_TESTS=0` before running the tests. ### Building documentation diff --git a/launchdarkly-server-sdk.gemspec b/launchdarkly-server-sdk.gemspec index 5b68fa57..a1d299cc 100644 --- a/launchdarkly-server-sdk.gemspec +++ b/launchdarkly-server-sdk.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "aws-sdk-dynamodb", "~> 1.57" spec.add_development_dependency "bundler", "2.2.33" + spec.add_development_dependency "simplecov", "~> 0.21" spec.add_development_dependency "rspec", "~> 3.10" spec.add_development_dependency "diplomat", "~> 2.6" spec.add_development_dependency "redis", "~> 5.0" diff --git a/lib/ldclient-rb/config.rb b/lib/ldclient-rb/config.rb index 536e9019..05b3e7cd 100644 --- a/lib/ldclient-rb/config.rb +++ b/lib/ldclient-rb/config.rb @@ -90,8 +90,23 @@ def initialize(opts = {}) @big_segments = opts[:big_segments] || BigSegmentsConfig.new(store: nil) @application = LaunchDarkly::Impl::Util.validate_application_info(opts[:application] || {}, @logger) @payload_filter_key = opts[:payload_filter_key] + @data_source_update_sink = nil end + # + # Returns the component that allows a data source to push data into the SDK. + # + # This property should only be set by the SDK. Long term access of this + # property is not supported; it is temporarily being exposed to maintain + # backwards compatibility while the SDK structure is updated. + # + # Custom data source implementations should integrate with this sink if + # they want to provide support for data source status listeners. + # + # @private + # + attr_accessor :data_source_update_sink + # # The base URL for the LaunchDarkly server. This is configurable mainly for testing # purposes; most users should use the default value. diff --git a/lib/ldclient-rb/impl/broadcaster.rb b/lib/ldclient-rb/impl/broadcaster.rb new file mode 100644 index 00000000..f914abcc --- /dev/null +++ b/lib/ldclient-rb/impl/broadcaster.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +module LaunchDarkly + module Impl + + # + # A generic mechanism for registering event listeners and broadcasting + # events to them. + # + # The SDK maintains an instance of this for each available type of listener + # (flag change, data store status, etc.). They are all intended to share a + # single executor service; notifications are submitted individually to this + # service for each listener. + # + class Broadcaster + def initialize(executor, logger) + @listeners = Concurrent::Set.new + @executor = executor + @logger = logger + end + + # + # Register a listener to this broadcaster. + # + # @param listener [#update] + # + def add_listener(listener) + unless listener.respond_to? :update + logger.warn("listener (#{listener.class}) does not respond to :update method. 
ignoring as registered listener") + return + end + + listeners.add(listener) + end + + # + # Removes a registered listener from this broadcaster. + # + def remove_listener(listener) + listeners.delete(listener) + end + + def has_listeners? + !listeners.empty? + end + + # + # Broadcast the provided event to all registered listeners. + # + # Each listener will be notified using the broadcasters executor. This + # method is non-blocking. + # + def broadcast(event) + listeners.each do |listener| + executor.post do + begin + listener.update(event) + rescue StandardError => e + logger.error("listener (#{listener.class}) raised exception (#{e}) processing event (#{event.class})") + end + end + end + end + + + private + + # @return [Concurrent::ThreadPoolExecutor] + attr_reader :executor + + # @return [Logger] + attr_reader :logger + + # @return [Concurrent::Set] + attr_reader :listeners + end + end +end diff --git a/lib/ldclient-rb/impl/data_source.rb b/lib/ldclient-rb/impl/data_source.rb new file mode 100644 index 00000000..44cf5eab --- /dev/null +++ b/lib/ldclient-rb/impl/data_source.rb @@ -0,0 +1,188 @@ +require "concurrent" +require "forwardable" +require "ldclient-rb/impl/dependency_tracker" +require "ldclient-rb/interfaces" +require "set" + +module LaunchDarkly + module Impl + module DataSource + class StatusProvider + include LaunchDarkly::Interfaces::DataSource::StatusProvider + + extend Forwardable + def_delegators :@status_broadcaster, :add_listener, :remove_listener + + def initialize(status_broadcaster, update_sink) + # @type [Broadcaster] + @status_broadcaster = status_broadcaster + # @type [UpdateSink] + @data_source_update_sink = update_sink + end + + def status + @data_source_update_sink.current_status + end + end + + class UpdateSink + include LaunchDarkly::Interfaces::DataSource::UpdateSink + + # @return [LaunchDarkly::Interfaces::DataSource::Status] + attr_reader :current_status + + def initialize(data_store, status_broadcaster, flag_change_broadcaster) + # @type [LaunchDarkly::Interfaces::FeatureStore] + @data_store = data_store + # @type [Broadcaster] + @status_broadcaster = status_broadcaster + # @type [Broadcaster] + @flag_change_broadcaster = flag_change_broadcaster + @dependency_tracker = LaunchDarkly::Impl::DependencyTracker.new + + @mutex = Mutex.new + @current_status = LaunchDarkly::Interfaces::DataSource::Status.new( + LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING, + Time.now, + nil) + end + + def init(all_data) + old_data = nil + monitor_store_update do + if @flag_change_broadcaster.has_listeners? + old_data = {} + LaunchDarkly::ALL_KINDS.each do |kind| + old_data[kind] = @data_store.all(kind) + end + end + + @data_store.init(all_data) + end + + update_full_dependency_tracker(all_data) + + return if old_data.nil? + + send_change_events( + compute_changed_items_for_full_data_set(old_data, all_data) + ) + end + + def upsert(kind, item) + monitor_store_update { @data_store.upsert(kind, item) } + + # TODO(sc-197908): We only want to do this if the store successfully + # updates the record. + @dependency_tracker.update_dependencies_from(kind, item[:key], item) + if @flag_change_broadcaster.has_listeners? 
+ affected_items = Set.new + @dependency_tracker.add_affected_items(affected_items, {kind: kind, key: item[:key]}) + send_change_events(affected_items) + end + end + + def delete(kind, key, version) + monitor_store_update { @data_store.delete(kind, key, version) } + + @dependency_tracker.update_dependencies_from(kind, key, nil) + if @flag_change_broadcaster.has_listeners? + affected_items = Set.new + @dependency_tracker.add_affected_items(affected_items, {kind: kind, key: key}) + send_change_events(affected_items) + end + end + + def update_status(new_state, new_error) + return if new_state.nil? + + status_to_broadcast = nil + + @mutex.synchronize do + old_status = @current_status + + if new_state == LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED && old_status.state == LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING + # See {LaunchDarkly::Interfaces::DataSource::UpdateSink#update_status} for more information + new_state = LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING + end + + unless new_state == old_status.state && new_error.nil? + @current_status = LaunchDarkly::Interfaces::DataSource::Status.new( + new_state, + new_state == current_status.state ? current_status.state_since : Time.now, + new_error.nil? ? current_status.last_error : new_error + ) + status_to_broadcast = current_status + end + end + + @status_broadcaster.broadcast(status_to_broadcast) unless status_to_broadcast.nil? + end + + private def update_full_dependency_tracker(all_data) + @dependency_tracker.reset + all_data.each do |kind, items| + items.each do |key, item| + @dependency_tracker.update_dependencies_from(kind, item.key, item) + end + end + end + + + # + # Method to monitor updates to the store. You provide a block to update + # the store. This method wraps that block, catching and re-raising all + # errors, and notifying all status listeners of the error. + # + private def monitor_store_update + begin + yield + rescue => e + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR, 0, e.to_s, Time.now) + update_status(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, error_info) + raise + end + end + + + # + # @param [Hash] old_data + # @param [Hash] new_data + # @return [Set] + # + private def compute_changed_items_for_full_data_set(old_data, new_data) + affected_items = Set.new + + LaunchDarkly::ALL_KINDS.each do |kind| + old_items = old_data[kind] || {} + new_items = new_data[kind] || {} + + old_items.keys.concat(new_items.keys).each do |key| + old_item = old_items[key] + new_item = new_items[key] + + next if old_item.nil? && new_item.nil? + + if old_item.nil? || new_item.nil? 
|| old_item[:version] < new_item[:version] + @dependency_tracker.add_affected_items(affected_items, {kind: kind, key: key.to_s}) + end + end + end + + affected_items + end + + # + # @param affected_items [Set] + # + private def send_change_events(affected_items) + affected_items.each do |item| + if item[:kind] == LaunchDarkly::FEATURES + @flag_change_broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(item[:key])) + end + end + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/data_store.rb b/lib/ldclient-rb/impl/data_store.rb new file mode 100644 index 00000000..e834e522 --- /dev/null +++ b/lib/ldclient-rb/impl/data_store.rb @@ -0,0 +1,59 @@ +require 'concurrent' +require "ldclient-rb/interfaces" + +module LaunchDarkly + module Impl + module DataStore + class StatusProvider + include LaunchDarkly::Interfaces::DataStore::StatusProvider + + def initialize(store, update_sink) + # @type [LaunchDarkly::Impl::FeatureStoreClientWrapper] + @store = store + # @type [UpdateSink] + @update_sink = update_sink + end + + def status + @update_sink.last_status.get + end + + def monitoring_enabled? + @store.monitoring_enabled? + end + + def add_listener(listener) + @update_sink.broadcaster.add_listener(listener) + end + + def remove_listener(listener) + @update_sink.broadcaster.remove_listener(listener) + end + end + + class UpdateSink + include LaunchDarkly::Interfaces::DataStore::UpdateSink + + # @return [LaunchDarkly::Impl::Broadcaster] + attr_reader :broadcaster + + # @return [Concurrent::AtomicReference] + attr_reader :last_status + + def initialize(broadcaster) + @broadcaster = broadcaster + @last_status = Concurrent::AtomicReference.new( + LaunchDarkly::Interfaces::DataStore::Status.new(true, false) + ) + end + + def update_status(status) + return if status.nil? + + old_status = @last_status.get_and_set(status) + @broadcaster.broadcast(status) unless old_status == status + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/dependency_tracker.rb b/lib/ldclient-rb/impl/dependency_tracker.rb new file mode 100644 index 00000000..0784c00c --- /dev/null +++ b/lib/ldclient-rb/impl/dependency_tracker.rb @@ -0,0 +1,102 @@ +module LaunchDarkly + module Impl + class DependencyTracker + def initialize + @from = {} + @to = {} + end + + # + # Updates the dependency graph when an item has changed. + # + # @param from_kind [Object] the changed item's kind + # @param from_key [String] the changed item's key + # @param from_item [Object] the changed item + # + def update_dependencies_from(from_kind, from_key, from_item) + from_what = { kind: from_kind, key: from_key } + updated_dependencies = DependencyTracker.compute_dependencies_from(from_kind, from_item) + + old_dependency_set = @from[from_what] + unless old_dependency_set.nil? + old_dependency_set.each do |kind_and_key| + deps_to_this_old_dep = @to[kind_and_key] + deps_to_this_old_dep&.delete(from_what) + end + end + + @from[from_what] = updated_dependencies + updated_dependencies.each do |kind_and_key| + deps_to_this_new_dep = @to[kind_and_key] + if deps_to_this_new_dep.nil? 
+ deps_to_this_new_dep = Set.new + @to[kind_and_key] = deps_to_this_new_dep + end + deps_to_this_new_dep.add(from_what) + end + end + + def self.segment_keys_from_clauses(clauses) + clauses.flat_map do |clause| + if clause.op == :segmentMatch + clause.values.map { |value| {kind: LaunchDarkly::SEGMENTS, key: value }} + else + [] + end + end + end + + # + # @param from_kind [String] + # @param from_item [LaunchDarkly::Impl::Model::FeatureFlag, LaunchDarkly::Impl::Model::Segment] + # @return [Set] + # + def self.compute_dependencies_from(from_kind, from_item) + return Set.new if from_item.nil? + + if from_kind == LaunchDarkly::FEATURES + prereq_keys = from_item.prerequisites.map { |prereq| {kind: from_kind, key: prereq.key} } + segment_keys = from_item.rules.flat_map { |rule| DependencyTracker.segment_keys_from_clauses(rule.clauses) } + + results = Set.new(prereq_keys) + results.merge(segment_keys) + elsif from_kind == LaunchDarkly::SEGMENTS + kind_and_keys = from_item.rules.flat_map do |rule| + DependencyTracker.segment_keys_from_clauses(rule.clauses) + end + Set.new(kind_and_keys) + else + Set.new + end + end + + # + # Clear any tracked dependencies and reset the tracking state to a clean slate. + # + def reset + @from.clear + @to.clear + end + + # + # Populates the given set with the union of the initial item and all items that directly or indirectly + # depend on it (based on the current state of the dependency graph). + # + # @param items_out [Set] + # @param initial_modified_item [Object] + # + def add_affected_items(items_out, initial_modified_item) + return if items_out.include? initial_modified_item + + items_out.add(initial_modified_item) + affected_items = @to[initial_modified_item] + + return if affected_items.nil? + + affected_items.each do |affected_item| + add_affected_items(items_out, affected_item) + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/flag_tracker.rb b/lib/ldclient-rb/impl/flag_tracker.rb new file mode 100644 index 00000000..83341118 --- /dev/null +++ b/lib/ldclient-rb/impl/flag_tracker.rb @@ -0,0 +1,58 @@ +require "concurrent" +require "ldclient-rb/interfaces" +require "forwardable" + +module LaunchDarkly + module Impl + class FlagTracker + include LaunchDarkly::Interfaces::FlagTracker + + extend Forwardable + def_delegators :@broadcaster, :add_listener, :remove_listener + + def initialize(broadcaster, eval_fn) + @broadcaster = broadcaster + @eval_fn = eval_fn + end + + def add_flag_value_change_listener(key, context, listener) + flag_change_listener = FlagValueChangeAdapter.new(key, context, listener, @eval_fn) + add_listener(flag_change_listener) + + flag_change_listener + end + + # + # An adapter which turns a normal flag change listener into a flag value change listener. 
+ # + class FlagValueChangeAdapter + # @param [Symbol] flag_key + # @param [LaunchDarkly::LDContext] context + # @param [#update] listener + # @param [#call] eval_fn + def initialize(flag_key, context, listener, eval_fn) + @flag_key = flag_key + @context = context + @listener = listener + @eval_fn = eval_fn + @value = Concurrent::AtomicReference.new(@eval_fn.call(@flag_key, @context)) + end + + # + # @param [LaunchDarkly::Interfaces::FlagChange] flag_change + # + def update(flag_change) + return unless flag_change.key == @flag_key + + new_eval = @eval_fn.call(@flag_key, @context) + old_eval = @value.get_and_set(new_eval) + + return if new_eval == old_eval + + @listener.update( + LaunchDarkly::Interfaces::FlagValueChange.new(@flag_key, old_eval, new_eval)) + end + end + end + end +end diff --git a/lib/ldclient-rb/impl/integrations/consul_impl.rb b/lib/ldclient-rb/impl/integrations/consul_impl.rb index f5043fb9..099bb3c8 100644 --- a/lib/ldclient-rb/impl/integrations/consul_impl.rb +++ b/lib/ldclient-rb/impl/integrations/consul_impl.rb @@ -119,6 +119,18 @@ def initialized_internal? end end + def available? + # Most implementations use the initialized_internal? method as a + # proxy for this check. However, since `initialized_internal?` + # catches a KeyNotFound exception, and that exception can be raised + # when the server goes away, we have to modify our behavior + # slightly. + Diplomat::Kv.get(inited_key, {}, :return, :return) + true + rescue + false + end + def stop # There's no Consul client instance to dispose of end diff --git a/lib/ldclient-rb/impl/integrations/dynamodb_impl.rb b/lib/ldclient-rb/impl/integrations/dynamodb_impl.rb index 0995b36b..07b8944e 100644 --- a/lib/ldclient-rb/impl/integrations/dynamodb_impl.rb +++ b/lib/ldclient-rb/impl/integrations/dynamodb_impl.rb @@ -62,6 +62,14 @@ def description "DynamoDBFeatureStore" end + def available? + resp = get_item_by_keys(inited_key, inited_key) + !resp.item.nil? && resp.item.length > 0 + true + rescue + false + end + def init_internal(all_data) # Start by reading the existing keys; we will later delete any of these that weren't in all_data. unused_old_keys = read_existing_keys(all_data.keys) diff --git a/lib/ldclient-rb/impl/integrations/file_data_source.rb b/lib/ldclient-rb/impl/integrations/file_data_source.rb index d8f22745..60c0d9c6 100644 --- a/lib/ldclient-rb/impl/integrations/file_data_source.rb +++ b/lib/ldclient-rb/impl/integrations/file_data_source.rb @@ -20,8 +20,15 @@ class FileDataSourceImpl rescue LoadError end - def initialize(feature_store, logger, options={}) - @feature_store = feature_store + # + # @param data_store [LaunchDarkly::Interfaces::FeatureStore] + # @param data_source_update_sink [LaunchDarkly::Interfaces::DataSource::UpdateSink, nil] Might be nil for backwards compatibility reasons. + # @param logger [Logger] + # @param options [Hash] + # + def initialize(data_store, data_source_update_sink, logger, options={}) + @data_store = data_source_update_sink || data_store + @data_source_update_sink = data_source_update_sink @logger = logger @paths = options[:paths] || [] if @paths.is_a? 
String @@ -80,10 +87,15 @@ def load_all load_file(path, all_data) rescue => exn LaunchDarkly::Util.log_exception(@logger, "Unable to load flag data from \"#{path}\"", exn) + @data_source_update_sink&.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA, 0, exn.to_s, Time.now) + ) return end end - @feature_store.init(all_data) + @data_store.init(all_data) + @data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) @initialized.make_true end diff --git a/lib/ldclient-rb/impl/integrations/redis_impl.rb b/lib/ldclient-rb/impl/integrations/redis_impl.rb index 14739e97..ccdf388e 100644 --- a/lib/ldclient-rb/impl/integrations/redis_impl.rb +++ b/lib/ldclient-rb/impl/integrations/redis_impl.rb @@ -42,6 +42,14 @@ def initialize(opts = {}) @wrapper = LaunchDarkly::Integrations::Util::CachingStoreWrapper.new(core, opts) end + def monitoring_enabled? + true + end + + def available? + @wrapper.available? + end + # # Default value for the `redis_url` constructor parameter; points to an instance of Redis # running at `localhost` with its default port. @@ -154,6 +162,14 @@ def initialize(opts) @test_hook = opts[:test_hook] # used for unit tests, deliberately undocumented end + def available? + # We don't care what the status is, only that we can connect + initialized_internal? + true + rescue + false + end + def description "RedisFeatureStore" end diff --git a/lib/ldclient-rb/impl/repeating_task.rb b/lib/ldclient-rb/impl/repeating_task.rb index 299454cc..5fd0d029 100644 --- a/lib/ldclient-rb/impl/repeating_task.rb +++ b/lib/ldclient-rb/impl/repeating_task.rb @@ -16,9 +16,8 @@ def initialize(interval, start_delay, task, logger) def start @worker = Thread.new do - if @start_delay - sleep(@start_delay) - end + sleep(@start_delay) unless @start_delay.nil? || @start_delay == 0 + until @stopped.value do started_at = Time.now begin diff --git a/lib/ldclient-rb/impl/store_client_wrapper.rb b/lib/ldclient-rb/impl/store_client_wrapper.rb index f0948251..1651c383 100644 --- a/lib/ldclient-rb/impl/store_client_wrapper.rb +++ b/lib/ldclient-rb/impl/store_client_wrapper.rb @@ -1,3 +1,4 @@ +require "concurrent" require "ldclient-rb/interfaces" require "ldclient-rb/impl/store_data_set_sorter" @@ -5,34 +6,45 @@ module LaunchDarkly module Impl # # Provides additional behavior that the client requires before or after feature store operations. - # Currently this just means sorting the data set for init(). In the future we may also use this - # to provide an update listener capability. + # This just means sorting the data set for init() and dealing with data store status listeners. # class FeatureStoreClientWrapper include Interfaces::FeatureStore - def initialize(store) + def initialize(store, store_update_sink, logger) + # @type [LaunchDarkly::Interfaces::FeatureStore] @store = store + + @monitoring_enabled = does_store_support_monitoring? 
+ + # @type [LaunchDarkly::Impl::DataStore::UpdateSink] + @store_update_sink = store_update_sink + @logger = logger + + @mutex = Mutex.new # Covers the following variables + @last_available = true + # @type [LaunchDarkly::Impl::RepeatingTask, nil] + @poller = nil end def init(all_data) - @store.init(FeatureStoreDataSetSorter.sort_all_collections(all_data)) + wrapper { @store.init(FeatureStoreDataSetSorter.sort_all_collections(all_data)) } end def get(kind, key) - @store.get(kind, key) + wrapper { @store.get(kind, key) } end def all(kind) - @store.all(kind) + wrapper { @store.all(kind) } end def upsert(kind, item) - @store.upsert(kind, item) + wrapper { @store.upsert(kind, item) } end def delete(kind, key, version) - @store.delete(kind, key, version) + wrapper { @store.delete(kind, key, version) } end def initialized? @@ -41,6 +53,88 @@ def initialized? def stop @store.stop + @mutex.synchronize do + return if @poller.nil? + + @poller.stop + @poller = nil + end + end + + def monitoring_enabled? + @monitoring_enabled + end + + private def wrapper() + begin + yield + rescue => e + update_availability(false) if @monitoring_enabled + raise + end + end + + private def update_availability(available) + @mutex.synchronize do + return if available == @last_available + @last_available = available + end + + status = LaunchDarkly::Interfaces::DataStore::Status.new(available, false) + + @logger.warn("Persistent store is available again") if available + + @store_update_sink.update_status(status) + + if available + @mutex.synchronize do + return if @poller.nil? + + @poller.stop + @poller = nil + end + + return + end + + @logger.warn("Detected persistent store unavailability; updates will be cached until it recovers.") + + task = Impl::RepeatingTask.new(0.5, 0, -> { self.check_availability }, @logger) + + @mutex.synchronize do + @poller = task + @poller.start + end + end + + private def check_availability + begin + update_availability(true) if @store.available? + rescue => e + @logger.error("Unexpected error from data store status function: #{e}") + end + end + + # This methods determines whether the wrapped store can support enabling monitoring. + # + # The wrapped store must provide a monitoring_enabled method, which must + # be true. But this alone is not sufficient. + # + # Because this class wraps all interactions with a provided store, it can + # technically "monitor" any store. However, monitoring also requires that + # we notify listeners when the store is available again. + # + # We determine this by checking the store's `available?` method, so this + # is also a requirement for monitoring support. + # + # These extra checks won't be necessary once `available` becomes a part + # of the core interface requirements and this class no longer wraps every + # feature store. + private def does_store_support_monitoring? + return false unless @store.respond_to? :monitoring_enabled? + return false unless @store.respond_to? :available? + + @store.monitoring_enabled? end end end diff --git a/lib/ldclient-rb/in_memory_store.rb b/lib/ldclient-rb/in_memory_store.rb index dcef4529..ad4f6e85 100644 --- a/lib/ldclient-rb/in_memory_store.rb +++ b/lib/ldclient-rb/in_memory_store.rb @@ -23,6 +23,9 @@ module LaunchDarkly priority: 0, }.freeze + # @private + ALL_KINDS = [FEATURES, SEGMENTS].freeze + # # Default implementation of the LaunchDarkly client's feature store, using an in-memory # cache. This object holds feature flags and related data received from LaunchDarkly. 
@@ -37,6 +40,10 @@ def initialize @initialized = Concurrent::AtomicBoolean.new(false) end + def monitoring_enabled? + false + end + def get(kind, key) @lock.with_read_lock do coll = @items[kind] diff --git a/lib/ldclient-rb/integrations/file_data.rb b/lib/ldclient-rb/integrations/file_data.rb index 4c356667..f747a4d6 100644 --- a/lib/ldclient-rb/integrations/file_data.rb +++ b/lib/ldclient-rb/integrations/file_data.rb @@ -101,7 +101,7 @@ module FileData # def self.data_source(options={}) lambda { |sdk_key, config| - Impl::Integrations::FileDataSourceImpl.new(config.feature_store, config.logger, options) } + Impl::Integrations::FileDataSourceImpl.new(config.feature_store, config.data_source_update_sink, config.logger, options) } end end end diff --git a/lib/ldclient-rb/integrations/util/store_wrapper.rb b/lib/ldclient-rb/integrations/util/store_wrapper.rb index bb129c9c..808e2e33 100644 --- a/lib/ldclient-rb/integrations/util/store_wrapper.rb +++ b/lib/ldclient-rb/integrations/util/store_wrapper.rb @@ -43,6 +43,17 @@ def initialize(core, opts) end @inited = Concurrent::AtomicBoolean.new(false) + @has_available_method = @core.respond_to? :available? + end + + def monitoring_enabled? + @has_available_method + end + + def available? + return false unless @has_available_method + + @core.available? end def init(all_data) diff --git a/lib/ldclient-rb/interfaces.rb b/lib/ldclient-rb/interfaces.rb index 64120dd5..c3a6ac15 100644 --- a/lib/ldclient-rb/interfaces.rb +++ b/lib/ldclient-rb/interfaces.rb @@ -112,6 +112,269 @@ def initialized? # def stop end + + # + # WARN: This isn't a required method on a FeatureStore yet. The SDK will + # currently check if the provided store responds to this method, and if + # it does, will take appropriate action based on the documented behavior + # below. This will become required in a future major version release of + # the SDK. + # + # Returns true if this data store implementation supports status + # monitoring. + # + # This is normally only true for persistent data stores but it could also + # be true for any custom {FeatureStore} implementation. + # + # Returning true means that the store guarantees that if it ever enters + # an invalid state (that is, an operation has failed or it knows that + # operations cannot succeed at the moment), it will publish a status + # update, and will then publish another status update once it has + # returned to a valid state. + # + # Custom implementations must implement `def available?` which + # synchronously checks if the store is available. Without this method, + # the SDK cannot ensure status updates will occur once the store has gone + # offline. + # + # The same value will be returned from + # {StatusProvider::monitoring_enabled?}. + # + # def monitoring_enabled? end + + # + # WARN: This isn't a required method on a FeatureStore. The SDK will + # check if the provided store responds to this method, and if it does, + # will take appropriate action based on the documented behavior below. + # Usage of this method will be dropped in a future version of the SDK. + # + # Tests whether the data store seems to be functioning normally. + # + # This should not be a detailed test of different kinds of operations, + # but just the smallest possible operation to determine whether (for + # instance) we can reach the database. + # + # Whenever one of the store's other methods throws an exception, the SDK + # will assume that it may have become unavailable (e.g. the database + # connection was lost). 
The SDK will then call {#available?} at intervals + # until it returns true. + # + # @return [Boolean] true if the underlying data store is reachable + # + # def available? end + end + + # + # An interface for tracking changes in feature flag configurations. + # + # An implementation of this interface is returned by {LaunchDarkly::LDClient#flag_tracker}. + # Application code never needs to implement this interface. + # + module FlagTracker + # + # Registers a listener to be notified of feature flag changes in general. + # + # The listener will be notified whenever the SDK receives any change to any feature flag's configuration, + # or to a user segment that is referenced by a feature flag. If the updated flag is used as a prerequisite + # for other flags, the SDK assumes that those flags may now behave differently and sends flag change events + # for them as well. + # + # Note that this does not necessarily mean the flag's value has changed for any particular evaluation + # context, only that some part of the flag configuration was changed so that it may return a + # different value than it previously returned for some context. If you want to track flag value changes, + # use {#add_flag_value_change_listener} instead. + # + # It is possible, given current design restrictions, that a listener might be notified when no change has + # occurred. This edge case will be addressed in a later version of the SDK. It is important to note this issue + # does not affect {#add_flag_value_change_listener} listeners. + # + # If using the file data source, any change in a data file will be treated as a change to every flag. Again, + # use {#add_flag_value_change_listener} (or just re-evaluate the flag # yourself) if you want to know whether + # this is a change that really affects a flag's value. + # + # Change events only work if the SDK is actually connecting to LaunchDarkly (or using the file data source). + # If the SDK is only reading flags from a database then it cannot know when there is a change, because + # flags are read on an as-needed basis. + # + # The listener will be called from a worker thread. + # + # Calling this method for an already-registered listener has no effect. + # + # @param listener [#update] + # + def add_listener(listener) end + + # + # Unregisters a listener so that it will no longer be notified of feature flag changes. + # + # Calling this method for a listener that was not previously registered has no effect. + # + # @param listener [Object] + # + def remove_listener(listener) end + + # + # Registers a listener to be notified of a change in a specific feature flag's value for a specific + # evaluation context. + # + # When you call this method, it first immediately evaluates the feature flag. It then uses + # {#add_listener} to start listening for feature flag configuration + # changes, and whenever the specified feature flag changes, it re-evaluates the flag for the same context. + # It then calls your listener if and only if the resulting value has changed. + # + # All feature flag evaluations require an instance of {LaunchDarkly::LDContext}. If the feature flag you are + # tracking does not have any context targeting rules, you must still pass a dummy context such as + # `LDContext.with_key("for-global-flags")`. If you do not want the user to appear on your dashboard, + # use the anonymous property: `LDContext.create({key: "for-global-flags", kind: "user", anonymous: true})`. 
+ # + # The returned listener represents the subscription that was created by this method + # call; to unsubscribe, pass that object (not your listener) to {#remove_listener}. + # + # @param key [Symbol] + # @param context [LaunchDarkly::LDContext] + # @param listener [#update] + # + def add_flag_value_change_listener(key, context, listener) end + end + + # + # Change event fired when some aspect of the flag referenced by the key has changed. + # + class FlagChange + attr_accessor :key + + # @param [Symbol] key + def initialize(key) + @key = key + end + end + + # + # Change event fired when the evaluated value for the specified flag key has changed. + # + class FlagValueChange + attr_accessor :key + attr_accessor :old_value + attr_accessor :new_value + + # @param [Symbol] key + # @param [Object] old_value + # @param [Object] new_value + def initialize(key, old_value, new_value) + @key = key + @old_value = old_value + @new_value = new_value + end + end + + module DataStore + # + # An interface for querying the status of a persistent data store. + # + # An implementation of this interface is returned by {LaunchDarkly::LDClient#data_store_status_provider}. + # Application code should not implement this interface. + # + module StatusProvider + # + # Returns the current status of the store. + # + # This is only meaningful for persistent stores, or any custom data store implementation that makes use of + # the status reporting mechanism provided by the SDK. For the default in-memory store, the status will always + # be reported as "available". + # + # @return [Status] the latest status + # + def status + end + + # + # Indicates whether the current data store implementation supports status monitoring. + # + # This is normally true for all persistent data stores, and false for the default in-memory store. A true value + # means that any listeners added with {#add_listener} can expect to be notified if there is any error in + # storing data, and then notified again when the error condition is resolved. A false value means that the + # status is not meaningful and listeners should not expect to be notified. + # + # @return [Boolean] true if status monitoring is enabled + # + def monitoring_enabled? + end + + # + # Subscribes for notifications of status changes. + # + # Applications may wish to know if there is an outage in a persistent data store, since that could mean that + # flag evaluations are unable to get the flag data from the store (unless it is currently cached) and therefore + # might return default values. + # + # If the SDK receives an exception while trying to query or update the data store, then it notifies listeners + # that the store appears to be offline ({Status#available} is false) and begins polling the store + # at intervals until a query succeeds. Once it succeeds, it notifies listeners again with {Status#available} + # set to true. + # + # This method has no effect if the data store implementation does not support status tracking, such as if you + # are using the default in-memory store rather than a persistent store. + # + # @param listener [#update] the listener to add + # + def add_listener(listener) + end + + # + # Unsubscribes from notifications of status changes. + # + # This method has no effect if the data store implementation does not support status tracking, such as if you + # are using the default in-memory store rather than a persistent store. 
+ # + # @param listener [Object] the listener to remove; if no such listener was added, this does nothing + # + def remove_listener(listener) + end + end + + # + # Interface that a data store implementation can use to report information back to the SDK. + # + module UpdateSink + # + # Reports a change in the data store's operational status. + # + # This is what makes the status monitoring mechanisms in {StatusProvider} work. + # + # @param status [Status] the updated status properties + # + def update_status(status) + end + end + + class Status + def initialize(available, stale) + @available = available + @stale = stale + end + + # + # Returns true if the SDK believes the data store is now available. + # + # This property is normally true. If the SDK receives an exception while trying to query or update the data + # store, then it sets this property to false (notifying listeners, if any) and polls the store at intervals + # until a query succeeds. Once it succeeds, it sets the property back to true (again notifying listeners). + # + # @return [Boolean] true if store is available + # + attr_reader :available + + # + # Returns true if the store may be out of date due to a previous + # outage, so the SDK should attempt to refresh all feature flag data + # and rewrite it to the store. + # + # This property is not meaningful to application code. + # + # @return [Boolean] true if data should be rewritten + # + attr_reader :stale + end end # @@ -299,5 +562,231 @@ module BigSegmentStoreStatusProvider def status end end + + module DataSource + # + # An interface for querying the status of the SDK's data source. The data + # source is the component that receives updates to feature flag data; + # normally this is a streaming connection, but it could be polling or + # file data depending on your configuration. + # + # An implementation of this interface is returned by + # {LaunchDarkly::LDClient#data_source_status_provider}. Application code + # never needs to implement this interface. + # + module StatusProvider + # + # Returns the current status of the data source. + # + # All of the built-in data source implementations are guaranteed to update this status whenever they + # successfully initialize, encounter an error, or recover after an error. + # + # For a custom data source implementation, it is the responsibility of the data source to push + # status updates to the SDK; if it does not do so, the status will always be reported as + # {Status::INITIALIZING}. + # + # @return [Status] + # + def status + end + + # + # Subscribes for notifications of status changes. + # + # The listener will be notified whenever any property of the status has changed. See {Status} for an + # explanation of the meaning of each property and what could cause it to change. + # + # Notifications will be dispatched on a worker thread. It is the listener's responsibility to return as soon as + # possible so as not to block subsequent notifications. + # + # @param listener [#update] the listener to add + # + def add_listener(listener) end + + # + # Unsubscribes from notifications of status changes. + # + def remove_listener(listener) end + end + + # + # Interface that a data source implementation will use to push data into + # the SDK. + # + # The data source interacts with this object, rather than manipulating + # the data store directly, so that the SDK can perform any other + # necessary operations that must happen when data is updated.
+ # + module UpdateSink + # + # Initializes (or re-initializes) the store with the specified set of entities. Any + # existing entries will be removed. Implementations can assume that this data set is up to + # date-- there is no need to perform individual version comparisons between the existing + # objects and the supplied features. + # + # If possible, the store should update the entire data set atomically. If that is not possible, + # it should iterate through the outer hash and then the inner hash using the existing iteration + # order of those hashes (the SDK will ensure that the items were inserted into the hashes in + # the correct order), storing each item, and then delete any leftover items at the very end. + # + # @param all_data [Hash] a hash where each key is one of the data kind objects, and each + # value is in turn a hash of string keys to entities + # @return [void] + # + def init(all_data) end + + # + # Attempt to add an entity, or update an existing entity with the same key. An update + # should only succeed if the new item's `:version` is greater than the old one; + # otherwise, the method should do nothing. + # + # @param kind [Object] the kind of entity to add or update + # @param item [Hash] the entity to add or update + # @return [void] + # + def upsert(kind, item) end + + # + # Attempt to delete an entity if it exists. Deletion should only succeed if the + # `version` parameter is greater than the existing entity's `:version`; otherwise, the + # method should do nothing. + # + # @param kind [Object] the kind of entity to delete + # @param key [String] the unique key of the entity + # @param version [Integer] the entity must have a lower version than this to be deleted + # @return [void] + # + def delete(kind, key, version) end + + # + # Informs the SDK of a change in the data source's status. + # + # Data source implementations should use this method if they have any + # concept of being in a valid state, a temporarily disconnected state, + # or a permanently stopped state. + # + # If `new_state` is different from the previous state, and/or + # `new_error` is non-null, the SDK will start returning the new status + # (adding a timestamp for the change) from {StatusProvider#status}, and + # will trigger status change events to any registered listeners. + # + # A special case is that if {new_state} is {Status::INTERRUPTED}, but the + # previous state was {Status::INITIALIZING}, the state will remain at + # {Status::INITIALIZING} because {Status::INTERRUPTED} is only meaningful + # after a successful startup. + # + # @param new_state [Symbol] + # @param new_error [ErrorInfo, nil] + # + def update_status(new_state, new_error) end + end + + # + # Information about the data source's status and about the last status change. + # + class Status + # + # The initial state of the data source when the SDK is being initialized. + # + # If it encounters an error that requires it to retry initialization, the state will remain at + # {INITIALIZING} until it either succeeds and becomes {VALID}, or permanently fails and + # becomes {OFF}. + # + + INITIALIZING = :initializing + + # + # Indicates that the data source is currently operational and has not had any problems since the + # last time it received data. + # + # In streaming mode, this means that there is currently an open stream connection and that at least + # one initial message has been received on the stream. In polling mode, it means that the last poll + # request succeeded. 
+ # + VALID = :valid + + # + # Indicates that the data source encountered an error that it will attempt to recover from. + # + # In streaming mode, this means that the stream connection failed, or had to be dropped due to some + # other error, and will be retried after a backoff delay. In polling mode, it means that the last poll + # request failed, and a new poll request will be made after the configured polling interval. + # + INTERRUPTED = :interrupted + + # + # Indicates that the data source has been permanently shut down. + # + # This could be because it encountered an unrecoverable error (for instance, the LaunchDarkly service + # rejected the SDK key; an invalid SDK key will never become valid), or because the SDK client was + # explicitly shut down. + # + OFF = :off + + # @return [Symbol] The basic state + attr_reader :state + # @return [Time] timestamp of the last state transition + attr_reader :state_since + # @return [ErrorInfo, nil] a description of the last error or nil if no errors have occurred since startup + attr_reader :last_error + + def initialize(state, state_since, last_error) + @state = state + @state_since = state_since + @last_error = last_error + end + end + + # + # A description of an error condition that the data source encountered. + # + class ErrorInfo + # + # An unexpected error, such as an uncaught exception, further described by {#message}. + # + UNKNOWN = :unknown + + # + # An I/O error such as a dropped connection. + # + NETWORK_ERROR = :network_error + + # + # The LaunchDarkly service returned an HTTP response with an error status, available with + # {#status_code}. + # + ERROR_RESPONSE = :error_response + + # + # The SDK received malformed data from the LaunchDarkly service. + # + INVALID_DATA = :invalid_data + + # + # The data source itself is working, but when it tried to put an update into the data store, the data + # store failed (so the SDK may not have the latest data). + # + # Data source implementations do not need to report this kind of error; it will be automatically + # reported by the SDK when exceptions are detected. 
+ # + STORE_ERROR = :store_error + + # @return [Symbol] the general category of the error + attr_reader :kind + # @return [Integer] an HTTP status or zero + attr_reader :status_code + # @return [String, nil] an error message if applicable, or nil + attr_reader :message + # @return [Time] the error timestamp + attr_reader :time + + def initialize(kind, status_code, message, time) + @kind = kind + @status_code = status_code + @message = message + @time = time + end + end + end end end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index 2c515680..2afba39c 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -1,6 +1,10 @@ require "ldclient-rb/impl/big_segments" +require "ldclient-rb/impl/broadcaster" +require "ldclient-rb/impl/data_source" +require "ldclient-rb/impl/data_store" require "ldclient-rb/impl/diagnostic_events" require "ldclient-rb/impl/evaluator" +require "ldclient-rb/impl/flag_tracker" require "ldclient-rb/impl/store_client_wrapper" require "concurrent/atomics" require "digest/sha1" @@ -45,15 +49,22 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) @sdk_key = sdk_key + @shared_executor = Concurrent::SingleThreadExecutor.new + + data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, config.logger) + store_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(data_store_broadcaster) + # We need to wrap the feature store object with a FeatureStoreClientWrapper in order to add # some necessary logic around updates. Unfortunately, we have code elsewhere that accesses # the feature store through the Config object, so we need to make a new Config that uses # the wrapped store. - @store = Impl::FeatureStoreClientWrapper.new(config.feature_store) + @store = Impl::FeatureStoreClientWrapper.new(config.feature_store, store_sink, config.logger) updated_config = config.clone updated_config.instance_variable_set(:@feature_store, @store) @config = updated_config + @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(@store, store_sink) + @big_segment_store_manager = Impl::BigSegmentStoreManager.new(config.big_segments, @config.logger) @big_segment_store_status_provider = @big_segment_store_manager.status_provider @@ -79,6 +90,16 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) return # requestor and update processor are not used in this mode end + flag_tracker_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) + @flag_tracker = LaunchDarkly::Impl::FlagTracker.new(flag_tracker_broadcaster, lambda { |key, context| variation(key, context, nil) }) + + data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger) + + # Make the update sink available on the config so that our data source factory can access the sink with a shared executor. + @config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, data_source_broadcaster, flag_tracker_broadcaster) + + @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(data_source_broadcaster, @config.data_source_update_sink) + data_source_or_factory = @config.data_source || self.method(:create_default_data_source) if data_source_or_factory.respond_to?
:call # Currently, data source factories take two parameters unless they need to be aware of diagnostic_accumulator, in @@ -345,16 +366,53 @@ def close @event_processor.stop @big_segment_store_manager.stop @store.stop + @shared_executor.shutdown end # # Returns an interface for tracking the status of a Big Segment store. # - # The {BigSegmentStoreStatusProvider} has methods for checking whether the Big Segment store + # The {Interfaces::BigSegmentStoreStatusProvider} has methods for checking whether the Big Segment store # is (as far as the SDK knows) currently operational and tracking changes in this status. # attr_reader :big_segment_store_status_provider + # + # Returns an interface for tracking the status of a persistent data store. + # + # The {LaunchDarkly::Interfaces::DataStore::StatusProvider} has methods for + # checking whether the data store is (as far as the SDK knows) currently + # operational, tracking changes in this status, and getting cache + # statistics. These are only relevant for a persistent data store; if you + # are using an in-memory data store, then this method will return a stub + # object that provides no information. + # + # @return [LaunchDarkly::Interfaces::DataStore::StatusProvider] + # + attr_reader :data_store_status_provider + + # + # Returns an interface for tracking the status of the data source. + # + # The data source is the mechanism that the SDK uses to get feature flag + # configurations, such as a streaming connection (the default) or poll + # requests. The {LaunchDarkly::Interfaces::DataSource::StatusProvider} has + # methods for checking whether the data source is (as far as the SDK knows) + # currently operational and tracking changes in this status. + # + # @return [LaunchDarkly::Interfaces::DataSource::StatusProvider] + # + attr_reader :data_source_status_provider + + # + # Returns an interface for tracking changes in feature flag configurations. + # + # The {LaunchDarkly::Interfaces::FlagTracker} contains methods for + # requesting notifications about feature flag changes using an event + # listener model. + # + attr_reader :flag_tracker + private def create_default_data_source(sdk_key, config, diagnostic_accumulator) @@ -403,7 +461,11 @@ def evaluate_internal(key, context, default, with_reasons) end end - feature = @store.get(FEATURES, key) + begin + feature = @store.get(FEATURES, key) + rescue + # Ignored + end if feature.nil? @config.logger.info { "[LDClient] Unknown feature flag \"#{key}\". Returning default value" } diff --git a/lib/ldclient-rb/polling.rb b/lib/ldclient-rb/polling.rb index 89d9f6c9..15dfb746 100644 --- a/lib/ldclient-rb/polling.rb +++ b/lib/ldclient-rb/polling.rb @@ -1,6 +1,7 @@ require "ldclient-rb/impl/repeating_task" require "concurrent/atomics" +require "json" require "thread" module LaunchDarkly @@ -27,30 +28,75 @@ def start end def stop - @task.stop - @config.logger.info { "[LDClient] Polling connection stopped" } + stop_with_error_info end def poll begin all_data = @requestor.request_all_data if all_data - @config.feature_store.init(all_data) + update_sink_or_data_store.init(all_data) if @initialized.make_true @config.logger.info { "[LDClient] Polling connection initialized" } @ready.set end end + @config.data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + rescue JSON::ParserError => e + @config.logger.error { "[LDClient] JSON parsing failed for polling response." 
} + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA, + 0, + e.to_s, + Time.now + ) + @config.data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, error_info) rescue UnexpectedResponseError => e + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, e.status, nil, Time.now) message = Util.http_error_message(e.status, "polling request", "will retry") @config.logger.error { "[LDClient] #{message}" } - unless Util.http_error_recoverable?(e.status) + + if Util.http_error_recoverable?(e.status) + @config.data_source_update_sink&.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error_info + ) + else @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set - stop + stop_with_error_info error_info end rescue StandardError => e Util.log_exception(@config.logger, "Exception while polling", e) + @config.data_source_update_sink&.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, 0, e.to_s, Time.now) + ) end end + + # + # The original implementation of this class relied on the feature store + # directly, which we are trying to move away from. Customers who might have + # instantiated this directly for some reason wouldn't know they have to set + # the config's sink manually, so we have to fall back to the store if the + # sink isn't present. + # + # The next major release should be able to simplify this structure and + # remove the need to fall back to the data store because the update sink + # should always be present.
+ # + private def update_sink_or_data_store + @config.data_source_update_sink || @config.feature_store + end + + # + # @param [LaunchDarkly::Interfaces::DataSource::ErrorInfo, nil] error_info + # + private def stop_with_error_info(error_info = nil) + @task.stop + @config.logger.info { "[LDClient] Polling connection stopped" } + @config.data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::OFF, error_info) + end end end diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 6d8dd1bb..7bc5311b 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -25,6 +25,7 @@ class StreamProcessor def initialize(sdk_key, config, diagnostic_accumulator = nil) @sdk_key = sdk_key @config = config + @data_source_update_sink = config.data_source_update_sink @feature_store = config.feature_store @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) @@ -60,12 +61,31 @@ def start case err when SSE::Errors::HTTPStatusError status = err.status + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, status, nil, Time.now) message = Util.http_error_message(status, "streaming connection", "will retry") @config.logger.error { "[LDClient] #{message}" } - unless Util.http_error_recoverable?(status) + + if Util.http_error_recoverable?(status) + @data_source_update_sink&.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + error_info + ) + else @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set - stop + stop_with_error_info error_info end + when SSE::Errors::HTTPContentTypeError, SSE::Errors::HTTPProxyError, SSE::Errors::ReadTimeoutError + @data_source_update_sink&.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR, 0, err.to_s, Time.now) + ) + + else + @data_source_update_sink&.update_status( + LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, + LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, 0, err.to_s, Time.now) + ) end } end @@ -74,46 +94,86 @@ def start end def stop + stop_with_error_info + end + + private + + # + # @param [LaunchDarkly::Interfaces::DataSource::ErrorInfo, nil] error_info + # + def stop_with_error_info(error_info = nil) if @stopped.make_true @es.close + @data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::OFF, error_info) @config.logger.info { "[LDClient] Stream connection stopped" } end end - private + # + # The original implementation of this class relied on the feature store + # directly, which we are trying to move away from. Customers who might have + # instantiated this directly for some reason wouldn't know they have to set + # the config's sink manually, so we have to fall back to the store if the + # sink isn't present. + # + # The next major release should be able to simplify this structure and + # remove the need to fall back to the data store because the update sink + # should always be present.
+ # + def update_sink_or_data_store + @data_source_update_sink || @feature_store + end def process_message(message) log_connection_result(true) method = message.type @config.logger.debug { "[LDClient] Stream received #{method} message: #{message.data}" } - if method == PUT - message = JSON.parse(message.data, symbolize_names: true) - all_data = Impl::Model.make_all_store_data(message[:data], @config.logger) - @feature_store.init(all_data) - @initialized.make_true - @config.logger.info { "[LDClient] Stream initialized" } - @ready.set - elsif method == PATCH - data = JSON.parse(message.data, symbolize_names: true) - for kind in [FEATURES, SEGMENTS] - key = key_for_path(kind, data[:path]) - if key - item = Impl::Model.deserialize(kind, data[:data], @config.logger) - @feature_store.upsert(kind, item) - break + + begin + if method == PUT + message = JSON.parse(message.data, symbolize_names: true) + all_data = Impl::Model.make_all_store_data(message[:data], @config.logger) + update_sink_or_data_store.init(all_data) + @initialized.make_true + @config.logger.info { "[LDClient] Stream initialized" } + @ready.set + elsif method == PATCH + data = JSON.parse(message.data, symbolize_names: true) + for kind in [FEATURES, SEGMENTS] + key = key_for_path(kind, data[:path]) + if key + item = Impl::Model.deserialize(kind, data[:data], @config.logger) + update_sink_or_data_store.upsert(kind, item) + break + end end - end - elsif method == DELETE - data = JSON.parse(message.data, symbolize_names: true) - for kind in [FEATURES, SEGMENTS] - key = key_for_path(kind, data[:path]) - if key - @feature_store.delete(kind, key, data[:version]) - break + elsif method == DELETE + data = JSON.parse(message.data, symbolize_names: true) + for kind in [FEATURES, SEGMENTS] + key = key_for_path(kind, data[:path]) + if key + update_sink_or_data_store.delete(kind, key, data[:version]) + break + end end + else + @config.logger.warn { "[LDClient] Unknown message received: #{method}" } end - else - @config.logger.warn { "[LDClient] Unknown message received: #{method}" } + + @data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + rescue JSON::ParserError => e + @config.logger.error { "[LDClient] JSON parsing failed for method #{method}. Ignoring event." } + error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new( + LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA, + 0, + e.to_s, + Time.now + ) + @data_source_update_sink&.update_status(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, error_info) + + # Re-raise the exception so the SSE implementation can catch it and restart the stream. 
+ raise end end diff --git a/spec/impl/data_source_spec.rb b/spec/impl/data_source_spec.rb new file mode 100644 index 00000000..98e23468 --- /dev/null +++ b/spec/impl/data_source_spec.rb @@ -0,0 +1,339 @@ +require "spec_helper" + +module LaunchDarkly + module Impl + describe DataSource::UpdateSink do + subject { DataSource::UpdateSink } + let(:store) { InMemoryFeatureStore.new } + let(:executor) { SynchronousExecutor.new } + let(:status_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } + let(:flag_change_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } + let(:sink) { subject.new(store, status_broadcaster, flag_change_broadcaster) } + + it "defaults to initializing" do + expect(sink.current_status.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING) + expect(sink.current_status.last_error).to be_nil + end + + it "setting status to interrupted while initializing maintains initializing state" do + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, nil) + expect(sink.current_status.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INITIALIZING) + expect(sink.current_status.last_error).to be_nil + end + + it "listener is triggered only for state changes" do + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + expect(listener.statuses.count).to eq(1) + + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, nil) + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED, nil) + expect(listener.statuses.count).to eq(2) + end + + it "all listeners are called for a single change" do + listener1 = ListenerSpy.new + status_broadcaster.add_listener(listener1) + + listener2 = ListenerSpy.new + status_broadcaster.add_listener(listener2) + + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + expect(listener1.statuses.count).to eq(1) + expect(listener2.statuses.count).to eq(1) + end + + describe "simple flag change listener" do + let(:all_data) { + { + LaunchDarkly::FEATURES => { + flag1: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag1', version: 1 }), + flag2: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag2', version: 1 }), + flag3: LaunchDarkly::Impl::Model::FeatureFlag.new( + { + key: 'flag3', + version: 1, + variation: 0, + rules: [ + { + clauses: [ + { + contextKind: 'user', + attribute: 'segmentMatch', + op: 'segmentMatch', + values: [ + 'segment2', + ], + negate: false, + }, + ], + }, + ], + } + ), + }, + LaunchDarkly::SEGMENTS => { + segment1: LaunchDarkly::Impl::Model::Segment.new({ key: 'segment1', version: 1 }), + segment2: LaunchDarkly::Impl::Model::Segment.new({ key: 'segment2', version: 1 }), + }, + } + } + + it "is called once per flag changed during init" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + updated_data = { + LaunchDarkly::FEATURES => { + flag1: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag1', version: 2 }), + flag4: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag4', version: 1 }), + }, + } + + sink.init(updated_data) + + expect(listener.statuses.count).to eq(4) + expect(listener.statuses[0].key).to eq('flag1') # Version update + expect(listener.statuses[1].key).to eq('flag2') # Deleted + expect(listener.statuses[2].key).to 
eq('flag3') # Deleted + expect(listener.statuses[3].key).to eq('flag4') # Newly created + end + + it "is called if flag changes through upsert" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.upsert(LaunchDarkly::FEATURES, LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag1', version: 2 })) + # TODO(sc-197908): Once the store starts returning a success status on upsert, the flag change notification + # can start ignoring duplicate requests like this. + # sink.upsert(LaunchDarkly::FEATURES, LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag1', version: 2 })) + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].key).to eq('flag1') + end + + it "is called if flag is deleted" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.delete(LaunchDarkly::FEATURES, "flag1", 2) + # TODO(sc-197908): Once the store starts returning a success status on delete, the flag change notification + # can start ignoring duplicate requests like this. + # sink.delete(LaunchDarkly::FEATURES, :flag1, 2) + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].key).to eq("flag1") + end + + it "is called if the segment is updated" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.upsert(LaunchDarkly::SEGMENTS, LaunchDarkly::Impl::Model::Segment.new({ key: 'segment2', version: 2 })) + # TODO(sc-197908): Once the store starts returning a success status on upsert, the flag change notification + # can start ignoring duplicate requests like this. + # sink.upsert(LaunchDarkly::Impl::Model::Segment.new({ key: 'segment2', version: 2 })) + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].key).to eq('flag3') + end + end + + describe "prerequisite flag change listener" do + let(:all_data) { + { + LaunchDarkly::FEATURES => { + flag1: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag1', version: 1, prerequisites: [{key: 'flag2', variation: 0}] }), + flag2: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag2', version: 1, +prerequisites: [{key: 'flag3', variation: 0}, {key: 'flag4', variation: 0}, {key: 'flag6', variation: 0}] }), + flag3: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag3', version: 1 }), + flag4: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag4', version: 1 }), + flag5: LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag5', version: 1 }), + flag6: LaunchDarkly::Impl::Model::FeatureFlag.new( + { + key: 'flag6', + version: 1, + variation: 0, + rules: [ + { + clauses: [ + { + contextKind: 'user', + attribute: 'segmentMatch', + op: 'segmentMatch', + values: [ + 'segment2', + ], + negate: false, + }, + ], + }, + ], + } + ), + }, + LaunchDarkly::SEGMENTS => { + segment1: LaunchDarkly::Impl::Model::Segment.new({ key: 'segment1', version: 1 }), + segment2: LaunchDarkly::Impl::Model::Segment.new( + { + key: 'segment2', + version: 1, + rules: [ + { + clauses: [ + { + contextKind: 'user', + attribute: 'segmentMatch', + op: 'segmentMatch', + values: [ + 'segment1', + ], + negate: false, + }, + ], + rolloutContextKind: 'user', + }, + ], + + } + ), + }, + } + } + + it "triggers for entire dependency stack if top of chain is changed" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.upsert(LaunchDarkly::FEATURES, LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag4', 
version: 2 })) + expect(listener.statuses.count).to eq(3) + expect(listener.statuses[0].key).to eq('flag4') + expect(listener.statuses[1].key).to eq('flag2') + expect(listener.statuses[2].key).to eq('flag1') + end + + it "triggers when new pre-requisites are added" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.upsert(LaunchDarkly::FEATURES, LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag3', version: 2, prerequisities: [{key: 'flag4', variation: 0}] })) + expect(listener.statuses.count).to eq(3) + expect(listener.statuses[0].key).to eq('flag3') + expect(listener.statuses[1].key).to eq('flag2') + expect(listener.statuses[2].key).to eq('flag1') + end + + it "triggers when new pre-requisites are removed" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.upsert(LaunchDarkly::FEATURES, LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flag2', version: 2, prerequisities: [{key: 'flag3', variation: 0}] })) + expect(listener.statuses.count).to eq(2) + expect(listener.statuses[0].key).to eq('flag2') + expect(listener.statuses[1].key).to eq('flag1') + end + + it "triggers for entire dependency stack if top of chain is deleted" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.delete(LaunchDarkly::FEATURES, "flag4", 2) + expect(listener.statuses.count).to eq(3) + expect(listener.statuses[0].key).to eq('flag4') + expect(listener.statuses[1].key).to eq('flag2') + expect(listener.statuses[2].key).to eq('flag1') + end + + it "triggers if dependent segment is modified" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.upsert(LaunchDarkly::SEGMENTS, LaunchDarkly::Impl::Model::Segment.new({ key: 'segment1', version: 2 })) + # TODO(sc-197908): Once the store starts returning a success status on upsert, the flag change notification + # can start ignoring duplicate requests like this. + # sink.upsert(LaunchDarkly::SEGMENTS, LaunchDarkly::Impl::Model::Segment.new({ key: 'segment1', version: 2 })) + + expect(listener.statuses.count).to eq(3) + expect(listener.statuses[0].key).to eq('flag6') + expect(listener.statuses[1].key).to eq('flag2') + expect(listener.statuses[2].key).to eq('flag1') + end + + it "triggers if dependent segment rule is removed" do + sink.init(all_data) + + listener = ListenerSpy.new + flag_change_broadcaster.add_listener(listener) + + sink.delete(LaunchDarkly::SEGMENTS, 'segment2', 2) + # TODO(sc-197908): Once the store starts returning a success status on upsert, the flag change notification + # can start ignoring duplicate requests like this. 
+ # sink.delete(LaunchDarkly::SEGMENTS, 'segment2', 2) + + expect(listener.statuses.count).to eq(3) + expect(listener.statuses[0].key).to eq('flag6') + expect(listener.statuses[1].key).to eq('flag2') + expect(listener.statuses[2].key).to eq('flag1') + end + end + + describe "listeners are triggered for store errors" do + def confirm_store_error(error_type) + # Make it valid first so the error changes from initializing + sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + + allow(store).to receive(:init).and_raise(StandardError.new("init error")) + allow(store).to receive(:upsert).and_raise(StandardError.new("upsert error")) + allow(store).to receive(:delete).and_raise(StandardError.new("delete error")) + + begin + yield + rescue + # ignored + end + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(listener.statuses[0].last_error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR) + expect(listener.statuses[0].last_error.message).to eq("#{error_type} error") + end + + it "when calling init" do + confirm_store_error("init") { sink.init({}) } + end + + it "when calling upsert" do + confirm_store_error("upsert") { sink.upsert("flag", nil) } + end + + it "when calling delete" do + confirm_store_error("delete") { sink.delete("flag", "flag-key", 1) } + end + end + end + end +end diff --git a/spec/impl/flag_tracker_spec.rb b/spec/impl/flag_tracker_spec.rb new file mode 100644 index 00000000..61ed5191 --- /dev/null +++ b/spec/impl/flag_tracker_spec.rb @@ -0,0 +1,77 @@ +require "spec_helper" +require "ldclient-rb/impl/flag_tracker" + +describe LaunchDarkly::Impl::FlagTracker do + subject { LaunchDarkly::Impl::FlagTracker } + let(:executor) { SynchronousExecutor.new } + let(:broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } + + it "can add and remove listeners as expected" do + listener = ListenerSpy.new + + tracker = subject.new(broadcaster, Proc.new {}) + tracker.add_listener(listener) + + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag1)) + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag2)) + + tracker.remove_listener(listener) + + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag3)) + + expect(listener.statuses.count).to eq(2) + expect(listener.statuses[0].key).to eq(:flag1) + expect(listener.statuses[1].key).to eq(:flag2) + end + + describe "flag change listener" do + it "listener is notified when value changes" do + responses = [:initial, :second, :second, :final] + eval_fn = Proc.new { responses.shift } + tracker = subject.new(broadcaster, eval_fn) + + listener = ListenerSpy.new + tracker.add_flag_value_change_listener(:flag1, nil, listener) + expect(listener.statuses.count).to eq(0) + + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag1)) + expect(listener.statuses.count).to eq(1) + + # No change was returned here (:second -> :second), so expect no change + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag1)) + expect(listener.statuses.count).to eq(1) + + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag1)) + expect(listener.statuses.count).to eq(2) + + expect(listener.statuses[0].key).to eq(:flag1) + expect(listener.statuses[0].old_value).to eq(:initial) + expect(listener.statuses[0].new_value).to eq(:second) + + 
expect(listener.statuses[1].key).to eq(:flag1) + expect(listener.statuses[1].old_value).to eq(:second) + expect(listener.statuses[1].new_value).to eq(:final) + end + + it "returns a listener which we can unregister" do + responses = [:initial, :second, :third] + eval_fn = Proc.new { responses.shift } + tracker = subject.new(broadcaster, eval_fn) + + listener = ListenerSpy.new + created_listener = tracker.add_flag_value_change_listener(:flag1, nil, listener) + expect(listener.statuses.count).to eq(0) + + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag1)) + expect(listener.statuses.count).to eq(1) + + tracker.remove_listener(created_listener) + broadcaster.broadcast(LaunchDarkly::Interfaces::FlagChange.new(:flag1)) + expect(listener.statuses.count).to eq(1) + + expect(listener.statuses[0].old_value).to eq(:initial) + expect(listener.statuses[0].new_value).to eq(:second) + end + end +end + diff --git a/spec/impl/store_client_wrapper_spec.rb b/spec/impl/store_client_wrapper_spec.rb new file mode 100644 index 00000000..b0086992 --- /dev/null +++ b/spec/impl/store_client_wrapper_spec.rb @@ -0,0 +1,84 @@ +require "ldclient-rb/impl/store_client_wrapper" +require "spec_helper" + +module LaunchDarkly + module Impl + describe FeatureStoreClientWrapper do + describe "store listener" do + it "will not notify sink if wrapped store does not support monitoring" do + store = double + sink = double + + allow(store).to receive(:stop) + allow(store).to receive(:monitoring_enabled?).and_return(false) + allow(store).to receive(:init).and_raise(StandardError.new('init error')) + + ensure_stop(FeatureStoreClientWrapper.new(store, sink, $null_log)) do |wrapper| + begin + wrapper.init({}) + raise "init should have raised exception" + rescue StandardError + # Ignored + end + + expect(sink).not_to receive(:update_status) + end + end + + it "will not notify sink if wrapped store cannot come back online" do + store = double + sink = double + + allow(store).to receive(:stop) + allow(store).to receive(:monitoring_enabled?).and_return(true) + allow(store).to receive(:init).and_raise(StandardError.new('init error')) + + ensure_stop(FeatureStoreClientWrapper.new(store, sink, $null_log)) do |wrapper| + begin + wrapper.init({}) + raise "init should have raised exception" + rescue StandardError + # Ignored + end + + expect(sink).not_to receive(:update_status) + end + end + + it "sink will be notified when store is back online" do + event = Concurrent::Event.new + statuses = [] + listener = CallbackListener.new(->(status) { + statuses << status + event.set if status.available? 
+ }) + + broadcaster = Broadcaster.new(SynchronousExecutor.new, $null_log) + broadcaster.add_listener(listener) + sink = DataStore::UpdateSink.new(broadcaster) + store = double + + allow(store).to receive(:stop) + allow(store).to receive(:monitoring_enabled?).and_return(true) + allow(store).to receive(:available?).and_return(false, true) + allow(store).to receive(:init).and_raise(StandardError.new('init error')) + + ensure_stop(FeatureStoreClientWrapper.new(store, sink, $null_log)) do |wrapper| + begin + wrapper.init({}) + raise "init should have raised exception" + rescue StandardError + # Ignored + end + + event.wait(2) + + expect(statuses.count).to eq(2) + expect(statuses[0].available).to be false + expect(statuses[1].available).to be true + end + end + end + end + end +end diff --git a/spec/in_memory_feature_store_spec.rb b/spec/in_memory_feature_store_spec.rb index 1d56078f..2178b22b 100644 --- a/spec/in_memory_feature_store_spec.rb +++ b/spec/in_memory_feature_store_spec.rb @@ -11,4 +11,10 @@ def create_feature_store subject { LaunchDarkly::InMemoryFeatureStore } include_examples "any_feature_store", InMemoryStoreTester.new + + it "does not provide status monitoring support" do + store = subject.new + + expect(store.monitoring_enabled?).to be false + end end diff --git a/spec/integrations/consul_feature_store_spec.rb b/spec/integrations/consul_feature_store_spec.rb index 356f1679..cd3bc983 100644 --- a/spec/integrations/consul_feature_store_spec.rb +++ b/spec/integrations/consul_feature_store_spec.rb @@ -3,7 +3,7 @@ require "spec_helper" # These tests will all fail if there isn't a local Consul instance running. -# They can be disabled with LD_SKIP_DATABASE_TESTS=1 +# They can be enabled with LD_SKIP_DATABASE_TESTS=0 $consul_base_opts = { prefix: $my_prefix, @@ -27,9 +27,37 @@ def create_feature_store describe "Consul feature store" do - break if ENV['LD_SKIP_DATABASE_TESTS'] == '1' + break unless ENV['LD_SKIP_DATABASE_TESTS'] == '0' + + before do + Diplomat.configuration = Diplomat::Configuration.new + end include_examples "persistent_feature_store", ConsulStoreTester + + it "should have monitoring enabled and defaults to available" do + tester = ConsulStoreTester.new({ logger: $null_logger }) + + ensure_stop(tester.create_feature_store) do |store| + expect(store.monitoring_enabled?).to be true + expect(store.available?).to be true + end + end + + it "can detect that a non-existent store is not available" do + Diplomat.configure do |config| + config.url = 'http://i-mean-what-are-the-odds:13579' + config.options[:request] ||= {} + # Short timeout so we don't delay the tests too long + config.options[:request][:timeout] = 0.1 + end + tester = ConsulStoreTester.new({ consul_config: Diplomat.configuration }) + + ensure_stop(tester.create_feature_store) do |store| + expect(store.available?).to be false + end + end + end # There isn't a Big Segments integration for Consul. diff --git a/spec/integrations/dynamodb_stores_spec.rb b/spec/integrations/dynamodb_stores_spec.rb index f12fa63a..68232285 100644 --- a/spec/integrations/dynamodb_stores_spec.rb +++ b/spec/integrations/dynamodb_stores_spec.rb @@ -4,7 +4,7 @@ require "spec_helper" # These tests will all fail if there isn't a local DynamoDB instance running. 
-# They can be disabled with LD_SKIP_DATABASE_TESTS=1 +# They can be enabled with LD_SKIP_DATABASE_TESTS=0 $DynamoDBBigSegmentStore = LaunchDarkly::Impl::Integrations::DynamoDB::DynamoDBBigSegmentStore @@ -23,7 +23,7 @@ class DynamoDBStoreTester def initialize(options = {}) @options = options.clone - @options[:dynamodb_opts] = DYNAMODB_OPTS + @options[:dynamodb_opts] = DYNAMODB_OPTS unless @options.key? :dynamodb_opts @actual_prefix = options[:prefix] ? "#{options[:prefix]}:" : "" end @@ -134,15 +134,38 @@ def set_big_segments(context_hash, includes, excludes) describe "DynamoDB feature store" do - break if ENV['LD_SKIP_DATABASE_TESTS'] == '1' + break unless ENV['LD_SKIP_DATABASE_TESTS'] == '0' DynamoDBStoreTester.create_table_if_necessary include_examples "persistent_feature_store", DynamoDBStoreTester + + it "should have monitoring enabled and defaults to available" do + tester = DynamoDBStoreTester.new({ logger: $null_logger }) + + ensure_stop(tester.create_feature_store) do |store| + expect(store.monitoring_enabled?).to be true + expect(store.available?).to be true + end + end + + it "can detect that a non-existent store is not available" do + options = DynamoDBStoreTester::DYNAMODB_OPTS.clone + options[:endpoint] = 'http://i-mean-what-are-the-odds:13579' + options[:retry_limit] = 0 + options[:http_open_timeout] = 0.1 + + # Short timeout so we don't delay the tests too long + tester = DynamoDBStoreTester.new({ dynamodb_opts: options, logger: $null_logger }) + + ensure_stop(tester.create_feature_store) do |store| + expect(store.available?).to be false + end + end end describe "DynamoDB big segment store" do - break if ENV['LD_SKIP_DATABASE_TESTS'] == '1' + break unless ENV['LD_SKIP_DATABASE_TESTS'] == '0' DynamoDBStoreTester.create_table_if_necessary diff --git a/spec/integrations/file_data_source_spec.rb b/spec/integrations/file_data_source_spec.rb index ce756fb6..976d362b 100644 --- a/spec/integrations/file_data_source_spec.rb +++ b/spec/integrations/file_data_source_spec.rb @@ -18,6 +18,7 @@ def []=(key, value) let(:full_segment_1_key) { "seg1" } let(:all_segment_keys) { [ full_segment_1_key.to_sym ] } + let(:invalid_json) { "My invalid JSON" } let(:flag_only_json) { <<-EOF { "flags": { @@ -97,6 +98,12 @@ def []=(key, value) before do @config = LaunchDarkly::Config.new(logger: $null_log) @store = @config.feature_store + + @executor = SynchronousExecutor.new + @status_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@executor, $null_log) + @flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@executor, $null_log) + @config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(@store, @status_broadcaster, @flag_change_broadcaster) + @tmp_dir = Dir.mktmpdir end @@ -113,9 +120,19 @@ def make_temp_file(content) file end - def with_data_source(options) + def with_data_source(options, initialize_to_valid = false) factory = LaunchDarkly::Integrations::FileData.data_source(options) + + if initialize_to_valid + # If the update sink receives an interrupted signal when the state is + # still initializing, it will continue staying in the initializing phase. + # Therefore, we set the state to valid before this test so we can + # determine if the interrupted signal is actually generated. 
+ @config.data_source_update_sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + end + ds = factory.call('', @config) + begin yield ds ensure @@ -135,10 +152,16 @@ def with_data_source(options) it "loads flags on start - from JSON" do file = make_temp_file(all_properties_json) with_data_source({ paths: [ file.path ] }) do |ds| + listener = ListenerSpy.new + @status_broadcaster.add_listener(listener) + ds.start expect(@store.initialized?).to eq(true) expect(@store.all(LaunchDarkly::FEATURES).keys).to eq(all_flag_keys) expect(@store.all(LaunchDarkly::SEGMENTS).keys).to eq(all_segment_keys) + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) end end @@ -194,6 +217,20 @@ def with_data_source(options) end end + it "file loading failure results in interrupted status" do + file1 = make_temp_file(flag_only_json) + file2 = make_temp_file(invalid_json) + with_data_source({ paths: [ file1.path, file2.path ] }, true) do |ds| + listener = ListenerSpy.new + @status_broadcaster.add_listener(listener) + + ds.start + expect(@store.initialized?).to eq(false) + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + end + end + it "does not allow duplicate keys" do file1 = make_temp_file(flag_only_json) file2 = make_temp_file(flag_only_json) diff --git a/spec/integrations/redis_stores_spec.rb b/spec/integrations/redis_stores_spec.rb index d5c503bc..1b87ee4e 100644 --- a/spec/integrations/redis_stores_spec.rb +++ b/spec/integrations/redis_stores_spec.rb @@ -7,7 +7,7 @@ require "redis" # These tests will all fail if there isn't a local Redis instance running. -# They can be disabled with LD_SKIP_DATABASE_TESTS=1 +# They can be enabled with LD_SKIP_DATABASE_TESTS=0 $RedisBigSegmentStore = LaunchDarkly::Impl::Integrations::Redis::RedisBigSegmentStore @@ -60,7 +60,7 @@ def set_big_segments(context_hash, includes, excludes) describe "Redis feature store" do - break if ENV['LD_SKIP_DATABASE_TESTS'] == '1' + break unless ENV['LD_SKIP_DATABASE_TESTS'] == '0' include_examples "persistent_feature_store", RedisStoreTester @@ -80,6 +80,24 @@ def make_concurrent_modifier_test_hook(other_client, flag, start_version, end_ve tester = RedisStoreTester.new({ logger: $null_logger }) + it "should have monitoring enabled and defaults to available" do + tester = RedisStoreTester.new({ logger: $null_logger }) + + ensure_stop(tester.create_feature_store) do |store| + expect(store.monitoring_enabled?).to be true + expect(store.available?).to be true + end + end + + it "can detect that a non-existent store is not available" do + # Short timeout so we don't delay the tests too long + tester = RedisStoreTester.new({ redis_opts: { url: "redis://i-mean-what-are-the-odds:13579", timeout: 0.1 }, logger: $null_logger }) + + ensure_stop(tester.create_feature_store) do |store| + expect(store.available?).to be false + end + end + it "handles upsert race condition against external client with lower version" do with_redis_test_client do |other_client| flag = { key: "foo", version: 1 } @@ -146,7 +164,7 @@ def make_concurrent_modifier_test_hook(other_client, flag, start_version, end_ve end describe "Redis big segment store" do - break if ENV['LD_SKIP_DATABASE_TESTS'] == '1' + break unless ENV['LD_SKIP_DATABASE_TESTS'] == '0' include_examples "big_segment_store", RedisStoreTester end diff --git a/spec/integrations/store_wrapper_spec.rb 
b/spec/integrations/store_wrapper_spec.rb index 58def5a8..552fe5a4 100644 --- a/spec/integrations/store_wrapper_spec.rb +++ b/spec/integrations/store_wrapper_spec.rb @@ -5,6 +5,25 @@ THINGS = { namespace: "things" } + it "monitoring enabled if available is defined" do + [true, false].each do |expected| + core = double + allow(core).to receive(:available?).and_return(expected) + wrapper = subject.new(core, {}) + + expect(wrapper.monitoring_enabled?).to be true + expect(wrapper.available?).to be expected + end + end + + it "available is false if core doesn't support monitoring" do + core = double + wrapper = subject.new(core, {}) + + expect(wrapper.monitoring_enabled?).to be false + expect(wrapper.available?).to be false + end + shared_examples "tests" do |cached| opts = cached ? { expiration: 30 } : { expiration: 0 } diff --git a/spec/polling_spec.rb b/spec/polling_spec.rb index c8f801c2..8e5c1d48 100644 --- a/spec/polling_spec.rb +++ b/spec/polling_spec.rb @@ -1,12 +1,27 @@ -require "spec_helper" +require "ldclient-rb/impl/model/feature_flag" +require "ldclient-rb/impl/model/segment" require 'ostruct' +require "spec_helper" describe LaunchDarkly::PollingProcessor do subject { LaunchDarkly::PollingProcessor } + let(:executor) { SynchronousExecutor.new } + let(:status_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } + let(:flag_change_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } let(:requestor) { double() } - def with_processor(store) + def with_processor(store, initialize_to_valid = false) config = LaunchDarkly::Config.new(feature_store: store, logger: $null_log) + config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(store, status_broadcaster, flag_change_broadcaster) + + if initialize_to_valid + # If the update sink receives an interrupted signal when the state is + # still initializing, it will continue staying in the initializing phase. + # Therefore, we set the state to valid before this test so we can + # determine if the interrupted signal is actually generated. 
+ config.data_source_update_sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + end + processor = subject.new(config, requestor) begin yield processor @@ -16,8 +31,8 @@ def with_processor(store) end describe 'successful request' do - flag = { key: 'flagkey', version: 1 } - segment = { key: 'segkey', version: 1 } + flag = LaunchDarkly::Impl::Model::FeatureFlag.new({ key: 'flagkey', version: 1 }) + segment = LaunchDarkly::Impl::Model::Segment.new({ key: 'segkey', version: 1 }) all_data = { LaunchDarkly::FEATURES => { flagkey: flag, @@ -48,6 +63,23 @@ def with_processor(store) expect(store.initialized?).to be true end end + + it 'status is set to valid when data is received' do + allow(requestor).to receive(:request_all_data).and_return(all_data) + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + + store = LaunchDarkly::InMemoryFeatureStore.new + with_processor(store) do |processor| + ready = processor.start + ready.wait + expect(store.get(LaunchDarkly::FEATURES, "flagkey")).to eq(flag) + expect(store.get(LaunchDarkly::SEGMENTS, "segkey")).to eq(segment) + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID) + end + end end describe 'connection error' do @@ -56,7 +88,7 @@ def with_processor(store) store = LaunchDarkly::InMemoryFeatureStore.new with_processor(store) do |processor| ready = processor.start - finished = ready.wait(0.2) + finished = ready.wait(1) expect(finished).to be false expect(processor.initialized?).to be false expect(store.initialized?).to be false @@ -67,21 +99,39 @@ def with_processor(store) describe 'HTTP errors' do def verify_unrecoverable_http_error(status) allow(requestor).to receive(:request_all_data).and_raise(LaunchDarkly::UnexpectedResponseError.new(status)) + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + with_processor(LaunchDarkly::InMemoryFeatureStore.new) do |processor| ready = processor.start - finished = ready.wait(0.2) + finished = ready.wait(1) expect(finished).to be true expect(processor.initialized?).to be false + + expect(listener.statuses.count).to eq(1) + + s = listener.statuses[0] + expect(s.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::OFF) + expect(s.last_error.status_code).to eq(status) end end def verify_recoverable_http_error(status) allow(requestor).to receive(:request_all_data).and_raise(LaunchDarkly::UnexpectedResponseError.new(status)) - with_processor(LaunchDarkly::InMemoryFeatureStore.new) do |processor| + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + + with_processor(LaunchDarkly::InMemoryFeatureStore.new, true) do |processor| ready = processor.start - finished = ready.wait(0.2) + finished = ready.wait(1) expect(finished).to be false expect(processor.initialized?).to be false + + expect(listener.statuses.count).to eq(2) + + s = listener.statuses[1] + expect(s.state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(s.last_error.status_code).to eq(status) end end @@ -108,12 +158,18 @@ def verify_recoverable_http_error(status) describe 'stop' do it 'stops promptly rather than continuing to wait for poll interval' do + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + with_processor(LaunchDarkly::InMemoryFeatureStore.new) do |processor| sleep(1) # somewhat arbitrary, but should ensure that it has started polling start_time = Time.now processor.stop end_time = Time.now expect(end_time - 
start_time).to be <(LaunchDarkly::Config.default_poll_interval - 5) + + expect(listener.statuses.count).to eq(1) + expect(listener.statuses[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::OFF) end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9b87bc33..83f40de9 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,3 +1,5 @@ +require "simplecov" if ENV['LD_ENABLE_CODE_COVERAGE'] == '1' + require "ldclient-rb" $null_log = ::Logger.new($stdout) @@ -19,6 +21,35 @@ def ensure_stop(thing) end end +class SynchronousExecutor + def post + yield + end +end + +class CallbackListener + def initialize(callable) + @callable = callable + end + + def update(status) + @callable.call(status) + end +end + +class ListenerSpy + attr_reader :statuses + + def initialize + @statuses = [] + end + + def update(status) + @statuses << status + end +end + + RSpec.configure do |config| config.expect_with :rspec do |expectations| expectations.max_formatted_output_length = 1000 # otherwise rspec tends to abbreviate our failure output and make it unreadable diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index cb89830a..7016ee77 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -4,7 +4,15 @@ describe LaunchDarkly::StreamProcessor do subject { LaunchDarkly::StreamProcessor } - let(:config) { LaunchDarkly::Config.new } + let(:executor) { SynchronousExecutor.new } + let(:status_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } + let(:flag_change_broadcaster) { LaunchDarkly::Impl::Broadcaster.new(executor, $null_log) } + let(:config) { + config = LaunchDarkly::Config.new + config.data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(config.feature_store, status_broadcaster, flag_change_broadcaster) + config.data_source_update_sink.update_status(LaunchDarkly::Interfaces::DataSource::Status::VALID, nil) + config + } let(:processor) { subject.new("sdk_key", config) } describe '#process_message' do @@ -13,6 +21,7 @@ let(:patch_seg_message) { SSE::StreamEvent.new(:patch, '{"path": "/segments/key", "data": {"key": "asdf", "version": 1}}') } let(:delete_flag_message) { SSE::StreamEvent.new(:delete, '{"path": "/flags/key", "version": 2}') } let(:delete_seg_message) { SSE::StreamEvent.new(:delete, '{"path": "/segments/key", "version": 2}') } + let(:invalid_message) { SSE::StreamEvent.new(:put, '{Hi there}') } it "will accept PUT methods" do processor.send(:process_message, put_message) @@ -41,6 +50,18 @@ expect(processor.instance_variable_get(:@config).logger).to receive :warn processor.send(:process_message, SSE::StreamEvent.new(type: :get, data: "", id: nil)) end + it "status listener will trigger error when JSON is invalid" do + listener = ListenerSpy.new + status_broadcaster.add_listener(listener) + + begin + processor.send(:process_message, invalid_message) + rescue + end + + expect(listener.statuses.count).to eq(2) + expect(listener.statuses[1].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED) + expect(listener.statuses[1].last_error.kind).to eq(LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA) + end end end -
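For illustration, a minimal sketch of how application code might use the flag tracker interface documented earlier in this patch, assuming an already-constructed LDClient held in `client`; the `ValueChangeLogger` class and the "my-flag" key are hypothetical and not part of the SDK or of this change.

# `client` is assumed to be a LaunchDarkly::LDClient constructed elsewhere.
class ValueChangeLogger
  # Receives LaunchDarkly::Interfaces::FlagValueChange events.
  def update(change)
    puts "#{change.key} changed from #{change.old_value.inspect} to #{change.new_value.inspect}"
  end
end

# A placeholder context is still required even if the flag has no context targeting.
context = LaunchDarkly::LDContext.create({ key: "for-global-flags", kind: "user", anonymous: true })
registration = client.flag_tracker.add_flag_value_change_listener(:"my-flag", context, ValueChangeLogger.new)

# To unsubscribe, pass the object returned above (not the original listener) back to the tracker.
client.flag_tracker.remove_listener(registration)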
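Likewise, a sketch of observing data source status through the new DataSource::StatusProvider, under the same assumption that `client` is an existing LDClient; the listener class name is hypothetical.

# `client` is assumed to be a LaunchDarkly::LDClient constructed elsewhere.
class DataSourceStatusLogger
  # Receives LaunchDarkly::Interfaces::DataSource::Status values.
  def update(status)
    puts "data source state is now #{status.state} (since #{status.state_since})"
    puts "last error kind: #{status.last_error.kind}" unless status.last_error.nil?
  end
end

provider = client.data_source_status_provider
provider.add_listener(DataSourceStatusLogger.new)
puts provider.status.state # one of :initializing, :valid, :interrupted, :off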
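Finally, a sketch of data store status monitoring via DataStore::StatusProvider; this is only meaningful for persistent stores, since monitoring_enabled? is false for the default in-memory store. As above, `client` and the listener class are assumptions made for illustration.

# `client` is assumed to be a LaunchDarkly::LDClient constructed elsewhere.
class StoreOutageLogger
  # Receives LaunchDarkly::Interfaces::DataStore::Status values.
  def update(status)
    puts(status.available ? "data store is available again" : "data store appears to be offline")
  end
end

provider = client.data_store_status_provider
provider.add_listener(StoreOutageLogger.new) if provider.monitoring_enabled?
puts provider.status.available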