Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Postgres LISTEN interface. #98

Closed
wants to merge 9 commits into from

Conversation

thedodd
Copy link
Contributor

@thedodd thedodd commented Jan 29, 2020

This changeset introduces an interface for using PostgreSQL's LISTEN
functionality from within sqlx.

The listen interface is implemented over the PgConnection, PgPool & the
PgPoolConnection types for ease of use. In the case of PgPool, a new
connection is acquired, and is then used for LISTEN functionality.

Closes #87

todo

  • PgListener conn is dedicated to receiving notifications only, so ensure it is properly encapsulated.
  • No notification buffering, though users could implement buffering on their own if needed.
  • Use Ext traits for everything.
  • Expose only a .listen(&[&str]) interface on PgConnection, PgPoolConnection & PgPool (covers single & multi channel).
  • Isolate all of the business logic of the PgListener in a single receive method.
  • Impl reconnect behavior when created from a PgPool.
  • Impl Stream in terms of receive.
  • docs. Added an example app.

@thedodd thedodd requested review from mehcode and abonander January 29, 2020 21:04
@thedodd
Copy link
Contributor Author

thedodd commented Jan 29, 2020

Haven't written any tests for this yet. Happy to do so, but I will have to tackle that over the weekend probably.

Copy link
Collaborator

@abonander abonander left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good but a few nits.

sqlx-core/src/postgres/listen.rs Outdated Show resolved Hide resolved
sqlx-core/src/postgres/listen.rs Outdated Show resolved Hide resolved
sqlx-core/src/postgres/listen.rs Outdated Show resolved Hide resolved
sqlx-core/src/postgres/listen.rs Outdated Show resolved Hide resolved
sqlx-core/src/postgres/listen.rs Outdated Show resolved Hide resolved
@abonander
Copy link
Collaborator

Re: tests, yes please, whenever it's convenient.

@thedodd
Copy link
Contributor Author

thedodd commented Jan 30, 2020

Ok, addressed all of the review comments and also put together a quick and dirty demo on my local machine to ensure the API was working as needed. Had to update the AsMut<_> bit to DerefMut<_> as that is already implemented by all potential types of the PgListener's C type param.

Only outstanding item at this point is the discussion here: #98 (comment) about buffering of notifications. Should be quite simple to implement, I just want to get some feedback from you guys before moving forward on one of the patterns.

Cheers.

@thedodd thedodd requested a review from abonander January 30, 2020 04:58
@mehcode
Copy link
Member

mehcode commented Jan 30, 2020

For context on buffering and API design:

  • libpq (official reference impl, C) uses an internal linked that grows without bound

  • psycopg2 (Python) uses a list that it appends without bound and you receive messages asynchronously by polling

  • go/pg (Go) prevents you from using the connection apart from receiving messages and has you receive messages inline (as we are in this PR)

  • vertx-sql-client (Java) allows you to use the connection and sends messages to preregistered callback functions

  • node-postgres (Node) has you register an event handler for notifications. Its' not clear how you poll for new ones.

  • JDBC (more Java) allows you to poll a getNotifications() that returns a list of notifications since the last time you called it


My recommendation would be no buffering and prevent external usage of the connection (e.g., remove the derefs). An application could make use of their shared pool to checkout another connection to do work on a notification.

My gut says this is the recommended approach when considering high throughput. You don't want to lock up the listening connection by using it for something else. As a a parallel, you don't want to lock up the TcpListener and should immediately spawn with the newly created TcpStream object.


Here is a theoretical example:

let mut listener: PgListener = pool.listen("channel").await?;

while let Some(message) = listener.receive().await? {
  // Immediately spawn so we can receive the next message
  task::spawn(async move {
    // Handle message
  })
}

API design thoughts:

  • Keep database-specific extensions to Ext traits. Maybe some other database driver wants to use the name Pool::listen for something.

  • Keep all work inside of PgListener::receive. E.g., keep listen sync and infallible. This lets people use .listen("foo").listen("other") as a builder cleanly and additionally we should be collected channels to a vec anyway so we can implement reconnections in the future.

  • Re-connections. Not something we should block on having the feature at all but we should think how this affects the API. I would expect a dedicated PgListener type to internally handle re-connections if I make it from a Pool.

PgListener
struct PgListener { }

impl PgListener {
  // Listen to _one_ more channel
  fn listen(mut self, channel: &str) -> Self;

  // Wait for a notification
  async fn receive(&mut self) -> sqlx::Result<PgNotification<'_>>;

  // Check if there is an available notification without blocking
  fn try_receive(&mut self) -> Option<PgNotification<'_>>;

  // Close the listener and its owned connection
  async fn close(self) -> sqlx::Result<()>;
}
PgNotification
#[derive(Debug)]
struct PgNotification<'a> {
  // We should be able internally re-use a work buffer for this payload
  pub payload: &'a str,

  // Postgres process ID that sent this
  // We should probably surface the one in `PgConnection` in a later task
  pub process_id: i32,
}
PgConnectionExt
#[async_trait]
trait PgConnectionExt {
  fn listen(self, channel: &str) -> PgListener;
}

impl<C> PgConnectionExt for C where C: Connection<Backend = Postgres> { ... }
PgPoolExt
#[async_trait]
trait PgPoolExt {
  fn listen(&self, channel: &str) -> PgListener;
}

impl PgPoolExt for Pool<Postgres> { ... }

@mehcode
Copy link
Member

mehcode commented Jan 30, 2020

