Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC improvements 5: Store{Diff,Event} optimizations #4399

Merged
merged 3 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 20 additions & 46 deletions crates/re_arrow_store/src/store_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,17 @@ pub struct StoreDiff {
/// one addition and (optionally) one deletion (in that order!).
pub row_id: RowId,

/// The [`TimePoint`] associated with that row.
/// The time data associated with that row.
///
/// Since insertions and deletions both work on a row-level basis, this is guaranteed to be the
/// same value for both the insertion and deletion events (if any).
pub timepoint: TimePoint,
///
/// This is not a [`TimePoint`] for performance reasons.
//
// NOTE: Empirical testing shows that a SmallVec isn't any better in the best case, and can be a
// significant performance drop at worst.
// pub times: SmallVec<[(Timeline, TimeInt); 5]>, // "5 timelines ought to be enough for anyone"
Comment on lines +122 to +124
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pub times: Vec<(Timeline, TimeInt)>,

/// The [`EntityPath`] associated with that row.
///
Expand All @@ -137,7 +143,7 @@ impl StoreDiff {
Self {
kind: StoreDiffKind::Addition,
row_id: row_id.into(),
timepoint: TimePoint::timeless(),
times: Default::default(),
entity_path: entity_path.into(),
cells: Default::default(),
}
Expand All @@ -148,75 +154,43 @@ impl StoreDiff {
Self {
kind: StoreDiffKind::Deletion,
row_id: row_id.into(),
timepoint: TimePoint::timeless(),
times: Default::default(),
entity_path: entity_path.into(),
cells: Default::default(),
}
}

#[inline]
pub fn at_timepoint(mut self, timepoint: impl Into<TimePoint>) -> StoreDiff {
self.timepoint = self.timepoint.union_max(&timepoint.into());
pub fn at_timepoint(&mut self, timepoint: impl Into<TimePoint>) -> &mut Self {
self.times.extend(timepoint.into());
self
}

#[inline]
pub fn at_timestamp(
mut self,
&mut self,
timeline: impl Into<Timeline>,
time: impl Into<TimeInt>,
) -> StoreDiff {
self.timepoint.insert(timeline.into(), time.into());
) -> &mut Self {
self.times.push((timeline.into(), time.into()));
self
}

#[inline]
pub fn with_cells(mut self, cells: impl IntoIterator<Item = DataCell>) -> Self {
pub fn with_cells(&mut self, cells: impl IntoIterator<Item = DataCell>) -> &mut Self {
self.cells
.extend(cells.into_iter().map(|cell| (cell.component_name(), cell)));
self
}

/// Returns the union of two [`StoreDiff`]s.
///
/// They must share the same [`RowId`], [`EntityPath`] and [`StoreDiffKind`].
#[inline]
pub fn union(&self, rhs: &Self) -> Option<Self> {
let Self {
kind: lhs_kind,
row_id: lhs_row_id,
timepoint: lhs_timepoint,
entity_path: lhs_entity_path,
cells: lhs_cells,
} = self;
let Self {
kind: rhs_kind,
row_id: rhs_row_id,
timepoint: rhs_timepoint,
entity_path: rhs_entity_path,
cells: rhs_cells,
} = rhs;

let same_kind = lhs_kind == rhs_kind;
let same_row_id = lhs_row_id == rhs_row_id;
let same_entity_path = lhs_entity_path == rhs_entity_path;

(same_kind && same_row_id && same_entity_path).then(|| Self {
kind: *lhs_kind,
row_id: *lhs_row_id,
timepoint: lhs_timepoint.clone().union_max(rhs_timepoint),
entity_path: lhs_entity_path.clone(),
cells: [lhs_cells.values(), rhs_cells.values()]
.into_iter()
.flatten()
.map(|cell| (cell.component_name(), cell.clone()))
.collect(),
})
pub fn timepoint(&self) -> TimePoint {
self.times.clone().into_iter().collect()
}

#[inline]
pub fn is_timeless(&self) -> bool {
self.timepoint.is_timeless()
self.times.is_empty()
}

/// `-1` for deletions, `+1` for additions.
Expand Down Expand Up @@ -297,7 +271,7 @@ mod tests {
if event.is_timeless() {
self.timeless += delta;
} else {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
*self.timelines.entry(timeline).or_default() += delta;
*self.times.entry(time).or_default() += delta;
}
Expand Down
19 changes: 9 additions & 10 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl DataStore {
table.try_drop_row(&self.cluster_cell_cache, row_id, time.as_i64());
if let Some(inner) = diff.as_mut() {
if let Some(removed) = removed {
diff = inner.union(&removed);
inner.times.extend(removed.times);
}
} else {
diff = removed;
Expand All @@ -258,7 +258,7 @@ impl DataStore {
table.try_drop_row(&self.cluster_cell_cache, row_id);
if let Some(inner) = diff.as_mut() {
if let Some(removed) = removed {
diff = inner.union(&removed);
inner.times.extend(removed.times);
}
} else {
diff = removed;
Expand Down Expand Up @@ -476,7 +476,7 @@ impl DataStore {
.entry(row_id)
.or_insert_with(|| StoreDiff::deletion(row_id, entity_path.clone()));

diff.timepoint.insert(bucket.timeline, time.into());
diff.times.push((bucket.timeline, time.into()));

for column in &mut inner.columns.values_mut() {
let cell = column[i].take();
Expand Down Expand Up @@ -657,10 +657,9 @@ impl IndexedBucketInner {
if let Some(inner) = diff.as_mut() {
inner.cells.insert(cell.component_name(), cell);
} else {
diff = StoreDiff::deletion(removed_row_id, ent_path.clone())
.at_timestamp(timeline, time)
.with_cells([cell])
.into();
let mut d = StoreDiff::deletion(removed_row_id, ent_path.clone());
d.at_timestamp(timeline, time).with_cells([cell]);
diff = Some(d);
}
}
}
Expand Down Expand Up @@ -752,9 +751,9 @@ impl PersistentIndexedTable {
if let Some(inner) = diff.as_mut() {
inner.cells.insert(cell.component_name(), cell);
} else {
diff = StoreDiff::deletion(removed_row_id, ent_path.clone())
.with_cells([cell])
.into();
let mut d = StoreDiff::deletion(removed_row_id, ent_path.clone());
d.cells.insert(cell.component_name(), cell);
diff = Some(d);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/re_arrow_store/src/store_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ impl DataStore {
}
}

let diff = StoreDiff::addition(*row_id, entity_path.clone())
.at_timepoint(timepoint.clone())
let mut diff = StoreDiff::addition(*row_id, entity_path.clone());
diff.at_timepoint(timepoint.clone())
.with_cells(cells.iter().cloned());

// TODO(#4220): should we fire for auto-generated data?
Expand Down
22 changes: 11 additions & 11 deletions crates/re_data_store/src/entity_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl CompactedStoreEvents {
event.delta().unsigned_abs();
}
} else {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
let per_timeline = this.timeful.entry(event.entity_path.hash()).or_default();
let per_component = per_timeline.entry(timeline).or_default();
for component_name in event.cells.keys() {
Expand Down Expand Up @@ -205,7 +205,7 @@ impl EntityTree {
let leaf = self.create_subtrees_recursively(
event.diff.entity_path.as_slice(),
0,
&event.diff.timepoint,
&event.diff.times,
event.num_components() as _,
);

Expand Down Expand Up @@ -235,7 +235,7 @@ impl EntityTree {
pending_clears = self.flat_clears.clone().into_iter().collect_vec();
Default::default()
});
per_component.add(&store_diff.timepoint, 1);
per_component.add(&store_diff.times, 1);

// Is the newly added component under the influence of previously logged `Clear`
// component?
Expand Down Expand Up @@ -343,7 +343,7 @@ impl EntityTree {
next,
is_recursive,
store_diff.row_id,
store_diff.timepoint.clone(),
store_diff.timepoint(),
));
stack.extend(next.children.values_mut().collect::<Vec<&mut Self>>());
}
Expand All @@ -352,7 +352,7 @@ impl EntityTree {
self,
is_recursive,
store_diff.row_id,
store_diff.timepoint.clone(),
store_diff.timepoint(),
));
}

Expand Down Expand Up @@ -387,7 +387,7 @@ impl EntityTree {
.entry(component_path.entity_path().clone())
.or_default();

*timepoint = store_diff.timepoint.clone().union_max(timepoint);
*timepoint = store_diff.timepoint().union_max(timepoint);
component_paths.insert(component_path.clone());
}
}
Expand Down Expand Up @@ -433,7 +433,7 @@ impl EntityTree {
for event in filtered_events.iter().filter(|e| &e.entity_path == path) {
for component_name in event.cells.keys() {
if let Some(histo) = self.time_histograms_per_component.get_mut(component_name) {
histo.remove(&event.timepoint, 1);
histo.remove(&event.timepoint(), 1);
if histo.is_empty() {
self.time_histograms_per_component.remove(component_name);
}
Expand All @@ -442,7 +442,7 @@ impl EntityTree {
}

for event in &filtered_events {
recursive_time_histogram.remove(&event.timepoint, event.num_components() as _);
recursive_time_histogram.remove(&event.timepoint(), event.num_components() as _);
}

children.retain(|_, child| {
Expand All @@ -458,10 +458,10 @@ impl EntityTree {
&mut self,
full_path: &[EntityPathPart],
depth: usize,
timepoint: &TimePoint,
times: &[(Timeline, TimeInt)],
num_components: u32,
) -> &mut Self {
self.recursive_time_histogram.add(timepoint, num_components);
self.recursive_time_histogram.add(times, num_components);

match full_path.get(depth) {
None => {
Expand All @@ -473,7 +473,7 @@ impl EntityTree {
.or_insert_with(|| {
EntityTree::new(full_path[..depth + 1].into(), self.recursive_clears.clone())
})
.create_subtrees_recursively(full_path, depth + 1, timepoint, num_components),
.create_subtrees_recursively(full_path, depth + 1, times, num_components),
}
}

Expand Down
12 changes: 6 additions & 6 deletions crates/re_data_store/src/time_histogram_per_timeline.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use re_arrow_store::{StoreEvent, StoreSubscriber};
use re_log_types::{TimePoint, Timeline};
use re_log_types::{TimeInt, TimePoint, Timeline};

// ---

Expand Down Expand Up @@ -51,8 +51,8 @@ impl TimeHistogramPerTimeline {
self.num_timeless_messages
}

pub fn add(&mut self, timepoint: &TimePoint, n: u32) {
if timepoint.is_timeless() {
pub fn add(&mut self, times: &[(Timeline, TimeInt)], n: u32) {
if times.is_empty() {
self.num_timeless_messages = self
.num_timeless_messages
.checked_add(n as u64)
Expand All @@ -61,11 +61,11 @@ impl TimeHistogramPerTimeline {
u64::MAX
});
} else {
for (timeline, time_value) in timepoint.iter() {
for &(timeline, time) in times {
self.times
.entry(*timeline)
.entry(timeline)
.or_default()
.increment(time_value.as_i64(), n);
.increment(time.as_i64(), n);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_store/src/times_per_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl StoreSubscriber for TimesPerTimeline {
re_tracing::profile_function!(format!("num_events={}", events.len()));

for event in events {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
let per_time = self.0.entry(timeline).or_default();
let count = per_time.entry(time).or_default();

Expand Down
2 changes: 1 addition & 1 deletion examples/rust/custom_store_subscriber/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl StoreSubscriber for TimeRangesPerEntity {

fn on_events(&mut self, events: &[StoreEvent]) {
for event in events {
for (&timeline, &time) in &event.timepoint {
for &(timeline, time) in &event.times {
// update counters
let per_timeline = self.times.entry(event.entity_path.clone()).or_default();
let per_time = per_timeline.entry(timeline).or_default();
Expand Down
Loading