Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GetInvocationId and CancelInvocation entries #10

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ enum ServiceProtocolVersion {
// Added
// * Entry retry mechanism: ErrorMessage.next_retry_delay, StartMessage.retry_count_since_last_stored_entry and StartMessage.duration_since_last_stored_entry
V2 = 2;
// Added
// * New entry to cancel invocations: CancelInvocationEntryMessage
// * New entry to retrieve the invocation id: GetCallInvocationIdEntryMessage
// * New field to set idempotency key for Call entries
V3 = 3;
}

// --- Core frames ---
Expand Down Expand Up @@ -313,6 +318,8 @@ message CallEntryMessage {
// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
string key = 5;

string idempotency_key = 6;

oneof result {
bytes value = 14;
Failure failure = 15;
Expand Down Expand Up @@ -342,6 +349,8 @@ message OneWayCallEntryMessage {
// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
string key = 6;

string idempotency_key = 7;

// Entry name
string name = 12;
}
Expand Down Expand Up @@ -383,13 +392,43 @@ message CompleteAwakeableEntryMessage {
message RunEntryMessage {
oneof result {
bytes value = 14;
dev.restate.service.protocol.Failure failure = 15;
Failure failure = 15;
};

// Entry name
string name = 12;
}

// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 6
message CancelInvocationEntryMessage {
oneof target {
// Target invocation id to cancel
string invocation_id = 1;
// Target index of the call/one way call journal entry in this journal.
uint32 call_entry_index = 2;
}

// Entry name
string name = 12;
}

// Completable: Yes
// Fallible: Yes
// Type: 0x0C00 + 7
message GetCallInvocationIdEntryMessage {
// Index of the call/one way call journal entry in this journal.
uint32 call_entry_index = 1;

oneof result {
string value = 14;
Failure failure = 15;
};

string name = 12;
}

// --- Nested messages

// This failure object carries user visible errors,
Expand Down
38 changes: 20 additions & 18 deletions service-protocol/service-invocation-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,24 +330,26 @@ used for observability purposes by Restate observability tools.
The following tables describe the currently available journal entries. For more details, check the protobuf message
descriptions in [`protocol.proto`](dev/restate/service/protocol.proto).

| Message | Type | Completable | Fallible | Description |
| ------------------------------- | -------- | ----------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `InputEntryMessage` | `0x0400` | No | No | Carries the invocation input message(s) of the invocation. |
| `GetStateEntryMessage` | `0x0800` | Yes | No | Get the value of a service instance state key. |
| `GetStateKeysEntryMessage` | `0x0804` | Yes | No | Get all the known state keys for this service instance. Note: the completion value for this message is a protobuf of type `GetStateKeysEntryMessage.StateKeys`. |
| `SleepEntryMessage` | `0x0C00` | Yes | No | Initiate a timer that completes after the given time. |
| `CallEntryMessage` | `0x0C01` | Yes | Yes | Invoke another Restate service. |
| `AwakeableEntryMessage` | `0x0C03` | Yes | No | Arbitrary result container which can be completed from another service, given a specific id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OneWayCallEntryMessage` | `0x0C02` | No | Yes | Invoke another Restate service at the given time, without waiting for the response. |
| `CompleteAwakeableEntryMessage` | `0x0C04` | No | Yes | Complete an `Awakeable`, given its id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OutputEntryMessage` | `0x0401` | No | No | Carries the invocation output message(s) or terminal failure of the invocation. |
| `SetStateEntryMessage` | `0x0800` | No | No | Set the value of a service instance state key. |
| `ClearStateEntryMessage` | `0x0801` | No | No | Clear the value of a service instance state key. |
| `ClearAllStateEntryMessage` | `0x0802` | No | No | Clear all the values of the service instance state. |
| `RunEntryMessage` | `0x0C05` | No | No | Run non-deterministic user provided code and persist the result. |
| `GetPromiseEntryMessage` | `0x0808` | Yes | No | Get or wait the value of the given promise. If the value is not present yet, this entry will block waiting for the value. |
| `PeekPromiseEntryMessage` | `0x0809` | Yes | No | Get the value of the given promise. If the value is not present, this entry completes immediately with empty completion. |
| `CompletePromiseEntryMessage` | `0x080A` | Yes | No | Complete the given promise. If the promise was completed already, this entry completes with a failure. |
| Message | Type | Completable | Fallible | Description |
|-----------------------------------|----------|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `InputEntryMessage` | `0x0400` | No | No | Carries the invocation input message(s) of the invocation. |
| `GetStateEntryMessage` | `0x0800` | Yes | No | Get the value of a service instance state key. |
| `GetStateKeysEntryMessage` | `0x0804` | Yes | No | Get all the known state keys for this service instance. Note: the completion value for this message is a protobuf of type `GetStateKeysEntryMessage.StateKeys`. |
| `SleepEntryMessage` | `0x0C00` | Yes | No | Initiate a timer that completes after the given time. |
| `CallEntryMessage` | `0x0C01` | Yes | Yes | Invoke another Restate service. |
| `AwakeableEntryMessage` | `0x0C03` | Yes | No | Arbitrary result container which can be completed from another service, given a specific id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OneWayCallEntryMessage` | `0x0C02` | No | Yes | Invoke another Restate service at the given time, without waiting for the response. |
| `CompleteAwakeableEntryMessage` | `0x0C04` | No | Yes | Complete an `Awakeable`, given its id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OutputEntryMessage` | `0x0401` | No | No | Carries the invocation output message(s) or terminal failure of the invocation. |
| `SetStateEntryMessage` | `0x0800` | No | No | Set the value of a service instance state key. |
| `ClearStateEntryMessage` | `0x0801` | No | No | Clear the value of a service instance state key. |
| `ClearAllStateEntryMessage` | `0x0802` | No | No | Clear all the values of the service instance state. |
| `RunEntryMessage` | `0x0C05` | No | No | Run non-deterministic user provided code and persist the result. |
| `GetPromiseEntryMessage` | `0x0808` | Yes | No | Get or wait the value of the given promise. If the value is not present yet, this entry will block waiting for the value. |
| `PeekPromiseEntryMessage` | `0x0809` | Yes | No | Get the value of the given promise. If the value is not present, this entry completes immediately with empty completion. |
| `CompletePromiseEntryMessage` | `0x080A` | Yes | No | Complete the given promise. If the promise was completed already, this entry completes with a failure. |
| `CancelInvocationEntryMessage` | `0x0C06` | No | Yes | Cancel the target invocation id or the target journal entry. |
| `GetCallInvocationIdEntryMessage` | `0x0C07` | Yes | Yes | Get the invocation id of a previously created call/one way call. |

#### Awakeable identifier

Expand Down
40 changes: 39 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub struct Target {
pub service: String,
pub handler: String,
pub key: Option<String>,
pub idempotency_key: Option<String>,
}

#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq)]
Expand All @@ -139,6 +140,21 @@ impl From<AsyncResultHandle> for u32 {
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SendHandle(u32);

impl From<u32> for SendHandle {
fn from(value: u32) -> Self {
SendHandle(value)
}
}

impl From<SendHandle> for u32 {
fn from(value: SendHandle) -> Self {
value.0
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum Value {
/// a void/None/undefined success
Expand All @@ -147,6 +163,8 @@ pub enum Value {
Failure(TerminalFailure),
/// Only returned for get_state_keys
StateKeys(Vec<String>),
/// Only returned for get_call_invocation_id
InvocationId(String),
CombinatorResult(Vec<AsyncResultHandle>),
}

Expand Down Expand Up @@ -196,6 +214,19 @@ impl From<NonEmptyValue> for Value {
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum GetInvocationIdTarget {
CallEntry(AsyncResultHandle),
SendEntry(SendHandle),
}

#[derive(Debug, Eq, PartialEq)]
pub enum CancelInvocationTarget {
InvocationId(String),
CallEntry(AsyncResultHandle),
SendEntry(SendHandle),
}

#[derive(Debug, Eq, PartialEq)]
pub enum TakeOutputResult {
Buffer(Bytes),
Expand Down Expand Up @@ -274,7 +305,7 @@ pub trait VM: Sized {
target: Target,
input: Bytes,
execution_time_since_unix_epoch: Option<Duration>,
) -> VMResult<()>;
) -> VMResult<SendHandle>;

fn sys_awakeable(&mut self) -> VMResult<(String, AsyncResultHandle)>;

Expand All @@ -298,6 +329,13 @@ pub trait VM: Sized {
retry_policy: RetryPolicy,
) -> VMResult<AsyncResultHandle>;

fn sys_get_call_invocation_id(
&mut self,
call: GetInvocationIdTarget,
) -> VMResult<AsyncResultHandle>;

fn sys_cancel_invocation(&mut self, target: CancelInvocationTarget) -> VMResult<()>;

fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;

fn sys_end(&mut self) -> VMResult<()>;
Expand Down
57 changes: 57 additions & 0 deletions src/service_protocol/generated/dev.restate.service.protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ pub struct CallEntryMessage {
/// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
#[prost(string, tag = "5")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "6")]
pub idempotency_key: ::prost::alloc::string::String,
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -399,6 +401,8 @@ pub struct OneWayCallEntryMessage {
/// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
#[prost(string, tag = "6")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "7")]
pub idempotency_key: ::prost::alloc::string::String,
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -471,6 +475,52 @@ pub mod run_entry_message {
Failure(super::Failure),
}
}
/// Completable: No
/// Fallible: Yes
/// Type: 0x0C00 + 6
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelInvocationEntryMessage {
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
#[prost(oneof = "cancel_invocation_entry_message::Target", tags = "1, 2")]
pub target: ::core::option::Option<cancel_invocation_entry_message::Target>,
}
/// Nested message and enum types in `CancelInvocationEntryMessage`.
pub mod cancel_invocation_entry_message {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Target {
/// Target invocation id to cancel
#[prost(string, tag = "1")]
InvocationId(::prost::alloc::string::String),
/// Target index of the call/one way call journal entry in this journal.
#[prost(uint32, tag = "2")]
CallEntryIndex(u32),
}
}
/// Completable: Yes
/// Fallible: Yes
/// Type: 0x0C00 + 7
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetCallInvocationIdEntryMessage {
/// Index of the call/one way call journal entry in this journal.
#[prost(uint32, tag = "1")]
pub call_entry_index: u32,
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
#[prost(oneof = "get_call_invocation_id_entry_message::Result", tags = "14, 15")]
pub result: ::core::option::Option<get_call_invocation_id_entry_message::Result>,
}
/// Nested message and enum types in `GetCallInvocationIdEntryMessage`.
pub mod get_call_invocation_id_entry_message {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Result {
#[prost(string, tag = "14")]
Value(::prost::alloc::string::String),
#[prost(message, tag = "15")]
Failure(super::Failure),
}
}
/// This failure object carries user visible errors,
/// e.g. invocation failure return value or failure result of an InvokeEntryMessage.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -501,6 +551,11 @@ pub enum ServiceProtocolVersion {
/// Added
/// * Entry retry mechanism: ErrorMessage.next_retry_delay, StartMessage.retry_count_since_last_stored_entry and StartMessage.duration_since_last_stored_entry
V2 = 2,
/// Added
/// * New entry to cancel invocations: CancelInvocationEntryMessage
/// * New entry to retrieve the invocation id: GetCallInvocationIdEntryMessage
/// * New field to set idempotency key for Call entries
V3 = 3,
}
impl ServiceProtocolVersion {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -512,6 +567,7 @@ impl ServiceProtocolVersion {
Self::Unspecified => "SERVICE_PROTOCOL_VERSION_UNSPECIFIED",
Self::V1 => "V1",
Self::V2 => "V2",
Self::V3 => "V3",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
Expand All @@ -520,6 +576,7 @@ impl ServiceProtocolVersion {
"SERVICE_PROTOCOL_VERSION_UNSPECIFIED" => Some(Self::Unspecified),
"V1" => Some(Self::V1),
"V2" => Some(Self::V2),
"V3" => Some(Self::V3),
_ => None,
}
}
Expand Down
Loading