Awesome work so far though @thedodd 🎉 It looks pretty good 💯 . We just need to tweak some of the details I think.

@thedodd

This comment has been minimized.

@thedodd
Copy link
Contributor Author

thedodd commented Jan 30, 2020

Per some extensive discussion with @mehcode in the discord channel, here is where we landed:

  • PgListener conn is dedicated to receiving notifications only.
  • No notification buffering, though users could implement buffering on their own if needed.
  • Expose only a .listen(&[&str]) interface on PgConnection, PgPoolConnection & PgPool (covers single & multi channel).
  • Isolate all of the business logic of the PgListener in a single receive method.
  • Impl Stream in terms of receive.
  • Impl reconnect behavior when created from a PgPool.
  • Use Ext traits for everything.

@thedodd thedodd force-pushed the 87-listen-notify branch 2 times, most recently from e851b66 to 969cb40 Compare January 31, 2020 00:40
@thedodd thedodd requested a review from mehcode January 31, 2020 00:40
@thedodd thedodd force-pushed the 87-listen-notify branch 2 times, most recently from 3e28a5f to 8e7fbba Compare January 31, 2020 00:50
@thedodd thedodd requested review from mehcode and abonander and removed request for mehcode and abonander January 31, 2020 01:30
pub channel: String,
/// The payload of the notification.
pub payload: String,
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mehcode you mentioned some stuff about the structure of this guy in an earlier review. Not sure if this is what you wanted to see. If not, we could just wrap the Box<NotificationResponse> and provide accessors.

@thedodd thedodd removed the request for review from mehcode January 31, 2020 18:38
@thedodd thedodd removed the request for review from abonander January 31, 2020 18:38
@thedodd thedodd force-pushed the 87-listen-notify branch 2 times, most recently from 59b2acb to 4657c48 Compare January 31, 2020 22:49
@thedodd thedodd requested review from mehcode and abonander January 31, 2020 22:50
@mehcode mehcode added this to the 0.3 milestone Feb 1, 2020
@@ -4,2268 +4,2275 @@
name = "aho-corasick"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is all due to an update of the Cargo.lock format ... I'm happy to revert this change (which was unintentional). Not that it is actually used by consumers of this library.

@thedodd
Copy link
Contributor Author

thedodd commented Feb 10, 2020

@mehcode && @abonander just wanted to ping you guys to see if you have time for another round of review. I'm happy to start on modifications right away once review is done. I don't want to hold up 0.3 :). Also, I'm happy to chat on the discord channel if you guys want to fast-track any deeper design changes and such.

pub async fn close(self) -> BoxFuture<'static, Result<()>> {
self.connection.close()
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you all think about this, but we could wrap the connection in an Option so that we can take the connection out and spawn a task to call this close method from inside of a Drop impl. That way user's wouldn't have to worry about forgetting to call close on their own.

With the PgPoolListener that is easy (already implemented below), as it wraps the current connection in an Option for internally required reasons.

Thoughts?

@mehcode
Copy link
Member

mehcode commented Feb 11, 2020

@thedodd, Sorry for the radio silence. I'm still working on the 0.3 changes. Hopefully will be opening a PR for it soon. Don't worry about holding up the 0.3 release. We'll get there.

thedodd added 9 commits March 1, 2020 21:51
This changeset introduces an interface for using PostgreSQL's LISTEN
functionality from within sqlx.

The listen interface is implemented over the PgConnection, PgPool & the
PgPoolConnection types for ease of use. In the case of PgPool, a new
connection is acquired, and is then used for LISTEN functionality.

Closes launchbadge#87
Extension traits are now being used for PgConnection, PgPoolConnection &
PgPool for listen/notify functionality. Only two extension traits were
introduced.

Only a single trait method is present on the extension traits and it
works for single or multi channel listening setups.

Automatic reconnect behavior is implemented for PgPool based listeners.
All logic has been cut over to the `recv` impls for the PgListener
variants.

Use async-stream for a nice Stream interface.
Broke up PgListener into two types. PgListener for basic one-off
connections, and PgPoolListener for the listener created from the
PgPool.

The API is a bit more clear now with this change in terms of reconnect
behavior and the like.

Update `fn stream` to be `fn into_stream`, as that nomenclature is a bit
more normative in the Rust ecosystem.
The basic PgListener stream impl now yields `Result<PgNotification>`
elements without an `Option` in the result. The option condition
originally represented the closure of the underlying connection. Now
such conditions will terminate the stream, as one would expect. The
`PgListener.recv()` method signature has not been changed.

PgPoolListener has also been updated. The interfaces on this struct will
never yield an inner `Option` as it will instead acquire a new
connection and continue its work. Both the stream impl & the `recv`
method have received an update to their signatures.
This is being removed as it was causing undesired behavior under some
contexts.
@thedodd thedodd force-pushed the 87-listen-notify branch from 7f9b1af to 4cb0d5d Compare March 2, 2020 03:51
@thedodd
Copy link
Contributor Author

thedodd commented Mar 16, 2020

@mehcode want me to go ahead and rebase this? I remember you saying something about potentially already having done this while you were testing things out. I'm happy to rebase.

@mehcode
Copy link
Member

mehcode commented Mar 16, 2020

@thedodd No worries. I'll open a PR with the rebase plus a few ergonomic changes soon and we can discuss there.

@mehcode
Copy link
Member

mehcode commented Mar 17, 2020

This was merged in via #131

@mehcode mehcode closed this Mar 17, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Postgres specific LISTEN/NOTIFY support.
3 participants