Skip to content

Commit

Permalink
hotfix(PredictionsPubSub): update state once per fetch key
Browse files Browse the repository at this point in the history
Avoid state where two pids with the same fetch_keys are both checking state before dispatching
  • Loading branch information
thecristen committed Dec 6, 2023
1 parent f34888d commit 3ff1a28
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions apps/predictions/lib/predictions_pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ defmodule Predictions.PredictionsPubSub do
)
|> Enum.each(fn {fetch_keys, pids} ->
new_predictions = Store.fetch(fetch_keys)

pids
|> Enum.uniq()
|> Enum.each(&send(self(), {:dispatch, &1, fetch_keys, new_predictions}))
send(self(), {:dispatch, Enum.uniq(pids), fetch_keys, new_predictions})
end)
end)

Expand All @@ -117,13 +114,13 @@ defmodule Predictions.PredictionsPubSub do
end

def handle_info(
{:dispatch, pid, fetch_keys, predictions},
{:dispatch, pids, fetch_keys, predictions},
%{
last_dispatched_by_fetch_keys: last_dispatched
} = state
) do
if Map.get(last_dispatched, fetch_keys) != predictions do
send(pid, {:new_predictions, predictions})
if not_last_dispatched?(last_dispatched, fetch_keys, predictions) do
Enum.each(pids, &send(&1, {:new_predictions, predictions}))

{:noreply,
%{
Expand Down Expand Up @@ -156,6 +153,10 @@ defmodule Predictions.PredictionsPubSub do
{:noreply, %{state | callers_by_pid: new_callers}}
end

defp not_last_dispatched?(last_dispatched, fetch_keys, predictions) do
Map.get(last_dispatched, fetch_keys) != predictions
end

# find registrations for this filter from processes other than the indicated pid
defp no_other_subscribers?(stream, pid_to_omit) do
registry_key = self()
Expand Down

0 comments on commit 3ff1a28

Please sign in to comment.