Skip to content

Commit

Permalink
Implement ability to read data directly from the underlying reader
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Alley <dalley@redhat.com>
  • Loading branch information
Mingun and dralley committed Jul 15, 2024
1 parent 10ddcb7 commit b112663
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 4 deletions.
5 changes: 5 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

### New Features

- [#623]: Added `Reader::stream()` that can be used to read arbitrary data
from the inner reader while track position for XML reader.

### Bug Fixes

### Misc Changes

[#623]: https://github.com/tafia/quick-xml/issues/623


## 0.36.0 -- 2024-07-08

Expand Down
48 changes: 46 additions & 2 deletions src/reader/async_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
//! as underlying byte stream. This reader fully implements async/await so reading
//! can use non-blocking I/O.
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};

use crate::errors::{Error, Result, SyntaxError};
use crate::events::Event;
use crate::name::{QName, ResolveResult};
use crate::parser::{ElementParser, Parser, PiParser};
use crate::reader::buffered_reader::impl_buffered_source;
use crate::reader::{BangType, NsReader, ParseState, ReadTextResult, Reader, Span};
use crate::reader::{BangType, BinaryStream, NsReader, ParseState, ReadTextResult, Reader, Span};
use crate::utils::is_whitespace;

/// A struct for read XML asynchronously from an [`AsyncBufRead`].
Expand All @@ -24,6 +27,47 @@ impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {

////////////////////////////////////////////////////////////////////////////////////////////////////

impl<'r, R> AsyncRead for BinaryStream<'r, R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let start = buf.remaining();
let this = self.get_mut();
let poll = Pin::new(&mut *this.inner).poll_read(cx, buf);

// If something was read, update offset
if let Poll::Ready(Ok(_)) = poll {
let amt = start - buf.remaining();
*this.offset += amt as u64;
}
poll
}
}

impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
where
R: AsyncBufRead + Unpin,
{
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
}

#[inline]
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.get_mut();
this.inner.consume(amt);
*this.offset += amt as u64;
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////

impl<R: AsyncBufRead + Unpin> Reader<R> {
/// An asynchronous version of [`read_event_into()`]. Reads the next event into
/// given buffer.
Expand Down
128 changes: 128 additions & 0 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,68 @@ impl EncodingRef {

////////////////////////////////////////////////////////////////////////////////////////////////////

/// A direct stream to the underlying [`Reader`]s reader which updates
/// [`Reader::buffer_position()`] when read from it.
#[derive(Debug)]
#[must_use = "streams do nothing unless read or polled"]
pub struct BinaryStream<'r, R> {
inner: &'r mut R,
offset: &'r mut u64,
}

impl<'r, R> BinaryStream<'r, R> {
/// Returns current position in bytes in the original source.
#[inline]
pub const fn offset(&self) -> u64 {
*self.offset
}

/// Gets a reference to the underlying reader.
#[inline]
pub const fn get_ref(&self) -> &R {
self.inner
}

/// Gets a mutable reference to the underlying reader.
///
/// Avoid read from this reader because this will not update reader's position
/// and will lead to incorrect positions of errors. Read from this stream instead.
#[inline]
pub fn get_mut(&mut self) -> &mut R {
self.inner
}
}

impl<'r, R> io::Read for BinaryStream<'r, R>
where
R: io::Read,
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let amt = self.inner.read(buf)?;
*self.offset += amt as u64;
Ok(amt)
}
}

impl<'r, R> io::BufRead for BinaryStream<'r, R>
where
R: io::BufRead,
{
#[inline]
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.inner.fill_buf()
}

