Skip to content

Commit

Permalink
Merge pull request #1249 from beligante/fix-multiple-pushes-to-same-c…
Browse files Browse the repository at this point in the history
…ontext-id

Bugfix: multiple pushes per client for subscriptions that have a `context_id`
  • Loading branch information
benwilson512 authored Jun 30, 2023
2 parents 2a84558 + 97ea4dc commit 86e6c80
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
3 changes: 1 addition & 2 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ defmodule Absinthe.Subscription do
pubsub
|> registry_name
|> Registry.lookup(key)
|> Enum.map(fn match ->
{_, {doc_id, doc}} = match
|> Map.new(fn {_, {doc_id, doc}} ->
doc = Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1)

{doc_id, doc}
Expand Down
39 changes: 38 additions & 1 deletion test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ defmodule Absinthe.Execution.SubscriptionTest do
@behaviour Absinthe.Subscription.Pubsub

def start_link() do
Registry.start_link(keys: :unique, name: __MODULE__)
Registry.start_link(keys: :duplicate, name: __MODULE__)
end

def node_name() do
Expand Down Expand Up @@ -678,6 +678,43 @@ defmodule Absinthe.Execution.SubscriptionTest do
:telemetry.detach(context.test)
end

@query """
subscription {
otherUser { id }
}
"""
test "de-duplicates pushes to the same context" do
documents =
Enum.map(1..5, fn _index ->
{:ok, doc} = run_subscription(@query, Schema, context: %{context_id: "global"})
doc
end)

# assert that all documents are the same
assert [document] = Enum.dedup(documents)

Absinthe.Subscription.publish(
PubSub,
%{id: "global_user_id"},
other_user: "*"
)

topic_id = document["subscribed"]

for _i <- 1..5 do
assert_receive(
{:broadcast,
%{
event: "subscription:data",
result: %{data: %{"otherUser" => %{"id" => "global_user_id"}}},
topic: ^topic_id
}}
)
end

refute_receive({:broadcast, _})
end

defp run_subscription(query, schema, opts \\ []) do
opts = Keyword.update(opts, :context, %{pubsub: PubSub}, &Map.put(&1, :pubsub, PubSub))

Expand Down

0 comments on commit 86e6c80

Please sign in to comment.