Skip to content

Commit

Permalink
Flatten delta pages
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Aug 2, 2022
1 parent 0f94b70 commit e3593d4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 40 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion mvstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,3 @@ serde_json = "1"
serde_bytes = "0.11"
zstd = "0.11.2"
moka = { version = "0.9.2", features = ["future"] }
async-recursion = "1.0.0"
96 changes: 58 additions & 38 deletions mvstore/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{Context, Result};
use async_recursion::async_recursion;
use bytes::{Bytes, BytesMut};
use foundationdb::{
future::FdbSlice,
Expand Down Expand Up @@ -111,7 +110,6 @@ pub struct Page {
#[derive(Default)]
pub struct DecodedPage {
pub data: Vec<u8>,
pub delta_depth: u32,
}

const PAGE_ENCODING_NONE: u8 = 0;
Expand Down Expand Up @@ -650,7 +648,6 @@ impl Server {
if let Some(x) = x {
txn.set(&content_key, &x);
early_completion = true;
tracing::debug!(ns = ns_id_hex, "delta encoded");
}
}
Err(e) => {
Expand Down Expand Up @@ -950,25 +947,45 @@ impl Server {
this_page: &[u8],
) -> Result<Option<Vec<u8>>> {
let version_hex = hex::encode(&self.get_read_version_as_versionstamp(&txn).await?);
let delta_base_hash = self
.read_page_hash(&txn, ns_id, base_page_index, &version_hex)
.await?;

let (_, delta_base_hash) = match delta_base_hash {
let (_, delta_base_hash) = match self
.read_page_hash(&txn, ns_id, base_page_index, &version_hex)
.await?
{
Some(x) => x,
None => return Ok(None),
};

let base_page_key = self.construct_content_key(ns_id, delta_base_hash);
let base_page = match txn.get(&base_page_key, true).await? {
Some(x) => x,
None => return Ok(None),
let (undecoded_base, delta_base_hash) = {
let base_page_key = self.construct_content_key(ns_id, delta_base_hash);
let base = match txn.get(&base_page_key, true).await? {
Some(x) => x,
None => return Ok(None),
};
if base.len() == 0 {
return Ok(None);
}
if base[0] == PAGE_ENCODING_DELTA {
// Flatten the link
if base.len() < 33 {
return Ok(None);
}
let flattened_base_hash = <[u8; 32]>::try_from(&base[1..33]).unwrap();
let flattened_base_key = self.construct_content_key(ns_id, flattened_base_hash);
let flattened_base = match txn.get(&flattened_base_key, true).await? {
Some(x) => x,
None => return Ok(None),
};
tracing::debug!(
from = hex::encode(&delta_base_hash),
to = hex::encode(&flattened_base_hash),
"flattened delta page"
);
(flattened_base, flattened_base_hash)
} else {
(base, delta_base_hash)
}
};
let base_page = self.decode_page(txn, ns_id, &base_page).await?;
if base_page.delta_depth >= 4 {
return Ok(None);
}

let base_page = self.decode_page_no_delta(&undecoded_base)?;
if base_page.data.len() != this_page.len() || this_page.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -1002,19 +1019,12 @@ impl Server {
tracing::debug!(
ns = hex::encode(&ns_id),
base = hex::encode(&delta_base_hash),
depth = base_page.delta_depth + 1,
"delta encoded"
);
Ok(Some(output))
}

#[async_recursion]
async fn decode_page(
&self,
txn: &Transaction,
ns_id: [u8; 10],
data: &[u8],
) -> Result<DecodedPage> {
fn decode_page_no_delta(&self, data: &[u8]) -> Result<DecodedPage> {
if data.len() == 0 {
return Ok(DecodedPage::default());
}
Expand All @@ -1025,18 +1035,33 @@ impl Server {
// not compressed
Ok(DecodedPage {
data: data[1..].to_vec(),
delta_depth: 0,
})
}
PAGE_ENCODING_ZSTD => {
// zstd
let data = zstd::bulk::decompress(&data[1..], MAX_PAGE_SIZE)
.with_context(|| "zstd decompress failed")?;
Ok(DecodedPage {
data,
delta_depth: 0,
})
Ok(DecodedPage { data })
}
_ => Err(anyhow::anyhow!(
"decode_page_no_delta: unknown page encoding: {}",
encode_type
)),
}
}

async fn decode_page(
&self,
txn: &Transaction,
ns_id: [u8; 10],
data: &[u8],
) -> Result<DecodedPage> {
if data.len() == 0 {
return Ok(DecodedPage::default());
}

let encode_type = data[0];
match encode_type {
PAGE_ENCODING_DELTA => {
if data.len() < 33 {
anyhow::bail!("invalid delta encoding");
Expand All @@ -1047,7 +1072,7 @@ impl Server {
Some(x) => x,
None => anyhow::bail!("base page not found"),
};
let base_page = self.decode_page(txn, ns_id, &base_page).await?;
let base_page = self.decode_page_no_delta(&base_page)?;
let mut delta_data = zstd::bulk::decompress(&data[33..], MAX_PAGE_SIZE)?;
if delta_data.len() != base_page.data.len() {
anyhow::bail!("delta and base have different sizes");
Expand All @@ -1057,14 +1082,9 @@ impl Server {
*b ^= base_page.data[i];
}

Ok(DecodedPage {
data: delta_data,
delta_depth: base_page.delta_depth + 1,
})
}
_ => {
anyhow::bail!("unsupported page encoding: {}", encode_type);
Ok(DecodedPage { data: delta_data })
}
_ => self.decode_page_no_delta(data),
}
}
}
Expand Down

0 comments on commit e3593d4

Please sign in to comment.