Skip to content

Commit

Permalink
feat: add Connection::shrink_buffers, PoolConnection::close
Browse files Browse the repository at this point in the history
closes #2372
  • Loading branch information
abonander committed Mar 4, 2023
1 parent 728280a commit c4b835c
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 21 deletions.
5 changes: 5 additions & 0 deletions sqlx-core/src/any/connection/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
Box::pin(async move { Ok(()) })
}

/// Forward to [`Connection::shrink_buffers()`].
///
/// [`Connection::shrink_buffers()`]: method@crate::connection::Connection::shrink_buffers
fn shrink_buffers(&mut self);

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, crate::Result<()>>;

Expand Down
4 changes: 4 additions & 0 deletions sqlx-core/src/any/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ impl Connection for AnyConnection {
self.backend.clear_cached_statements()
}

fn shrink_buffers(&mut self) {
self.backend.shrink_buffers()
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.backend.flush()
Expand Down
14 changes: 14 additions & 0 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ pub trait Connection: Send {
Box::pin(async move { Ok(()) })
}

/// Restore any buffers in the connection to their default capacity, if possible.
///
/// Sending a large query or receiving a resultset with many columns can cause the connection
/// to allocate additional buffer space to fit the data which is retained afterwards in
/// case it's needed again. This can give the outward appearance of a memory leak, but is
/// in fact the intended behavior.
///
/// Calling this method tells the connection to release that excess memory if it can,
/// though be aware that calling this too often can cause unnecessary thrashing or
/// fragmentation in the global allocator. If there's still data in the connection buffers
/// (unlikely if the last query was run to completion) then it may need to be moved to
/// allow the buffers to shrink.
fn shrink_buffers(&mut self);

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;

Expand Down
87 changes: 66 additions & 21 deletions sqlx-core/src/net/socket/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::net::Socket;
use bytes::BytesMut;
use std::io;
use std::{cmp, io};

use crate::error::Error;

Expand Down Expand Up @@ -46,26 +46,7 @@ impl<S: Socket> BufferedSocket<S> {
}

pub async fn read_buffered(&mut self, len: usize) -> io::Result<BytesMut> {
while self.read_buf.read.len() < len {
self.read_buf.reserve(len);

let read = self.socket.read(&mut self.read_buf.available).await?;

if read == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"expected to read {} bytes, got {} bytes at EOF",
len,
self.read_buf.read.len()
),
));
}

self.read_buf.advance(read);
}

Ok(self.read_buf.drain(len))
self.read_buf.read(len, &mut self.socket).await
}

pub fn write_buffer(&self) -> &WriteBuffer {
Expand Down Expand Up @@ -123,6 +104,12 @@ impl<S: Socket> BufferedSocket<S> {
self.socket.shutdown().await
}

pub fn shrink_buffers(&mut self) {
// Won't drop data still in the buffer.
self.write_buf.shrink();
self.read_buf.shrink();
}

pub fn into_inner(self) -> S {
self.socket
}
Expand Down Expand Up @@ -197,6 +184,22 @@ impl WriteBuffer {
&mut self.buf[self.bytes_flushed..self.bytes_written]
}

pub fn shrink(&mut self) {
if self.bytes_flushed > 0 {
// Move any data that remains to be flushed to the beginning of the buffer,
// if necessary.
self.buf
.copy_within(self.bytes_flushed..self.bytes_written, 0);
self.bytes_written -= self.bytes_flushed;
self.bytes_flushed = 0
}

// Drop excess capacity.
self.buf
.truncate(cmp::max(self.bytes_written, DEFAULT_BUF_SIZE));
self.buf.shrink_to_fit();
}

fn consume(&mut self, amt: usize) {
let new_bytes_flushed = self
.bytes_flushed
Expand All @@ -218,6 +221,31 @@ impl WriteBuffer {
}

impl ReadBuffer {
async fn read(&mut self, len: usize, socket: &mut impl Socket) -> io::Result<BytesMut> {
// Because of how `BytesMut` works, we should only be shifting capacity back and forth
// between `read` and `available` unless we have to read an oversize message.
while self.read.len() < len {
self.reserve(len - self.read.len());

let read = socket.read(&mut self.available).await?;

if read == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"expected to read {} bytes, got {} bytes at EOF",
len,
self.read.len()
),
));
}

self.advance(read);
}

Ok(self.drain(len))
}

