Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Avro support #62

Merged
merged 29 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
52af024
Add Avro support
dannyd4315 Mar 8, 2019
295398b
Added new events and updated message bus
dannyd4315 Mar 22, 2019
773b297
Update specs message bus
dannyd4315 Mar 22, 2019
1a3bdfc
Fixed specs for test events without values
dannyd4315 Mar 22, 2019
54e5f16
Update specs
dannyd4315 Mar 25, 2019
5286d7a
Added avro event support and associated specs
dannyd4315 Mar 26, 2019
45d0889
Removed byebug from dependencys
dannyd4315 Mar 26, 2019
961d57c
Updated documentation to include new avro events
dannyd4315 Mar 27, 2019
dc7e505
Updated formatting in Readme
dannyd4315 Mar 27, 2019
5b4cf74
Added tests for incorrect data type and no schema
dannyd4315 Mar 27, 2019
9896fb2
Removed unrequired item from classes
dannyd4315 Mar 27, 2019
45afcce
Updated configuration for avro messaging
dannyd4315 Mar 28, 2019
c67b04c
Remove old enviroment variables
dannyd4315 Mar 28, 2019
f365fa0
Added payload method to event class
dannyd4315 Mar 28, 2019
c9c372e
Changed naming of configuration options
dannyd4315 Mar 28, 2019
1dcfbbd
Updated typo on avro event
dannyd4315 Mar 28, 2019
9dd6129
Updated readme to make clear that registry url is required
dannyd4315 Mar 28, 2019
81da86e
merge with master
balvig Mar 29, 2019
b637f61
reduce copy/paste
balvig Mar 29, 2019
3992d9f
maybe no need to test these edge cases twice?
balvig Mar 29, 2019
126b710
Updated rspec helper
dannyd4315 Mar 29, 2019
d511c1a
Updated rspec helper for checking events
dannyd4315 Apr 2, 2019
c1332aa
Updated rspec helper for streamy
dannyd4315 Apr 2, 2019
9da4f0a
Updated rspec helper
dannyd4315 Apr 3, 2019
f07c00b
Updated rspec helper
dannyd4315 Apr 3, 2019
2e8ce08
Updated rspec helper
dannyd4315 Apr 3, 2019
61783c1
Updated rspec helper for expect event
dannyd4315 Apr 4, 2019
e927a20
Moved test schemas to fixtures
dannyd4315 Apr 4, 2019
9d31293
Version bump
dannyd4315 Apr 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ gem "streamy"

### Broadcasting events

