Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add serialized_size to StorageEncode #2227

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::time::Duration;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

Expand Down Expand Up @@ -289,7 +289,6 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
}

fn serialize_value<T: StorageEncode>(value: T) -> Bytes {
let mut buf = BytesMut::new();
StorageCodec::encode(&value, &mut buf).expect("We can always serialize");
let buf = StorageCodec::encode(&value).expect("We can always serialize");
buf.freeze()
}
32 changes: 12 additions & 20 deletions crates/bifrost/src/providers/local_loglet/log_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,18 @@ impl LogStateUpdates {

impl LogStateUpdates {
pub fn to_bytes(&self) -> Result<BytesMut, LogStoreError> {
// trying to avoid buffer resizing by having plenty of space for serialization
let mut buf = BytesMut::with_capacity(std::mem::size_of::<Self>() * 2);
self.encode(&mut buf)?;
let buf = self.encode()?;
Ok(buf)
}

pub fn encode_and_split(&self, buf: &mut BytesMut) -> Result<BytesMut, LogStoreError> {
self.encode(buf)?;
pub fn encode_and_split(&self) -> Result<BytesMut, LogStoreError> {
let mut buf = self.encode()?;
Ok(buf.split())
}

pub fn encode(&self, buf: &mut BytesMut) -> Result<(), LogStoreError> {
// trying to avoid buffer resizing by having plenty of space for serialization
buf.reserve(std::mem::size_of::<Self>() * 2);
StorageCodec::encode(self, buf)?;
Ok(())
pub fn encode(&self) -> Result<BytesMut, LogStoreError> {
let buf = StorageCodec::encode(self)?;
Ok(buf)
}

pub fn from_slice(mut data: &[u8]) -> Result<Self, LogStoreError> {
Expand Down Expand Up @@ -149,23 +145,19 @@ pub struct LogState {

impl LogState {
pub fn to_bytes(&self) -> Result<BytesMut, LogStoreError> {
// trying to avoid buffer resizing by having plenty of space for serialization
let mut buf = BytesMut::with_capacity(std::mem::size_of::<Self>() * 2);
self.encode(&mut buf)?;
let buf = self.encode()?;
Ok(buf)
}

#[allow(unused)]
pub fn encode_and_split(&self, buf: &mut BytesMut) -> Result<BytesMut, LogStoreError> {
self.encode(buf)?;
pub fn encode_and_split(&self) -> Result<BytesMut, LogStoreError> {
let mut buf = self.encode()?;
Ok(buf.split())
}

pub fn encode(&self, buf: &mut BytesMut) -> Result<(), LogStoreError> {
// trying to avoid buffer resizing by having plenty of space for serialization
buf.reserve(std::mem::size_of::<Self>() * 2);
StorageCodec::encode(self, buf)?;
Ok(())
pub fn encode(&self) -> Result<BytesMut, LogStoreError> {
let buf = StorageCodec::encode(self)?;
Ok(buf)
}

pub fn from_slice(mut data: &[u8]) -> Result<Self, LogStoreError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ impl LogStoreWriter {
&mut write_batch,
command.loglet_id,
logstate_updates,
buffer,
)
}

Expand All @@ -203,9 +202,8 @@ impl LogStoreWriter {
write_batch: &mut WriteBatch,
loglet_id: u64,
updates: LogStateUpdates,
buffer: &mut BytesMut,
) {
let buffer = updates.encode_and_split(buffer).expect("encode");
let buffer = updates.encode_and_split().expect("encode");
write_batch.merge_cf(
metadata_cf,
MetadataKey::new(loglet_id, MetadataKind::LogState).to_bytes(),
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/local_loglet/record_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn write_record(record: &Record, buf: &mut BytesMut) -> BytesMut {
match record.body() {
PolyBytes::Bytes(raw_bytes) => buf.put_slice(raw_bytes),
PolyBytes::Typed(encodeable) => {
StorageCodec::encode(encodeable.deref(), buf).expect("record serde is infallible")
StorageCodec::encode(encodeable.deref()).expect("record serde is infallible");
}
}
buf.split()
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/metadata_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod test_util;
#[cfg(any(test, feature = "test-util"))]
use crate::metadata_store::test_util::InMemoryMetadataStore;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use bytestring::ByteString;
use restate_types::errors::GenericError;
use restate_types::retries::RetryPolicy;
Expand Down Expand Up @@ -173,8 +173,7 @@ impl MetadataStoreClient {
{
let version = value.version();

let mut buf = BytesMut::default();
StorageCodec::encode(value, &mut buf).map_err(|err| WriteError::Codec(err.into()))?;
let buf = StorageCodec::encode(value).map_err(|err| WriteError::Codec(err.into()))?;

self.inner
.put(
Expand Down
6 changes: 3 additions & 3 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl LocalMetadataStore {
value: &VersionedValue,
) -> Result<()> {
self.buffer.clear();
Self::encode(value, &mut self.buffer)?;
Self::encode(value)?;

let write_options = self.write_options();
let cf_handle = self.kv_cf_handle();
Expand Down Expand Up @@ -324,8 +324,8 @@ impl LocalMetadataStore {
.map_err(Into::into)
}

fn encode<T: StorageEncode>(value: &T, buf: &mut BytesMut) -> Result<()> {
StorageCodec::encode(value, buf)?;
fn encode<T: StorageEncode>(value: &T) -> Result<()> {
StorageCodec::encode(value)?;
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,22 @@ impl<T: TransportConnect> NodeSvc for NodeSvcHandler<T> {
.await
.map_err(|e| Status::internal(e.to_string()))?;
}
let mut encoded = BytesMut::new();
let encoded = BytesMut::new();
match kind {
MetadataKind::NodesConfiguration => {
StorageCodec::encode(metadata.nodes_config_ref().as_ref(), &mut encoded)
StorageCodec::encode(metadata.nodes_config_ref().as_ref())
.expect("We can always serialize");
}
MetadataKind::Schema => {
StorageCodec::encode(metadata.schema_ref().as_ref(), &mut encoded)
StorageCodec::encode(metadata.schema_ref().as_ref())
.expect("We can always serialize");
}
MetadataKind::PartitionTable => {
StorageCodec::encode(metadata.partition_table_ref().as_ref(), &mut encoded)
StorageCodec::encode(metadata.partition_table_ref().as_ref())
.expect("We can always serialize");
}
MetadataKind::Logs => {
StorageCodec::encode(metadata.logs_ref().as_ref(), &mut encoded)
StorageCodec::encode(metadata.logs_ref().as_ref())
.expect("We can always serialize");
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ pub(crate) trait StorageAccess {
let key_buffer = key_buffer.split();

let value_buffer = self.cleared_value_buffer_mut(0);
StorageCodec::encode(value, value_buffer).unwrap();
StorageCodec::encode(value).unwrap();
let value_buffer = value_buffer.split();

self.put_cf(K::TABLE, key_buffer, value_buffer);
Expand Down
3 changes: 3 additions & 0 deletions crates/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ macro_rules! protobuf_storage_encode_decode {
restate_types::storage::StorageEncodeError::EncodeValue(err.into())
})
}
fn serialized_size(&self) -> usize {
bytes::BytesMut::default().len() // todo: to be calculated
}
}

impl restate_types::storage::StorageDecode for $ty {
Expand Down
6 changes: 2 additions & 4 deletions crates/types/src/partition_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ impl Iterator for EqualSizedPartitionPartitioner {

#[cfg(test)]
mod tests {
use bytes::BytesMut;
use test_log::test;

use crate::identifiers::{PartitionId, PartitionKey};
Expand Down Expand Up @@ -446,16 +445,15 @@ mod tests {
let version = Version::from(42);
let num_partitions = 1337;
let expected_fixed_partition_table = FixedPartitionTable::new(version, num_partitions);
let mut buf = BytesMut::default();

StorageCodec::encode(&expected_fixed_partition_table, &mut buf)?;
let mut buf = StorageCodec::encode(&expected_fixed_partition_table)?;
let partition_table = StorageCodec::decode::<PartitionTable, _>(&mut buf)?;

assert_eq!(partition_table.version, version);
assert_eq!(partition_table.num_partitions(), num_partitions);

buf.clear();
StorageCodec::encode(&partition_table, &mut buf)?;
buf = StorageCodec::encode(&partition_table)?;
let fixed_partition_table = StorageCodec::decode::<FixedPartitionTable, _>(&mut buf)?;

assert_eq!(fixed_partition_table.version, version);
Expand Down
24 changes: 8 additions & 16 deletions crates/types/src/schema/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,10 @@ mod serde_tests {

#[test]
fn can_deserialise_without_http_version() {
let mut buf = bytes::BytesMut::default();
StorageCodec::encode(
&OldDeploymentType::Http {
address: Uri::from_static("google.com"),
protocol_type: ProtocolType::BidiStream,
},
&mut buf,
)
let mut buf = StorageCodec::encode(&OldDeploymentType::Http {
address: Uri::from_static("google.com"),
protocol_type: ProtocolType::BidiStream,
})
.unwrap();
let dt: DeploymentType = StorageCodec::decode(&mut buf).unwrap();
assert_eq!(
Expand All @@ -174,14 +170,10 @@ mod serde_tests {
dt
);

let mut buf = bytes::BytesMut::default();
StorageCodec::encode(
&OldDeploymentType::Http {
address: Uri::from_static("google.com"),
protocol_type: ProtocolType::RequestResponse,
},
&mut buf,
)
let mut buf = StorageCodec::encode(&OldDeploymentType::Http {
address: Uri::from_static("google.com"),
protocol_type: ProtocolType::RequestResponse,
})
.unwrap();
let dt: DeploymentType = StorageCodec::decode(&mut buf).unwrap();
assert_eq!(
Expand Down
50 changes: 38 additions & 12 deletions crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,27 @@ impl TryFrom<u8> for StorageCodecKind {
pub struct StorageCodec;

impl StorageCodec {
pub fn encode<T: StorageEncode + ?Sized>(
value: &T,
buf: &mut BytesMut,
) -> Result<(), StorageEncodeError> {
// write codec
pub fn encode<T: StorageEncode + ?Sized>(value: &T) -> Result<BytesMut, StorageEncodeError> {
// Calculate the exact size required for serialization
let size = value.serialized_size();

// Create a buffer with enough capacity
let mut buf = BytesMut::with_capacity(size + 1); // +1 for codec version byte

// Write codec version byte
buf.put_u8(value.default_codec().into());
// encode value
value.encode(buf)

// Encode the value into the buffer
value.encode(&mut buf)?;

Ok(buf)
}

pub fn encode_and_split<T: StorageEncode + ?Sized>(
value: &T,
buf: &mut BytesMut,
) -> Result<BytesMut, StorageEncodeError> {
Self::encode(value, buf)?;
Self::encode(value)?;
Ok(buf.split())
}

Expand Down Expand Up @@ -119,6 +125,9 @@ pub trait StorageEncode: DowncastSync {

/// Codec which is used when encode new values.
fn default_codec(&self) -> StorageCodecKind;

/// Returns the expected serialized size in bytes of the encoded value.
fn serialized_size(&self) -> usize;
}
impl_downcast!(sync StorageEncode);

Expand Down Expand Up @@ -153,6 +162,12 @@ macro_rules! flexbuffers_storage_encode_decode {
$crate::storage::encode_as_flexbuffers(self, buf)
.map_err(|err| $crate::storage::StorageEncodeError::EncodeValue(err.into()))
}


fn serialized_size(&self) -> usize {
bytes::BytesMut::default().len() // todo: to be calculated
}

}

impl $crate::storage::StorageDecode for $name {
Expand Down Expand Up @@ -195,7 +210,7 @@ impl StorageEncode for PolyBytes {
match self {
PolyBytes::Bytes(bytes) => buf.put_slice(bytes.as_ref()),
PolyBytes::Typed(typed) => {
StorageCodec::encode(&**typed, buf)?;
StorageCodec::encode(&**typed)?;
}
};
Ok(())
Expand All @@ -204,6 +219,12 @@ impl StorageEncode for PolyBytes {
fn default_codec(&self) -> StorageCodecKind {
StorageCodecKind::FlexbuffersSerde
}
fn serialized_size(&self) -> usize {
match self {
PolyBytes::Bytes(bytes) => bytes.len(),
PolyBytes::Typed(typed) => typed.serialized_size(),
}
}
}

/// SerializeAs/DeserializeAs to implement ser/de trait for [`PolyBytes`]
Expand All @@ -218,9 +239,7 @@ impl serde_with::SerializeAs<PolyBytes> for EncodedPolyBytes {
match source {
PolyBytes::Bytes(bytes) => serializer.serialize_bytes(bytes.as_ref()),
PolyBytes::Typed(typed) => {
// todo: estimate size to avoid re allocations
let mut buf = BytesMut::new();
StorageCodec::encode(&**typed, &mut buf).expect("record serde is infallible");
let buf = StorageCodec::encode(&**typed).expect("record serde is infallible");
serializer.serialize_bytes(buf.as_ref())
}
}
Expand Down Expand Up @@ -266,6 +285,10 @@ impl StorageEncode for String {
buf.put_slice(my_bytes);
Ok(())
}

fn serialized_size(&self) -> usize {
4 + self.len() // 4 bytes for length + string byte length
}
}
impl StorageDecode for String {
fn decode<B: ::bytes::Buf>(
Expand Down Expand Up @@ -332,6 +355,9 @@ impl StorageEncode for bytes::Bytes {
buf.put_slice(&self[..]);
Ok(())
}
fn serialized_size(&self) -> usize {
4 + self.len() // 4 bytes for length + byte slice length
}
}

/// Utility method to encode a [`Serialize`] type as flexbuffers using serde.
Expand Down
7 changes: 3 additions & 4 deletions crates/wal-protocol/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct AnnounceLeader {
#[cfg(test)]
mod tests {
use crate::control::AnnounceLeader;
use bytes::BytesMut;

use restate_types::identifiers::LeaderEpoch;
use restate_types::storage::StorageCodec;
use restate_types::{flexbuffers_storage_encode_decode, GenerationalNodeId};
Expand Down Expand Up @@ -60,8 +60,7 @@ mod tests {
leader_epoch,
};

let mut buf = BytesMut::default();
StorageCodec::encode(&old_announce_leader, &mut buf)?;
let mut buf = StorageCodec::encode(&old_announce_leader)?;

let new_announce_leader = StorageCodec::decode::<AnnounceLeader, _>(&mut buf)?;

Expand All @@ -76,7 +75,7 @@ mod tests {
);

buf.clear();
StorageCodec::encode(&new_announce_leader, &mut buf)?;
buf = StorageCodec::encode(&new_announce_leader)?;

let announce_leader = StorageCodec::decode::<OldAnnounceLeader, _>(&mut buf)?;

Expand Down
Loading
Loading