-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Conversation
It looks like @twittner hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement at the following URL: https://cla.parity.io Once you've signed, plesae reply to this thread with Many thanks, Parity Technologies CLA Bot |
[clabot:check] |
It looks like @twittner signed our Contributor License Agreement. 👍 Many thanks, Parity Technologies CLA Bot |
On `clone` the counter is incremented, on `drop` decremented. Once 0 we send `None` over the channel, expecting the background thread to end.
fef7c11
to
dc8ed22
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM other than the nitpicks. Although I really only browsed through the code for reasonable-ness, didn't dig too deep into all the logic around the read implementation and such.
util/fetch/src/client.rs
Outdated
break | ||
} | ||
} else { | ||
let body = self.body.take().expect("within this loop `self.body` is always defined"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tiny nitpick but we have a standardized format for these texts: https://wiki.parity.io/Coding-guide
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I updated the message.
util/fetch/src/client.rs
Outdated
fn it_should_fetch() { | ||
let client = Client::new().unwrap(); | ||
let abort = Abort::default().with_max_duration(Duration::from_secs(10)); | ||
let future = client.fetch("https://httpbin.org/drip?numbytes=3&duration=3&delay=1&code=200", abort); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there no library to mock responses like this locally so we don't have to have an internet connection to run tests? (And potentially fail tests because httpbin.org
is down or blocking us or something)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know of mockito which can replace some of these tests. However it requires RUST_TEST_THREADS=1
and certain test cases like timeouts are curently not supported AFAICT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps just spawn an http server in tests? Or at least bind a TcpSocket to a random port and respond with some predefined data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the tests to run a local test server instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, couple of nitpicks as well.
|
||
fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result { | ||
fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result { | ||
let u = Url::parse(url).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps Url
crate should be re-exported from fetch
if it's required to instantiate some other public types (Response
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this may be convenient.
@@ -303,7 +307,7 @@ impl ContentFetcherHandler { | |||
Ok(ValidatorResponse::Streaming(stream)) => { | |||
trace!(target: "dapps", "Validation OK. Streaming response."); | |||
let (reading, response) = stream.into_response(); | |||
fetch2.process_and_forget(reading); | |||
pool.spawn(reading).forget(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That could probably be pass-through (Since we use BodyParser
to convert async->sync and later we use into_response()
to convert from sync->async). To be investigated in subsequent PRs.
util/fetch/src/client.rs
Outdated
/// Create a new fetch client. | ||
pub fn new() -> Result<Self, Error> { | ||
let startup_done = Arc::new((Mutex::new(Ok(())), Condvar::new())); | ||
let (tx_proto, rx_proto) = mpsc::channel(64); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a bit too big for our use case, since every sender get's an additional slot anyway. Probably 16 would do.
util/fetch/src/client.rs
Outdated
impl Client { | ||
/// Create a new fetch client. | ||
pub fn new() -> Result<Self, Error> { | ||
let startup_done = Arc::new((Mutex::new(Ok(())), Condvar::new())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose std::sync::mpsc::channel/sync_channel()
would be a bit simpler here, and also supports timeouts.
return Err(Error::Aborted) | ||
} | ||
if let Some(next_url) = redirect_location(url, &resp) { | ||
if redirects >= abort.max_redirects() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to flatten the indentation here?
util/fetch/src/client.rs
Outdated
} | ||
match try_ready!(self.body.poll()) { | ||
None => Ok(Async::Ready(None)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mixed tabs and spaces
util/fetch/src/client.rs
Outdated
while self.body.is_some() { | ||
// Can we still read from the current chunk? | ||
if self.offset < self.chunk.len() { | ||
let k = cmp::min(self.chunk.len() - self.offset, buf.len() - m); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to use a bit more descriptive names for m
and k
util/fetch/src/client.rs
Outdated
} else { | ||
let body = self.body.take().expect("within this loop `self.body` is always defined"); | ||
match body.into_future().wait() { // wait for next chunk | ||
Err((e, _)) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bunch of more tabs and space mixture for identation.
util/fetch/src/client.rs
Outdated
self.offset += k; | ||
m += k; | ||
if m == buf.len() { | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would put return Ok(m)
here and instead of the other break
- thanks to this we don't need the Ok(m)
at the bottom
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Ok(m)
is still needed there, as the while
loop might theoretically not be entered at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, that's true. We could use loop
and if let Some(body) = self.body.take()
inside to avoid expect
?
util/fetch/src/client.rs
Outdated
fn it_should_fetch() { | ||
let client = Client::new().unwrap(); | ||
let abort = Abort::default().with_max_duration(Duration::from_secs(10)); | ||
let future = client.fetch("https://httpbin.org/drip?numbytes=3&duration=3&delay=1&code=200", abort); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps just spawn an http server in tests? Or at least bind a TcpSocket to a random port and respond with some predefined data.
util/fetch/src/client.rs
Outdated
self.offset += k; | ||
m += k; | ||
if m == buf.len() { | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, that's true. We could use loop
and if let Some(body) = self.body.take()
inside to avoid expect
?
🎉 |
The approach taken is to spawn a background thread running a tokio
Core
and hyperClient
, processing requests sent over ampsc
channel and send back responses overoneshot
channels. Hyper client itself does not implementSend
so this way we can keep theSend
bound of theFetch
trait itself and make this a local change.To accommodate existing synchronous uses of fetch responses an adapter
BodyReader
is provided which waits for response bodyChunk
s to arrive.Resolves #7829.