From 76ceb29b4a409c22da6debee38f2ad18b9310128 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 29 Nov 2023 14:13:28 +0100 Subject: [PATCH] add EntityPathHash to metadata registry --- crates/re_arrow_store/src/store.rs | 6 ++-- crates/re_arrow_store/src/store_gc.rs | 7 +++-- crates/re_arrow_store/src/store_read.rs | 9 +++--- crates/re_arrow_store/src/store_write.rs | 29 ++++++++++--------- .../src/space_view_class.rs | 4 +-- 5 files changed, 29 insertions(+), 26 deletions(-) diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs index 345c74288cde..3880f59d4e88 100644 --- a/crates/re_arrow_store/src/store.rs +++ b/crates/re_arrow_store/src/store.rs @@ -104,7 +104,7 @@ pub struct MetadataRegistry { pub heap_size_bytes: u64, } -impl Default for MetadataRegistry { +impl Default for MetadataRegistry<(TimePoint, EntityPathHash)> { fn default() -> Self { let mut this = Self { registry: Default::default(), @@ -201,9 +201,7 @@ pub struct DataStore { pub(crate) type_registry: DataTypeRegistry, /// Keeps track of arbitrary per-row metadata. - /// - /// Only used to map `RowId`s to their original [`TimePoint`]s at the moment. - pub(crate) metadata_registry: MetadataRegistry, + pub(crate) metadata_registry: MetadataRegistry<(TimePoint, EntityPathHash)>, /// Used to cache auto-generated cluster cells (`[0]`, `[0, 1]`, `[0, 1, 2]`, …) /// so that they can be properly deduplicated on insertion. diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs index e5038a675a6e..abeab17f2bbc 100644 --- a/crates/re_arrow_store/src/store_gc.rs +++ b/crates/re_arrow_store/src/store_gc.rs @@ -224,7 +224,7 @@ impl DataStore { // 2. Find all tables that potentially hold data associated with that `RowId` // 3. Drop the associated row and account for the space we got back - for (row_id, timepoint) in &self.metadata_registry.registry { + for (row_id, (timepoint, entity_path_hash)) in &self.metadata_registry.registry { if num_bytes_to_drop <= 0.0 { break; } @@ -272,8 +272,9 @@ impl DataStore { // Only decrement the metadata size trackers if we're actually certain that we'll drop // that RowId in the end. if diff.is_some() { - let metadata_dropped_size_bytes = - row_id.total_size_bytes() + timepoint.total_size_bytes(); + let metadata_dropped_size_bytes = row_id.total_size_bytes() + + timepoint.total_size_bytes() + + entity_path_hash.total_size_bytes(); self.metadata_registry.heap_size_bytes = self .metadata_registry .heap_size_bytes diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs index 59a6fc7ffd1b..05f98aa9136c 100644 --- a/crates/re_arrow_store/src/store_read.rs +++ b/crates/re_arrow_store/src/store_read.rs @@ -2,7 +2,9 @@ use std::{collections::VecDeque, ops::RangeBounds, sync::atomic::Ordering}; use itertools::Itertools; use re_log::trace; -use re_log_types::{DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline}; +use re_log_types::{ + DataCell, EntityPath, EntityPathHash, RowId, TimeInt, TimePoint, TimeRange, Timeline, +}; use re_types_core::{ComponentName, ComponentNameSet}; use crate::{ @@ -495,9 +497,8 @@ impl DataStore { } } - pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&TimePoint> { - re_tracing::profile_function!(); - + #[inline] + pub fn get_msg_metadata(&self, row_id: &RowId) -> Option<&(TimePoint, EntityPathHash)> { self.metadata_registry.get(row_id) } diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs index a6ca69d8734e..662fbbd8e6a1 100644 --- a/crates/re_arrow_store/src/store_write.rs +++ b/crates/re_arrow_store/src/store_write.rs @@ -5,8 +5,8 @@ use parking_lot::RwLock; use re_log::{debug, trace}; use re_log_types::{ - DataCell, DataCellColumn, DataCellError, DataRow, RowId, TimeInt, TimePoint, TimeRange, - VecDequeRemovalExt as _, + DataCell, DataCellColumn, DataCellError, DataRow, EntityPathHash, RowId, TimeInt, TimePoint, + TimeRange, VecDequeRemovalExt as _, }; use re_types_core::{ components::InstanceKey, ComponentName, ComponentNameSet, Loggable, SizeBytes as _, @@ -72,12 +72,13 @@ impl DataStore { let DataRow { row_id, timepoint, - entity_path: ent_path, + entity_path, num_instances, cells, } = row; - self.metadata_registry.upsert(*row_id, timepoint.clone())?; + self.metadata_registry + .upsert(*row_id, (timepoint.clone(), entity_path.hash()))?; re_tracing::profile_function!(); @@ -113,7 +114,7 @@ impl DataStore { } } - let ent_path_hash = ent_path.hash(); + let ent_path_hash = entity_path.hash(); let num_instances = *num_instances; trace!( @@ -123,7 +124,7 @@ impl DataStore { timelines = ?timepoint.iter() .map(|(timeline, time)| (timeline.name(), timeline.typ().format_utc(*time))) .collect::>(), - entity = %ent_path, + entity = %entity_path, components = ?cells.iter().map(|cell| cell.component_name()).collect_vec(), "insertion started…" ); @@ -165,12 +166,14 @@ impl DataStore { let index = self .timeless_tables .entry(ent_path_hash) - .or_insert_with(|| PersistentIndexedTable::new(self.cluster_key, ent_path.clone())); + .or_insert_with(|| { + PersistentIndexedTable::new(self.cluster_key, entity_path.clone()) + }); index.insert_row(insert_id, generated_cluster_cell.clone(), row); } else { for (timeline, time) in timepoint.iter() { - let ent_path = ent_path.clone(); // shallow + let ent_path = entity_path.clone(); // shallow let index = self .tables .entry((*timeline, ent_path_hash)) @@ -186,7 +189,7 @@ impl DataStore { } } - let diff = StoreDiff::addition(*row_id, ent_path.clone()) + let diff = StoreDiff::addition(*row_id, entity_path.clone()) .at_timepoint(timepoint.clone()) .with_cells(cells.iter().cloned()); @@ -249,16 +252,16 @@ impl DataStore { } } -impl MetadataRegistry { - fn upsert(&mut self, row_id: RowId, timepoint: TimePoint) -> WriteResult<()> { +impl MetadataRegistry<(TimePoint, EntityPathHash)> { + fn upsert(&mut self, row_id: RowId, data: (TimePoint, EntityPathHash)) -> WriteResult<()> { match self.entry(row_id) { std::collections::btree_map::Entry::Occupied(_) => Err(WriteError::ReusedRowId(row_id)), std::collections::btree_map::Entry::Vacant(entry) => { // NOTE: In a map, thus on the heap! - let added_size_bytes = row_id.total_size_bytes() + timepoint.total_size_bytes(); + let added_size_bytes = row_id.total_size_bytes() + data.total_size_bytes(); // This is valuable information even for a timeless timepoint! - entry.insert(timepoint); + entry.insert(data); self.heap_size_bytes += added_size_bytes; diff --git a/crates/re_space_view_text_log/src/space_view_class.rs b/crates/re_space_view_text_log/src/space_view_class.rs index fedeaf07889b..7a256769a9fe 100644 --- a/crates/re_space_view_text_log/src/space_view_class.rs +++ b/crates/re_space_view_text_log/src/space_view_class.rs @@ -251,10 +251,10 @@ impl ViewTextFilters { // --- fn get_time_point(ctx: &ViewerContext<'_>, entry: &Entry) -> Option { - if let Some(time_point) = ctx.store_db.store().get_msg_metadata(&entry.row_id) { + if let Some((time_point, _)) = ctx.store_db.store().get_msg_metadata(&entry.row_id) { Some(time_point.clone()) } else { - re_log::warn_once!("Missing meta-data for {:?}", entry.entity_path); + re_log::warn_once!("Missing metadata for {:?}", entry.entity_path); None } }