Skip to content
This repository has been archived by the owner on Dec 18, 2023. It is now read-only.

Migrating PRs #292

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use iroh_resolver::resolver::Resolver;
use iroh_rpc_client::{Client, ClientStatus};
use iroh_unixfs::balanced_tree::DEFAULT_CODE;
use iroh_unixfs::{
builder::Entry as UnixfsEntry,
content_loader::{FullLoader, FullLoaderConfig},
Expand Down Expand Up @@ -182,15 +183,19 @@ impl Api {
) -> Result<BoxStream<'static, Result<(Cid, u64)>>> {
let blocks = match entry {
UnixfsEntry::File(f) => f.encode().await?.boxed(),
UnixfsEntry::Directory(d) => d.encode(),
UnixfsEntry::Directory(d) => d.encode(DEFAULT_CODE),
UnixfsEntry::Symlink(s) => Box::pin(async_stream::try_stream! {
yield s.encode()?
yield s.encode(DEFAULT_CODE)?
}),
UnixfsEntry::RawBlock(r) => Box::pin(async_stream::try_stream! {
yield r.encode()?
}),
};

Ok(Box::pin(
add_blocks_to_store(Some(self.client.clone()), blocks).await,
))
Ok(Box::pin(add_blocks_to_store(
Some(self.client.clone()),
blocks,
)))
}

/// The `add` method encodes the entry into a DAG and adds the resulting
Expand Down
2 changes: 1 addition & 1 deletion iroh-api/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn add_blocks_to_store_chunked<S: Store>(
}
}