fn reserve(&mut self, amt: usize) {
if let Some(additional) = amt.checked_sub(self.available.capacity()) {
self.available.reserve(additional);
Expand All @@ -231,4 +259,21 @@ impl ReadBuffer {
fn drain(&mut self, amt: usize) -> BytesMut {
self.read.split_to(amt)
}

fn shrink(&mut self) {
if self.available.capacity() > DEFAULT_BUF_SIZE {
// `BytesMut` doesn't have a way to shrink its capacity,
// but we only use `available` for spare capacity anyway so we can just replace it.
//
// If `self.read` still contains data on the next call to `advance` then this might
// force a memcpy as they'll no longer be pointing to the same allocation,
// but that's kind of unavoidable.
//
// The `async-std` impl of `Socket` will also need to re-zero the buffer,
// but that's also kind of unavoidable.
//
// We should be warning the user not to call this often.
self.available = BytesMut::with_capacity(DEFAULT_BUF_SIZE);
}
}
}
12 changes: 12 additions & 0 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
}

impl<DB: Database> PoolConnection<DB> {
/// Close this connection, allowing the pool to open a replacement.
///
/// Equivalent to calling [`.detach()`] then [`.close()`], but the connection permit is retained
/// for the duration so that the pool may not exceed `max_connections`.
///
/// [`.detach()`]: PoolConnection::detach
/// [`.close()`]: Connection::close
pub async fn close(mut self) -> Result<(), Error> {
let floating = self.take_live().float(self.pool.clone());
floating.inner.raw.close().await
}

/// Detach this connection from the pool, allowing it to open a replacement.
///
/// Note that if your application uses a single shared pool, this
Expand Down
4 changes: 4 additions & 0 deletions sqlx-mysql/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ impl AnyConnectionBackend for MySqlConnection {
MySqlTransactionManager::start_rollback(self)
}

fn shrink_buffers(&mut self) {
Connection::shrink_buffers(self);
}

fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
Connection::flush(self)
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-mysql/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,8 @@ impl Connection for MySqlConnection {
{
Transaction::begin(self)
}

fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
}
}
4 changes: 4 additions & 0 deletions sqlx-postgres/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ impl AnyConnectionBackend for PgConnection {
PgTransactionManager::start_rollback(self)
}

fn shrink_buffers(&mut self) {
Connection::shrink_buffers(self);
}

fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
Connection::flush(self)
}
Expand Down
4 changes: 4 additions & 0 deletions sqlx-postgres/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ impl Connection for PgConnection {
})
}

fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.wait_until_ready().boxed()
Expand Down
4 changes: 4 additions & 0 deletions sqlx-sqlite/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl AnyConnectionBackend for SqliteConnection {
SqliteTransactionManager::start_rollback(self)
}

fn shrink_buffers(&mut self) {
// NO-OP.
}

fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
Connection::flush(self)
}
Expand Down
5 changes: 5 additions & 0 deletions sqlx-sqlite/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ impl Connection for SqliteConnection {
})
}

#[inline]
fn shrink_buffers(&mut self) {
// No-op.
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
// For SQLite, FLUSH does effectively nothing...
Expand Down
31 changes: 31 additions & 0 deletions tests/mysql/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,34 @@ async fn it_can_work_with_transactions() -> anyhow::Result<()> {

Ok(())
}

#[sqlx_macros::test]
async fn test_shrink_buffers() -> anyhow::Result<()> {
// We don't really have a good way to test that `.shrink_buffers()` functions as expected
// without exposing a lot of internals, but we can at least be sure it doesn't
// materially affect the operation of the connection.

let mut conn = new::<MySql>().await?;

// The connection buffer is only 8 KiB by default so this should definitely force it to grow.
let data = "This string should be 32 bytes!\n".repeat(1024);
assert_eq!(data.len(), 32 * 1024);

let ret: String = sqlx::query_scalar("SELECT ?")
.bind(&data)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, data);

conn.shrink_buffers();

let ret: i64 = sqlx::query_scalar("SELECT ?")
.bind(&12345678i64)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, 12345678i64);

Ok(())
}
30 changes: 30 additions & 0 deletions tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1789,3 +1789,33 @@ async fn test_postgres_bytea_hex_deserialization_errors() -> anyhow::Result<()>
}
Ok(())
}

#[sqlx_macros::test]
async fn test_shrink_buffers() -> anyhow::Result<()> {
// We don't really have a good way to test that `.shrink_buffers()` functions as expected
// without exposing a lot of internals, but we can at least be sure it doesn't
// materially affect the operation of the connection.

let mut conn = new::<Postgres>().await?;

// The connection buffer is only 8 KiB by default so this should definitely force it to grow.
let data = vec![0u8; 32 * 1024];

let ret: Vec<u8> = sqlx::query_scalar("SELECT $1::bytea")
.bind(&data)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, data);

conn.shrink_buffers();

let ret: i64 = sqlx::query_scalar("SELECT $1::int8")
.bind(&12345678i64)
.fetch_one(&mut conn)
.await?;

assert_eq!(ret, 12345678i64);

Ok(())
}

0 comments on commit c4b835c

Please sign in to comment.