diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 29e542e..dd9c9e1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,7 +97,37 @@ jobs: ./build/mvstore --data-plane 127.0.0.1:7000 --admin-api 127.0.0.1:7001 --metadata-prefix mvstore-test --raw-data-prefix m & sleep 1 curl http://localhost:7001/api/create_namespace -d '{"key":"stress","metadata":""}' - RUST_LOG=error ./build/mvstore-stress --concurrency 50 --data-plane http://localhost:7000 --iterations 1000 --ns-key stress --pages 1000 + RUST_LOG=error,mvstore_stress=info ./build/mvstore-stress --concurrency 50 --data-plane http://localhost:7000 --admin-api http://localhost:7001 --iterations 1000 --ns-key stress --pages 1000 + stress-exotic: + name: mvstore stress test (exotic knobs) + runs-on: ubuntu-20.04 + needs: + - build + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Install system dependencies + run: | + set -e + curl -L https://github.com/apple/foundationdb/releases/download/7.1.17/foundationdb-clients_7.1.17-1_amd64.deb --output fdb-client.deb + sudo dpkg -i fdb-client.deb + curl -L https://github.com/apple/foundationdb/releases/download/7.1.17/foundationdb-server_7.1.17-1_amd64.deb --output fdb-server.deb + sudo dpkg -i fdb-server.deb + - name: Fetch binaries + uses: actions/download-artifact@v2 + with: + name: build + path: ./build + - name: Run it + run: | + set -e + chmod +x ./build/mvstore ./build/mvstore-stress + export RUST_LOG=info + ./build/mvstore --data-plane 127.0.0.1:7000 --admin-api 127.0.0.1:7001 --metadata-prefix mvstore-test --raw-data-prefix m \ + --knob-gc-scan-batch-size 50 & + sleep 1 + curl http://localhost:7001/api/create_namespace -d '{"key":"stress","metadata":""}' + RUST_LOG=error,mvstore_stress=info ./build/mvstore-stress --concurrency 50 --data-plane http://localhost:7000 --admin-api http://localhost:7001 --iterations 1000 --ns-key stress --pages 1000 stress-buggify: name: mvstore stress test (buggify) runs-on: ubuntu-20.04 @@ -130,13 +160,14 @@ jobs: sleep 1 ./build/mvstore --data-plane 127.0.0.1:7000 --admin-api 127.0.0.1:7001 --metadata-prefix mvstore-test --raw-data-prefix m --fdb-buggify & sleep 1 - RUST_LOG=error ./build/mvstore-stress --concurrency 50 --data-plane http://localhost:7000 --iterations 1000 --ns-key stress --pages 1000 + RUST_LOG=error,mvstore_stress=info ./build/mvstore-stress --concurrency 50 --data-plane http://localhost:7000 --admin-api http://localhost:7001 --iterations 1000 --ns-key stress --pages 1000 release: name: Release needs: - build - stress - stress-buggify + - stress-exotic - build-deb if: startsWith(github.ref, 'refs/tags/') runs-on: ubuntu-20.04 diff --git a/Cargo.lock b/Cargo.lock index e7107c3..b09aee1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,6 +173,12 @@ dependencies = [ "which", ] +[[package]] +name = "bit-vec" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b4ff8b16e6076c3e14220b39fbc1fabb6737522281a388998046859400895f" + [[package]] name = "bitflags" version = "1.3.2" @@ -202,6 +208,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bloom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d00ac8e5056d6d65376a3c1aa5c7c34850d6949ace17f0266953a254eb3d6fe8" +dependencies = [ + "bit-vec", +] + [[package]] name = "bumpalo" version = "3.10.0" @@ -1082,6 +1097,7 @@ version = "0.1.0" dependencies = [ "anyhow", "blake3", + "bloom", "bytes", "foundationdb", "futures", diff --git a/README.md 
b/README.md index a4b0254..a3ebb05 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,8 @@ Distributed, MVCC SQLite that runs on top of [FoundationDB](https://github.com/apple/foundationdb). - [Caveats](#caveats) - [The "database is locked" error](#the-database-is-locked-error) - [No ABA-style idempotency](#no-aba-style-idempotency) - - [Limits](#limits) + - [Transaction size and time limit](#transaction-size-and-time-limit) - [Read latency](#read-latency) - - [Not yet implemented: garbage collection](#not-yet-implemented-garbage-collection) ## Features @@ -128,14 +127,12 @@ This means that, in a very rare circumstance as described below: The first client will get a commit conflict and abort. This is the expected behavior, since unbounded idempotency requires too much overhead. -## Limits +### Transaction size and time limit + +Currently, the maximum transaction size in mvsqlite is ~1 GB and the transaction time limit is 1 hour. ### Read latency SQLite does synchronous "disk" I/O. While we can (and do) concurrently execute write operations, reads from FoundationDB block the SQLite thread. This is probably fine if you don't expect to get very I/O intensive on a single database, but you may want to enable `coroutine` in the `IoEngine` config if you have an event loop outside, so that network I/O won't block the thread. - -### Not yet implemented: garbage collection - -Currently history versions will be kept in the database forever. There is no garbage collection yet. In a future version this will be fixed. diff --git a/mvclient/src/lib.rs b/mvclient/src/lib.rs index b134cc4..221f5c0 100644 --- a/mvclient/src/lib.rs +++ b/mvclient/src/lib.rs @@ -468,7 +468,9 @@ async fn request_and_check_returning_status( }; Ok(Some((headers, body))) } else if res.status().is_server_error() { - tracing::error!(status = %res.status(), "server error"); + let status = res.status(); + let text = res.text().await.unwrap_or_default(); + tracing::error!(status = %status, text = text, "server error"); Ok(None) } else { let status = res.status(); diff --git a/mvstore-stress/src/inmem.rs b/mvstore-stress/src/inmem.rs index b853bb5..0a50a21 100644 --- a/mvstore-stress/src/inmem.rs +++ b/mvstore-stress/src/inmem.rs @@ -4,10 +4,10 @@ use rand::{thread_rng, Rng}; use rpds::RedBlackTreeMapSync; pub struct Inmem { - versions: BTreeMap<String, RedBlackTreeMapSync<u32, [u8; 32]>>, + pub versions: BTreeMap<String, RedBlackTreeMapSync<u32, [u8; 32]>>, inflight: BTreeMap<u64, RedBlackTreeMapSync<u32, [u8; 32]>>, next_inflight_id: u64, - version_list: Vec<String>, + pub version_list: Vec<String>, } impl Inmem { @@ -49,11 +49,11 @@ impl Inmem { version.insert_mut(index, *hash.as_bytes()); } - pub fn verify_page(&self, id: u64, index: u32, data: &[u8]) { + pub fn verify_page(&self, id: u64, index: u32, data: &[u8], desc: &str) { let version = self.inflight.get(&id).expect("inflight not found"); if data.is_empty() { - assert!(version.get(&index).is_none(), "page should not exist"); + assert!(version.get(&index).is_none(), "page should not exist ({})", desc); } else { let computed_hash = blake3::hash(data); let stored_hash = *version.get(&index).expect("page not found"); diff --git a/mvstore-stress/src/main.rs b/mvstore-stress/src/main.rs index 5af218e..34903fb 100644 --- a/mvstore-stress/src/main.rs +++ b/mvstore-stress/src/main.rs @@ -14,6 +14,10 @@ struct Opt { /// Data plane URL. #[structopt(long)] data_plane: String, + + /// Admin API URL. + #[structopt(long)] + admin_api: String, /// Output log in JSON format.
#[structopt(long)] @@ -62,7 +66,7 @@ async fn main() -> Result<()> { data_plane: opt.data_plane.parse()?, ns_key: opt.ns_key.clone(), })?; - let t = Tester::new(client.clone(), opt.pages); + let t = Tester::new(client.clone(), opt.admin_api.clone(), opt.pages); t.run(opt.concurrency as _, opt.iterations as _).await; println!("Test succeeded."); Ok(()) diff --git a/mvstore-stress/src/tester.rs b/mvstore-stress/src/tester.rs index 676f272..492ec69 100644 --- a/mvstore-stress/src/tester.rs +++ b/mvstore-stress/src/tester.rs @@ -1,4 +1,8 @@ -use std::sync::Arc; +use std::{ + collections::{BTreeMap, HashSet}, + sync::{Arc, Mutex}, + time::Duration, +}; use tokio::sync::RwLock; use anyhow::Result; @@ -11,18 +15,25 @@ pub struct Tester { mem: RwLock, client: Arc, num_pages: u32, + admin_api: String, + busy_versions: Mutex>, } impl Tester { - pub fn new(client: Arc, num_pages: u32) -> Arc { + pub fn new(client: Arc, admin_api: String, num_pages: u32) -> Arc { Arc::new(Self { mem: RwLock::new(Inmem::new()), client, num_pages, + admin_api, + busy_versions: Mutex::new(BTreeMap::new()), }) } pub async fn run(self: &Arc, concurrency: usize, iterations: usize) { + let truncate_worker = tokio::spawn(self.clone().truncate_worker()); + let delete_unreferenced_content_worker = + tokio::spawn(self.clone().delete_unreferenced_content_worker()); let handles = (0..concurrency) .map(|i| { let me = self.clone(); @@ -32,18 +43,140 @@ impl Tester { for handle in handles { handle.await.unwrap().unwrap(); } + truncate_worker.abort(); + delete_unreferenced_content_worker.abort(); + } + + async fn truncate_worker(self: Arc) { + let rc = reqwest::Client::new(); + loop { + let sleep_dur_ms = rand::thread_rng().gen_range(1..1000); + let sleep_dur = Duration::from_millis(sleep_dur_ms); + tokio::time::sleep(sleep_dur).await; + + let mut remove_point: String; + { + let mut mem = self.mem.write().await; + let mut versions = mem.versions.keys().cloned().collect::>(); + versions.pop(); // never remove the latest version, if any + + if versions.len() == 0 { + continue; + } + let split_point = rand::thread_rng().gen_range(0..versions.len()); + remove_point = versions[split_point].clone(); + + if let Some((k, _)) = self.busy_versions.lock().unwrap().iter().next() { + if *k < remove_point { + remove_point = k.clone(); + } + } + let mut removals: HashSet = HashSet::new(); + for x in &versions { + if *x < remove_point { + mem.versions.remove(x); + removals.insert(x.clone()); + } + } + mem.version_list = mem + .version_list + .iter() + .filter(|x| !removals.contains(*x)) + .cloned() + .collect::>(); + } + let payload = serde_json::json!({ + "key": &self.client.config().ns_key, + "before_version": &remove_point, + "apply": true, + }); + tracing::info!(remove_point = remove_point, "triggering truncation"); + match rc + .post(format!("{}/api/truncate_namespace", self.admin_api,)) + .json(&payload) + .send() + .await + { + Ok(res) => match res.bytes().await { + Ok(_) => { + tracing::info!("truncated namespace"); + } + Err(e) => { + tracing::error!(error = %e, "failed to read response for truncate namespace"); + } + }, + Err(e) => { + tracing::error!(error = %e, "failed to truncate namespace"); + } + } + } + } + + async fn delete_unreferenced_content_worker(self: Arc) { + let rc = reqwest::Client::new(); + loop { + let sleep_dur_ms = rand::thread_rng().gen_range(1..10000); + let sleep_dur = Duration::from_millis(sleep_dur_ms); + tokio::time::sleep(sleep_dur).await; + let payload = serde_json::json!({ + "key": &self.client.config().ns_key, 
+ "apply": true, + }); + tracing::info!("triggering duc"); + match rc + .post(format!( + "{}/api/delete_unreferenced_content", + self.admin_api, + )) + .json(&payload) + .send() + .await + { + Ok(res) => match res.bytes().await { + Ok(_) => { + tracing::info!("deleted unreferenced content"); + } + Err(e) => { + tracing::error!(error = %e, "failed to read response for duc"); + } + }, + Err(e) => { + tracing::error!(error = %e, "failed to delete unreferenced content"); + } + } + } + } + + fn acquire_version(&self, version: &str) { + *self + .busy_versions + .lock() + .unwrap() + .entry(version.to_string()) + .or_default() += 1; + } + + fn release_version(&self, version: &str) { + let mut versions = self.busy_versions.lock().unwrap(); + let entry = versions.get_mut(version).unwrap(); + assert!(*entry > 0, "version {} is not acquired", version); + *entry -= 1; + if *entry == 0 { + versions.remove(version); + } } async fn task(self: Arc, task_id: usize, iterations: usize) -> Result<()> { let mut mem = self.mem.write().await; let mut txn = self.client.create_transaction().await?; let mut txn_id = mem.start_transaction(txn.version()); + self.acquire_version(txn.version()); drop(mem); let mut last_writes: Vec>> = vec![None; self.num_pages as usize]; for it in 0..iterations { - let mode = rand::thread_rng().gen_range(0..10); - tracing::info!(task = task_id, iteration = it, mode = mode, "iteration"); + let mode = rand::thread_rng().gen_range(0..11); + //tracing::info!(task = task_id, iteration = it, mode = mode, "iteration"); match mode { 0..=5 => { let num_reads = rand::thread_rng().gen_range(1..=10); @@ -53,8 +186,8 @@ impl Tester { let pages = txn.read_many(&reads).await?; let mem = self.mem.read().await; for (&index, page) in reads.iter().zip(pages.iter()) { - tracing::info!(task = task_id, iteration = it, index = index, "read"); - mem.verify_page(txn_id, index, page); + //tracing::info!(task = task_id, iteration = it, index = index, "read"); + mem.verify_page(txn_id, index, page, txn.version()); } } 6..=7 => { @@ -83,7 +216,7 @@ impl Tester { last_writes[index as usize] = Some(data.clone()); } - tracing::info!(task = task_id, iteration = it, index = index, "write"); + //tracing::info!(task = task_id, iteration = it, index = index, "write"); (index, data) }) @@ -100,20 +233,31 @@ impl Tester { } 8 => { let mut mem = self.mem.write().await; + let version = txn.version().to_string(); match txn.commit(None).await? 
{ Some(info) => { mem.commit_transaction(txn_id, &info.version); } None => mem.drop_transaction(txn_id), } + self.release_version(&version); drop(mem); tokio::task::yield_now().await; (txn, txn_id) = self.create_transaction_random_base().await?; + //tracing::info!(version = txn.version(), "created txn"); } 9 => { - self.mem.write().await.drop_transaction(txn_id); + let mut mem = self.mem.write().await; + mem.drop_transaction(txn_id); + self.release_version(txn.version()); + drop(mem); tokio::task::yield_now().await; (txn, txn_id) = self.create_transaction_random_base().await?; + //tracing::info!(version = txn.version(), "created txn"); + } + 10 => { + let dur_millis = rand::thread_rng().gen_range(1..100); + tokio::time::sleep(Duration::from_millis(dur_millis)).await; } _ => unreachable!(), } @@ -129,12 +273,14 @@ impl Tester { if let Some(version) = mem.pick_random_version() { let txn = self.client.create_transaction_at_version(version); let txn_id = mem.start_transaction(txn.version()); + self.acquire_version(txn.version()); return Ok((txn, txn_id)); } } let txn = self.client.create_transaction().await?; let txn_id = mem.start_transaction(txn.version()); + self.acquire_version(txn.version()); Ok((txn, txn_id)) } } diff --git a/mvstore/Cargo.toml b/mvstore/Cargo.toml index 5f889ec..181f406 100644 --- a/mvstore/Cargo.toml +++ b/mvstore/Cargo.toml @@ -40,3 +40,4 @@ serde_json = "1" serde_bytes = "0.11" zstd = "0.11.2" moka = { version = "0.9.2", features = ["future"] } +bloom = "0.3.2" diff --git a/mvstore/src/commit.rs b/mvstore/src/commit.rs new file mode 100644 index 0000000..a4b3f1f --- /dev/null +++ b/mvstore/src/commit.rs @@ -0,0 +1,136 @@ +use std::time::SystemTime; + +use anyhow::Result; +use foundationdb::{options::MutationType, FdbError}; +use rand::RngCore; + +use crate::server::{generate_suffix_versionstamp_atomic_op, ContentIndex, Server}; + +pub enum CommitResult { + Committed { versionstamp: [u8; 10] }, + Conflict, + BadPageReference, +} + +pub struct CommitContext<'a> { + pub ns_id: [u8; 10], + pub client_assumed_version: [u8; 10], + pub idempotency_key: [u8; 16], + pub index_writes: &'a [(u32, [u8; 32])], + pub metadata: Option<&'a str>, +} + +impl Server { + pub async fn commit<'a>(&self, ctx: CommitContext<'a>) -> Result { + // Begin the writes. + // We do three-phase commit for large transactions here. 
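+ // Phase overview: phase 1 checks idempotency, compares the client's assumed
+ // version against the last write version, and verifies that every referenced
+ // page exists in the content index. Phase 2 refreshes the content-index
+ // entries for the referenced pages. Phase 3 writes the page index, namespace
+ // metadata, and the last-write version. For multi-phase commits, the random
+ // commit token is re-read between phases; if it changed (e.g. GC cleared it
+ // or another commit replaced it), the commit aborts.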
+ let multi_phase = ctx.index_writes.len() >= 1000; + let commit_token_key = self.construct_ns_commit_token_key(ctx.ns_id); + let mut commit_token = [0u8; 16]; + rand::thread_rng().fill_bytes(&mut commit_token); + + // Phase 1 - Idempotency && Check page existence + let mut txn = self.db.create_trx()?; + let last_write_version_key = self.construct_last_write_version_key(ctx.ns_id); + let last_write_version = txn.get(&last_write_version_key, false).await?; + match &last_write_version { + Some(x) if x.len() != 16 + 10 => { + anyhow::bail!("invalid last write version"); + } + Some(x) => { + let actual_idempotency_key = <[u8; 16]>::try_from(&x[0..16]).unwrap(); + let actual_last_write_version = <[u8; 10]>::try_from(&x[16..26]).unwrap(); + + if actual_idempotency_key == ctx.idempotency_key { + return Ok(CommitResult::Committed { + versionstamp: actual_last_write_version, + }); + } + + if ctx.client_assumed_version < actual_last_write_version { + return Ok(CommitResult::Conflict); + } + } + None => {} + } + + for (_, page_hash) in ctx.index_writes { + let content_index_key = self.construct_contentindex_key(ctx.ns_id, *page_hash); + let data = txn.get(&content_index_key, false).await?; + if data.is_none() { + return Ok(CommitResult::BadPageReference); + } + } + + if multi_phase { + txn.set(&commit_token_key, &commit_token); + txn = txn.commit().await.map_err(|e| FdbError::from(e))?.reset(); + if txn + .get(&commit_token_key, false) + .await? + .as_ref() + .map(|x| &x[..]) + .unwrap_or_default() + != commit_token + { + anyhow::bail!("commit interrupted before phase 2"); + } + } + + // Phase 2 - content index insertion + + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + for (_, page_hash) in ctx.index_writes { + let key = self.construct_contentindex_key(ctx.ns_id, *page_hash); + let value = ContentIndex::generate_mutation_payload(now); + txn.atomic_op(&key, &value, MutationType::SetVersionstampedValue); + } + + if multi_phase { + txn = txn.commit().await.map_err(|e| FdbError::from(e))?.reset(); + if txn + .get(&commit_token_key, false) + .await? 
+ .as_ref() + .map(|x| &x[..]) + .unwrap_or_default() + != commit_token + { + anyhow::bail!("commit interrupted before phase 3"); + } + } + + // Phase 3 - content insertion + if let Some(md) = ctx.metadata { + let metadata_key = self.construct_nsmd_key(ctx.ns_id); + txn.set(&metadata_key, md.as_bytes()); + } + + for (page_index, page_hash) in ctx.index_writes { + let page_key_template = self.construct_page_key(ctx.ns_id, *page_index, [0u8; 10]); + let page_key_atomic_op = generate_suffix_versionstamp_atomic_op(&page_key_template); + txn.atomic_op( + &page_key_atomic_op, + page_hash, + MutationType::SetVersionstampedKey, + ); + } + + let mut last_write_version_atomic_op_value = [0u8; 16 + 10 + 4]; + last_write_version_atomic_op_value[0..16].copy_from_slice(&ctx.idempotency_key); + last_write_version_atomic_op_value[26..30].copy_from_slice(&16u32.to_le_bytes()[..]); + txn.atomic_op( + &last_write_version_key, + &last_write_version_atomic_op_value, + MutationType::SetVersionstampedValue, + ); + let versionstamp_fut = txn.get_versionstamp(); + txn.commit().await.map_err(|e| FdbError::from(e))?; + let versionstamp = versionstamp_fut.await?; + Ok(CommitResult::Committed { + versionstamp: <[u8; 10]>::try_from(&versionstamp[..]).unwrap(), + }) + } +} diff --git a/mvstore/src/gc.rs b/mvstore/src/gc.rs new file mode 100644 index 0000000..8633f4d --- /dev/null +++ b/mvstore/src/gc.rs @@ -0,0 +1,402 @@ +use std::{ + sync::{Arc, atomic::{AtomicUsize, Ordering, AtomicU64}}, + time::{Duration, SystemTime}, +}; + +use anyhow::{Context, Result}; +use bloom::{BloomFilter, ASMS}; +use foundationdb::{future::FdbKeyValue, options::StreamingMode, RangeOption}; + +use crate::{ + lock::DistributedLock, + server::{ContentIndex, Server}, +}; + +pub static GC_SCAN_BATCH_SIZE: AtomicUsize = AtomicUsize::new(10000); +pub static GC_FRESH_PAGE_TTL_SECS: AtomicU64 = AtomicU64::new(3600); + +impl Server { + pub async fn truncate_versions( + self: Arc, + dry_run: bool, + ns_id: [u8; 10], + before_version: [u8; 10], + mut progress_callback: impl FnMut(Option), + ) -> Result<()> { + let scan_start = self.construct_page_key(ns_id, 0, [0u8; 10]); + let scan_end = self.construct_page_key(ns_id, std::u32::MAX, [0xffu8; 10]); + let mut scan_cursor = scan_start.clone(); + let mut lock = DistributedLock::new( + self.construct_nstask_key(ns_id, "truncate_versions"), + "truncate_versions".into(), + ); + + let me = self.clone(); + let locked = lock + .lock( + move || { + me.db + .create_trx() + .with_context(|| "transaction creation failed") + }, + Duration::from_secs(5), + ) + .await?; + if !locked { + anyhow::bail!("failed to acquire lock"); + } + + let mut total_count = 0u64; + + loop { + let scan_result = loop { + let txn = lock.create_txn_and_check_sync(&self.db).await?; + let range = match txn + .get_range( + &RangeOption { + limit: Some(GC_SCAN_BATCH_SIZE.load(Ordering::Relaxed)), + reverse: false, + mode: StreamingMode::WantAll, + ..RangeOption::from(scan_cursor.clone()..=scan_end.clone()) + }, + 0, + true, + ) + .await + { + Ok(x) => x, + Err(e) => { + txn.on_error(e).await?; + continue; + } + }; + break range; + }; + + // In the one-page case, we are sure we don't want to gc that + if scan_result.len() <= 1 { + break; + } + + let mut deletion_set: Vec> = vec![]; + + { + let scan_result = &scan_result[..]; + + // Yes, we do want to rescan the last item + scan_cursor = scan_result.last().unwrap().key().to_vec(); + + for (kv, next) in scan_result[..scan_result.len() - 1] + .iter() + .zip(scan_result[1..].iter()) + { + let 
this_version = extract_10_byte_suffix(kv.key()); + let next_version = extract_10_byte_suffix(next.key()); + + // Never truncate the latest version of a page in the specified version range + let is_latest_version_in_range = truncate_10_byte_suffix(kv.key()) + != truncate_10_byte_suffix(next.key()) + || next_version > before_version; + if !is_latest_version_in_range && this_version < before_version { + deletion_set.push(kv.key().to_vec()); + } + } + } + drop(scan_result); + + if !deletion_set.is_empty() { + if !dry_run { + loop { + let txn = lock.create_txn_and_check_sync(&self.db).await?; + for item in &deletion_set { + txn.clear(item); + } + match txn.commit().await { + Ok(_) => break, + Err(e) => { + e.on_error().await?; + } + } + } + } + tracing::info!( + ns = hex::encode(&ns_id), + count = deletion_set.len(), + dry = dry_run, + "truncated pages" + ); + total_count += deletion_set.len() as u64; + progress_callback(Some(total_count)); + } + } + + progress_callback(None); + Ok(()) + } + + async fn scan_range_simple( + self: &Arc, + lock: &DistributedLock, + scan_start: Vec, + scan_end: Vec, + mut cb: impl FnMut(&FdbKeyValue), + ) -> Result<()> { + let mut scan_cursor = scan_start.clone(); + + loop { + let scan_result = loop { + let txn = lock.create_txn_and_check_sync(&self.db).await?; + let range = match txn + .get_range( + &RangeOption { + limit: Some(GC_SCAN_BATCH_SIZE.load(Ordering::Relaxed)), + reverse: false, + mode: StreamingMode::WantAll, + ..RangeOption::from(scan_cursor.clone()..=scan_end.clone()) + }, + 0, + true, + ) + .await + { + Ok(x) => x, + Err(e) => { + txn.on_error(e).await?; + continue; + } + }; + break range; + }; + + if scan_result.len() == 0 { + break; + } + + let scan_result = &scan_result[..]; + scan_cursor = scan_result.last().unwrap().key().to_vec(); + scan_cursor.push(0x00); + + for kv in scan_result { + cb(kv); + } + } + Ok(()) + } + + async fn scan_page_index_simple( + self: &Arc, + lock: &DistributedLock, + ns_id: [u8; 10], + mut cb: impl FnMut([u8; 32]), + ) -> Result<()> { + let scan_start = self.construct_page_key(ns_id, 0, [0u8; 10]); + let scan_end = self.construct_page_key(ns_id, std::u32::MAX, [0xffu8; 10]); + self.scan_range_simple(lock, scan_start, scan_end, |kv| { + if let Ok(x) = <[u8; 32]>::try_from(kv.value()) { + cb(x); + } + }) + .await + } + + async fn scan_delta_referrer_simple( + self: &Arc, + lock: &DistributedLock, + ns_id: [u8; 10], + mut cb: impl FnMut([u8; 32]), + ) -> Result<()> { + let scan_start = self.construct_delta_referrer_key(ns_id, [0u8; 32]); + let scan_end = self.construct_delta_referrer_key(ns_id, [0xffu8; 32]); + self.scan_range_simple(lock, scan_start, scan_end, |kv| { + if let Ok(x) = <[u8; 32]>::try_from(kv.value()) { + cb(x); + } + }) + .await + } + + pub async fn delete_unreferenced_content( + self: Arc, + dry_run: bool, + ns_id: [u8; 10], + mut progress_callback: impl FnMut(String), + ) -> Result<()> { + let ns_id_hex = hex::encode(&ns_id); + let commit_token_key = self.construct_ns_commit_token_key(ns_id); + let mut lock = DistributedLock::new( + self.construct_nstask_key(ns_id, "delete_unreferenced_content"), + "delete_unreferenced_content".into(), + ); + let me = self.clone(); + let locked = lock + .lock( + move || { + me.db + .create_trx() + .with_context(|| "transaction creation failed") + }, + Duration::from_secs(5), + ) + .await?; + if !locked { + anyhow::bail!("failed to acquire lock"); + } + + // Step 1: Fetch read version RAW-RV. 
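+ // RAW-RV is a plain FoundationDB read version taken before the scans below.
+ // The bloom-filter snapshot built in step 2 is not transactionally
+ // consistent, so step 3 only deletes a content entry if it is absent from
+ // the snapshot, older than GC_FRESH_PAGE_TTL_SECS, and carries a
+ // content-index versionstamp at or below RAW-RV; anything newer may be
+ // referenced by a commit this scan did not observe.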
+ let read_version = self.db.create_trx()?.get_read_version().await?; + + // Step 2: Collect inconsistent snapshot of hashes. + // 2a. Scan the page index incrementally and collect all hashes. + // 2b. Scan the delta referrer index incrementally and collect all hashes. + + // First, estimate the set size. + let mut page_ref_set_size = 0usize; + self.scan_page_index_simple(&lock, ns_id, |_| page_ref_set_size += 1) + .await?; + self.scan_delta_referrer_simple(&lock, ns_id, |_| page_ref_set_size += 1) + .await?; + + // Nothing to do + if page_ref_set_size == 0 { + progress_callback(format!("DONE\n")); + return Ok(()); + } + + let mut page_ref_set: BloomFilter = + BloomFilter::with_rate(0.01, page_ref_set_size.min(std::u32::MAX as usize) as u32); + tracing::info!(ns = %ns_id_hex, size = page_ref_set_size, num_bits = page_ref_set.num_bits(), num_hashes = page_ref_set.num_hashes(), "created bloom filter"); + + // Then, collect the hashes + self.scan_page_index_simple(&lock, ns_id, |hash| { + page_ref_set.insert(&hash); + }) + .await?; + self.scan_delta_referrer_simple(&lock, ns_id, |hash| { + page_ref_set.insert(&hash); + }) + .await?; + + // Step 3: scan the content index + { + let scan_start = self.construct_contentindex_key(ns_id, [0u8; 32]); + let scan_end = self.construct_contentindex_key(ns_id, [0xffu8; 32]); + let mut scan_cursor = scan_start.clone(); + let prefix_len = scan_start.len() - 32; + let mut count = 0usize; + loop { + let scan_result = loop { + let txn = lock.create_txn_and_check_sync(&self.db).await?; + let range = match txn + .get_range( + &RangeOption { + limit: Some(GC_SCAN_BATCH_SIZE.load(Ordering::Relaxed)), + reverse: false, + mode: StreamingMode::WantAll, + ..RangeOption::from(scan_cursor.clone()..=scan_end.clone()) + }, + 0, + true, + ) + .await + { + Ok(x) => x, + Err(e) => { + txn.on_error(e).await?; + continue; + } + }; + break range; + }; + + if scan_result.len() == 0 { + break; + } + + scan_cursor = scan_result.last().unwrap().key().to_vec(); + scan_cursor.push(0x00); + + let mut delete_queue: Vec<[u8; 32]> = vec![]; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + for kv in &scan_result { + let ci = match ContentIndex::decode(kv.value()) { + Ok(x) => x, + Err(_) => continue, + }; + + let hash = <[u8; 32]>::try_from(&kv.key()[prefix_len..]).unwrap(); + + // 3a. Filter out those hashes seen in step 2. + if page_ref_set.contains(&hash) { + continue; + } + + // 3b. Filter out those hashes added within a time duration. + // This is not necessary for correctness, but removing this may cause transactions to fail. + let their_secs = ci.time.as_secs(); + let our_secs = now.as_secs(); + if their_secs > our_secs || our_secs - their_secs < GC_FRESH_PAGE_TTL_SECS.load(Ordering::Relaxed) { + continue; + } + + // 3c. Filter out those CAM entries modified after RAW-RV. 
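+ // The first 8 bytes of a 10-byte FoundationDB versionstamp are the
+ // big-endian transaction commit version, so they can be compared directly
+ // against the i64 read version fetched in step 1.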
+ let their_version = + i64::from_be_bytes(ci.versionstamp[0..8].try_into().unwrap()); + if their_version > read_version { + continue; + } + delete_queue.push(hash); + } + drop(scan_result); + + if delete_queue.len() == 0 { + continue; + } + + loop { + let txn = lock.create_txn_and_check_sync(&self.db).await?; + for hash in &delete_queue { + let ci_key = self.construct_contentindex_key(ns_id, *hash); + let content_key = self.construct_content_key(ns_id, *hash); + let delta_referrer_key = self.construct_delta_referrer_key(ns_id, *hash); + txn.clear(&ci_key); + txn.clear(&content_key); + txn.clear(&delta_referrer_key); + } + txn.clear(&commit_token_key); + + if !dry_run { + match txn.commit().await { + Ok(_) => {} + Err(e) => match e.on_error().await { + Ok(_) => continue, + Err(e) => { + tracing::error!(error = %e, "delete_unreferenced_content: failed to commit transaction"); + break; + } + }, + } + } + count += delete_queue.len(); + progress_callback(format!("{}\n", count)); + break; + } + } + } + + progress_callback(format!("DONE\n")); + Ok(()) + } +} + +fn truncate_10_byte_suffix(data: &[u8]) -> &[u8] { + assert!(data.len() >= 10); + &data[..data.len() - 10] +} + +fn extract_10_byte_suffix(data: &[u8]) -> [u8; 10] { + assert!(data.len() >= 10); + <[u8; 10]>::try_from(&data[data.len() - 10..]).unwrap() +} diff --git a/mvstore/src/lock.rs b/mvstore/src/lock.rs new file mode 100644 index 0000000..4cc8532 --- /dev/null +++ b/mvstore/src/lock.rs @@ -0,0 +1,231 @@ +use std::time::{Duration, SystemTime}; + +use anyhow::{Context, Result}; +use foundationdb::{Database, FdbError, Transaction}; +use rand::{thread_rng, RngCore}; +use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; + +#[derive(Serialize, Deserialize)] +pub struct LockInfo<'a> { + expiry: u64, + #[serde(with = "serde_bytes")] + owner_id: &'a [u8], +} + +pub struct DistributedLock { + key: Vec, + description: String, + owner_id: [u8; 16], + + bg: Option<(oneshot::Sender<()>, oneshot::Receiver<()>)>, +} + +impl DistributedLock { + pub fn new(key: Vec, description: String) -> Self { + let mut owner_id = [0u8; 16]; + thread_rng().fill_bytes(&mut owner_id); + Self { + key, + description, + owner_id, + bg: None, + } + } + + async fn raw_check_sync(key: &[u8], owner_id: [u8; 16], txn: &Transaction) -> Result { + let data = txn.get(key, false).await?; + if let Some(x) = data { + let info: LockInfo = match rmp_serde::from_slice(&x) { + Ok(info) => info, + Err(_) => { + return Ok(false); + } + }; + Ok(info.owner_id == owner_id) + } else { + Ok(false) + } + } + + pub async fn check_sync(&self, txn: &Transaction) -> Result<()> { + if self.bg.is_none() { + anyhow::bail!("lock is not acquired"); + } + + if !Self::raw_check_sync(&self.key, self.owner_id, txn).await? 
{ + anyhow::bail!("lock no longer hold by us"); + } + + Ok(()) + } + + pub async fn create_txn_and_check_sync(&self, db: &Database) -> Result { + let mut txn = db.create_trx()?; + loop { + match self.check_sync(&txn).await { + Ok(()) => return Ok(txn), + Err(e) => match e.downcast::() { + Ok(x) => { + txn = txn.on_error(x).await?; + } + Err(e) => return Err(e), + }, + } + } + } + + pub async fn lock( + &mut self, + mut txn_gen: impl FnMut() -> Result + Send + Sync + 'static, + ttl: Duration, + ) -> Result { + if self.bg.is_some() { + anyhow::bail!("already locked"); + } + let mut txn = txn_gen()?; + loop { + let data = txn.get(&self.key, false).await?; + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; + if let Some(x) = data { + let info: LockInfo = rmp_serde::from_slice(&x) + .with_context(|| "failed to deserialize DistributedLock")?; + if now.as_secs() < info.expiry { + return Ok(false); + } + } + let info = LockInfo { + expiry: (now + ttl).as_secs(), + owner_id: &self.owner_id, + }; + let info = rmp_serde::to_vec_named(&info) + .with_context(|| "failed to serialize DistributedLock")?; + txn.set(&self.key, &info); + match txn.commit().await { + Ok(_) => break, + Err(e) => { + txn = e.on_error().await?; + } + } + } + let (close_req_tx, close_req_rx) = oneshot::channel(); + let (close_ack_tx, close_ack_rx) = oneshot::channel(); + self.bg = Some((close_req_tx, close_ack_rx)); + let key = self.key.clone(); + let description = self.description.clone(); + let owner_id = self.owner_id.clone(); + tokio::spawn(async move { + let _close_ack_tx = close_ack_tx; + tokio::select! { + _ = close_req_rx => { + + } + _ = renew_loop(&mut txn_gen, &key, &description, owner_id, ttl) => { + tracing::error!(desc = description, "lock dropped"); + } + } + let mut unlock_ok = false; + if let Ok(txn) = txn_gen() { + if let Ok(true) = Self::raw_check_sync(&key, owner_id, &txn).await { + txn.clear(&key); + if txn.commit().await.is_ok() { + unlock_ok = true; + tracing::debug!(desc = description, "unlocked"); + } + } + } + + if !unlock_ok { + tracing::warn!(desc = description, "failed to unlock"); + } + }); + Ok(true) + } + + #[allow(dead_code)] + pub async fn unlock(&mut self) { + let (close_req_tx, close_ack_rx) = self.bg.take().expect("unlock without lock"); + drop(close_req_tx); + let _ = close_ack_rx.await; + } +} + +async fn renew_loop( + txn_gen: &mut (impl FnMut() -> Result + Send + Sync), + key: &[u8], + description: &str, + owner_id: [u8; 16], + ttl: Duration, +) { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let mut txn = match txn_gen() { + Ok(txn) => txn, + Err(e) => { + tracing::error!(error = %e, desc = description, "renew_loop failed to create txn"); + return; + } + }; + loop { + let data = match txn.get(key, false).await { + Ok(data) => data, + Err(e) => { + tracing::error!(error = %e, desc = description, "renew_loop read failed"); + match txn.on_error(e).await { + Ok(recovered) => { + txn = recovered; + continue; + } + Err(_) => { + return; + } + } + } + }; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + if let Some(x) = data { + let mut info: LockInfo = match rmp_serde::from_slice(&x) { + Ok(info) => info, + Err(e) => { + tracing::error!(error = %e, desc = description, "renew_loop failed to deserialize lock info"); + return; + } + }; + if info.owner_id != owner_id { + tracing::error!( + desc = description, + new_owner_id = hex::encode(&info.owner_id), + "lock preempted" + ); + return; + } + info.expiry = (now + 
ttl).as_secs(); + let info = rmp_serde::to_vec_named(&info).unwrap(); + txn.set(key, &info); + match txn.commit().await { + Ok(_) => { + tracing::debug!(desc = description, "renewed distributed lock"); + break; + } + Err(e) => { + tracing::error!(error = %e, desc = description, "renew_loop commit failed"); + match e.on_error().await { + Ok(recovered) => { + txn = recovered; + continue; + } + Err(_) => { + return; + } + } + } + } + } else { + tracing::error!(desc = description, "lock was lost"); + return; + } + } + } +} diff --git a/mvstore/src/main.rs b/mvstore/src/main.rs index ae44dd9..1d4c49a 100644 --- a/mvstore/src/main.rs +++ b/mvstore/src/main.rs @@ -1,6 +1,9 @@ +mod commit; +mod gc; +mod lock; mod server; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::atomic::Ordering}; use anyhow::{Context, Result}; use foundationdb::{api::FdbApiBuilder, options::NetworkOption}; @@ -80,9 +83,27 @@ struct Opt { /// Metadata prefix. This value is tuple-encoded as a string. #[structopt(long, env = "MVSTORE_METADATA_PREFIX")] metadata_prefix: String, + + /// ADVANCED. Configure the GC scan batch size. + #[structopt(long, env = "MVSTORE_KNOB_GC_SCAN_BATCH_SIZE")] + knob_gc_scan_batch_size: Option, + + /// ADVANCED. Configure the max time-to-live for unreferenced fresh pages. This will also be the max time an SQLite transaction can be active. + #[structopt(long, env = "MVSTORE_KNOB_GC_FRESH_PAGE_TTL_SECS")] + knob_gc_fresh_page_ttl_secs: Option, } async fn async_main(opt: Opt) -> Result<()> { + if let Some(x) = opt.knob_gc_scan_batch_size { + gc::GC_SCAN_BATCH_SIZE.store(x, Ordering::Relaxed); + tracing::info!(value = x, "configured gc scan batch size"); + } + + if let Some(x) = opt.knob_gc_fresh_page_ttl_secs { + gc::GC_FRESH_PAGE_TTL_SECS.store(x, Ordering::Relaxed); + tracing::info!(value = x, "configured gc fresh page ttl"); + } + let server = Server::open(ServerConfig { cluster: opt.cluster.clone(), raw_data_prefix: opt.raw_data_prefix.clone(), @@ -90,6 +111,8 @@ async fn async_main(opt: Opt) -> Result<()> { }) .with_context(|| "failed to initialize server")?; + server.clone().spawn_background_tasks(); + let data_plane_server = { let server = server.clone(); hyper::Server::bind(&opt.data_plane).serve(make_service_fn(move |_conn| { diff --git a/mvstore/src/server.rs b/mvstore/src/server.rs index 455d9c9..925cd74 100644 --- a/mvstore/src/server.rs +++ b/mvstore/src/server.rs @@ -10,20 +10,30 @@ use futures::StreamExt; use hyper::{body::HttpBody, Body, Request, Response}; use moka::future::Cache; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, convert::Infallible, sync::Arc, time::Duration}; -use tokio::io::AsyncRead; +use std::{ + collections::HashMap, + convert::Infallible, + sync::Arc, + time::{Duration, SystemTime}, +}; +use tokio::{io::AsyncRead, sync::oneshot}; use tokio_util::{ codec::{Decoder, FramedRead, LengthDelimitedCodec}, io::StreamReader, }; +use crate::{ + commit::{CommitContext, CommitResult}, + lock::DistributedLock, +}; + const MAX_MESSAGE_SIZE: usize = 10 * 1024; // 10 KiB const COMMIT_MESSAGE_SIZE: usize = 9 * 1024 * 1024; // 9 MiB const MAX_PAGE_SIZE: usize = 8192; const COMMITTED_VERSION_HDR_NAME: &str = "x-committed-version"; pub struct Server { - db: Database, + pub db: Database, raw_data_prefix: Vec, metadata_prefix: String, @@ -102,6 +112,21 @@ pub struct AdminDeleteNamespaceRequest { pub key: String, } +#[derive(Deserialize)] +pub struct AdminTruncateNamespaceRequest { + pub key: String, + pub before_version: String, + #[serde(default)] + 
pub apply: bool, +} + +#[derive(Deserialize)] +pub struct AdminDeleteUnreferencedContentInNamespaceRequest { + pub key: String, + #[serde(default)] + pub apply: bool, +} + pub struct Page { pub version: String, pub data: FdbSlice, @@ -263,6 +288,152 @@ impl Server { } } } + + "/api/truncate_namespace" => { + let body = hyper::body::to_bytes(req.body_mut()).await?; + let body: AdminTruncateNamespaceRequest = serde_json::from_slice(&body)?; + let nskey_key = self.construct_nskey_key(&body.key); + + let txn = self.db.create_trx()?; + let ns_id = match txn.get(&nskey_key, false).await? { + Some(v) => v, + None => { + return Ok(Response::builder() + .status(400) + .body(Body::from("this key does not exist"))?); + } + }; + drop(txn); + + let ns_id = + <[u8; 10]>::try_from(&ns_id[..]).with_context(|| "cannot parse ns_id")?; + let before_version = decode_version(&body.before_version)?; + let (mut res_sender, res_body) = Body::channel(); + let (progress_ch_tx, mut progress_ch_rx) = + tokio::sync::mpsc::channel::>(1000); + let (stop_tx, stop_rx) = oneshot::channel::<()>(); + let apply = body.apply; + tokio::spawn(async move { + let work = async move { + if let Err(e) = self + .truncate_versions(!body.apply, ns_id, before_version, |progress| { + let _ = progress_ch_tx.try_send(progress); + }) + .await + { + tracing::error!(ns = hex::encode(&ns_id), before_version = hex::encode(&before_version), apply = apply, error = %e, "truncate_namespace failed"); + } + }; + tokio::select! { + _ = work => { + + } + _ = stop_rx => { + tracing::error!(ns = hex::encode(&ns_id), before_version = hex::encode(&before_version), apply = apply, "truncate_namespace interrupted"); + } + } + }); + tokio::spawn(async move { + let _stop_tx = stop_tx; + loop { + let mut exit = false; + let text = match progress_ch_rx.recv().await { + Some(progress) => match progress { + Some(x) => { + format!("{}\n", x) + } + None => { + exit = true; + "DONE\n".to_string() + } + }, + None => { + exit = true; + "ERROR\n".to_string() + } + }; + if res_sender.send_data(Bytes::from(text)).await.is_err() { + break; + } + if exit { + break; + } + } + }); + res = Response::builder().status(200).body(res_body)?; + } + + "/api/delete_unreferenced_content" => { + let body = hyper::body::to_bytes(req.body_mut()).await?; + let body: AdminDeleteUnreferencedContentInNamespaceRequest = + serde_json::from_slice(&body)?; + let nskey_key = self.construct_nskey_key(&body.key); + + let txn = self.db.create_trx()?; + let ns_id = match txn.get(&nskey_key, false).await? { + Some(v) => v, + None => { + return Ok(Response::builder() + .status(400) + .body(Body::from("this key does not exist"))?); + } + }; + drop(txn); + + let ns_id = + <[u8; 10]>::try_from(&ns_id[..]).with_context(|| "cannot parse ns_id")?; + let (mut res_sender, res_body) = Body::channel(); + let (progress_ch_tx, mut progress_ch_rx) = + tokio::sync::mpsc::channel::(1000); + let (stop_tx, stop_rx) = oneshot::channel::<()>(); + let apply = body.apply; + tokio::spawn(async move { + let work = async move { + if let Err(e) = self + .delete_unreferenced_content(!body.apply, ns_id, |progress| { + let _ = progress_ch_tx.try_send(progress); + }) + .await + { + tracing::error!(ns = hex::encode(&ns_id), apply = apply, error = %e, "delete_unreferenced_content failed"); + } + }; + tokio::select! 
{ + _ = work => { + + } + _ = stop_rx => { + tracing::error!(ns = hex::encode(&ns_id), apply = apply, "delete_unreferenced_content interrupted"); + } + } + }); + tokio::spawn(async move { + let _stop_tx = stop_tx; + loop { + let mut exit = false; + let text = match progress_ch_rx.recv().await { + Some(progress) => { + if progress == "DONE\n" { + exit = true; + } + progress + } + None => { + exit = true; + "ERROR\n".to_string() + } + }; + if res_sender.send_data(Bytes::from(text)).await.is_err() { + break; + } + if exit { + break; + } + } + }); + res = Response::builder().status(200).body(res_body)?; + } + _ => { res = Response::builder().status(404).body(Body::empty())?; } @@ -283,6 +454,71 @@ impl Server { } } + pub fn spawn_background_tasks(self: Arc) { + tokio::spawn(self.clone().globaltask_timekeeper()); + } + + async fn globaltask_timekeeper(self: Arc) { + let mut lock = DistributedLock::new( + self.construct_globaltask_key("timekeeper"), + "timekeeper".into(), + ); + 'outer: loop { + loop { + let me = self.clone(); + match lock + .lock( + move || me.db.create_trx().with_context(|| "failed to create txn"), + Duration::from_secs(5), + ) + .await + { + Ok(true) => break, + Ok(false) => {} + Err(e) => { + tracing::error!(error = %e, "timekeeper lock error"); + } + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + + tracing::info!("timekeeper started"); + + loop { + loop { + let txn = match lock.create_txn_and_check_sync(&self.db).await { + Ok(x) => x, + Err(e) => { + tracing::error!(error = %e, "timekeeper create_txn_and_check_sync error"); + lock.unlock().await; + continue 'outer; + } + }; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let key = self.construct_time2version_key(now); + let value = [0u8; 14]; + txn.atomic_op(&key, &value, MutationType::SetVersionstampedValue); + match txn.commit().await { + Ok(_) => break, + Err(e) => match e.on_error().await { + Ok(_) => {} + Err(e) => { + tracing::error!(error = %e, "timekeeper commit error"); + lock.unlock().await; + continue 'outer; + } + }, + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + async fn lookup_nskey(&self, nskey: &str) -> Result> { enum GetError { NotFound, @@ -293,7 +529,7 @@ impl Server { .try_get_with(nskey.to_string(), async { let txn = self.db.create_trx(); match txn { - Ok(txn) => match txn.get(&self.construct_nskey_key(nskey), true).await { + Ok(txn) => match txn.get(&self.construct_nskey_key(nskey), false).await { Ok(Some(x)) => <[u8; 10]>::try_from(&x[..]) .with_context(|| "invalid namespace id") .map_err(GetError::Other), @@ -392,7 +628,7 @@ impl Server { "/stat" => { let txn = self.db.create_trx()?; let buf = self.get_read_version_as_versionstamp(&txn).await?; - let nsmd = txn.get(&self.construct_nsmd_key(ns_id), true).await?; + let nsmd = txn.get(&self.construct_nsmd_key(ns_id), false).await?; let nsmd = nsmd .as_ref() .map(|x| std::str::from_utf8(&x[..]).unwrap_or_default()) @@ -492,7 +728,7 @@ impl Server { } }; let content_key = self.construct_content_key(ns_id, hash); - match txn.get(&content_key, true).await { + match txn.get(&content_key, false).await { Ok(Some(x)) => Some(Page { version: read_req.version.to_string(), data: x, @@ -562,6 +798,9 @@ impl Server { res = Response::builder().body(res_body)?; let me = self.clone(); let txn = self.db.create_trx()?; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); tokio::spawn(async move { loop { let message = match body.next().await { @@ 
-625,7 +864,7 @@ impl Server { // This is not only an optimization. Without doing this check it is possible to form // loops in delta page construction. - match txn.get(&content_key, true).await { + match txn.get(&content_key, false).await { Ok(x) => { if x.is_some() { early_completion = true; @@ -650,10 +889,19 @@ impl Server { .construct_delta_referrer_key( ns_id, *hash.as_bytes(), - delta_base_hash, ); txn.set(&content_key, &x); - txn.set(&delta_referrer_key, b""); + txn.set(&delta_referrer_key, &delta_base_hash); + let base_content_index_key = self + .construct_contentindex_key(ns_id, delta_base_hash); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + txn.atomic_op( + &base_content_index_key, + &ContentIndex::generate_mutation_payload(now), + MutationType::SetVersionstampedValue, + ); early_completion = true; } } @@ -674,6 +922,15 @@ impl Server { txn.set(&content_key, &Page::compress_zstd(write_req.data)); } + // Set content index + let content_index_key = + self.construct_contentindex_key(ns_id, *hash.as_bytes()); + txn.atomic_op( + &content_index_key, + &ContentIndex::generate_mutation_payload(now), + MutationType::SetVersionstampedValue, + ); + let payload = match rmp_serde::to_vec_named(&WriteResponse { hash: hash.as_bytes(), }) { @@ -714,94 +971,49 @@ impl Server { let idempotency_key = <[u8; 16]>::try_from(commit_init.idempotency_key) .with_context(|| "invalid idempotency key")?; - // Ensure that we finish the commit as quickly as possible - there's the five-second limit here. - let mut txn = self.db.create_trx()?; - loop { - // Check version - let last_write_version_key = self.construct_last_write_version_key(ns_id); - let last_write_version = txn.get(&last_write_version_key, false).await?; - match &last_write_version { - Some(x) if x.len() != 16 + 10 => { - anyhow::bail!("invalid last write version"); - } - Some(x) => { - let actual_idempotency_key = <[u8; 16]>::try_from(&x[0..16]).unwrap(); - let actual_last_write_version = - <[u8; 10]>::try_from(&x[16..26]).unwrap(); - - if actual_idempotency_key == idempotency_key { - return Ok(Response::builder() - .header( - COMMITTED_VERSION_HDR_NAME, - hex::encode(&actual_last_write_version), - ) - .body(Body::empty())?); - } + // Decode data + let mut index_writes: Vec<(u32, [u8; 32])> = Vec::new(); - if client_assumed_version < actual_last_write_version { - return Ok(Response::builder().status(409).body(Body::empty())?); - } - } - None => {} + for _ in 0..commit_init.num_pages { + let message = match reader.next().await { + Some(x) => x.with_context(|| "error reading commit request")?, + None => anyhow::bail!("early end of commit stream"), + }; + let commit_req: CommitRequest = rmp_serde::from_slice(&message) + .with_context(|| "error deserializing commit request")?; + if commit_req.hash.len() != 32 { + return Ok(Response::builder() + .status(400) + .body(Body::from("invalid hash"))?); } + index_writes.push(( + commit_req.page_index, + <[u8; 32]>::try_from(commit_req.hash).unwrap(), + )); + } - // Write metadata - if let Some(md) = commit_init.metadata { - let metadata_key = self.construct_nsmd_key(ns_id); - txn.set(&metadata_key, md.as_bytes()); + match self + .commit(CommitContext { + ns_id, + client_assumed_version, + idempotency_key, + metadata: commit_init.metadata, + index_writes: &index_writes, + }) + .await? 
+ { + CommitResult::BadPageReference => { + res = Response::builder() + .status(400) + .body(Body::from("bad page reference"))?; } - for _ in 0..commit_init.num_pages { - let message = match reader.next().await { - Some(x) => x.with_context(|| "error reading commit request")?, - None => anyhow::bail!("early end of commit stream"), - }; - let commit_req: CommitRequest = rmp_serde::from_slice(&message) - .with_context(|| "error deserializing commit request")?; - if commit_req.hash.len() != 32 { - return Ok(Response::builder() - .status(400) - .body(Body::from("invalid hash"))?); - } - - let page_key_template = - self.construct_page_key(ns_id, commit_req.page_index, [0u8; 10]); - let page_key_atomic_op = - generate_suffix_versionstamp_atomic_op(&page_key_template); - txn.atomic_op( - &page_key_atomic_op, - commit_req.hash, - MutationType::SetVersionstampedKey, - ); + CommitResult::Committed { versionstamp } => { + res = Response::builder() + .header(COMMITTED_VERSION_HDR_NAME, hex::encode(&versionstamp)) + .body(Body::empty())?; } - let mut last_write_version_atomic_op_value = [0u8; 16 + 10 + 4]; - last_write_version_atomic_op_value[0..16].copy_from_slice(&idempotency_key); - last_write_version_atomic_op_value[26..30] - .copy_from_slice(&16u32.to_le_bytes()[..]); - txn.atomic_op( - &last_write_version_key, - &last_write_version_atomic_op_value, - MutationType::SetVersionstampedValue, - ); - let versionstamp_fut = txn.get_versionstamp(); - match txn.commit().await { - Ok(_) => { - let versionstamp = versionstamp_fut.await?; - let versionstamp = hex::encode(&versionstamp); - res = Response::builder() - .header(COMMITTED_VERSION_HDR_NAME, versionstamp) - .body(Body::empty())?; - break; - } - Err(e) => { - txn = match e.on_error().await { - Ok(x) => x, - Err(e) => { - return Ok(Response::builder() - .status(400) - .body(Body::from(format!("{}", e)))?) - } - }; - } + CommitResult::Conflict => { + res = Response::builder().status(409).body(Body::empty())?; } } } @@ -869,10 +1081,10 @@ impl Server { txn: &Transaction, ns_id: [u8; 10], page_index: u32, - block_version_hex: &str, + page_version_hex: &str, ) -> Result> { let (version, hash) = match self - .read_page_hash(txn, ns_id, page_index, block_version_hex) + .read_page_hash(txn, ns_id, page_index, page_version_hex) .await? { Some(x) => x, @@ -880,7 +1092,7 @@ impl Server { }; let content_key = self.construct_content_key(ns_id, hash); let content = txn - .get(&content_key, true) + .get(&content_key, false) .await? 
.with_context(|| "cannot find content for the provided hash")?; Ok(Some(Page { @@ -889,18 +1101,43 @@ impl Server { })) } - fn construct_nsmd_key(&self, ns_id: [u8; 10]) -> Vec { + pub fn construct_nsmd_key(&self, ns_id: [u8; 10]) -> Vec { let mut key = pack(&(self.metadata_prefix.as_str(), "nsmd")); key.push(0x32); key.extend_from_slice(&ns_id); key } - fn construct_nskey_key(&self, ns_key: &str) -> Vec { + pub fn construct_nstask_key(&self, ns_id: [u8; 10], task: &str) -> Vec { + let mut key = pack(&(self.metadata_prefix.as_str(), "nstask")); + key.push(0x32); + key.extend_from_slice(&ns_id); + key.extend_from_slice(&pack(&(task,))); + key + } + + pub fn construct_globaltask_key(&self, task: &str) -> Vec { + let key = pack(&(self.metadata_prefix.as_str(), "globaltask", task)); + key + } + + pub fn construct_time2version_key(&self, time_secs: u64) -> Vec { + let key = pack(&(self.metadata_prefix.as_str(), "time2version", time_secs)); + key + } + + pub fn construct_nskey_key(&self, ns_key: &str) -> Vec { pack(&(self.metadata_prefix.as_str(), "nskey", ns_key)) } - fn construct_last_write_version_key(&self, ns_id: [u8; 10]) -> Vec { + pub fn construct_ns_commit_token_key(&self, ns_id: [u8; 10]) -> Vec { + let mut key = pack(&(self.metadata_prefix.as_str(), "ns_commit_token")); + key.push(0x32); + key.extend_from_slice(&ns_id); + key + } + + pub fn construct_last_write_version_key(&self, ns_id: [u8; 10]) -> Vec { let mut buf: Vec = Vec::with_capacity(self.raw_data_prefix.len() + ns_id.len() + 1); buf.extend_from_slice(&self.raw_data_prefix); buf.extend_from_slice(&ns_id); @@ -908,35 +1145,35 @@ impl Server { buf } - fn construct_ns_data_prefix(&self, ns_id: [u8; 10]) -> Vec { + pub fn construct_ns_data_prefix(&self, ns_id: [u8; 10]) -> Vec { let mut buf: Vec = Vec::with_capacity(self.raw_data_prefix.len() + ns_id.len()); buf.extend_from_slice(&self.raw_data_prefix); buf.extend_from_slice(&ns_id); buf } - fn construct_page_key( + pub fn construct_page_key( &self, ns_id: [u8; 10], page_index: u32, - block_version: [u8; 10], + page_version: [u8; 10], ) -> Vec { let mut buf: Vec = Vec::with_capacity( self.raw_data_prefix.len() + ns_id.len() + 1 + std::mem::size_of::() - + block_version.len(), + + page_version.len(), ); buf.extend_from_slice(&self.raw_data_prefix); buf.extend_from_slice(&ns_id); buf.push(b'p'); buf.extend_from_slice(&page_index.to_be_bytes()); - buf.extend_from_slice(&block_version); + buf.extend_from_slice(&page_version); buf } - fn construct_content_key(&self, ns_id: [u8; 10], hash: [u8; 32]) -> Vec { + pub fn construct_content_key(&self, ns_id: [u8; 10], hash: [u8; 32]) -> Vec { let mut buf: Vec = Vec::with_capacity(self.raw_data_prefix.len() + ns_id.len() + 1 + hash.len()); buf.extend_from_slice(&self.raw_data_prefix); @@ -946,19 +1183,27 @@ impl Server { buf } - fn construct_delta_referrer_key( + pub fn construct_contentindex_key(&self, ns_id: [u8; 10], hash: [u8; 32]) -> Vec { + let mut buf: Vec = + Vec::with_capacity(self.raw_data_prefix.len() + ns_id.len() + 1 + hash.len()); + buf.extend_from_slice(&self.raw_data_prefix); + buf.extend_from_slice(&ns_id); + buf.push(b'd'); + buf.extend_from_slice(&hash); + buf + } + + pub fn construct_delta_referrer_key( &self, ns_id: [u8; 10], from_hash: [u8; 32], - to_hash: [u8; 32], ) -> Vec { let mut buf: Vec = Vec::with_capacity( - self.raw_data_prefix.len() + ns_id.len() + 1 + to_hash.len() + from_hash.len(), + self.raw_data_prefix.len() + ns_id.len() + 1 + from_hash.len() ); buf.extend_from_slice(&self.raw_data_prefix); 
buf.extend_from_slice(&ns_id); buf.push(b'r'); - buf.extend_from_slice(&to_hash); buf.extend_from_slice(&from_hash); buf } @@ -981,7 +1226,7 @@ impl Server { }; let (undecoded_base, delta_base_hash) = { let base_page_key = self.construct_content_key(ns_id, delta_base_hash); - let base = match txn.get(&base_page_key, true).await? { + let base = match txn.get(&base_page_key, false).await? { Some(x) => x, None => return Ok(None), }; @@ -995,7 +1240,7 @@ impl Server { } let flattened_base_hash = <[u8; 32]>::try_from(&base[1..33]).unwrap(); let flattened_base_key = self.construct_content_key(ns_id, flattened_base_hash); - let flattened_base = match txn.get(&flattened_base_key, true).await? { + let flattened_base = match txn.get(&flattened_base_key, false).await? { Some(x) => x, None => return Ok(None), }; @@ -1092,7 +1337,7 @@ impl Server { } let base_page_hash = <[u8; 32]>::try_from(&data[1..33]).unwrap(); let base_page_key = self.construct_content_key(ns_id, base_page_hash); - let base_page = match txn.get(&base_page_key, true).await? { + let base_page = match txn.get(&base_page_key, false).await? { Some(x) => x, None => anyhow::bail!("base page not found"), }; @@ -1128,7 +1373,7 @@ fn new_body_reader( reader } -fn generate_suffix_versionstamp_atomic_op(template: &[u8]) -> Vec { +pub fn generate_suffix_versionstamp_atomic_op(template: &[u8]) -> Vec { let mut out: Vec = Vec::with_capacity(template.len() + 4); out.extend_from_slice(template); out.extend_from_slice(&(template.len() as u32 - 10).to_le_bytes()); @@ -1147,3 +1392,26 @@ fn decode_version(version: &str) -> Result<[u8; 10]> { hex::decode_to_slice(version, &mut bytes).with_context(|| "cannot decode version")?; Ok(bytes) } + +pub struct ContentIndex { + pub time: Duration, + pub versionstamp: [u8; 10], +} + +impl ContentIndex { + pub fn generate_mutation_payload(now: Duration) -> [u8; 22] { + let mut buf = [0u8; 22]; + buf[0..8].copy_from_slice(&now.as_secs().to_be_bytes()); + buf[18..22].copy_from_slice(&8u32.to_le_bytes()[..]); + buf + } + + pub fn decode(x: &[u8]) -> Result { + if x.len() != 18 { + return Err(anyhow::anyhow!("invalid content index")); + } + let time = Duration::from_secs(u64::from_be_bytes(x[0..8].try_into().unwrap())); + let versionstamp = x[8..18].try_into().unwrap(); + Ok(Self { time, versionstamp }) + } +}
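+
+#[cfg(test)]
+mod tests {
+    // Illustrative round-trip check (an added sketch, not part of the original
+    // patch). `generate_mutation_payload` emits 22 bytes: 8 bytes of
+    // big-endian seconds, a 10-byte zeroed versionstamp slot, and a 4-byte
+    // little-endian offset (8) telling SetVersionstampedValue where to write
+    // the stamp. FoundationDB strips the offset suffix when applying the
+    // mutation, leaving the 18-byte value that `decode` expects.
+    use super::ContentIndex;
+    use std::time::Duration;
+
+    #[test]
+    fn content_index_roundtrip() {
+        let now = Duration::from_secs(1_234_567);
+        let payload = ContentIndex::generate_mutation_payload(now);
+        assert_eq!(payload.len(), 22);
+        assert_eq!(&payload[18..22], &8u32.to_le_bytes()[..]);
+
+        // Simulate the database applying the mutation: drop the offset suffix
+        // and fill in a versionstamp at offset 8.
+        let mut stored = [0u8; 18];
+        stored.copy_from_slice(&payload[0..18]);
+        stored[8..18].copy_from_slice(&[7u8; 10]);
+
+        let ci = ContentIndex::decode(&stored).unwrap();
+        assert_eq!(ci.time, now);
+        assert_eq!(ci.versionstamp, [7u8; 10]);
+    }
+}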