
PgListener getting messages from channels it's not subscribed to #612

Closed
arlyon opened this issue Aug 5, 2020 · 9 comments · Fixed by #1100
Labels
bug db:postgres Related to PostgreSQL

Comments

@arlyon
Contributor

arlyon commented Aug 5, 2020

I have a graphql endpoint with a subscription:

#[async_graphql::Subscription]
impl Subscription {
    async fn scale(&self, ctx: &Context<'_>, id: Uuid) -> impl Stream<Item=FieldResult<UncommittedStockReading>> {
        // One PgListener per subscription, backed by a connection from the shared pool.
        let pool = ctx.data::<PgPool>();
        let mut listener = PgListener::from_pool(pool).await.unwrap();
        log::info!("listening for updates on : {}", id);
        // The channel name is the scale's UUID.
        listener.listen(&id.to_string()).await.unwrap();

        // Deserialize each notification payload into an UncommittedStockReading.
        listener.into_stream().map(move |result| {
            let notification = result.unwrap();
            log::info!("notification: {}", notification.payload());
            serde_json::from_str(notification.payload()).map_err(|e| e.into())
        })
    }
}

When a subscription request comes in, a listener is created from the pool, listening for events on the id channel. Interestingly, in some cases a listener can accidentally receive events from a channel it's not subscribed to:

[2020-08-05T19:42:45Z INFO  server::schemas::subscription] listening for updates on : 4dadd140-23e8-4f86-af19-c32ec5fea80f
[2020-08-05T19:42:45Z INFO  server::schemas::subscription] listening for updates on : 45cd1f01-3281-4255-a247-b1e497f40da6
[2020-08-05T19:42:48Z INFO  server::schemas::mutation] Sending notify: NOTIFY "4dadd140-23e8-4f86-af19-c32ec5fea80f", '{"stock":{"id":"5ea464c8-fb78-43e6-ad8d-2c7586c90205","name":"M5 Screw","description":"This is my M5 Screw","warehouse_id":"7e837ef3-222c-441b-8ab3-7227969b11f4","unit_weight":1.23},"weight":3.0}';
[2020-08-05T19:42:48Z INFO  server::schemas::subscription] notification: {"stock":{"id":"5ea464c8-fb78-43e6-ad8d-2c7586c90205","name":"M5 Screw","description":"This is my M5 Screw","warehouse_id":"7e837ef3-222c-441b-8ab3-7227969b11f4","unit_weight":1.23},"weight":3.0}
[2020-08-05T19:42:48Z INFO  server::schemas::subscription] notification: {"stock":{"id":"5ea464c8-fb78-43e6-ad8d-2c7586c90205","name":"M5 Screw","description":"This is my M5 Screw","warehouse_id":"7e837ef3-222c-441b-8ab3-7227969b11f4","unit_weight":1.23},"weight":3.0}
[2020-08-05T19:42:48Z INFO  warp::filters::log] 192.168.1.10:54822 "POST /graphql HTTP/1.1" 200 "-" "ESP32HTTPClient" 36.407945ms
[2020-08-05T19:42:49Z INFO  server::schemas::mutation] Sending notify: NOTIFY "4dadd140-23e8-4f86-af19-c32ec5fea80f", '{"stock":{"id":"5ea464c8-fb78-43e6-ad8d-2c7586c90205","name":"M5 Screw","description":"This is my M5 Screw","warehouse_id":"7e837ef3-222c-441b-8ab3-7227969b11f4","unit_weight":1.23},"weight":3.0}';
[2020-08-05T19:42:49Z INFO  server::schemas::subscription] notification: {"stock":{"id":"5ea464c8-fb78-43e6-ad8d-2c7586c90205","name":"M5 Screw","description":"This is my M5 Screw","warehouse_id":"7e837ef3-222c-441b-8ab3-7227969b11f4","unit_weight":1.23},"weight":3.0}
[2020-08-05T19:42:49Z INFO  server::schemas::subscription] notification: {"stock":{"id":"5ea464c8-fb78-43e6-ad8d-2c7586c90205","name":"M5 Screw","description":"This is my M5 Screw","warehouse_id":"7e837ef3-222c-441b-8ab3-7227969b11f4","unit_weight":1.23},"weight":3.0}

It doesn't happen consistently, and I wish I could provide some more details. Is there anything that jumps out as immediately suspicious?
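For context, a minimal sketch of what the mutation side producing the "Sending notify" log lines above might look like. The helper name and its parameters are hypothetical, not taken from the issue; pg_notify() is used so the channel identifier doesn't need to be quoted by hand.

// Hypothetical mutation-side helper: publishes a JSON payload on the channel
// named after the scale's UUID, matching the "Sending notify" lines above.
async fn notify_reading(pool: &sqlx::PgPool, id: uuid::Uuid, payload: &str) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT pg_notify($1, $2)")
        .bind(id.to_string())
        .bind(payload)
        .execute(pool)
        .await?;
    Ok(())
}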

@mehcode
Member

mehcode commented Aug 8, 2020

Can you try making PgListener from a URL and not from a pool and see if the problem still occurs?


We may be getting a "used" connection that already had LISTEN run on it. That means the from_pool function is buggy and shouldn't be used if you have multiple PgListeners.
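For reference, a minimal sketch of the URL-based variant, assuming a DATABASE_URL environment variable; PgListener::connect opens its own dedicated connection rather than borrowing one from the pool.

// Sketch of the suggested experiment: a dedicated connection per listener,
// bypassing the pool entirely. DATABASE_URL is an assumed environment variable.
let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut listener = sqlx::postgres::PgListener::connect(&url).await.unwrap();
listener.listen(&id.to_string()).await.unwrap();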

@mehcode mehcode added bug db:postgres Related to PostgreSQL labels Aug 8, 2020
@arlyon
Contributor Author

arlyon commented Aug 9, 2020

Hi, thanks for the reply. After some more testing, I think that is exactly the issue. Moreover, I think connections in the pool aren't unsubscribing properly once the listener is dropped. I'm using a low number of connections (8, specifically), so 'not happening consistently' is really more like 'absolutely happening consistently' as the probability of getting an already-listening connection goes up.

Creating a new pool each time sidesteps the issue.

@abonander
Collaborator

abonander commented Aug 18, 2020

So PgListener on-drop needs to spawn a task to run UNLISTEN *; on the connection. Sounds like an easy enough fix.

@arlyon
Contributor Author

arlyon commented Aug 20, 2020

I had considered doing that, but it's not clear how to fire off an async task in the drop method without blocking. Spawning a task would require blocking until it completes, since otherwise the future would outlive the &mut self. I saw some talk about async drop (see https://internals.rust-lang.org/t/asynchronous-destructors/11127) but nothing close to stable.

@NyxCode

NyxCode commented Aug 21, 2020

In the meantime, we could do something like this:

struct PgListener(Option<InternalPgListener>);

impl Drop for PgListener {
    fn drop(&mut self) {
        // Hand the inner listener off to a background task so the async
        // cleanup can run after the synchronous drop returns.
        if let Some(listener) = self.0.take() {
            tokio::spawn(listener.async_drop());
        }
    }
}

@abonander
Collaborator

Something that would not necessarily be an optimal fix but would address the issue is to just close the connection on-drop. If the connection is part of a pool it'll allow a task waiting on acquire() to wake up and open a new connection.
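A rough sketch of that close-on-drop approach, using a hypothetical wrapper type and today's sqlx pool API (detach plus close); the eventual fix may differ, and since Drop can't await, the close is driven on a spawned task.

// Hypothetical wrapper, not sqlx's internals: on drop, pull the connection out
// of the pool and close it instead of returning it with subscriptions intact.
use sqlx::pool::PoolConnection;
use sqlx::{Connection, Postgres};

struct ListenerConn(Option<PoolConnection<Postgres>>);

impl Drop for ListenerConn {
    fn drop(&mut self) {
        if let Some(conn) = self.0.take() {
            tokio::spawn(async move {
                // detach() removes the connection from the pool's accounting,
                // so a task waiting on acquire() can open a replacement.
                let _ = conn.detach().close().await;
            });
        }
    }
}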

@NyxCode

NyxCode commented Oct 4, 2020

Another suggestion: Could this be solved by using one of the hooks in PoolOptions to run UNLISTEN *?
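Possibly, yes; here is a minimal sketch, assuming a sqlx version whose pool hooks receive the connection plus metadata and return Ok(true) to keep the connection (exact hook signatures have varied between releases). The before_acquire hook would be the acquire-time counterpart of the same idea.

// Sketch only: clear leftover subscriptions whenever a connection is returned
// to the pool, so a later PgListener::from_pool never inherits old LISTENs.
use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::Executor;

async fn make_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
    PgPoolOptions::new()
        .max_connections(8)
        .after_release(|conn, _meta| Box::pin(async move {
            conn.execute("UNLISTEN *").await?;
            Ok(true) // connection is fine, keep it in the pool
        }))
        .connect(database_url)
        .await
}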

@arlyon
Contributor Author

arlyon commented Oct 6, 2020

Something along the same lines as @NyxCode's suggestion above is to have the connection notify the pool that the unsubscribe needs to happen if it is dropped while subscribed, and then the pool can run it when that connection is next acquired by something else.

@abonander
Collaborator

abonander commented Oct 19, 2020

A simple fix actually would be to just push an UNLISTEN *; command to the connection's outgoing buffer on drop; we don't have to wait for the message to be sent, as it'll go through the next time the buffer is flushed.
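The shape of that fix, with a toy stand-in for the connection internals (these are not sqlx's real types): drop only appends bytes to the write buffer and returns, so nothing needs to be awaited.

// Toy stand-in types, purely illustrative: the point is that Drop is fully
// synchronous and only queues the command; the buffer gets flushed the next
// time the connection performs I/O.
struct FakeWriteBuffer(Vec<u8>);

struct Listener {
    buf: FakeWriteBuffer,
}

impl Drop for Listener {
    fn drop(&mut self) {
        // In sqlx this would be an encoded Query("UNLISTEN *") protocol message.
        self.buf.0.extend_from_slice(b"UNLISTEN *;");
    }
}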
