Skip to content

Commit

Permalink
add EntityPathHash to metadata registry
Browse files (browse the repository at this point in the history)
  • Loading branch information
teh-cmc committed Nov 30, 2023
1 parent b6138a9 commit 76ceb29
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 26 deletions.
6 changes: 2 additions & 4 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub struct MetadataRegistry<T: Clone> {
pub heap_size_bytes: u64,
}

impl Default for MetadataRegistry<TimePoint> {
impl Default for MetadataRegistry<(TimePoint, EntityPathHash)> {
fn default() -> Self {
let mut this = Self {
registry: Default::default(),
Expand Down Expand Up @@ -201,9 +201,7 @@ pub struct DataStore {
pub(crate) type_registry: DataTypeRegistry,

/// Keeps track of arbitrary per-row metadata.
///
/// Only used to map `RowId`s to their original [`TimePoint`]s at the moment.
pub(crate) metadata_registry: MetadataRegistry<TimePoint>,
pub(crate) metadata_registry: MetadataRegistry<(TimePoint, EntityPathHash)>,

/// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, …)
/// so that they can be properly deduplicated on insertion.
Expand Down
7 changes: 4 additions & 3 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl DataStore {
// 2. Find all tables that potentially hold data associated with that `RowId`
// 3. Drop the associated row and account for the space we got back

for (row_id, timepoint) in &self.metadata_registry.registry {
for (row_id, (timepoint, entity_path_hash)) in &self.metadata_registry.registry {
if num_bytes_to_drop <= 0.0 {
break;
}
Expand Down Expand Up @@ -272,8 +272,9 @@ impl DataStore {
// Only decrement the metadata size trackers if we're actually certain that we'll drop
// that RowId in the end.
if diff.is_some() {
let metadata_dropped_size_bytes =
row_id.total_size_bytes() + timepoint.total_size_bytes();
let metadata_dropped_size_bytes = row_id.total_size_bytes()
+ timepoint.total_size_bytes()
+ entity_path_hash.total_size_bytes();
self.metadata_registry.heap_size_bytes = self
.metadata_registry
.heap_size_bytes
Expand Down
9 changes: 5 additions & 4 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::{collections::VecDeque, ops::RangeBounds, sync::atomic::Ordering};

use itertools::Itertools;
use re_log::trace;
use re_log_types::{DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline};
use re_log_types::{
DataCell, EntityPath, EntityPathHash, RowId, TimeInt, TimePoint, TimeRange, Timeline,
};
use re_types_core::{ComponentName, ComponentNameSet};

use crate::{
Expand Down Expand Up @@ -495,9 +497,8 @@ impl DataStore {
}
}

pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&TimePoint> {
re_tracing::profile_function!();

#[inline]
pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&(TimePoint, EntityPathHash)> {
self.metadata_registry.get(row_id)
}

Expand Down
29 changes: 16 additions & 13 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use parking_lot::RwLock;

use re_log::{debug, trace};
use re_log_types::{
DataCell, DataCellColumn, DataCellError, DataRow, RowId, TimeInt, TimePoint, TimeRange,
VecDequeRemovalExt as _,
DataCell, DataCellColumn, DataCellError, DataRow, EntityPathHash, RowId, TimeInt, TimePoint,
TimeRange, VecDequeRemovalExt as _,
};
use re_types_core::{
components::InstanceKey, ComponentName, ComponentNameSet, Loggable, SizeBytes as _,
Expand Down Expand Up @@ -72,12 +72,13 @@ impl DataStore {
let DataRow {
row_id,
timepoint,
entity_path: ent_path,
entity_path,
num_instances,
cells,
} = row;

self.metadata_registry.upsert(*row_id, timepoint.clone())?;
self.metadata_registry
.upsert(*row_id, (timepoint.clone(), entity_path.hash()))?;

re_tracing::profile_function!();

Expand Down Expand Up @@ -113,7 +114,7 @@ impl DataStore {
}
}

let ent_path_hash = ent_path.hash();
let ent_path_hash = entity_path.hash();
let num_instances = *num_instances;

trace!(
Expand All @@ -123,7 +124,7 @@ impl DataStore {
timelines = ?timepoint.iter()
.map(|(timeline, time)| (timeline.name(), timeline.typ().format_utc(*time)))
.collect::<Vec<_>>(),
entity = %ent_path,
entity = %entity_path,
components = ?cells.iter().map(|cell| cell.component_name()).collect_vec(),
"insertion started…"
);
Expand Down Expand Up @@ -165,12 +166,14 @@ impl DataStore {
let index = self
.timeless_tables
.entry(ent_path_hash)
.or_insert_with(|| PersistentIndexedTable::new(self.cluster_key, ent_path.clone()));
.or_insert_with(|| {
PersistentIndexedTable::new(self.cluster_key, entity_path.clone())
});

index.insert_row(insert_id, generated_cluster_cell.clone(), row);
} else {
for (timeline, time) in timepoint.iter() {
let ent_path = ent_path.clone(); // shallow
let ent_path = entity_path.clone(); // shallow
let index = self
.tables
.entry((*timeline, ent_path_hash))
Expand All @@ -186,7 +189,7 @@ impl DataStore {
}
}

let diff = StoreDiff::addition(*row_id, ent_path.clone())
let diff = StoreDiff::addition(*row_id, entity_path.clone())
.at_timepoint(timepoint.clone())
.with_cells(cells.iter().cloned());

Expand Down Expand Up @@ -249,16 +252,16 @@ impl DataStore {
}
}

impl MetadataRegistry<TimePoint> {
fn upsert(&mut self, row_id: RowId, timepoint: TimePoint) -> WriteResult<()> {
impl MetadataRegistry<(TimePoint, EntityPathHash)> {
fn upsert(&mut self, row_id: RowId, data: (TimePoint, EntityPathHash)) -> WriteResult<()> {
match self.entry(row_id) {
std::collections::btree_map::Entry::Occupied(_) => Err(WriteError::ReusedRowId(row_id)),
std::collections::btree_map::Entry::Vacant(entry) => {
// NOTE: In a map, thus on the heap!
let added_size_bytes = row_id.total_size_bytes() + timepoint.total_size_bytes();
let added_size_bytes = row_id.total_size_bytes() + data.total_size_bytes();

// This is valuable information even for a timeless timepoint!
entry.insert(timepoint);
entry.insert(data);

self.heap_size_bytes += added_size_bytes;

Expand Down
4 changes: 2 additions & 2 deletions crates/re_space_view_text_log/src/space_view_class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,10 @@ impl ViewTextFilters {
// ---

fn get_time_point(ctx: &ViewerContext<'_>, entry: &Entry) -> Option<TimePoint> {
if let Some(time_point) = ctx.store_db.store().get_msg_metadata(&entry.row_id) {
if let Some((time_point, _)) = ctx.store_db.store().get_msg_metadata(&entry.row_id) {
Some(time_point.clone())
} else {
re_log::warn_once!("Missing meta-data for {:?}", entry.entity_path);
re_log::warn_once!("Missing metadata for {:?}", entry.entity_path);
None
}
}
Expand Down

0 comments on commit 76ceb29

Please sign in to comment.