Skip to content

Commit

Permalink
Merge pull request #479 from splitio/Feature/iff
Browse files Browse the repository at this point in the history
Instant Feature flag implementation
  • Loading branch information
sanzmauro committed Jul 18, 2023
2 parents cb7cb8d + 84d18aa commit 7d21bb3
Show file tree
Hide file tree
Showing 30 changed files with 404 additions and 74 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
matrix:
version:
- '2.5.0'
- '3.1.1'
- '3.2.2'

steps:
- name: Checkout code
Expand Down Expand Up @@ -56,7 +56,7 @@ jobs:
run: echo "VERSION=$(cat lib/splitclient-rb/version.rb | grep VERSION | awk -F "'" '{print $2}')" >> $GITHUB_ENV

- name: SonarQube Scan (Push)
if: matrix.version == '3.1.1' && github.event_name == 'push'
if: matrix.version == '3.2.2' && github.event_name == 'push'
uses: SonarSource/sonarcloud-github-action@v1.9
env:
SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }}
Expand All @@ -68,7 +68,7 @@ jobs:
-Dsonar.projectVersion=${{ env.VERSION }}
- name: SonarQube Scan (Pull Request)
if: matrix.version == '3.1.1' && github.event_name == 'pull_request'
if: matrix.version == '3.2.2' && github.event_name == 'pull_request'
uses: SonarSource/sonarcloud-github-action@v1.9
env:
SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
CHANGES

8.2.0 (Jul 18, 2023)
- Improved streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system.

8.1.2 (May 15, 2023)
- Updated terminology on the SDKs codebase to be more aligned with current standard without causing a breaking change. The core change is the term split for feature flag on things like logs and IntelliSense comments.

Expand Down
2 changes: 2 additions & 0 deletions lib/splitclient-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
require 'splitclient-rb/clients/split_client'
require 'splitclient-rb/managers/split_manager'
require 'splitclient-rb/helpers/thread_helper'
require 'splitclient-rb/helpers/decryption_helper'
require 'splitclient-rb/helpers/util'
require 'splitclient-rb/split_factory'
require 'splitclient-rb/split_factory_builder'
require 'splitclient-rb/split_config'
Expand Down
12 changes: 1 addition & 11 deletions lib/splitclient-rb/engine/api/splits.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,11 @@ def splits_with_segment_names(splits_json)

parsed_splits[:segment_names] =
parsed_splits[:splits].each_with_object(Set.new) do |split, splits|
splits << segment_names(split)
splits << Helpers::Util.segment_names_by_feature_flag(split)
end.flatten

parsed_splits
end

def segment_names(split)
split[:conditions].each_with_object(Set.new) do |condition, names|
condition[:matcherGroup][:matchers].each do |matcher|
next if matcher[:userDefinedSegmentMatcherData].nil?

names << matcher[:userDefinedSegmentMatcherData][:segmentName]
end
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/splitclient-rb/engine/push_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def start_sse

def stop_sse
@sse_handler.stop
SplitIoClient::Helpers::ThreadHelper.stop(:schedule_next_token_refresh, @config)
rescue StandardError => e
@config.logger.error(e.inspect)
end
Expand Down
25 changes: 25 additions & 0 deletions lib/splitclient-rb/helpers/decryption_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module SplitIoClient
NO_COMPRESSION = 0
GZIP_COMPRESSION = 1
ZLIB_COMPRESSION = 2

module Helpers
class DecryptionHelper
def self.get_encoded_definition(compression, data)
case compression
when NO_COMPRESSION
Base64.decode64(data)
when GZIP_COMPRESSION
gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(data)))
gz.read
when ZLIB_COMPRESSION
Zlib::Inflate.inflate(Base64.decode64(data))
else
raise StandardError, 'Compression flag value is incorrect'
end
end
end
end
end
17 changes: 17 additions & 0 deletions lib/splitclient-rb/helpers/util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module SplitIoClient
module Helpers
class Util
def self.segment_names_by_feature_flag(feature_flag)
feature_flag[:conditions].each_with_object(Set.new) do |condition, names|
condition[:matcherGroup][:matchers].each do |matcher|
next if matcher[:userDefinedSegmentMatcherData].nil?

names << matcher[:userDefinedSegmentMatcherData][:segmentName]
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/splitclient-rb/split_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def init_impressions_mode(impressions_mode, adapter)
return :debug
else
default = adapter == :redis ? :debug : :optimized
@logger.error("You passed an invalid impressions_mode, impressions_mode should be one of the following values: :debug, :optimized or :none. Defaulting to #{default} mode")
@logger.error("You passed an invalid impressions_mode, impressions_mode should be one of the following values: :debug, :optimized or :none. Defaulting to #{default} mode") unless impressions_mode.nil?
return default
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/splitclient-rb/split_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def build_synchronizer

