Skip to content

Commit

Permalink
Implement GetCallInvocationId and test it.
Browse files Browse the repository at this point in the history
Also reorganize a bit the fixtures and matchers in test code to reuse them.
  • Loading branch information
slinkydeveloper committed Sep 17, 2024
1 parent 0e08bf7 commit 78c0e64
Show file tree
Hide file tree
Showing 7 changed files with 575 additions and 312 deletions.
44 changes: 38 additions & 6 deletions crates/service-protocol/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,16 @@ mod test_util {
};
use restate_types::journal::{
AwakeableEntry, CancelInvocationEntry, CancelInvocationTarget, CompletableEntry,
CompleteAwakeableEntry, EntryResult, GetStateKeysEntry, GetStateKeysResult, InputEntry,
OutputEntry,
CompleteAwakeableEntry, EntryResult, GetCallInvocationIdEntry, GetCallInvocationIdResult,
GetStateKeysEntry, GetStateKeysResult, InputEntry, OutputEntry,
};
use restate_types::service_protocol::{
awakeable_entry_message, call_entry_message, cancel_invocation_entry_message,
complete_awakeable_entry_message, get_state_entry_message, get_state_keys_entry_message,
output_entry_message, AwakeableEntryMessage, CallEntryMessage,
CancelInvocationEntryMessage, ClearAllStateEntryMessage, ClearStateEntryMessage,
CompleteAwakeableEntryMessage, Failure, GetStateEntryMessage, GetStateKeysEntryMessage,
complete_awakeable_entry_message, get_call_invocation_id_entry_message,
get_state_entry_message, get_state_keys_entry_message, output_entry_message,
AwakeableEntryMessage, CallEntryMessage, CancelInvocationEntryMessage,
ClearAllStateEntryMessage, ClearStateEntryMessage, CompleteAwakeableEntryMessage, Failure,
GetCallInvocationIdEntryMessage, GetStateEntryMessage, GetStateKeysEntryMessage,
InputEntryMessage, OneWayCallEntryMessage, OutputEntryMessage, SetStateEntryMessage,
};

Expand Down Expand Up @@ -353,6 +354,12 @@ mod test_util {
EnrichedEntryHeader::CancelInvocation {},
Self::serialize_cancel_invocation_entry(entry),
),
Entry::GetCallInvocationId(entry) => EnrichedRawEntry::new(
EnrichedEntryHeader::GetCallInvocationId {
is_completed: entry.is_completed(),
},
Self::serialize_get_call_invocation_id_entry(entry),
),
_ => unimplemented!(),
}
}
Expand Down Expand Up @@ -463,6 +470,31 @@ mod test_util {
.encode_to_vec()
.into()
}

fn serialize_get_call_invocation_id_entry(
GetCallInvocationIdEntry {
call_entry_index,
result,
}: GetCallInvocationIdEntry,
) -> Bytes {
GetCallInvocationIdEntryMessage {
call_entry_index,
result: result.map(|res| match res {
GetCallInvocationIdResult::InvocationId(success) => {
get_call_invocation_id_entry_message::Result::Value(success)
}
GetCallInvocationIdResult::Failure(code, reason) => {
get_call_invocation_id_entry_message::Result::Failure(Failure {
code: code.into(),
message: reason.to_string(),
})
}
}),
..Default::default()
}
.encode_to_vec()
.into()
}
}
}

