Skip to content

Commit

Permalink
Merge pull request #8 from losfair/fix/delta-direct
Browse files Browse the repository at this point in the history
Delta encoding optimizations
  • Loading branch information
losfair authored Aug 2, 2022
2 parents 0f94b70 + b5f861b commit a84f7d3
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 43 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion mvstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,3 @@ serde_json = "1"
serde_bytes = "0.11"
zstd = "0.11.2"
moka = { version = "0.9.2", features = ["future"] }
async-recursion = "1.0.0"
126 changes: 85 additions & 41 deletions mvstore/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{Context, Result};
use async_recursion::async_recursion;
use bytes::{Bytes, BytesMut};
use foundationdb::{
future::FdbSlice,
Expand Down Expand Up @@ -111,7 +110,6 @@ pub struct Page {
#[derive(Default)]
pub struct DecodedPage {
pub data: Vec<u8>,
pub delta_depth: u32,
}

const PAGE_ENCODING_NONE: u8 = 0;
Expand Down Expand Up @@ -647,10 +645,16 @@ impl Server {
.await
{
Ok(x) => {
if let Some(x) = x {
if let Some((x, delta_base_hash)) = x {
let delta_referrer_key = self
.construct_delta_referrer_key(
ns_id,
*hash.as_bytes(),
delta_base_hash,
);
txn.set(&content_key, &x);
txn.set(&delta_referrer_key, b"");
early_completion = true;
tracing::debug!(ns = ns_id_hex, "delta encoded");
}
}
Err(e) => {
Expand Down Expand Up @@ -942,33 +946,70 @@ impl Server {
buf
}

fn construct_delta_referrer_key(
&self,
ns_id: [u8; 10],
from_hash: [u8; 32],
to_hash: [u8; 32],
) -> Vec<u8> {
let mut buf: Vec<u8> = Vec::with_capacity(
self.raw_data_prefix.len() + ns_id.len() + 1 + to_hash.len() + from_hash.len(),
);
buf.extend_from_slice(&self.raw_data_prefix);
buf.extend_from_slice(&ns_id);
buf.push(b'r');
buf.extend_from_slice(&to_hash);
buf.extend_from_slice(&from_hash);
buf
}

async fn delta_encode(
&self,
txn: &Transaction,
ns_id: [u8; 10],
base_page_index: u32,
this_page: &[u8],
) -> Result<Option<Vec<u8>>> {
) -> Result<Option<(Vec<u8>, [u8; 32])>> {
let version_hex = hex::encode(&self.get_read_version_as_versionstamp(&txn).await?);
let delta_base_hash = self
.read_page_hash(&txn, ns_id, base_page_index, &version_hex)
.await?;

let (_, delta_base_hash) = match delta_base_hash {
let (_, delta_base_hash) = match self
.read_page_hash(&txn, ns_id, base_page_index, &version_hex)
.await?
{
Some(x) => x,
None => return Ok(None),
};

let base_page_key = self.construct_content_key(ns_id, delta_base_hash);
let base_page = match txn.get(&base_page_key, true).await? {
Some(x) => x,
None => return Ok(None),
let (undecoded_base, delta_base_hash) = {
let base_page_key = self.construct_content_key(ns_id, delta_base_hash);
let base = match txn.get(&base_page_key, true).await? {
Some(x) => x,
None => return Ok(None),
};
if base.len() == 0 {
return Ok(None);
}
if base[0] == PAGE_ENCODING_DELTA {
// Flatten the link
if base.len() < 33 {
return Ok(None);
}
let flattened_base_hash = <[u8; 32]>::try_from(&base[1..33]).unwrap();
let flattened_base_key = self.construct_content_key(ns_id, flattened_base_hash);
let flattened_base = match txn.get(&flattened_base_key, true).await? {
Some(x) => x,
None => return Ok(None),
};
tracing::debug!(
from = hex::encode(&delta_base_hash),
to = hex::encode(&flattened_base_hash),
"flattened delta page"
);
(flattened_base, flattened_base_hash)
} else {
(base, delta_base_hash)
}
};
let base_page = self.decode_page(txn, ns_id, &base_page).await?;
if base_page.delta_depth >= 4 {
return Ok(None);
}

let base_page = self.decode_page_no_delta(&undecoded_base)?;
if base_page.data.len() != this_page.len() || this_page.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -1002,19 +1043,12 @@ impl Server {
tracing::debug!(
ns = hex::encode(&ns_id),
base = hex::encode(&delta_base_hash),
depth = base_page.delta_depth + 1,
"delta encoded"
);
Ok(Some(output))
Ok(Some((output, delta_base_hash)))
}

#[async_recursion]
async fn decode_page(
&self,
txn: &Transaction,
ns_id: [u8; 10],
data: &[u8],
) -> Result<DecodedPage> {
fn decode_page_no_delta(&self, data: &[u8]) -> Result<DecodedPage> {
if data.len() == 0 {
return Ok(DecodedPage::default());
}
Expand All @@ -1025,18 +1059,33 @@ impl Server {
// not compressed
Ok(DecodedPage {
data: data[1..].to_vec(),
delta_depth: 0,
})
}
PAGE_ENCODING_ZSTD => {
// zstd
let data = zstd::bulk::decompress(&data[1..], MAX_PAGE_SIZE)
.with_context(|| "zstd decompress failed")?;
Ok(DecodedPage {
data,
delta_depth: 0,
})
Ok(DecodedPage { data })
}
_ => Err(anyhow::anyhow!(
"decode_page_no_delta: unknown page encoding: {}",
encode_type
)),
}
}

async fn decode_page(
&self,
txn: &Transaction,
ns_id: [u8; 10],
data: &[u8],
) -> Result<DecodedPage> {
if data.len() == 0 {
return Ok(DecodedPage::default());
}

let encode_type = data[0];
match encode_type {
PAGE_ENCODING_DELTA => {
if data.len() < 33 {
anyhow::bail!("invalid delta encoding");
Expand All @@ -1047,7 +1096,7 @@ impl Server {
Some(x) => x,
None => anyhow::bail!("base page not found"),
};
let base_page = self.decode_page(txn, ns_id, &base_page).await?;
let base_page = self.decode_page_no_delta(&base_page)?;
let mut delta_data = zstd::bulk::decompress(&data[33..], MAX_PAGE_SIZE)?;
if delta_data.len() != base_page.data.len() {
anyhow::bail!("delta and base have different sizes");
Expand All @@ -1057,14 +1106,9 @@ impl Server {
*b ^= base_page.data[i];
}

Ok(DecodedPage {
data: delta_data,
delta_depth: base_page.delta_depth + 1,
})
}
_ => {
anyhow::bail!("unsupported page encoding: {}", encode_type);
Ok(DecodedPage { data: delta_data })
}
_ => self.decode_page_no_delta(data),
}
}
}
Expand Down

0 comments on commit a84f7d3

Please sign in to comment.