Skip to content

Commit

Permalink
Add state keys
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Aug 8, 2024
1 parent d8b7c88 commit 5fcf45a
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 63 deletions.
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ impl From<AsyncResultHandle> for u32 {

#[derive(Debug, Eq, PartialEq)]
pub enum Value {
// a void/None/undefined success
/// a void/None/undefined success
Void,
Success(Vec<u8>),
Failure(Failure),
/// Only returned for get_state_keys
StateKeys(Vec<String>),
}

/// Terminal failure
Expand Down Expand Up @@ -165,7 +167,7 @@ pub trait VM: Sized {

fn sys_get_state(&mut self, key: String) -> VMResult<AsyncResultHandle>;

// TODO sys_get_keys_state(&mut self)
fn sys_get_keys_state(&mut self) -> VMResult<AsyncResultHandle>;

fn sys_set_state(&mut self, key: String, value: Vec<u8>) -> VMResult<()>;

Expand Down
167 changes: 127 additions & 40 deletions src/service_protocol/messages.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::service_protocol::messages::get_state_keys_entry_message::StateKeys;
use crate::service_protocol::{MessageHeader, MessageType};
use crate::{NonEmptyValue, Value};
use crate::vm::errors::{DecodeStateKeysProst, DecodeStateKeysUtf8, EmptyStateKeys};
use crate::{NonEmptyValue, VMError, Value};
use paste::paste;
use prost::Message;

Expand All @@ -23,7 +25,8 @@ pub trait EntryMessageHeaderEq {

pub trait CompletableEntryMessage: RestateMessage + EntryMessage + EntryMessageHeaderEq {
fn is_completed(&self) -> bool;
fn into_completion(self) -> Option<Value>;
fn into_completion(self) -> Result<Option<Value>, VMError>;
fn completion_parsing_hint() -> CompletionParsingHint;
}

impl<M: CompletableEntryMessage> WriteableRestateMessage for M {
Expand Down Expand Up @@ -70,8 +73,12 @@ macro_rules! impl_message_traits {
self.result.is_some()
}

fn into_completion(self) -> Option<Value> {
self.result.map(Into::into)
fn into_completion(self) -> Result<Option<Value>, VMError> {
self.result.map(TryInto::try_into).transpose()
}

fn completion_parsing_hint() -> CompletionParsingHint {
CompletionParsingHint::EmptyOrSuccessOrValue
}
}
};
Expand Down Expand Up @@ -118,6 +125,27 @@ impl EntryMessageHeaderEq for GetStateEntryMessage {
}
}

impl_message_traits!(GetStateKeysEntry: message);
impl_message_traits!(GetStateKeysEntry: entry);
impl CompletableEntryMessage for GetStateKeysEntryMessage {
fn is_completed(&self) -> bool {
self.result.is_some()
}

fn into_completion(self) -> Result<Option<Value>, VMError> {
self.result.map(TryInto::try_into).transpose()
}

fn completion_parsing_hint() -> CompletionParsingHint {
CompletionParsingHint::StateKeys
}
}
impl EntryMessageHeaderEq for GetStateKeysEntryMessage {
fn header_eq(&self, _: &Self) -> bool {
true
}
}

impl_message_traits!(SetStateEntry: non_completable_entry);

impl_message_traits!(ClearStateEntry: non_completable_entry);
Expand Down Expand Up @@ -207,78 +235,101 @@ impl EntryMessageHeaderEq for RunEntryMessage {

// --- Completion extraction

impl From<completion_message::Result> for Value {
fn from(value: completion_message::Result) -> Self {
match value {
completion_message::Result::Empty(_) => Value::Void,
completion_message::Result::Value(b) => Value::Success(b.to_vec()),
completion_message::Result::Failure(f) => Value::Failure(f.into()),
}
}
}
impl TryFrom<get_state_entry_message::Result> for Value {
type Error = VMError;

impl From<get_state_entry_message::Result> for Value {
fn from(value: get_state_entry_message::Result) -> Self {
match value {
fn try_from(value: get_state_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
get_state_entry_message::Result::Empty(_) => Value::Void,
get_state_entry_message::Result::Value(b) => Value::Success(b.into()),
get_state_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

impl From<sleep_entry_message::Result> for Value {
fn from(value: sleep_entry_message::Result) -> Self {
impl TryFrom<get_state_keys_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: get_state_keys_entry_message::Result) -> Result<Self, Self::Error> {
match value {
get_state_keys_entry_message::Result::Value(state_keys) => {
let mut state_keys = state_keys
.keys
.into_iter()
.map(|b| String::from_utf8(b.to_vec()).map_err(DecodeStateKeysUtf8))
.collect::<Result<Vec<_>, _>>()?;
state_keys.sort();
Ok(Value::StateKeys(state_keys))
}
get_state_keys_entry_message::Result::Failure(f) => Ok(Value::Failure(f.into())),
}
}
}

impl TryFrom<sleep_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: sleep_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
sleep_entry_message::Result::Empty(_) => Value::Void,
sleep_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

impl From<call_entry_message::Result> for Value {
fn from(value: call_entry_message::Result) -> Self {
match value {
impl TryFrom<call_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: call_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
call_entry_message::Result::Value(b) => Value::Success(b.into()),
call_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

impl From<awakeable_entry_message::Result> for Value {
fn from(value: awakeable_entry_message::Result) -> Self {
match value {
impl TryFrom<awakeable_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: awakeable_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
awakeable_entry_message::Result::Value(b) => Value::Success(b.into()),
awakeable_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

impl From<get_promise_entry_message::Result> for Value {
fn from(value: get_promise_entry_message::Result) -> Self {
match value {
impl TryFrom<get_promise_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: get_promise_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
get_promise_entry_message::Result::Value(b) => Value::Success(b.into()),
get_promise_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

impl From<peek_promise_entry_message::Result> for Value {
fn from(value: peek_promise_entry_message::Result) -> Self {
match value {
impl TryFrom<peek_promise_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: peek_promise_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
peek_promise_entry_message::Result::Empty(_) => Value::Void,
peek_promise_entry_message::Result::Value(b) => Value::Success(b.into()),
peek_promise_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

impl From<complete_promise_entry_message::Result> for Value {
fn from(value: complete_promise_entry_message::Result) -> Self {
match value {
impl TryFrom<complete_promise_entry_message::Result> for Value {
type Error = VMError;

fn try_from(value: complete_promise_entry_message::Result) -> Result<Self, Self::Error> {
Ok(match value {
complete_promise_entry_message::Result::Empty(_) => Value::Void,
complete_promise_entry_message::Result::Failure(f) => Value::Failure(f.into()),
}
})
}
}

Expand Down Expand Up @@ -310,3 +361,39 @@ impl From<Failure> for crate::Failure {
}
}
}

// --- Completion parsing

#[derive(Debug)]
pub(crate) enum CompletionParsingHint {
StateKeys,
/// The normal case
EmptyOrSuccessOrValue,
}

impl CompletionParsingHint {
pub(crate) fn parse(self, result: completion_message::Result) -> Result<Value, VMError> {
match self {
CompletionParsingHint::StateKeys => match result {
completion_message::Result::Empty(_) => Err(EmptyStateKeys.into()),
completion_message::Result::Value(b) => {
let mut state_keys = StateKeys::decode(b)
.map_err(DecodeStateKeysProst)?
.keys
.into_iter()
.map(|b| String::from_utf8(b.to_vec()).map_err(DecodeStateKeysUtf8))
.collect::<Result<Vec<_>, _>>()?;
state_keys.sort();

Ok(Value::StateKeys(state_keys))
}
completion_message::Result::Failure(f) => Ok(Value::Failure(f.into())),
},
CompletionParsingHint::EmptyOrSuccessOrValue => Ok(match result {
completion_message::Result::Empty(_) => Value::Void,
completion_message::Result::Value(b) => Value::Success(b.to_vec()),
completion_message::Result::Failure(f) => Value::Failure(f.into()),
}),
}
}
}
3 changes: 3 additions & 0 deletions src/tests/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod get_promise {
}
Value::Success(s) => NonEmptyValue::Success(s),
Value::Failure(f) => NonEmptyValue::Failure(f),
v => panic!("Unexpected value {v:?}"),
};

vm.sys_write_output(output).unwrap();
Expand Down Expand Up @@ -136,6 +137,7 @@ mod peek_promise {
Value::Void => NonEmptyValue::Success("null".into()),
Value::Success(s) => NonEmptyValue::Success(s),
Value::Failure(f) => NonEmptyValue::Failure(f),
v => panic!("Unexpected value {v:?}"),
};

vm.sys_write_output(output).unwrap();
Expand Down Expand Up @@ -294,6 +296,7 @@ mod complete_promise {
Value::Void => RESOLVED,
Value::Success(_) => panic!("Unexpected success completion"),
Value::Failure(_) => REJECTED,
v => panic!("Unexpected value {v:?}"),
};

vm.sys_write_output(NonEmptyValue::Success(output.to_vec()))
Expand Down
Loading

0 comments on commit 5fcf45a

Please sign in to comment.