Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Avro support #62

Merged
merged 29 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
52af024
Add Avro support
dannyd4315 Mar 8, 2019
295398b
Added new events and updated message bus
dannyd4315 Mar 22, 2019
773b297
Update specs message bus
dannyd4315 Mar 22, 2019
1a3bdfc
Fixed specs for test events without values
dannyd4315 Mar 22, 2019
54e5f16
Update specs
dannyd4315 Mar 25, 2019
5286d7a
Added avro event support and associated specs
dannyd4315 Mar 26, 2019
45d0889
Removed byebug from dependencys
dannyd4315 Mar 26, 2019
961d57c
Updated documentation to include new avro events
dannyd4315 Mar 27, 2019
dc7e505
Updated formatting in Readme
dannyd4315 Mar 27, 2019
5b4cf74
Added tests for incorrect data type and no schema
dannyd4315 Mar 27, 2019
9896fb2
Removed unrequired item from classes
dannyd4315 Mar 27, 2019
45afcce
Updated configuration for avro messaging
dannyd4315 Mar 28, 2019
c67b04c
Remove old enviroment variables
dannyd4315 Mar 28, 2019
f365fa0
Added payload method to event class
dannyd4315 Mar 28, 2019
c9c372e
Changed naming of configuration options
dannyd4315 Mar 28, 2019
1dcfbbd
Updated typo on avro event
dannyd4315 Mar 28, 2019
9dd6129
Updated readme to make clear that registry url is required
dannyd4315 Mar 28, 2019
81da86e
merge with master
balvig Mar 29, 2019
b637f61
reduce copy/paste
balvig Mar 29, 2019
3992d9f
maybe no need to test these edge cases twice?
balvig Mar 29, 2019
126b710
Updated rspec helper
dannyd4315 Mar 29, 2019
d511c1a
Updated rspec helper for checking events
dannyd4315 Apr 2, 2019
c1332aa
Updated rspec helper for streamy
dannyd4315 Apr 2, 2019
9da4f0a
Updated rspec helper
dannyd4315 Apr 3, 2019
f07c00b
Updated rspec helper
dannyd4315 Apr 3, 2019
2e8ce08
Updated rspec helper
dannyd4315 Apr 3, 2019
61783c1
Updated rspec helper for expect event
dannyd4315 Apr 4, 2019
e927a20
Moved test schemas to fixtures
dannyd4315 Apr 4, 2019
9d31293
Version bump
dannyd4315 Apr 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/streamy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ module Streamy
# TODO: Move into classes that use them
require "active_support"
require "active_support/core_ext/string"
require 'byebug'
dannyd4315 marked this conversation as resolved.
Show resolved Hide resolved

require "streamy/version"
require "streamy/consumer"
require "streamy/event"
require "streamy/json_event"
require "streamy/avro_event"
require "streamy/event_handler"
require "streamy/message_processor"
require "streamy/profiler"
Expand Down
17 changes: 17 additions & 0 deletions lib/streamy/avro_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
require "active_support/core_ext/class/attribute"

module Streamy
class AvroEvent < Event
def payload
avro.encode(payload_attributes.stringify_keys, schema_name: type)
end

def payload_attributes
{
type: type,
body: body,
event_time: event_time
}
end
end
end
4 changes: 1 addition & 3 deletions lib/streamy/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ def self.publish(*args)

def publish
message_bus.safe_deliver(
payload: payload,
key: key,
topic: topic,
type: type,
body: body,
event_time: event_time,
priority: priority
)
end
Expand Down
13 changes: 13 additions & 0 deletions lib/streamy/json_event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require "active_support/core_ext/class/attribute"

module Streamy
class JsonEvent < Event
def payload
{
type: type,
body: body,
event_time: event_time
}.to_json
end
end
end
8 changes: 1 addition & 7 deletions lib/streamy/message_buses/kafka_message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,7 @@ def initialize(config)
@kafka = Kafka.new(kafka_config)
end

def deliver(key:, topic:, type:, body:, event_time:, priority:)
payload = {
type: type,
body: body,
event_time: event_time
}.to_json

def deliver(key:, topic:, payload:, priority:)
producer(priority).tap do |p|
p.produce(payload, key: key, topic: topic)
case priority
Expand Down
2 changes: 1 addition & 1 deletion lib/streamy/message_buses/message_bus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def safe_deliver(*args)
raise PublicationFailedError.new(e, *args)
end

def deliver(key:, topic:, type:, body:, event_time:, priority:)
def deliver(key:, topic:, payload:, priority:)
raise "not implemented"
end
end
Expand Down
1 change: 1 addition & 0 deletions streamy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler", "~> 1.14"
spec.add_development_dependency "byebug"
spec.add_development_dependency "minitest", "~> 5.0"
spec.add_development_dependency "minitest-focus"
spec.add_development_dependency "mocha", "~> 1.4"
Expand Down
16 changes: 8 additions & 8 deletions test/event_test.rb → test/json_event_test.rb
Original file line number Diff line number Diff line change
@@ -1,44 +1,44 @@
require "test_helper"

module Streamy
class EventTest < Minitest::Test
class EventWithoutTopic < Event
class JsonEventTest < Minitest::Test
class EventWithoutTopic < JsonEvent
def event_time; end

def body; end
end

class EventWithoutEventTime < Event
class EventWithoutEventTime < JsonEvent
def topic; end

def body; end
end

class EventWithoutBody < Event
class EventWithoutBody < JsonEvent
def topic; end

def event_time; end
end

def test_helpful_error_message_on_missing_topic
assert_runtime_error "topic must be implemented on Streamy::EventTest::EventWithoutTopic" do
assert_runtime_error "topic must be implemented on Streamy::JsonEventTest::EventWithoutTopic" do
EventWithoutTopic.publish
end
end

def test_helpful_error_message_on_missing_event_time
assert_runtime_error "event_time must be implemented on Streamy::EventTest::EventWithoutEventTime" do
assert_runtime_error "event_time must be implemented on Streamy::JsonEventTest::EventWithoutEventTime" do
EventWithoutEventTime.publish
end
end

def test_helpful_error_message_on_missing_body
assert_runtime_error "body must be implemented on Streamy::EventTest::EventWithoutBody" do
assert_runtime_error "body must be implemented on Streamy::JsonEventTest::EventWithoutBody" do
EventWithoutBody.publish
end
end

class TestEvent < Event
class TestEvent < JsonEvent
def topic
:bacon
end
Expand Down
12 changes: 9 additions & 3 deletions test/message_buses/kafka_message_bus_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ def teardown

def example_delivery(priority)
bus.deliver(
payload: payload,
key: "prk-sg-001",
type: "sausage",
topic: "charcuterie",
event_time: "2018",
body: { meat: "pork", herbs: "sage" },
priority: priority
)
end

def payload
{
type: "sausage",
body: { meat: "pork", herbs: "sage" },
event_time: "2018"
}.to_json
end

def expected_event
[
{
Expand Down