feat: Add sqlite read/write pool split #218

Merged · 2 commits · Jan 19, 2024
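For context: this change replaces direct use of sqlx::SqlitePool with a ceramic_core::SqlitePool wrapper that keeps a single-connection writer pool and a multi-connection read-only pool. A minimal usage sketch, assuming only the API introduced in this diff (the path, table, and query are hypothetical):

use ceramic_core::SqlitePool;

async fn example() -> anyhow::Result<()> {
    // Connect once; WAL mode lets readers proceed while the single writer writes.
    let pool = SqlitePool::connect("/tmp/example.sqlite3").await?;

    // Writes go through the writer pool (one connection, so writers never contend).
    sqlx::query("CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB)")
        .execute(pool.writer())
        .await?;

    // Reads go through the reader pool (up to 8 read-only connections).
    let count: i64 = sqlx::query_scalar("SELECT count(1) FROM kv")
        .fetch_one(pool.reader())
        .await?;
    println!("rows: {count}");
    Ok(())
}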
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -28,6 +28,7 @@ serde_bytes.workspace = true
serde_ipld_dagcbor.workspace = true
serde_json.workspace = true
ssi.workspace = true
sqlx.workspace = true
unsigned-varint.workspace = true

[dev-dependencies]
2 changes: 2 additions & 0 deletions core/src/lib.rs
@@ -9,6 +9,7 @@ mod jws;
mod network;
mod range;
mod signer;
mod sql;
mod stream_id;

pub use bytes::Bytes;
@@ -19,6 +20,7 @@ pub use jws::{Jws, JwsSignature};
pub use network::Network;
pub use range::RangeOpen;
pub use signer::{JwkSigner, Signer};
pub use sql::SqlitePool;
pub use stream_id::{StreamId, StreamIdType};

pub use cid::Cid;
48 changes: 48 additions & 0 deletions core/src/sql.rs
@@ -0,0 +1,48 @@
use std::{path::Path, str::FromStr};

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};

/// A wrapper around sqlx::SqlitePool that is split into a single-connection writer pool
/// and a multi-connection reader pool.
#[derive(Clone, Debug)]
pub struct SqlitePool {
    writer: sqlx::SqlitePool,
    reader: sqlx::SqlitePool,
}

impl SqlitePool {
    /// Connect to the sqlite database at the given path. Creates the database if it does not exist.
    /// Uses WAL journal mode.
    pub async fn connect(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        let db_path = format!("sqlite:{}", path.as_ref().display());
        // As we benchmark, we will likely adjust settings and make things configurable.
        // A few ideas: number of RO connections, synchronous = NORMAL, mmap_size, temp_store = memory
        let conn_opts = SqliteConnectOptions::from_str(&db_path)?
            .journal_mode(SqliteJournalMode::Wal)
            .create_if_missing(true)
            .optimize_on_close(true, None);

        let ro_opts = conn_opts.clone().read_only(true);

        let writer = SqlitePoolOptions::new()
            .max_connections(1)
            .connect_with(conn_opts)
            .await?;
        let reader = SqlitePoolOptions::new()
            .max_connections(8)
            .connect_with(ro_opts)
            .await?;

        Ok(Self { writer, reader })
    }

    /// Get a reference to the writer database pool. The writer pool has only one connection.
    pub fn writer(&self) -> &sqlx::SqlitePool {
        &self.writer
    }

    /// Get a reference to the reader database pool. The reader pool has many connections.
    pub fn reader(&self) -> &sqlx::SqlitePool {
        &self.reader
    }
}
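The comment in connect() above leaves pragma tuning for later benchmarking. If those knobs were wired up, they could be set through sqlx's SqliteConnectOptions; a sketch under that assumption (the function and the values are illustrative, not part of this PR):

use std::str::FromStr;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};

fn tuned_options(db_path: &str) -> anyhow::Result<SqliteConnectOptions> {
    Ok(SqliteConnectOptions::from_str(db_path)?
        .journal_mode(SqliteJournalMode::Wal)
        .synchronous(SqliteSynchronous::Normal) // fsync less often; generally safe with WAL
        .pragma("mmap_size", "134217728")       // 128 MiB memory-mapped I/O
        .pragma("temp_store", "memory")         // keep temp tables/indices in RAM
        .create_if_missing(true))
}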
1 change: 0 additions & 1 deletion one/Cargo.toml
@@ -44,7 +44,6 @@ serde_json = "1"
serde_repr = "0.1"
signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
sqlx.workspace = true
swagger.workspace = true
tokio-metrics = { version = "0.3.1", features = ["rt"] }
tokio-prometheus-client = "0.1"
11 changes: 2 additions & 9 deletions one/src/events.rs
@@ -1,10 +1,10 @@
use anyhow::Result;
use ceramic_core::SqlitePool;
use ceramic_p2p::SQLiteBlockStore;
use chrono::{SecondsFormat, Utc};
use cid::{multibase, multihash, Cid};
use clap::{Args, Subcommand};
use glob::{glob, Paths};
use sqlx::sqlite::SqlitePool;
use std::{fs, path::PathBuf};

#[derive(Subcommand, Debug)]
@@ -47,14 +47,7 @@ async fn slurp(opts: SlurpOpts) -> Result<()> {
output_ceramic_path.display()
);

let pool: sqlx::Pool<sqlx::Sqlite> = SqlitePool::connect(&format!(
"sqlite:{}?mode=rwc",
output_ceramic_path
.to_str()
.expect("path should be utf8 compatible")
))
.await
.unwrap();
let pool = SqlitePool::connect(output_ceramic_path).await.unwrap();
let store = SQLiteBlockStore::new(pool).await.unwrap();

if let Some(input_ceramic_db) = opts.input_ceramic_db {
5 changes: 2 additions & 3 deletions one/src/lib.rs
@@ -6,12 +6,11 @@ mod http;
mod metrics;
mod network;
mod recon_loop;
mod sql;

use std::{env, num::NonZeroUsize, path::PathBuf, time::Duration};

use anyhow::{anyhow, Result};
use ceramic_core::{EventId, Interest, PeerId};
use ceramic_core::{EventId, Interest, PeerId, SqlitePool};
use ceramic_kubo_rpc::Multiaddr;

use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle};
@@ -426,7 +425,7 @@ impl Daemon {

// Connect to sqlite
let sql_db_path: PathBuf = dir.join("db.sqlite3");
let sql_pool = sql::connect(&sql_db_path).await?;
let sql_pool = SqlitePool::connect(&sql_db_path).await?;

// Create recon store for interests.
let interest_store = InterestStore::new(sql_pool.clone(), "interest".to_string()).await?;
3 changes: 1 addition & 2 deletions one/src/network.rs
@@ -3,14 +3,13 @@
use std::sync::Arc;

use anyhow::Result;
use ceramic_core::{EventId, Interest};
use ceramic_core::{EventId, Interest, SqlitePool};
use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService};
use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, SQLiteBlockStore};
use iroh_rpc_client::P2pClient;
use iroh_rpc_types::{p2p::P2pAddr, Addr};
use libp2p::identity::Keypair;
use recon::{libp2p::Recon, Sha256a};
use sqlx::SqlitePool;
use tokio::task::{self, JoinHandle};
use tracing::{debug, error};

18 changes: 0 additions & 18 deletions one/src/sql.rs

This file was deleted.

3 changes: 1 addition & 2 deletions p2p/src/node.rs
@@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration};

use ahash::AHashMap;
use anyhow::{anyhow, bail, Context, Result};
use ceramic_core::{EventId, Interest};
use ceramic_core::{EventId, Interest, SqlitePool};
use ceramic_metrics::{libp2p_metrics, Recorder};
use cid::Cid;
use futures_util::stream::StreamExt;
@@ -30,7 +30,6 @@ use libp2p::{
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent},
PeerId, StreamProtocol, Swarm,
};
use sqlx::SqlitePool;
use tokio::sync::oneshot::{self, Sender as OneShotSender};
use tokio::task::JoinHandle;
use tokio::{
29 changes: 15 additions & 14 deletions p2p/src/sqliteblockstore.rs
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use ceramic_core::SqlitePool;
use cid::{
multihash::Code::{Keccak256, Sha2_256},
multihash::MultihashDigest,
@@ -9,7 +10,7 @@ use cid::{
use futures_util::stream::BoxStream;
use iroh_bitswap::{Block, Store};
use multihash::Multihash;
use sqlx::{sqlite::Sqlite, Error, Row, SqlitePool};
use sqlx::{sqlite::Sqlite, Error, Row};

#[derive(Debug, Clone)]
pub struct SQLiteBlockStore {
@@ -49,7 +50,7 @@ impl SQLiteBlockStore {
);
",
)
.execute(&self.pool)
.execute(self.pool.writer())
.await?;
Ok(())
}
@@ -76,13 +77,13 @@
)
};
let hashes = hashes_query
.fetch_all(&self.pool)
.fetch_all(self.pool.reader())
.await?
.into_iter()
.map(|row| Multihash::from_bytes(row.get::<'_, &[u8], _>(0)))
.collect::<Result<Vec<Multihash>, multihash::Error>>()?;
let remaining = remaining_query
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0)
// Do not count the hashes we just got in the remaining count.
@@ -94,7 +95,7 @@
Ok(Some(
sqlx::query("SELECT length(bytes) FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0) as u64,
))
@@ -103,14 +104,14 @@
pub async fn get(&self, cid: Cid) -> Result<Option<Bytes>> {
Ok(sqlx::query("SELECT bytes FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_optional(&self.pool)
.fetch_optional(self.pool.reader())
.await?
.map(|row| row.get::<'_, Vec<u8>, _>(0).into()))
}

pub fn scan(&self) -> BoxStream<Result<SQLiteBlock, Error>> {
sqlx::query_as::<Sqlite, SQLiteBlock>("SELECT multihash, bytes FROM blocks;")
.fetch(&self.pool)
.fetch(self.pool.reader())
}

/// Store a DAG node into IPFS.
@@ -138,7 +139,7 @@
match sqlx::query("INSERT INTO blocks (multihash, bytes) VALUES (?, ?)")
.bind(cid.hash().to_bytes())
.bind(blob.to_vec())
.execute(&self.pool)
.execute(self.pool.writer())
.await
{
Ok(_) => Ok(true),
@@ -158,7 +159,7 @@
",
)
.bind(input_ceramic_db_filename)
.execute(&self.pool)
.execute(self.pool.writer())
.await?;
Ok(())
}
@@ -167,7 +168,7 @@
pub async fn backup_to_sqlite(&self, output_ceramic_db_filename: &str) -> Result<()> {
sqlx::query(".backup ?")
.bind(output_ceramic_db_filename)
.execute(&self.pool)
.execute(self.pool.writer())
.await?;
Ok(())
}
@@ -182,7 +183,7 @@
Ok(
sqlx::query("SELECT length(bytes) FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0) as usize,
)
@@ -195,7 +196,7 @@
Ok(Block::new(
sqlx::query("SELECT bytes FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, Vec<u8>, _>(0)
.into(),
@@ -210,7 +211,7 @@
Ok(
sqlx::query("SELECT count(1) FROM blocks WHERE multihash = ?;")
.bind(cid.hash().to_bytes())
.fetch_one(&self.pool)
.fetch_one(self.pool.reader())
.await?
.get::<'_, i64, _>(0)
> 0,
@@ -225,10 +226,10 @@
use crate::SQLiteBlockStore;
use anyhow::Error;
use bytes::Bytes;
use ceramic_core::SqlitePool;
use cid::{Cid, CidGeneric};
use expect_test::expect;
use iroh_bitswap::Store;
use sqlx::SqlitePool;

#[tokio::test]
async fn test_store_block() {
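The SQLiteBlockStore changes above follow one routing rule: SELECTs run on pool.reader(), statements that mutate state run on pool.writer(). A multi-statement write can follow the same rule by opening a transaction on the writer pool; a rough sketch, not part of this PR (table and column names are hypothetical):

async fn put_two(pool: &ceramic_core::SqlitePool, a: &[u8], b: &[u8]) -> anyhow::Result<()> {
    // Both inserts commit atomically and still use the single write connection.
    let mut tx = pool.writer().begin().await?;
    sqlx::query("INSERT INTO blobs (bytes) VALUES (?)")
        .bind(a)
        .execute(&mut *tx)
        .await?;
    sqlx::query("INSERT INTO blobs (bytes) VALUES (?)")
        .bind(b)
        .execute(&mut *tx)
        .await?;
    tx.commit().await?;
    Ok(())
}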