Skip to content

Commit

Permalink
Add Avro support (#62)
Browse files Browse the repository at this point in the history
* Add Avro support
  • Loading branch information
dannyd4315 authored Apr 8, 2019
1 parent 6d3d4e0 commit 244ccb8
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 60 deletions.
95 changes: 93 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ gem "streamy"

### Broadcasting events

Streamy includes support for two different types of event encoding (JSON and [Avro](https://avro.apache.org/docs/current/spec.html)).

#### Events with JSON encoding

Add this to config/initializer/streamy.rb

```ruby
Expand All @@ -30,7 +34,7 @@ Create an event:

```ruby
module Events
class ReceivedPayment < Streamy::Event
class ReceivedPayment < Streamy::JsonEvent
def topic
"payments.transactions"
end
Expand All @@ -51,6 +55,93 @@ end
Publish it:


```ruby
Events::ReceivedPayment.publish
```

#### Events with Avro encoding

Add this to config/initializer/streamy.rb

```ruby
require "streamy/message_buses/kafka_message_bus"
Streamy.message_bus = Streamy::MessageBuses::KafkaMessageBus.new(
client_id: "streamy",
seed_brokers: "broker.remote:9092",
ssl_ca_certs_from_system: true,
)

Streamy.configure do |config|
config.avro_schema_registry_url = "http://registry.example.com",
config.avro_schemas_path = "app/schemas"
end
```

*Default schemas path is "app/schemas"*
*Schema Registry Url is required for encoding with Avro*

Create an event:

```ruby
module Events
class ReceivedPayment < Streamy::AvroEvent
def topic
"payments.transactions"
end

def body
{
amount: 200
}
end

def event_time
Time.now
end
end
end
```

Create Avro schema (`received_payment.asvc`) for event in schema path above:

```json
{
"type": "record",
"name": "received_payment",
"fields": [
{
"name": "type",
"type": "string"
},
{
"name": "event_time",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "body",
"type": {
"type": "record",
"name": "body",
"fields": [
{
"name": "amount",
"type": ["null", "int"],
"default": null
}
]
}
}
]
}

```

Publish event:


```ruby
Events::ReceivedPayment.publish
```
Expand All @@ -59,7 +150,7 @@ Events::ReceivedPayment.publish

### Consuming events

We use [karafka](https://github.com/karafka/karafka) to handle the bulk of the consumer logic.
We use [karafka](https://github.com/karafka/karafka) to handle the bulk of the consumer logic. You can also use [karafka/avro](https://github.com/karafka/avro) to consume Avro encoded events.

Configure karafka consumer:

Expand Down
15 changes: 15 additions & 0 deletions lib/streamy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ module Streamy
require "active_support/core_ext/string"

require "streamy/version"
require "streamy/configuration"
require "streamy/consumer"
require "streamy/event"
require "streamy/json_event"
require "streamy/avro_event"
require "streamy/event_handler"
require "streamy/message_processor"
require "streamy/profiler"
Expand All @@ -21,6 +24,10 @@ module Streamy
require "streamy/message_buses/message_bus"
require "streamy/message_buses/test_message_bus"

# Avro
# require patches for avro to allow for logical types in schemas
require "avro_patches"

class << self
attr_accessor :message_bus, :worker, :logger, :cache

Expand All @@ -29,6 +36,14 @@ def shutdown
end
end

def self.configuration
@configuration ||= Configuration.new
end

def self.configure
yield(configuration)
end

self.logger = SimpleLogger.new
end

Expand Down
18 changes: 18 additions & 0 deletions lib/streamy/avro_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
require "avro_turf/messaging"

module Streamy
class AvroEvent < Event
def payload
avro.encode(payload_attributes.deep_stringify_keys, schema_name: type)
end

private

def avro
AvroTurf::Messaging.new(
registry_url: Streamy.configuration.avro_schema_registry_url,
schemas_path: Streamy.configuration.avro_schemas_path
)
end
end
end
10 changes: 10 additions & 0 deletions lib/streamy/configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module Streamy
class Configuration
attr_accessor :avro_schema_registry_url, :avro_schemas_path

def initialize
@avro_schema_registry_url = nil
@avro_schemas_path = nil
end
end
end
19 changes: 15 additions & 4 deletions lib/streamy/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Event

def self.priority(level)
raise "unknown priority: #{level}" unless ALLOWED_PRIORITIES.include? level

self.default_priority = level
end

Expand All @@ -20,10 +21,8 @@ def publish
message_bus.safe_deliver(
key: key,
topic: topic,
type: type,
body: body,
event_time: event_time,
priority: priority
priority: priority,
payload: payload
)
end

Expand Down Expand Up @@ -56,5 +55,17 @@ def body
def event_time
raise "event_time must be implemented on #{self.class}"
end

def payload_attributes
{
type: type,
body: body,
event_time: event_time
}
end

def payload
payload_attributes
end
end
end
23 changes: 18 additions & 5 deletions lib/streamy/helpers/rspec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,28 @@
module Streamy
module Helpers
module RspecHelper
def expect_event(topic: kind_of(String), key: kind_of(String), body: kind_of(Hash), type:, event_time: nil)
expect(Streamy.message_bus.deliveries).to have_hash(
def expect_event(topic: kind_of(String), key: kind_of(String), body: kind_of(Hash), type:, event_time: kind_of(String))
deliveries = hashify_messages(Streamy.message_bus.deliveries)

expect(deliveries).to have_hash(
topic: topic,
key: key,
type: type,
body: body,
event_time: event_time || kind_of(Time)
payload: {
body: body,
type: type,
event_time: event_time
}
)
end

def hashify_messages(message_bus_deliveries)
message_bus_deliveries.map do |message|
message_hash = message.dup
message_hash[:payload] = JSON.parse(message_hash[:payload]).deep_symbolize_keys
message_hash
end
end

alias expect_published_event expect_event

Streamy.message_bus = MessageBuses::TestMessageBus.new
Expand Down
7 changes: 7 additions & 0 deletions lib/streamy/json_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module Streamy
class JsonEvent < Event
def payload
payload_attributes.to_json
end
end
end
8 changes: 1 addition & 7 deletions lib/streamy/message_buses/kafka_message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,7 @@ def initialize(config)
@kafka = Kafka.new(@config.kafka)
end

def deliver(key:, topic:, type:, body:, event_time:, priority:)
payload = {
type: type,
body: body,
event_time: event_time
}.to_json

def deliver(key:, topic:, payload:, priority:)
producer(priority).tap do |p|
p.produce(payload, key: key, topic: topic)
case priority
Expand Down
2 changes: 1 addition & 1 deletion lib/streamy/message_buses/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def safe_deliver(*args)
raise PublicationFailedError.new(e, *args)
end

def deliver(key:, topic:, type:, body:, event_time:, priority:)
def deliver(key:, topic:, payload:, priority:)
raise "not implemented"
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/streamy/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Streamy
VERSION = "0.2.0".freeze
VERSION = "0.3.0".freeze
end
3 changes: 3 additions & 0 deletions streamy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.add_development_dependency "pry", "~> 0.11"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "ruby-kafka", "~> 0.6"
spec.add_development_dependency "sinatra"

spec.add_dependency "activesupport", "~> 5.2"
spec.add_dependency "avro-patches"
spec.add_dependency "avro_turf", "~> 0.8.1"
spec.add_dependency "webmock", "~> 3.3"
end
78 changes: 78 additions & 0 deletions test/avro_event_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
require "test_helper"
require "avro_turf/test/fake_confluent_schema_registry_server"
require "webmock/minitest"

module Streamy
class AvroEventTest < Minitest::Test
def setup
Streamy.configuration.avro_schema_registry_url = "http://registry.example.com"
Streamy.configuration.avro_schemas_path = "test/fixtures/schemas"
FakeConfluentSchemaRegistryServer.clear
stub_request(:any, /^#{Streamy.configuration.avro_schema_registry_url}/).to_rack(FakeConfluentSchemaRegistryServer)
end

class TestEvent < AvroEvent
def topic
:bacon
end

def body
{
smoked: "true",
streaky: "false"
}
end

def event_time
"nowish"
end
end

class IncorrectAttributeEvent < AvroEvent
def topic
:bacon
end

def body
{
smoked: "true",
streaky: 100
}
end

def event_time
"nowish"
end
end

class EventWithNoSchema < AvroEvent
def topic; end
def body; end
def event_time; end
end

def test_publish
SecureRandom.stubs(:uuid).returns("IAMUUID")

TestEvent.publish

assert_published_event(
key: "IAMUUID",
topic: :bacon,
payload: "\u0000\u0000\u0000\u0000\u0000\u0014test_event\u0002\fnowish\u0002\btrue\u0002\nfalse"
)
end

def test_helpful_error_message_on_incorrect_attribute_type
assert_raises Avro::IO::AvroTypeError do
IncorrectAttributeEvent.publish
end
end

def test_helpful_error_message_on_event_with_no_schema
assert_raises AvroTurf::SchemaNotFoundError do
EventWithNoSchema.publish
end
end
end
end
Loading

0 comments on commit 244ccb8

Please sign in to comment.