Streamy includes support for two different types of event encoding (JSON and [Avro](https://avro.apache.org/docs/current/spec.html))

#### Events with JSON encoding

Add this to config/initializer/streamy.rb

```ruby
Expand All @@ -30,7 +34,7 @@ Create an event:

```ruby
module Events
class ReceivedPayment < Streamy::Event
class ReceivedPayment < Streamy::JsonEvent
def topic
"payments.transactions"
end
Expand All @@ -51,6 +55,94 @@ end
Publish it:


```ruby
Events::ReceivedPayment.publish
```

#### Events with Avro encoding

Add this to config/initializer/streamy.rb

```ruby
require "streamy/message_buses/kafka_message_bus"
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
Streamy.message_bus = Streamy::MessageBuses::KafkaMessageBus.new(
client_id: "streamy",
seed_brokers: "broker.remote:9092",
ssl_ca_certs_from_system: true
)
```

Add environment variables for schema path and schema registry url:

```ruby
ENV["SCHEMA_REGISTRY_URL"] = "http://registry.example.com"
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
ENV["SCHEMAS_PATH"] = "app/schemas"
```

*Default schemas path is "app/schemas"*
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved

Create an event:

```ruby
module Events
class ReceivedPayment < Streamy::AvroEvent
def topic
"payments.transactions"
end

def body
{
amount: 200
}
end

def event_time
Time.now
end
end
end
```

Create Avro schema (`received_payment.asvc`) for event in schema path above:

```json
{
"type": "record",
"name": "received_payment",
"fields": [
{
"name": "type",
"type": "string"
},
{
"name": "event_time",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "body",
"type": {
"type": "record",
"name": "body",
"fields": [
{
"name": "amount",
"type": ["null", "int"],
"default": null
}
]
}
}
]
}

```
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved

Publish event:


```ruby
Events::ReceivedPayment.publish
```
Expand All @@ -59,7 +151,7 @@ Events::ReceivedPayment.publish

### Consuming events

We use [karafka](https://github.com/karafka/karafka) to handle the bulk of the consumer logic.
We use [karafka](https://github.com/karafka/karafka) to handle the bulk of the consumer logic. You can also use [karafka/avro](https://github.com/karafka/avro) to consume Avro encoded events
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved

Configure karafka consumer:

Expand Down
6 changes: 6 additions & 0 deletions lib/streamy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ module Streamy
require "streamy/version"
require "streamy/consumer"
require "streamy/event"
require "streamy/json_event"
require "streamy/avro_event"
require "streamy/event_handler"
require "streamy/message_processor"
require "streamy/profiler"
Expand All @@ -21,6 +23,10 @@ module Streamy
require "streamy/message_buses/message_bus"
require "streamy/message_buses/test_message_bus"

# Avro
# require patches for avro to allow for logical types in schemas
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
require "avro_patches"

class << self
attr_accessor :message_bus, :worker, :logger, :cache

Expand Down
23 changes: 23 additions & 0 deletions lib/streamy/avro_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
require "avro_turf/messaging"

module Streamy
class AvroEvent < Event
def payload
avro.encode(payload_attributes.deep_stringify_keys, schema_name: type)
end

def payload_attributes
{
type: type,
body: body,
event_time: event_time
}
end

private

def avro
AvroTurf::Messaging.new(registry_url: ENV["SCHEMA_REGISTRY_URL"], schemas_path: ENV["SCHEMAS_PATH"])
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
7 changes: 3 additions & 4 deletions lib/streamy/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Event

def self.priority(level)
raise "unknown priority: #{level}" unless ALLOWED_PRIORITIES.include? level

dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
self.default_priority = level
end

Expand All @@ -20,10 +21,8 @@ def publish
message_bus.safe_deliver(
key: key,
topic: topic,
type: type,
body: body,
event_time: event_time,
priority: priority
priority: priority,
payload: payload
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
)
end

Expand Down
11 changes: 11 additions & 0 deletions lib/streamy/json_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module Streamy
class JsonEvent < Event
def payload
{
type: type,
body: body,
event_time: event_time
}.to_json
end
end
end
8 changes: 1 addition & 7 deletions lib/streamy/message_buses/kafka_message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ def initialize(config)
@kafka = Kafka.new(kafka_config)
end

def deliver(key:, topic:, type:, body:, event_time:, priority:)
payload = {
type: type,
body: body,
event_time: event_time
}.to_json

def deliver(key:, topic:, payload:, priority:)
producer(priority).tap do |p|
p.produce(payload, key: key, topic: topic)
case priority
Expand Down
2 changes: 1 addition & 1 deletion lib/streamy/message_buses/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def safe_deliver(*args)
raise PublicationFailedError.new(e, *args)
end

def deliver(key:, topic:, type:, body:, event_time:, priority:)
def deliver(key:, topic:, payload:, priority:)
raise "not implemented"
end
end
Expand Down
3 changes: 3 additions & 0 deletions streamy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.add_development_dependency "pry", "~> 0.11"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "ruby-kafka", "~> 0.6"
spec.add_development_dependency "sinatra"
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved

spec.add_dependency "activesupport", "~> 5.2"
spec.add_dependency "avro-patches"
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved
spec.add_dependency "avro_turf", "~> 0.8.1"
spec.add_dependency "webmock", "~> 3.3"
end
149 changes: 149 additions & 0 deletions test/avro_event_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
require "test_helper"
require "avro_turf/test/fake_confluent_schema_registry_server"
require "webmock/minitest"

module Streamy
class AvroEventTest < Minitest::Test
def setup
FakeConfluentSchemaRegistryServer.clear
registry_url = ENV["SCHEMA_REGISTRY_URL"]
stub_request(:any, /^#{registry_url}/).to_rack(FakeConfluentSchemaRegistryServer)
end

class EventWithoutTopic < AvroEvent
def event_time; end

def body; end
end

class EventWithoutEventTime < AvroEvent
def topic; end

def body; end
end

class EventWithoutBody < AvroEvent
def topic; end

def event_time; end
end

def test_helpful_error_message_on_missing_topic
assert_runtime_error "topic must be implemented on Streamy::AvroEventTest::EventWithoutTopic" do
EventWithoutTopic.publish
end
end

def test_helpful_error_message_on_missing_event_time
assert_runtime_error "event_time must be implemented on Streamy::AvroEventTest::EventWithoutEventTime" do
EventWithoutEventTime.publish
end
end

def test_helpful_error_message_on_missing_body
assert_runtime_error "body must be implemented on Streamy::AvroEventTest::EventWithoutBody" do
EventWithoutBody.publish
end
end

class IncorrectAttributeEvent < AvroEvent
def topic
:bacon
end

def body
{
smoked: "true",
streaky: 100
}
end

def event_time
"nowish"
end
end

def test_helpful_error_message_on_incorrect_attribute_type
assert_raises Avro::IO::AvroTypeError do
IncorrectAttributeEvent.publish
end
end

class EventWithNoSchema < AvroEvent
def topic; end

def body; end

def event_time; end
end

def test_helpful_error_message_on_event_with_no_schema
assert_raises AvroTurf::SchemaNotFoundError do
EventWithNoSchema.publish
end
end

class TestEvent < AvroEvent
def topic
:bacon
end

def body
{
smoked: "true",
streaky: "false"
}
end

def event_time
"nowish"
end
end

def test_publish
SecureRandom.stubs(:uuid).returns("IAMUUID")

assert_published_event(
TestEvent.new,
key: "IAMUUID",
topic: :bacon,
payload: "\u0000\u0000\u0000\u0000\u0000\u0014test_event\u0002\fnowish\u0002\btrue\u0002\nfalse"
)
end

def test_default_priority
assert_published_event(TestEvent.new, priority: :standard)
end

class OveriddenPriority < TestEvent
priority :low
end

def test_overidden_priority
assert_published_event(OveriddenPriority.new, priority: :low)
end

private

def assert_published_event(event, assertions)
bus = mock("message_bus")
Streamy.stubs(:message_bus).returns(bus)

bus.expects(:safe_deliver).with do |params|
assertions.each do |key, value|
assert_equal value, params[key]
end
end

event.publish
end

def assert_runtime_error(message, &block)
error = assert_raises RuntimeError do
yield
end

assert_equal message, error.message
end
end
end
Loading