From b1126631f887aed92967654b6a1cc36b63a6ac7d Mon Sep 17 00:00:00 2001 From: Mingun Date: Fri, 12 Jul 2024 23:45:11 +0500 Subject: [PATCH] Implement ability to read data directly from the underlying reader Co-authored-by: Daniel Alley --- Changelog.md | 5 ++ src/reader/async_tokio.rs | 48 +++++++++++++- src/reader/mod.rs | 128 ++++++++++++++++++++++++++++++++++++++ tests/async-tokio.rs | 46 +++++++++++++- tests/issues.rs | 88 +++++++++++++++++++++++++- 5 files changed, 311 insertions(+), 4 deletions(-) diff --git a/Changelog.md b/Changelog.md index 97caff84..6f894176 100644 --- a/Changelog.md +++ b/Changelog.md @@ -15,10 +15,15 @@ ### New Features +- [#623]: Added `Reader::stream()` that can be used to read arbitrary data + from the inner reader while track position for XML reader. + ### Bug Fixes ### Misc Changes +[#623]: https://github.com/tafia/quick-xml/issues/623 + ## 0.36.0 -- 2024-07-08 diff --git a/src/reader/async_tokio.rs b/src/reader/async_tokio.rs index 2e75b43f..ac74e232 100644 --- a/src/reader/async_tokio.rs +++ b/src/reader/async_tokio.rs @@ -2,14 +2,17 @@ //! as underlying byte stream. This reader fully implements async/await so reading //! can use non-blocking I/O. -use tokio::io::{self, AsyncBufRead, AsyncBufReadExt}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf}; use crate::errors::{Error, Result, SyntaxError}; use crate::events::Event; use crate::name::{QName, ResolveResult}; use crate::parser::{ElementParser, Parser, PiParser}; use crate::reader::buffered_reader::impl_buffered_source; -use crate::reader::{BangType, NsReader, ParseState, ReadTextResult, Reader, Span}; +use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span}; use crate::utils::is_whitespace; /// A struct for read XML asynchronously from an [`AsyncBufRead`]. @@ -24,6 +27,47 @@ impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> { //////////////////////////////////////////////////////////////////////////////////////////////////// +impl<'r, R> AsyncRead for BinaryStream<'r, R> +where + R: AsyncRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let start = buf.remaining(); + let this = self.get_mut(); + let poll = Pin::new(&mut *this.inner).poll_read(cx, buf); + + // If something was read, update offset + if let Poll::Ready(Ok(_)) = poll { + let amt = start - buf.remaining(); + *this.offset += amt as u64; + } + poll + } +} + +impl<'r, R> AsyncBufRead for BinaryStream<'r, R> +where + R: AsyncBufRead + Unpin, +{ + #[inline] + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx) + } + + #[inline] + fn consume(self: Pin<&mut Self>, amt: usize) { + let this = self.get_mut(); + this.inner.consume(amt); + *this.offset += amt as u64; + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + impl Reader { /// An asynchronous version of [`read_event_into()`]. Reads the next event into /// given buffer. diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 6e030b73..9a81e90e 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -567,6 +567,68 @@ impl EncodingRef { //////////////////////////////////////////////////////////////////////////////////////////////////// +/// A direct stream to the underlying [`Reader`]s reader which updates +/// [`Reader::buffer_position()`] when read from it. +#[derive(Debug)] +#[must_use = "streams do nothing unless read or polled"] +pub struct BinaryStream<'r, R> { + inner: &'r mut R, + offset: &'r mut u64, +} + +impl<'r, R> BinaryStream<'r, R> { + /// Returns current position in bytes in the original source. + #[inline] + pub const fn offset(&self) -> u64 { + *self.offset + } + + /// Gets a reference to the underlying reader. + #[inline] + pub const fn get_ref(&self) -> &R { + self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// Avoid read from this reader because this will not update reader's position + /// and will lead to incorrect positions of errors. Read from this stream instead. + #[inline] + pub fn get_mut(&mut self) -> &mut R { + self.inner + } +} + +impl<'r, R> io::Read for BinaryStream<'r, R> +where + R: io::Read, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let amt = self.inner.read(buf)?; + *self.offset += amt as u64; + Ok(amt) + } +} + +impl<'r, R> io::BufRead for BinaryStream<'r, R> +where + R: io::BufRead, +{ + #[inline] + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.inner.fill_buf() + } + + #[inline] + fn consume(&mut self, amt: usize) { + self.inner.consume(amt); + *self.offset += amt as u64; + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// + /// A low level encoding-agnostic XML event reader. /// /// Consumes bytes and streams XML [`Event`]s. @@ -716,6 +778,12 @@ impl Reader { } /// Gets a mutable reference to the underlying reader. + /// + /// Avoid read from this reader because this will not update reader's position + /// and will lead to incorrect positions of errors. If you want to read, use + /// [`stream()`] instead. + /// + /// [`stream()`]: Self::stream pub fn get_mut(&mut self) -> &mut R { &mut self.reader } @@ -759,6 +827,66 @@ impl Reader { pub const fn decoder(&self) -> Decoder { self.state.decoder() } + + /// Get the direct access to the underlying reader, but tracks the amount of + /// read data and update [`Reader::buffer_position()`] accordingly. + /// + /// Note, that this method gives you access to the internal reader and read + /// data will not be returned in any subsequent events read by `read_event` + /// family of methods. + /// + /// # Example + /// + /// This example demonstrates how to read stream raw bytes from an XML document. + /// This could be used to implement streaming read of text, or to read raw binary + /// bytes embedded in an XML document. (Documents with embedded raw bytes are not + /// valid XML, but XML-derived file formats exist where such documents are valid). + /// + /// ``` + /// # use pretty_assertions::assert_eq; + /// use std::io::{BufRead, Read}; + /// use quick_xml::events::{BytesEnd, BytesStart, Event}; + /// use quick_xml::reader::Reader; + /// + /// let mut reader = Reader::from_str("binary << data&>"); + /// // ^ ^ ^ ^ + /// // 0 5 21 27 + /// + /// assert_eq!( + /// (reader.read_event().unwrap(), reader.buffer_position()), + /// // 5 - end of the `` + /// (Event::Start(BytesStart::new("tag")), 5) + /// ); + /// + /// // Reading directly from underlying reader will not update position + /// // let mut inner = reader.get_mut(); + /// + /// // Reading from the stream() advances position + /// let mut inner = reader.stream(); + /// + /// // Read binary data. We must know its size + /// let mut binary = [0u8; 16]; + /// inner.read_exact(&mut binary).unwrap(); + /// assert_eq!(&binary, b"binary << data&>"); + /// // 21 - end of the `binary << data&>` + /// assert_eq!(inner.offset(), 21); + /// assert_eq!(reader.buffer_position(), 21); + /// + /// assert_eq!( + /// (reader.read_event().unwrap(), reader.buffer_position()), + /// // 27 - end of the `` + /// (Event::End(BytesEnd::new("tag")), 27) + /// ); + /// + /// assert_eq!(reader.read_event().unwrap(), Event::Eof); + /// ``` + #[inline] + pub fn stream(&mut self) -> BinaryStream { + BinaryStream { + inner: &mut self.reader, + offset: &mut self.state.offset, + } + } } /// Private sync reading methods diff --git a/tests/async-tokio.rs b/tests/async-tokio.rs index b525dfbf..25ec86bc 100644 --- a/tests/async-tokio.rs +++ b/tests/async-tokio.rs @@ -1,10 +1,12 @@ +use std::io::Cursor; use std::iter; use pretty_assertions::assert_eq; use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event::*}; use quick_xml::name::QName; use quick_xml::reader::Reader; -use tokio::io::BufReader; +use quick_xml::utils::Bytes; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; // Import `small_buffers_tests!` #[macro_use] @@ -88,6 +90,48 @@ mod read_to_end { } } +#[tokio::test] +async fn issue623() { + let mut buf = Vec::new(); + let mut reader = Reader::from_reader(Cursor::new( + b" + + _binary << data&> + + ", + )); + reader.config_mut().trim_text(true); + + assert_eq!( + ( + reader.read_event_into_async(&mut buf).await.unwrap(), + reader.buffer_position() + ), + (Start(BytesStart::new("AppendedData")), 23) + ); + + let mut inner = reader.stream(); + // Read to start of data marker + inner.read_until(b'_', &mut buf).await.unwrap(); + + // Read binary data. We must know its size + let mut binary = [0u8; 16]; + inner.read_exact(&mut binary).await.unwrap(); + assert_eq!(Bytes(&binary), Bytes(b"binary << data&>")); + assert_eq!(inner.offset(), 53); + assert_eq!(reader.buffer_position(), 53); + + assert_eq!( + ( + reader.read_event_into_async(&mut buf).await.unwrap(), + reader.buffer_position() + ), + (End(BytesEnd::new("AppendedData")), 77) + ); + + assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Eof); +} + /// Regression test for https://github.com/tafia/quick-xml/issues/751 /// /// Actually, that error was not found in async reader, but we would to test it as well. diff --git a/tests/issues.rs b/tests/issues.rs index c14f09c6..95dd64ff 100644 --- a/tests/issues.rs +++ b/tests/issues.rs @@ -2,7 +2,7 @@ //! //! Name each module / test as `issue` and keep sorted by issue number -use std::io::BufReader; +use std::io::{BufRead, BufReader, Cursor, Read}; use std::iter; use std::sync::mpsc; @@ -10,6 +10,9 @@ use quick_xml::errors::{Error, IllFormedError, SyntaxError}; use quick_xml::events::{BytesDecl, BytesEnd, BytesStart, BytesText, Event}; use quick_xml::name::QName; use quick_xml::reader::Reader; +use quick_xml::utils::Bytes; + +use pretty_assertions::assert_eq; /// Regression test for https://github.com/tafia/quick-xml/issues/94 #[test] @@ -258,6 +261,89 @@ fn issue622() { } } +/// Regression test for https://github.com/tafia/quick-xml/issues/623 +mod issue623 { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn borrowed() { + let mut reader = Reader::from_str( + " + + _binary << data&> + + ", + ); + reader.config_mut().trim_text(true); + + assert_eq!( + (reader.read_event().unwrap(), reader.buffer_position()), + (Event::Start(BytesStart::new("AppendedData")), 27) + ); + + let mut inner = reader.stream(); + // Read to start of data marker + inner.read_until(b'_', &mut Vec::new()).unwrap(); + + // Read binary data. We must know its size + let mut binary = [0u8; 16]; + inner.read_exact(&mut binary).unwrap(); + assert_eq!(Bytes(&binary), Bytes(b"binary << data&>")); + assert_eq!(inner.offset(), 61); + assert_eq!(reader.buffer_position(), 61); + + assert_eq!( + (reader.read_event().unwrap(), reader.buffer_position()), + (Event::End(BytesEnd::new("AppendedData")), 89) + ); + + assert_eq!(reader.read_event().unwrap(), Event::Eof); + } + + #[test] + fn buffered() { + let mut buf = Vec::new(); + let mut reader = Reader::from_reader(Cursor::new( + b" + + _binary << data&> + + ", + )); + reader.config_mut().trim_text(true); + + assert_eq!( + ( + reader.read_event_into(&mut buf).unwrap(), + reader.buffer_position() + ), + (Event::Start(BytesStart::new("AppendedData")), 27) + ); + + let mut inner = reader.stream(); + // Read to start of data marker + inner.read_until(b'_', &mut buf).unwrap(); + + // Read binary data. We must know its size + let mut binary = [0u8; 16]; + inner.read_exact(&mut binary).unwrap(); + assert_eq!(Bytes(&binary), Bytes(b"binary << data&>")); + assert_eq!(inner.offset(), 61); + assert_eq!(reader.buffer_position(), 61); + + assert_eq!( + ( + reader.read_event_into(&mut buf).unwrap(), + reader.buffer_position() + ), + (Event::End(BytesEnd::new("AppendedData")), 89) + ); + + assert_eq!(reader.read_event_into(&mut buf).unwrap(), Event::Eof); + } +} + /// Regression test for https://github.com/tafia/quick-xml/issues/706 #[test] fn issue706() {