pub async fn add_blocks_to_store<S: Store>(
pub fn add_blocks_to_store<S: Store>(
store: Option<S>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>> + Send>>,
) -> impl Stream<Item = Result<(Cid, u64)>> {
Expand Down
27 changes: 11 additions & 16 deletions iroh-bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,16 @@ pub struct Bitswap<S: Store> {
_workers: Arc<Vec<JoinHandle<()>>>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum PeerState {
Connected(ConnectionId),
Responsive(ConnectionId, ProtocolId),
Unresponsive,
#[default]
Disconnected,
DialFailure(Instant),
}

impl Default for PeerState {
fn default() -> Self {
PeerState::Disconnected
}
}

impl PeerState {
fn is_connected(self) -> bool {
matches!(self, PeerState::Connected(_) | PeerState::Responsive(_, _))
Expand Down Expand Up @@ -146,7 +141,7 @@ impl<S: Store> Bitswap<S> {
};
let client = Client::new(network.clone(), store, cb, config.client).await;

let (sender_msg, mut receiver_msg) = mpsc::channel(2048);
let (sender_msg, mut receiver_msg) = mpsc::channel::<(PeerId, BitswapMessage)>(2048);
let (sender_con, mut receiver_con) = mpsc::channel(2048);
let (sender_dis, mut receiver_dis) = mpsc::channel(2048);

Expand All @@ -157,7 +152,13 @@ impl<S: Store> Bitswap<S> {

async move {
// process messages serially but without blocking the p2p loop
while let Some((peer, message)) = receiver_msg.recv().await {
while let Some((peer, mut message)) = receiver_msg.recv().await {
let message = tokio::task::spawn_blocking(move || {
message.verify_blocks();
message
})
.await
.expect("cannot spawn blocking thread");
if let Some(ref server) = server {
futures::future::join(
client.receive_message(&peer, &message),
Expand Down Expand Up @@ -490,14 +491,8 @@ impl<S: Store> NetworkBehaviour for Bitswap<S> {
}
}
}
HandlerEvent::Message {
mut message,
protocol,
} => {
// mark peer as responsive
HandlerEvent::Message { message, protocol } => {
self.set_peer_state(&peer_id, PeerState::Responsive(connection, protocol));

message.verify_blocks();
self.receive_message(peer_id, message);
}
HandlerEvent::FailedToSendMessage { .. } => {
Expand Down
4 changes: 2 additions & 2 deletions iroh-gateway/assets/404.html
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="shortcut icon"
href="" />
<link rel="stylesheet" href="/style.css" />
<link rel="stylesheet" href="/icons.css">
<link rel="stylesheet" href="{{ public_url_base }}style.css"/>
<link rel="stylesheet" href="{{ public_url_base }}icons.css"/>
<title>{{ root_path }}</title>
</head>

Expand Down
4 changes: 2 additions & 2 deletions iroh-gateway/assets/dir_list.html
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
<meta name="image" content="https://gateway.ipfs.io/ipfs/QmSDeYAe9mga6NdTozAZuyGL3Q1XjsLtvX28XFxJH8oPjq">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="shortcut icon" href="" />
<link rel="stylesheet" href="/style.css"/>
<link rel="stylesheet" href="/icons.css">
<link rel="stylesheet" href="{{ public_url_base }}style.css"/>
<link rel="stylesheet" href="{{ public_url_base }}icons.css"/>
<title>{{ root_path }}</title>
</head>
<body>
Expand Down
87 changes: 50 additions & 37 deletions iroh-gateway/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::Result;
use bytes::Bytes;
use cid::Cid;
use futures::{StreamExt, TryStream};
use http::HeaderMap;
use http::{HeaderMap, StatusCode};
use iroh_car::{CarHeader, CarWriter};
use iroh_metrics::{
core::{MObserver, MRecorder},
Expand All @@ -15,15 +15,18 @@ use iroh_metrics::{
resolver::OutMetrics,
};
use iroh_resolver::dns_resolver::Config;
use iroh_resolver::resolver::{CidOrDomain, Metadata, Out, OutPrettyReader, OutType, Resolver};
use iroh_resolver::resolver::{Metadata, Out, OutPrettyReader, OutType, Resolver};
use iroh_unixfs::{content_loader::ContentLoader, Source};
use mime::Mime;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWrite};
use tokio_util::io::ReaderStream;
use tracing::{info, warn};

use crate::constants::RECURSION_LIMIT;
use crate::error::GatewayError;
use crate::handler_params::GetParams;
use crate::ipfs_request::IpfsRequest;
use crate::response::ResponseFormat;
use crate::{constants::RECURSION_LIMIT, handler_params::GetParams};

#[derive(Debug, Clone)]
pub struct Client<T: ContentLoader> {
Expand Down Expand Up @@ -95,26 +98,58 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
}
}

#[tracing::instrument(skip(self))]
pub async fn build_ipfs_request(
&self,
path: &iroh_resolver::resolver::Path,
query_params: &GetParams,
format: ResponseFormat,
subdomain_mode: bool,
) -> Result<IpfsRequest, GatewayError> {
info!("build ipfs request {}", path);
let path_metadata = match self
.retrieve_path_metadata(path.clone(), format == ResponseFormat::Raw)
.await
{
Ok(metadata) => metadata,
Err(e) => {
if e == "offline" {
return Err(GatewayError::new(StatusCode::SERVICE_UNAVAILABLE, &e));
} else if e.starts_with("failed to find") {
return Err(GatewayError::new(StatusCode::NOT_FOUND, &e));
} else {
return Err(GatewayError::new(StatusCode::INTERNAL_SERVER_ERROR, &e));
}
}
};
Ok(IpfsRequest {
format,
cid: path.root().clone(),
resolved_path: path.clone(),
query_params: query_params.clone(),
subdomain_mode,
path_metadata,
})
}

#[tracing::instrument(skip(self))]
pub async fn retrieve_path_metadata(
&self,
path: iroh_resolver::resolver::Path,
format: Option<ResponseFormat>,
raw_format: bool,
) -> Result<Out, String> {
info!("retrieve path metadata {}", path);
if let Some(f) = format {
if f == ResponseFormat::Raw {
return self
.resolver
.resolve_raw(path)
.await
.map_err(|e| e.to_string());
}
if raw_format {
self.resolver
.resolve_raw(path)
.await
.map_err(|e| e.to_string())
} else {
self.resolver.resolve(path).await.map_err(|e| e.to_string())
}
self.resolver.resolve(path).await.map_err(|e| e.to_string())
}

#[tracing::instrument(skip(self))]
#[tracing::instrument(skip(self, path_metadata))]
pub async fn get_file(
&self,
path: iroh_resolver::resolver::Path,
Expand All @@ -126,7 +161,7 @@ impl<T: ContentLoader + std::marker::Unpin> Client<T> {
let path_metadata = if let Some(path_metadata) = path_metadata {
path_metadata
} else {
self.retrieve_path_metadata(path.clone(), None).await?
self.retrieve_path_metadata(path.clone(), false).await?
};
let metadata = path_metadata.metadata().clone();
record_ttfb_metrics(start_time, &metadata.source);
Expand Down Expand Up @@ -241,28 +276,6 @@ impl<T: ContentLoader> Client<T> {
}
}

#[derive(Debug, Clone)]
pub struct IpfsRequest {
pub format: ResponseFormat,
pub cid: CidOrDomain,
pub resolved_path: iroh_resolver::resolver::Path,
pub query_file_name: String,
pub download: bool,
pub query_params: GetParams,
pub subdomain_mode: bool,
pub path_metadata: Out,
}

impl IpfsRequest {
pub fn request_path_for_redirection(&self) -> String {
if self.subdomain_mode {
self.resolved_path.to_relative_string()
} else {
self.resolved_path.to_string()
}
}
}

async fn fetch_car_recursive<T, W>(
resolver: &Resolver<T>,
path: iroh_resolver::resolver::Path,
Expand Down
6 changes: 3 additions & 3 deletions iroh-gateway/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ mod tests {
use iroh_rpc_client::Config as RpcClientConfig;
use iroh_rpc_types::store::StoreAddr;
use iroh_rpc_types::Addr;
use iroh_unixfs::balanced_tree::DEFAULT_CODE;
use iroh_unixfs::builder::{DirectoryBuilder, FileBuilder};
use iroh_unixfs::content_loader::{FullLoader, FullLoaderConfig};
use iroh_unixfs::unixfs::UnixfsNode;
Expand Down Expand Up @@ -214,13 +215,12 @@ mod tests {
.name(name)
.content_bytes(content.to_vec())
.build()
.await
.unwrap();
dir_builder = dir_builder.add_file(file);
}

let root_dir = dir_builder.build().await.unwrap();
let mut parts = root_dir.encode();
let root_dir = dir_builder.build().unwrap();
let mut parts = root_dir.encode(DEFAULT_CODE);
while let Some(part) = parts.next().await {
let (cid, bytes, links) = part.unwrap().into_parts();
cids.push(cid);
Expand Down
Loading