refactor(Predictions.PredictionsPubSub): adjust dispatching approach (#1816)

* tweak(PredictionsPubSub): send first broadcast quickly

* tweak(PredictionsPubSub): separate the timed broadcast call

This lets us manually call for a :broadcast without starting a new repeating timer.

* refactor(Predictions.Stream): update the store

Instead of broadcasting to `PredictionsPubSub` to update `Predictions.Store`, update the store directly from the stream.

* tweak(PredictionsPubSub): change state shape

* feat(Predictions.Stream): broadcast on :reset

* feat(PredictionsPubSub): track dispatched predictions

Don't dispatch if the predictions are unchanged.

* refactor: fetch predictions only once for each distinct set of fetch_keys, before dispatching to one or more pids (sketched just below)
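
The last bullet is the core of the change: group subscriber entries by their fetch_keys so each distinct key set hits the store only once. A minimal, self-contained sketch of that grouping — the entries and the fetch function here are made up for illustration; only the {pid, {fetch_keys, filter}} value shape mirrors the registry values in the diff below:

me = self()
fetch_once = fn fetch_keys -> {:fake_predictions_for, fetch_keys} end

entries = [
  {me, {[stop: "place-sstat"], :filter_a}},
  {me, {[stop: "place-sstat"], :filter_b}},
  {me, {[route: "39"], :filter_c}}
]

entries
|> Enum.group_by(fn {_pid, {fetch_keys, _}} -> fetch_keys end, fn {pid, _} -> pid end)
|> Enum.each(fn {fetch_keys, pids} ->
  # One fetch per distinct fetch_keys, then fan out to every unique subscriber pid.
  predictions = fetch_once.(fetch_keys)
  pids |> Enum.uniq() |> Enum.each(&send(&1, {:new_predictions, predictions}))
end)
# Running this in iex leaves two {:new_predictions, ...} messages in the mailbox —
# one per distinct fetch_keys, however many subscribers share them.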
thecristen authored Dec 6, 2023
1 parent e005cba commit f34888d
Showing 6 changed files with 166 additions and 182 deletions.
75 changes: 53 additions & 22 deletions apps/predictions/lib/predictions_pub_sub.ex
@@ -3,6 +3,8 @@ defmodule Predictions.PredictionsPubSub do
Allow channels to subscribe to prediction streams, which are collected into an
ETS table keyed by prediction ID, route ID, stop ID, direction ID, trip ID,
and vehicle ID for easy retrieval.
Set up to broadcast predictions periodically, or can be broadcast on-demand.
"""

use GenServer
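
The updated moduledoc covers both broadcast paths: periodic and on-demand. A hedged usage sketch — it assumes the server runs under its module name in the app's supervision tree and that subscribe/1 both registers the caller and returns the current predictions for the filter (the tests later in this commit call subscribe/2 with an explicit server pid); the filter string is illustrative:

alias Predictions.PredictionsPubSub

# Register this process and get the current predictions for the filter (assumed return value).
current = PredictionsPubSub.subscribe("stop:place-sstat")
IO.inspect(current, label: "initial predictions")

# Every periodic :timed_broadcast (or a manual :broadcast) delivers fresh data.
receive do
  {:new_predictions, predictions} -> IO.inspect(predictions, label: "update")
after
  30_000 -> :no_update_yet
end

# Ask for an extra broadcast without starting another repeating timer
# (assumes the server is registered under the module name).
send(PredictionsPubSub, :broadcast)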
@@ -54,22 +56,29 @@ defmodule Predictions.PredictionsPubSub do
def init(opts) do
subscribe_fn = Keyword.get(opts, :subscribe_fn, &Phoenix.PubSub.subscribe/2)
subscribe_fn.(Predictions.PubSub, "predictions")
broadcast_timer(@broadcast_interval_ms)
{:ok, %{}}
broadcast_timer(50)

{:ok,
%{
callers_by_pid: %{},
last_dispatched_by_fetch_keys: %{}
}}
end

@impl GenServer
def handle_call(
{:subscribe, stream_filter},
{from_pid, _ref},
state
%{
callers_by_pid: callers
} = state
) do
registry_key = self()

# Let us detect when the calling process goes down, and save the associated
# API params for easier lookup
Process.monitor(from_pid)
new_state = Map.put_new(state, from_pid, stream_filter)
new_state = %{state | callers_by_pid: Map.put_new(callers, from_pid, stream_filter)}

predictions =
stream_filter
@@ -84,28 +93,56 @@ defmodule Predictions.PredictionsPubSub do
registry_key = self()

Registry.dispatch(@subscribers, registry_key, fn entries ->
entries
|> Enum.uniq_by(fn {pid, {fetch_keys, _}} ->
{pid, fetch_keys}
Enum.group_by(
entries,
fn {_, {fetch_keys, _}} -> fetch_keys end,
fn {pid, {_, _}} -> pid end
)
|> Enum.each(fn {fetch_keys, pids} ->
new_predictions = Store.fetch(fetch_keys)

pids
|> Enum.uniq()
|> Enum.each(&send(self(), {:dispatch, &1, fetch_keys, new_predictions}))
end)
|> Enum.each(&send_data/1)
end)

broadcast_timer(@broadcast_interval_ms)
{:noreply, state}
end

def handle_info({event, predictions}, state) do
:ok = Store.update(event, predictions)
def handle_info(:timed_broadcast, state) do
send(self(), :broadcast)
broadcast_timer()
{:noreply, state}
end

def handle_info(
{:dispatch, pid, fetch_keys, predictions},
%{
last_dispatched_by_fetch_keys: last_dispatched
} = state
) do
if Map.get(last_dispatched, fetch_keys) != predictions do
send(pid, {:new_predictions, predictions})

{:noreply,
%{
state
| last_dispatched_by_fetch_keys: Map.put_new(last_dispatched, fetch_keys, predictions)
}}
else
{:noreply, state}
end
end

def handle_info(
{:DOWN, parent_ref, :process, caller_pid, _reason},
state
%{
callers_by_pid: callers
} = state
) do
Process.demonitor(parent_ref, [:flush])
{%Predictions.StreamTopic{streams: streams}, new_state} = Map.pop(state, caller_pid)
{%Predictions.StreamTopic{streams: streams}, new_callers} = Map.pop(callers, caller_pid)

Enum.each(streams, fn stream ->
# Here we can check if there are other subscribers for the associated key.
@@ -116,7 +153,7 @@ defmodule Predictions.PredictionsPubSub do
end
end)

{:noreply, new_state}
{:noreply, %{state | callers_by_pid: new_callers}}
end

# find registrations for this filter from processes other than the indicated pid
@@ -128,13 +165,7 @@ defmodule Predictions.PredictionsPubSub do
Registry.select(@subscribers, [{pattern, guards, body}]) == []
end

@spec send_data({pid, registry_value()}) :: broadcast_message()
defp send_data({pid, {fetch_keys, _}}) do
new_predictions = fetch_keys |> Store.fetch()
send(pid, {:new_predictions, new_predictions})
end

defp broadcast_timer(interval) do
Process.send_after(self(), :broadcast, interval)
defp broadcast_timer(interval \\ @broadcast_interval_ms) do
Process.send_after(self(), :timed_broadcast, interval)
end
end
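
The server state is now a map with :callers_by_pid and :last_dispatched_by_fetch_keys instead of a bare pid-to-filter map. A generic, self-contained sketch of the caller-tracking half of that pattern (deliberately not the real module), showing the Process.monitor/:DOWN lifecycle the diff relies on:

defmodule CallerTrackingSketch do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok)

  @impl GenServer
  def init(:ok), do: {:ok, %{callers_by_pid: %{}}}

  @impl GenServer
  def handle_call({:subscribe, filter}, {from_pid, _tag}, %{callers_by_pid: callers} = state) do
    # Monitor the caller so we hear about it going down, and remember its filter.
    Process.monitor(from_pid)
    {:reply, :ok, %{state | callers_by_pid: Map.put_new(callers, from_pid, filter)}}
  end

  @impl GenServer
  def handle_info({:DOWN, ref, :process, caller_pid, _reason}, %{callers_by_pid: callers} = state) do
    # Drop the dead caller from state; :flush discards any stray :DOWN already queued.
    Process.demonitor(ref, [:flush])
    {_filter, remaining} = Map.pop(callers, caller_pid)
    {:noreply, %{state | callers_by_pid: remaining}}
  end
end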
8 changes: 5 additions & 3 deletions apps/predictions/lib/store.ex
@@ -34,9 +34,9 @@ defmodule Predictions.Store do
GenServer.call(__MODULE__, {:fetch, keys})
end

@spec update(atom, [Prediction.t()]) :: :ok
def update(event, predictions) do
GenServer.cast(__MODULE__, {event, predictions})
@spec update({atom, [[Prediction.t()]]}) :: :ok
def update({event, predictions_batches}) do
Enum.each(predictions_batches, &GenServer.cast(__MODULE__, {event, &1}))
end

# Server
@@ -47,6 +47,8 @@ defmodule Predictions.Store do
end

@impl true
def handle_cast({_, []}, table), do: {:noreply, table}

def handle_cast({event, predictions}, table) when event in [:add, :update, :reset] do
:ets.insert(table, Enum.map(predictions, &to_record/1))

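
Store.update now accepts a single {event, batches} tuple — one cast per batch, with the new handle_cast clause turning empty batches into no-ops. A sketch of the call shape; the struct fields are illustrative and this is not meant to run against the real ETS table:

alias Predictions.{Prediction, Store}

batch_one = [%Prediction{id: "prediction-1"}, %Prediction{id: "prediction-2"}]
batch_two = [%Prediction{id: "prediction-3"}]

# One GenServer.cast per batch; the empty list is accepted but ignored by handle_cast.
:ok = Store.update({:add, [batch_one, batch_two, []]})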
54 changes: 28 additions & 26 deletions apps/predictions/lib/stream.ex
@@ -6,9 +6,8 @@ defmodule Predictions.Stream do
use GenStage
require Logger

alias V3Api.Stream.Event
alias Phoenix.PubSub
alias Predictions.{Prediction, Repo, StreamParser}
alias Predictions.{Repo, Store, StreamParser}

@type event_type :: :reset | :add | :update | :remove

@@ -22,46 +21,49 @@
)
end

@impl GenStage
def init(opts) do
producer_consumer = Keyword.fetch!(opts, :subscribe_to)
broadcast_fn = Keyword.get(opts, :broadcast_fn, &PubSub.broadcast/3)
{:consumer, %{broadcast_fn: broadcast_fn}, subscribe_to: [producer_consumer]}

initial_state =
Map.new()
|> Map.put_new(:broadcast_fn, Keyword.get(opts, :broadcast_fn, &PubSub.broadcast/3))
|> Map.put_new(:started?, false)

{:consumer, initial_state, subscribe_to: [producer_consumer]}
end

@impl GenStage
def handle_events(events, _from, state) do
:ok = Enum.each(events, &send_event(&1, state.broadcast_fn))
{:noreply, [], state}
batches = Enum.group_by(events, & &1.event, &to_predictions(&1.data))
:ok = Enum.each(batches, &Store.update/1)
{:noreply, [], initial_broadcast(state)}
end

# Broadcast when the first event for this stream is received
def initial_broadcast(%{started?: false} = state) do
broadcast(state.broadcast_fn)
%{state | started?: true}
end

defp send_event(
%Event{
event: type,
data: %JsonApi{data: data}
},
broadcast_fn
) do
def initial_broadcast(state), do: state

defp to_predictions(%JsonApi{data: data}) do
data
|> Enum.filter(&Repo.has_trip?/1)
|> Enum.map(&StreamParser.parse/1)
|> broadcast(type, broadcast_fn)
end

defp send_event(
%Event{
data: {:error, _} = error
},
_broadcast_fn
) do
error
defp to_predictions({:error, _} = error) do
_ = log_errors(error)
[]
end

@typep broadcast_fn :: (atom, String.t(), any -> :ok | {:error, any})
@spec broadcast([Prediction.t() | String.t()], event_type, broadcast_fn) :: :ok
defp broadcast([], _type, _broadcast_fn), do: :ok

defp broadcast(data, type, broadcast_fn) do
@spec broadcast(broadcast_fn) :: :ok
defp broadcast(broadcast_fn) do
Predictions.PubSub
|> broadcast_fn.("predictions", {type, data})
|> broadcast_fn.("predictions", :broadcast)
|> log_errors()
end

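
handle_events/3 now folds each batch of stream events into one store update per event type and broadcasts once when the first event arrives. A self-contained illustration of the grouping step, using plain maps as stand-ins for the V3Api.Stream.Event structs (the real code maps each data payload through to_predictions/1):

events = [
  %{event: :add, data: [:p1, :p2]},
  %{event: :update, data: [:p3]},
  %{event: :add, data: [:p4]}
]

batches = Enum.group_by(events, & &1.event, & &1.data)
# => %{add: [[:p1, :p2], [:p4]], update: [[:p3]]}

# Each {event, list_of_data_batches} pair then becomes a single Store.update/1 call.
Enum.each(batches, fn {event, data_batches} -> IO.inspect({event, data_batches}) end)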
110 changes: 20 additions & 90 deletions apps/predictions/test/predictions_pub_sub_test.exs
@@ -12,13 +12,6 @@ defmodule Predictions.PredictionsPubSubTest do
route: %Route{id: "39"},
stop: %Stop{id: @stop_id}
}
@prediction66 %Prediction{
id: "prediction66",
direction_id: 1,
route: %Route{id: "66"},
stop: %Stop{id: "other_stop"}
}

@channel_args "stop:#{@stop_id}"

setup_with_mocks([
@@ -100,75 +93,6 @@
end
end

describe "handle_info/2 - {:reset, predictions}" do
test "resets the predictions", %{pid: pid} do
PredictionsPubSub.subscribe(@channel_args, pid)
send(pid, {:reset, [@prediction39]})
assert_receive {:new_predictions, [@prediction39]}
end

test "broadcasts new predictions lists to subscribers", %{pid: pid} do
PredictionsPubSub.subscribe(@channel_args, pid)
send(pid, {:reset, [@prediction66]})
send(pid, {:reset, [@prediction39]})
assert_receive {:new_predictions, [@prediction39]}
# we're not subscribed to this
refute_receive {:new_predictions, [@prediction66]}
end
end

describe "handle_info/2 - {:add, predictions}" do
test "adds the new predictions by route ID", %{pid: pid} do
PredictionsPubSub.subscribe(@channel_args, pid)
send(pid, {:add, [@prediction39]})
assert_receive {:new_predictions, [@prediction39]}
end

test "broadcasts new predictions lists to subscribers", %{pid: pid} do
PredictionsPubSub.subscribe(@channel_args, pid)

send(pid, {:add, [@prediction66]})
send(pid, {:add, [@prediction39]})

assert_receive {:new_predictions, [@prediction39]}
refute_receive {:new_predictions, [@prediction66]}
end
end

describe "handle_info/2 - :update and :remove" do
test "updates the predictions", %{pid: pid} do
modified_prediction = %{
@prediction39
| status: "Now boarding"
}

PredictionsPubSub.subscribe(@channel_args, pid)
send(pid, {:update, [modified_prediction]})

assert_receive {:new_predictions, [prediction]}

assert prediction.status == "Now boarding"
end

test "broadcasts new predictions lists to subscribers", %{pid: pid} do
PredictionsPubSub.subscribe(@channel_args, pid)

send(pid, {:update, [@prediction66]})
send(pid, {:update, [@prediction39]})

assert_receive {:new_predictions, [@prediction39]}
refute_receive {:new_predictions, [@prediction66]}
end

test "removes the given predictions and broadcasts new predictions lists to subscribers", %{
pid: pid
} do
PredictionsPubSub.subscribe(@channel_args, pid)
send(pid, {:remove, [@prediction39]})
assert_receive {:new_predictions, []}
end
end

describe "handle_info/2 - :DOWN" do
test "can observe when the caller/subscribing task is exited, and remove from state" do
{:ok, pid} =
@@ -181,11 +105,15 @@
p2 = spawn(fn -> true end)
p3 = spawn(fn -> true end)

:sys.replace_state(pid, fn state ->
state
|> Map.put_new(p1, "stop=1")
|> Map.put_new(p2, "stop=2")
|> Map.put_new(p3, "stop=3")
:sys.replace_state(pid, fn %{callers_by_pid: callers} = state ->
%{
state
| callers_by_pid:
callers
|> Map.put_new(p1, "stop=1")
|> Map.put_new(p2, "stop=2")
|> Map.put_new(p3, "stop=3")
}
end)

# A new caller process subscribes, adds its PID and channel name to state
@@ -196,10 +124,12 @@
this = self()

assert %{
^this => _filters,
^p1 => "stop=1",
^p2 => "stop=2",
^p3 => "stop=3"
callers_by_pid: %{
^this => _filters,
^p1 => "stop=1",
^p2 => "stop=2",
^p3 => "stop=3"
}
} = :sys.get_state(pid)
end,
[:monitor]
@@ -214,12 +144,12 @@
assert_receive {:trace, ^pid, :send, {:"$gen_call", _, {:terminate_child, _}}, _}, 2000

# The caller process is removed from the state
assert task not in Map.keys(:sys.get_state(pid))

assert %{
^p1 => "stop=1",
^p2 => "stop=2",
^p3 => "stop=3"
:callers_by_pid => %{
^p1 => "stop=1",
^p2 => "stop=2",
^p3 => "stop=3"
}
} = :sys.get_state(pid)
end
end
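
With the per-event handle_info clauses gone, their tests go too. A hypothetical follow-up test (not part of this commit) could exercise the new dedupe path directly; the keyword-list fetch_keys below is only an assumption about their shape, and the {:dispatch, pid, fetch_keys, predictions} message format comes from the diff above:

test "does not re-dispatch unchanged predictions", %{pid: pid} do
  predictions = [@prediction39]

  # Same fetch_keys and same predictions twice: only the first should be delivered.
  send(pid, {:dispatch, self(), [stop: @stop_id], predictions})
  send(pid, {:dispatch, self(), [stop: @stop_id], predictions})

  assert_receive {:new_predictions, ^predictions}
  refute_receive {:new_predictions, ^predictions}
end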