diff --git a/README.md b/README.md index a18a632..110aea8 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,10 @@ gem "streamy" ### Broadcasting events +Streamy includes support for two different types of event encoding (JSON and [Avro](https://avro.apache.org/docs/current/spec.html)). + +#### Events with JSON encoding + Add this to config/initializer/streamy.rb ```ruby @@ -30,7 +34,7 @@ Create an event: ```ruby module Events - class ReceivedPayment < Streamy::Event + class ReceivedPayment < Streamy::JsonEvent def topic "payments.transactions" end @@ -51,6 +55,93 @@ end Publish it: +```ruby +Events::ReceivedPayment.publish +``` + +#### Events with Avro encoding + +Add this to config/initializer/streamy.rb + +```ruby +require "streamy/message_buses/kafka_message_bus" +Streamy.message_bus = Streamy::MessageBuses::KafkaMessageBus.new( + client_id: "streamy", + seed_brokers: "broker.remote:9092", + ssl_ca_certs_from_system: true, +) + +Streamy.configure do |config| + config.avro_schema_registry_url = "http://registry.example.com", + config.avro_schemas_path = "app/schemas" +end +``` + +*Default schemas path is "app/schemas"* +*Schema Registry Url is required for encoding with Avro* + +Create an event: + +```ruby +module Events + class ReceivedPayment < Streamy::AvroEvent + def topic + "payments.transactions" + end + + def body + { + amount: 200 + } + end + + def event_time + Time.now + end + end +end +``` + +Create Avro schema (`received_payment.asvc`) for event in schema path above: + +```json +{ + "type": "record", + "name": "received_payment", + "fields": [ + { + "name": "type", + "type": "string" + }, + { + "name": "event_time", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + } + }, + { + "name": "body", + "type": { + "type": "record", + "name": "body", + "fields": [ + { + "name": "amount", + "type": ["null", "int"], + "default": null + } + ] + } + } + ] +} + +``` + +Publish event: + + ```ruby Events::ReceivedPayment.publish ``` @@ -59,7 +150,7 @@ Events::ReceivedPayment.publish ### Consuming events -We use [karafka](https://github.com/karafka/karafka) to handle the bulk of the consumer logic. +We use [karafka](https://github.com/karafka/karafka) to handle the bulk of the consumer logic. You can also use [karafka/avro](https://github.com/karafka/avro) to consume Avro encoded events. Configure karafka consumer: diff --git a/lib/streamy.rb b/lib/streamy.rb index d563aed..0ba18da 100644 --- a/lib/streamy.rb +++ b/lib/streamy.rb @@ -5,8 +5,11 @@ module Streamy require "active_support/core_ext/string" require "streamy/version" + require "streamy/configuration" require "streamy/consumer" require "streamy/event" + require "streamy/json_event" + require "streamy/avro_event" require "streamy/event_handler" require "streamy/message_processor" require "streamy/profiler" @@ -21,6 +24,10 @@ module Streamy require "streamy/message_buses/message_bus" require "streamy/message_buses/test_message_bus" + # Avro + # require patches for avro to allow for logical types in schemas + require "avro_patches" + class << self attr_accessor :message_bus, :worker, :logger, :cache @@ -29,6 +36,14 @@ def shutdown end end + def self.configuration + @configuration ||= Configuration.new + end + + def self.configure + yield(configuration) + end + self.logger = SimpleLogger.new end diff --git a/lib/streamy/avro_event.rb b/lib/streamy/avro_event.rb new file mode 100644 index 0000000..c07582a --- /dev/null +++ b/lib/streamy/avro_event.rb @@ -0,0 +1,18 @@ +require "avro_turf/messaging" + +module Streamy + class AvroEvent < Event + def payload + avro.encode(payload_attributes.deep_stringify_keys, schema_name: type) + end + + private + + def avro + AvroTurf::Messaging.new( + registry_url: Streamy.configuration.avro_schema_registry_url, + schemas_path: Streamy.configuration.avro_schemas_path + ) + end + end +end diff --git a/lib/streamy/configuration.rb b/lib/streamy/configuration.rb new file mode 100644 index 0000000..c237410 --- /dev/null +++ b/lib/streamy/configuration.rb @@ -0,0 +1,10 @@ +module Streamy + class Configuration + attr_accessor :avro_schema_registry_url, :avro_schemas_path + + def initialize + @avro_schema_registry_url = nil + @avro_schemas_path = nil + end + end +end diff --git a/lib/streamy/event.rb b/lib/streamy/event.rb index bcd629c..910fa56 100644 --- a/lib/streamy/event.rb +++ b/lib/streamy/event.rb @@ -7,6 +7,7 @@ class Event def self.priority(level) raise "unknown priority: #{level}" unless ALLOWED_PRIORITIES.include? level + self.default_priority = level end @@ -20,10 +21,8 @@ def publish message_bus.safe_deliver( key: key, topic: topic, - type: type, - body: body, - event_time: event_time, - priority: priority + priority: priority, + payload: payload ) end @@ -56,5 +55,17 @@ def body def event_time raise "event_time must be implemented on #{self.class}" end + + def payload_attributes + { + type: type, + body: body, + event_time: event_time + } + end + + def payload + payload_attributes + end end end diff --git a/lib/streamy/helpers/rspec_helper.rb b/lib/streamy/helpers/rspec_helper.rb index 2dc74fb..9f74dae 100644 --- a/lib/streamy/helpers/rspec_helper.rb +++ b/lib/streamy/helpers/rspec_helper.rb @@ -3,15 +3,28 @@ module Streamy module Helpers module RspecHelper - def expect_event(topic: kind_of(String), key: kind_of(String), body: kind_of(Hash), type:, event_time: nil) - expect(Streamy.message_bus.deliveries).to have_hash( + def expect_event(topic: kind_of(String), key: kind_of(String), body: kind_of(Hash), type:, event_time: kind_of(String)) + deliveries = hashify_messages(Streamy.message_bus.deliveries) + + expect(deliveries).to have_hash( topic: topic, key: key, - type: type, - body: body, - event_time: event_time || kind_of(Time) + payload: { + body: body, + type: type, + event_time: event_time + } ) end + + def hashify_messages(message_bus_deliveries) + message_bus_deliveries.map do |message| + message_hash = message.dup + message_hash[:payload] = JSON.parse(message_hash[:payload]).deep_symbolize_keys + message_hash + end + end + alias expect_published_event expect_event Streamy.message_bus = MessageBuses::TestMessageBus.new diff --git a/lib/streamy/json_event.rb b/lib/streamy/json_event.rb new file mode 100644 index 0000000..33f25aa --- /dev/null +++ b/lib/streamy/json_event.rb @@ -0,0 +1,7 @@ +module Streamy + class JsonEvent < Event + def payload + payload_attributes.to_json + end + end +end diff --git a/lib/streamy/message_buses/kafka_message_bus.rb b/lib/streamy/message_buses/kafka_message_bus.rb index d12ebfd..df851d3 100644 --- a/lib/streamy/message_buses/kafka_message_bus.rb +++ b/lib/streamy/message_buses/kafka_message_bus.rb @@ -11,13 +11,7 @@ def initialize(config) @kafka = Kafka.new(@config.kafka) end - def deliver(key:, topic:, type:, body:, event_time:, priority:) - payload = { - type: type, - body: body, - event_time: event_time - }.to_json - + def deliver(key:, topic:, payload:, priority:) producer(priority).tap do |p| p.produce(payload, key: key, topic: topic) case priority diff --git a/lib/streamy/message_buses/message_bus.rb b/lib/streamy/message_buses/message_bus.rb index b636265..65606e5 100644 --- a/lib/streamy/message_buses/message_bus.rb +++ b/lib/streamy/message_buses/message_bus.rb @@ -7,7 +7,7 @@ def safe_deliver(*args) raise PublicationFailedError.new(e, *args) end - def deliver(key:, topic:, type:, body:, event_time:, priority:) + def deliver(key:, topic:, payload:, priority:) raise "not implemented" end end diff --git a/lib/streamy/version.rb b/lib/streamy/version.rb index 52236ee..d7e8fff 100644 --- a/lib/streamy/version.rb +++ b/lib/streamy/version.rb @@ -1,3 +1,3 @@ module Streamy - VERSION = "0.2.0".freeze + VERSION = "0.3.0".freeze end diff --git a/streamy.gemspec b/streamy.gemspec index 3fc731c..f31afb7 100644 --- a/streamy.gemspec +++ b/streamy.gemspec @@ -38,7 +38,10 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength spec.add_development_dependency "pry", "~> 0.11" spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "ruby-kafka", "~> 0.6" + spec.add_development_dependency "sinatra" spec.add_dependency "activesupport", "~> 5.2" + spec.add_dependency "avro-patches" + spec.add_dependency "avro_turf", "~> 0.8.1" spec.add_dependency "webmock", "~> 3.3" end diff --git a/test/avro_event_test.rb b/test/avro_event_test.rb new file mode 100644 index 0000000..8035402 --- /dev/null +++ b/test/avro_event_test.rb @@ -0,0 +1,78 @@ +require "test_helper" +require "avro_turf/test/fake_confluent_schema_registry_server" +require "webmock/minitest" + +module Streamy + class AvroEventTest < Minitest::Test + def setup + Streamy.configuration.avro_schema_registry_url = "http://registry.example.com" + Streamy.configuration.avro_schemas_path = "test/fixtures/schemas" + FakeConfluentSchemaRegistryServer.clear + stub_request(:any, /^#{Streamy.configuration.avro_schema_registry_url}/).to_rack(FakeConfluentSchemaRegistryServer) + end + + class TestEvent < AvroEvent + def topic + :bacon + end + + def body + { + smoked: "true", + streaky: "false" + } + end + + def event_time + "nowish" + end + end + + class IncorrectAttributeEvent < AvroEvent + def topic + :bacon + end + + def body + { + smoked: "true", + streaky: 100 + } + end + + def event_time + "nowish" + end + end + + class EventWithNoSchema < AvroEvent + def topic; end + def body; end + def event_time; end + end + + def test_publish + SecureRandom.stubs(:uuid).returns("IAMUUID") + + TestEvent.publish + + assert_published_event( + key: "IAMUUID", + topic: :bacon, + payload: "\u0000\u0000\u0000\u0000\u0000\u0014test_event\u0002\fnowish\u0002\btrue\u0002\nfalse" + ) + end + + def test_helpful_error_message_on_incorrect_attribute_type + assert_raises Avro::IO::AvroTypeError do + IncorrectAttributeEvent.publish + end + end + + def test_helpful_error_message_on_event_with_no_schema + assert_raises AvroTurf::SchemaNotFoundError do + EventWithNoSchema.publish + end + end + end +end diff --git a/test/event_test.rb b/test/event_test.rb index 3027b17..9169114 100644 --- a/test/event_test.rb +++ b/test/event_test.rb @@ -2,6 +2,16 @@ module Streamy class EventTest < Minitest::Test + class ValidEvent < Event + def topic; end + def body; end + def event_time; end + end + + class OveriddenPriority < ValidEvent + priority :low + end + class EventWithoutTopic < Event def event_time; end def body; end @@ -17,27 +27,6 @@ def topic; end def event_time; end end - class TestEvent < Event - def topic - :bacon - end - - def body - { - smoked: "true", - streaky: "false" - } - end - - def event_time - "nowish" - end - end - - class OveriddenPriority < TestEvent - priority :low - end - def test_helpful_error_message_on_missing_topic assert_runtime_error "topic must be implemented on Streamy::EventTest::EventWithoutTopic" do EventWithoutTopic.publish @@ -56,28 +45,14 @@ def test_helpful_error_message_on_missing_body end end - def test_publish - SecureRandom.stubs(:uuid).returns("IAMUUID") - - TestEvent.new.publish - - assert_published_event( - key: "IAMUUID", - topic: :bacon, - type: "test_event", - body: { smoked: "true", streaky: "false" }, - event_time: "nowish" - ) - end - def test_default_priority - TestEvent.new.publish + ValidEvent.publish assert_published_event(priority: :standard) end def test_overidden_priority - OveriddenPriority.new.publish + OveriddenPriority.publish assert_published_event(priority: :low) end diff --git a/test/fixtures/schemas/incorrect_attribute_event.avsc b/test/fixtures/schemas/incorrect_attribute_event.avsc new file mode 100644 index 0000000..580d253 --- /dev/null +++ b/test/fixtures/schemas/incorrect_attribute_event.avsc @@ -0,0 +1,34 @@ +{ + "type": "record", + "name": "incorrect_attribute_event", + "fields": [ + { + "name": "type", + "type": "string" + }, + { + "name": "event_time", + "type": ["null", "string"], + "default": null + }, + { + "name": "body", + "type": { + "type": "record", + "name": "body", + "fields": [ + { + "name": "smoked", + "type": ["null", "string"], + "default": null + }, + { + "name": "streaky", + "type": ["null", "string"], + "default": null + } + ] + } + } + ] +} diff --git a/test/fixtures/schemas/test_event.avsc b/test/fixtures/schemas/test_event.avsc new file mode 100644 index 0000000..5e4670a --- /dev/null +++ b/test/fixtures/schemas/test_event.avsc @@ -0,0 +1,34 @@ +{ + "type": "record", + "name": "test_event", + "fields": [ + { + "name": "type", + "type": "string" + }, + { + "name": "event_time", + "type": ["null", "string"], + "default": null + }, + { + "name": "body", + "type": { + "type": "record", + "name": "body", + "fields": [ + { + "name": "smoked", + "type": ["null", "string"], + "default": null + }, + { + "name": "streaky", + "type": ["null", "string"], + "default": null + } + ] + } + } + ] +} diff --git a/test/json_event_test.rb b/test/json_event_test.rb new file mode 100644 index 0000000..8150ff3 --- /dev/null +++ b/test/json_event_test.rb @@ -0,0 +1,38 @@ +require "test_helper" + +module Streamy + class JsonEventTest < Minitest::Test + class TestEvent < JsonEvent + def topic + :bacon + end + + def body + { + smoked: "true", + streaky: "false" + } + end + + def event_time + "nowish" + end + end + + def test_publish + SecureRandom.stubs(:uuid).returns("IAMUUID") + + TestEvent.publish + + assert_published_event( + key: "IAMUUID", + topic: :bacon, + payload: { + type: "test_event", + body: { smoked: "true", streaky: "false" }, + event_time: "nowish" + }.to_json + ) + end + end +end diff --git a/test/message_buses/kafka_message_bus_test.rb b/test/message_buses/kafka_message_bus_test.rb index 7aa1acd..fc30661 100644 --- a/test/message_buses/kafka_message_bus_test.rb +++ b/test/message_buses/kafka_message_bus_test.rb @@ -21,15 +21,21 @@ def teardown def example_delivery(priority) bus.deliver( + payload: payload, key: "prk-sg-001", - type: "sausage", topic: "charcuterie", - event_time: "2018", - body: { meat: "pork", herbs: "sage" }, priority: priority ) end + def payload + { + type: "sausage", + body: { meat: "pork", herbs: "sage" }, + event_time: "2018" + }.to_json + end + def expected_event [ {