Expand Down
17 changes: 17 additions & 0 deletions crates/types/src/journal/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ impl Entry {
pub fn cancel_invocation(target: CancelInvocationTarget) -> Entry {
Entry::CancelInvocation(CancelInvocationEntry { target })
}

pub fn get_call_invocation_id(
call_entry_index: EntryIndex,
result: Option<GetCallInvocationIdResult>,
) -> Entry {
Entry::GetCallInvocationId(GetCallInvocationIdEntry {
call_entry_index,
result,
})
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -222,6 +232,7 @@ mod private {
impl Sealed for SleepEntry {}
impl Sealed for InvokeEntry {}
impl Sealed for AwakeableEntry {}
impl Sealed for GetCallInvocationIdEntry {}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -427,3 +438,9 @@ pub struct GetCallInvocationIdEntry {
pub call_entry_index: EntryIndex,
pub result: Option<GetCallInvocationIdResult>,
}

impl CompletableEntry for GetCallInvocationIdEntry {
fn is_completed(&self) -> bool {
self.result.is_some()
}
}
120 changes: 81 additions & 39 deletions crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2378,8 +2378,39 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
self.apply_cancel_invocation_journal_entry_action(ctx, &invocation_id, entry)
.await?;
}
EntryHeader::GetCallInvocationId { .. } => {
todo!()
EntryHeader::GetCallInvocationId { is_completed } => {
if !is_completed {
let_assert!(
Entry::GetCallInvocationId(entry) =
journal_entry.deserialize_entry_ref::<Codec>()?
);
let callee_invocation_id = Self::get_journal_entry_callee_invocation_id(
ctx,
&invocation_id,
entry.call_entry_index,
)
.await?;

if let Some(callee_invocation_id) = callee_invocation_id {
let completion_result = CompletionResult::Success(Bytes::from(
callee_invocation_id.to_string(),
));

Codec::write_completion(&mut journal_entry, completion_result.clone())?;
Self::do_forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, completion_result),
);
} else {
// Nothing we can do here, just forward an empty completion (which is invalid for this entry).
Self::do_forward_completion(
ctx,
invocation_id,
Completion::new(entry_index, CompletionResult::Empty),
);
}
}
}
}

Expand Down Expand Up @@ -2422,44 +2453,8 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
}
CancelInvocationTarget::CallEntryIndex(call_entry_index) => {
// Look for the given entry index, then resolve the invocation id.
match ctx
.storage
.get_journal_entry(invocation_id, call_entry_index)
Self::get_journal_entry_callee_invocation_id(ctx, invocation_id, call_entry_index)
.await?
{
Some(JournalEntry::Entry(e)) => {
match e.header() {
EnrichedEntryHeader::Call {
enrichment_result: Some(CallEnrichmentResult { invocation_id, .. }),
..
}
| EnrichedEntryHeader::OneWayCall {
enrichment_result: CallEnrichmentResult { invocation_id, .. },
..
} => Some(*invocation_id),
// This is the corner case when there is no enrichment result due to
// the invocation being already completed from the SDK. Nothing to do here.
EnrichedEntryHeader::Call {
enrichment_result: None,
..
} => None,
_ => {
warn!(
"The given journal entry index '{}' is not a Call/OneWayCall entry. This is potentially an SDK bug.",
call_entry_index
);
None
}
}
}
_ => {
warn!(
"The given journal entry index '{}' does not exist. This is potentially an SDK bug.",
call_entry_index
);
None
}
}
}
};

Expand All @@ -2476,6 +2471,53 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> {
Ok(())
}

async fn get_journal_entry_callee_invocation_id<State: ReadOnlyJournalTable>(
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: &InvocationId,
call_entry_index: EntryIndex,
) -> Result<Option<InvocationId>, Error> {
Ok(
match ctx
.storage
.get_journal_entry(invocation_id, call_entry_index)
.await?
{
Some(JournalEntry::Entry(e)) => {
match e.header() {
EnrichedEntryHeader::Call {
enrichment_result: Some(CallEnrichmentResult { invocation_id, .. }),
..
}
| EnrichedEntryHeader::OneWayCall {
enrichment_result: CallEnrichmentResult { invocation_id, .. },
..
} => Some(*invocation_id),
// This is the corner case when there is no enrichment result due to
// the invocation being already completed from the SDK. Nothing to do here.
EnrichedEntryHeader::Call {
enrichment_result: None,
..
} => None,
_ => {
warn!(
"The given journal entry index '{}' is not a Call/OneWayCall entry.",
call_entry_index
);
None
}
}
}
_ => {
warn!(
"The given journal entry index '{}' does not exist.",
call_entry_index
);
None
}
},
)
}

