Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CancelInvocation and GetCallInvocationId entries. #1967

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/invoker-api/src/entry_enricher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ pub mod test_util {
}
PlainEntryHeader::Run {} => EnrichedEntryHeader::Run {},
PlainEntryHeader::Custom { code } => EnrichedEntryHeader::Custom { code },
PlainEntryHeader::CancelInvocation => EnrichedEntryHeader::CancelInvocation,
PlainEntryHeader::GetCallInvocationId { is_completed } => {
EnrichedEntryHeader::GetCallInvocationId { is_completed }
}
};

Ok(RawEntry::new(enriched_header, entry))
Expand Down
5 changes: 5 additions & 0 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ const SERVICE_PROTOCOL_VERSION_V1: HeaderValue =
const SERVICE_PROTOCOL_VERSION_V2: HeaderValue =
HeaderValue::from_static("application/vnd.restate.invocation.v2");

#[allow(clippy::declare_interior_mutable_const)]
const SERVICE_PROTOCOL_VERSION_V3: HeaderValue =
HeaderValue::from_static("application/vnd.restate.invocation.v3");

#[allow(clippy::declare_interior_mutable_const)]
const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server");

Expand Down Expand Up @@ -511,6 +515,7 @@ fn service_protocol_version_to_header_value(
}
ServiceProtocolVersion::V1 => SERVICE_PROTOCOL_VERSION_V1,
ServiceProtocolVersion::V2 => SERVICE_PROTOCOL_VERSION_V2,
ServiceProtocolVersion::V3 => SERVICE_PROTOCOL_VERSION_V3,
}
}

Expand Down
70 changes: 64 additions & 6 deletions crates/service-protocol/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ impl RawEntryCodec for ProtobufRawEntryCodec {
OneWayCall,
Awakeable,
CompleteAwakeable,
Run
Run,
CancelInvocation,
GetCallInvocationId
})
}

Expand Down Expand Up @@ -174,14 +176,17 @@ mod test_util {
AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry,
};
use restate_types::journal::{
AwakeableEntry, CompletableEntry, CompleteAwakeableEntry, EntryResult, GetStateKeysEntry,
GetStateKeysResult, InputEntry, OutputEntry,
AwakeableEntry, CancelInvocationEntry, CancelInvocationTarget, CompletableEntry,
CompleteAwakeableEntry, EntryResult, GetCallInvocationIdEntry, GetCallInvocationIdResult,
GetStateKeysEntry, GetStateKeysResult, InputEntry, OutputEntry,
};
use restate_types::service_protocol::{
awakeable_entry_message, call_entry_message, complete_awakeable_entry_message,
awakeable_entry_message, call_entry_message, cancel_invocation_entry_message,
complete_awakeable_entry_message, get_call_invocation_id_entry_message,
get_state_entry_message, get_state_keys_entry_message, output_entry_message,
AwakeableEntryMessage, CallEntryMessage, ClearAllStateEntryMessage, ClearStateEntryMessage,
CompleteAwakeableEntryMessage, Failure, GetStateEntryMessage, GetStateKeysEntryMessage,
AwakeableEntryMessage, CallEntryMessage, CancelInvocationEntryMessage,
ClearAllStateEntryMessage, ClearStateEntryMessage, CompleteAwakeableEntryMessage, Failure,
GetCallInvocationIdEntryMessage, GetStateEntryMessage, GetStateKeysEntryMessage,
InputEntryMessage, OneWayCallEntryMessage, OutputEntryMessage, SetStateEntryMessage,
};

Expand Down Expand Up @@ -345,6 +350,16 @@ mod test_util {
},
Self::serialize_awakeable_entry(entry),
),
Entry::CancelInvocation(entry) => EnrichedRawEntry::new(
EnrichedEntryHeader::CancelInvocation {},
Self::serialize_cancel_invocation_entry(entry),
),
Entry::GetCallInvocationId(entry) => EnrichedRawEntry::new(
EnrichedEntryHeader::GetCallInvocationId {
is_completed: entry.is_completed(),
},
Self::serialize_get_call_invocation_id_entry(entry),
),
_ => unimplemented!(),
}
}
Expand Down Expand Up @@ -437,6 +452,49 @@ mod test_util {
.encode_to_vec()
.into()
}

