diff --git a/beetle/iroh-rpc-client/src/client.rs b/beetle/iroh-rpc-client/src/client.rs
index 13b88fb7c..a95b41e17 100644
--- a/beetle/iroh-rpc-client/src/client.rs
+++ b/beetle/iroh-rpc-client/src/client.rs
@@ -2,7 +2,6 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
 use crate::config::Config;
-use crate::gateway::GatewayClient;
 use crate::network::P2pClient;
 use crate::status::{ClientStatus, ServiceStatus, ServiceType};
 use crate::store::StoreClient;
@@ -11,7 +10,6 @@ use futures::{Stream, StreamExt};
 
 #[derive(Debug, Clone)]
 pub struct Client {
-    pub gateway: Option<GatewayClient>,
     p2p: P2pLBClient,
     store: StoreLBClient,
 }
@@ -85,22 +83,11 @@ impl P2pLBClient {
 impl Client {
     pub async fn new(cfg: Config) -> Result<Self> {
         let Config {
-            gateway_addr,
             p2p_addr,
             store_addr,
             channels,
         } = cfg;
 
-        let gateway = if let Some(addr) = gateway_addr {
-            Some(
-                GatewayClient::new(addr)
-                    .await
-                    .context("Could not create gateway rpc client")?,
-            )
-        } else {
-            None
-        };
-
         let n_channels = channels.unwrap_or(1);
 
         let mut p2p = P2pLBClient::new();
@@ -123,34 +110,18 @@ impl Client {
             }
         }
 
-        Ok(Client {
-            gateway,
-            p2p,
-            store,
-        })
+        Ok(Client { p2p, store })
     }
 
     pub fn try_p2p(&self) -> Result<P2pClient> {
         self.p2p.get().context("missing rpc p2p connnection")
     }
 
-    pub fn try_gateway(&self) -> Result<&GatewayClient> {
-        self.gateway
-            .as_ref()
-            .context("missing rpc gateway connnection")
-    }
-
     pub fn try_store(&self) -> Result<StoreClient> {
         self.store.get().context("missing rpc store connection")
     }
 
     pub async fn check(&self) -> crate::status::ClientStatus {
-        let g = if let Some(ref g) = self.gateway {
-            let (s, v) = g.check().await;
-            Some(ServiceStatus::new(ServiceType::Gateway, s, v))
-        } else {
-            None
-        };
         let p = if let Some(ref p) = self.p2p.get() {
             let (s, v) = p.check().await;
             Some(ServiceStatus::new(ServiceType::P2p, s, v))
@@ -163,7 +134,7 @@ impl Client {
         } else {
             None
         };
-        ClientStatus::new(g, p, s)
+        ClientStatus::new(p, s)
     }
 
     pub async fn watch(self) -> impl Stream<Item = ClientStatus> {
@@ -171,11 +142,6 @@ impl Client {
         let mut status: ClientStatus = Default::default();
         let mut streams = Vec::new();
 
-        if let Some(ref g) = self.gateway {
-            let g = g.watch().await;
-            let g = g.map(|(status, version)| ServiceStatus::new(ServiceType::Gateway, status, version));
-            streams.push(g.boxed());
-        }
         if let Some(ref p) = self.p2p.get() {
             let p = p.watch().await;
             let p = p.map(|(status, version)| ServiceStatus::new(ServiceType::P2p, status, version));
diff --git a/beetle/iroh-rpc-client/src/config.rs b/beetle/iroh-rpc-client/src/config.rs
index eb828707e..efba420d4 100644
--- a/beetle/iroh-rpc-client/src/config.rs
+++ b/beetle/iroh-rpc-client/src/config.rs
@@ -1,11 +1,9 @@
-use iroh_rpc_types::{gateway::GatewayAddr, p2p::P2pAddr, store::StoreAddr};
+use iroh_rpc_types::{p2p::P2pAddr, store::StoreAddr};
 use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
 /// Config for the rpc Client.
 pub struct Config {
-    /// Gateway rpc address.
-    pub gateway_addr: Option<GatewayAddr>,
     /// P2p rpc address.
     pub p2p_addr: Option<P2pAddr>,
     /// Store rpc address.
@@ -20,7 +18,6 @@ pub struct Config {
 impl Config {
     pub fn default_network() -> Self {
         Self {
-            gateway_addr: Some("irpc://127.0.0.1:4400".parse().unwrap()),
             p2p_addr: Some("irpc://127.0.0.1:4401".parse().unwrap()),
             store_addr: Some("irpc://127.0.0.1:4402".parse().unwrap()),
             // disable load balancing by default by just having 1 channel
diff --git a/beetle/iroh-rpc-client/src/gateway.rs b/beetle/iroh-rpc-client/src/gateway.rs
deleted file mode 100644
index a6008ef5e..000000000
--- a/beetle/iroh-rpc-client/src/gateway.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use std::fmt;
-
-use anyhow::Result;
-use async_stream::stream;
-use futures::{Stream, StreamExt};
-use iroh_rpc_types::{gateway::*, VersionRequest, WatchRequest};
-
-use crate::{StatusType, HEALTH_POLL_WAIT};
-
-#[derive(Clone)]
-pub struct GatewayClient {
-    client: quic_rpc::RpcClient,
-}
-
-impl fmt::Debug for GatewayClient {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("GatewayClient2")
-            .field("client", &self.client)
-            .finish()
-    }
-}
-
-impl GatewayClient {
-    pub async fn new(addr: GatewayAddr) -> anyhow::Result<Self> {
-        let client = crate::open_client::<GatewayService>(addr).await?;
-        Ok(Self { client })
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn version(&self) -> Result<String> {
-        let res = self.client.rpc(VersionRequest).await?;
-        Ok(res.version)
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn check(&self) -> (StatusType, String) {
-        match self.version().await {
-            Ok(version) => (StatusType::Serving, version),
-            Err(_) => (StatusType::Down, String::new()),
-        }
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn watch(&self) -> impl Stream<Item = (StatusType, String)> {
-        let client = self.client.clone();
-        stream! {
-            loop {
-                let res = client.server_streaming(WatchRequest).await;
-                if let Ok(mut res) = res {
-                    while let Some(Ok(version)) = res.next().await {
-                        yield (StatusType::Serving, version.version);
-                    }
-                }
-                yield (StatusType::Down, String::new());
-                tokio::time::sleep(HEALTH_POLL_WAIT).await;
-            }
-        }
-    }
-}
diff --git a/beetle/iroh-rpc-client/src/lib.rs b/beetle/iroh-rpc-client/src/lib.rs
index ec99005c5..1d4971a7b 100644
--- a/beetle/iroh-rpc-client/src/lib.rs
+++ b/beetle/iroh-rpc-client/src/lib.rs
@@ -1,13 +1,11 @@
 pub mod client;
 pub mod config;
-pub mod gateway;
 pub mod network;
 pub mod status;
 pub mod store;
 pub use self::config::Config;
 pub use client::Client;
-pub use iroh_rpc_types::GossipsubEvent;
-use iroh_rpc_types::{gateway::GatewayService, p2p::P2pService, store::StoreService, Addr};
+use iroh_rpc_types::{p2p::P2pService, store::StoreService, Addr};
 pub use network::{Lookup, P2pClient};
 use quic_rpc::{
     transport::{combined, http2, CombinedChannelTypes, Http2ChannelTypes, MemChannelTypes},
@@ -36,7 +34,6 @@ pub type ServerSocket = (C::SendSink, C::RecvStream);
 
 pub type StoreServer = RpcServer;
-pub type GatewayServer = RpcServer;
 
 pub type P2pServer = RpcServer;
 
 pub async fn create_server(
diff --git a/beetle/iroh-rpc-client/src/network.rs b/beetle/iroh-rpc-client/src/network.rs
index 20908b9f3..3549dbc56 100644
--- a/beetle/iroh-rpc-client/src/network.rs
+++ b/beetle/iroh-rpc-client/src/network.rs
@@ -3,8 +3,7 @@ use async_stream::stream;
 use bytes::Bytes;
 use cid::Cid;
 use futures::{Stream, StreamExt};
-use iroh_rpc_types::{p2p::*, GossipsubEvent, VersionRequest, WatchRequest};
-use libp2p::gossipsub::{MessageId, TopicHash};
+use iroh_rpc_types::{p2p::*, VersionRequest, WatchRequest};
 use libp2p::{Multiaddr, PeerId};
 use std::collections::{HashMap, HashSet};
 use tracing::{debug, warn};
@@ -185,97 +184,6 @@ impl P2pClient {
         Ok(())
     }
 
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_add_explicit_peer(&self, peer_id: PeerId) -> Result<()> {
-        self.client
-            .rpc(GossipsubAddExplicitPeerRequest { peer_id })
-            .await??;
-        Ok(())
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_all_mesh_peers(&self) -> Result<Vec<PeerId>> {
-        let res = self.client.rpc(GossipsubAllMeshPeersRequest).await??;
-        Ok(res.peers)
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_all_peers(&self) -> Result<Vec<(PeerId, Vec<TopicHash>)>> {
-        let res = self.client.rpc(GossipsubAllPeersRequest).await??;
-        let res = res
-            .all
-            .into_iter()
-            .map(|(peer_id, topics)| {
-                let topics = topics.into_iter().map(TopicHash::from_raw).collect();
-                Ok((peer_id, topics))
-            })
-            .collect::<Result<Vec<_>>>()?;
-        Ok(res)
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_mesh_peers(&self, topic: TopicHash) -> Result<Vec<PeerId>> {
-        let res = self
-            .client
-            .rpc(GossipsubMeshPeersRequest {
-                topic_hash: topic.to_string(),
-            })
-            .await??;
-        Ok(res.peers)
-    }
-
-    #[tracing::instrument(skip(self, data))]
-    pub async fn gossipsub_publish(&self, topic_hash: TopicHash, data: Bytes) -> Result<MessageId> {
-        let req = GossipsubPublishRequest {
-            topic_hash: topic_hash.to_string(),
-            data,
-        };
-        let res = self.client.rpc(req).await??;
-        let message_id = MessageId::new(&res.message_id);
-        Ok(message_id)
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_remove_explicit_peer(&self, peer_id: PeerId) -> Result<()> {
-        let req = GossipsubRemoveExplicitPeerRequest { peer_id };
-        self.client.rpc(req).await??;
-        Ok(())
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_subscribe(
-        &self,
-        topic: TopicHash,
-    ) -> Result<impl Stream<Item = Result<GossipsubEvent>>> {
-        let res = self
-            .client
-            .server_streaming(GossipsubSubscribeRequest {
-                topic_hash: topic.to_string(),
-            })
-            .await?;
-        let events = res.map(|e| {
-            let e = e?.event;
-            Ok(e)
-        });
-        Ok(events)
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_topics(&self) -> Result<Vec<TopicHash>> {
-        let res = self.client.rpc(GossipsubTopicsRequest).await??;
-        let topics = res.topics.into_iter().map(TopicHash::from_raw).collect();
-        Ok(topics)
-    }
-
-    #[tracing::instrument(skip(self))]
-    pub async fn gossipsub_unsubscribe(&self, topic: TopicHash) -> Result<bool> {
-        let req = GossipsubUnsubscribeRequest {
-            topic_hash: topic.to_string(),
-        };
-        let res = self.client.rpc(req).await??;
-        Ok(res.was_subscribed)
-    }
-
     #[tracing::instrument(skip(self))]
     pub async fn check(&self) -> (StatusType, String) {
         match self.version().await {
diff --git a/beetle/iroh-rpc-client/src/status.rs b/beetle/iroh-rpc-client/src/status.rs
index 7b1c7e84f..2806d120e 100644
--- a/beetle/iroh-rpc-client/src/status.rs
+++ b/beetle/iroh-rpc-client/src/status.rs
@@ -25,7 +25,6 @@ pub struct ServiceStatus {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum ServiceType {
-    Gateway,
     P2p,
     Store,
 }
@@ -33,7 +32,6 @@ pub enum ServiceType {
 impl ServiceType {
     pub fn name(&self) -> &'static str {
         match self {
-            ServiceType::Gateway => "gateway",
             ServiceType::P2p => "p2p",
             ServiceType::Store => "store",
         }
@@ -68,21 +66,13 @@ impl ServiceStatus {
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ClientStatus {
-    pub gateway: ServiceStatus,
     pub p2p: ServiceStatus,
     pub store: ServiceStatus,
 }
 
 impl ClientStatus {
-    pub fn new(
-        gateway: Option<ServiceStatus>,
-        p2p: Option<ServiceStatus>,
-        store: Option<ServiceStatus>,
-    ) -> Self {
+    pub fn new(p2p: Option<ServiceStatus>, store: Option<ServiceStatus>) -> Self {
         Self {
-            gateway: gateway.unwrap_or_else(|| {
-                ServiceStatus::new(ServiceType::Gateway, StatusType::Unknown, "")
-            }),
p2p: p2p .unwrap_or_else(|| ServiceStatus::new(ServiceType::P2p, StatusType::Unknown, "")), store: store @@ -99,7 +89,6 @@ impl ClientStatus { pub fn update(&mut self, s: ServiceStatus) { match s.typ { - ServiceType::Gateway => self.gateway = s, ServiceType::P2p => self.p2p = s, ServiceType::Store => self.store = s, } @@ -119,7 +108,6 @@ impl Iterator for ClientStatusIterator<'_> { let current = match self.iter { 0 => Some(self.table.store.to_owned()), 1 => Some(self.table.p2p.to_owned()), - 2 => Some(self.table.gateway.to_owned()), _ => None, }; @@ -130,7 +118,7 @@ impl Iterator for ClientStatusIterator<'_> { impl Default for ClientStatus { fn default() -> Self { - Self::new(None, None, None) + Self::new(None, None) } } @@ -141,24 +129,19 @@ mod tests { #[test] fn service_status_new() { let expect = ServiceStatus { - typ: ServiceType::Gateway, + typ: ServiceType::P2p, status: StatusType::Serving, version: "0.1.0".to_string(), }; assert_eq!( expect, - ServiceStatus::new(ServiceType::Gateway, StatusType::Serving, "0.1.0") + ServiceStatus::new(ServiceType::P2p, StatusType::Serving, "0.1.0") ); } #[test] fn client_status_default() { let expect = ClientStatus { - gateway: ServiceStatus { - typ: ServiceType::Gateway, - status: StatusType::Unknown, - version: "".to_string(), - }, p2p: ServiceStatus { typ: ServiceType::P2p, status: StatusType::Unknown, @@ -177,11 +160,6 @@ mod tests { #[test] fn status_table_new() { let expect = ClientStatus { - gateway: ServiceStatus { - typ: ServiceType::Gateway, - status: StatusType::Unknown, - version: "test".to_string(), - }, p2p: ServiceStatus { typ: ServiceType::P2p, status: StatusType::Unknown, @@ -196,11 +174,6 @@ mod tests { assert_eq!( expect, ClientStatus::new( - Some(ServiceStatus::new( - ServiceType::Gateway, - StatusType::Unknown, - "test" - )), Some(ServiceStatus::new( ServiceType::P2p, StatusType::Unknown, @@ -217,11 +190,6 @@ mod tests { #[test] fn status_table_update() { - let gateway = Some(ServiceStatus::new( - ServiceType::Gateway, - StatusType::Unknown, - "0.1.0", - )); let mut p2p = Some(ServiceStatus::new( ServiceType::P2p, StatusType::Unknown, @@ -232,15 +200,15 @@ mod tests { StatusType::Unknown, "0.1.0", )); - let mut got = ClientStatus::new(gateway.clone(), p2p.clone(), store.clone()); + let mut got = ClientStatus::new(p2p.clone(), store.clone()); store.as_mut().unwrap().status = StatusType::Serving; - let expect = ClientStatus::new(gateway.clone(), p2p.clone(), store.clone()); + let expect = ClientStatus::new(p2p.clone(), store.clone()); got.update(store.clone().unwrap()); assert_eq!(expect, got); p2p.as_mut().unwrap().status = StatusType::Down; - let expect = ClientStatus::new(gateway, p2p.clone(), store); + let expect = ClientStatus::new(p2p.clone(), store); got.update(p2p.unwrap()); assert_eq!(expect, got); } @@ -261,11 +229,6 @@ mod tests { status: StatusType::Unknown, version: "".to_string(), }, - ServiceStatus { - typ: ServiceType::Gateway, - status: StatusType::Unknown, - version: "".to_string(), - }, ], rows ); diff --git a/beetle/iroh-rpc-types/src/addr.rs b/beetle/iroh-rpc-types/src/addr.rs index bb1c1dfc9..9bf0b618a 100644 --- a/beetle/iroh-rpc-types/src/addr.rs +++ b/beetle/iroh-rpc-types/src/addr.rs @@ -104,14 +104,14 @@ mod tests { #[test] fn test_addr_roundtrip_irpc_http2() { - use crate::gateway::GatewayAddr; + use crate::p2p::P2pAddr; use crate::Addr; use std::net::SocketAddr; let socket: SocketAddr = "198.168.2.1:1234".parse().unwrap(); let addr = Addr::Irpc(socket); - assert_eq!(addr.to_string().parse::().unwrap(), 
addr); + assert_eq!(addr.to_string().parse::().unwrap(), addr); assert_eq!(addr.to_string(), "irpc://198.168.2.1:1234"); } } diff --git a/beetle/iroh-rpc-types/src/gateway.rs b/beetle/iroh-rpc-types/src/gateway.rs deleted file mode 100644 index 1063ca027..000000000 --- a/beetle/iroh-rpc-types/src/gateway.rs +++ /dev/null @@ -1,44 +0,0 @@ -use derive_more::{From, TryInto}; -use quic_rpc::{ - message::{Msg, RpcMsg, ServerStreaming}, - Service, -}; -use serde::{Deserialize, Serialize}; - -use crate::{RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse}; - -/// Gateway address -pub type GatewayAddr = crate::addr::Addr; - -#[derive(Serialize, Deserialize, Debug, From, TryInto)] -pub enum GatewayRequest { - Watch(WatchRequest), - Version(VersionRequest), -} - -#[derive(Serialize, Deserialize, Debug, From, TryInto)] -pub enum GatewayResponse { - Watch(WatchResponse), - Version(VersionResponse), - UnitResult(RpcResult<()>), -} - -#[derive(Debug, Clone, Copy)] -pub struct GatewayService; - -impl Service for GatewayService { - type Req = GatewayRequest; - type Res = GatewayResponse; -} - -impl RpcMsg for VersionRequest { - type Response = VersionResponse; -} - -impl Msg for WatchRequest { - type Response = WatchResponse; - - type Update = Self; - - type Pattern = ServerStreaming; -} diff --git a/beetle/iroh-rpc-types/src/gossipsub_event.rs b/beetle/iroh-rpc-types/src/gossipsub_event.rs deleted file mode 100644 index a44e32cce..000000000 --- a/beetle/iroh-rpc-types/src/gossipsub_event.rs +++ /dev/null @@ -1,53 +0,0 @@ -use futures::stream::BoxStream; -use libp2p::{ - gossipsub::{Message, MessageId, TopicHash}, - PeerId, -}; -use serde::{Deserialize, Serialize}; - -use crate::p2p::GossipsubSubscribeResponse; - -pub type GossipsubEventStream = BoxStream<'static, Box>; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum GossipsubEvent { - Subscribed { - peer_id: PeerId, - #[serde(with = "TopicHashDef")] - topic: TopicHash, - }, - Unsubscribed { - peer_id: PeerId, - #[serde(with = "TopicHashDef")] - topic: TopicHash, - }, - Message { - from: PeerId, - id: MessageId, - #[serde(with = "GossipsubMessageDef")] - message: Message, - }, -} - -#[derive(Serialize, Deserialize)] -#[serde(remote = "TopicHash")] -struct TopicHashDef { - #[serde(getter = "TopicHash::to_string")] - hash: String, -} - -impl From for TopicHash { - fn from(t: TopicHashDef) -> Self { - TopicHash::from_raw(t.hash) - } -} - -#[derive(Serialize, Deserialize)] -#[serde(remote = "Message")] -struct GossipsubMessageDef { - source: Option, - data: Vec, - sequence_number: Option, - #[serde(with = "TopicHashDef")] - topic: TopicHash, -} diff --git a/beetle/iroh-rpc-types/src/lib.rs b/beetle/iroh-rpc-types/src/lib.rs index bc0a0f49a..2f9a3d6e2 100644 --- a/beetle/iroh-rpc-types/src/lib.rs +++ b/beetle/iroh-rpc-types/src/lib.rs @@ -1,13 +1,10 @@ pub mod addr; -pub mod gateway; -mod gossipsub_event; pub mod p2p; pub mod store; use std::fmt; pub use addr::Addr; -pub use gossipsub_event::{GossipsubEvent, GossipsubEventStream}; use serde::{Deserialize, Serialize}; diff --git a/beetle/iroh-rpc-types/src/p2p.rs b/beetle/iroh-rpc-types/src/p2p.rs index e076cc6c1..6ac6ee2a9 100644 --- a/beetle/iroh-rpc-types/src/p2p.rs +++ b/beetle/iroh-rpc-types/src/p2p.rs @@ -9,7 +9,6 @@ use quic_rpc::{ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -use crate::GossipsubEvent; use crate::{RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse}; pub type P2pAddr = super::addr::Addr; @@ -145,76 +144,6 @@ 
pub struct LookupResponse { pub observed_addrs: Vec, } -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubAddExplicitPeerRequest { - pub peer_id: PeerId, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubAllMeshPeersRequest; - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubPeersResponse { - pub peers: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubAllPeersRequest; - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubAllPeersResponse { - pub all: Vec<(PeerId, Vec)>, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubMeshPeersRequest { - pub topic_hash: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubPublishRequest { - pub topic_hash: String, - pub data: Bytes, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubPublishResponse { - pub message_id: Bytes, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubRemoveExplicitPeerRequest { - pub peer_id: PeerId, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubSubscribeRequest { - pub topic_hash: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubSubscribeResponse { - pub event: GossipsubEvent, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubTopicsRequest; - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubTopicsResponse { - pub topics: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubUnsubscribeRequest { - pub topic_hash: String, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipsubUnsubscribeResponse { - pub was_subscribed: bool, -} - #[derive(Serialize, Deserialize, Debug, From, TryInto)] pub enum P2pRequest { Watch(WatchRequest), @@ -231,15 +160,6 @@ pub enum P2pRequest { PeerDisconnect(DisconnectRequest), Lookup(LookupRequest), LookupLocal(LookupLocalRequest), - GossipsubAddExplicitPeer(GossipsubAddExplicitPeerRequest), - GossipsubAllMeshPeers(GossipsubAllMeshPeersRequest), - GossipsubAllPeers(GossipsubAllPeersRequest), - GossipsubMeshPeers(GossipsubMeshPeersRequest), - GossipsubPublish(GossipsubPublishRequest), - GossipsubRemoveExplicitPeer(GossipsubRemoveExplicitPeerRequest), - GossipsubSubscribe(GossipsubSubscribeRequest), - GossipsubTopics(GossipsubTopicsRequest), - GossipsubUnsubscribe(GossipsubUnsubscribeRequest), StartProviding(StartProvidingRequest), StopProviding(StopProvidingRequest), LocalPeerId(LocalPeerIdRequest), @@ -256,12 +176,6 @@ pub enum P2pResponse { GetListeningAddrs(RpcResult), GetPeers(RpcResult), Lookup(RpcResult), - GossipsubPeers(RpcResult), - GossipsubAllPeers(RpcResult), - GossipsubPublish(RpcResult), - GossipsubSubscribe(Box), - GossipsubTopics(RpcResult), - GossipsubUnsubscribe(RpcResult), LocalPeerId(RpcResult), ExternalAddrs(RpcResult), Listeners(RpcResult), @@ -340,46 +254,6 @@ impl RpcMsg for LookupLocalRequest { type Response = RpcResult; } -impl RpcMsg for GossipsubAddExplicitPeerRequest { - type Response = RpcResult<()>; -} - -impl RpcMsg for GossipsubAllMeshPeersRequest { - type Response = RpcResult; -} - -impl RpcMsg for GossipsubMeshPeersRequest { - type Response = RpcResult; -} - -impl RpcMsg for GossipsubAllPeersRequest { - type Response = RpcResult; -} - -impl RpcMsg for GossipsubPublishRequest { - type Response = RpcResult; -} - -impl RpcMsg for GossipsubTopicsRequest { - type Response = RpcResult; -} - -impl Msg for GossipsubSubscribeRequest { - type Response = Box; - - type Update = Self; - - type Pattern = ServerStreaming; 
-} - -impl RpcMsg for GossipsubUnsubscribeRequest { - type Response = RpcResult; -} - -impl RpcMsg for GossipsubRemoveExplicitPeerRequest { - type Response = RpcResult<()>; -} - impl RpcMsg for StartProvidingRequest { type Response = RpcResult<()>; } diff --git a/ci-scripts/gen_kubo_rpc_server.sh b/ci-scripts/gen_kubo_rpc_server.sh index 71e71825b..3c54cbf79 100755 --- a/ci-scripts/gen_kubo_rpc_server.sh +++ b/ci-scripts/gen_kubo_rpc_server.sh @@ -20,19 +20,9 @@ npx @openapitools/openapi-generator-cli \ echo "#![allow(suspicious_double_ref_op)]" | cat - ./kubo-rpc-server/examples/server/server.rs > ./kubo-rpc-server/examples/server/server.rs.tmp mv ./kubo-rpc-server/examples/server/server.rs.tmp ./kubo-rpc-server/examples/server/server.rs +# Remove conversion feature from generated code because it doesn't build and we do not use it. +sed -i 's/conversion = .*//' ./kubo-rpc-server/Cargo.toml + # Format the generated code cargo fmt -p ceramic-kubo-rpc-server -# HACK: We patch the generated code to fix two issues: -# -# 1. Allow for a streaming response to pubsub subscribe endpoint -# 2. The `type` field name breaks the `conversion` feature of the generated code. It is renamed to `typ`. -# -# Steps to generate this patch file: -# 1. Get the code into the desired state and commit changes. -# 2. Comment out the following `patch` line and run the script. -# 3. Run `git diff -R kubo-rpc-server > ./kubo-rpc-server.patch`. -# 4. Manually edit the patch file to remove the patch of the date in the README file. -# 5. Run `git checkout kubo-rpc-server`. -# 6. Revert and run this script to validate it works. It should run without errors or warnings. -patch -p1 <./kubo-rpc-server.patch diff --git a/kubo-rpc-server.patch b/kubo-rpc-server.patch deleted file mode 100644 index c6a788f2b..000000000 --- a/kubo-rpc-server.patch +++ /dev/null @@ -1,125 +0,0 @@ -diff --git b/kubo-rpc-server/src/client/mod.rs a/kubo-rpc-server/src/client/mod.rs -index b248a1d..fc1f694 100644 ---- b/kubo-rpc-server/src/client/mod.rs -+++ a/kubo-rpc-server/src/client/mod.rs -@@ -1,7 +1,7 @@ - use async_trait::async_trait; - use futures::{ - future, future::BoxFuture, future::FutureExt, future::TryFutureExt, stream, stream::StreamExt, -- Stream, -+ Stream, TryStreamExt, - }; - use hyper::header::{HeaderName, HeaderValue, CONTENT_TYPE}; - use hyper::{service::Service, Body, Request, Response, Uri}; -@@ -1867,12 +1867,9 @@ where - match response.status().as_u16() { - 200 => { - let body = response.into_body(); -- let body = body -- .into_raw() -- .map_err(|e| ApiError(format!("Failed to read response: {}", e))) -- .await?; -- let body = swagger::ByteArray(body.to_vec()); -- Ok(PubsubSubPostResponse::Success(body)) -+ Ok(PubsubSubPostResponse::Success(Box::pin(body.map_err( -+ Box::::from, -+ )))) - } - 400 => { - let body = response.into_body(); -diff --git b/kubo-rpc-server/src/lib.rs a/kubo-rpc-server/src/lib.rs -index e6c7274..65fafc5 100644 ---- b/kubo-rpc-server/src/lib.rs -+++ a/kubo-rpc-server/src/lib.rs -@@ -11,7 +11,8 @@ - #![allow(clippy::derive_partial_eq_without_eq, clippy::disallowed_names)] - - use async_trait::async_trait; --use futures::Stream; -+use futures::{stream::BoxStream, Stream}; -+use hyper::body::Bytes; - use serde::{Deserialize, Serialize}; - use std::error::Error; - use std::task::{Context, Poll}; -@@ -132,15 +133,24 @@ pub enum PubsubPubPostResponse { - BadRequest(models::Error), - } - --#[derive(Debug, PartialEq, Serialize, Deserialize)] - #[must_use] - pub enum PubsubSubPostResponse { - /// success -- 
Success(swagger::ByteArray), -+ Success(BoxStream<'static, Result>>), -+ - /// bad request - BadRequest(models::Error), - } - -+impl std::fmt::Debug for PubsubSubPostResponse { -+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -+ match self { -+ Self::Success(arg0) => f.debug_tuple("Success").finish(), -+ Self::BadRequest(arg0) => f.debug_tuple("BadRequest").field(arg0).finish(), -+ } -+ } -+} -+ - #[derive(Debug, PartialEq, Serialize, Deserialize)] - #[must_use] - pub enum SwarmConnectPostResponse { -diff --git b/kubo-rpc-server/src/models.rs a/kubo-rpc-server/src/models.rs -index 4644ce0..ce429ca 100644 ---- b/kubo-rpc-server/src/models.rs -+++ a/kubo-rpc-server/src/models.rs -@@ -909,7 +909,7 @@ pub struct Error { - pub code: f64, - - #[serde(rename = "Type")] -- pub r#type: String, -+ pub typ: String, - } - - impl Error { -@@ -918,7 +918,7 @@ impl Error { - Error { - message, - code, -- r#type, -+ typ: r#type, - } - } - } -@@ -934,7 +934,7 @@ impl std::string::ToString for Error { - Some("Code".to_string()), - Some(self.code.to_string()), - Some("Type".to_string()), -- Some(self.r#type.to_string()), -+ Some(self.typ.to_string()), - ]; - - params.into_iter().flatten().collect::>().join(",") -@@ -1012,7 +1012,7 @@ impl std::str::FromStr for Error { - .into_iter() - .next() - .ok_or_else(|| "Code missing in Error".to_string())?, -- r#type: intermediate_rep -+ typ: intermediate_rep - .r#type - .into_iter() - .next() -diff --git b/kubo-rpc-server/src/server/mod.rs a/kubo-rpc-server/src/server/mod.rs -index de4300c..f4f7ac3 100644 ---- b/kubo-rpc-server/src/server/mod.rs -+++ a/kubo-rpc-server/src/server/mod.rs -@@ -1507,8 +1507,8 @@ where - CONTENT_TYPE, - HeaderValue::from_str("application/octet-stream") - .expect("Unable to create Content-Type header for PUBSUB_SUB_POST_SUCCESS")); -- let body = body.0; -- *response.body_mut() = Body::from(body); -+ -+ *response.body_mut() = Body::wrap_stream(body); - } - PubsubSubPostResponse::BadRequest(body) => { - *response.status_mut() = StatusCode::from_u16(400) diff --git a/kubo-rpc-server/.openapi-generator/FILES b/kubo-rpc-server/.openapi-generator/FILES index 5cd5c8887..6df69a957 100644 --- a/kubo-rpc-server/.openapi-generator/FILES +++ b/kubo-rpc-server/.openapi-generator/FILES @@ -14,7 +14,7 @@ docs/Error.md docs/IdPost200Response.md docs/Multihash.md docs/PinAddPost200Response.md -docs/PubsubLsPost200Response.md +docs/SwarmConnectPost200Response.md docs/SwarmPeersPost200Response.md docs/SwarmPeersPost200ResponsePeersInner.md docs/VersionPost200Response.md diff --git a/kubo-rpc-server/Cargo.toml b/kubo-rpc-server/Cargo.toml index 9ee2218f8..c50bffdfe 100644 --- a/kubo-rpc-server/Cargo.toml +++ b/kubo-rpc-server/Cargo.toml @@ -18,7 +18,7 @@ server = [ "multipart", "multipart/server", "serde_ignored", "hyper", "regex", "percent-encoding", "url", "lazy_static" ] -conversion = ["frunk", "frunk_derives", "frunk_core", "frunk-enum-core", "frunk-enum-derive"] + [target.'cfg(any(target_os = "macos", target_os = "windows", target_os = "ios"))'.dependencies] native-tls = { version = "0.2", optional = true } diff --git a/kubo-rpc-server/README.md b/kubo-rpc-server/README.md index 38a1bf67a..d71716d29 100644 --- a/kubo-rpc-server/README.md +++ b/kubo-rpc-server/README.md @@ -15,7 +15,7 @@ To see how to make this your own, look here: [README]((https://openapi-generator.tech)) - API version: 0.9.0 -- Build date: 2023-11-13T20:39:18.296854378Z[Etc/UTC] +- Build date: 2023-12-22T09:16:10.851194724-07:00[America/Denver] @@ -72,9 +72,6 @@ 
cargo run --example client DagResolvePost cargo run --example client IdPost cargo run --example client PinAddPost cargo run --example client PinRmPost -cargo run --example client PubsubLsPost -cargo run --example client PubsubPubPost -cargo run --example client PubsubSubPost cargo run --example client SwarmConnectPost cargo run --example client SwarmPeersPost cargo run --example client VersionPost @@ -121,9 +118,6 @@ Method | HTTP request | Description [****](docs/default_api.md#) | **POST** /id | Report identifying information about a node [****](docs/default_api.md#) | **POST** /pin/add | Add a block to the pin store [****](docs/default_api.md#) | **POST** /pin/rm | Remove a block from the pin store -[****](docs/default_api.md#) | **POST** /pubsub/ls | List topic with active subscriptions -[****](docs/default_api.md#) | **POST** /pubsub/pub | Publish a message to a topic -[****](docs/default_api.md#) | **POST** /pubsub/sub | Subscribe to a topic, blocks until a message is received [****](docs/default_api.md#) | **POST** /swarm/connect | Connect to peers [****](docs/default_api.md#) | **POST** /swarm/peers | Report connected peers [****](docs/default_api.md#) | **POST** /version | Report server version @@ -142,7 +136,7 @@ Method | HTTP request | Description - [IdPost200Response](docs/IdPost200Response.md) - [Multihash](docs/Multihash.md) - [PinAddPost200Response](docs/PinAddPost200Response.md) - - [PubsubLsPost200Response](docs/PubsubLsPost200Response.md) + - [SwarmConnectPost200Response](docs/SwarmConnectPost200Response.md) - [SwarmPeersPost200Response](docs/SwarmPeersPost200Response.md) - [SwarmPeersPost200ResponsePeersInner](docs/SwarmPeersPost200ResponsePeersInner.md) - [VersionPost200Response](docs/VersionPost200Response.md) diff --git a/kubo-rpc-server/api/openapi.yaml b/kubo-rpc-server/api/openapi.yaml index b4e13d99f..8c5b5e861 100644 --- a/kubo-rpc-server/api/openapi.yaml +++ b/kubo-rpc-server/api/openapi.yaml @@ -341,74 +341,6 @@ paths: $ref: '#/components/schemas/Error' description: bad request summary: Remove a block from the pin store - /pubsub/ls: - post: - responses: - "200": - content: - application/json: - schema: - $ref: '#/components/schemas/_pubsub_ls_post_200_response' - description: success - "400": - content: - application/json: - schema: - $ref: '#/components/schemas/Error' - description: bad request - summary: List topic with active subscriptions - /pubsub/pub: - post: - parameters: - - description: Multibase encoded topic name - explode: true - in: query - name: arg - required: true - schema: - type: string - style: form - requestBody: - content: - multipart/form-data: - schema: - $ref: '#/components/schemas/_dag_put_post_request' - responses: - "200": - description: success - "400": - content: - application/json: - schema: - $ref: '#/components/schemas/Error' - description: bad request - summary: Publish a message to a topic - /pubsub/sub: - post: - parameters: - - description: Multibase encoded topic name - explode: true - in: query - name: arg - required: true - schema: - type: string - style: form - responses: - "200": - content: - application/octet-stream: - schema: - format: binary - type: string - description: success - "400": - content: - application/json: - schema: - $ref: '#/components/schemas/Error' - description: bad request - summary: "Subscribe to a topic, blocks until a message is received" /swarm/peers: post: responses: @@ -443,7 +375,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/_pubsub_ls_post_200_response' + $ref: 
'#/components/schemas/_swarm_connect_post_200_response' description: success "400": content: @@ -617,19 +549,6 @@ components: required: - Pins type: object - _pubsub_ls_post_200_response: - example: - Strings: - - Strings - - Strings - properties: - Strings: - items: - type: string - type: array - required: - - Strings - type: object _swarm_peers_post_200_response_Peers_inner: example: Peer: Peer @@ -658,6 +577,19 @@ components: required: - Peers type: object + _swarm_connect_post_200_response: + example: + Strings: + - Strings + - Strings + properties: + Strings: + items: + type: string + type: array + required: + - Strings + type: object _version_post_200_response: example: Commit: Commit diff --git a/kubo-rpc-server/docs/default_api.md b/kubo-rpc-server/docs/default_api.md index 58172a0d4..b71463129 100644 --- a/kubo-rpc-server/docs/default_api.md +++ b/kubo-rpc-server/docs/default_api.md @@ -14,9 +14,6 @@ Method | HTTP request | Description ****](default_api.md#) | **POST** /id | Report identifying information about a node ****](default_api.md#) | **POST** /pin/add | Add a block to the pin store ****](default_api.md#) | **POST** /pin/rm | Remove a block from the pin store -****](default_api.md#) | **POST** /pubsub/ls | List topic with active subscriptions -****](default_api.md#) | **POST** /pubsub/pub | Publish a message to a topic -****](default_api.md#) | **POST** /pubsub/sub | Subscribe to a topic, blocks until a message is received ****](default_api.md#) | **POST** /swarm/connect | Connect to peers ****](default_api.md#) | **POST** /swarm/peers | Report connected peers ****](default_api.md#) | **POST** /version | Report server version @@ -330,80 +327,7 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) # **** -> models::PubsubLsPost200Response () -List topic with active subscriptions - -### Required Parameters -This endpoint does not need any parameter. 
- -### Return type - -[**models::PubsubLsPost200Response**](_pubsub_ls_post_200_response.md) - -### Authorization - -No authorization required - -### HTTP request headers - - - **Content-Type**: Not defined - - **Accept**: application/json - -[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) - -# **** -> (arg, file) -Publish a message to a topic - -### Required Parameters - -Name | Type | Description | Notes -------------- | ------------- | ------------- | ------------- - **arg** | **String**| Multibase encoded topic name | - **file** | **swagger::ByteArray**| | - -### Return type - - (empty response body) - -### Authorization - -No authorization required - -### HTTP request headers - - - **Content-Type**: multipart/form-data - - **Accept**: application/json - -[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) - -# **** -> swagger::ByteArray (arg) -Subscribe to a topic, blocks until a message is received - -### Required Parameters - -Name | Type | Description | Notes -------------- | ------------- | ------------- | ------------- - **arg** | **String**| Multibase encoded topic name | - -### Return type - -[**swagger::ByteArray**](file.md) - -### Authorization - -No authorization required - -### HTTP request headers - - - **Content-Type**: Not defined - - **Accept**: application/json, application/octet-stream - -[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) - -# **** -> models::PubsubLsPost200Response (arg) +> models::SwarmConnectPost200Response (arg) Connect to peers ### Required Parameters @@ -414,7 +338,7 @@ Name | Type | Description | Notes ### Return type -[**models::PubsubLsPost200Response**](_pubsub_ls_post_200_response.md) +[**models::SwarmConnectPost200Response**](_swarm_connect_post_200_response.md) ### Authorization diff --git a/kubo-rpc-server/examples/client/main.rs b/kubo-rpc-server/examples/client/main.rs index c10c28026..8c6bc6dc2 100644 --- a/kubo-rpc-server/examples/client/main.rs +++ b/kubo-rpc-server/examples/client/main.rs @@ -5,8 +5,7 @@ use ceramic_kubo_rpc_server::{ models, Api, ApiNoContext, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, Client, ContextWrapperExt, DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, IdPostResponse, PinAddPostResponse, PinRmPostResponse, - PubsubLsPostResponse, PubsubPubPostResponse, PubsubSubPostResponse, SwarmConnectPostResponse, - SwarmPeersPostResponse, VersionPostResponse, + SwarmConnectPostResponse, SwarmPeersPostResponse, VersionPostResponse, }; use clap::{App, Arg}; #[allow(unused_imports)] @@ -46,9 +45,6 @@ fn main() { "IdPost", "PinAddPost", "PinRmPost", - "PubsubLsPost", - "PubsubPubPost", - "PubsubSubPost", "SwarmConnectPost", "SwarmPeersPost", "VersionPost", @@ -202,33 +198,6 @@ fn main() { (client.context() as &dyn Has).get().clone() ); } - Some("PubsubLsPost") => { - let result = rt.block_on(client.pubsub_ls_post()); - info!( - "{:?} (X-Span-ID: {:?})", - result, - (client.context() as &dyn Has).get().clone() - ); - } - Some("PubsubPubPost") => { - let result = rt.block_on(client.pubsub_pub_post( - "arg_example".to_string(), - 
swagger::ByteArray(Vec::from("BYTE_ARRAY_DATA_HERE")), - )); - info!( - "{:?} (X-Span-ID: {:?})", - result, - (client.context() as &dyn Has).get().clone() - ); - } - Some("PubsubSubPost") => { - let result = rt.block_on(client.pubsub_sub_post("arg_example".to_string())); - info!( - "{:?} (X-Span-ID: {:?})", - result, - (client.context() as &dyn Has).get().clone() - ); - } Some("SwarmConnectPost") => { let result = rt.block_on(client.swarm_connect_post(&Vec::new())); info!( diff --git a/kubo-rpc-server/examples/server/server.rs b/kubo-rpc-server/examples/server/server.rs index 3a6ce19aa..f7504cf01 100644 --- a/kubo-rpc-server/examples/server/server.rs +++ b/kubo-rpc-server/examples/server/server.rs @@ -103,8 +103,8 @@ use ceramic_kubo_rpc_server::server::MakeService; use ceramic_kubo_rpc_server::{ Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, IdPostResponse, - PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse, PubsubPubPostResponse, - PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, VersionPostResponse, + PinAddPostResponse, PinRmPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, + VersionPostResponse, }; use std::error::Error; use swagger::ApiError; @@ -276,48 +276,6 @@ where Err(ApiError("Generic failure".into())) } - /// List topic with active subscriptions - async fn pubsub_ls_post(&self, context: &C) -> Result { - let context = context.clone(); - info!( - "pubsub_ls_post() - X-Span-ID: {:?}", - context.get().0.clone() - ); - Err(ApiError("Generic failure".into())) - } - - /// Publish a message to a topic - async fn pubsub_pub_post( - &self, - arg: String, - file: swagger::ByteArray, - context: &C, - ) -> Result { - let context = context.clone(); - info!( - "pubsub_pub_post(\"{}\", {:?}) - X-Span-ID: {:?}", - arg, - file, - context.get().0.clone() - ); - Err(ApiError("Generic failure".into())) - } - - /// Subscribe to a topic, blocks until a message is received - async fn pubsub_sub_post( - &self, - arg: String, - context: &C, - ) -> Result { - let context = context.clone(); - info!( - "pubsub_sub_post(\"{}\") - X-Span-ID: {:?}", - arg, - context.get().0.clone() - ); - Err(ApiError("Generic failure".into())) - } - /// Connect to peers async fn swarm_connect_post( &self, diff --git a/kubo-rpc-server/src/client/mod.rs b/kubo-rpc-server/src/client/mod.rs index fc1f69489..cdeb49a24 100644 --- a/kubo-rpc-server/src/client/mod.rs +++ b/kubo-rpc-server/src/client/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use futures::{ future, future::BoxFuture, future::FutureExt, future::TryFutureExt, stream, stream::StreamExt, - Stream, TryStreamExt, + Stream, }; use hyper::header::{HeaderName, HeaderValue, CONTENT_TYPE}; use hyper::{service::Service, Body, Request, Response, Uri}; @@ -48,8 +48,8 @@ const ID_ENCODE_SET: &AsciiSet = &FRAGMENT_ENCODE_SET.add(b'|'); use crate::{ Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, IdPostResponse, - PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse, PubsubPubPostResponse, - PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, VersionPostResponse, + PinAddPostResponse, PinRmPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, + VersionPostResponse, }; /// Convert input into a base path, e.g. "http://example:123". Also checks the scheme as it goes. 
@@ -1580,329 +1580,6 @@ where } } - async fn pubsub_ls_post(&self, context: &C) -> Result { - let mut client_service = self.client_service.clone(); - let mut uri = format!("{}/api/v0/pubsub/ls", self.base_path); - - // Query parameters - let query_string = { - let mut query_string = form_urlencoded::Serializer::new("".to_owned()); - query_string.finish() - }; - if !query_string.is_empty() { - uri += "?"; - uri += &query_string; - } - - let uri = match Uri::from_str(&uri) { - Ok(uri) => uri, - Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), - }; - - let mut request = match Request::builder() - .method("POST") - .uri(uri) - .body(Body::empty()) - { - Ok(req) => req, - Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), - }; - - let header = HeaderValue::from_str(Has::::get(context).0.as_str()); - request.headers_mut().insert( - HeaderName::from_static("x-span-id"), - match header { - Ok(h) => h, - Err(e) => { - return Err(ApiError(format!( - "Unable to create X-Span ID header value: {}", - e - ))) - } - }, - ); - - let response = client_service - .call((request, context.clone())) - .map_err(|e| ApiError(format!("No response received: {}", e))) - .await?; - - match response.status().as_u16() { - 200 => { - let body = response.into_body(); - let body = body - .into_raw() - .map_err(|e| ApiError(format!("Failed to read response: {}", e))) - .await?; - let body = str::from_utf8(&body) - .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; - let body = - serde_json::from_str::(body).map_err(|e| { - ApiError(format!("Response body did not match the schema: {}", e)) - })?; - Ok(PubsubLsPostResponse::Success(body)) - } - 400 => { - let body = response.into_body(); - let body = body - .into_raw() - .map_err(|e| ApiError(format!("Failed to read response: {}", e))) - .await?; - let body = str::from_utf8(&body) - .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; - let body = serde_json::from_str::(body).map_err(|e| { - ApiError(format!("Response body did not match the schema: {}", e)) - })?; - Ok(PubsubLsPostResponse::BadRequest(body)) - } - code => { - let headers = response.headers().clone(); - let body = response.into_body().take(100).into_raw().await; - Err(ApiError(format!( - "Unexpected response code {}:\n{:?}\n\n{}", - code, - headers, - match body { - Ok(body) => match String::from_utf8(body) { - Ok(body) => body, - Err(e) => format!("", e), - }, - Err(e) => format!("", e), - } - ))) - } - } - } - - async fn pubsub_pub_post( - &self, - param_arg: String, - param_file: swagger::ByteArray, - context: &C, - ) -> Result { - let mut client_service = self.client_service.clone(); - let mut uri = format!("{}/api/v0/pubsub/pub", self.base_path); - - // Query parameters - let query_string = { - let mut query_string = form_urlencoded::Serializer::new("".to_owned()); - query_string.append_pair("arg", ¶m_arg); - query_string.finish() - }; - if !query_string.is_empty() { - uri += "?"; - uri += &query_string; - } - - let uri = match Uri::from_str(&uri) { - Ok(uri) => uri, - Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), - }; - - let mut request = match Request::builder() - .method("POST") - .uri(uri) - .body(Body::empty()) - { - Ok(req) => req, - Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), - }; - - let (body_string, multipart_header) = { - let mut multipart = Multipart::new(); - - // For each parameter, encode as appropriate and add to the multipart body as a 
stream. - - let file_vec = param_file.to_vec(); - - let file_mime = match mime_0_2::Mime::from_str("application/octet-stream") { - Ok(mime) => mime, - Err(err) => return Err(ApiError(format!("Unable to get mime type: {:?}", err))), - }; - - let file_cursor = Cursor::new(file_vec); - - let filename = None as Option<&str>; - multipart.add_stream("file", file_cursor, filename, Some(file_mime)); - - let mut fields = match multipart.prepare() { - Ok(fields) => fields, - Err(err) => return Err(ApiError(format!("Unable to build request: {}", err))), - }; - - let mut body_string = String::new(); - - match fields.read_to_string(&mut body_string) { - Ok(_) => (), - Err(err) => return Err(ApiError(format!("Unable to build body: {}", err))), - } - - let boundary = fields.boundary(); - - let multipart_header = format!("multipart/form-data;boundary={}", boundary); - - (body_string, multipart_header) - }; - - *request.body_mut() = Body::from(body_string); - - request.headers_mut().insert( - CONTENT_TYPE, - match HeaderValue::from_str(&multipart_header) { - Ok(h) => h, - Err(e) => { - return Err(ApiError(format!( - "Unable to create header: {} - {}", - multipart_header, e - ))) - } - }, - ); - - let header = HeaderValue::from_str(Has::::get(context).0.as_str()); - request.headers_mut().insert( - HeaderName::from_static("x-span-id"), - match header { - Ok(h) => h, - Err(e) => { - return Err(ApiError(format!( - "Unable to create X-Span ID header value: {}", - e - ))) - } - }, - ); - - let response = client_service - .call((request, context.clone())) - .map_err(|e| ApiError(format!("No response received: {}", e))) - .await?; - - match response.status().as_u16() { - 200 => Ok(PubsubPubPostResponse::Success), - 400 => { - let body = response.into_body(); - let body = body - .into_raw() - .map_err(|e| ApiError(format!("Failed to read response: {}", e))) - .await?; - let body = str::from_utf8(&body) - .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; - let body = serde_json::from_str::(body).map_err(|e| { - ApiError(format!("Response body did not match the schema: {}", e)) - })?; - Ok(PubsubPubPostResponse::BadRequest(body)) - } - code => { - let headers = response.headers().clone(); - let body = response.into_body().take(100).into_raw().await; - Err(ApiError(format!( - "Unexpected response code {}:\n{:?}\n\n{}", - code, - headers, - match body { - Ok(body) => match String::from_utf8(body) { - Ok(body) => body, - Err(e) => format!("", e), - }, - Err(e) => format!("", e), - } - ))) - } - } - } - - async fn pubsub_sub_post( - &self, - param_arg: String, - context: &C, - ) -> Result { - let mut client_service = self.client_service.clone(); - let mut uri = format!("{}/api/v0/pubsub/sub", self.base_path); - - // Query parameters - let query_string = { - let mut query_string = form_urlencoded::Serializer::new("".to_owned()); - query_string.append_pair("arg", ¶m_arg); - query_string.finish() - }; - if !query_string.is_empty() { - uri += "?"; - uri += &query_string; - } - - let uri = match Uri::from_str(&uri) { - Ok(uri) => uri, - Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), - }; - - let mut request = match Request::builder() - .method("POST") - .uri(uri) - .body(Body::empty()) - { - Ok(req) => req, - Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), - }; - - let header = HeaderValue::from_str(Has::::get(context).0.as_str()); - request.headers_mut().insert( - HeaderName::from_static("x-span-id"), - match header { - Ok(h) => h, - Err(e) => { - 
return Err(ApiError(format!( - "Unable to create X-Span ID header value: {}", - e - ))) - } - }, - ); - - let response = client_service - .call((request, context.clone())) - .map_err(|e| ApiError(format!("No response received: {}", e))) - .await?; - - match response.status().as_u16() { - 200 => { - let body = response.into_body(); - Ok(PubsubSubPostResponse::Success(Box::pin(body.map_err( - Box::::from, - )))) - } - 400 => { - let body = response.into_body(); - let body = body - .into_raw() - .map_err(|e| ApiError(format!("Failed to read response: {}", e))) - .await?; - let body = str::from_utf8(&body) - .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; - let body = serde_json::from_str::(body).map_err(|e| { - ApiError(format!("Response body did not match the schema: {}", e)) - })?; - Ok(PubsubSubPostResponse::BadRequest(body)) - } - code => { - let headers = response.headers().clone(); - let body = response.into_body().take(100).into_raw().await; - Err(ApiError(format!( - "Unexpected response code {}:\n{:?}\n\n{}", - code, - headers, - match body { - Ok(body) => match String::from_utf8(body) { - Ok(body) => body, - Err(e) => format!("", e), - }, - Err(e) => format!("", e), - } - ))) - } - } - } - async fn swarm_connect_post( &self, param_arg: &Vec, @@ -1971,8 +1648,8 @@ where .await?; let body = str::from_utf8(&body) .map_err(|e| ApiError(format!("Response was not valid UTF8: {}", e)))?; - let body = - serde_json::from_str::(body).map_err(|e| { + let body = serde_json::from_str::(body) + .map_err(|e| { ApiError(format!("Response body did not match the schema: {}", e)) })?; Ok(SwarmConnectPostResponse::Success(body)) diff --git a/kubo-rpc-server/src/lib.rs b/kubo-rpc-server/src/lib.rs index de87d5d53..c6781dde8 100644 --- a/kubo-rpc-server/src/lib.rs +++ b/kubo-rpc-server/src/lib.rs @@ -11,8 +11,7 @@ #![allow(clippy::derive_partial_eq_without_eq, clippy::disallowed_names)] use async_trait::async_trait; -use futures::{stream::BoxStream, Stream}; -use hyper::body::Bytes; +use futures::Stream; use serde::{Deserialize, Serialize}; use std::error::Error; use std::task::{Context, Poll}; @@ -115,47 +114,11 @@ pub enum PinRmPostResponse { BadRequest(models::Error), } -#[derive(Debug, PartialEq, Serialize, Deserialize)] -#[must_use] -pub enum PubsubLsPostResponse { - /// success - Success(models::PubsubLsPost200Response), - /// bad request - BadRequest(models::Error), -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -#[must_use] -pub enum PubsubPubPostResponse { - /// success - Success, - /// bad request - BadRequest(models::Error), -} - -#[must_use] -pub enum PubsubSubPostResponse { - /// success - Success(BoxStream<'static, Result>>), - - /// bad request - BadRequest(models::Error), -} - -impl std::fmt::Debug for PubsubSubPostResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Success(arg0) => f.debug_tuple("Success").finish(), - Self::BadRequest(arg0) => f.debug_tuple("BadRequest").field(arg0).finish(), - } - } -} - #[derive(Debug, PartialEq, Serialize, Deserialize)] #[must_use] pub enum SwarmConnectPostResponse { /// success - Success(models::PubsubLsPost200Response), + Success(models::SwarmConnectPost200Response), /// bad request BadRequest(models::Error), } @@ -261,24 +224,6 @@ pub trait Api { /// Remove a block from the pin store async fn pin_rm_post(&self, arg: String, context: &C) -> Result; - /// List topic with active subscriptions - async fn pubsub_ls_post(&self, context: &C) -> Result; - - /// Publish a 
message to a topic - async fn pubsub_pub_post( - &self, - arg: String, - file: swagger::ByteArray, - context: &C, - ) -> Result; - - /// Subscribe to a topic, blocks until a message is received - async fn pubsub_sub_post( - &self, - arg: String, - context: &C, - ) -> Result; - /// Connect to peers async fn swarm_connect_post( &self, @@ -362,19 +307,6 @@ pub trait ApiNoContext { /// Remove a block from the pin store async fn pin_rm_post(&self, arg: String) -> Result; - /// List topic with active subscriptions - async fn pubsub_ls_post(&self) -> Result; - - /// Publish a message to a topic - async fn pubsub_pub_post( - &self, - arg: String, - file: swagger::ByteArray, - ) -> Result; - - /// Subscribe to a topic, blocks until a message is received - async fn pubsub_sub_post(&self, arg: String) -> Result; - /// Connect to peers async fn swarm_connect_post( &self, @@ -509,28 +441,6 @@ impl + Send + Sync, C: Clone + Send + Sync> ApiNoContext for Contex self.api().pin_rm_post(arg, &context).await } - /// List topic with active subscriptions - async fn pubsub_ls_post(&self) -> Result { - let context = self.context().clone(); - self.api().pubsub_ls_post(&context).await - } - - /// Publish a message to a topic - async fn pubsub_pub_post( - &self, - arg: String, - file: swagger::ByteArray, - ) -> Result { - let context = self.context().clone(); - self.api().pubsub_pub_post(arg, file, &context).await - } - - /// Subscribe to a topic, blocks until a message is received - async fn pubsub_sub_post(&self, arg: String) -> Result { - let context = self.context().clone(); - self.api().pubsub_sub_post(arg, &context).await - } - /// Connect to peers async fn swarm_connect_post( &self, diff --git a/kubo-rpc-server/src/models.rs b/kubo-rpc-server/src/models.rs index ce429cafd..4235e7d44 100644 --- a/kubo-rpc-server/src/models.rs +++ b/kubo-rpc-server/src/models.rs @@ -909,7 +909,7 @@ pub struct Error { pub code: f64, #[serde(rename = "Type")] - pub typ: String, + pub r#type: String, } impl Error { @@ -918,7 +918,7 @@ impl Error { Error { message, code, - typ: r#type, + r#type, } } } @@ -934,7 +934,7 @@ impl std::string::ToString for Error { Some("Code".to_string()), Some(self.code.to_string()), Some("Type".to_string()), - Some(self.typ.to_string()), + Some(self.r#type.to_string()), ]; params.into_iter().flatten().collect::>().join(",") @@ -1012,7 +1012,7 @@ impl std::str::FromStr for Error { .into_iter() .next() .ok_or_else(|| "Code missing in Error".to_string())?, - typ: intermediate_rep + r#type: intermediate_rep .r#type .into_iter() .next() @@ -1456,22 +1456,22 @@ impl std::convert::TryFrom #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] -pub struct PubsubLsPost200Response { +pub struct SwarmConnectPost200Response { #[serde(rename = "Strings")] pub strings: Vec, } -impl PubsubLsPost200Response { +impl SwarmConnectPost200Response { #[allow(clippy::new_without_default)] - pub fn new(strings: Vec) -> PubsubLsPost200Response { - PubsubLsPost200Response { strings } + pub fn new(strings: Vec) -> SwarmConnectPost200Response { + SwarmConnectPost200Response { strings } } } -/// Converts the PubsubLsPost200Response value to the Query Parameters representation (style=form, explode=false) +/// Converts the SwarmConnectPost200Response value to the Query Parameters representation (style=form, explode=false) /// specified in https://swagger.io/docs/specification/serialization/ /// Should be implemented in a serde serializer -impl 
std::string::ToString for PubsubLsPost200Response { +impl std::string::ToString for SwarmConnectPost200Response { fn to_string(&self) -> String { let params: Vec> = vec![ Some("Strings".to_string()), @@ -1488,10 +1488,10 @@ impl std::string::ToString for PubsubLsPost200Response { } } -/// Converts Query Parameters representation (style=form, explode=false) to a PubsubLsPost200Response value +/// Converts Query Parameters representation (style=form, explode=false) to a SwarmConnectPost200Response value /// as specified in https://swagger.io/docs/specification/serialization/ /// Should be implemented in a serde deserializer -impl std::str::FromStr for PubsubLsPost200Response { +impl std::str::FromStr for SwarmConnectPost200Response { type Err = String; fn from_str(s: &str) -> std::result::Result { @@ -1513,7 +1513,7 @@ impl std::str::FromStr for PubsubLsPost200Response { Some(x) => x, None => { return std::result::Result::Err( - "Missing value while parsing PubsubLsPost200Response".to_string(), + "Missing value while parsing SwarmConnectPost200Response".to_string(), ) } }; @@ -1521,8 +1521,8 @@ impl std::str::FromStr for PubsubLsPost200Response { if let Some(key) = key_result { #[allow(clippy::match_single_binding)] match key { - "Strings" => return std::result::Result::Err("Parsing a container in this style is not supported in PubsubLsPost200Response".to_string()), - _ => return std::result::Result::Err("Unexpected key while parsing PubsubLsPost200Response".to_string()) + "Strings" => return std::result::Result::Err("Parsing a container in this style is not supported in SwarmConnectPost200Response".to_string()), + _ => return std::result::Result::Err("Unexpected key while parsing SwarmConnectPost200Response".to_string()) } } @@ -1531,32 +1531,32 @@ impl std::str::FromStr for PubsubLsPost200Response { } // Use the intermediate representation to return the struct - std::result::Result::Ok(PubsubLsPost200Response { + std::result::Result::Ok(SwarmConnectPost200Response { strings: intermediate_rep .strings .into_iter() .next() - .ok_or_else(|| "Strings missing in PubsubLsPost200Response".to_string())?, + .ok_or_else(|| "Strings missing in SwarmConnectPost200Response".to_string())?, }) } } -// Methods for converting between header::IntoHeaderValue and hyper::header::HeaderValue +// Methods for converting between header::IntoHeaderValue and hyper::header::HeaderValue #[cfg(any(feature = "client", feature = "server"))] -impl std::convert::TryFrom> +impl std::convert::TryFrom> for hyper::header::HeaderValue { type Error = String; fn try_from( - hdr_value: header::IntoHeaderValue, + hdr_value: header::IntoHeaderValue, ) -> std::result::Result { let hdr_value = hdr_value.to_string(); match hyper::header::HeaderValue::from_str(&hdr_value) { std::result::Result::Ok(value) => std::result::Result::Ok(value), std::result::Result::Err(e) => std::result::Result::Err(format!( - "Invalid header value for PubsubLsPost200Response - value: {} is invalid {}", + "Invalid header value for SwarmConnectPost200Response - value: {} is invalid {}", hdr_value, e )), } @@ -1565,19 +1565,19 @@ impl std::convert::TryFrom> #[cfg(any(feature = "client", feature = "server"))] impl std::convert::TryFrom - for header::IntoHeaderValue + for header::IntoHeaderValue { type Error = String; fn try_from(hdr_value: hyper::header::HeaderValue) -> std::result::Result { match hdr_value.to_str() { std::result::Result::Ok(value) => { - match ::from_str(value) { + match ::from_str(value) { std::result::Result::Ok(value) => { 
std::result::Result::Ok(header::IntoHeaderValue(value)) } std::result::Result::Err(err) => std::result::Result::Err(format!( - "Unable to convert header value '{}' into PubsubLsPost200Response - {}", + "Unable to convert header value '{}' into SwarmConnectPost200Response - {}", value, err )), } diff --git a/kubo-rpc-server/src/server/mod.rs b/kubo-rpc-server/src/server/mod.rs index f4f7ac396..75f47d9b6 100644 --- a/kubo-rpc-server/src/server/mod.rs +++ b/kubo-rpc-server/src/server/mod.rs @@ -26,8 +26,8 @@ type ServiceFuture = BoxFuture<'static, Result, crate::ServiceErr use crate::{ Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, IdPostResponse, - PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse, PubsubPubPostResponse, - PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, VersionPostResponse, + PinAddPostResponse, PinRmPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, + VersionPostResponse, }; mod paths { @@ -45,9 +45,6 @@ mod paths { r"^/api/v0/id$", r"^/api/v0/pin/add$", r"^/api/v0/pin/rm$", - r"^/api/v0/pubsub/ls$", - r"^/api/v0/pubsub/pub$", - r"^/api/v0/pubsub/sub$", r"^/api/v0/swarm/connect$", r"^/api/v0/swarm/peers$", r"^/api/v0/version$" @@ -64,12 +61,9 @@ mod paths { pub(crate) static ID_ID: usize = 7; pub(crate) static ID_PIN_ADD: usize = 8; pub(crate) static ID_PIN_RM: usize = 9; - pub(crate) static ID_PUBSUB_LS: usize = 10; - pub(crate) static ID_PUBSUB_PUB: usize = 11; - pub(crate) static ID_PUBSUB_SUB: usize = 12; - pub(crate) static ID_SWARM_CONNECT: usize = 13; - pub(crate) static ID_SWARM_PEERS: usize = 14; - pub(crate) static ID_VERSION: usize = 15; + pub(crate) static ID_SWARM_CONNECT: usize = 10; + pub(crate) static ID_SWARM_PEERS: usize = 11; + pub(crate) static ID_VERSION: usize = 12; } pub struct MakeService @@ -1275,264 +1269,6 @@ where Ok(response) } - // PubsubLsPost - POST /pubsub/ls - hyper::Method::POST if path.matched(paths::ID_PUBSUB_LS) => { - let result = api_impl.pubsub_ls_post(&context).await; - let mut response = Response::new(Body::empty()); - response.headers_mut().insert( - HeaderName::from_static("x-span-id"), - HeaderValue::from_str( - (&context as &dyn Has) - .get() - .0 - .clone() - .as_str(), - ) - .expect("Unable to create X-Span-ID header value"), - ); - - match result { - Ok(rsp) => match rsp { - PubsubLsPostResponse::Success(body) => { - *response.status_mut() = StatusCode::from_u16(200) - .expect("Unable to turn 200 into a StatusCode"); - response.headers_mut().insert( - CONTENT_TYPE, - HeaderValue::from_str("application/json") - .expect("Unable to create Content-Type header for PUBSUB_LS_POST_SUCCESS")); - let body = serde_json::to_string(&body) - .expect("impossible to fail to serialize"); - *response.body_mut() = Body::from(body); - } - PubsubLsPostResponse::BadRequest(body) => { - *response.status_mut() = StatusCode::from_u16(400) - .expect("Unable to turn 400 into a StatusCode"); - response.headers_mut().insert( - CONTENT_TYPE, - HeaderValue::from_str("application/json") - .expect("Unable to create Content-Type header for PUBSUB_LS_POST_BAD_REQUEST")); - let body = serde_json::to_string(&body) - .expect("impossible to fail to serialize"); - *response.body_mut() = Body::from(body); - } - }, - Err(_) => { - // Application code returned an error. This should not happen, as the implementation should - // return a valid response. 
- *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - *response.body_mut() = Body::from("An internal error occurred"); - } - } - - Ok(response) - } - - // PubsubPubPost - POST /pubsub/pub - hyper::Method::POST if path.matched(paths::ID_PUBSUB_PUB) => { - let boundary = - match swagger::multipart::form::boundary(&headers) { - Some(boundary) => boundary.to_string(), - None => return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("Couldn't find valid multipart body".to_string())) - .expect( - "Unable to create Bad Request response for incorrect boundary", - )), - }; - - // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) - let query_params = - form_urlencoded::parse(uri.query().unwrap_or_default().as_bytes()) - .collect::>(); - let param_arg = query_params - .iter() - .filter(|e| e.0 == "arg") - .map(|e| e.1.clone()) - .next(); - let param_arg = match param_arg { - Some(param_arg) => { - let param_arg = ::from_str(¶m_arg); - match param_arg { - Ok(param_arg) => Some(param_arg), - Err(e) => return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(format!("Couldn't parse query parameter arg - doesn't match schema: {}", e))) - .expect("Unable to create Bad Request response for invalid query parameter arg")), - } - } - None => None, - }; - let param_arg = match param_arg { - Some(param_arg) => param_arg, - None => return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("Missing required query parameter arg")) - .expect("Unable to create Bad Request response for missing query parameter arg")), - }; - - // Form Body parameters (note that non-required body parameters will ignore garbage - // values, rather than causing a 400 response). Produce warning header and logs for - // any unused fields. 
- let result = body.into_raw(); - match result.await { - Ok(body) => { - use std::io::Read; - - // Read Form Parameters from body - let mut entries = match Multipart::with_body(&body.to_vec()[..], boundary).save().temp() { - SaveResult::Full(entries) => { - entries - }, - _ => { - return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("Unable to process all message parts".to_string())) - .expect("Unable to create Bad Request response due to failure to process all message")) - }, - }; - let field_file = entries.fields.remove("file"); - let param_file = match field_file { - Some(field) => { - let mut reader = field[0].data.readable().expect("Unable to read field for file"); - let mut data = vec![]; - reader.read_to_end(&mut data).expect("Reading saved binary data should never fail"); - swagger::ByteArray(data) - }, - None => { - return Ok( - Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("Missing required form parameter file".to_string())) - .expect("Unable to create Bad Request due to missing required form parameter file")) - } - }; - let result = api_impl.pubsub_pub_post( - param_arg, - param_file, - &context - ).await; - let mut response = Response::new(Body::empty()); - response.headers_mut().insert( - HeaderName::from_static("x-span-id"), - HeaderValue::from_str((&context as &dyn Has).get().0.clone().as_str()) - .expect("Unable to create X-Span-ID header value")); - - match result { - Ok(rsp) => match rsp { - PubsubPubPostResponse::Success - => { - *response.status_mut() = StatusCode::from_u16(200).expect("Unable to turn 200 into a StatusCode"); - }, - PubsubPubPostResponse::BadRequest - (body) - => { - *response.status_mut() = StatusCode::from_u16(400).expect("Unable to turn 400 into a StatusCode"); - response.headers_mut().insert( - CONTENT_TYPE, - HeaderValue::from_str("application/json") - .expect("Unable to create Content-Type header for PUBSUB_PUB_POST_BAD_REQUEST")); - let body = serde_json::to_string(&body).expect("impossible to fail to serialize"); - *response.body_mut() = Body::from(body); - }, - }, - Err(_) => { - // Application code returned an error. This should not happen, as the implementation should - // return a valid response. 
- *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - *response.body_mut() = Body::from("An internal error occurred"); - }, - } - - Ok(response) - }, - Err(e) => Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("Couldn't read multipart body".to_string())) - .expect("Unable to create Bad Request response due to unable read multipart body")), - } - } - - // PubsubSubPost - POST /pubsub/sub - hyper::Method::POST if path.matched(paths::ID_PUBSUB_SUB) => { - // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) - let query_params = - form_urlencoded::parse(uri.query().unwrap_or_default().as_bytes()) - .collect::>(); - let param_arg = query_params - .iter() - .filter(|e| e.0 == "arg") - .map(|e| e.1.clone()) - .next(); - let param_arg = match param_arg { - Some(param_arg) => { - let param_arg = ::from_str(¶m_arg); - match param_arg { - Ok(param_arg) => Some(param_arg), - Err(e) => return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(format!("Couldn't parse query parameter arg - doesn't match schema: {}", e))) - .expect("Unable to create Bad Request response for invalid query parameter arg")), - } - } - None => None, - }; - let param_arg = match param_arg { - Some(param_arg) => param_arg, - None => return Ok(Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("Missing required query parameter arg")) - .expect("Unable to create Bad Request response for missing query parameter arg")), - }; - - let result = api_impl.pubsub_sub_post(param_arg, &context).await; - let mut response = Response::new(Body::empty()); - response.headers_mut().insert( - HeaderName::from_static("x-span-id"), - HeaderValue::from_str( - (&context as &dyn Has) - .get() - .0 - .clone() - .as_str(), - ) - .expect("Unable to create X-Span-ID header value"), - ); - - match result { - Ok(rsp) => match rsp { - PubsubSubPostResponse::Success(body) => { - *response.status_mut() = StatusCode::from_u16(200) - .expect("Unable to turn 200 into a StatusCode"); - response.headers_mut().insert( - CONTENT_TYPE, - HeaderValue::from_str("application/octet-stream") - .expect("Unable to create Content-Type header for PUBSUB_SUB_POST_SUCCESS")); - - *response.body_mut() = Body::wrap_stream(body); - } - PubsubSubPostResponse::BadRequest(body) => { - *response.status_mut() = StatusCode::from_u16(400) - .expect("Unable to turn 400 into a StatusCode"); - response.headers_mut().insert( - CONTENT_TYPE, - HeaderValue::from_str("application/json") - .expect("Unable to create Content-Type header for PUBSUB_SUB_POST_BAD_REQUEST")); - let body = serde_json::to_string(&body) - .expect("impossible to fail to serialize"); - *response.body_mut() = Body::from(body); - } - }, - Err(_) => { - // Application code returned an error. This should not happen, as the implementation should - // return a valid response. 
- *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - *response.body_mut() = Body::from("An internal error occurred"); - } - } - - Ok(response) - } - // SwarmConnectPost - POST /swarm/connect hyper::Method::POST if path.matched(paths::ID_SWARM_CONNECT) => { // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) @@ -1712,9 +1448,6 @@ where _ if path.matched(paths::ID_ID) => method_not_allowed(), _ if path.matched(paths::ID_PIN_ADD) => method_not_allowed(), _ if path.matched(paths::ID_PIN_RM) => method_not_allowed(), - _ if path.matched(paths::ID_PUBSUB_LS) => method_not_allowed(), - _ if path.matched(paths::ID_PUBSUB_PUB) => method_not_allowed(), - _ if path.matched(paths::ID_PUBSUB_SUB) => method_not_allowed(), _ if path.matched(paths::ID_SWARM_CONNECT) => method_not_allowed(), _ if path.matched(paths::ID_SWARM_PEERS) => method_not_allowed(), _ if path.matched(paths::ID_VERSION) => method_not_allowed(), @@ -1754,12 +1487,6 @@ impl RequestParser for ApiRequestParser { hyper::Method::POST if path.matched(paths::ID_PIN_ADD) => Some("PinAddPost"), // PinRmPost - POST /pin/rm hyper::Method::POST if path.matched(paths::ID_PIN_RM) => Some("PinRmPost"), - // PubsubLsPost - POST /pubsub/ls - hyper::Method::POST if path.matched(paths::ID_PUBSUB_LS) => Some("PubsubLsPost"), - // PubsubPubPost - POST /pubsub/pub - hyper::Method::POST if path.matched(paths::ID_PUBSUB_PUB) => Some("PubsubPubPost"), - // PubsubSubPost - POST /pubsub/sub - hyper::Method::POST if path.matched(paths::ID_PUBSUB_SUB) => Some("PubsubSubPost"), // SwarmConnectPost - POST /swarm/connect hyper::Method::POST if path.matched(paths::ID_SWARM_CONNECT) => { Some("SwarmConnectPost") diff --git a/kubo-rpc/kubo-rpc.yaml b/kubo-rpc/kubo-rpc.yaml index 9147613f1..31d20cb09 100644 --- a/kubo-rpc/kubo-rpc.yaml +++ b/kubo-rpc/kubo-rpc.yaml @@ -427,83 +427,6 @@ paths: application/json: schema: $ref: '#/components/schemas/Error' - '/pubsub/ls': - post: - summary: List topic with active subscriptions - responses: - '200': - description: success - content: - application/json: - schema: - type: object - required: - - Strings - properties: - Strings: - type: array - items: - type: string - '400': - description: bad request - content: - application/json: - schema: - $ref: '#/components/schemas/Error' - '/pubsub/pub': - post: - summary: Publish a message to a topic - parameters: - - name: arg - in: query - description: Multibase encoded topic name - schema: - type: string - required: true - requestBody: - content: - multipart/form-data: - schema: - type: object - required: - - file - properties: - file: - type: string - format: byte - responses: - '200': - description: success - '400': - description: bad request - content: - application/json: - schema: - $ref: '#/components/schemas/Error' - '/pubsub/sub': - post: - summary: Subscribe to a topic, blocks until a message is received - parameters: - - name: arg - in: query - description: Multibase encoded topic name - schema: - type: string - required: true - responses: - '200': - description: success - content: - application/octet-stream: - schema: - type: string - format: binary - '400': - description: bad request - content: - application/json: - schema: - $ref: '#/components/schemas/Error' '/swarm/peers': post: summary: Report connected peers diff --git a/kubo-rpc/src/http.rs b/kubo-rpc/src/http.rs index e4fd35608..16188fbbf 100644 --- a/kubo-rpc/src/http.rs +++ b/kubo-rpc/src/http.rs @@ -1,7 +1,6 @@ //! 
Provides an http implementation of the Kubo RPC methods. mod metrics; -mod stream_drop; pub use metrics::{api::MetricsMiddleware, Metrics}; @@ -13,32 +12,25 @@ use ceramic_kubo_rpc_server::{ models::{ self, BlockPutPost200Response, Codecs, DagImportPost200Response, DagPutPost200Response, DagPutPost200ResponseCid, DagResolvePost200Response, DagResolvePost200ResponseCid, - IdPost200Response, Multihash, PinAddPost200Response, PubsubLsPost200Response, + IdPost200Response, Multihash, PinAddPost200Response, SwarmConnectPost200Response, SwarmPeersPost200Response, SwarmPeersPost200ResponsePeersInner, VersionPost200Response, }, Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, IdPostResponse, - PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse, PubsubPubPostResponse, - PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, VersionPostResponse, -}; -use cid::{ - multibase::{self, Base}, - Cid, + PinAddPostResponse, PinRmPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, + VersionPostResponse, }; +use cid::Cid; use dag_jose::DagJoseCodec; -use futures_util::StreamExt; use go_parse_duration::parse_duration; use libipld::{cbor::DagCborCodec, json::DagJsonCodec, raw::RawCodec}; -use libp2p::{gossipsub::Message, Multiaddr, PeerId}; +use libp2p::{Multiaddr, PeerId}; use multiaddr::Protocol; use serde::Serialize; use swagger::{ApiError, ByteArray}; -use tracing::{instrument, warn, Level}; +use tracing::{instrument, Level}; -use crate::{ - block, dag, http::stream_drop::StreamDrop, id, pin, pubsub, swarm, version, Bytes, - GossipsubEvent, IpfsDep, IpfsPath, -}; +use crate::{block, dag, id, pin, swarm, version, IpfsDep, IpfsPath}; /// Kubo RPC API Server implementation. #[derive(Clone)] @@ -67,7 +59,7 @@ fn create_error(msg: &str) -> models::Error { models::Error { message: msg.to_string(), code: 0f64, - typ: "error".to_string(), + r#type: "error".to_string(), } } @@ -379,112 +371,6 @@ where })) } - #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] - async fn pubsub_ls_post(&self, _context: &C) -> Result { - let topics = pubsub::topics(self.ipfs.clone()) - .await - .map_err(to_api_error)? 
- .into_iter() - .map(|topic| multibase::encode(Base::Base64Url, topic)) - .collect(); - Ok(PubsubLsPostResponse::Success(PubsubLsPost200Response { - strings: topics, - })) - } - - #[instrument(skip(self, _context, file), fields(file.len = file.0.len()), ret(level = Level::DEBUG), err(level = Level::ERROR))] - async fn pubsub_pub_post( - &self, - arg: String, - file: swagger::ByteArray, - _context: &C, - ) -> Result { - let (_base, topic_bytes) = - try_or_bad_request!(multibase::decode(&arg), PubsubPubPostResponse); - - let topic = try_or_bad_request!(String::from_utf8(topic_bytes), PubsubPubPostResponse); - - pubsub::publish(self.ipfs.clone(), topic, file.0.into()) - .await - .map_err(to_api_error)?; - Ok(PubsubPubPostResponse::Success) - } - - #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] - async fn pubsub_sub_post( - &self, - arg: String, - _context: &C, - ) -> Result { - let (_base, topic_bytes) = - try_or_bad_request!(multibase::decode(&arg), PubsubSubPostResponse); - - let topic = try_or_bad_request!(String::from_utf8(topic_bytes), PubsubSubPostResponse); - - let subscription = pubsub::subscribe(self.ipfs.clone(), topic.clone()) - .await - .map_err(to_api_error)?; - - #[derive(Serialize)] - struct MessageResponse { - from: String, - data: String, - seqno: String, - #[serde(rename = "topicIDs")] - topic_ids: Vec, - } - - let messages = subscription.map(|event| { - event - .map(|e| { - let m = match e { - // Ignore Subscribed and Unsubscribed events - GossipsubEvent::Subscribed { .. } | GossipsubEvent::Unsubscribed { .. } => { - return Bytes::default() - } - GossipsubEvent::Message { - from: _, - id: _, - message: - Message { - source, - data, - sequence_number, - topic, - }, - } => MessageResponse { - from: source.map(|p| p.to_string()).unwrap_or_default(), - seqno: multibase::encode( - Base::Base64Url, - sequence_number - .map(|seqno| seqno.to_le_bytes()) - .unwrap_or_default(), - ), - data: multibase::encode(Base::Base64Url, data), - topic_ids: vec![multibase::encode( - Base::Base64Url, - topic.to_string().as_bytes(), - )], - }, - }; - let mut data = - serde_json::to_vec(&m).expect("gossip event should serialize to JSON"); - data.push(b'\n'); - data.into() - }) - .map_err(Box::::from) - }); - let ipfs = self.ipfs.clone(); - let messages = StreamDrop::new(messages, async move { - let ret = pubsub::unsubscribe(ipfs.clone(), topic.clone()).await; - if let Err(error) = ret { - warn!(topic, %error, "failed to unsubscribe"); - }; - }); - - Ok(PubsubSubPostResponse::Success(Box::pin(messages))) - } - #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] async fn swarm_connect_post( &self, @@ -534,9 +420,11 @@ where swarm::connect(self.ipfs.clone(), peer_id, addrs) .await .map_err(to_api_error)?; - Ok(SwarmConnectPostResponse::Success(PubsubLsPost200Response { - strings: vec![format!("connect {} success", peer_id)], - })) + Ok(SwarmConnectPostResponse::Success( + SwarmConnectPost200Response { + strings: vec![format!("connect {} success", peer_id)], + }, + )) } #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] @@ -589,12 +477,9 @@ mod tests { use super::*; use crate::{tests::MockIpfsDepTest, PeerInfo}; - use async_stream::stream; + use bytes::Bytes; use ceramic_metadata::Version; - use cid::multibase::{self, Base}; - use futures_util::TryStreamExt; use libipld::{pb::DagPbCodec, prelude::Decode, Ipld}; - use libp2p::gossipsub::{Message, TopicHash}; use mockall::predicate; 
use tracing_test::traced_test; @@ -746,7 +631,7 @@ mod tests { Error { message: "block was not found locally (offline)", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -802,7 +687,7 @@ mod tests { Error { message: "Failed to parse multihash", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -878,7 +763,7 @@ mod tests { Error { message: "recursive pinning is not supported", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -934,7 +819,7 @@ mod tests { Error { message: "Failed to parse multihash", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1037,7 +922,7 @@ mod tests { Error { message: "invalid cid", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1196,7 +1081,7 @@ mod tests { Error { message: "unsupported codec combination, input-codec: dag-json, store-codec: raw", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1212,7 +1097,7 @@ mod tests { Error { message: "unsupported codec combination, input-codec: raw, store-codec: dag-cbor", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1304,7 +1189,7 @@ mod tests { Error { message: "invalid cid", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1470,7 +1355,7 @@ mod tests { Error { message: "base-58 decode error: provided string contained invalid character 'l' at byte 4", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1535,7 +1420,7 @@ mod tests { Error { message: "invalid cid", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1551,7 +1436,7 @@ mod tests { Error { message: "recursive pinning is not supported", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1567,7 +1452,7 @@ mod tests { Error { message: "pin progress is not supported", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1613,7 +1498,7 @@ mod tests { Error { message: "invalid cid", code: 0.0, - typ: "error", + type: "error", }, ) "#]] @@ -1662,166 +1547,6 @@ mod tests { "#]] .assert_debug_eq(&resp); } - #[tokio::test] - #[traced_test] - async fn pubsub_ls() { - let mut mock_ipfs = MockIpfsDepTest::new(); - mock_ipfs.expect_clone().once().return_once(|| { - let mut m = MockIpfsDepTest::new(); - m.expect_topics() - .once() - .return_once(|| Ok(vec!["topic1".to_string(), "topic2".to_string()])); - m - }); - let server = Server::new(mock_ipfs); - let resp = server.pubsub_ls_post(&Context).await.unwrap(); - - expect![[r#" - Success( - PubsubLsPost200Response { - strings: [ - "udG9waWMx", - "udG9waWMy", - ], - }, - ) - "#]] - .assert_debug_eq(&resp); - } - - #[tokio::test] - #[traced_test] - async fn pubsub_pub() { - let topic = "test-topic".to_string(); - let topic_encoded = multibase::encode(Base::Base64, topic.as_bytes()); - let message = b"message bytes"; - let message_bytes = Bytes::from(&message[..]); - - let mut mock_ipfs = MockIpfsDepTest::new(); - mock_ipfs.expect_clone().once().return_once(move || { - let mut m = MockIpfsDepTest::new(); - m.expect_publish() - .once() - .with(predicate::eq(topic), predicate::eq(message_bytes)) - .return_once(|_, _| Ok(())); - m - }); - let server = Server::new(mock_ipfs); - let resp = server - .pubsub_pub_post(topic_encoded, ByteArray(message.to_vec()), &Context) - .await - .unwrap(); - - expect![[r#" - Success - "#]] - .assert_debug_eq(&resp); - } - #[tokio::test] - #[traced_test] - async fn pubsub_sub() { - let topic = "test-topic".to_string(); - let topic_encoded = multibase::encode(Base::Base64, topic.as_bytes()); - - let mut mock_ipfs = MockIpfsDepTest::new(); - let t = topic.clone(); - 
mock_ipfs.expect_clone().once().return_once(move || { - let mut m = MockIpfsDepTest::new(); - m.expect_subscribe() - .once() - .with(predicate::eq(t)) - .return_once(|_| { - let first = Ok(GossipsubEvent::Message { - from: PeerId::from_str( - "12D3KooWHUfjwiTRVV8jxFcKRSQTPatayC4nQCNX86oxRF5XWzGe", - ) - .unwrap(), - id: libp2p::gossipsub::MessageId::new(&[]), - message: Message { - source: Some( - PeerId::from_str( - "12D3KooWM68GyFKBT9JsuTRB6CYkF61PtMuSkynUauSQEGBX51JW", - ) - .unwrap(), - ), - data: "message 1".as_bytes().to_vec(), - sequence_number: Some(0), - topic: TopicHash::from_raw("topicA"), - }, - }); - let second = Ok(GossipsubEvent::Message { - from: PeerId::from_str( - "12D3KooWGnKwtpSh2ZLTvoC8mjiexMNRLNkT92pxq7MDgyJHktNJ", - ) - .unwrap(), - id: libp2p::gossipsub::MessageId::new(&[]), - message: Message { - source: Some( - PeerId::from_str( - "12D3KooWQVU9Pv3BqD6bD9w96tJxLedKCj4VZ75oqX9Tav4R4rUS", - ) - .unwrap(), - ), - data: "message 2".as_bytes().to_vec(), - sequence_number: Some(1), - topic: TopicHash::from_raw("topicA"), - }, - }); - - Ok(Box::pin(stream! { - yield first; - yield second; - })) - }); - m - }); - let t = topic.clone(); - mock_ipfs.expect_clone().once().return_once(move || { - let mut m = MockIpfsDepTest::new(); - m.expect_unsubscribe() - .once() - .with(predicate::eq(t)) - .return_once(|_| Ok(())); - m - }); - let server = Server::new(mock_ipfs); - let resp = server - .pubsub_sub_post(topic_encoded, &Context) - .await - .unwrap(); - - if let PubsubSubPostResponse::Success(body) = resp { - let messages: Result, _> = body.try_collect().await; - let messages: Vec = messages - .unwrap() - .into_iter() - .map(|m| UnquotedString(bytes_to_pretty_str(m.to_vec()))) - .collect(); - expect![[r#" - [ - { - "data": "ubWVzc2FnZSAx", - "from": "12D3KooWM68GyFKBT9JsuTRB6CYkF61PtMuSkynUauSQEGBX51JW", - "seqno": "uAAAAAAAAAAA", - "topicIDs": [ - "udG9waWNB" - ] - }, - { - "data": "ubWVzc2FnZSAy", - "from": "12D3KooWQVU9Pv3BqD6bD9w96tJxLedKCj4VZ75oqX9Tav4R4rUS", - "seqno": "uAQAAAAAAAAA", - "topicIDs": [ - "udG9waWNB" - ] - }, - ] - "#]] - .assert_debug_eq(&messages); - } else { - panic!("did not get success from server"); - } - } #[tokio::test] #[traced_test] @@ -1846,7 +1571,7 @@ mod tests { expect![[r#" Success( - PubsubLsPost200Response { + SwarmConnectPost200Response { strings: [ "connect 12D3KooWFtPWZ1uHShnbvmxYJGmygUfTVmcb6iSQfiAm4XnmsQ8t success", ], @@ -1870,7 +1595,7 @@ mod tests { Error { message: "no peer id specificed in multiaddrs", code: 0.0, - typ: "error", + type: "error", }, ) "#]] diff --git a/kubo-rpc/src/http/metrics/api.rs b/kubo-rpc/src/http/metrics/api.rs index 406c56b8d..4d97c2592 100644 --- a/kubo-rpc/src/http/metrics/api.rs +++ b/kubo-rpc/src/http/metrics/api.rs @@ -2,9 +2,8 @@ use async_trait::async_trait; use ceramic_kubo_rpc_server::{ models, Api, BlockGetPostResponse, BlockPutPostResponse, BlockStatPostResponse, DagGetPostResponse, DagImportPostResponse, DagPutPostResponse, DagResolvePostResponse, - IdPostResponse, PinAddPostResponse, PinRmPostResponse, PubsubLsPostResponse, - PubsubPubPostResponse, PubsubSubPostResponse, SwarmConnectPostResponse, SwarmPeersPostResponse, - VersionPostResponse, + IdPostResponse, PinAddPostResponse, PinRmPostResponse, SwarmConnectPostResponse, + SwarmPeersPostResponse, VersionPostResponse, }; use ceramic_metrics::Recorder; use futures_util::Future; @@ -162,33 +161,6 @@ where .await } - /// List topic with active subscriptions - async fn pubsub_ls_post(&self, context: &C) -> Result { - self.record("/pubsub/ls", 
self.api.pubsub_ls_post(context)) - .await - } - - /// Publish a message to a topic - async fn pubsub_pub_post( - &self, - arg: String, - file: ByteArray, - context: &C, - ) -> Result { - self.record("/pubsub/pub", self.api.pubsub_pub_post(arg, file, context)) - .await - } - - /// Subscribe to a topic, blocks until a message is received - async fn pubsub_sub_post( - &self, - arg: String, - context: &C, - ) -> Result { - self.record("/pubsub/sub", self.api.pubsub_sub_post(arg, context)) - .await - } - /// Connect to peers async fn swarm_connect_post( &self, diff --git a/kubo-rpc/src/http/stream_drop.rs b/kubo-rpc/src/http/stream_drop.rs deleted file mode 100644 index 537a7a1cc..000000000 --- a/kubo-rpc/src/http/stream_drop.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use futures_util::{Future, Stream}; -use pin_project::{pin_project, pinned_drop}; - -/// Wraps a stream and on drop spawns a future as its own task. -/// The future is not gauranteed to complete. -#[pin_project(PinnedDrop)] -pub struct StreamDrop -where - D: Future + Send + 'static, -{ - #[pin] - stream: S, - drop_fut: Option, -} - -impl StreamDrop -where - D: Future + Send + 'static, -{ - pub fn new(stream: S, drop_fn: D) -> Self { - Self { - stream, - drop_fut: Some(drop_fn), - } - } -} - -impl Stream for StreamDrop -where - S: Stream, - D: Future + Send + 'static, -{ - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Stream::poll_next(this.stream, cx) - } -} - -#[pinned_drop] -impl PinnedDrop for StreamDrop -where - D: Future + Send + 'static, -{ - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - if let Some(drop_fn) = this.drop_fut.take() { - tokio::spawn(drop_fn); - } - } -} diff --git a/kubo-rpc/src/ipfs_metrics.rs b/kubo-rpc/src/ipfs_metrics.rs index df75be4f7..27aba102c 100644 --- a/kubo-rpc/src/ipfs_metrics.rs +++ b/kubo-rpc/src/ipfs_metrics.rs @@ -5,8 +5,7 @@ use bytes::Bytes; use ceramic_metadata::Version; use ceramic_metrics::Recorder; use cid::Cid; -use futures_util::{stream::BoxStream, Future}; -use iroh_rpc_types::GossipsubEvent; +use futures_util::Future; use libipld::Ipld; use libp2p_identity::PeerId; use multiaddr::Multiaddr; @@ -140,22 +139,6 @@ where self.record("connect", self.ipfs.connect(peer_id, addrs)) .await } - async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error> { - self.record("publish", self.ipfs.publish(topic, data)).await - } - async fn subscribe( - &self, - topic: String, - ) -> Result>, Error> { - self.record("subscribe", self.ipfs.subscribe(topic)).await - } - async fn unsubscribe(&self, topic: String) -> Result<(), Error> { - self.record("unsubscribe", self.ipfs.unsubscribe(topic)) - .await - } - async fn topics(&self) -> Result, Error> { - self.record("topics", self.ipfs.topics()).await - } async fn version(&self) -> Result { self.record("version", self.ipfs.version()).await } diff --git a/kubo-rpc/src/lib.rs b/kubo-rpc/src/lib.rs index 86bcf6cdc..86532f388 100644 --- a/kubo-rpc/src/lib.rs +++ b/kubo-rpc/src/lib.rs @@ -10,27 +10,21 @@ use std::{ fmt::{self, Display, Formatter}, io::Cursor, path::PathBuf, - sync::{ - atomic::{AtomicUsize, Ordering}, - Mutex, - }, + sync::atomic::{AtomicUsize, Ordering}, }; use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Context}; use async_trait::async_trait; use dag_jose::DagJoseCodec; -use futures_util::stream::BoxStream; use iroh_rpc_client::P2pClient; use libipld::{cbor::DagCborCodec, 
json::DagJsonCodec, prelude::Decode}; -use libp2p::gossipsub::TopicHash; use tracing::{error, instrument, trace}; // Pub use any types we export as part of an trait or struct pub use bytes::Bytes; pub use ceramic_metadata::Version; pub use cid::Cid; -pub use iroh_rpc_types::GossipsubEvent; pub use libipld::Ipld; pub use libp2p::Multiaddr; pub use libp2p_identity::PeerId; @@ -46,7 +40,6 @@ pub mod http; pub mod id; mod ipfs_metrics; pub mod pin; -pub mod pubsub; pub mod swarm; pub mod version; @@ -164,17 +157,6 @@ pub trait IpfsDep: Clone { async fn peers(&self) -> Result>, Error>; /// Connect to a specific peer node. async fn connect(&self, peer_id: PeerId, addrs: Vec) -> Result<(), Error>; - /// Publish a message on a pub/sub topic. - async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error>; - /// Subscribe to a pub/sub topic. - async fn subscribe( - &self, - topic: String, - ) -> Result>, Error>; - /// Unsubscribe from a pub/sub topic. - async fn unsubscribe(&self, topic: String) -> Result<(), Error>; - /// List topics to which, we are currently subscribed - async fn topics(&self) -> Result, Error>; /// Current version of ceramic async fn version(&self) -> Result; } @@ -184,7 +166,6 @@ pub struct IpfsService { p2p: P2pClient, store: SQLiteBlockStore, resolver: Resolver, - topics: Mutex>, } impl IpfsService { @@ -200,7 +181,6 @@ impl IpfsService { p2p, store, resolver, - topics: Mutex::new(HashMap::new()), } } } @@ -300,76 +280,6 @@ impl IpfsDep for Arc { .map_err(Error::Internal)?) } #[instrument(skip(self))] - async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error> { - let topic = TopicHash::from_raw(topic); - self.p2p - .gossipsub_publish(topic, data) - .await - .map_err(Error::Internal)?; - Ok(()) - } - #[instrument(skip(self))] - async fn subscribe( - &self, - topic: String, - ) -> Result>, Error> { - { - self.topics - .lock() - .expect("should be able to lock topics set") - .entry(topic.clone()) - .and_modify(|count| *count += 1) - .or_insert(1); - } - let topic = TopicHash::from_raw(topic); - Ok(Box::pin( - self.p2p - .gossipsub_subscribe(topic) - .await - .map_err(Error::Internal)?, - )) - } - #[instrument(skip(self))] - async fn unsubscribe(&self, topic: String) -> Result<(), Error> { - let count = { - let mut topics = self - .topics - .lock() - .expect("should be able to lock topics set"); - - let count = if let Some(count) = topics.get_mut(&topic) { - *count -= 1; - *count - } else { - 0 - }; - if count <= 0 { - topics.remove(&topic); - } - count - }; - let topic = TopicHash::from_raw(topic); - // Only unsubscribe if this is the last subscription - if count <= 0 { - self.p2p - .gossipsub_unsubscribe(topic) - .await - .map_err(Error::Internal)?; - } - Ok(()) - } - #[instrument(skip(self))] - async fn topics(&self) -> Result, Error> { - Ok(self - .p2p - .gossipsub_topics() - .await - .map_err(Error::Internal)? 
- .iter() - .map(|t| t.to_string()) - .collect()) - } - #[instrument(skip(self))] async fn version(&self) -> Result { Ok(ceramic_metadata::Version::default()) } @@ -539,10 +449,6 @@ pub(crate) mod tests { async fn resolve(&self, ipfs_path: &IpfsPath) -> Result<(Cid, String), Error>; async fn peers(&self) -> Result>, Error>; async fn connect(&self, peer_id: PeerId, addrs: Vec) -> Result<(), Error>; - async fn publish(&self, topic: String, data: Bytes) -> Result<(), Error>; - async fn subscribe( &self, topic: String) -> Result>, Error>; - async fn unsubscribe(&self, topic: String) -> Result<(), Error>; - async fn topics(&self) -> Result, Error>; async fn version(&self) -> Result; } impl Clone for IpfsDepTest { diff --git a/kubo-rpc/src/pubsub.rs b/kubo-rpc/src/pubsub.rs deleted file mode 100644 index eff1ee632..000000000 --- a/kubo-rpc/src/pubsub.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! Publish Subscribe API - -use futures_util::stream::BoxStream; - -use crate::{error::Error, Bytes, GossipsubEvent, IpfsDep}; - -/// Publish a message to a topic -#[tracing::instrument(skip(client, data))] -pub async fn publish(client: T, topic: String, data: Bytes) -> Result<(), Error> -where - T: IpfsDep, -{ - client.publish(topic, data).await?; - Ok(()) -} -/// Subscribe to a topic returning a stream of messages from that topic -#[tracing::instrument(skip(client))] -pub async fn subscribe( - client: T, - topic: String, -) -> Result>, Error> -where - T: IpfsDep, -{ - client.subscribe(topic).await -} -/// Unsubscribe from a topic -#[tracing::instrument(skip(client), ret)] -pub async fn unsubscribe(client: T, topic: String) -> Result<(), Error> -where - T: IpfsDep, -{ - client.unsubscribe(topic).await -} -/// Returns a list of topics, to which we are currently subscribed. 
-#[tracing::instrument(skip(client))] -pub async fn topics(client: T) -> Result, Error> -where - T: IpfsDep, -{ - client.topics().await -} diff --git a/one/src/lib.rs b/one/src/lib.rs index 7a3c2119a..1196e2885 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -5,36 +5,29 @@ mod events; mod http; mod metrics; mod network; -mod pubsub; mod recon_loop; mod sql; -use std::{env, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; +use std::{env, num::NonZeroUsize, path::PathBuf, time::Duration}; use anyhow::{anyhow, Result}; use ceramic_core::{EventId, Interest, PeerId}; -use ceramic_kubo_rpc::{dag, IpfsDep, IpfsPath, Multiaddr}; +use ceramic_kubo_rpc::Multiaddr; -use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle, Recorder}; +use ceramic_metrics::{config::Config as MetricsConfig, MetricsHandle}; use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; use clap::{Args, Parser, Subcommand, ValueEnum}; use futures::StreamExt; -use futures_util::future; -use libipld::json::DagJsonCodec; use multibase::Base; use multihash::{Code, Hasher, Multihash, MultihashDigest}; use recon::{FullInterests, Recon, ReconInterestProvider, SQLiteStore, Server, Sha256a}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use swagger::{auth::MakeAllowAllAuthenticator, EmptyContext}; -use tokio::{io::AsyncReadExt, sync::oneshot, task, time::timeout}; +use tokio::{io::AsyncReadExt, sync::oneshot}; use tracing::{debug, info, warn}; -use crate::{ - metrics::{Metrics, TipLoadResult}, - network::Ipfs, - pubsub::Message, -}; +use crate::network::Ipfs; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -47,8 +40,6 @@ struct Cli { enum Command { /// Run a daemon process Daemon(DaemonOpts), - /// Run a process that locally pins all stream tips - Eye(EyeOpts), /// Event store tools #[command(subcommand)] Events(events::EventsCommand), @@ -283,12 +274,6 @@ impl Network { } } -#[derive(Args, Debug)] -struct EyeOpts { - #[command(flatten)] - daemon: DaemonOpts, -} - /// Run the ceramic one binary process pub async fn run() -> Result<()> { let args = Cli::parse(); @@ -297,7 +282,6 @@ pub async fn run() -> Result<()> { let daemon = Daemon::build(opts).await?; daemon.run().await } - Command::Eye(opts) => eye(opts).await, Command::Events(opts) => events::events(opts).await, } } @@ -316,7 +300,6 @@ struct Daemon { network: ceramic_core::Network, ipfs: Ipfs, metrics_handle: MetricsHandle, - metrics: Arc, recon_interest: ReconInterest, recon_model: ReconModel, } @@ -341,10 +324,11 @@ impl Daemon { }; info.apply_to_metrics_config(&mut metrics_config); - let metrics = ceramic_metrics::MetricsHandle::register(|registry| { + // Currently only an info metric is recorded, so we do not need to keep the handle to the + // Metrics struct. That will change once we add more metrics. 
+ let _metrics = ceramic_metrics::MetricsHandle::register(|registry| { crate::metrics::Metrics::register(info.clone(), registry) }); - let metrics = Arc::new(metrics); // Logging Tracing and metrics are initialized here, // debug,info etc will not work until after this line @@ -390,7 +374,6 @@ impl Daemon { autonat: !opts.disable_autonat, relay_server: true, relay_client: true, - gossipsub: true, max_conns_out: opts.max_conns_out, max_conns_in: opts.max_conns_in, max_conns_pending_out: opts.max_conns_pending_out, @@ -487,7 +470,6 @@ impl Daemon { network, ipfs, metrics_handle, - metrics, recon_interest, recon_model, }) @@ -620,99 +602,6 @@ async fn handle_signals(mut signals: Signals, shutdown: oneshot::Sender<()>) { } } -async fn eye(opts: EyeOpts) -> Result<()> { - let daemon = Daemon::build(opts.daemon).await?; - - // Start subscription - let subscription = daemon - .ipfs - .api() - .subscribe(daemon.network.name().clone()) - .await?; - - let client = daemon.ipfs.api(); - let metrics = daemon.metrics.clone(); - - let p2p_events_handle = task::spawn(subscription.for_each(move |event| { - match event.expect("should be a message") { - ceramic_kubo_rpc::GossipsubEvent::Subscribed { .. } => {} - ceramic_kubo_rpc::GossipsubEvent::Unsubscribed { .. } => {} - ceramic_kubo_rpc::GossipsubEvent::Message { - // From is the direct peer that forwarded the message - from: _, - id: _, - message: pubsub_msg, - } => { - let ceramic_msg: Message = serde_json::from_slice(pubsub_msg.data.as_slice()) - .expect("should be json message"); - info!(?ceramic_msg); - match &ceramic_msg { - Message::Update { - stream: _, - tip, - model: _, - } => { - if let Ok(ipfs_path) = IpfsPath::from_str(tip) { - // Spawn task to get the data for a stream tip when we see one - let client = client.clone(); - let metrics = metrics.clone(); - task::spawn(async move { load_tip(client, metrics, &ipfs_path).await }); - } else { - warn!("invalid update tip: {}", tip) - } - } - Message::Response { id: _, tips } => { - for tip in tips.values() { - if let Ok(ipfs_path) = IpfsPath::from_str(tip) { - // Spawn task to get the data for a stream tip when we see one - let client = client.clone(); - let metrics = metrics.clone(); - task::spawn( - async move { load_tip(client, metrics, &ipfs_path).await }, - ); - } else { - warn!("invalid update tip: {}", tip) - } - } - } - _ => {} - }; - metrics.record(&(pubsub_msg.source, ceramic_msg)); - } - } - future::ready(()) - })); - - daemon.run().await?; - - p2p_events_handle.abort(); - p2p_events_handle.await.ok(); - Ok(()) -} - -async fn load_tip(client: T, metrics: Arc, ipfs_path: &IpfsPath) { - let result = timeout( - Duration::from_secs(60 * 60), - dag::get(client, ipfs_path, DagJsonCodec), - ) - .await; - let lr = match result { - Ok(Ok(_)) => { - info!("succeed in loading stream tip: {}", ipfs_path); - TipLoadResult::Success - } - Ok(Err(err)) => { - warn!("failed to load stream tip: {}", err); - TipLoadResult::Failure - } - Err(_) => { - warn!("timeout loading stream tip"); - TipLoadResult::Failure - } - }; - metrics.record(&lr); -} - /// Static information about the current process. 
#[derive(Debug, Clone)] pub struct Info { diff --git a/one/src/metrics.rs b/one/src/metrics.rs index 93aeeda3b..9e1d38cb8 100644 --- a/one/src/metrics.rs +++ b/one/src/metrics.rs @@ -1,50 +1,14 @@ use std::{convert::Infallible, net::SocketAddr}; use anyhow::Result; -use ceramic_kubo_rpc::PeerId; -use ceramic_metrics::Recorder; use hyper::{ http::HeaderValue, service::{make_service_fn, service_fn}, Body, Request, Response, }; -use prometheus_client::{encoding::EncodeLabelSet, metrics::counter::Counter}; -use prometheus_client::{encoding::EncodeLabelValue, metrics::family::Family}; -use prometheus_client::{metrics::info::Info, registry::Registry}; +use prometheus_client::{encoding::EncodeLabelSet, metrics::info::Info, registry::Registry}; use tokio::{sync::oneshot, task::JoinHandle}; -use crate::pubsub; - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -struct MsgLabels { - msg_type: MsgType, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] -enum MsgType { - Update, - Query, - Response, - Keepalive, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -struct PeerLabels { - peer_id: String, - version: String, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] -struct TipLoadLabels { - result: TipLoadResult, -} - -#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)] -pub enum TipLoadResult { - Success, - Failure, -} - #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] struct InfoLabels { service_name: String, @@ -66,99 +30,16 @@ impl From for InfoLabels { } } -pub struct Metrics { - messages: Family, - peers: Family, - tip_loads: Family, -} +pub struct Metrics {} impl Metrics { pub fn register(info: crate::Info, registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("ceramic"); - let messages = Family::::default(); - - // Create each combination of labels so that we have explicit zeros reported - // until the first message arrives. - messages - .get_or_create(&MsgLabels { - msg_type: MsgType::Update, - }) - .get(); - messages - .get_or_create(&MsgLabels { - msg_type: MsgType::Query, - }) - .get(); - messages - .get_or_create(&MsgLabels { - msg_type: MsgType::Response, - }) - .get(); - messages - .get_or_create(&MsgLabels { - msg_type: MsgType::Keepalive, - }) - .get(); - - sub_registry.register( - "pubsub_messages", - "Number of ceramic pubsub messages received", - messages.clone(), - ); - - let peers = Family::::default(); - sub_registry.register( - "peers", - "Number of keepalive messages from each peer, useful for understanding network topology", - peers.clone(), - ); - - let tip_loads = Family::::default(); - sub_registry.register("tip_loads", "Number tip loads", tip_loads.clone()); - let info: Info = Info::new(info.into()); sub_registry.register("one", "Information about the ceramic-one process", info); - Self { - messages, - peers, - tip_loads, - } - } -} - -impl Recorder<(Option, pubsub::Message)> for Metrics { - fn record(&self, event: &(Option, pubsub::Message)) { - let msg_type = match &event.1 { - pubsub::Message::Update { .. } => MsgType::Update, - pubsub::Message::Query { .. } => MsgType::Query, - pubsub::Message::Response { .. } => MsgType::Response, - pubsub::Message::Keepalive { ver, .. 
} => { - // Record peers total - if let Some(source) = event.0 { - self.peers - .get_or_create(&PeerLabels { - peer_id: source.to_string(), - version: ver.to_owned(), - }) - .inc(); - } - MsgType::Keepalive - } - }; - // Record messages total - self.messages.get_or_create(&MsgLabels { msg_type }).inc(); - } -} - -impl Recorder for Metrics { - fn record(&self, result: &TipLoadResult) { - self.tip_loads - .get_or_create(&TipLoadLabels { - result: result.clone(), - }) - .inc(); + Self {} } } diff --git a/one/src/pubsub.rs b/one/src/pubsub.rs deleted file mode 100644 index 851724294..000000000 --- a/one/src/pubsub.rs +++ /dev/null @@ -1,329 +0,0 @@ -use std::{collections::HashMap, fmt}; - -use serde::de; -use serde::Deserializer; -use serde_repr::{Deserialize_repr, Serialize_repr}; - -#[derive(Debug, Serialize_repr, Deserialize_repr)] -#[repr(i8)] -pub enum MessageType { - Update = 0, - Query = 1, - Response = 2, - Keepalive = 3, -} - -#[derive(Debug)] -pub enum Message { - Update { - stream: String, - tip: String, - model: Option, - }, - Query { - id: String, - stream: String, - }, - Response { - id: String, - tips: HashMap, - }, - Keepalive { - ts: i64, - ver: String, - }, -} - -// We need manually implement deserialize for Message until this feature is implemented -// https://github.com/serde-rs/serde/issues/745 -// PR implementing the feature https://github.com/serde-rs/serde/pull/2056 -impl<'de> de::Deserialize<'de> for Message { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct MessageVisitor; - - impl<'de> de::Visitor<'de> for MessageVisitor { - type Value = Message; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("message type") - } - - fn visit_map(self, mut map: V) -> Result - where - V: de::MapAccess<'de>, - { - if let Some(key) = map.next_key::()? { - if key != "typ" { - return Err(de::Error::missing_field("typ")); - } - let typ: MessageType = map.next_value()?; - match typ { - MessageType::Update => { - let mut stream: Option = None; - let mut tip: Option = None; - let mut model: Option = None; - loop { - match map.next_key::<&str>()? { - Some("stream") => { - stream = Some(map.next_value()?); - } - Some("tip") => { - tip = Some(map.next_value()?); - } - Some("model") => { - model = Some(map.next_value()?); - } - // Explicitly ignore the doc field - Some("doc") => { - map.next_value::<&str>()?; - } - // Error on unknown fields - Some(k) => { - return Err(de::Error::unknown_field( - k, - &["stream", "tip", "model", "doc"], - )); - } - None => { - // We are done, validate we got all fields and return - if let Some(stream) = stream { - if let Some(tip) = tip { - return Ok(Message::Update { stream, tip, model }); - } else { - return Err(de::Error::missing_field("tip")); - } - } else { - return Err(de::Error::missing_field("stream")); - } - } - } - } - } - MessageType::Query => { - let mut id: Option = None; - let mut stream: Option = None; - loop { - match map.next_key::<&str>()? 
{ - Some("id") => { - id = Some(map.next_value()?); - } - Some("stream") => { - stream = Some(map.next_value()?); - } - // Explicitly ignore the doc field - Some("doc") => { - map.next_value::<&str>()?; - } - // Error on unknown fields - Some(k) => { - return Err(de::Error::unknown_field( - k, - &["id", "stream", "doc"], - )); - } - None => { - // We are done, validate we got all fields and return - if let Some(stream) = stream { - if let Some(id) = id { - return Ok(Message::Query { stream, id }); - } else { - return Err(de::Error::missing_field("id")); - } - } else { - return Err(de::Error::missing_field("stream")); - } - } - } - } - } - MessageType::Response => { - let mut id: Option = None; - let mut tips: Option> = None; - loop { - match map.next_key::<&str>()? { - Some("id") => { - id = Some(map.next_value()?); - } - Some("tips") => { - tips = Some(map.next_value()?); - } - // Error on unknown fields - Some(k) => { - return Err(de::Error::unknown_field(k, &["id", "tips"])); - } - None => { - // We are done, validate we got all fields and return - if let Some(id) = id { - if let Some(tips) = tips { - return Ok(Message::Response { id, tips }); - } else { - return Err(de::Error::missing_field("tips")); - } - } else { - return Err(de::Error::missing_field("id")); - } - } - } - } - } - MessageType::Keepalive => { - let mut ts: Option = None; - let mut ver: Option = None; - loop { - match map.next_key::<&str>()? { - Some("ts") => { - ts = Some(map.next_value()?); - } - Some("ver") => { - ver = Some(map.next_value()?); - } - // Error on unknown fields - Some(k) => { - return Err(de::Error::unknown_field(k, &["ts", "ver"])); - } - None => { - // We are done, validate we got all fields and return - if let Some(ts) = ts { - if let Some(ver) = ver { - return Ok(Message::Keepalive { ts, ver }); - } else { - return Err(de::Error::missing_field("ver")); - } - } else { - return Err(de::Error::missing_field("ts")); - } - } - } - } - } - } - } else { - Err(de::Error::missing_field("typ")) - } - } - } - - deserializer.deserialize_map(MessageVisitor {}) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use expect_test::{expect, Expect}; - - fn deserialize(json: &str, expect: Expect) { - let msg: Message = serde_json::from_str(json).unwrap(); - expect.assert_debug_eq(&msg); - } - - #[test] - fn test_de_update() { - // Test with minimum required fields - deserialize( - r#"{ - "typ":0, - "stream": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - "tip": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39" -}"#, - expect![[r#" - Update { - stream: "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - tip: "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - model: None, - } - "#]], - ); - // Test with maximum fields - deserialize( - r#"{ - "typ":0, - "stream": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - "doc": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - "model": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - "tip": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39" -}"#, - expect![[r#" - Update { - stream: "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - tip: "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - model: Some( - "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - ), - } - "#]], - ); - } - - #[test] - fn test_de_query() { - // Test with minimum required fields - deserialize( - r#"{ - 
"typ":1, - "id":"EiDJzhPfx5LNWKc2G49O42NvgfXb76bhKwTTdXapSujoGg", - "stream": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39" -}"#, - expect![[r#" - Query { - id: "EiDJzhPfx5LNWKc2G49O42NvgfXb76bhKwTTdXapSujoGg", - stream: "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - } - "#]], - ); - // Test with maximum fields - deserialize( - r#"{ - "typ":1, - "id":"EiDJzhPfx5LNWKc2G49O42NvgfXb76bhKwTTdXapSujoGg", - "stream": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - "doc":"kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39" -}"#, - expect![[r#" - Query { - id: "EiDJzhPfx5LNWKc2G49O42NvgfXb76bhKwTTdXapSujoGg", - stream: "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - } - "#]], - ); - } - #[test] - fn test_de_response() { - // Test with minimum required fields - deserialize( - r#"{ - "typ":2, - "id":"EiDJzhPfx5LNWKc2G49O42NvgfXb76bhKwTTdXapSujoGg", - "tips": {"kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39": - "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39"} -}"#, - expect![[r#" - Response { - id: "EiDJzhPfx5LNWKc2G49O42NvgfXb76bhKwTTdXapSujoGg", - tips: { - "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39": "kjzl6cwe1jw145as2el62s5k2n5xwij7snqu1mngluhpr05xy5wylfswe1zvq39", - }, - } - "#]], - ); - } - #[test] - fn test_de_keepalive() { - deserialize( - r#"{ - "typ":3, - "ts": 5, - "ver": "2.23.0" -}"#, - expect![[r#" - Keepalive { - ts: 5, - ver: "2.23.0", - } - "#]], - ); - } -} diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index e2e6ba02b..61924abb7 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -56,7 +56,6 @@ features = [ "dcutr", "dns", "ed25519", - "gossipsub", "identify", "kad", "macros", diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index 609bdf7ec..4d2f3cff7 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -9,9 +9,7 @@ use iroh_rpc_client::Client; use libp2p::{ autonat, connection_limits::{self, ConnectionLimits}, - dcutr, - gossipsub::{self, MessageAuthenticity}, - identify, + dcutr, identify, kad::{ self, store::{MemoryStore, MemoryStoreConfig}, @@ -58,7 +56,6 @@ pub(crate) struct NodeBehaviour { relay: Toggle, relay_client: Toggle, dcutr: Toggle, - pub(crate) gossipsub: Toggle, recon: Toggle>, } @@ -211,19 +208,6 @@ where identify::Behaviour::new(config) }; - let gossipsub = if config.gossipsub { - info!("init gossipsub"); - let gossipsub_config = gossipsub::Config::default(); - let message_authenticity = MessageAuthenticity::Signed(local_key.clone()); - Some( - gossipsub::Behaviour::new(message_authenticity, gossipsub_config) - .map_err(|e| anyhow::anyhow!("{}", e))?, - ) - } else { - None - } - .into(); - let limits = connection_limits::Behaviour::new( ConnectionLimits::default() .with_max_established_outgoing(Some(config.max_conns_out)) @@ -245,7 +229,6 @@ where relay, dcutr: dcutr.into(), relay_client: relay_client.into(), - gossipsub, peer_manager: CeramicPeerManager::new(&config.ceramic_peers, metrics)?, limits, recon: recon.into(), diff --git a/p2p/src/behaviour/event.rs b/p2p/src/behaviour/event.rs index 9e98e2b3c..1934d98b0 100644 --- a/p2p/src/behaviour/event.rs +++ b/p2p/src/behaviour/event.rs @@ -1,5 +1,5 @@ use iroh_bitswap::BitswapEvent; -use libp2p::{autonat, dcutr, gossipsub, identify, kad, mdns, ping, relay}; +use libp2p::{autonat, dcutr, identify, kad, mdns, ping, relay}; use super::ceramic_peer_manager::PeerManagerEvent; @@ -17,7 +17,6 @@ pub enum Event { Relay(relay::Event), 
RelayClient(relay::client::Event), Dcutr(dcutr::Event), - Gossipsub(gossipsub::Event), PeerManager(PeerManagerEvent), Recon(recon::libp2p::Event), Void, @@ -53,12 +52,6 @@ impl From for Event { } } -impl From for Event { - fn from(event: gossipsub::Event) -> Self { - Event::Gossipsub(event) - } -} - impl From for Event { fn from(event: autonat::Event) -> Self { Event::Autonat(event) diff --git a/p2p/src/config.rs b/p2p/src/config.rs index dafd2e58c..71e3e294d 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -52,8 +52,6 @@ pub struct Libp2pConfig { pub relay_server: bool, /// Relay client enabled. pub relay_client: bool, - /// Gossipsub enabled. - pub gossipsub: bool, pub max_conns_out: u32, pub max_conns_in: u32, pub max_conns_pending_out: u32, @@ -119,7 +117,6 @@ impl Default for Libp2pConfig { autonat: true, relay_server: true, relay_client: true, - gossipsub: true, bitswap_client: true, bitswap_server: true, max_conns_pending_out: 256, diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 04f862a98..ee82f2cff 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -14,7 +14,6 @@ pub use self::config::*; pub use self::keys::{DiskStorage, Keychain, MemoryStorage}; pub use self::metrics::Metrics; pub use self::node::*; -pub use iroh_rpc_types::{GossipsubEvent, GossipsubEventStream}; pub use libp2p::PeerId; pub use sqliteblockstore::{SQLiteBlock, SQLiteBlockStore}; diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 77a828e9c..cb4ec3dce 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -18,7 +18,6 @@ use iroh_rpc_types::p2p::P2pAddr; use libp2p::{ autonat::{self, OutboundProbeEvent}, core::Multiaddr, - gossipsub::IdentTopic, identify, identity::Keypair, kad::{ @@ -49,7 +48,7 @@ use crate::{ rpc::{self, RpcMessage}, rpc::{P2p, ProviderRequestKey}, swarm::build_swarm, - Config, GossipsubEvent, + Config, }; use recon::{libp2p::Recon, Sha256a}; @@ -58,7 +57,6 @@ use recon::{libp2p::Recon, Sha256a}; pub enum NetworkEvent { PeerConnected(PeerId), PeerDisconnected(PeerId), - Gossipsub(GossipsubEvent), CancelLookupQuery(PeerId), } @@ -841,31 +839,6 @@ where libp2p_metrics().record(&e); Ok(None) } - Event::Gossipsub(e) => { - libp2p_metrics().record(&e); - if let libp2p::gossipsub::Event::Message { - propagation_source, - message_id, - message, - } = e - { - self.emit_network_event(NetworkEvent::Gossipsub(GossipsubEvent::Message { - from: propagation_source, - id: message_id, - message, - })); - } else if let libp2p::gossipsub::Event::Subscribed { peer_id, topic } = e { - self.emit_network_event(NetworkEvent::Gossipsub(GossipsubEvent::Subscribed { - peer_id, - topic, - })); - } else if let libp2p::gossipsub::Event::Unsubscribed { peer_id, topic } = e { - self.emit_network_event(NetworkEvent::Gossipsub( - GossipsubEvent::Unsubscribed { peer_id, topic }, - )); - } - Ok(None) - } Event::Mdns(e) => { match e { mdns::Event::Discovered(peers) => { @@ -1116,79 +1089,6 @@ where .send(()) .map_err(|_| anyhow!("sender dropped"))?; } - RpcMessage::Gossipsub(g) => { - let gossipsub = match self.swarm.behaviour_mut().gossipsub.as_mut() { - Some(gossipsub) => gossipsub, - None => { - tracing::warn!("Unexpected gossipsub message"); - return Ok(false); - } - }; - match g { - rpc::GossipsubMessage::AddExplicitPeer(response_channel, peer_id) => { - gossipsub.add_explicit_peer(&peer_id); - response_channel - .send(()) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::AllMeshPeers(response_channel) => { - let peers = gossipsub.all_mesh_peers().copied().collect(); - response_channel - 
.send(peers) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::AllPeers(response_channel) => { - let all_peers = gossipsub - .all_peers() - .map(|(p, t)| (*p, t.into_iter().cloned().collect())) - .collect(); - response_channel - .send(all_peers) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::MeshPeers(response_channel, topic_hash) => { - let peers = gossipsub.mesh_peers(&topic_hash).copied().collect(); - response_channel - .send(peers) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Publish(response_channel, topic_hash, bytes) => { - let res = gossipsub - .publish(IdentTopic::new(topic_hash.into_string()), bytes.to_vec()); - response_channel - .send(res) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::RemoveExplicitPeer(response_channel, peer_id) => { - gossipsub.remove_explicit_peer(&peer_id); - response_channel - .send(()) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Subscribe(response_channel, topic_hash) => { - let t = IdentTopic::new(topic_hash.into_string()); - let res = gossipsub - .subscribe(&t) - .map(|_| self.network_events()) - .map_err(anyhow::Error::new); - response_channel - .send(res) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Topics(response_channel) => { - let topics = gossipsub.topics().cloned().collect(); - response_channel - .send(topics) - .map_err(|_| anyhow!("sender dropped"))?; - } - rpc::GossipsubMessage::Unsubscribe(response_channel, topic_hash) => { - let res = gossipsub.unsubscribe(&IdentTopic::new(topic_hash.into_string())); - response_channel - .send(res) - .map_err(|_| anyhow!("sender dropped"))?; - } - } - } RpcMessage::ListenForIdentify(response_channel, peer_id) => { let channels = self.lookup_queries.entry(peer_id).or_default(); channels.push(response_channel); @@ -1286,8 +1186,7 @@ mod tests { use crate::keys::Keypair; use async_trait::async_trait; - use bytes::Bytes; - use futures::{future, TryStreamExt}; + use futures::TryStreamExt; use rand::prelude::*; use rand_chacha::ChaCha8Rng; use recon::Sha256a; @@ -1723,148 +1622,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_gossipsub() -> Result<()> { - let mut test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?; - // peer_id 12D3KooWLo6JTNKXfjkZtKf8ooLQoXVXUEeuu4YDY3CYqK6rxHXt - let test_runner_b = TestRunnerBuilder::new() - .no_bootstrap() - .with_seed(ChaCha8Rng::from_seed([0; 32])) - .build() - .await?; - let addrs_b = vec![test_runner_b.addr.clone()]; - - test_runner_a - .client - .connect(test_runner_b.peer_id, addrs_b) - .await?; - - match test_runner_a.network_events.recv().await { - Some(NetworkEvent::PeerConnected(peer_id)) => { - assert_eq!(test_runner_b.peer_id, peer_id); - } - Some(n) => { - anyhow::bail!("unexpected network event: {:?}", n); - } - None => { - anyhow::bail!("expected NetworkEvent::PeerConnected, received no event"); - } - }; - let peers = test_runner_a.client.gossipsub_all_peers().await?; - assert!(peers.len() == 1); - let got_peer = peers.get(0).unwrap(); - assert_eq!(test_runner_b.peer_id, got_peer.0); - - // create topic - let topic = libp2p::gossipsub::TopicHash::from_raw("test_topic"); - // subscribe both to same topic - let mut subscription_a = test_runner_a - .client - .gossipsub_subscribe(topic.clone()) - .await?; - let subscription_b = test_runner_b - .client - .gossipsub_subscribe(topic.clone()) - .await?; - - // Spawn a task to read all messages from b, but ignore them. 
- // This ensures the subscription request is actually processed. - tokio::task::spawn(subscription_b.for_each(|_| future::ready(()))); - - match subscription_a.next().await { - Some(Ok(GossipsubEvent::Subscribed { - peer_id, - topic: subscribed_topic, - })) => { - assert_eq!(test_runner_b.peer_id, peer_id); - assert_eq!(topic, subscribed_topic); - } - Some(n) => { - anyhow::bail!( - "unexpected network event, expecting a GossipsubEvent::Subscribed, got: {:?}", - n - ); - } - None => { - anyhow::bail!("expected GossipsubEvent::Subscribed, received no event"); - } - }; - - let peers = test_runner_a.client.gossipsub_all_peers().await?; - assert!(peers.len() == 1); - let got_peer = peers.get(0).unwrap(); - assert_eq!(test_runner_b.peer_id, got_peer.0); - assert_eq!(&topic, got_peer.1.get(0).unwrap()); - - // get mesh peer for topic - let peers = test_runner_a - .client - .gossipsub_mesh_peers(topic.clone()) - .await?; - - assert!(peers.len() == 1); - assert_eq!(test_runner_b.peer_id, *peers.get(0).unwrap()); - - let peers = test_runner_a.client.gossipsub_all_mesh_peers().await?; - assert!(peers.len() == 1); - assert_eq!(&test_runner_b.peer_id, peers.get(0).unwrap()); - - let msg = Bytes::from(&b"hello world!"[..]); - test_runner_b - .client - .gossipsub_publish(topic.clone(), msg.clone()) - .await?; - - match subscription_a.next().await { - Some(Ok(GossipsubEvent::Message { from, message, .. })) => { - assert_eq!(test_runner_b.peer_id, from); - assert_eq!(topic, message.topic); - assert_eq!(test_runner_b.peer_id, message.source.unwrap()); - assert_eq!(msg.to_vec(), message.data); - } - Some(Ok(n)) => { - anyhow::bail!( - "unexpected network event, expecting a GossipsubEvent::Message, got: {:?}", - n - ); - } - Some(Err(e)) => { - anyhow::bail!("unexpected network error: {:?}", e); - } - None => { - anyhow::bail!("expected GossipsubEvent::Message, received no event"); - } - }; - - test_runner_b - .client - .gossipsub_unsubscribe(topic.clone()) - .await?; - - match subscription_a.next().await { - Some(Ok(GossipsubEvent::Unsubscribed { - peer_id, - topic: unsubscribe_topic, - })) => { - assert_eq!(test_runner_b.peer_id, peer_id); - assert_eq!(topic, unsubscribe_topic); - } - Some(Ok(n)) => { - anyhow::bail!( - "unexpected network event, expecting a GossipsubEvent::Unsubscribed, got: {:?}", - n - ); - } - Some(Err(e)) => { - anyhow::bail!("unexpected network error: {:?}", e); - } - None => { - anyhow::bail!("expected NetworkEvent::Gossipsub(Unsubscribed), received no event"); - } - }; - Ok(()) - } - #[test(tokio::test)] async fn test_dht() -> Result<()> { // set up three nodes diff --git a/p2p/src/rpc.rs b/p2p/src/rpc.rs index 0e565e8a0..29fe764d5 100644 --- a/p2p/src/rpc.rs +++ b/p2p/src/rpc.rs @@ -1,10 +1,9 @@ use anyhow::{anyhow, ensure, Context, Result}; -use bytes::Bytes; use cid::Cid; use futures::StreamExt; use futures::{ stream::{BoxStream, Stream}, - FutureExt, TryFutureExt, + TryFutureExt, }; use iroh_bitswap::Block; use iroh_rpc_client::{ @@ -15,34 +14,25 @@ use iroh_rpc_types::{ BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest, ExternalAddrsRequest, ExternalAddrsResponse, FetchProvidersDhtRequest, FetchProvidersDhtResponse, GetListeningAddrsRequest, GetListeningAddrsResponse, - GetPeersRequest, GetPeersResponse, GossipsubAddExplicitPeerRequest, - GossipsubAllMeshPeersRequest, GossipsubAllPeersRequest, GossipsubAllPeersResponse, - GossipsubMeshPeersRequest, GossipsubPeersResponse, GossipsubPublishRequest, - GossipsubPublishResponse, 
GossipsubRemoveExplicitPeerRequest, GossipsubSubscribeRequest, - GossipsubSubscribeResponse, GossipsubTopicsRequest, GossipsubTopicsResponse, - GossipsubUnsubscribeRequest, GossipsubUnsubscribeResponse, ListenersRequest, - ListenersResponse, LocalPeerIdRequest, LocalPeerIdResponse, LookupLocalRequest, - LookupRequest, LookupResponse, NotifyNewBlocksBitswapRequest, P2pAddr, P2pRequest, - P2pService, ShutdownRequest, StartProvidingRequest, StopProvidingRequest, - StopSessionBitswapRequest, + GetPeersRequest, GetPeersResponse, ListenersRequest, ListenersResponse, LocalPeerIdRequest, + LocalPeerIdResponse, LookupLocalRequest, LookupRequest, LookupResponse, + NotifyNewBlocksBitswapRequest, P2pAddr, P2pRequest, P2pService, ShutdownRequest, + StartProvidingRequest, StopProvidingRequest, StopSessionBitswapRequest, }, RpcError, RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse, }; use libp2p::identify::Info as IdentifyInfo; +use libp2p::kad::RecordKey; use libp2p::Multiaddr; use libp2p::PeerId; -use libp2p::{ - gossipsub::{MessageId, PublishError, TopicHash}, - kad::RecordKey, -}; use std::collections::{HashMap, HashSet}; use std::result; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info, trace}; -use crate::{GossipsubEvent, GossipsubEventStream, NetworkEvent, DEFAULT_PROVIDER_LIMIT, VERSION}; +use crate::{DEFAULT_PROVIDER_LIMIT, VERSION}; #[derive(Clone)] pub(crate) struct P2p { @@ -410,171 +400,6 @@ impl P2p { let lookup = r.await?; Ok(peer_info_from_lookup(lookup)) } - - #[tracing::instrument(skip(self))] - fn gossipsub_subscribe(self, req: GossipsubSubscribeRequest) -> GossipsubEventStream { - async move { - self.gossipsub_subscribe_0(req) - .await - .expect("FIX THIS - NEEDS TO BE ABLE TO RETURN A RESULT") - } - .flatten_stream() - .boxed() - } - - #[tracing::instrument(skip(self))] - async fn gossipsub_subscribe_0( - self, - req: GossipsubSubscribeRequest, - ) -> Result>> { - let t = TopicHash::from_raw(req.topic_hash); - let (s, r) = oneshot::channel(); - self.sender - .send(RpcMessage::Gossipsub(GossipsubMessage::Subscribe( - s, - t.clone(), - ))) - .await?; - - let mut r = r.await??; - let stream = async_stream::stream! { - while let Some(network_event) = r.recv().await { - if let NetworkEvent::Gossipsub(event) = network_event { - match &event { - GossipsubEvent::Subscribed { topic, .. } | - GossipsubEvent::Unsubscribed {topic, .. } | - GossipsubEvent::Message { message: libp2p::gossipsub::Message{ topic , .. }, .. 
} => { - if *topic == t { - yield Box::new(GossipsubSubscribeResponse {event}); - } - }, - }; - } - } - } - .boxed(); - Ok(stream) - } - - #[tracing::instrument(skip(self, req))] - async fn gossipsub_add_explicit_peer(self, req: GossipsubAddExplicitPeerRequest) -> Result<()> { - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::AddExplicitPeer(s, req.peer_id)); - self.sender.send(msg).await?; - r.await?; - - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn gossipsub_all_mesh_peers( - self, - _: GossipsubAllMeshPeersRequest, - ) -> Result { - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::AllMeshPeers(s)); - self.sender.send(msg).await?; - let peers = r.await?; - Ok(GossipsubPeersResponse { peers }) - } - - #[tracing::instrument(skip(self))] - async fn gossipsub_all_peers( - self, - _: GossipsubAllPeersRequest, - ) -> Result { - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::AllPeers(s)); - self.sender.send(msg).await?; - - let all_peers = r.await?; - let all = all_peers - .into_iter() - .map(|(peer_id, topics)| { - ( - peer_id, - topics.into_iter().map(|t| t.into_string()).collect(), - ) - }) - .collect(); - - Ok(GossipsubAllPeersResponse { all }) - } - - #[tracing::instrument(skip(self, req))] - async fn gossipsub_mesh_peers( - self, - req: GossipsubMeshPeersRequest, - ) -> Result { - let topic = TopicHash::from_raw(req.topic_hash); - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::MeshPeers(s, topic)); - self.sender.send(msg).await?; - - let peers = r.await?; - Ok(GossipsubPeersResponse { peers }) - } - - #[tracing::instrument(skip(self, req))] - async fn gossipsub_publish( - self, - req: GossipsubPublishRequest, - ) -> Result { - let data = req.data; - let topic_hash = TopicHash::from_raw(req.topic_hash); - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::Publish(s, topic_hash, data)); - self.sender.send(msg).await?; - - let message_id = r.await??; - - Ok(GossipsubPublishResponse { - message_id: message_id.0.into(), - }) - } - - #[tracing::instrument(skip(self, req))] - async fn gossipsub_remove_explicit_peer( - self, - req: GossipsubRemoveExplicitPeerRequest, - ) -> Result<()> { - let peer_id = req.peer_id; - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::RemoveExplicitPeer(s, peer_id)); - self.sender.send(msg).await?; - - r.await?; - Ok(()) - } - - #[tracing::instrument(skip(self))] - async fn gossipsub_topics(self, _: GossipsubTopicsRequest) -> Result { - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::Topics(s)); - - self.sender.send(msg).await?; - - let topics: Vec = r.await?.into_iter().map(|t| t.into_string()).collect(); - - Ok(GossipsubTopicsResponse { topics }) - } - - #[tracing::instrument(skip(self, req))] - async fn gossipsub_unsubscribe( - self, - req: GossipsubUnsubscribeRequest, - ) -> Result { - let (s, r) = oneshot::channel(); - let msg = RpcMessage::Gossipsub(GossipsubMessage::Unsubscribe( - s, - TopicHash::from_raw(req.topic_hash), - )); - - self.sender.send(msg).await?; - let was_subscribed = r.await??; - - Ok(GossipsubUnsubscribeResponse { was_subscribed }) - } } /// dispatch a single request from the server @@ -586,15 +411,6 @@ async fn dispatch(s: P2pServer, req: P2pRequest, chan: ServerSocket, Version(req) => s.rpc(req, chan, target, P2p::version).await, Shutdown(req) => s.rpc_map_err(req, chan, 
target, P2p::shutdown).await, FetchBitswap(req) => s.rpc_map_err(req, chan, target, P2p::fetch_bitswap).await, - GossipsubAddExplicitPeer(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_add_explicit_peer).await, - GossipsubAllPeers(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_all_peers).await, - GossipsubMeshPeers(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_mesh_peers).await, - GossipsubAllMeshPeers(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_all_mesh_peers).await, - GossipsubPublish(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_publish).await, - GossipsubRemoveExplicitPeer(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_remove_explicit_peer).await, - GossipsubSubscribe(req) => s.server_streaming(req, chan, target, P2p::gossipsub_subscribe).await, - GossipsubTopics(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_topics).await, - GossipsubUnsubscribe(req) => s.rpc_map_err(req, chan, target, P2p::gossipsub_unsubscribe).await, StopSessionBitswap(req) => s.rpc_map_err(req, chan, target, P2p::stop_session_bitswap).await, StartProviding(req) => s.rpc_map_err(req, chan, target, P2p::start_providing).await, StopProviding(req) => s.rpc_map_err(req, chan, target, P2p::stop_providing).await, @@ -689,7 +505,6 @@ pub enum RpcMessage { NetConnectByPeerId(oneshot::Sender>, PeerId), NetConnect(oneshot::Sender>, PeerId, Vec), NetDisconnect(oneshot::Sender<()>, PeerId), - Gossipsub(GossipsubMessage), FindPeerOnDHT(oneshot::Sender>, PeerId), LookupPeerInfo(oneshot::Sender>, PeerId), ListenForIdentify(oneshot::Sender>, PeerId), @@ -698,23 +513,3 @@ pub enum RpcMessage { LookupLocalPeerInfo(oneshot::Sender), Shutdown, } - -#[derive(Debug)] -pub enum GossipsubMessage { - AddExplicitPeer(oneshot::Sender<()>, PeerId), - AllMeshPeers(oneshot::Sender>), - AllPeers(oneshot::Sender)>>), - MeshPeers(oneshot::Sender>, TopicHash), - Publish( - oneshot::Sender>, - TopicHash, - Bytes, - ), - RemoveExplicitPeer(oneshot::Sender<()>, PeerId), - Subscribe( - oneshot::Sender>>, - TopicHash, - ), - Topics(oneshot::Sender>), - Unsubscribe(oneshot::Sender>, TopicHash), -}
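
Note on the request/reply pattern shared by the removed GossipsubMessage enum and the RpcMessage variants that remain: each request embeds a tokio oneshot::Sender for its reply and is forwarded over an mpsc channel into the swarm loop, which answers by firing that oneshot (or maps a dropped caller to a "sender dropped" error). Below is a minimal, self-contained sketch of that shape using only tokio and anyhow; DemoMessage, run_loop, and ask_topics are hypothetical names used for illustration and are not part of the crates touched by this diff.

    use tokio::sync::{mpsc, oneshot};

    // Hypothetical message type mirroring the RpcMessage / GossipsubMessage shape:
    // the caller packs a oneshot reply channel into the request itself.
    #[derive(Debug)]
    enum DemoMessage {
        Topics(oneshot::Sender<Vec<String>>),
        Shutdown,
    }

    // Hypothetical stand-in for the swarm event loop: it owns some state and
    // answers each request by sending on the embedded oneshot sender.
    async fn run_loop(mut rx: mpsc::Receiver<DemoMessage>) {
        let topics = vec!["test_topic".to_string()];
        while let Some(msg) = rx.recv().await {
            match msg {
                DemoMessage::Topics(reply) => {
                    // A dropped caller is ignored here; the real loop turns it into an error.
                    let _ = reply.send(topics.clone());
                }
                DemoMessage::Shutdown => break,
            }
        }
    }

    // Hypothetical client-side helper, analogous to how the rpc handlers build a
    // oneshot pair, send the request over the mpsc channel, and await the reply.
    async fn ask_topics(tx: &mpsc::Sender<DemoMessage>) -> anyhow::Result<Vec<String>> {
        let (s, r) = oneshot::channel();
        tx.send(DemoMessage::Topics(s))
            .await
            .map_err(|_| anyhow::anyhow!("sender dropped"))?;
        Ok(r.await?)
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let (tx, rx) = mpsc::channel(8);
        let worker = tokio::spawn(run_loop(rx));

        let topics = ask_topics(&tx).await?;
        println!("topics: {topics:?}");

        tx.send(DemoMessage::Shutdown)
            .await
            .map_err(|_| anyhow::anyhow!("sender dropped"))?;
        worker.await?;
        Ok(())
    }

Streaming replies such as the removed GossipsubMessage::Subscribe follow the same idea, except the oneshot carries back an mpsc Receiver (or a boxed stream) instead of a single value, which the rpc layer then adapts into a server-streaming response.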