Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the wtx crate #3934

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion diesel_bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ futures = {version = "0.3", optional = true}
diesel-async = {version = "0.4", optional = true, default-features = false}
criterion-perf-events = { version = "0.4", optional = true}
perfcnt = {version = "0.8", optional = true}
wtx = { default-features = false, features = ["atoi", "postgres", "simdutf8", "std", "tokio"], optional = true, version = "0.12" }
wtx = { default-features = false, features = ["atoi", "memchr", "postgres", "simdutf8", "std", "tokio"], optional = true, version = "0.14" }

[dependencies.diesel]
path = "../diesel"
Expand Down
115 changes: 45 additions & 70 deletions diesel_bench/benches/wtx.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::Bencher;
use std::collections::HashMap;
use std::fmt::Write;
use std::{collections::HashMap, fmt::Write};
use tokio::{net::TcpStream, runtime::Runtime};
use wtx::{
database::{
client::postgres::{Config, Executor, ExecutorBuffer},
Encode, Executor as _, Record as _,
Executor as _, Record as _,
},
misc::UriPartsRef,
misc::{Either, UriRef},
rng::StdRng,
};

Expand Down Expand Up @@ -85,21 +84,15 @@ pub fn bench_loading_associations_sequentially(b: &mut Bencher) {

let mut posts_query =
String::from("SELECT id, title, user_id, body FROM posts WHERE user_id IN(");
let mut users_ids = Vec::with_capacity(LEN);
concat(
users.iter().enumerate(),
&mut posts_query,
|local_str, (idx, &User { id, .. })| {
local_str.write_fmt(format_args!("${}", idx + 1)).unwrap();
users_ids.push(id);
},
);
posts_query += ")";
concat(0..users.len(), &mut posts_query, |local_str, idx| {
local_str.write_fmt(format_args!("${}", idx + 1)).unwrap();
});
posts_query.push(')');

let mut posts = Vec::with_capacity(LEN);
conn.fetch_many_with_stmt::<wtx::Error, _, _>(
conn.fetch_many_with_stmt(
posts_query.as_str(),
users_ids.as_slice(),
&mut users.iter().map(|user| user.id),
|record| {
posts.push(Post {
body: record.decode_opt(3).unwrap(),
Expand All @@ -115,21 +108,15 @@ pub fn bench_loading_associations_sequentially(b: &mut Bencher) {

let mut comments_query =
String::from("SELECT id, post_id, text FROM comments WHERE post_id IN(");
let mut posts_ids = Vec::with_capacity(LEN);
concat(
posts.iter().enumerate(),
&mut comments_query,
|local_str, (idx, &Post { id, .. })| {
local_str.write_fmt(format_args!("${}", idx + 1)).unwrap();
posts_ids.push(id);
},
);
comments_query += ")";
concat(0..posts.len(), &mut comments_query, |local_str, idx| {
local_str.write_fmt(format_args!("${}", idx + 1)).unwrap();
});
comments_query.push(')');

let mut comments = Vec::with_capacity(LEN);
conn.fetch_many_with_stmt::<wtx::Error, _, _>(
conn.fetch_many_with_stmt(
comments_query.as_str(),
posts_ids.as_slice(),
&mut posts.iter().map(|post| post.id),
|record| {
comments.push(Comment {
id: record.decode(0).unwrap(),
Expand Down Expand Up @@ -197,7 +184,7 @@ pub fn bench_medium_complex_query(b: &mut Bencher, size: usize) {
b.iter(|| {
runtime.block_on(async {
let mut _rslt = Vec::with_capacity(size);
conn.fetch_many_with_stmt::<wtx::Error, _, _>(stmt_hash, ("black",), |record| {
conn.fetch_many_with_stmt(stmt_hash, ("black",), |record| {
let user = User {
id: record.decode(0).unwrap(),
name: record.decode(1).unwrap(),
Expand Down Expand Up @@ -237,8 +224,8 @@ pub fn bench_trivial_query(b: &mut Bencher, size: usize) {
(conn, stmt_hash)
});
b.iter(|| {
let mut users = Vec::with_capacity(size);
runtime.block_on(async {
let mut users = Vec::with_capacity(size);
conn.fetch_many_with_stmt(stmt_hash, (), |record| {
users.push(User {
id: record.decode(0).unwrap(),
Expand Down Expand Up @@ -266,34 +253,31 @@ where
}
}

async fn connection() -> Executor<ExecutorBuffer, TcpStream> {
async fn connection() -> Executor<wtx::Error, ExecutorBuffer, TcpStream> {
dotenvy::dotenv().ok();
let url = dotenvy::var("POSTGRES_DATABASE_URL")
.or_else(|_| dotenvy::var("DATABASE_URL"))
.expect("DATABASE_URL must be set in order to run tests");
let up = UriPartsRef::new(url.as_str());
let uri = UriRef::new(url.as_str());
let mut rng = StdRng::default();
let mut conn = Executor::connect(
&Config::from_uri_parts(&up).unwrap(),
&Config::from_uri(&uri).unwrap(),
ExecutorBuffer::with_default_params(&mut rng),
&mut rng,
TcpStream::connect(up.host()).await.unwrap(),
TcpStream::connect(uri.host()).await.unwrap(),
)
.await
.unwrap();
conn.execute(
"TRUNCATE TABLE comments CASCADE;TRUNCATE TABLE posts CASCADE;TRUNCATE TABLE users CASCADE",
|_| {},
)
.await
.unwrap();
conn.execute("TRUNCATE TABLE comments CASCADE", |_| {})
.await
.unwrap();
conn.execute("TRUNCATE TABLE posts CASCADE", |_| {})
.await
.unwrap();
conn.execute("TRUNCATE TABLE users CASCADE", |_| {})
.await
.unwrap();
conn
}

async fn insert_posts<const N: usize>(conn: &mut Executor<ExecutorBuffer, TcpStream>) {
async fn insert_posts<const N: usize>(conn: &mut Executor<wtx::Error, ExecutorBuffer, TcpStream>) {
let mut users_ids = Vec::with_capacity(N);
conn.fetch_many_with_stmt("SELECT id FROM users", (), |record| {
users_ids.push(record.decode(0).unwrap());
Expand All @@ -305,11 +289,17 @@ async fn insert_posts<const N: usize>(conn: &mut Executor<ExecutorBuffer, TcpStr
let params = users_ids
.into_iter()
.flat_map(|user_id| {
(0..10).map(move |idx| (format!("Post {idx} by user {user_id}"), user_id, None))
(0..10).map(move |idx| {
[
Either::Left(format!("Post {idx} by user {user_id}")),
Either::Right(Either::Left(user_id)),
Either::Right(Either::Right(None as Option<&'static str>)),
]
})
})
.collect::<Vec<_>>();

let mut insert_stmt = String::from("INSERT INTO posts(title, user_id, body) VALUES");
let mut insert_stmt = String::from("INSERT INTO posts(title, user_id, body) VALUES ");
concat(
0..params.len(),
&mut insert_stmt,
Expand All @@ -325,45 +315,30 @@ async fn insert_posts<const N: usize>(conn: &mut Executor<ExecutorBuffer, TcpStr
},
);

let params_ref = params
.iter()
.flat_map(|el: &(String, i32, Option<String>)| {
let a: &dyn Encode<_, _> = &el.0;
let b: &dyn Encode<_, _> = &el.1;
let c: &dyn Encode<_, _> = &el.2;
[a, b, c]
})
.collect::<Vec<&dyn Encode<_, _>>>();

conn.execute_with_stmt::<wtx::Error, _, _>(insert_stmt.as_str(), params_ref.as_slice())
conn.execute_with_stmt(insert_stmt.as_str(), &mut params.into_iter().flatten())
.await
.unwrap();
}

async fn insert_users<const N: usize>(
conn: &mut Executor<ExecutorBuffer, TcpStream>,
conn: &mut Executor<wtx::Error, ExecutorBuffer, TcpStream>,
hair_color_init: impl Fn(usize) -> Option<&'static str>,
) {
let mut query = String::from("INSERT INTO users (name, hair_color) VALUES");
let mut params = Vec::with_capacity(2 * N);

concat(0..N, &mut query, |local_query, idx| {
local_query
.write_fmt(format_args!("(${}, ${})", 2 * idx + 1, 2 * idx + 2))
.unwrap();
params.push((format!("User {idx}"), hair_color_init(idx)));
});

let params_ref = params
.iter()
.flat_map(|el: &(String, Option<&'static str>)| {
let a: &dyn Encode<_, _> = &el.0;
let b: &dyn Encode<_, _> = &el.1;
[a, b]
})
.collect::<Vec<_>>();
let mut params = (0..N).into_iter().flat_map(|idx| {
[
Either::Left(format!("User {idx}")),
Either::Right(hair_color_init(idx)),
]
});

conn.execute_with_stmt::<wtx::Error, _, _>(query.as_str(), params_ref.as_slice())
conn.execute_with_stmt(query.as_str(), &mut params)
.await
.unwrap();
}
Loading