fn serialize_cancel_invocation_entry(
CancelInvocationEntry { target }: CancelInvocationEntry,
) -> Bytes {
CancelInvocationEntryMessage {
target: Some(match target {
CancelInvocationTarget::InvocationId(id) => {
cancel_invocation_entry_message::Target::InvocationId(id.to_string())
}
CancelInvocationTarget::CallEntryIndex(idx) => {
cancel_invocation_entry_message::Target::CallEntryIndex(idx)
}
}),
..Default::default()
}
.encode_to_vec()
.into()
}

fn serialize_get_call_invocation_id_entry(
GetCallInvocationIdEntry {
call_entry_index,
result,
}: GetCallInvocationIdEntry,
) -> Bytes {
GetCallInvocationIdEntryMessage {
call_entry_index,
result: result.map(|res| match res {
GetCallInvocationIdResult::InvocationId(success) => {
get_call_invocation_id_entry_message::Result::Value(success)
}
GetCallInvocationIdResult::Failure(code, reason) => {
get_call_invocation_id_entry_message::Result::Failure(Failure {
code: code.into(),
message: reason.to_string(),
})
}
}),
..Default::default()
}
.encode_to_vec()
.into()
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions crates/service-protocol/src/message/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ fn message_header_to_raw_header(message_header: &MessageHeader) -> PlainEntryHea
enrichment_result: (),
},
MessageType::SideEffectEntry => PlainEntryHeader::Run {},
MessageType::CancelInvocationEntry => PlainEntryHeader::CancelInvocation {},
MessageType::GetCallInvocationIdEntry => PlainEntryHeader::GetCallInvocationId {
is_completed: expect_flag!(message_header, completed),
},
MessageType::CustomEntry(code) => PlainEntryHeader::Custom { code },
}
}
Expand All @@ -350,6 +354,8 @@ fn raw_header_to_message_type(entry_header: &PlainEntryHeader) -> MessageType {
PlainEntryHeader::Awakeable { .. } => MessageType::AwakeableEntry,
PlainEntryHeader::CompleteAwakeable { .. } => MessageType::CompleteAwakeableEntry,
PlainEntryHeader::Run { .. } => MessageType::SideEffectEntry,
PlainEntryHeader::CancelInvocation => MessageType::CancelInvocationEntry,
PlainEntryHeader::GetCallInvocationId { .. } => MessageType::GetCallInvocationIdEntry,
PlainEntryHeader::Custom { code, .. } => MessageType::CustomEntry(*code),
}
}
Expand Down
13 changes: 13 additions & 0 deletions crates/service-protocol/src/message/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub enum MessageType {
GetPromiseEntry,
PeekPromiseEntry,
CompletePromiseEntry,
CancelInvocationEntry,
GetCallInvocationIdEntry,
CustomEntry(u16),
}

Expand Down Expand Up @@ -77,6 +79,8 @@ impl MessageType {
MessageType::GetPromiseEntry => MessageKind::State,
MessageType::PeekPromiseEntry => MessageKind::State,
MessageType::CompletePromiseEntry => MessageKind::State,
MessageType::CancelInvocationEntry => MessageKind::Syscall,
MessageType::GetCallInvocationIdEntry => MessageKind::Syscall,
MessageType::CustomEntry(_) => MessageKind::CustomEntry,
}
}
Expand All @@ -92,6 +96,7 @@ impl MessageType {
| MessageType::GetPromiseEntry
| MessageType::PeekPromiseEntry
| MessageType::CompletePromiseEntry
| MessageType::GetCallInvocationIdEntry
)
}

