Fix up HTTP server / client with new way of streaming
TimonPost committed Nov 28, 2023
1 parent a7ec5ef commit 2e05f43
Showing 10 changed files with 218 additions and 96 deletions.
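
The core of the change is that frame streaming now carries scope details alongside the packed streams: `FrameData::write_into` and `FrameData::read_next` both take a `ScopeDetails` registry, and a new `PFD4` record format transports the serialized scopes. A minimal sender-side sketch of the updated `write_into` call, assuming `FrameData` and `ScopeDetails` are re-exported at the crate root; how the caller obtains them is outside this diff:

// Sketch: serialize one frame plus its registered scope details into a buffer.
// The "PFD4" record written here is what read_next expects on the other end.
use puffin::{FrameData, ScopeDetails};

fn encode_frame(frame: &FrameData, scope_details: &ScopeDetails) -> anyhow::Result<Vec<u8>> {
    let mut buf = Vec::new();
    frame.write_into(scope_details, &mut buf)?;
    Ok(buf)
}
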
4 changes: 2 additions & 2 deletions puffin/src/data.rs
@@ -104,7 +104,7 @@ impl Stream {
     fn write_scope_id(&mut self, scope_id: ScopeId) {
         // Could potentially use varint encoding.
         self.0
-            .write_u32::<LE>(scope_id.0 as u32)
+            .write_u32::<LE>(scope_id.0)
             .expect("can't fail");
     }

@@ -205,7 +205,7 @@ impl<'s> Reader<'s> {
     fn parse_scope_id(&mut self) -> Result<ScopeId> {
         self.0
             .read_u32::<LE>()
-            .map(|x| ScopeId(x))
+            .map(ScopeId)
             .map_err(|_err| Error::PrematureEnd)
     }

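The `.map(ScopeId)` change above relies on a tuple-struct constructor being usable as a plain function; a tiny standalone illustration (the `Wrapper` type is illustrative, not puffin's):

// A tuple-struct constructor is an `Fn(T) -> Struct`, so `.map(Wrapper)`
// is equivalent to `.map(|x| Wrapper(x))`, just shorter and clippy-friendly.
struct Wrapper(u32);

fn wrap(value: Option<u32>) -> Option<Wrapper> {
    value.map(Wrapper)
}
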
136 changes: 120 additions & 16 deletions puffin/src/frame_data.rs
@@ -1,9 +1,12 @@
-use crate::{Error, FrameIndex, NanoSecond, Result, StreamInfo, ThreadInfo};
-
+use crate::{Error, FrameIndex, NanoSecond, Result, ScopeId, StreamInfo, ThreadInfo};
+use crate::ScopeDetails;
 #[cfg(feature = "packing")]
 use parking_lot::RwLock;
 
-use std::{collections::BTreeMap, sync::Arc};
+use std::{
+    collections::{BTreeMap, HashSet},
+    sync::Arc,
+};

// ----------------------------------------------------------------------------

@@ -290,25 +293,33 @@ pub struct FrameData {
     /// [`UnpackedFrameData::thread_streams`], compressed.
     /// [`None`] if not yet compressed.
     packed_streams: RwLock<Option<PackedStreams>>,
+
+    /// Scopes that were registered during this frame.
+    pub registered_scopes: HashSet<ScopeId>,
 }

 #[cfg(feature = "packing")]
 impl FrameData {
     pub fn new(
         frame_index: FrameIndex,
         thread_streams: BTreeMap<ThreadInfo, StreamInfo>,
+        registered_scopes: HashSet<ScopeId>,
     ) -> Result<Self> {
-        Ok(Self::from_unpacked(Arc::new(UnpackedFrameData::new(
-            frame_index,
-            thread_streams,
-        )?)))
+        Ok(Self::from_unpacked(
+            Arc::new(UnpackedFrameData::new(frame_index, thread_streams)?),
+            registered_scopes,
+        ))
     }
 
-    fn from_unpacked(unpacked_frame: Arc<UnpackedFrameData>) -> Self {
+    fn from_unpacked(
+        unpacked_frame: Arc<UnpackedFrameData>,
+        registered_scopes: HashSet<ScopeId>,
+    ) -> Self {
         Self {
             meta: unpacked_frame.meta.clone(),
             unpacked_frame: RwLock::new(Some(Ok(unpacked_frame))),
             packed_streams: RwLock::new(None),
+            registered_scopes,
         }
     }
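
A hedged sketch of a call site adapting to the new `FrameData::new` signature; the empty stream map and empty scope set are placeholders, and the `puffin::Result` alias and type re-export paths are assumed rather than shown in this diff:

// Hypothetical caller: the third argument is the set of scopes registered
// during this frame; an empty set is accepted, it just carries no details.
use std::collections::{BTreeMap, HashSet};
use puffin::{FrameData, FrameIndex, ScopeId, StreamInfo, ThreadInfo};

fn empty_frame(frame_index: FrameIndex) -> puffin::Result<FrameData> {
    let thread_streams: BTreeMap<ThreadInfo, StreamInfo> = BTreeMap::new();
    let registered_scopes: HashSet<ScopeId> = HashSet::new();
    FrameData::new(frame_index, thread_streams, registered_scopes)
}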

@@ -411,12 +422,15 @@ impl FrameData {
     /// Writes one [`FrameData`] into a stream, prefixed by its length ([`u32`] le).
     #[cfg(not(target_arch = "wasm32"))] // compression not supported on wasm
     #[cfg(feature = "serialization")]
-    pub fn write_into(&self, write: &mut impl std::io::Write) -> anyhow::Result<()> {
+    pub fn write_into(
+        &self,
+        scope_details: &ScopeDetails,
+        write: &mut impl std::io::Write,
+    ) -> anyhow::Result<()> {
         use bincode::Options as _;
-        use byteorder::WriteBytesExt as _;
+        use byteorder::{WriteBytesExt as _, LE};
+
+        use crate::SerdeScopeDetails;
 
         let meta_serialized = bincode::options().serialize(&self.meta)?;
 
-        write.write_all(b"PFD3")?;
+        write.write_all(b"PFD4")?;
         write.write_all(&(meta_serialized.len() as u32).to_le_bytes())?;
         write.write_all(&meta_serialized)?;
@@ -428,6 +442,27 @@ impl FrameData {
         write.write_u8(packed_streams.compression_kind as u8)?;
         write.write_all(&packed_streams.bytes)?;
 
+        let mut to_serialize_scopes = Vec::new();
+
+        for new_scope_id in &self.registered_scopes {
+            scope_details.scopes_by_id(|details| {
+                if let Some(details) = details.get(new_scope_id) {
+                    to_serialize_scopes.push(SerdeScopeDetails {
+                        scope_id: *new_scope_id,
+                        scope_name: details.dynamic_scope_name.to_string(),
+                        file_path: details.dynamic_file_path.to_string(),
+                        line_nmr: details.line_nr,
+                    });
+                }
+            });
+        }
+
+        let serialized_scopes = bincode::options().serialize(&to_serialize_scopes)?;
+        write.write_u32::<LE>(serialized_scopes.len() as u32)?;
+        write.write_all(&serialized_scopes)?;
         Ok(())
     }
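
For orientation, the byte layout the new writer above produces, reconstructed from the write calls in this diff (the field labels are informal, not actual identifiers):

// PFD4 record layout, in write order (integers little-endian):
//   b"PFD4"           4-byte magic header
//   meta_len: u32     length of the bincode-serialized FrameMeta
//   meta              meta_len bytes
//   streams_len: u32  length of the packed (compressed) thread streams
//   kind: u8          CompressionKind discriminant
//   streams           streams_len bytes
//   scopes_len: u32   length of the bincode-serialized Vec<SerdeScopeDetails>
//   scopes            scopes_len bytes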

@@ -436,11 +471,13 @@ impl FrameData {
     /// [`None`] is returned if the end of the stream is reached (EOF),
     /// or an end-of-stream sentinel of `0u32` is read.
     #[cfg(feature = "serialization")]
-    pub fn read_next(read: &mut impl std::io::Read) -> anyhow::Result<Option<Self>> {
+    pub fn read_next(
+        scope_details: &ScopeDetails,
+        read: &mut impl std::io::Read,
+    ) -> anyhow::Result<Option<Self>> {
         use anyhow::Context as _;
         use bincode::Options as _;
-        use byteorder::ReadBytesExt;
+        use byteorder::{ReadBytesExt, LE};
+
+        use crate::{ScopeDetailsOwned, SerdeScopeDetails};
 
         let mut header = [0_u8; 4];
         if let Err(err) = read.read_exact(&mut header) {
             if err.kind() == std::io::ErrorKind::UnexpectedEof {
@@ -480,7 +517,10 @@ impl FrameData {
     }
 
     fn into_frame_data(self) -> FrameData {
-        FrameData::from_unpacked(Arc::new(self.into_unpacked_frame_data()))
+        FrameData::from_unpacked(
+            Arc::new(self.into_unpacked_frame_data()),
+            Default::default(),
+        )
     }
 }

@@ -540,6 +580,7 @@ impl FrameData {
                     meta,
                     unpacked_frame: RwLock::new(None),
                     packed_streams: RwLock::new(Some(packed_streams)),
+                    registered_scopes: Default::default(),
                 }))
             } else if &header == b"PFD3" {
                 // Added 2023-05-13: CompressionKind field
@@ -561,6 +602,7 @@ impl FrameData {
                 let compression_kind = CompressionKind::from_u8(compression_kind)?;
                 let mut streams_compressed = vec![0_u8; streams_compressed_length];
                 read.read_exact(&mut streams_compressed)?;
+
 
                 let packed_streams = PackedStreams::new(compression_kind, streams_compressed);
 
@@ -570,11 +612,73 @@ impl FrameData {
                     meta,
                     unpacked_frame: RwLock::new(None),
                     packed_streams: RwLock::new(Some(packed_streams)),
+                    registered_scopes: Default::default(),
                 }))
+            } else if &header == b"PFD4" {
+                // Added 2023-11-28: Send scope details separate from scope stream
+                let mut meta_length = [0_u8; 4];
+                read.read_exact(&mut meta_length)?;
+                let meta_length = u32::from_le_bytes(meta_length) as usize;
+                let mut meta = vec![0_u8; meta_length];
+                read.read_exact(&mut meta)?;
+
+                let meta: FrameMeta = bincode::options()
+                    .deserialize(&meta)
+                    .context("bincode deserialize")?;
+
+                let mut streams_compressed_length = [0_u8; 4];
+                read.read_exact(&mut streams_compressed_length)?;
+                let streams_compressed_length =
+                    u32::from_le_bytes(streams_compressed_length) as usize;
+                let compression_kind = read.read_u8()?;
+                let compression_kind = CompressionKind::from_u8(compression_kind)?;
+                let mut streams_compressed = vec![0_u8; streams_compressed_length];
+                read.read_exact(&mut streams_compressed)?;
+
+                let packed_streams = PackedStreams::new(compression_kind, streams_compressed);
+
+                let serialized_scope_len = read.read_u32::<LE>()?;
+                let mut serialized_scopes: Vec<u8> = vec![0; serialized_scope_len as usize];
+                read.read_exact(&mut serialized_scopes)
+                    .context("Can not read serialized scope details")?;
+
+                let deserialized_scopes: Vec<SerdeScopeDetails> = bincode::options()
+                    .deserialize_from(&serialized_scopes[..]) // use a slice instead of the whole vector
+                    .context("Can not deserialize scope details")?;
+
+                for serde_scope_details in &deserialized_scopes {
+                    scope_details.insert(
+                        serde_scope_details.scope_id,
+                        ScopeDetailsOwned {
+                            dynamic_scope_name: serde_scope_details.scope_name.clone().into(),
+                            dynamic_file_path: serde_scope_details.file_path.clone().into(),
+                            line_nr: serde_scope_details.line_nmr,
+                            raw_scope_name: "empty",
+                            raw_file_path: "empty",
+                            location: format!(
+                                "{}:{}",
+                                serde_scope_details.file_path, serde_scope_details.line_nmr
+                            ),
+                        },
+                    );
+                }
+
+                Ok(Some(Self {
+                    meta,
+                    unpacked_frame: RwLock::new(None),
+                    packed_streams: RwLock::new(Some(packed_streams)),
+                    registered_scopes: deserialized_scopes
+                        .into_iter()
+                        .map(|x| x.scope_id)
+                        .collect(),
+                }))
             } else {
                 anyhow::bail!("Failed to decode: this data is newer than this reader. Please update your puffin version!");
             }
         } else {
             // Very old packet without magic header
             let mut bytes = vec![0_u8; u32::from_le_bytes(header) as usize];
             read.read_exact(&mut bytes)?;
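
On the receiving side, a minimal consumer loop against the new `read_next` signature, assuming the same crate-root re-exports as above; it drains a stream until EOF or the `0u32` end-of-stream sentinel, while each decoded `PFD4` record populates `scope_details` with any scopes it carried:

// Sketch: decode frames until read_next reports end-of-stream.
use puffin::{FrameData, ScopeDetails};

fn decode_all(
    scope_details: &ScopeDetails,
    read: &mut impl std::io::Read,
) -> anyhow::Result<Vec<FrameData>> {
    let mut frames = Vec::new();
    while let Some(frame) = FrameData::read_next(scope_details, read)? {
        frames.push(frame);
    }
    Ok(frames)
}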
