[Bifrost] GetDigest message in replicated loglets #2042

Merged
1 commit, merged Oct 9, 2024
@@ -186,13 +186,9 @@ impl<T: TransportConnect> Sequencer<T> {
.await
.unwrap();

// Why is this AcqRel?
// We are updating the next write offset, and we want to make sure that after this call
// we consistently observe, across threads, whether task_center()'s shutdown signal was set.
//
// The situation we want to avoid is that we fail to spawn an appender due to shutdown but
// the subsequent fetch_add doesn't observe task-center's internal shutdown atomic.
let offset = LogletOffset::new(
self.sequencer_shared_state
.next_write_offset
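A note on the ordering discussed in the comment above: the sketch below restates the same concern with plain std atomics. The struct and field names are illustrative stand-ins, not the sequencer's actual types.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};

/// Illustrative stand-ins for task-center's shutdown flag and the
/// sequencer's next-write-offset counter (names are hypothetical).
struct Shared {
    shutting_down: AtomicBool,
    next_write_offset: AtomicU32,
}

impl Shared {
    /// Signal shutdown with Release so the flag is published before any
    /// later Release/AcqRel operation on the counter by this thread.
    fn shutdown(&self) {
        self.shutting_down.store(true, Ordering::Release);
    }

    /// Reserve the next offset. The Acquire half of AcqRel synchronizes
    /// with earlier Release/AcqRel operations on this same counter, so
    /// writes that happened before them (such as setting the shutdown
    /// flag) are visible here; the Release half publishes our own update
    /// the same way for later readers.
    fn reserve_offset(&self) -> Option<u32> {
        let offset = self.next_write_offset.fetch_add(1, Ordering::AcqRel);
        if self.shutting_down.load(Ordering::Acquire) {
            return None;
        }
        Some(offset)
    }
}
```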
70 changes: 56 additions & 14 deletions crates/log-server/src/loglet_worker.rs
@@ -40,6 +40,7 @@ pub struct LogletWorkerHandle {
get_records_tx: mpsc::UnboundedSender<Incoming<GetRecords>>,
trim_tx: mpsc::UnboundedSender<Incoming<Trim>>,
wait_for_tail_tx: mpsc::UnboundedSender<Incoming<WaitForTail>>,
get_digest_tx: mpsc::UnboundedSender<Incoming<GetDigest>>,
tc_handle: TaskHandle<()>,
}

@@ -49,48 +50,45 @@ impl LogletWorkerHandle {
self.tc_handle
}

pub fn enqueue_get_digest(&self, msg: Incoming<GetDigest>) -> Result<(), Incoming<GetDigest>> {
self.get_digest_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_wait_for_tail(
&self,
msg: Incoming<WaitForTail>,
) -> Result<(), Incoming<WaitForTail>> {
self.wait_for_tail_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.wait_for_tail_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_store(&self, msg: Incoming<Store>) -> Result<(), Incoming<Store>> {
self.store_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.store_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_release(&self, msg: Incoming<Release>) -> Result<(), Incoming<Release>> {
self.release_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.release_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_seal(&self, msg: Incoming<Seal>) -> Result<(), Incoming<Seal>> {
self.seal_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.seal_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_get_loglet_info(
&self,
msg: Incoming<GetLogletInfo>,
) -> Result<(), Incoming<GetLogletInfo>> {
self.get_loglet_info_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.get_loglet_info_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_get_records(
&self,
msg: Incoming<GetRecords>,
) -> Result<(), Incoming<GetRecords>> {
self.get_records_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.get_records_tx.send(msg).map_err(|e| e.0)
}

pub fn enqueue_trim(&self, msg: Incoming<Trim>) -> Result<(), Incoming<Trim>> {
self.trim_tx.send(msg).map_err(|e| e.0)?;
Ok(())
self.trim_tx.send(msg).map_err(|e| e.0)
}
}
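
The enqueue_* helpers above now return the rejected message to the caller instead of wrapping the result in `Ok(())`. A minimal sketch of that hand-off pattern with a plain tokio channel; the generic `enqueue` helper here is hypothetical, not part of the PR:

```rust
use tokio::sync::mpsc;

/// Hand a request to a worker; if the worker's channel is closed
/// (crashed or shutting down), give the message back to the caller so it
/// can still respond to the peer instead of silently dropping it.
fn enqueue<T>(tx: &mpsc::UnboundedSender<T>, msg: T) -> Result<(), T> {
    // SendError<T> is a tuple struct wrapping the unsent value, so `.0`
    // recovers the original message.
    tx.send(msg).map_err(|e| e.0)
}
```

Returning the whole `Incoming<...>` on failure is what lets the network layer reply with a default response further down in `network.rs`.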

@@ -122,6 +120,7 @@ impl<S: LogStore> LogletWorker<S> {
let (get_records_tx, get_records_rx) = mpsc::unbounded_channel();
let (trim_tx, trim_rx) = mpsc::unbounded_channel();
let (wait_for_tail_tx, wait_for_tail_rx) = mpsc::unbounded_channel();
let (get_digest_tx, get_digest_rx) = mpsc::unbounded_channel();
let tc_handle = task_center.spawn_unmanaged(
TaskKind::LogletWriter,
"loglet-worker",
@@ -134,6 +133,7 @@ impl<S: LogStore> LogletWorker<S> {
get_records_rx,
trim_rx,
wait_for_tail_rx,
get_digest_rx,
),
)?;
Ok(LogletWorkerHandle {
@@ -144,6 +144,7 @@ impl<S: LogStore> LogletWorker<S> {
get_records_tx,
trim_tx,
wait_for_tail_tx,
get_digest_tx,
tc_handle,
})
}
@@ -158,6 +159,7 @@ impl<S: LogStore> LogletWorker<S> {
mut get_records_rx: mpsc::UnboundedReceiver<Incoming<GetRecords>>,
mut trim_rx: mpsc::UnboundedReceiver<Incoming<Trim>>,
mut wait_for_tail_rx: mpsc::UnboundedReceiver<Incoming<WaitForTail>>,
mut get_digest_rx: mpsc::UnboundedReceiver<Incoming<GetDigest>>,
) {
// The worker is the sole writer to this loglet's local-tail so it's safe to maintain a moving
// local tail view and serialize changes to logstore as long as we send them in the correct
@@ -182,6 +184,12 @@
debug!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
return;
}
// GET_DIGEST
Some(msg) = get_digest_rx.recv() => {
self.loglet_state.notify_known_global_tail(msg.body().header.known_global_tail);
// digest responses are spawned as tasks
self.process_get_digest(msg);
}
Some(_) = in_flight_stores.next() => {}
// The in-flight seal (if any)
Some(Ok(_)) = &mut in_flight_seal => {
@@ -462,6 +470,40 @@ impl<S: LogStore> LogletWorker<S> {
);
}

fn process_get_digest(&mut self, msg: Incoming<GetDigest>) {
let mut log_store = self.log_store.clone();
let loglet_state = self.loglet_state.clone();
// Spawning fails on shutdown; in that case, we ignore the request.
let _ = self.task_center.spawn(
TaskKind::Disposable,
"logserver-get-digest",
None,
async move {
let (reciprocal, msg) = msg.split();
// validation. Note that to_offset is inclusive.
if msg.from_offset > msg.to_offset {
let response =
reciprocal.prepare(Digest::empty().with_status(Status::Malformed));
// ship the response to the original connection
let _ = response.send().await;
return Ok(());
}
let digest = match log_store.get_records_digest(msg, &loglet_state).await {
Ok(digest) => digest,
Err(_) => Digest::new(
loglet_state.local_tail(),
loglet_state.known_global_tail(),
Default::default(),
)
.with_status(Status::Disabled),
};
// ship the response to the original connection
let _ = reciprocal.prepare(digest).send().await;
Ok(())
},
);
}

fn process_trim(&mut self, msg: Incoming<Trim>) {
// When trimming, we eagerly update the in-memory view of the trim-point _before_ we
// perform the trim on the log-store since it's safer to over report the trim-point than
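`process_get_digest` answers each request from a short-lived spawned task so a slow digest scan never blocks the worker's main select loop. A generic sketch of that spawn-and-reply shape using a tokio oneshot channel; the `Request` and `Reply` types below are hypothetical stand-ins, not the log-server's `Incoming`/`Digest` types.

```rust
use tokio::sync::oneshot;

struct Request {
    from_offset: u64,
    to_offset: u64, // inclusive
    reply: oneshot::Sender<Reply>,
}

enum Reply {
    Malformed,
    Digest(Vec<u64>),
}

fn process_request(req: Request) {
    // Detach the work; the worker loop keeps serving other messages.
    tokio::spawn(async move {
        // Validate first: to_offset is inclusive, so an inverted range is
        // a malformed request, answered immediately.
        if req.from_offset > req.to_offset {
            let _ = req.reply.send(Reply::Malformed);
            return;
        }
        // Placeholder for the actual digest computation over the range.
        let digest: Vec<u64> = (req.from_offset..=req.to_offset).collect();
        let _ = req.reply.send(Reply::Digest(digest));
    });
}
```

As in the PR, the reply errors are deliberately ignored: if the peer is gone there is nobody left to tell.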
8 changes: 7 additions & 1 deletion crates/log-server/src/logstore.rs
@@ -15,7 +15,7 @@ use tokio::sync::oneshot;

use restate_bifrost::loglet::OperationError;
use restate_core::ShutdownError;
use restate_types::net::log_server::{GetRecords, Records, Seal, Store, Trim};
use restate_types::net::log_server::{Digest, GetDigest, GetRecords, Records, Seal, Store, Trim};
use restate_types::replicated_loglet::ReplicatedLogletId;

use crate::metadata::{LogStoreMarker, LogletState};
@@ -56,6 +56,12 @@ pub trait LogStore: Clone + Send + 'static {
get_records_message: GetRecords,
loglet_state: &LogletState,
) -> impl Future<Output = Result<Records, OperationError>> + Send;

fn get_records_digest(
&mut self,
get_records_message: GetDigest,
loglet_state: &LogletState,
) -> impl Future<Output = Result<Digest, OperationError>> + Send;
}

/// A future that resolves when a log-store operation is completed
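The new `get_records_digest` method follows the trait's existing convention of returning `impl Future<Output = …> + Send` rather than using an async-trait macro. A minimal, self-contained sketch of that pattern under assumed types (not restate's actual `LogStore`):

```rust
use std::future::Future;

#[derive(Debug)]
struct Digest(Vec<u64>);

trait DigestStore: Clone + Send + 'static {
    // Returning `impl Future + Send` from a trait method (stable since
    // Rust 1.75) keeps the future Send without boxing.
    fn get_digest(
        &mut self,
        from: u64,
        to: u64,
    ) -> impl Future<Output = Result<Digest, String>> + Send;
}

#[derive(Clone)]
struct InMemoryStore {
    offsets: Vec<u64>,
}

impl DigestStore for InMemoryStore {
    fn get_digest(
        &mut self,
        from: u64,
        to: u64,
    ) -> impl Future<Output = Result<Digest, String>> + Send {
        // Collect the matching offsets before entering the async block so
        // the returned future does not borrow `self`.
        let entries: Vec<u64> = self
            .offsets
            .iter()
            .copied()
            .filter(|o| (from..=to).contains(o))
            .collect();
        async move { Ok::<_, String>(Digest(entries)) }
    }
}
```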
25 changes: 25 additions & 0 deletions crates/log-server/src/network.rs
@@ -51,6 +51,7 @@ pub struct RequestPump {
get_records_stream: MessageStream<GetRecords>,
trim_stream: MessageStream<Trim>,
wait_for_tail_stream: MessageStream<WaitForTail>,
get_digest_stream: MessageStream<GetDigest>,
}

impl RequestPump {
@@ -73,6 +74,7 @@ impl RequestPump {
let get_records_stream = router_builder.subscribe_to_stream(queue_length);
let trim_stream = router_builder.subscribe_to_stream(queue_length);
let wait_for_tail_stream = router_builder.subscribe_to_stream(queue_length);
let get_digest_stream = router_builder.subscribe_to_stream(queue_length);
Self {
task_center,
_metadata: metadata,
@@ -84,6 +86,7 @@ impl RequestPump {
get_records_stream,
trim_stream,
wait_for_tail_stream,
get_digest_stream,
}
}

@@ -101,6 +104,7 @@ impl RequestPump {
mut get_records_stream,
mut trim_stream,
mut wait_for_tail_stream,
mut get_digest_stream,
..
} = self;

@@ -135,6 +139,18 @@ impl RequestPump {
Self::shutdown(loglet_workers).await;
return Ok(());
}
Some(get_digest) = get_digest_stream.next() => {
// find the worker or create one.
// enqueue.
let worker = Self::find_or_create_worker(
get_digest.body().header.loglet_id,
&log_store,
&task_center,
&mut state_map,
&mut loglet_workers,
).await?;
Self::on_get_digest(worker, get_digest);
}
Some(wait_for_tail) = wait_for_tail_stream.next() => {
// find the worker or create one.
// enqueue.
@@ -234,6 +250,15 @@ impl RequestPump {
trace!("All loglet workers have terminated");
}

fn on_get_digest(worker: &LogletWorkerHandle, msg: Incoming<GetDigest>) {
if let Err(msg) = worker.enqueue_get_digest(msg) {
// worker has crashed or shutdown in progress. Notify the sender and drop the message.
if let Err(e) = msg.to_rpc_response(Digest::empty()).try_send() {
debug!(?e.source, peer = %e.original.peer(), "Failed to respond to GetDigest message with status Disabled due to peer channel capacity being full");
}
}
}

fn on_wait_for_tail(worker: &LogletWorkerHandle, msg: Incoming<WaitForTail>) {
if let Err(msg) = worker.enqueue_wait_for_tail(msg) {
// worker has crashed or shutdown in progress. Notify the sender and drop the message.
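`RequestPump` routes each `GetDigest` to a per-loglet worker, creating one lazily on first use and falling back to an immediate response when the worker's queue is gone. A condensed sketch of that dispatch shape with hypothetical types:

```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

type LogletId = u64;

struct Dispatcher {
    workers: HashMap<LogletId, mpsc::UnboundedSender<String>>,
}

impl Dispatcher {
    /// Look up the worker for this loglet, spawning one on first use.
    fn find_or_create_worker(&mut self, id: LogletId) -> &mpsc::UnboundedSender<String> {
        self.workers.entry(id).or_insert_with(move || {
            let (tx, mut rx) = mpsc::unbounded_channel::<String>();
            tokio::spawn(async move {
                while let Some(req) = rx.recv().await {
                    // Placeholder for handling the per-loglet request.
                    println!("loglet {id}: {req}");
                }
            });
            tx
        })
    }

    fn dispatch(&mut self, id: LogletId, req: String) {
        if self.find_or_create_worker(id).send(req).is_err() {
            // Worker crashed or is shutting down: answer the peer with a
            // default response instead of dropping the request silently.
            eprintln!("loglet {id}: worker gone, replying with empty digest");
        }
    }
}
```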