#[inline]
fn consume(&mut self, amt: usize) {
self.inner.consume(amt);
*self.offset += amt as u64;
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////

/// A low level encoding-agnostic XML event reader.
///
/// Consumes bytes and streams XML [`Event`]s.
Expand Down Expand Up @@ -716,6 +778,12 @@ impl<R> Reader<R> {
}

/// Gets a mutable reference to the underlying reader.
///
/// Avoid read from this reader because this will not update reader's position
/// and will lead to incorrect positions of errors. If you want to read, use
/// [`stream()`] instead.
///
/// [`stream()`]: Self::stream
pub fn get_mut(&mut self) -> &mut R {
&mut self.reader
}
Expand Down Expand Up @@ -759,6 +827,66 @@ impl<R> Reader<R> {
pub const fn decoder(&self) -> Decoder {
self.state.decoder()
}

/// Get the direct access to the underlying reader, but tracks the amount of
/// read data and update [`Reader::buffer_position()`] accordingly.
///
/// Note, that this method gives you access to the internal reader and read
/// data will not be returned in any subsequent events read by `read_event`
/// family of methods.
///
/// # Example
///
/// This example demonstrates how to read stream raw bytes from an XML document.
/// This could be used to implement streaming read of text, or to read raw binary
/// bytes embedded in an XML document. (Documents with embedded raw bytes are not
/// valid XML, but XML-derived file formats exist where such documents are valid).
///
/// ```
/// # use pretty_assertions::assert_eq;
/// use std::io::{BufRead, Read};
/// use quick_xml::events::{BytesEnd, BytesStart, Event};
/// use quick_xml::reader::Reader;
///
/// let mut reader = Reader::from_str("<tag>binary << data&></tag>");
/// // ^ ^ ^ ^
/// // 0 5 21 27
///
/// assert_eq!(
/// (reader.read_event().unwrap(), reader.buffer_position()),
/// // 5 - end of the `<tag>`
/// (Event::Start(BytesStart::new("tag")), 5)
/// );
///
/// // Reading directly from underlying reader will not update position
/// // let mut inner = reader.get_mut();
///
/// // Reading from the stream() advances position
/// let mut inner = reader.stream();
///
/// // Read binary data. We must know its size
/// let mut binary = [0u8; 16];
/// inner.read_exact(&mut binary).unwrap();
/// assert_eq!(&binary, b"binary << data&>");
/// // 21 - end of the `binary << data&>`
/// assert_eq!(inner.offset(), 21);
/// assert_eq!(reader.buffer_position(), 21);
///
/// assert_eq!(
/// (reader.read_event().unwrap(), reader.buffer_position()),
/// // 27 - end of the `</tag>`
/// (Event::End(BytesEnd::new("tag")), 27)
/// );
///
/// assert_eq!(reader.read_event().unwrap(), Event::Eof);
/// ```
#[inline]
pub fn stream(&mut self) -> BinaryStream<R> {
BinaryStream {
inner: &mut self.reader,
offset: &mut self.state.offset,
}
}
}

/// Private sync reading methods
Expand Down
46 changes: 45 additions & 1 deletion tests/async-tokio.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::io::Cursor;
use std::iter;

use pretty_assertions::assert_eq;
use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event::*};
use quick_xml::name::QName;
use quick_xml::reader::Reader;
use tokio::io::BufReader;
use quick_xml::utils::Bytes;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};

// Import `small_buffers_tests!`
#[macro_use]
Expand Down Expand Up @@ -88,6 +90,48 @@ mod read_to_end {
}
}

#[tokio::test]
async fn issue623() {
let mut buf = Vec::new();
let mut reader = Reader::from_reader(Cursor::new(
b"
<AppendedData>
_binary << data&>
</AppendedData>
",
));
reader.config_mut().trim_text(true);

assert_eq!(
(
reader.read_event_into_async(&mut buf).await.unwrap(),
reader.buffer_position()
),
(Start(BytesStart::new("AppendedData")), 23)
);

let mut inner = reader.stream();
// Read to start of data marker
inner.read_until(b'_', &mut buf).await.unwrap();

// Read binary data. We must know its size
let mut binary = [0u8; 16];
inner.read_exact(&mut binary).await.unwrap();
assert_eq!(Bytes(&binary), Bytes(b"binary << data&>"));
assert_eq!(inner.offset(), 53);
assert_eq!(reader.buffer_position(), 53);

assert_eq!(
(
reader.read_event_into_async(&mut buf).await.unwrap(),
reader.buffer_position()
),
(End(BytesEnd::new("AppendedData")), 77)
);

assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Eof);
}

/// Regression test for https://github.com/tafia/quick-xml/issues/751
///
/// Actually, that error was not found in async reader, but we would to test it as well.
Expand Down
Loading

0 comments on commit b112663

Please sign in to comment.