Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

close unnamed portal after each executed extended query #2081

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions sqlx-core/src/postgres/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ impl PgConnection {
message if message.format == MessageFormat::PortalSuspended => {
// there was an open portal
// this can happen if the last time a statement was used it was not fully executed
// such as in [fetch_one]
}

message if message.format == MessageFormat::CloseComplete => {
Expand Down Expand Up @@ -219,8 +218,7 @@ impl PgConnection {
// patch holes created during encoding
arguments.apply_patches(self, &metadata.parameters).await?;

// apply patches use fetch_optional thaht may produce `PortalSuspended` message,
// consume messages til `ReadyForQuery` before bind and execute
// consume messages till `ReadyForQuery` before bind and execute
self.wait_until_ready().await?;

// bind to attach the arguments to the statement and create a portal
Expand All @@ -239,6 +237,16 @@ impl PgConnection {
portal: None,
limit: limit.into(),
});
// From https://www.postgresql.org/docs/current/protocol-flow.html:
//
// "An unnamed portal is destroyed at the end of the transaction, or as
// soon as the next Bind statement specifying the unnamed portal as
// destination is issued. (Note that a simple Query message also
// destroys the unnamed portal."

// we ask the database server to close the unnamed portal and free the associated resources
// earlier - after the execution of the current query.
self.stream.write(message::Close::Portal(None));

// finally, [Sync] asks postgres to process the messages that we sent and respond with
// a [ReadyForQuery] message when it's completely done. Theoretically, we could send
Expand Down Expand Up @@ -271,10 +279,17 @@ impl PgConnection {
MessageFormat::BindComplete
| MessageFormat::ParseComplete
| MessageFormat::ParameterDescription
| MessageFormat::NoData => {
| MessageFormat::NoData
// unnamed portal has been closed
| MessageFormat::CloseComplete
=> {
// harmless messages to ignore
}

// "Execute phase is always terminated by the appearance of
// exactly one of these messages: CommandComplete,
// EmptyQueryResponse (if the portal was created from an
// empty query string), ErrorResponse, or PortalSuspended"
MessageFormat::CommandComplete => {
// a SQL command completed normally
let cc: CommandComplete = message.decode()?;
Expand All @@ -290,6 +305,11 @@ impl PgConnection {
// empty query string passed to an unprepared execute
}

// Message::ErrorResponse is handled in self.stream.recv()

// incomplete query execution has finished
MessageFormat::PortalSuspended => {}

MessageFormat::RowDescription => {
// indicates that a *new* set of rows are about to be returned
let (columns, column_names) = self
Expand Down
5 changes: 3 additions & 2 deletions sqlx-core/src/postgres/message/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const CLOSE_STATEMENT: u8 = b'S';
#[allow(dead_code)]
pub enum Close {
Statement(Oid),
Portal(Oid),
// None selects the unnamed portal
Portal(Option<Oid>),
}

impl Encode<'_> for Close {
Expand All @@ -26,7 +27,7 @@ impl Encode<'_> for Close {

Close::Portal(id) => {
buf.push(CLOSE_PORTAL);
buf.put_portal_name(Some(*id));
buf.put_portal_name(*id);
}
})
}
Expand Down