Expand Down Expand Up @@ -125,6 +130,8 @@ const BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE: u16 = 0x0C02;
const AWAKEABLE_ENTRY_MESSAGE_TYPE: u16 = 0x0C03;
const COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE: u16 = 0x0C04;
const SIDE_EFFECT_ENTRY_MESSAGE_TYPE: u16 = 0x0C05;
const CANCEL_INVOCATION_ENTRY_MESSAGE_TYPE: u16 = 0x0C06;
const GET_CALL_INVOCATION_ID_ENTRY_MESSAGE_TYPE: u16 = 0x0C07;

impl From<MessageType> for MessageTypeId {
fn from(mt: MessageType) -> Self {
Expand All @@ -151,6 +158,8 @@ impl From<MessageType> for MessageTypeId {
MessageType::GetPromiseEntry => GET_PROMISE_ENTRY_MESSAGE_TYPE,
MessageType::PeekPromiseEntry => PEEK_PROMISE_ENTRY_MESSAGE_TYPE,
MessageType::CompletePromiseEntry => COMPLETE_PROMISE_ENTRY_MESSAGE_TYPE,
MessageType::CancelInvocationEntry => CANCEL_INVOCATION_ENTRY_MESSAGE_TYPE,
MessageType::GetCallInvocationIdEntry => GET_CALL_INVOCATION_ID_ENTRY_MESSAGE_TYPE,
MessageType::CustomEntry(id) => id,
}
}
Expand Down Expand Up @@ -187,6 +196,8 @@ impl TryFrom<MessageTypeId> for MessageType {
PEEK_PROMISE_ENTRY_MESSAGE_TYPE => Ok(MessageType::PeekPromiseEntry),
COMPLETE_PROMISE_ENTRY_MESSAGE_TYPE => Ok(MessageType::CompletePromiseEntry),
SIDE_EFFECT_ENTRY_MESSAGE_TYPE => Ok(MessageType::SideEffectEntry),
CANCEL_INVOCATION_ENTRY_MESSAGE_TYPE => Ok(MessageType::CancelInvocationEntry),
GET_CALL_INVOCATION_ID_ENTRY_MESSAGE_TYPE => Ok(MessageType::GetCallInvocationIdEntry),
v if ((v & CUSTOM_MESSAGE_MASK) != 0) => Ok(MessageType::CustomEntry(v)),
v => Err(UnknownMessageType(v)),
}
Expand Down Expand Up @@ -214,6 +225,8 @@ impl TryFrom<MessageType> for EntryType {
MessageType::GetPromiseEntry => Ok(EntryType::GetPromise),
MessageType::PeekPromiseEntry => Ok(EntryType::PeekPromise),
MessageType::CompletePromiseEntry => Ok(EntryType::CompletePromise),
MessageType::CancelInvocationEntry => Ok(EntryType::CancelInvocation),
MessageType::GetCallInvocationIdEntry => Ok(EntryType::GetCallInvocationId),
MessageType::CustomEntry(_) => Ok(EntryType::Custom),
MessageType::Start
| MessageType::Completion
Expand Down
9 changes: 9 additions & 0 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,13 @@ message EnrichedEntryHeader {
message SideEffect {
}

message CancelInvocation {
}

message GetCallInvocationId {
bool is_completed = 1;
}

message Custom {
uint32 code = 1;
}
Expand All @@ -411,6 +418,8 @@ message EnrichedEntryHeader {
CompleteAwakeable complete_awakeable = 10;
Custom custom = 11;
SideEffect side_effect = 14;
CancelInvocation cancel_invocation = 18;
GetCallInvocationId get_call_invocation_id = 19;
}
}

Expand Down
21 changes: 18 additions & 3 deletions crates/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ pub mod v1 {

use crate::storage::v1::dedup_sequence_number::Variant;
use crate::storage::v1::enriched_entry_header::{
Awakeable, BackgroundCall, ClearAllState, ClearState, CompleteAwakeable,
CompletePromise, Custom, GetPromise, GetState, GetStateKeys, Input, Invoke, Output,
PeekPromise, SetState, SideEffect, Sleep,
Awakeable, BackgroundCall, CancelInvocation, ClearAllState, ClearState,
CompleteAwakeable, CompletePromise, Custom, GetCallInvocationId, GetPromise, GetState,
GetStateKeys, Input, Invoke, Output, PeekPromise, SetState, SideEffect, Sleep,
};
use crate::storage::v1::invocation_status::{Completed, Free, Inboxed, Invoked, Suspended};
use crate::storage::v1::journal_entry::completion_result::{Empty, Failure, Success};
Expand Down Expand Up @@ -2210,6 +2210,14 @@ pub mod v1 {
enriched_entry_header::Kind::SideEffect(_) => {
restate_types::journal::enriched::EnrichedEntryHeader::Run {}
}
enriched_entry_header::Kind::CancelInvocation(_) => {
restate_types::journal::enriched::EnrichedEntryHeader::CancelInvocation {}
}
enriched_entry_header::Kind::GetCallInvocationId(entry) => {
restate_types::journal::enriched::EnrichedEntryHeader::GetCallInvocationId {
is_completed: entry.is_completed,
}
}
enriched_entry_header::Kind::Custom(custom) => {
restate_types::journal::enriched::EnrichedEntryHeader::Custom {
code: u16::try_from(custom.code)
Expand Down Expand Up @@ -2306,6 +2314,13 @@ pub mod v1 {
} => enriched_entry_header::Kind::CompletePromise(CompletePromise {
is_completed,
}),
restate_types::journal::enriched::EnrichedEntryHeader::CancelInvocation {
..
} => enriched_entry_header::Kind::CancelInvocation(CancelInvocation {}),
restate_types::journal::enriched::EnrichedEntryHeader::GetCallInvocationId {
is_completed,
..
} => enriched_entry_header::Kind::GetCallInvocationId(GetCallInvocationId { is_completed }),
};

EnrichedEntryHeader { kind: Some(kind) }
Expand Down
2 changes: 2 additions & 0 deletions crates/types/service-protocol/buf.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Generated by buf. DO NOT EDIT.
version: v2
8 changes: 8 additions & 0 deletions crates/types/service-protocol/buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: v2
name: buf.build/restatedev/service-protocol
lint:
use:
- DEFAULT
breaking:
use:
- FILE
40 changes: 37 additions & 3 deletions crates/types/service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ enum ServiceProtocolVersion {
// Added
// * Entry retry mechanism: ErrorMessage.next_retry_delay, StartMessage.retry_count_since_last_stored_entry and StartMessage.duration_since_last_stored_entry
V2 = 2;
// Added
// * New entry to cancel invocations: CancelInvocationEntryMessage
// * New entry to retrieve the invocation id: GetCallInvocationIdEntryMessage
V3 = 3;
}

// --- Core frames ---
Expand Down Expand Up @@ -53,8 +57,8 @@ message StartMessage {

// Retry count since the last stored entry.
//
// Please not this count might not be accurate, as it's not durably stored,
// thus it's susceptible to Restate's crashes/leader election changes.
// Please note that this count might not be accurate, as it's not durably stored,
// thus it might get reset in case Restate crashes/changes leader.
uint32 retry_count_since_last_stored_entry = 7;

// Duration since the last stored entry, in milliseconds.
Expand Down Expand Up @@ -383,13 +387,43 @@ message CompleteAwakeableEntryMessage {
message RunEntryMessage {
oneof result {
bytes value = 14;
dev.restate.service.protocol.Failure failure = 15;
Failure failure = 15;
};

// Entry name
string name = 12;
}

// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 6
message CancelInvocationEntryMessage {
oneof target {
// Target invocation id to cancel
string invocation_id = 1;
// Target index of the call/one way call journal entry in this journal.
uint32 call_entry_index = 2;
}

// Entry name
string name = 12;
}

// Completable: Yes
// Fallible: Yes
// Type: 0x0C00 + 7
message GetCallInvocationIdEntryMessage {
// Index of the call/one way call journal entry in this journal.
uint32 call_entry_index = 1;

oneof result {
string value = 14;
Failure failure = 15;
};

string name = 12;
}

// --- Nested messages

// This failure object carries user visible errors,
Expand Down
Loading
Loading