Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove all usage of gossipsub #209

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 2 additions & 36 deletions beetle/iroh-rpc-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use crate::config::Config;
use crate::gateway::GatewayClient;
use crate::network::P2pClient;
use crate::status::{ClientStatus, ServiceStatus, ServiceType};
use crate::store::StoreClient;
Expand All @@ -11,7 +10,6 @@ use futures::{Stream, StreamExt};

#[derive(Debug, Clone)]
pub struct Client {
pub gateway: Option<GatewayClient>,
p2p: P2pLBClient,
store: StoreLBClient,
}
Expand Down Expand Up @@ -85,22 +83,11 @@ impl P2pLBClient {
impl Client {
pub async fn new(cfg: Config) -> Result<Self> {
let Config {
gateway_addr,
p2p_addr,
store_addr,
channels,
} = cfg;

let gateway = if let Some(addr) = gateway_addr {
Some(
GatewayClient::new(addr)
.await
.context("Could not create gateway rpc client")?,
)
} else {
None
};

let n_channels = channels.unwrap_or(1);

let mut p2p = P2pLBClient::new();
Expand All @@ -123,34 +110,18 @@ impl Client {
}
}

Ok(Client {
gateway,
p2p,
store,
})
Ok(Client { p2p, store })
}

pub fn try_p2p(&self) -> Result<P2pClient> {
self.p2p.get().context("missing rpc p2p connnection")
}

pub fn try_gateway(&self) -> Result<&GatewayClient> {
self.gateway
.as_ref()
.context("missing rpc gateway connnection")
}

pub fn try_store(&self) -> Result<StoreClient> {
self.store.get().context("missing rpc store connection")
}

pub async fn check(&self) -> crate::status::ClientStatus {
let g = if let Some(ref g) = self.gateway {
let (s, v) = g.check().await;
Some(ServiceStatus::new(ServiceType::Gateway, s, v))
} else {
None
};
let p = if let Some(ref p) = self.p2p.get() {
let (s, v) = p.check().await;
Some(ServiceStatus::new(ServiceType::P2p, s, v))
Expand All @@ -163,19 +134,14 @@ impl Client {
} else {
None
};
ClientStatus::new(g, p, s)
ClientStatus::new(p, s)
}

pub async fn watch(self) -> impl Stream<Item = ClientStatus> {
async_stream::stream! {
let mut status: ClientStatus = Default::default();
let mut streams = Vec::new();

if let Some(ref g) = self.gateway {
let g = g.watch().await;
let g = g.map(|(status, version)| ServiceStatus::new(ServiceType::Gateway, status, version));
streams.push(g.boxed());
}
if let Some(ref p) = self.p2p.get() {
let p = p.watch().await;
let p = p.map(|(status, version)| ServiceStatus::new(ServiceType::P2p, status, version));
Expand Down
5 changes: 1 addition & 4 deletions beetle/iroh-rpc-client/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use iroh_rpc_types::{gateway::GatewayAddr, p2p::P2pAddr, store::StoreAddr};
use iroh_rpc_types::{p2p::P2pAddr, store::StoreAddr};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
/// Config for the rpc Client.
pub struct Config {
/// Gateway rpc address.
pub gateway_addr: Option<GatewayAddr>,
/// P2p rpc address.
pub p2p_addr: Option<P2pAddr>,
/// Store rpc address.
Expand All @@ -20,7 +18,6 @@ pub struct Config {
impl Config {
pub fn default_network() -> Self {
Self {
gateway_addr: Some("irpc://127.0.0.1:4400".parse().unwrap()),
p2p_addr: Some("irpc://127.0.0.1:4401".parse().unwrap()),
store_addr: Some("irpc://127.0.0.1:4402".parse().unwrap()),
// disable load balancing by default by just having 1 channel
Expand Down
59 changes: 0 additions & 59 deletions beetle/iroh-rpc-client/src/gateway.rs

This file was deleted.

5 changes: 1 addition & 4 deletions beetle/iroh-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
pub mod client;
pub mod config;
pub mod gateway;
pub mod network;
pub mod status;
pub mod store;
pub use self::config::Config;
pub use client::Client;
pub use iroh_rpc_types::GossipsubEvent;
use iroh_rpc_types::{gateway::GatewayService, p2p::P2pService, store::StoreService, Addr};
use iroh_rpc_types::{p2p::P2pService, store::StoreService, Addr};
pub use network::{Lookup, P2pClient};
use quic_rpc::{
transport::{combined, http2, CombinedChannelTypes, Http2ChannelTypes, MemChannelTypes},
Expand Down Expand Up @@ -36,7 +34,6 @@ pub type ServerSocket<S: Service, C: quic_rpc::ChannelTypes = ChannelTypes> =
(C::SendSink<S::Res>, C::RecvStream<S::Req>);

pub type StoreServer = RpcServer<StoreService, ChannelTypes>;
pub type GatewayServer = RpcServer<GatewayService, ChannelTypes>;
pub type P2pServer = RpcServer<P2pService, ChannelTypes>;

pub async fn create_server<S: Service>(
Expand Down
94 changes: 1 addition & 93 deletions beetle/iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use async_stream::stream;
use bytes::Bytes;
use cid::Cid;
use futures::{Stream, StreamExt};
use iroh_rpc_types::{p2p::*, GossipsubEvent, VersionRequest, WatchRequest};
use libp2p::gossipsub::{MessageId, TopicHash};
use iroh_rpc_types::{p2p::*, VersionRequest, WatchRequest};
use libp2p::{Multiaddr, PeerId};
use std::collections::{HashMap, HashSet};
use tracing::{debug, warn};
Expand Down Expand Up @@ -185,97 +184,6 @@ impl P2pClient {
Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_add_explicit_peer(&self, peer_id: PeerId) -> Result<()> {
self.client
.rpc(GossipsubAddExplicitPeerRequest { peer_id })
.await??;
Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_all_mesh_peers(&self) -> Result<Vec<PeerId>> {
let res = self.client.rpc(GossipsubAllMeshPeersRequest).await??;
Ok(res.peers)
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_all_peers(&self) -> Result<Vec<(PeerId, Vec<TopicHash>)>> {
let res = self.client.rpc(GossipsubAllPeersRequest).await??;
let res = res
.all
.into_iter()
.map(|(peer_id, topics)| {
let topics = topics.into_iter().map(TopicHash::from_raw).collect();
Ok((peer_id, topics))
})
.collect::<anyhow::Result<_>>()?;
Ok(res)
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_mesh_peers(&self, topic: TopicHash) -> Result<Vec<PeerId>> {
let res = self
.client
.rpc(GossipsubMeshPeersRequest {
topic_hash: topic.to_string(),
})
.await??;
Ok(res.peers)
}

#[tracing::instrument(skip(self, data))]
pub async fn gossipsub_publish(&self, topic_hash: TopicHash, data: Bytes) -> Result<MessageId> {
let req = GossipsubPublishRequest {
topic_hash: topic_hash.to_string(),
data,
};
let res = self.client.rpc(req).await??;
let message_id = MessageId::new(&res.message_id);
Ok(message_id)
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_remove_explicit_peer(&self, peer_id: PeerId) -> Result<()> {
let req = GossipsubRemoveExplicitPeerRequest { peer_id };
self.client.rpc(req).await??;
Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_subscribe(
&self,
topic: TopicHash,
) -> Result<impl Stream<Item = Result<GossipsubEvent>>> {
let res = self
.client
.server_streaming(GossipsubSubscribeRequest {
topic_hash: topic.to_string(),
})
.await?;
let events = res.map(|e| {
let e = e?.event;
Ok(e)
});
Ok(events)
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_topics(&self) -> Result<Vec<TopicHash>> {
let res = self.client.rpc(GossipsubTopicsRequest).await??;
let topics = res.topics.into_iter().map(TopicHash::from_raw).collect();
Ok(topics)
}

#[tracing::instrument(skip(self))]
pub async fn gossipsub_unsubscribe(&self, topic: TopicHash) -> Result<bool> {
let req = GossipsubUnsubscribeRequest {
topic_hash: topic.to_string(),
};
let res = self.client.rpc(req).await??;
Ok(res.was_subscribed)
}

#[tracing::instrument(skip(self))]
pub async fn check(&self) -> (StatusType, String) {
match self.version().await {
Expand Down
Loading