def build_streaming_components
@push_status_queue = Queue.new
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository)
splits_worker = SSE::Workers::SplitsWorker.new(@synchronizer, @config, @splits_repository, @runtime_producer, @segment_fetcher)
segments_worker = SSE::Workers::SegmentsWorker.new(@synchronizer, @config, @segments_repository)
notification_manager_keeper = SSE::NotificationManagerKeeper.new(@config, @runtime_producer, @push_status_queue)
notification_processor = SSE::NotificationProcessor.new(@config, splits_worker, segments_worker)
Expand Down
2 changes: 1 addition & 1 deletion lib/splitclient-rb/sse/event_source/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def connect_stream(latch)

raise 'eof exception' if partial_data == :eof
rescue Errno::EBADF, IOError => e
@config.logger.error(e.inspect)
@config.logger.error(e.inspect) if @config.debug_enabled
return nil
rescue StandardError => e
return nil if ENV['SPLITCLIENT_ENV'] == 'test'
Expand Down
8 changes: 2 additions & 6 deletions lib/splitclient-rb/sse/notification_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ def process(incoming_notification)

def process_split_update(notification)
@config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled
@splits_worker.add_to_queue(notification.data['changeNumber'])
@splits_worker.add_to_queue(notification)
end

def process_split_kill(notification)
@config.logger.debug("SPLIT KILL notification received: #{notification}") if @config.debug_enabled
change_number = notification.data['changeNumber']
default_treatment = notification.data['defaultTreatment']
split_name = notification.data['splitName']

@splits_worker.kill_split(change_number, split_name, default_treatment)
@splits_worker.add_to_queue(notification)
end

def process_segment_update(notification)
Expand Down
89 changes: 69 additions & 20 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ module SplitIoClient
module SSE
module Workers
class SplitsWorker
def initialize(synchronizer, config, splits_repository)
def initialize(synchronizer, config, feature_flags_repository, telemetry_runtime_producer, segment_fetcher)
@synchronizer = synchronizer
@config = config
@splits_repository = splits_repository
@feature_flags_repository = feature_flags_repository
@queue = Queue.new
@running = Concurrent::AtomicBoolean.new(false)
@telemetry_runtime_producer = telemetry_runtime_producer
@segment_fetcher = segment_fetcher
end

def start
Expand All @@ -29,36 +31,83 @@ def stop
end

@running.make_false
SplitIoClient::Helpers::ThreadHelper.stop(:split_update_worker, @config)
Helpers::ThreadHelper.stop(:split_update_worker, @config)
end

def add_to_queue(change_number)
@config.logger.debug("feature_flags_worker add to queue #{change_number}")
@queue.push(change_number)
def add_to_queue(notification)
@config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}")
@queue.push(notification)
end

def kill_split(change_number, split_name, default_treatment)
return if @splits_repository.get_change_number.to_i > change_number
private

@config.logger.debug("feature_flags_worker kill #{split_name}, #{change_number}")
@splits_repository.kill(change_number, split_name, default_treatment)
add_to_queue(change_number)
def perform_thread
@config.threads[:split_update_worker] = Thread.new do
@config.logger.debug('starting feature_flags_worker ...') if @config.debug_enabled
perform
end
end

private

def perform
while (change_number = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{change_number}")
@synchronizer.fetch_splits(change_number)
while (notification = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}")
case notification.data['type']
when SSE::EventSource::EventTypes::SPLIT_UPDATE
success = update_feature_flag(notification)
@synchronizer.fetch_splits(notification.data['changeNumber']) unless success
when SSE::EventSource::EventTypes::SPLIT_KILL
kill_feature_flag(notification)
end
end
end

def perform_thread
@config.threads[:split_update_worker] = Thread.new do
@config.logger.debug('starting feature_flags_worker ...') if @config.debug_enabled
perform
def update_feature_flag(notification)
return true if @feature_flags_repository.get_change_number.to_i >= notification.data['changeNumber']
return false unless !notification.data['d'].nil? && @feature_flags_repository.get_change_number == notification.data['pcn']

new_split = return_split_from_json(notification)
if Engine::Models::Split.archived?(new_split)
@feature_flags_repository.remove_split(new_split)
else
@feature_flags_repository.add_split(new_split)

fetch_segments_if_not_exists(new_split)
end

@feature_flags_repository.set_change_number(notification.data['changeNumber'])
@telemetry_runtime_producer.record_updates_from_sse(Telemetry::Domain::Constants::SPLITS)

true
rescue StandardError => e
@config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled

false
end

def kill_feature_flag(notification)
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

@config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}")
@feature_flags_repository.kill(
notification.data['changeNumber'],
notification.data['splitName'],
notification.data['defaultTreatment']
)
@synchronizer.fetch_splits(notification.data['changeNumber'])
end

