Skip to content

Commit

Permalink
fix(client): close Pooled streams on sockopt error
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Nov 28, 2016
1 parent 4bc4f1f commit d5ffee2
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, Shutdown};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};

use std::time::Duration;

Expand Down Expand Up @@ -130,7 +131,7 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo
};
Ok(PooledStream {
inner: Some(inner),
is_closed: false,
is_closed: AtomicBool::new(false),
pool: self.inner.clone(),
})
}
Expand All @@ -139,7 +140,7 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo
/// A Stream that will try to be returned to the Pool when dropped.
pub struct PooledStream<S> {
inner: Option<PooledStreamInner<S>>,
is_closed: bool,
is_closed: AtomicBool,
pool: Arc<Mutex<PoolImpl<S>>>,
}

Expand All @@ -148,7 +149,7 @@ impl<S> fmt::Debug for PooledStream<S> where S: fmt::Debug + 'static {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("PooledStream")
.field("inner", &self.inner)
.field("is_closed", &self.is_closed)
.field("is_closed", &self.is_closed.load(Ordering::Relaxed))
.field("pool", &self.pool)
.finish()
}
Expand Down Expand Up @@ -176,7 +177,7 @@ impl<S: NetworkStream> Read for PooledStream<S> {
// if the wrapped stream returns EOF (Ok(0)), that means the
// server has closed the stream. we must be sure this stream
// is dropped and not put back into the pool.
self.is_closed = true;
self.is_closed.store(true, Ordering::Relaxed);
Ok(0)
},
r => r
Expand All @@ -200,21 +201,33 @@ impl<S: NetworkStream> NetworkStream for PooledStream<S> {
#[inline]
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
self.inner.as_mut().unwrap().stream.peer_addr()
.map_err(|e| {
self.is_closed.store(true, Ordering::Relaxed);
e
})
}

#[inline]
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.as_ref().unwrap().stream.set_read_timeout(dur)
.map_err(|e| {
self.is_closed.store(true, Ordering::Relaxed);
e
})
}

#[inline]
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.inner.as_ref().unwrap().stream.set_write_timeout(dur)
.map_err(|e| {
self.is_closed.store(true, Ordering::Relaxed);
e
})
}

#[inline]
fn close(&mut self, how: Shutdown) -> io::Result<()> {
self.is_closed = true;
self.is_closed.store(true, Ordering::Relaxed);
self.inner.as_mut().unwrap().stream.close(how)
}

Expand All @@ -234,8 +247,9 @@ impl<S: NetworkStream> NetworkStream for PooledStream<S> {

impl<S> Drop for PooledStream<S> {
fn drop(&mut self) {
trace!("PooledStream.drop, is_closed={}", self.is_closed);
if !self.is_closed {
let is_closed = self.is_closed.load(Ordering::Relaxed);
trace!("PooledStream.drop, is_closed={}", is_closed);
if !is_closed {
self.inner.take().map(|inner| {
if let Ok(mut pool) = self.pool.lock() {
pool.reuse(inner.key.clone(), inner);
Expand Down

0 comments on commit d5ffee2

Please sign in to comment.