async fn handle_completion<State: JournalTable + InvocationStatusTable>(
ctx: &mut StateMachineApplyContext<'_, State>,
invocation_id: InvocationId,
Expand Down
121 changes: 121 additions & 0 deletions crates/worker/src/partition/state_machine/tests/fixtures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::partition::state_machine::tests::TestEnv;
use crate::partition::state_machine::Action;
use bytes::Bytes;
use googletest::prelude::*;
use restate_invoker_api::InvokeInputJournal;
use restate_storage_api::journal_table::JournalEntry;
use restate_types::identifiers::{InvocationId, ServiceId};
use restate_types::invocation::{
InvocationTarget, ServiceInvocation, ServiceInvocationSpanContext, Source,
};
use restate_types::journal::enriched::{
CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry,
};
use restate_wal_protocol::Command;

pub fn completed_invoke_entry(invocation_id: InvocationId) -> JournalEntry {
JournalEntry::Entry(EnrichedRawEntry::new(
EnrichedEntryHeader::Call {
is_completed: true,
enrichment_result: Some(CallEnrichmentResult {
invocation_id,
invocation_target: InvocationTarget::mock_service(),
completion_retention_time: None,
span_context: ServiceInvocationSpanContext::empty(),
}),
},
Bytes::default(),
))
}

pub fn background_invoke_entry(invocation_id: InvocationId) -> JournalEntry {
JournalEntry::Entry(EnrichedRawEntry::new(
EnrichedEntryHeader::OneWayCall {
enrichment_result: CallEnrichmentResult {
invocation_id,
invocation_target: InvocationTarget::mock_service(),
completion_retention_time: None,
span_context: ServiceInvocationSpanContext::empty(),
},
},
Bytes::default(),
))
}

pub fn uncompleted_invoke_entry(invocation_id: InvocationId) -> JournalEntry {
JournalEntry::Entry(EnrichedRawEntry::new(
EnrichedEntryHeader::Call {
is_completed: false,
enrichment_result: Some(CallEnrichmentResult {
invocation_id,
invocation_target: InvocationTarget::mock_service(),
completion_retention_time: None,
span_context: ServiceInvocationSpanContext::empty(),
}),
},
Bytes::default(),
))
}

pub async fn mock_start_invocation_with_service_id(
state_machine: &mut TestEnv,
service_id: ServiceId,
) -> InvocationId {
mock_start_invocation_with_invocation_target(
state_machine,
InvocationTarget::mock_from_service_id(service_id),
)
.await
}

pub async fn mock_start_invocation_with_invocation_target(
state_machine: &mut TestEnv,
invocation_target: InvocationTarget,
) -> InvocationId {
let invocation_id = InvocationId::generate(&invocation_target);

let actions = state_machine
.apply(Command::Invoke(ServiceInvocation {
invocation_id,
invocation_target: invocation_target.clone(),
argument: Default::default(),
source: Source::Ingress,
response_sink: None,
span_context: Default::default(),
headers: vec![],
execution_time: None,
completion_retention_duration: None,
idempotency_key: None,
submit_notification_sink: None,
}))
.await;

assert_that!(
actions,
contains(pat!(Action::Invoke {
invocation_id: eq(invocation_id),
invocation_target: eq(invocation_target),
invoke_input_journal: pat!(InvokeInputJournal::CachedJournal(_, _))
}))
);

invocation_id
}

pub async fn mock_start_invocation(state_machine: &mut TestEnv) -> InvocationId {
mock_start_invocation_with_invocation_target(
state_machine,
InvocationTarget::mock_virtual_object(),
)
.await
}
Loading

0 comments on commit 78c0e64

Please sign in to comment.