From 995e95296afa31a8fc9f07b4cc849412d2de0912 Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 30 Mar 2021 14:48:04 +0200 Subject: [PATCH 1/5] =?UTF-8?q?Return=20close=20reason=20in=20Error::Close?= =?UTF-8?q?d(=E2=80=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/connection.rs | 101 +++++++++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 32 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index d9b6dc5e..ebe11ee6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -205,26 +205,28 @@ impl Receiver { let mut first_fragment_opcode = None; let mut length: usize = 0; let message_len = message.len(); + let mut close_reason: Option = None; loop { if self.is_closed { - log::debug!("{}: can not receive, connection is closed", self.id); - return Err(Error::Closed) + log::debug!("{}: cannot receive, connection is closed", self.id); + return Err(Error::Closed(close_reason)); } self.ctrl_buffer.clear(); let mut header = self.receive_header().await?; log::trace!("{}: recv: {}", self.id, header); - // Handle control frames. + // Handle control frames: PING, PONG and CLOSE. if header.opcode().is_control() { self.read_buffer(&header).await?; self.ctrl_buffer = self.buffer.split_to(header.payload_len()); base::Codec::apply_mask(&header, &mut self.ctrl_buffer); if header.opcode() == OpCode::Pong { - return Ok(Incoming::Pong(&self.ctrl_buffer[..])) + return Ok(Incoming::Pong(&self.ctrl_buffer[..])); } - self.on_control(&header).await?; - continue + close_reason = self.on_control(&header).await?; + // TODO: why not return Err::Closed(reason) here? + continue; } length = length.saturating_add(header.payload_len()); @@ -351,7 +353,7 @@ impl Receiver { } /// Answer incoming control frames. - async fn on_control(&mut self, header: &Header) -> Result<(), Error> { + async fn on_control(&mut self, header: &Header) -> Result, Error> { match header.opcode() { OpCode::Ping => { let mut answer = Header::new(OpCode::Pong); @@ -359,15 +361,17 @@ impl Receiver { let mut data = Storage::Unique(&mut self.ctrl_buffer); write(self.id, self.mode, &mut self.codec, &mut self.writer, &mut answer, &mut data, &mut unused).await?; self.flush().await?; - Ok(()) + Ok(None) } - OpCode::Pong => Ok(()), + OpCode::Pong => Ok(None), OpCode::Close => { + log::trace!("Acknowledging close to sender"); self.is_closed = true; - let (mut header, code) = close_answer(&self.ctrl_buffer)?; + let (mut header, reason) = close_answer(&self.ctrl_buffer)?; + // Write back a Close frame let mut unused = Vec::new(); - if let Some(c) = code { - let mut data = c.to_be_bytes(); + if let Some(CloseReason { code, ..} ) = reason { + let mut data = code.to_be_bytes(); let mut data = Storage::Unique(&mut data); write(self.id, self.mode, &mut self.codec, &mut self.writer, &mut header, &mut data, &mut unused).await? } else { @@ -375,7 +379,8 @@ impl Receiver { write(self.id, self.mode, &mut self.codec, &mut self.writer, &mut header, &mut data, &mut unused).await? } self.flush().await?; - self.writer.lock().await.close().await.or(Err(Error::Closed)) + self.writer.lock().await.close().await.or(Err(Error::Closed(reason.clone())))?; // TODO: how to avoid the clone here? + Ok(reason) } OpCode::Binary | OpCode::Text @@ -411,7 +416,7 @@ impl Receiver { if self.is_closed { return Ok(()) } - self.writer.lock().await.flush().await.or(Err(Error::Closed)) + self.writer.lock().await.flush().await.or(Err(Error::Closed(None))) } } @@ -452,17 +457,18 @@ impl Sender { /// Flush the socket buffer. pub async fn flush(&mut self) -> Result<(), Error> { log::trace!("{}: flushing connection", self.id); - self.writer.lock().await.flush().await.or(Err(Error::Closed)) + self.writer.lock().await.flush().await.or(Err(Error::Closed(None))) } /// Send a close message and close the connection. + // TODO: send optional close reason pub async fn close(&mut self) -> Result<(), Error> { log::trace!("{}: closing connection", self.id); let mut header = Header::new(OpCode::Close); let code = 1000_u16.to_be_bytes(); // 1000 = normal closure self.write(&mut header, &mut Storage::Shared(&code[..])).await?; self.flush().await?; - self.writer.lock().await.close().await.or(Err(Error::Closed)) + self.writer.lock().await.close().await.or(Err(Error::Closed(None))) } /// Send arbitrary websocket frames. @@ -511,10 +517,10 @@ async fn write let header_bytes = codec.encode_header(&header); let mut w = writer.lock().await; - w.write_all(&header_bytes).await.or(Err(Error::Closed))?; + w.write_all(&header_bytes).await.or(Err(Error::Closed(None)))?; if !header.is_masked() { - return w.write_all(data.as_ref()).await.or(Err(Error::Closed)) + return w.write_all(data.as_ref()).await.or(Err(Error::Closed(None))) } match data { @@ -522,33 +528,46 @@ async fn write mask_buffer.clear(); mask_buffer.extend_from_slice(slice); base::Codec::apply_mask(header, mask_buffer); - w.write_all(mask_buffer).await.or(Err(Error::Closed)) + w.write_all(mask_buffer).await.or(Err(Error::Closed(None))) } Storage::Unique(slice) => { base::Codec::apply_mask(header, slice); - w.write_all(slice).await.or(Err(Error::Closed)) + w.write_all(slice).await.or(Err(Error::Closed(None))) } Storage::Owned(ref mut bytes) => { base::Codec::apply_mask(header, bytes); - w.write_all(bytes).await.or(Err(Error::Closed)) + w.write_all(bytes).await.or(Err(Error::Closed(None))) } } } -/// Create a close frame based on the given data. -fn close_answer(data: &[u8]) -> Result<(Header, Option), Error> { +/// Create a close frame based on the given data. The close frame is echoed back +/// to the sender. +fn close_answer(data: &[u8]) -> Result<(Header, Option), Error> { let answer = Header::new(OpCode::Close); if data.len() < 2 { - return Ok((answer, None)) + return Ok((answer, None)); } - std::str::from_utf8(&data[2 ..])?; // check reason is properly encoded + // Check that the reason string is properly encoded + let descr = std::str::from_utf8(&data[2..])?.into(); let code = u16::from_be_bytes([data[0], data[1]]); + let reason = CloseReason { code, descr: Some(descr) }; + log::trace!("Closing reason: {:?}", reason); + + // Status codes are defined in + // https://tools.ietf.org/html/rfc6455#section-7.4.1 and + // https://mailarchive.ietf.org/arch/msg/hybi/P_1vbD9uyHl63nbIIbFxKMfSwcM/ match code { | 1000 ..= 1003 | 1007 ..= 1011 + | 1012 // Service Restart + | 1013 // Try Again Later | 1015 - | 3000 ..= 4999 => Ok((answer, Some(code))), // acceptable codes - _ => Ok((answer, Some(1002))) // invalid code => protocol error (1002) + | 3000 ..= 4999 => Ok((answer, Some(reason))), // acceptable codes + _ => { + // invalid code => protocol error (1002) + Ok((answer, CloseReason::invalid())) + } } } @@ -569,7 +588,25 @@ pub enum Error { /// The total message payload data size exceeds the configured maximum. MessageTooLarge { current: usize, maximum: usize }, /// The connection is closed. - Closed + Closed(Option), +} + +/// Reason for closing the connection. +#[derive(Debug, Clone)] +pub struct CloseReason { + code: u16, + descr: Option, +} + +// TODO: does this carry its weight? +impl CloseReason { + fn invalid() -> Option { + Some(Self { + // protocol error + code: 1002, + descr: None, + }) + } } impl fmt::Display for Error { @@ -587,8 +624,8 @@ impl fmt::Display for Error { write!(f, "utf-8 error: {}", e), Error::MessageTooLarge { current, maximum } => write!(f, "message too large: len >= {}, maximum = {}", current, maximum), - Error::Closed => - f.write_str("connection closed") + Error::Closed(reason) => + write!(f, "connection closed (reason: {:?})", reason) } } } @@ -602,7 +639,7 @@ impl std::error::Error for Error { Error::Utf8(e) => Some(e), Error::UnexpectedOpCode(_) | Error::MessageTooLarge {..} - | Error::Closed + | Error::Closed(_) => None } } @@ -611,7 +648,7 @@ impl std::error::Error for Error { impl From for Error { fn from(e: io::Error) -> Self { if e.kind() == io::ErrorKind::UnexpectedEof { - Error::Closed + Error::Closed(None) } else { Error::Io(e) } From e8846649a3c00ba7ed8b498b2deaa2f24820b83a Mon Sep 17 00:00:00 2001 From: David Palm Date: Tue, 30 Mar 2021 15:04:29 +0200 Subject: [PATCH 2/5] Fix tests --- examples/autobahn_client.rs | 3 +-- examples/autobahn_server.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/autobahn_client.rs b/examples/autobahn_client.rs index 02612651..aa67e542 100644 --- a/examples/autobahn_client.rs +++ b/examples/autobahn_client.rs @@ -68,7 +68,7 @@ async fn run_case(n: usize) -> Result<(), BoxedError> { sender.send_text(std::str::from_utf8(&message)?).await?; sender.flush().await? } - Err(connection::Error::Closed) => return Ok(()), + Err(connection::Error::Closed(_)) => return Ok(()), Err(e) => return Err(e.into()) } } @@ -97,4 +97,3 @@ fn new_client(socket: TcpStream, path: &str) -> handshake::Client<'_, BufReader< client.add_extension(Box::new(deflate)); client } - diff --git a/examples/autobahn_server.rs b/examples/autobahn_server.rs index 1106df23..8c998c0d 100644 --- a/examples/autobahn_server.rs +++ b/examples/autobahn_server.rs @@ -50,7 +50,7 @@ async fn main() -> Result<(), BoxedError> { break } } - Err(connection::Error::Closed) => break, + Err(connection::Error::Closed(_)) => break, Err(e) => { log::error!("connection error: {}", e); break @@ -74,4 +74,3 @@ fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader Date: Tue, 30 Mar 2021 15:20:59 +0200 Subject: [PATCH 3/5] Return the close reason in the error directly, no need to loop again --- src/connection.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index ebe11ee6..2693fbee 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -205,11 +205,10 @@ impl Receiver { let mut first_fragment_opcode = None; let mut length: usize = 0; let message_len = message.len(); - let mut close_reason: Option = None; loop { if self.is_closed { log::debug!("{}: cannot receive, connection is closed", self.id); - return Err(Error::Closed(close_reason)); + return Err(Error::Closed(None)); } self.ctrl_buffer.clear(); @@ -224,8 +223,8 @@ impl Receiver { if header.opcode() == OpCode::Pong { return Ok(Incoming::Pong(&self.ctrl_buffer[..])); } - close_reason = self.on_control(&header).await?; - // TODO: why not return Err::Closed(reason) here? + self.on_control(&header).await?; + continue; } @@ -353,7 +352,11 @@ impl Receiver { } /// Answer incoming control frames. - async fn on_control(&mut self, header: &Header) -> Result, Error> { + /// `PING`: replied to immediately with a `PONG` + /// `PONG`: no action + /// `CLOSE`: replied to immediately with a `CLOSE`; returns an [`Error::Closed`] with the [`CloseReason`] + /// All other [`OpCode`]s return [`Error::UnexpectedOpCode`] + async fn on_control(&mut self, header: &Header) -> Result<(), Error> { match header.opcode() { OpCode::Ping => { let mut answer = Header::new(OpCode::Pong); @@ -361,9 +364,9 @@ impl Receiver { let mut data = Storage::Unique(&mut self.ctrl_buffer); write(self.id, self.mode, &mut self.codec, &mut self.writer, &mut answer, &mut data, &mut unused).await?; self.flush().await?; - Ok(None) + Ok(()) } - OpCode::Pong => Ok(None), + OpCode::Pong => Ok(()), OpCode::Close => { log::trace!("Acknowledging close to sender"); self.is_closed = true; @@ -380,7 +383,7 @@ impl Receiver { } self.flush().await?; self.writer.lock().await.close().await.or(Err(Error::Closed(reason.clone())))?; // TODO: how to avoid the clone here? - Ok(reason) + Err(Error::Closed(reason)) } OpCode::Binary | OpCode::Text From d8265930c384f44bb5e697d69b2e0d15929df1b3 Mon Sep 17 00:00:00 2001 From: David Palm Date: Wed, 31 Mar 2021 16:28:33 +0200 Subject: [PATCH 4/5] Resolve outstanding todos --- src/connection.rs | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 2693fbee..7c28c9ab 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -382,7 +382,7 @@ impl Receiver { write(self.id, self.mode, &mut self.codec, &mut self.writer, &mut header, &mut data, &mut unused).await? } self.flush().await?; - self.writer.lock().await.close().await.or(Err(Error::Closed(reason.clone())))?; // TODO: how to avoid the clone here? + self.writer.lock().await.close().await?; Err(Error::Closed(reason)) } OpCode::Binary @@ -464,7 +464,6 @@ impl Sender { } /// Send a close message and close the connection. - // TODO: send optional close reason pub async fn close(&mut self) -> Result<(), Error> { log::trace!("{}: closing connection", self.id); let mut header = Header::new(OpCode::Close); @@ -569,7 +568,7 @@ fn close_answer(data: &[u8]) -> Result<(Header, Option), Error> { | 3000 ..= 4999 => Ok((answer, Some(reason))), // acceptable codes _ => { // invalid code => protocol error (1002) - Ok((answer, CloseReason::invalid())) + Ok((answer, Some(CloseReason { code: 1002, descr: None}))) } } } @@ -595,23 +594,12 @@ pub enum Error { } /// Reason for closing the connection. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct CloseReason { code: u16, descr: Option, } -// TODO: does this carry its weight? -impl CloseReason { - fn invalid() -> Option { - Some(Self { - // protocol error - code: 1002, - descr: None, - }) - } -} - impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { From cf1e3c23c3aa0b8ae1a1d2924a2dd821a08407d4 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 12 Apr 2021 15:28:00 +0200 Subject: [PATCH 5/5] Update src/connection.rs Co-authored-by: Pierre Krieger --- src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connection.rs b/src/connection.rs index 7c28c9ab..0682c212 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -373,7 +373,7 @@ impl Receiver { let (mut header, reason) = close_answer(&self.ctrl_buffer)?; // Write back a Close frame let mut unused = Vec::new(); - if let Some(CloseReason { code, ..} ) = reason { + if let Some(CloseReason { code, .. }) = reason { let mut data = code.to_be_bytes(); let mut data = Storage::Unique(&mut data); write(self.id, self.mode, &mut self.codec, &mut self.writer, &mut header, &mut data, &mut unused).await?