Skip to content

Commit

Permalink
feat: Add sqlite read/write pool split
Browse files Browse the repository at this point in the history
For now, put a basic struct in ceramic_core that manages access to the pools. The sqlx details are not hidden and still relied upon by crates interating with the database.
  • Loading branch information
dav1do committed Jan 17, 2024
1 parent eaa7abd commit 3a909a9
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ serde_bytes.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
ssi.workspace = true
sqlx.workspace = true
unsigned-varint.workspace = true

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod jws;
mod network;
mod range;
mod signer;
mod sql;
mod stream_id;

pub use bytes::Bytes;
Expand All @@ -19,6 +20,7 @@ pub use jws::{Jws, JwsSignature};
pub use network::Network;
pub use range::RangeOpen;
pub use signer::{JwkSigner, Signer};
pub use sql::SqlitePool;
pub use stream_id::{StreamId, StreamIdType};

pub use cid::Cid;
Expand Down
47 changes: 47 additions & 0 deletions core/src/sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::{path::Path, str::FromStr};

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};

#[derive(Clone, Debug)]
/// The sqlite pool is split into a writer and a reader pool.
/// Wrapper around the sqlx::SqlitePool
pub struct SqlitePool {
writer: sqlx::SqlitePool,
reader: sqlx::SqlitePool,
}

impl SqlitePool {
/// Connect to the sqlite database at the given path. Creates the database if it does not exist.
/// Uses WAL journal mode.
pub async fn connect(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let db_path = format!("sqlite:{}", path.as_ref().display());
let conn_opts = SqliteConnectOptions::from_str(&db_path)?
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true)
// .synchronous(sqlx::sqlite::SqliteSynchronous::Normal) // normally enough in WAL mode?
.optimize_on_close(true, None);

let ro_opts = conn_opts.clone().read_only(true);

let writer = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(conn_opts)
.await?;
let reader = SqlitePoolOptions::new()
.max_connections(32) //TODO
.connect_with(ro_opts)
.await?;

Ok(Self { writer, reader })
}

/// Get a reference to the writer database pool. The writer pool has only one connection.
pub fn writer(&self) -> &sqlx::SqlitePool {
&self.writer
}

/// Get a reference to the reader database pool. The reader pool has many connections.
pub fn reader(&self) -> &sqlx::SqlitePool {
&self.reader
}
}
1 change: 0 additions & 1 deletion one/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ serde_json = "1"
serde_repr = "0.1"
signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
sqlx.workspace = true
swagger.workspace = true
tokio-metrics = { version = "0.3.1", features = ["rt"] }
tokio-prometheus-client = "0.1"
Expand Down
11 changes: 2 additions & 9 deletions one/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::Result;
use ceramic_core::SqlitePool;
use ceramic_p2p::SQLiteBlockStore;
use chrono::{SecondsFormat, Utc};
use cid::{multibase, multihash, Cid};
use clap::{Args, Subcommand};
use glob::{glob, Paths};
use sqlx::sqlite::SqlitePool;
use std::{fs, path::PathBuf};

#[derive(Subcommand, Debug)]
Expand Down Expand Up @@ -47,14 +47,7 @@ async fn slurp(opts: SlurpOpts) -> Result<()> {
output_ceramic_path.display()
);

let pool: sqlx::Pool<sqlx::Sqlite> = SqlitePool::connect(&format!(
"sqlite:{}?mode=rwc",
output_ceramic_path
.to_str()
.expect("path should be utf8 compatible")
))
.await
.unwrap();
let pool = SqlitePool::connect(output_ceramic_path).await.unwrap();
let store = SQLiteBlockStore::new(pool).await.unwrap();

if let Some(input_ceramic_db) = opts.input_ceramic_db {
Expand Down
5 changes: 2 additions & 3 deletions one/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ mod http;
mod metrics;
mod network;
mod recon_loop;
mod sql;

use std::{env, num::NonZeroUsize, path::PathBuf, time::Duration};

use anyhow::{anyhow, Result};
use ceramic_core::{EventId, Interest, PeerId};
use ceramic_core::{EventId, Interest, PeerId, SqlitePool};
use ceramic_kubo_rpc::Multiaddr;

use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
Expand Down Expand Up @@ -426,7 +425,7 @@ impl Daemon {

// Connect to sqlite
let sql_db_path: PathBuf = dir.join("db.sqlite3");
let sql_pool = sql::connect(&sql_db_path).await?;
let sql_pool = SqlitePool::connect(&sql_db_path).await?;

// Create recon store for interests.
let interest_store = InterestStore::new(sql_pool.clone(), "interest".to_string()).await?;
Expand Down
3 changes: 1 addition & 2 deletions one/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
use std::sync::Arc;

use anyhow::Result;
use ceramic_core::{EventId, Interest};
use ceramic_core::{EventId, Interest, SqlitePool};
use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService};
use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, SQLiteBlockStore};
use iroh_rpc_client::P2pClient;
use iroh_rpc_types::{p2p::P2pAddr, Addr};
use libp2p::identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
use sqlx::SqlitePool;
use tokio::task::{self, JoinHandle};
use tracing::{debug, error};

