Implement paged data serialization #132

Merged: 5 commits, Sep 26, 2020

109 changes: 78 additions & 31 deletions analyzeme/src/profiling_data.rs
@@ -3,17 +3,19 @@ use crate::lightweight_event::LightweightEvent;
 use crate::timestamp::Timestamp;
 use crate::StringTable;
 use measureme::file_header::{
-    read_file_header, write_file_header, CURRENT_FILE_FORMAT_VERSION, FILE_HEADER_SIZE,
-    FILE_MAGIC_EVENT_STREAM,
+    verify_file_header, write_file_header, FILE_EXTENSION, FILE_HEADER_SIZE,
+    FILE_MAGIC_EVENT_STREAM, FILE_MAGIC_TOP_LEVEL,
 };
+use measureme::{
+    EventId, PageTag, RawEvent, SerializationSink, SerializationSinkBuilder, StringTableBuilder,
+};
-use measureme::{EventId, ProfilerFiles, RawEvent, SerializationSink, StringTableBuilder};
 use serde::{Deserialize, Deserializer};
-use std::error::Error;
 use std::fs;
 use std::mem;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::{error::Error, path::PathBuf};
 
 const RAW_EVENT_SIZE: usize = mem::size_of::<RawEvent>();

@@ -43,35 +45,60 @@ pub struct ProfilingData {
 }
 
 impl ProfilingData {
-    pub fn new(path_stem: &Path) -> Result<ProfilingData, Box<dyn Error>> {
-        let paths = ProfilerFiles::new(path_stem);
+    pub fn new(path_stem: &Path) -> Result<ProfilingData, Box<dyn Error + Send + Sync>> {
+        let paged_path = path_stem.with_extension(FILE_EXTENSION);
 
+        if paged_path.exists() {
+            let data = fs::read(&paged_path)?;
+
+            verify_file_header(&data, FILE_MAGIC_TOP_LEVEL, Some(&paged_path), "top-level")?;
+
+            let mut split_data = measureme::split_streams(&data[FILE_HEADER_SIZE..]);
+
+            let string_data = split_data.remove(&PageTag::StringData).unwrap();
+            let index_data = split_data.remove(&PageTag::StringIndex).unwrap();
+            let event_data = split_data.remove(&PageTag::Events).unwrap();
+
-        let string_data = fs::read(paths.string_data_file).expect("couldn't read string_data file");
-        let index_data =
-            fs::read(paths.string_index_file).expect("couldn't read string_index file");
-        let event_data = fs::read(paths.events_file).expect("couldn't read events file");
+            ProfilingData::from_buffers(string_data, index_data, event_data, Some(&paged_path))
+        } else {
+            let mut msg = format!(
+                "Could not find profiling data file `{}`.",
+                paged_path.display()
+            );
 
-        ProfilingData::from_buffers(string_data, index_data, event_data)
+            // Let's try to give a helpful error message if we encounter files
+            // in the old three-file-format:
+            let paths = ProfilerFiles::new(path_stem);
+
+            if paths.events_file.exists()
+                || paths.string_data_file.exists()
+                || paths.string_index_file.exists()
+            {
+                msg += "It looks like your profiling data has been generated \
+                        by an out-dated version of measureme (0.7 or older).";
+            }
+
+            return Err(From::from(msg));
+        }
     }
 
     pub fn from_buffers(
         string_data: Vec<u8>,
         string_index: Vec<u8>,
         events: Vec<u8>,
-    ) -> Result<ProfilingData, Box<dyn Error>> {
+        diagnostic_file_path: Option<&Path>,
+    ) -> Result<ProfilingData, Box<dyn Error + Send + Sync>> {
         let index_data = string_index;
         let event_data = events;
 
-        let event_data_format = read_file_header(&event_data, FILE_MAGIC_EVENT_STREAM)?;
-        if event_data_format != CURRENT_FILE_FORMAT_VERSION {
-            Err(format!(
-                "Event stream file format version '{}' is not supported
-                 by this version of `measureme`.",
-                event_data_format
-            ))?;
-        }
+        verify_file_header(
+            &event_data,
+            FILE_MAGIC_EVENT_STREAM,
+            diagnostic_file_path,
+            "event",
+        )?;
 
-        let string_table = StringTable::new(string_data, index_data)?;
+        let string_table = StringTable::new(string_data, index_data, diagnostic_file_path)?;
 
         let metadata = string_table.get_metadata().to_string();
         let metadata: Metadata = serde_json::from_str(&metadata)?;
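Aside: the key idea in the new loading path above is that a single paged file interleaves pages belonging to several logical streams, and measureme::split_streams demultiplexes them by PageTag before the existing per-stream parsing runs. The toy function below illustrates only that demultiplexing step; it assumes pages have already been parsed into (tag, payload) pairs, since the real on-disk page encoding is internal to measureme and not shown in this diff.

use std::collections::HashMap;

// Toy model only: concatenate page payloads per tag, in file order.
// The real `split_streams` parses page headers out of raw bytes instead.
fn split_streams_toy(pages: &[(u8, Vec<u8>)]) -> HashMap<u8, Vec<u8>> {
    let mut streams: HashMap<u8, Vec<u8>> = HashMap::new();
    for (tag, payload) in pages {
        streams.entry(*tag).or_default().extend_from_slice(payload);
    }
    streams
}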
@@ -207,17 +234,20 @@ pub struct ProfilingDataBuilder {
 
 impl ProfilingDataBuilder {
     pub fn new() -> ProfilingDataBuilder {
-        let event_sink = SerializationSink::new_in_memory();
-        let string_table_data_sink = Arc::new(SerializationSink::new_in_memory());
-        let string_table_index_sink = Arc::new(SerializationSink::new_in_memory());
+        let sink_builder = SerializationSinkBuilder::new_in_memory();
+
+        let event_sink = sink_builder.new_sink(PageTag::Events);
+        let string_table_data_sink = Arc::new(sink_builder.new_sink(PageTag::StringData));
+        let string_table_index_sink = Arc::new(sink_builder.new_sink(PageTag::StringIndex));
 
         // The first thing in every file we generate must be the file header.
-        write_file_header(&event_sink, FILE_MAGIC_EVENT_STREAM);
+        write_file_header(&mut event_sink.as_std_write(), FILE_MAGIC_EVENT_STREAM).unwrap();
 
         let string_table = StringTableBuilder::new(
             string_table_data_sink.clone(),
             string_table_index_sink.clone(),
-        );
+        )
+        .unwrap();
 
         ProfilingDataBuilder {
             event_sink,
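For orientation, here is the new sink-construction pattern restated as a standalone snippet. It uses only calls that appear verbatim in the hunk above; the wrapper function name is illustrative.

use measureme::file_header::{write_file_header, FILE_MAGIC_EVENT_STREAM};
use measureme::{PageTag, SerializationSinkBuilder};

fn build_event_sink_sketch() {
    // One builder hands out per-tag sinks, presumably so that all streams
    // can be multiplexed into a single paged backing store and separated
    // again by tag at load time.
    let sink_builder = SerializationSinkBuilder::new_in_memory();
    let event_sink = sink_builder.new_sink(PageTag::Events);
    // The first thing in every stream must be the file header.
    write_file_header(&mut event_sink.as_std_write(), FILE_MAGIC_EVENT_STREAM).unwrap();
}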
@@ -287,11 +317,9 @@ impl ProfilingDataBuilder {
             .unwrap()
             .into_bytes();
 
-        assert_eq!(
-            read_file_header(&event_data, FILE_MAGIC_EVENT_STREAM).unwrap(),
-            CURRENT_FILE_FORMAT_VERSION
-        );
-        let string_table = StringTable::new(data_bytes, index_bytes).unwrap();
+        verify_file_header(&event_data, FILE_MAGIC_EVENT_STREAM, None, "event").unwrap();
+
+        let string_table = StringTable::new(data_bytes, index_bytes, None).unwrap();
         let metadata = Metadata {
             start_time: UNIX_EPOCH,
             process_id: 0,
@@ -319,6 +347,25 @@ fn event_index_to_addr(event_index: usize) -> usize {
     FILE_HEADER_SIZE + event_index * mem::size_of::<RawEvent>()
 }
 
+// This struct reflects what filenames were used in old versions of measureme.
+// It is now used only for giving helpful error messages if a user tries to
+// load old data.
+struct ProfilerFiles {
+    pub events_file: PathBuf,
+    pub string_data_file: PathBuf,
+    pub string_index_file: PathBuf,
+}
+
+impl ProfilerFiles {
+    fn new<P: AsRef<Path>>(path_stem: P) -> ProfilerFiles {
+        ProfilerFiles {
+            events_file: path_stem.as_ref().with_extension("events"),
+            string_data_file: path_stem.as_ref().with_extension("string_data"),
+            string_index_file: path_stem.as_ref().with_extension("string_index"),
+        }
+    }
+}
+
 #[rustfmt::skip]
 #[cfg(test)]
 mod tests {
55 changes: 30 additions & 25 deletions analyzeme/src/stringtable.rs
@@ -1,7 +1,7 @@
 //! See module-level documentation `measureme::stringtable`.
 
 use measureme::file_header::{
-    read_file_header, strip_file_header, CURRENT_FILE_FORMAT_VERSION, FILE_MAGIC_STRINGTABLE_DATA,
+    strip_file_header, verify_file_header, FILE_MAGIC_STRINGTABLE_DATA,
     FILE_MAGIC_STRINGTABLE_INDEX,
 };
 use measureme::stringtable::{METADATA_STRING_ID, STRING_ID_MASK, TERMINATOR};
@@ -11,6 +11,7 @@ use rustc_hash::FxHashMap;
 use std::borrow::Cow;
 use std::convert::TryInto;
 use std::error::Error;
+use std::path::Path;
 
 fn deserialize_index_entry(bytes: &[u8]) -> (StringId, Addr) {
     (
@@ -204,21 +205,23 @@ pub struct StringTable {
 }
 
 impl StringTable {
-    pub fn new(string_data: Vec<u8>, index_data: Vec<u8>) -> Result<StringTable, Box<dyn Error>> {
-        let string_data_format = read_file_header(&string_data, FILE_MAGIC_STRINGTABLE_DATA)?;
-        let index_data_format = read_file_header(&index_data, FILE_MAGIC_STRINGTABLE_INDEX)?;
-
-        if string_data_format != index_data_format {
-            Err("Mismatch between StringTable DATA and INDEX format version")?;
-        }
-
-        if string_data_format != CURRENT_FILE_FORMAT_VERSION {
-            Err(format!(
-                "StringTable file format version '{}' is not supported
-                 by this version of `measureme`.",
-                string_data_format
-            ))?;
-        }
+    pub fn new(
+        string_data: Vec<u8>,
+        index_data: Vec<u8>,
+        diagnostic_file_path: Option<&Path>,
+    ) -> Result<StringTable, Box<dyn Error + Send + Sync>> {
+        verify_file_header(
+            &string_data,
+            FILE_MAGIC_STRINGTABLE_DATA,
+            diagnostic_file_path,
+            "StringTable Data",
+        )?;
+        verify_file_header(
+            &index_data,
+            FILE_MAGIC_STRINGTABLE_INDEX,
+            diagnostic_file_path,
+            "StringTable Index",
+        )?;
 
         assert!(index_data.len() % 8 == 0);
         let index: FxHashMap<_, _> = strip_file_header(&index_data)
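The checks deleted in this hunk suggest what verify_file_header centralizes: a magic-bytes comparison plus a format-version comparison against CURRENT_FILE_FORMAT_VERSION. Below is a speculative reconstruction for illustration only; the actual header byte layout, function signature, and error messages live inside measureme and are assumptions here.

use std::convert::TryInto;

// Assumed header layout for illustration: 4 magic bytes followed by a
// little-endian u32 format version. The real layout is defined in measureme.
fn verify_file_header_sketch(
    data: &[u8],
    expected_magic: &[u8; 4],
    current_version: u32,
    stream_kind: &str,
) -> Result<(), String> {
    if data.len() < 8 {
        return Err(format!("{} stream is too small to contain a header", stream_kind));
    }
    if &data[0..4] != &expected_magic[..] {
        return Err(format!("{} stream has the wrong file magic", stream_kind));
    }
    let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
    if version != current_version {
        return Err(format!(
            "{} stream has unsupported file format version {}",
            stream_kind, version
        ));
    }
    Ok(())
}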
@@ -243,13 +246,14 @@ impl StringTable {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use measureme::{SerializationSink, StringComponent, StringTableBuilder};
+    use measureme::{PageTag, SerializationSinkBuilder, StringComponent, StringTableBuilder};
     use std::sync::Arc;
 
     #[test]
     fn simple_strings() {
-        let data_sink = Arc::new(SerializationSink::new_in_memory());
-        let index_sink = Arc::new(SerializationSink::new_in_memory());
+        let sink_builder = SerializationSinkBuilder::new_in_memory();
+        let data_sink = Arc::new(sink_builder.new_sink(PageTag::StringData));
+        let index_sink = Arc::new(sink_builder.new_sink(PageTag::StringIndex));
 
         let expected_strings = &[
             "abc",
@@ -264,7 +268,7 @@
         let mut string_ids = vec![];
 
         {
-            let builder = StringTableBuilder::new(data_sink.clone(), index_sink.clone());
+            let builder = StringTableBuilder::new(data_sink.clone(), index_sink.clone()).unwrap();
 
             for &s in expected_strings {
                 string_ids.push(builder.alloc(s));
@@ -274,7 +278,7 @@
         let data_bytes = Arc::try_unwrap(data_sink).unwrap().into_bytes();
         let index_bytes = Arc::try_unwrap(index_sink).unwrap().into_bytes();
 
-        let string_table = StringTable::new(data_bytes, index_bytes).unwrap();
+        let string_table = StringTable::new(data_bytes, index_bytes, None).unwrap();
 
         for (&id, &expected_string) in string_ids.iter().zip(expected_strings.iter()) {
             let str_ref = string_table.get(id);
@@ -289,8 +293,9 @@
 
     #[test]
     fn composite_string() {
-        let data_sink = Arc::new(SerializationSink::new_in_memory());
-        let index_sink = Arc::new(SerializationSink::new_in_memory());
+        let sink_builder = SerializationSinkBuilder::new_in_memory();
+        let data_sink = Arc::new(sink_builder.new_sink(PageTag::StringData));
+        let index_sink = Arc::new(sink_builder.new_sink(PageTag::StringIndex));
 
         let expected_strings = &[
             "abc", // 0
@@ -306,7 +311,7 @@
         let mut string_ids = vec![];
 
         {
-            let builder = StringTableBuilder::new(data_sink.clone(), index_sink.clone());
+            let builder = StringTableBuilder::new(data_sink.clone(), index_sink.clone()).unwrap();
 
             let r = |id| StringComponent::Ref(id);
             let v = |s| StringComponent::Value(s);
@@ -329,7 +334,7 @@
         let data_bytes = Arc::try_unwrap(data_sink).unwrap().into_bytes();
         let index_bytes = Arc::try_unwrap(index_sink).unwrap().into_bytes();
 
-        let string_table = StringTable::new(data_bytes, index_bytes).unwrap();
+        let string_table = StringTable::new(data_bytes, index_bytes, None).unwrap();
 
         for (&id, &expected_string) in string_ids.iter().zip(expected_strings.iter()) {
             let str_ref = string_table.get(id);
12 changes: 7 additions & 5 deletions analyzeme/src/testing_common.rs
@@ -96,18 +96,20 @@ fn generate_profiling_data(
         })
         .collect();
 
-    let expected_events: Vec<_> = threads
-        .into_iter()
-        .flat_map(|t| t.join().unwrap())
-        .collect();
-
     // An example of allocating the string contents of an event id that has
     // already been used
     profiler.map_virtual_to_concrete_string(
         event_id_virtual.to_string_id(),
         profiler.alloc_string("SomeQuery"),
    );
 
+    drop(profiler);
+
+    let expected_events: Vec<_> = threads
+        .into_iter()
+        .flat_map(|t| t.join().unwrap())
+        .collect();
+
     expected_events
 }
 
2 changes: 1 addition & 1 deletion crox/src/main.rs
@@ -131,7 +131,7 @@ fn get_args(full_event: &analyzeme::Event) -> Option<FxHashMap<String, String>>
     }
 }
 
-fn main() -> Result<(), Box<dyn std::error::Error>> {
+fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     let opt = Opt::from_args();
 
     let chrome_file = BufWriter::new(fs::File::create("chrome_profiler.json")?);
2 changes: 1 addition & 1 deletion flamegraph/src/main.rs
@@ -12,7 +12,7 @@ struct Opt {
     file_prefix: PathBuf,
 }
 
-fn main() -> Result<(), Box<dyn Error>> {
+fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
     let opt = Opt::from_args();
 
     let profiling_data = ProfilingData::new(&opt.file_prefix)?;
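Both crox and flamegraph now return Box<dyn Error + Send + Sync> from main, matching the new analyzeme signatures. As a general Rust aside (a standalone example, not code from this PR), the extra bounds are what allow such boxed errors to cross thread boundaries:

use std::error::Error;

fn may_fail() -> Result<(), Box<dyn Error + Send + Sync>> {
    // `From<&str>` is implemented for this boxed error type.
    Err(From::from("something went wrong"))
}

fn main() {
    // The closure's return value must be `Send` to leave the thread;
    // a plain `Box<dyn Error>` would not satisfy that bound.
    let handle = std::thread::spawn(may_fail);
    let _ = handle.join().unwrap();
}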