Let PartitionProcessorManager sync on logs configuration before starting partition processors
tillrohrmann committed Oct 14, 2024
1 parent d5250e6 commit 12920f5
Showing 6 changed files with 70 additions and 38 deletions.
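At a high level, the change turns the processor control path into a small version handshake: the cluster controller stamps the partition-table and logs metadata versions it planned against into each `ControlProcessors` message, and the receiving `PartitionProcessorManager` waits until its local metadata has caught up to those versions before acting on the commands. The sketch below condenses that flow; `Version`, `Metadata`, and `ControlProcessors` here are simplified stand-ins for the restate types touched in the diff, not the actual API surface.

```rust
// Condensed sketch of the version handshake introduced by this commit.
// All types are simplified stand-ins; the real code lives in restate_core /
// restate_types and is asynchronous.

#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct Version(u32);

#[derive(Debug)]
struct ControlProcessors {
    min_partition_table_version: Version,
    min_logs_table_version: Version,
    // commands: Vec<ControlProcessor>, // elided
}

struct Metadata {
    partition_table_version: Version,
    logs_version: Version,
}

impl Metadata {
    // Stand-in for `metadata.wait_for_version(MetadataKind::Logs, ..)`: the real
    // call suspends until the local logs metadata reaches the requested version.
    fn wait_for_logs_version(&self, min: Version) {
        assert!(self.logs_version >= min, "logs metadata behind {min:?}");
    }

    // Stand-in for `metadata.wait_for_partition_table(..)`.
    fn wait_for_partition_table(&self, min: Version) {
        assert!(self.partition_table_version >= min, "partition table behind {min:?}");
    }
}

// Sender side (cluster controller scheduler): stamp the metadata versions that
// were current when the scheduling plan was computed.
fn build_control_message(md: &Metadata) -> ControlProcessors {
    ControlProcessors {
        min_partition_table_version: md.partition_table_version,
        min_logs_table_version: md.logs_version,
    }
}

// Receiver side (PartitionProcessorManager): sync on both pieces of metadata
// before applying the commands, so partition processors never start against a
// logs configuration older than the one the controller planned with.
fn on_control_processors(md: &Metadata, msg: &ControlProcessors) {
    md.wait_for_logs_version(msg.min_logs_table_version);
    md.wait_for_partition_table(msg.min_partition_table_version);
    // ...apply the ControlProcessor commands / start partition processors here
}

fn main() {
    let md = Metadata {
        partition_table_version: Version(3),
        logs_version: Version(2),
    };
    let msg = build_control_message(&md);
    on_control_processors(&md, &msg);
    println!("applied {msg:?} after metadata sync");
}
```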
72 changes: 46 additions & 26 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -9,22 +9,23 @@
// by the Apache License, Version 2.0.

use crate::cluster_controller::scheduler::ObservedClusterState;
use rand::prelude::IteratorRandom;
use rand::{thread_rng, RngCore};
use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::{MetadataStoreClient, Precondition, WriteError};
use restate_core::{task_center, Metadata, MetadataWriter};
use restate_core::{metadata, task_center, Metadata, MetadataWriter};
use restate_types::logs::builder::{ChainBuilder, LogsBuilder};
use restate_types::logs::metadata::{
Chain, LogletConfig, LogletParams, Logs, ProviderKind, SegmentIndex,
};
use restate_types::logs::{LogId, Lsn};
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::nodes_config::Role;
use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty};
use restate_types::{Version, Versioned};
use std::collections::BTreeMap;
use std::num::NonZeroU8;
use rand::prelude::IteratorRandom;
use tokio::task::JoinSet;
use tracing::debug;

@@ -220,30 +221,44 @@ fn find_new_replicated_loglet_configuration(
) -> Option<ReplicatedLogletParams> {
let mut rng = rand::thread_rng();
// todo make min nodeset size configurable, respect roles, etc.
if observed_cluster_state.alive_nodes.len() >= 3 {
let nodes_config = metadata().nodes_config_ref();

let log_servers: Vec<_> = observed_cluster_state
.alive_nodes
.values()
.filter(|node| {
nodes_config
.find_node_by_id(**node)
.ok()
.is_some_and(|config| config.has_role(Role::LogServer))
})
.collect();

if log_servers.len() >= 3 {
let replication = ReplicationProperty::new(NonZeroU8::new(2).expect("to be valid"));
let mut nodeset = NodeSet::empty();

for node in observed_cluster_state.alive_nodes.keys() {
nodeset.insert(*node);
for node in &log_servers {
nodeset.insert(node.as_plain());
}

Some(ReplicatedLogletParams {
loglet_id: rng.next_u64().into(),
sequencer: *observed_cluster_state.alive_nodes.values().choose(&mut rng).expect("one node must exist"),
sequencer: **log_servers
.iter()
.choose(&mut rng)
.expect("one node must exist"),
replication,
nodeset,
write_set: None,
})
} else if let Some(sequencer) = log_servers.iter().choose(&mut rng) {
previous_configuration.cloned().map(|mut configuration| {
configuration.sequencer = **sequencer;
configuration
})
} else {
if let Some(sequencer) = observed_cluster_state.alive_nodes.values().choose(&mut rng) {
previous_configuration.cloned().map(|mut configuration| {
configuration.sequencer = *sequencer;
configuration
})
} else {
None
}
None
}
}
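The reconfiguration logic in the hunk above boils down to: consider only alive nodes that carry the LogServer role; with at least three of them, build a fresh replicated loglet configuration with replication factor 2 over that set; otherwise reuse the previous configuration with a newly picked sequencer, or give up if no log server is alive. A stripped-down sketch of that decision follows; `LogletConfig` is a hypothetical stand-in (the real code uses `ReplicatedLogletParams`, `NodeSet`, and `ReplicationProperty`, and picks the sequencer randomly via `rand::prelude::IteratorRandom`).

```rust
// Simplified stand-in for ReplicatedLogletParams; node ids are plain u32s here.
#[derive(Clone, Debug)]
struct LogletConfig {
    sequencer: u32,
    nodeset: Vec<u32>,
    replication_factor: u8,
}

/// `alive_log_servers` is assumed to be pre-filtered to alive nodes with the
/// LogServer role (the `has_role(Role::LogServer)` check in the hunk above).
fn choose_configuration(
    alive_log_servers: &[u32],
    previous: Option<&LogletConfig>,
) -> Option<LogletConfig> {
    // The real code picks the sequencer at random (IteratorRandom::choose);
    // taking the first entry keeps this sketch dependency-free. No alive log
    // server at all means no configuration can be produced.
    let sequencer = *alive_log_servers.first()?;

    if alive_log_servers.len() >= 3 {
        // Enough log servers: create a new configuration spanning all of them.
        Some(LogletConfig {
            sequencer,
            nodeset: alive_log_servers.to_vec(),
            replication_factor: 2,
        })
    } else {
        // Too few log servers: keep the previous configuration, only move the sequencer.
        previous.cloned().map(|mut cfg| {
            cfg.sequencer = sequencer;
            cfg
        })
    }
}

fn main() {
    let prev = LogletConfig { sequencer: 1, nodeset: vec![1, 2], replication_factor: 2 };
    println!("{:?}", choose_configuration(&[1, 2], Some(&prev)));    // falls back to prev
    println!("{:?}", choose_configuration(&[1, 2, 3], Some(&prev))); // fresh config
}
```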

@@ -307,13 +322,8 @@ impl LogletConfiguration {
}
};

if let Some(new_configuration) =
find_new_replicated_loglet_configuration(observed_cluster_state, previous_configuration)
{
Some(LogletConfiguration::Replicated(new_configuration))
} else {
None
}
find_new_replicated_loglet_configuration(observed_cluster_state, previous_configuration)
.map(LogletConfiguration::Replicated)
}
}

@@ -371,7 +381,11 @@ struct Inner {
}

impl Inner {
fn new(logs: Logs, partition_table: &PartitionTable, default_provider: ProviderKind) -> Result<Self, anyhow::Error> {
fn new(
logs: Logs,
partition_table: &PartitionTable,
default_provider: ProviderKind,
) -> Result<Self, anyhow::Error> {
let mut this = Self {
default_provider,
logs: Logs::default(),
@@ -444,7 +458,9 @@ impl Inner {
logs_builder: &mut LogsBuilder,
) -> Result<(), anyhow::Error> {
for (log_id, log_state) in &mut self.logs_state {
let mut chain_builder = logs_builder.chain(log_id).expect("should be present");
let mut chain_builder = logs_builder
.chain(log_id)
.expect("Log with '{log_id}' should be present");
log_state.try_reconfiguring(observed_cluster_state, &mut chain_builder)?;
}

@@ -580,11 +596,15 @@ impl LogsController {
default_provider: ProviderKind,
) -> anyhow::Result<Self> {
let logs = metadata_store_client
.get_or_insert(BIFROST_CONFIG_KEY.clone(), || Logs::default())
.get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default)
.await?;
Ok(Self {
effects: Some(Vec::new()),
inner: Inner::new(logs, metadata.partition_table_ref().as_ref(), default_provider)?,
inner: Inner::new(
logs,
metadata.partition_table_ref().as_ref(),
default_provider,
)?,
bifrost,
metadata_store_client,
metadata_writer,
@@ -692,7 +712,7 @@ impl LogsController {
let bifrost = self.bifrost.clone();
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();
self.async_operations.spawn_local(async move {
self.async_operations.spawn(async move {
tc.run_in_scope("seal-log", None, async {
let bifrost_admin =
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);
10 changes: 5 additions & 5 deletions crates/admin/src/cluster_controller/scheduler.rs
@@ -16,7 +16,7 @@ use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadError, ReadWriteError, WriteError,
};
use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
use restate_core::{ShutdownError, SyncError, TaskCenter, TaskKind};
use restate_core::{metadata, ShutdownError, SyncError, TaskCenter, TaskKind};
use restate_types::cluster::cluster_state::{ClusterState, NodeState, RunMode};
use restate_types::cluster_controller::{
ReplicationStrategy, SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
@@ -29,7 +29,7 @@ use restate_types::net::partition_processor_manager::{
ControlProcessor, ControlProcessors, ProcessorCommand,
};
use restate_types::partition_table::PartitionTable;
use restate_types::{GenerationalNodeId, PlainNodeId, Version, Versioned};
use restate_types::{GenerationalNodeId, PlainNodeId, Versioned};

#[derive(Debug, thiserror::Error)]
#[error("failed reading scheduling plan from metadata store: {0}")]
@@ -80,7 +80,7 @@ impl<T: TransportConnect> Scheduler<T> {
networking: Networking<T>,
) -> Result<Self, BuildError> {
let scheduling_plan = metadata_store_client
.get_or_insert(SCHEDULING_PLAN_KEY.clone(), || SchedulingPlan::default())
.get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
.await?;

Ok(Self {
@@ -332,7 +332,8 @@ impl<T: TransportConnect> Scheduler<T> {
for (node_id, commands) in commands.into_iter() {
let control_processors = ControlProcessors {
// todo: Maybe remove unneeded partition table version
min_partition_table_version: Version::MIN,
min_partition_table_version: metadata().partition_table_version(),
min_logs_table_version: metadata().logs_version(),
commands,
};

@@ -495,7 +496,6 @@ mod tests {
use rand::Rng;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;
use test_log::test;
use tokio::sync::mpsc;
7 changes: 2 additions & 5 deletions crates/node/src/lib.rs
@@ -309,7 +309,6 @@ impl Node {
.await?;

metadata_writer.update(partition_table).await?;
// metadata_writer.update(logs).await?;
} else {
// otherwise, just sync the required metadata
metadata
@@ -320,12 +319,10 @@
.await?;

// safety check until we can tolerate missing partition table and logs configuration
if metadata.partition_table_version() == Version::INVALID
|| metadata.logs_version() == Version::INVALID
{
if metadata.partition_table_version() == Version::INVALID {
return Err(Error::SafetyCheck(
format!(
"Missing partition table or logs configuration for cluster '{}'. This indicates that the cluster bootstrap is incomplete. Please re-run with '--allow-bootstrap true'.",
"Missing partition table for cluster '{}'. This indicates that the cluster bootstrap is incomplete. Please re-run with '--allow-bootstrap true'.",
config.common.cluster_name(),
)))?;
}
1 change: 1 addition & 0 deletions crates/types/src/net/partition_processor_manager.rs
@@ -45,6 +45,7 @@ define_message! {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ControlProcessors {
pub min_partition_table_version: Version,
pub min_logs_table_version: Version,
pub commands: Vec<ControlProcessor>,
}

7 changes: 7 additions & 0 deletions crates/worker/src/partition_processor_manager.rs
@@ -52,6 +52,7 @@ use restate_types::logs::SequenceNumber;
use restate_types::metadata_store::keys::partition_processor_epoch_key;
use restate_types::net::cluster_controller::AttachRequest;
use restate_types::net::cluster_controller::{Action, AttachResponse};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::{
ControlProcessor, ControlProcessors, CreateSnapshotResponse, GetProcessorsState,
ProcessorCommand, SnapshotError,
@@ -587,6 +588,12 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
) -> Result<(), Error> {
let (_, control_processors) = control_processor.split();

self.metadata
.wait_for_version(
MetadataKind::Logs,
control_processors.min_logs_table_version,
)
.await?;
let partition_table = self
.metadata
.wait_for_partition_table(control_processors.min_partition_table_version)
11 changes: 9 additions & 2 deletions server/tests/cluster.rs
@@ -112,18 +112,25 @@ async fn replicated_loglet() {
let nodes = Node::new_test_nodes_with_metadata(
base_config.clone(),
BinarySource::CargoTest,
enum_set!(Role::Worker | Role::Admin | Role::LogServer),
enum_set!(Role::Worker | Role::LogServer),
3,
);

let cluster = Cluster::builder()
.cluster_name("cluster-1")
.temp_base_dir()
.nodes(nodes)
.build()
.start()
.await
.unwrap();

assert!(cluster.wait_healthy(Duration::from_secs(30)).await);

for idx in 1..=3 {
assert!(cluster.nodes[idx]
.lines("PartitionProcessor starting up".parse().unwrap())
.next()
.await
.is_some())
}
}