Expand Down
18 changes: 0 additions & 18 deletions one/src/sql.rs

This file was deleted.

3 changes: 1 addition & 2 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration};

use ahash::AHashMap;
use anyhow::{anyhow, bail, Context, Result};
use ceramic_core::{EventId, Interest};
use ceramic_core::{EventId, Interest, SqlitePool};
use ceramic_metrics::{libp2p_metrics, Recorder};
use cid::Cid;
use futures_util::stream::StreamExt;
Expand All @@ -30,7 +30,6 @@ use libp2p::{
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent},
PeerId, StreamProtocol, Swarm,
};
use sqlx::SqlitePool;
use tokio::sync::oneshot::{self, Sender as OneShotSender};
use tokio::task::JoinHandle;
use tokio::{
Expand Down
29 changes: 15 additions & 14 deletions p2p/src/sqliteblockstore.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use ceramic_core::SqlitePool;
use cid::{
multihash::Code::{Keccak256, Sha2_256},
multihash::MultihashDigest,
Expand All @@ -9,7 +10,7 @@ use cid::{
use futures_util::stream::BoxStream;
use iroh_bitswap::{Block, Store};
use multihash::Multihash;
use sqlx::{sqlite::Sqlite, Error, Row, SqlitePool};
use sqlx::{sqlite::Sqlite, Error, Row};

#[derive(Debug, Clone)]
pub struct SQLiteBlockStore {
Expand Down Expand Up @@ -49,7 +50,7 @@ impl SQLiteBlockStore {
);
",
)
.execute(&self.pool)
.execute(self.pool.writer())
.await?;
Ok(())
}
Expand All @@ -76,13 +77,13 @@ impl SQLiteBlockStore {
)
};
let hashes = hashes_query
.fetch_all(&self.pool)
.fetch_all(self.pool.reader())
.await?
.into_iter()
.map(|row| Multihash::from_bytes(row.get::<'_, &[u8], _>(0)))
.collect::<Result<Vec<Multihash>, multihash::Error>>()?;
let remaining = remaining_query
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0)
// Do not count the hashes we just got in the remaining count.
Expand All @@ -94,7 +95,7 @@ impl SQLiteBlockStore {
Ok(Some(
sqlx::query("SELECT length(bytes) FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0) as u64,
))
Expand All @@ -103,14 +104,14 @@ impl SQLiteBlockStore {
pub async fn get(&self, cid: Cid) -> Result<Option<Bytes>> {
Ok(sqlx::query("SELECT bytes FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_optional(&self.pool)
.fetch_optional(self.pool.reader())
.await?
.map(|row| row.get::<'_, Vec<u8>, _>(0).into()))
}

pub fn scan(&self) -> BoxStream<Result<SQLiteBlock, Error>> {
sqlx::query_as::<Sqlite, SQLiteBlock>("SELECT multihash, bytes FROM blocks;")
.fetch(&self.pool)
.fetch(self.pool.reader())
}

/// Store a DAG node into IPFS.
Expand Down Expand Up @@ -138,7 +139,7 @@ impl SQLiteBlockStore {
match sqlx::query("INSERT INTO blocks (multihash, bytes) VALUES (?, ?)")
.bind(cid.hash().to_bytes())
.bind(blob.to_vec())
.execute(&self.pool)
.execute(self.pool.writer())
.await
{
Ok(_) => Ok(true),
Expand All @@ -158,7 +159,7 @@ impl SQLiteBlockStore {
",
)
.bind(input_ceramic_db_filename)
.execute(&self.pool)
.execute(self.pool.writer())
.await?;
Ok(())
}
Expand All @@ -167,7 +168,7 @@ impl SQLiteBlockStore {
pub async fn backup_to_sqlite(&self, output_ceramic_db_filename: &str) -> Result<()> {
sqlx::query(".backup ?")
.bind(output_ceramic_db_filename)
.execute(&self.pool)
.execute(self.pool.writer())
.await?;
Ok(())
}
Expand All @@ -182,7 +183,7 @@ impl Store for SQLiteBlockStore {
Ok(
sqlx::query("SELECT length(bytes) FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0) as usize,
)
Expand All @@ -195,7 +196,7 @@ impl Store for SQLiteBlockStore {
Ok(Block::new(
sqlx::query("SELECT bytes FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, Vec<u8>, _>(0)
.into(),
Expand All @@ -210,7 +211,7 @@ impl Store for SQLiteBlockStore {
Ok(
sqlx::query("SELECT count(1) FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0)
> 0,
Expand All @@ -225,10 +226,10 @@ mod tests {
use crate::SQLiteBlockStore;
use anyhow::Error;
use bytes::Bytes;
use ceramic_core::SqlitePool;
use cid::{Cid, CidGeneric};
use expect_test::expect;
use iroh_bitswap::Store;
use sqlx::SqlitePool;

#[tokio::test]
async fn test_store_block() {
Expand Down
Loading

0 comments on commit 3a909a9

Please sign in to comment.