Skip to content

Commit

Permalink
feat: return optional opts when encoding
Browse files Browse the repository at this point in the history
This allows us to pass through the `partial?` flag and use it when
publishing MQTT messages.

The partial encoders will accept all non-empty updates from
merge_filter, and emit them for the MQTT sink.

The other encoders will accept only full updates from merge_filter, and
pass them to the S3 and Filesystem sinks.
  • Loading branch information
paulswartz committed Sep 17, 2023
1 parent 028f1ef commit 79ec7e4
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 14 deletions.
2 changes: 1 addition & 1 deletion lib/concentrate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ defmodule Concentrate do
end

# Fallback clause for sink-config decoding: normalize the accumulated sinks
# value to a keyword list so downstream access (e.g. config[:sinks][:s3]) and
# `is_list/1` checks always see a Keyword, never a bare map.
# NOTE: the diff residue (a dead leading `acc` expression from the pre-commit
# body) is removed; only the committed `Keyword.new/1` body remains.
defp decode_sinks_object(_, acc) do
  Keyword.new(acc)
end

defp is_possible_env_var(value) do
Expand Down
6 changes: 5 additions & 1 deletion lib/concentrate/encoder/producer_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ defmodule Concentrate.Encoder.ProducerConsumer do
"#{__MODULE__} encoded filename=#{inspect(filename)} time=#{time / 1000}"
end)

{filename, encoded}
if FeedUpdate.partial?(update) do
{filename, encoded, partial?: true}
else
{filename, encoded}
end
end

{:noreply, responses, state, :hibernate}
Expand Down
10 changes: 10 additions & 0 deletions lib/concentrate/sink/filesystem.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ defmodule Concentrate.Sink.Filesystem do
"""
require Logger

# Bodiless head documenting the two accepted shapes of `file_data`:
# `{filename, body}` or `{filename, body, file_opts}`.
def start_link(opts, file_data)

# Three-element tuples carry per-file encoder options. Partial feed updates
# are not written to the filesystem (`:ignore`, so the supervisor starts no
# child); anything else is re-dispatched to the two-element clause below.
def start_link(opts, {name, contents, write_opts}) do
  if write_opts[:partial?], do: :ignore, else: start_link(opts, {name, contents})
end

def start_link(opts, {filename, body}) do
directory = Keyword.fetch!(opts, :directory)

Expand Down
26 changes: 21 additions & 5 deletions lib/concentrate/sink/mqtt/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ defmodule Concentrate.Sink.Mqtt.Connection do
GenServer.start_link(__MODULE__, opts, name: @name)
end

# Client API: enqueue `file_data` for publishing via the shared MQTT
# connection. `file_data` is either `{filename, body}` or
# `{filename, body, file_opts}` (the optional opts passed through from the
# encoders); the topic prefix (opts[:prefix], default "") is resolved here
# and joined with the filename by the server, which handles both shapes.
# The span contained two conflicting `def publish … do` heads (pre- and
# post-commit diff lines); only the committed clause is kept.
def publish(file_data, opts) do
  topic_prefix = opts[:prefix] || ""
  GenServer.cast(@name, {:publish, topic_prefix, file_data})
end

@impl GenServer
Expand All @@ -36,11 +36,27 @@ defmodule Concentrate.Sink.Mqtt.Connection do
end

@impl GenServer
def handle_cast({:publish, topic, body}, client) do
def handle_cast({:publish, topic_prefix, file_data}, client) do
{filename, body, file_opts} =
case file_data do
{filename, body} ->
{filename, body, []}

_ ->
file_data
end

topic = topic_prefix <> filename
payload = :zlib.gzip(body)

message =
struct!(%EmqttFailover.Message{topic: topic, payload: payload}, qos: 1, retain?: true)
message_opts =
if file_opts[:partial?] do
[qos: 0]
else
[qos: 1, retain?: true]
end

message = struct!(%EmqttFailover.Message{topic: topic, payload: payload}, message_opts)

_ = EmqttFailover.Connection.publish(client, message)

Expand Down
14 changes: 12 additions & 2 deletions lib/concentrate/sink/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@ defmodule Concentrate.Sink.S3 do

@ex_aws Application.compile_env(:concentrate, [:sink_s3, :ex_aws], ExAws)

@doc """
Starts a task that uploads `file_data` to S3.

`file_data` is either `{filename, body}` or `{filename, body, file_opts}`.
When `file_opts[:partial?]` is set, returns `:ignore` without starting a
child — per this commit, partial updates go only to the MQTT sink, while
S3 receives full updates.
"""
def start_link(opts, file_data)

def start_link(opts, {filename, body, file_opts}) do
  # Partial feed updates are published over MQTT only; skip the upload.
  if file_opts[:partial?] do
    :ignore
  else
    start_link(opts, {filename, body})
  end
end

def start_link(opts, {filename, body}) do
  bucket = Keyword.fetch!(opts, :bucket)
  prefix = Keyword.get(opts, :prefix, "")
  acl = Keyword.get(opts, :acl, :public_read)
  state = %{bucket: bucket, prefix: prefix, acl: acl}
  # Pass exactly {filename, body} (not the raw `file_data`, which may carry a
  # third element) so upload_to_s3/2 always receives a two-element tuple.
  # The removed pre-commit head/call lines (diff residue) are dropped here.
  Task.start_link(__MODULE__, :upload_to_s3, [{filename, body}, state])
end

def upload_to_s3({filename, body}, state) do
Expand Down
17 changes: 13 additions & 4 deletions lib/concentrate/supervisor/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@ defmodule Concentrate.Supervisor.Pipeline do

def children(config) do
{source_names, source_children} = sources(config[:sources])

{output_names, output_children} = encoders(config[:encoders])

file_tap = [{Concentrate.Producer.FileTap, config[:file_tap]}]
merge_filter = merge(source_names, config)
source_reporters = source_reporters(source_names, config[:source_reporters])
reporters = reporters(config[:reporters])
sinks = sinks(config[:sinks], [Concentrate.Producer.FileTap] ++ output_names)

sinks =
sinks(
config[:sinks],
[Concentrate.Producer.FileTap] ++ output_names
)

Enum.concat([
source_children,
Expand Down Expand Up @@ -107,17 +114,19 @@ defmodule Concentrate.Supervisor.Pipeline do
def encoders(config) do
children =
for {filename, encoder} <- config[:files] do
name = encoder

child_spec(
{
Concentrate.Encoder.ProducerConsumer,
name: encoder,
name: name,
files: [{filename, encoder}],
subscribe_to: [
merge_filter: [max_demand: 10, selector: &(not Concentrate.FeedUpdate.partial?(&1))]
merge_filter: [max_demand: 10]
],
buffer_size: 1
},
id: encoder
id: name
)
end

Expand Down
3 changes: 2 additions & 1 deletion test/concentrate_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ defmodule ConcentrateTest do

assert config[:gtfs][:url] == "gtfs_url"
assert config[:alerts][:url] == "alerts_url"
assert is_list(config[:sinks])
assert is_list(config[:sinks][:s3])
assert config[:sinks][:s3][:bucket] == "s3-bucket"
assert config[:sinks][:s3][:prefix] == "bucket_prefix"
Expand All @@ -124,7 +125,7 @@ defmodule ConcentrateTest do
assert config[:sources][:gtfs_realtime] == %{}
assert config[:sources][:gtfs_realtime_enhanced] == %{}
assert config[:gtfs] == nil
assert config[:sinks] == %{}
assert config[:sinks] == []
end

test "gtfs_realtime sources can have additional route configuration" do
Expand Down

0 comments on commit 79ec7e4

Please sign in to comment.