Skip to content

Commit

Permalink
perf: box MySqlConnection to reduce sizes of futures (#3265)
Browse files Browse the repository at this point in the history
  • Loading branch information
stepantubanov authored Jun 6, 2024
1 parent 4d9f67b commit 5da0f73
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 60 deletions.
12 changes: 7 additions & 5 deletions sqlx-mysql/src/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures_core::future::BoxFuture;

use crate::collation::{CharSet, Collation};
use crate::common::StatementCache;
use crate::connection::{tls, MySqlStream, MAX_PACKET_SIZE};
use crate::connection::{tls, MySqlConnectionInner, MySqlStream, MAX_PACKET_SIZE};
use crate::error::Error;
use crate::net::{Socket, WithSocket};
use crate::protocol::connect::{
Expand All @@ -25,10 +25,12 @@ impl MySqlConnection {
let stream = handshake.await?;

Ok(Self {
stream,
transaction_depth: 0,
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
inner: Box::new(MySqlConnectionInner {
stream,
transaction_depth: 0,
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
}),
})
}
}
Expand Down
76 changes: 46 additions & 30 deletions sqlx-mysql/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ impl MySqlConnection {
// https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
// https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK

self.stream.send_packet(Prepare { query: sql }).await?;
self.inner
.stream
.send_packet(Prepare { query: sql })
.await?;

let ok: PrepareOk = self.stream.recv().await?;
let ok: PrepareOk = self.inner.stream.recv().await?;

// the parameter definitions are very unreliable so we skip over them
// as we have little use

if ok.params > 0 {
for _ in 0..ok.params {
let _def: ColumnDefinition = self.stream.recv().await?;
let _def: ColumnDefinition = self.inner.stream.recv().await?;
}

self.stream.maybe_recv_eof().await?;
self.inner.stream.maybe_recv_eof().await?;
}

// the column definitions are berefit the type information from the
Expand All @@ -54,7 +57,7 @@ impl MySqlConnection {
let mut columns = Vec::new();

let column_names = if ok.columns > 0 {
recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
recv_result_metadata(&mut self.inner.stream, ok.columns as usize, &mut columns).await?
} else {
Default::default()
};
Expand All @@ -73,16 +76,23 @@ impl MySqlConnection {
&mut self,
sql: &str,
) -> Result<(u32, MySqlStatementMetadata), Error> {
if let Some(statement) = self.cache_statement.get_mut(sql) {
if let Some(statement) = self.inner.cache_statement.get_mut(sql) {
// <MySqlStatementMetadata> is internally reference-counted
return Ok((*statement).clone());
}

let (id, metadata) = self.prepare_statement(sql).await?;

// in case of the cache being full, close the least recently used statement
if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
self.stream.send_packet(StmtClose { statement: id }).await?;
if let Some((id, _)) = self
.inner
.cache_statement
.insert(sql, (id, metadata.clone()))
{
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;
}

Ok((id, metadata))
Expand All @@ -96,10 +106,10 @@ impl MySqlConnection {
persistent: bool,
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
{
let mut logger = QueryLogger::new(sql, self.log_settings.clone());
let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());

self.stream.wait_until_ready().await?;
self.stream.waiting.push_back(Waiting::Result);
self.inner.stream.wait_until_ready().await?;
self.inner.stream.waiting.push_back(Waiting::Result);

Ok(Box::pin(try_stream! {
// make a slot for the shared column data
Expand All @@ -108,13 +118,13 @@ impl MySqlConnection {
let mut columns = Arc::new(Vec::new());

let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
if persistent && self.cache_statement.is_enabled() {
if persistent && self.inner.cache_statement.is_enabled() {
let (id, metadata) = self
.get_or_prepare_statement(sql)
.await?;

// https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
self.stream
self.inner.stream
.send_packet(StatementExecute {
statement: id,
arguments: &arguments,
Expand All @@ -128,28 +138,28 @@ impl MySqlConnection {
.await?;

// https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
self.stream
self.inner.stream
.send_packet(StatementExecute {
statement: id,
arguments: &arguments,
})
.await?;

self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner.stream.send_packet(StmtClose { statement: id }).await?;

(metadata.column_names, MySqlValueFormat::Binary, false)
}
} else {
// https://dev.mysql.com/doc/internals/en/com-query.html
self.stream.send_packet(Query(sql)).await?;
self.inner.stream.send_packet(Query(sql)).await?;

(Arc::default(), MySqlValueFormat::Text, true)
};

loop {
// query response is a meta-packet which may be one of:
// Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
let mut packet = self.stream.recv_packet().await?;
let mut packet = self.inner.stream.recv_packet().await?;

if packet[0] == 0x00 || packet[0] == 0xff {
// first packet in a query response is OK or ERR
Expand All @@ -170,31 +180,31 @@ impl MySqlConnection {
continue;
}

self.stream.waiting.pop_front();
self.inner.stream.waiting.pop_front();
return Ok(());
}

// otherwise, this first packet is the start of the result-set metadata,
*self.stream.waiting.front_mut().unwrap() = Waiting::Row;
*self.inner.stream.waiting.front_mut().unwrap() = Waiting::Row;

let num_columns = packet.get_uint_lenenc() as usize; // column count

if needs_metadata {
column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
column_names = Arc::new(recv_result_metadata(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?);
} else {
// next time we hit here, it'll be a new result set and we'll need the
// full metadata
needs_metadata = true;

recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
recv_result_columns(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?;
}

// finally, there will be none or many result-rows
loop {
let packet = self.stream.recv_packet().await?;
let packet = self.inner.stream.recv_packet().await?;

if packet[0] == 0xfe && packet.len() < 9 {
let eof = packet.eof(self.stream.capabilities)?;
let eof = packet.eof(self.inner.stream.capabilities)?;

r#yield!(Either::Left(MySqlQueryResult {
rows_affected: 0,
Expand All @@ -203,11 +213,11 @@ impl MySqlConnection {

if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
// more result sets exist, continue to the next one
*self.stream.waiting.front_mut().unwrap() = Waiting::Result;
*self.inner.stream.waiting.front_mut().unwrap() = Waiting::Result;
break;
}

self.stream.waiting.pop_front();
self.inner.stream.waiting.pop_front();
return Ok(());
}

Expand Down Expand Up @@ -290,14 +300,17 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
'c: 'e,
{
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.inner.stream.wait_until_ready().await?;

let metadata = if self.cache_statement.is_enabled() {
let metadata = if self.inner.cache_statement.is_enabled() {
self.get_or_prepare_statement(sql).await?.1
} else {
let (id, metadata) = self.prepare_statement(sql).await?;

self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;

metadata
};
Expand All @@ -316,11 +329,14 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
'c: 'e,
{
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.inner.stream.wait_until_ready().await?;

let (id, metadata) = self.prepare_statement(sql).await?;

self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;

let columns = (&*metadata.columns).clone();

Expand Down
29 changes: 17 additions & 12 deletions sqlx-mysql/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const MAX_PACKET_SIZE: u32 = 1024;

/// A connection to a MySQL database.
pub struct MySqlConnection {
pub(crate) inner: Box<MySqlConnectionInner>,
}

pub(crate) struct MySqlConnectionInner {
// underlying TCP stream,
// wrapped in a potentially TLS stream,
// wrapped in a buffered stream
Expand Down Expand Up @@ -50,43 +54,44 @@ impl Connection for MySqlConnection {

fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.send_packet(Quit).await?;
self.stream.shutdown().await?;
self.inner.stream.send_packet(Quit).await?;
self.inner.stream.shutdown().await?;

Ok(())
})
}

fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.shutdown().await?;
self.inner.stream.shutdown().await?;
Ok(())
})
}

fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.stream.send_packet(Ping).await?;
self.stream.recv_ok().await?;
self.inner.stream.wait_until_ready().await?;
self.inner.stream.send_packet(Ping).await?;
self.inner.stream.recv_ok().await?;

Ok(())
})
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.stream.wait_until_ready().boxed()
self.inner.stream.wait_until_ready().boxed()
}

fn cached_statements_size(&self) -> usize {
self.cache_statement.len()
self.inner.cache_statement.len()
}

fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
self.stream
while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
self.inner
.stream
.send_packet(StmtClose {
statement: statement_id,
})
Expand All @@ -99,7 +104,7 @@ impl Connection for MySqlConnection {

#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.write_buffer().is_empty()
!self.inner.stream.write_buffer().is_empty()
}

fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
Expand All @@ -110,6 +115,6 @@ impl Connection for MySqlConnection {
}

fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
self.inner.stream.shrink_buffers();
}
}
4 changes: 2 additions & 2 deletions sqlx-mysql/src/options/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl ConnectOptions for MySqlConnectOptions {
if self.set_names {
options.push(format!(
r#"NAMES {} COLLATE {}"#,
conn.stream.charset.as_str(),
conn.stream.collation.as_str()
conn.inner.stream.charset.as_str(),
conn.inner.stream.collation.as_str()
))
}

Expand Down
23 changes: 12 additions & 11 deletions sqlx-mysql/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ impl TransactionManager for MySqlTransactionManager {

fn begin(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

conn.execute(&*begin_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth + 1;
conn.inner.transaction_depth = depth + 1;

Ok(())
})
}

fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

if depth > 0 {
conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}

Ok(())
Expand All @@ -40,27 +40,28 @@ impl TransactionManager for MySqlTransactionManager {

fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

if depth > 0 {
conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}

Ok(())
})
}

fn start_rollback(conn: &mut MySqlConnection) {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

if depth > 0 {
conn.stream.waiting.push_back(Waiting::Result);
conn.stream.sequence_id = 0;
conn.stream
conn.inner.stream.waiting.push_back(Waiting::Result);
conn.inner.stream.sequence_id = 0;
conn.inner
.stream
.write_packet(Query(&*rollback_ansi_transaction_sql(depth)));

conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}
}
}

0 comments on commit 5da0f73

Please sign in to comment.