def return_split_from_json(notification)
split_json = Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d'])

JSON.parse(split_json, symbolize_names: true)
end

def fetch_segments_if_not_exists(feature_flag)
segment_names = Helpers::Util.segment_names_by_feature_flag(feature_flag)
return if segment_names.nil?

@feature_flags_repository.set_segment_names(segment_names)
@segment_fetcher.fetch_segments_if_not_exists(segment_names)
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/splitclient-rb/telemetry/domain/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class Constants
TREATMENT_WITH_CONFIG = 'treatmentWithConfig'
TREATMENTS_WITH_CONFIG = 'treatmentsWithConfig'
TRACK = 'track'

SPLITS = 'splits'
end
end
end
Expand Down
7 changes: 5 additions & 2 deletions lib/splitclient-rb/telemetry/domain/structs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ module Telemetry
# ls: lastSynchronization, ml: clientMethodLatencies, me: clientMethodExceptions, he: httpErros, hl: httpLatencies,
# tr: tokenRefreshes, ar: authRejections, iq: impressionsQueued, ide: impressionsDeduped, idr: impressionsDropped,
# spc: splitsCount, sec: segmentCount, skc: segmentKeyCount, sl: sessionLengthMs, eq: eventsQueued, ed: eventsDropped,
# se: streamingEvents, t: tags
Usage = Struct.new(:ls, :ml, :me, :he, :hl, :tr, :ar, :iq, :ide, :idr, :spc, :sec, :skc, :sl, :eq, :ed, :se, :t)
# se: streamingEvents, t: tags, ufs: updates from sse
Usage = Struct.new(:ls, :ml, :me, :he, :hl, :tr, :ar, :iq, :ide, :idr, :spc, :sec, :skc, :sl, :eq, :ed, :se, :t, :ufs)

# t: treatment, ts: treatments, tc: treatmentWithConfig, tcs: treatmentsWithConfig, tr: track
ClientMethodLatencies = Struct.new(:t, :ts, :tc, :tcs, :tr)
ClientMethodExceptions = Struct.new(:t, :ts, :tc, :tcs, :tr)

# sp: splits
UpdatesFromSSE = Struct.new(:sp)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ def session_length
@adapter.session_length.value
end

def pop_updates_from_sse
splits = @adapter.updates_from_sse[Domain::Constants::SPLITS]
@adapter.updates_from_sse[Domain::Constants::SPLITS] = 0

UpdatesFromSSE.new(splits)
end

private

def find_last_synchronization(type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def record_session_length(session)
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end

def record_updates_from_sse(event)
@adapter.updates_from_sse[event] += 1
rescue StandardError => e
@config.log_found_exception(__method__.to_s, e)
end
end
end
end
6 changes: 4 additions & 2 deletions lib/splitclient-rb/telemetry/memory/memory_synchronizer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def synchronize_stats
@telemetry_runtime_consumer.events_stats(Domain::Constants::EVENTS_QUEUED),
@telemetry_runtime_consumer.events_stats(Domain::Constants::EVENTS_DROPPED),
@telemetry_runtime_consumer.pop_streaming_events,
@telemetry_runtime_consumer.pop_tags)
@telemetry_runtime_consumer.pop_tags,
@telemetry_runtime_consumer.pop_updates_from_sse)

@telemetry_api.record_stats(format_stats(usage))
rescue StandardError => e
Expand Down Expand Up @@ -163,7 +164,8 @@ def format_stats(usage)
eQ: usage.eq,
eD: usage.ed,
sE: usage.se,
t: usage.t
t: usage.t,
ufs: usage.ufs.to_h
}
end

Expand Down
3 changes: 2 additions & 1 deletion lib/splitclient-rb/telemetry/runtime_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class RuntimeConsumer
:pop_auth_rejections,
:pop_token_refreshes,
:pop_streaming_events,
:session_length
:session_length,
:pop_updates_from_sse

def initialize(config)
@runtime = SplitIoClient::Telemetry::MemoryRuntimeConsumer.new(config)
Expand Down
3 changes: 2 additions & 1 deletion lib/splitclient-rb/telemetry/runtime_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class RuntimeProducer
:record_auth_rejections,
:record_token_refreshes,
:record_streaming_event,
:record_session_length
:record_session_length,
:record_updates_from_sse

def initialize(config)
@runtime = SplitIoClient::Telemetry::MemoryRuntimeProducer.new(config)
Expand Down
Loading

0 comments on commit 7d21bb3

Please sign in to comment.