Skip to content

Commit

Permalink
[Bifrost] Replicated loglet params
Browse files Browse the repository at this point in the history
Introduces some helper types that will be used to configure the replicated loglet. At the moment this doesn't include any object-store related configuration as it's unclear still what configuration parameters will need to be there.
  • Loading branch information
AhmedSoliman committed Aug 26, 2024
1 parent 4712c64 commit bbe0ac8
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 1 deletion.
81 changes: 80 additions & 1 deletion crates/types/src/node_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;

/// A generational node identifier. Nodes with the same ID but different generations
/// represent the same node across different instances (restarts) of its lifetime.
///
Expand Down Expand Up @@ -50,6 +52,36 @@ pub enum NodeId {
#[display("{}:{}", _0, _1)]
pub struct GenerationalNodeId(PlainNodeId, u32);

#[derive(Debug, thiserror::Error)]
#[error("invalid plain node id: {0}")]
pub struct MalformedPlainNodeId(String);

#[derive(Debug, thiserror::Error)]
#[error("invalid generational node id: {0}")]
pub struct MalformedGenerationalNodeId(String);

impl FromStr for GenerationalNodeId {
type Err = MalformedGenerationalNodeId;

fn from_str(s: &str) -> Result<Self, Self::Err> {
// generational node id can be in "N<id>:<generation>" format or <id>:<gen> format.
// parse the id and generation and construct GenerationalNodeId from this string
let (id_part, gen_part) = s
.split_once(':')
.ok_or_else(|| MalformedGenerationalNodeId(s.to_string()))?;
let id = id_part
.trim_start_matches('N')
.parse()
.map_err(|_| MalformedGenerationalNodeId(s.to_string()))?;

let generation = gen_part
.parse()
.map_err(|_| MalformedGenerationalNodeId(s.to_string()))?;

Ok(GenerationalNodeId::new(id, generation))
}
}

#[derive(
Debug,
Default,
Expand All @@ -61,7 +93,6 @@ pub struct GenerationalNodeId(PlainNodeId, u32);
Copy,
Hash,
derive_more::From,
derive_more::FromStr,
derive_more::Into,
derive_more::Display,
serde::Serialize,
Expand All @@ -72,6 +103,20 @@ pub struct GenerationalNodeId(PlainNodeId, u32);
#[display("N{}", _0)]
pub struct PlainNodeId(u32);

impl FromStr for PlainNodeId {
type Err = MalformedPlainNodeId;

fn from_str(s: &str) -> Result<Self, Self::Err> {
// plain id can be in "N<id>" format or <id> format.
let id = s
.trim_start_matches('N')
.parse()
.map_err(|_| MalformedPlainNodeId(s.to_string()))?;

Ok(PlainNodeId::new(id))
}
}

impl NodeId {
pub fn new(id: u32, generation: Option<u32>) -> NodeId {
match generation {
Expand Down Expand Up @@ -179,6 +224,10 @@ impl From<GenerationalNodeId> for crate::protobuf::common::NodeId {
}

impl PlainNodeId {
pub const fn new(id: u32) -> PlainNodeId {
PlainNodeId(id)
}

pub fn with_generation(self, generation: u32) -> GenerationalNodeId {
GenerationalNodeId(self, generation)
}
Expand Down Expand Up @@ -256,6 +305,36 @@ mod tests {
assert_eq!("N1:2", GenerationalNodeId(PlainNodeId(1), 2).to_string());
}

#[test]
fn test_parse_plain_node_id_string() {
let plain = NodeId::Plain(PlainNodeId(25));
assert_eq!("N25", plain.to_string());
let parsed_1: PlainNodeId = "N25".parse().unwrap();
assert_eq!(parsed_1, plain);
let parsed_2: PlainNodeId = "25".parse().unwrap();
assert_eq!(parsed_2, plain);
// invalid
assert!("25:10".parse::<PlainNodeId>().is_err());
// invalid
assert!("N25:".parse::<PlainNodeId>().is_err());
// invalid
assert!("N25:10".parse::<PlainNodeId>().is_err());
}

#[test]
fn test_parse_generational_node_id_string() {
let generational = GenerationalNodeId::new(25, 18);
assert_eq!("N25:18", generational.to_string());
let parsed_1: GenerationalNodeId = "N25:18".parse().unwrap();
assert_eq!(parsed_1, generational);
let parsed_2: GenerationalNodeId = "25:18".parse().unwrap();
assert_eq!(parsed_2, generational);
// invalid
assert!("25".parse::<GenerationalNodeId>().is_err());
// invalid
assert!("N25".parse::<GenerationalNodeId>().is_err());
}

#[test]
fn test_equality() {
let plain1 = NodeId::Plain(PlainNodeId(1));
Expand Down
4 changes: 4 additions & 0 deletions crates/types/src/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod params;

pub use params::*;
123 changes: 123 additions & 0 deletions crates/types/src/replicated_loglet/params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo: remove when fleshed out
#![allow(unused)]

use std::collections::HashSet;
use std::num::NonZeroU8;

use serde_with::{DisplayFromStr, VecSkipError};

use crate::{GenerationalNodeId, PlainNodeId};

/// Configuration parameters of a replicated loglet segment
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
pub struct ReplicatedLogletParams {
/// Unique identifier for this loglet
loglet_id: ReplicatedLogletId,
/// The sequencer node
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
sequencer: GenerationalNodeId,
/// Replication properties of this loglet
replication: Replication,
nodeset: NodeSet,
/// The set of nodes the sequencer has been considering for writes after the last LGC advance
/// If unset, the entire nodeset is considered as part of the write set
/// If set, tail repair will attempt reading only from this set.
#[serde(skip_serializing_if = "Option::is_none")]
write_set: Option<NodeSet>,
}

impl ReplicatedLogletParams {
pub fn deserialize_from(slice: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(slice)
}

pub fn serialize(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}

#[derive(
serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Clone, Copy,
)]
#[serde(transparent)]
#[repr(transparent)]
pub struct ReplicatedLogletId(u64);

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
pub struct Replication {
/// The write-quorum for appends
replication_factor: NonZeroU8,
/// The number of extra copies (best effort) to be replicated in addition to the
/// replication_factor.
///
/// default is 0
#[serde(default)]
extra_copies: u8,
#[serde(default)]
durability: DurabilityLevel,
}

impl Replication {
pub fn read_quorum_size(&self, nodeset: &NodeSet) -> u8 {
// N - replication_factor + 1
nodeset.len() - self.replication_factor.get() + 1
}
}

#[serde_with::serde_as]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct NodeSet(#[serde_as(as = "HashSet<DisplayFromStr>")] HashSet<PlainNodeId>);

impl NodeSet {
pub fn from_single(node: PlainNodeId) -> Self {
let mut set = HashSet::new();
set.insert(node);
Self(set)
}

pub fn len(&self) -> u8 {
self.0
.len()
.try_into()
.expect("nodeset cannot exceed 255 nodes")
}

pub fn iter(&self) -> impl Iterator<Item = &PlainNodeId> {
self.0.iter()
}
}

/// Durability level from lowest to highest
#[derive(
serde::Serialize,
serde::Deserialize,
Debug,
Clone,
Copy,
Ord,
PartialOrd,
Eq,
PartialEq,
Default,
)]
#[serde(rename_all = "kebab-case")]
#[repr(u8)]
pub enum DurabilityLevel {
/// WAL is disabled, Commit to memory only and acknowledge
Memory = 1,
/// Commit to WAL but do not wait for fllesystem buffer cache to be flushed
#[default]
WalAsync = 2,
/// Commit to WAL and only acknowledge after filesystem buffer cache flush is complete
WalSync = 3,
}

0 comments on commit bbe0ac8

Please sign in to comment.