diff --git a/Cargo.lock b/Cargo.lock index 2f251fdd..22c51c58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2381,7 +2381,6 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "cid 0.9.0", "clap 4.0.29", "criterion", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index b960dbce..59375a90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ authors = ["Brooklyn Zelenka "] [lib] path = "src/lib.rs" bench = false -doctest = false +doctest = true [[bin]] name = "ipvm" @@ -33,7 +33,6 @@ required-features = ["test_utils"] [dependencies] anyhow = "1.0" async-trait = "0.1" -cid = "0.9" clap = { version = "4.0", features = ["derive"] } derive_more = "0.99.17" diesel = { version = "2.0", features = ["sqlite"] } @@ -58,7 +57,7 @@ ucan = "0.1" url = "2.3" wasmer = { version = "3.1", features = ["compiler"] } wasmer-compiler-singlepass = "3.1.0" -wasmer-middlewares = "3.1.0" +wasmer-middlewares = "3.1" [dev-dependencies] criterion = "0.4" diff --git a/src/main.rs b/src/main.rs index a9348205..c3e314b5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -187,8 +187,6 @@ async fn main() -> Result<()> { .execute(&mut conn) .expect("Error saving new post"); - //todo!("advertise receipt"); - let res_copy = res.clone().into_bytes(); println!("Wasm CID: {closure_cid}"); diff --git a/src/network/client.rs b/src/network/client.rs index 87fd7be0..0ba77f7f 100644 --- a/src/network/client.rs +++ b/src/network/client.rs @@ -3,14 +3,13 @@ use crate::network::{ swarm::ComposedBehaviour, }; use anyhow::Result; -use libp2p::{identity::Keypair, request_response::ResponseChannel, Multiaddr, PeerId, Swarm}; +use libp2p::{request_response::ResponseChannel, Multiaddr, PeerId, Swarm}; use std::collections::HashSet; use tokio::sync::{mpsc, oneshot}; #[derive(Clone)] pub struct Client { sender: mpsc::Sender, - peer_id: PeerId, } impl Client { @@ -20,13 +19,10 @@ impl Client { ) -> Result<(Self, mpsc::Receiver, EventLoop)> { let (command_sender, command_receiver) = mpsc::channel(1); let (event_sender, event_receiver) = mpsc::channel(1); - let keypair = Keypair::generate_ed25519(); - let peer_id = keypair.public().to_peer_id(); Ok(( Client { sender: command_sender, - peer_id, }, event_receiver, EventLoop::new(swarm, command_receiver, event_sender), diff --git a/src/workflow/closure.rs b/src/workflow/closure.rs index 3120ebd4..4730bbcd 100644 --- a/src/workflow/closure.rs +++ b/src/workflow/closure.rs @@ -1,16 +1,22 @@ //! The smallest unit of work in IPVM -use crate::workflow::pointer::{InvokedTaskPointer, Promise, Status}; +use crate::workflow::pointer::{InvokedTaskPointer, Promise, Status, OK_BRANCH}; use anyhow::anyhow; use libipld::{ - cbor::DagCborCodec, cid::multibase::Base, prelude::Encode, serde as ipld_serde, Cid, Ipld, Link, + cbor::DagCborCodec, cid::multibase::Base, codec::Codec, serde::from_ipld, Cid, Ipld, Link, }; use multihash::{Code, MultihashDigest}; use std::{collections::btree_map::BTreeMap, convert::TryFrom, fmt}; use url::Url; +const WITH_KEY: &str = "with"; +const DO_KEY: &str = "do"; +const INPUTS_KEY: &str = "inputs"; + /// The suspended representation of the smallest unit of work in IPVM /// +/// # Example +/// /// ``` /// use libipld::Ipld; /// use url::Url; @@ -19,7 +25,7 @@ use url::Url; /// Closure { /// resource: Url::parse("ipfs://bafkreihf37goitzzlatlhwgiadb2wxkmn4k2edremzfjsm7qhnoxwlfstm").expect("IPFS URL"), /// action: Action::from("wasm/run"), -/// inputs: Input::from(Ipld::Null), +/// inputs: Input::try_from(Ipld::Null).unwrap(), /// }; /// ``` #[derive(Clone, Debug, PartialEq)] @@ -41,11 +47,11 @@ impl TryFrom for Link { type Error = anyhow::Error; fn try_from(closure: Closure) -> Result, Self::Error> { - let mut closure_bytes = Vec::new(); - >::into(closure).encode(DagCborCodec, &mut closure_bytes)?; + let ipld: Ipld = closure.into(); + let bytes = DagCborCodec.encode(&ipld)?; Ok(Link::new(Cid::new_v1( DagCborCodec.into(), - Code::Sha3_256.digest(&closure_bytes), + Code::Sha3_256.digest(&bytes), ))) } } @@ -53,9 +59,9 @@ impl TryFrom for Link { impl From for Ipld { fn from(closure: Closure) -> Self { Ipld::Map(BTreeMap::from([ - ("with".to_string(), Ipld::String(closure.resource.into())), - ("do ".to_string(), closure.action.into()), - ("inputs".to_string(), closure.inputs.into()), + (WITH_KEY.into(), Ipld::String(closure.resource.into())), + (DO_KEY.into(), closure.action.into()), + (INPUTS_KEY.into(), closure.inputs.into()), ])) } } @@ -64,27 +70,37 @@ impl TryFrom for Closure { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::Map(assoc) => Ok(Closure { - action: Action::try_from(assoc.get("do").ok_or(anyhow!("Bad"))?.clone()) - .or_else(|_| Err(anyhow!("Bad")))?, - - inputs: Input::from(assoc.get("inputs").ok_or(anyhow!("Bad"))?.clone()), - resource: match assoc.get("with").ok_or(anyhow!("Bad"))? { - Ipld::Link(cid) => cid - .to_string_of_base(Base::Base32HexLower) - .or(Err(anyhow!("Bad"))) - .and_then(|txt| { - Url::parse(format!("{}{}", "ipfs://", txt).as_str()) - .or(Err(anyhow!("Bad"))) - }), - Ipld::String(txt) => Url::parse(txt.as_str()).or(Err(anyhow!("Bad"))), - _ => Err(anyhow!("Bad")), - }?, - }), - - _ => Err(anyhow!("Bad")), - } + let map = from_ipld::>(ipld)?; + let action = Action::try_from( + map.get(DO_KEY) + .ok_or_else(|| anyhow!("No do action set."))? + .to_owned(), + )?; + let inputs = Input::try_from( + map.get(INPUTS_KEY) + .ok_or_else(|| anyhow!("No inputs key set."))? + .to_owned(), + )?; + + let resource = match map.get(WITH_KEY) { + Some(Ipld::Link(cid)) => cid + .to_string_of_base(Base::Base32HexLower) + .map_err(|e| anyhow!("Failed to encode CID into multibase string: {e}")) + .and_then(|txt| { + Url::parse(format!("{}{}", "ipfs://", txt).as_str()) + .map_err(|e| anyhow!("Failed to parse URL: {e}")) + }), + Some(Ipld::String(txt)) => { + Url::parse(txt.as_str()).map_err(|e| anyhow!("Failed to parse URL: {e}")) + } + _ => Err(anyhow!("No resource/with set.")), + }?; + + Ok(Closure { + resource, + action, + inputs, + }) } } @@ -115,31 +131,26 @@ impl From for Input { } } -impl From for Input { - fn from(ipld: Ipld) -> Input { - match ipld { - Ipld::Map(ref map) => { - if map.len() != 1 { - return Input::IpldData(ipld); - } - match map.get("ucan/ok") { - Some(Ipld::List(pointer)) => { - if let Ok(invoked_task) = - InvokedTaskPointer::try_from(Ipld::List(pointer.clone())) - { - Input::Deferred(Promise { - branch_selector: Some(Status::Success), - invoked_task, - }) - } else { - Input::IpldData(ipld) - } - } - - _ => Input::IpldData(ipld), - } - } - _ => Input::IpldData(ipld), +impl TryFrom for Input { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let Ok(map) = from_ipld::>(ipld.clone()) else { + return Ok(Input::IpldData(ipld)) + }; + + if map.len() > 1 { + map.get(OK_BRANCH) + .map_or(Ok(Input::IpldData(ipld)), |ipld| { + let pointer = from_ipld(ipld.clone())?; + let invoked_task = InvokedTaskPointer::try_from(Ipld::List(pointer))?; + Ok(Input::Deferred(Promise { + result: Some(Status::Success), + invoked_task, + })) + }) + } else { + Ok(Input::IpldData(ipld)) } } } @@ -197,7 +208,7 @@ impl TryFrom for Action { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - let action = ipld_serde::from_ipld::(ipld)?; + let action = from_ipld::(ipld)?; Ok(Action::from(action)) } } diff --git a/src/workflow/config.rs b/src/workflow/config.rs index bfe9d7d7..b495aa22 100644 --- a/src/workflow/config.rs +++ b/src/workflow/config.rs @@ -1,6 +1,7 @@ //! Configuration module -use libipld::{serde as ipld_serde, Ipld}; +use anyhow::anyhow; +use libipld::{serde::from_ipld, Ipld}; use std::{collections::BTreeMap, default::Default, time::Duration}; const FUEL_KEY: &str = "fuel"; @@ -72,8 +73,18 @@ impl TryFrom for Resources { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - let fuel = ipld_serde::from_ipld(ipld.get(FUEL_KEY)?.to_owned())?; - let time = ipld_serde::from_ipld(ipld.take(TIMEOUT_KEY)?)?; + let map = from_ipld::>(ipld)?; + let fuel = from_ipld( + map.get(FUEL_KEY) + .ok_or_else(|| anyhow!("No fuel set."))? + .to_owned(), + )?; + + let time = from_ipld( + map.get(TIMEOUT_KEY) + .ok_or_else(|| anyhow!("No timeout set."))? + .to_owned(), + )?; Ok(Resources { fuel, time }) } diff --git a/src/workflow/invocation.rs b/src/workflow/invocation.rs index 8e36bc23..ca55ed25 100644 --- a/src/workflow/invocation.rs +++ b/src/workflow/invocation.rs @@ -1,11 +1,14 @@ use crate::workflow::{pointer::TaskLabel, task::Task}; -use anyhow::{anyhow, bail}; -use core::ops::ControlFlow; +use anyhow::anyhow; use derive_more::{Into, IntoIterator}; -use libipld::{Ipld, Link}; +use libipld::{cid::Cid, serde::from_ipld, Ipld, Link}; use std::collections::BTreeMap; use ucan::ipld::UcanIpld; +const RUN_KEY: &str = "run"; +const METADATA_KEY: &str = "meta"; +const PROOF_KEY: &str = "prf"; + #[derive(Clone, PartialEq)] pub struct Invocation { pub run: Batch, @@ -17,10 +20,10 @@ pub struct Invocation { impl From for Ipld { fn from(invocation: Invocation) -> Self { Ipld::Map(BTreeMap::from([ - ("run".to_string(), invocation.run.clone().into()), - ("meta".to_string(), invocation.meta), + (RUN_KEY.into(), invocation.run.clone().into()), + (METADATA_KEY.into(), invocation.meta), ( - "prf".to_string(), + PROOF_KEY.into(), Ipld::List( invocation .prf @@ -37,29 +40,30 @@ impl TryFrom for Invocation { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::Map(assoc) => Ok(Invocation { - meta: assoc.get("meta").map(Clone::clone).unwrap_or(Ipld::Null), - run: assoc - .get("run") - .ok_or(anyhow!("run field is empty")) - .and_then(Batch::try_from) - .unwrap(), - prf: match assoc.get("prf") { - Some(Ipld::List(vec)) => { - vec.iter().try_fold(Vec::new(), |mut acc, ipld| match ipld { - Ipld::Link(cid) => { - acc.push(Link::new(*cid)); - Ok(acc) - } - _ => bail!("Not a link"), - }) - } - other => bail!("Expected a List, but got something else: {:?}", other), - }?, - }), - other => bail!("Expected an IPLD map, but got {:?}", other), - } + let map = from_ipld::>(ipld)?; + let run = Batch::try_from( + map.get(RUN_KEY) + .ok_or_else(|| anyhow!("No run/batch set."))? + .to_owned(), + )?; + + let meta = map + .get(METADATA_KEY) + .ok_or_else(|| anyhow!("No metadata set."))? + .to_owned(); + + let prf = map + .get(PROOF_KEY) + .ok_or_else(|| anyhow!("No proof set."))? + .to_owned() + .iter() + .try_fold(vec![], |mut acc, ipld| { + let cid = from_ipld::(ipld.clone())?; + acc.push(Link::new(cid)); + Ok::<_, anyhow::Error>(acc) + })?; + + Ok(Invocation { meta, prf, run }) } } @@ -68,13 +72,15 @@ pub struct Batch(BTreeMap); impl From for Ipld { fn from(batch: Batch) -> Self { - let mut assoc = BTreeMap::new(); - - batch.0.iter().for_each(|(TaskLabel(label), task)| { - assoc.insert(label.clone(), task.clone().into()); - }); + let new_batch = batch + .0 + .iter() + .fold(BTreeMap::new(), |mut acc, (task_label, task)| { + acc.insert(task_label.label().into(), task.clone().into()); + acc + }); - Ipld::Map(assoc) + Ipld::Map(new_batch) } } @@ -90,26 +96,15 @@ impl TryFrom for Batch { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::Map(assoc) => { - let mut batch = BTreeMap::new(); - - let flow = assoc - .iter() - .try_for_each(|(key, value)| match Task::try_from(value) { - Ok(task) => { - batch.insert(TaskLabel(key.to_string()), task); - ControlFlow::Continue(()) - } - _ => ControlFlow::Break("invalid IPLD Task"), - }); - - match flow { - ControlFlow::Continue(_) => Ok(Batch(batch)), - ControlFlow::Break(reason) => bail!(reason), - } - } - _ => bail!("Can only convert from a map"), - } + let map = from_ipld::>(ipld)?; + let flow = map + .iter() + .try_fold(BTreeMap::new(), |mut acc, (key, value)| { + let task = Task::try_from(value)?; + acc.insert(TaskLabel::new(key.to_string()), task); + Ok::<_, anyhow::Error>(acc) + })?; + + Ok(Batch(flow)) } } diff --git a/src/workflow/pointer.rs b/src/workflow/pointer.rs index be1325af..53531a9f 100644 --- a/src/workflow/pointer.rs +++ b/src/workflow/pointer.rs @@ -1,10 +1,14 @@ //! Pointers to workflow types -use anyhow::{anyhow, bail, ensure}; -use cid::Cid; -use derive_more::Into; -use libipld::Ipld; + +use anyhow::{anyhow, ensure}; +use libipld::{cid::Cid, serde::from_ipld, Ipld}; use std::{collections::btree_map::BTreeMap, result::Result}; +/// Successful Promise result. +pub const OK_BRANCH: &str = "ucan/ok"; +const ERR_BRANCH: &str = "ucan/err"; +const PTR_BRANCH: &str = "ucan/ptr"; + /// A pointer to an unresolved `Invocation` and `Task`, /// optionally including the `Success` or `Failure` branch. #[derive(Clone, Debug, PartialEq, Eq)] @@ -13,15 +17,15 @@ pub struct Promise { pub invoked_task: InvokedTaskPointer, /// An optional narrowing to a particular [Status] branch. - pub branch_selector: Option, + pub result: Option, } impl From for Ipld { fn from(promise: Promise) -> Self { - let key: String = match promise.branch_selector { - Some(Status::Success) => "ucan/ok".to_string(), - Some(Status::Failure) => "ucan/err".to_string(), - None => "ucan/promise".to_string(), + let key: String = match promise.result { + Some(Status::Success) => OK_BRANCH.to_string(), + Some(Status::Failure) => ERR_BRANCH.to_string(), + None => PTR_BRANCH.to_string(), }; Ipld::Map(BTreeMap::from([(key, promise.invoked_task.into())])) @@ -32,27 +36,23 @@ impl TryFrom for Promise { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::Map(assoc) => { - ensure!(assoc.len() == 1, "Unexpected keys in Promise"); - - let (key, value) = assoc.iter().next().unwrap(); - let invoked_task = InvokedTaskPointer::try_from(value.clone())?; - - let branch_selector = match key.as_str() { - "ucan/ok" => Ok(Some(Status::Success)), - "ucan/err" => Ok(Some(Status::Failure)), - "ucan/promise" => Ok(None), - other => Err(anyhow!("Unexpected Promise branch selector: {}", other)), - }?; - - Ok(Promise { - invoked_task, - branch_selector, - }) - } - other => bail!("Promises must be a maps: {:?}", other), - } + let map = from_ipld::>(ipld)?; + ensure!(map.len() == 1, "Unexpected keys in Promise."); + + let (key, value) = map.iter().next().unwrap(); + let invoked_task = InvokedTaskPointer::try_from(value.clone())?; + + let result = match key.as_str() { + OK_BRANCH => Ok(Some(Status::Success)), + ERR_BRANCH => Ok(Some(Status::Failure)), + PTR_BRANCH => Ok(None), + other => Err(anyhow!("Unexpected Promise branch selector: {other}.")), + }?; + + Ok(Promise { + invoked_task, + result, + }) } } @@ -82,23 +82,29 @@ impl TryFrom for InvocationPointer { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::String(s) => match s.as_str() { - "/" => Ok(InvocationPointer::Local), - other => match Cid::try_from(other) { - Ok(cid) => Ok(InvocationPointer::Remote(cid)), - Err(_) => bail!("Not an InvocationPointer: {:?}", other), - }, - }, - _ => bail!("InvocationPointer must be a string"), + let s = from_ipld::(ipld)?; + + match s.as_str() { + "/" => Ok(InvocationPointer::Local), + other => Ok(InvocationPointer::Remote(Cid::try_from(other)?)), } } } #[derive(Clone, Debug, PartialEq, Eq)] pub struct InvokedTaskPointer { - pub invocation: InvocationPointer, - pub label: TaskLabel, + invocation: InvocationPointer, + label: TaskLabel, +} + +impl InvokedTaskPointer { + pub fn invocation(&self) -> &InvocationPointer { + &self.invocation + } + + pub fn label(&self) -> &TaskLabel { + &self.label + } } impl From for Ipld { @@ -111,34 +117,46 @@ impl TryFrom for InvokedTaskPointer { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::List(list) => match &list[..] { - [Ipld::String(s), Ipld::String(label)] => match s.as_str() { - "/" => Ok(InvokedTaskPointer { + let list: Vec = from_ipld(ipld)?; + + match &list[..] { + [Ipld::String(s), Ipld::String(label)] => { + if s.as_str() == "/" { + Ok(InvokedTaskPointer { invocation: InvocationPointer::Local, label: TaskLabel(label.to_string()), - }), - _ => bail!("Unexpected format for local InvokedTaskPointer"), - }, - - [Ipld::Link(ptr), Ipld::String(label)] => Ok(InvokedTaskPointer { - invocation: InvocationPointer::Remote(*ptr), - label: TaskLabel(label.to_string()), - }), - - _ => bail!("Unexpected number of segments in IPLD tuple"), - }, - _ => bail!("InvokedTaskPointer must be a List"), + }) + } else { + Err(anyhow!("Unexpected format for local InvokedTaskPointer")) + } + } + [Ipld::Link(ptr), Ipld::String(label)] => Ok(InvokedTaskPointer { + invocation: InvocationPointer::Remote(*ptr), + label: TaskLabel(label.to_string()), + }), + + _ => Err(anyhow!("Unexpected number of segments in IPLD tuple")), } } } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Into)] -pub struct TaskLabel(pub String); +/// A Task label. +#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] +pub struct TaskLabel(String); + +impl TaskLabel { + pub fn new(label: String) -> Self { + TaskLabel(label) + } + + pub fn label(&self) -> &str { + &self.0 + } +} impl From for Ipld { - fn from(label: TaskLabel) -> Self { - Ipld::String(label.0.to_string()) + fn from(label: TaskLabel) -> Ipld { + Ipld::String(label.0) } } @@ -146,9 +164,7 @@ impl TryFrom for TaskLabel { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - match ipld { - Ipld::String(label) => Ok(TaskLabel(label)), - _ => bail!("TaskLabel must be a string"), - } + let label = from_ipld(ipld)?; + Ok(TaskLabel(label)) } } diff --git a/src/workflow/task.rs b/src/workflow/task.rs index a6092ba8..80c95fce 100644 --- a/src/workflow/task.rs +++ b/src/workflow/task.rs @@ -1,7 +1,7 @@ //! A [Closure] wrapped with Configuration, metadata, and optional secret. use crate::workflow::{closure::Closure, config::Resources}; -use libipld::{serde as ipld_serde, Ipld}; +use libipld::{serde::from_ipld, Ipld}; use std::collections::BTreeMap; const RESOURCES_KEY: &str = "resources"; @@ -78,7 +78,7 @@ impl TryFrom for Task { type Error = anyhow::Error; fn try_from(ipld: Ipld) -> Result { - let map = ipld_serde::from_ipld::>(ipld.clone())?; + let map = from_ipld::>(ipld.clone())?; let resources = map .get(RESOURCES_KEY) @@ -90,7 +90,7 @@ impl TryFrom for Task { metadata: map.get(META_KEY).unwrap_or(&Ipld::Null).to_owned(), secret: map .get(SECRETS_KEY) - .map(|ipld| ipld_serde::from_ipld(ipld.to_owned()).unwrap_or(false)), + .map(|ipld| from_ipld(ipld.to_owned()).unwrap_or(false)), }) } }