Skip to content

Commit

Permalink
feat(service/postgresql): support connection pool (#3176)
Browse files Browse the repository at this point in the history
Signed-off-by: Manjusaka <me@manjusaka.me>
  • Loading branch information
Zheaoli authored Sep 25, 2023
1 parent 0a98e32 commit bb738f9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 67 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ services-oss = [
"reqsign?/reqwest_request",
]
services-persy = ["dep:persy"]
services-postgresql = ["dep:tokio-postgres"]
services-postgresql = ["dep:tokio-postgres", "dep:bb8", "dep:bb8-postgres"]
services-redb = ["dep:redb"]
services-redis = ["dep:redis"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
Expand Down Expand Up @@ -275,7 +275,7 @@ tokio-postgres = { version = "0.7.8", optional = true }
tracing = { version = "0.1", optional = true }
uuid = { version = "1", features = ["serde", "v4"] }
mysql_async = { version = "0.32.2", optional = true }

bb8-postgres = { version = "0.8.1", optional = true }

[dev-dependencies]
criterion = { version = "0.4", features = ["async", "async_tokio"] }
Expand Down
90 changes: 25 additions & 65 deletions core/src/services/postgresql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use tokio::sync::OnceCell;
use tokio_postgres::Client;
use tokio_postgres::Config;
use tokio_postgres::Statement;

use crate::raw::adapters::kv;
use crate::raw::*;
Expand Down Expand Up @@ -185,15 +185,11 @@ impl Builder for PostgresqlBuilder {
);

Ok(PostgresqlBackend::new(Adapter {
client: OnceCell::new(),
pool: OnceCell::new(),
config,
table,
key_field,
value_field,

statement_get: OnceCell::new(),
statement_set: OnceCell::new(),
statement_del: OnceCell::new(),
})
.with_root(&root))
}
Expand All @@ -204,17 +200,12 @@ pub type PostgresqlBackend = kv::Backend<Adapter>;

#[derive(Clone)]
pub struct Adapter {
client: OnceCell<Arc<Client>>,
pool: OnceCell<Arc<Pool<PostgresConnectionManager<tokio_postgres::NoTls>>>>,
config: Config,

table: String,
key_field: String,
value_field: String,

/// Prepared statements for get/put/delete.
statement_get: OnceCell<Statement>,
statement_set: OnceCell<Statement>,
statement_del: OnceCell<Statement>,
}

impl Debug for Adapter {
Expand All @@ -225,21 +216,14 @@ impl Debug for Adapter {
}

impl Adapter {
async fn get_client(&self) -> Result<&Client> {
self.client
async fn get_client(&self) -> Result<&Pool<PostgresConnectionManager<tokio_postgres::NoTls>>> {
self.pool
.get_or_try_init(|| async {
// TODO: add tls support.
let (client, conn) = self.config.connect(tokio_postgres::NoTls).await?;

// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("postgresql connection error: {}", e);
}
});

Ok(Arc::new(client))
let manager =
PostgresConnectionManager::new(self.config.clone(), tokio_postgres::NoTls);
let pool = Pool::builder().build(manager).await?;
Ok(Arc::new(pool))
})
.await
.map(|v| v.as_ref())
Expand All @@ -265,18 +249,11 @@ impl kv::Adapter for Adapter {
"SELECT {} FROM {} WHERE {} = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
let statement = self
.statement_get
.get_or_try_init(|| async {
self.get_client()
.await?
.prepare(&query)
.await
.map_err(Error::from)
})
.await?;

let rows = self.get_client().await?.query(statement, &[&path]).await?;
let connection = self.get_client().await?.get().await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
})?;
let statement = connection.prepare(&query).await?;
let rows = connection.query(&statement, &[&path]).await?;
if rows.is_empty() {
return Ok(None);
}
Expand All @@ -294,39 +271,22 @@ impl kv::Adapter for Adapter {
ON CONFLICT ({key_field}) \
DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
);
let statement = self
.statement_set
.get_or_try_init(|| async {
self.get_client()
.await?
.prepare(&query)
.await
.map_err(Error::from)
})
.await?;

let _ = self
.get_client()
.await?
.query(statement, &[&path, &value])
.await?;
let connection = self.get_client().await?.get().await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
})?;
let statement = connection.prepare(&query).await?;
let _ = connection.query(&statement, &[&path, &value]).await?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
let query = format!("DELETE FROM {} WHERE {} = $1", self.table, self.key_field);
let statement = self
.statement_del
.get_or_try_init(|| async {
self.get_client()
.await?
.prepare(&query)
.await
.map_err(Error::from)
})
.await?;
let connection = self.get_client().await?.get().await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err)
})?;
let statement = connection.prepare(&query).await?;

let _ = self.get_client().await?.query(statement, &[&path]).await?;
let _ = connection.query(&statement, &[&path]).await?;
Ok(())
}
}
Expand Down

0 comments on commit bb738f9

Please sign in to comment.