Skip to content

Commit

Permalink
GC improvements 3: turn datastore columns into ring-buffers (#4397)
Browse files Browse the repository at this point in the history
This turns every single column in `DataStore`/`DataTable` into a
ring-buffer (`VecDeque`).

This means that on the common/happy path of data being ingested in
order:
1. Inserting new rows doesn't require re-sorting the bucket (that's
nothing new), and
2. Garbage-collecting rows doesn't require re-sorting the bucket or
copying anything (that's very new).

This leads to very significant performance improvements on the common
path.

- Fixes #1823 

### Benchmarks

Compared to `main`:
```
group                                                     gc_improvements_0                       gc_improvements_3
-----                                                     -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    4.50   1084.0±4.47ms 54.1 KElem/sec     1.00    241.0±1.66ms 243.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    8.86       2.1±0.02s 27.6 KElem/sec     1.00    239.9±2.70ms 244.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     1.88    465.8±2.50ms 125.8 KElem/sec    1.00    247.4±3.94ms 236.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     2.72    655.3±2.61ms 89.4 KElem/sec     1.00    241.2±2.06ms 243.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          2.72    652.8±4.12ms 89.8 KElem/sec     1.00    239.6±1.98ms 244.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         40.21      2.4±0.05s 24.2 KElem/sec     1.00     60.3±1.16ms 972.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         40.08      2.4±0.03s 24.1 KElem/sec     1.00     60.8±1.14ms 964.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          40.97      2.5±0.08s 23.5 KElem/sec     1.00     61.0±1.99ms 960.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          39.45      2.4±0.02s 24.5 KElem/sec     1.00     60.6±1.45ms 966.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               41.78      2.4±0.03s 24.4 KElem/sec     1.00     57.6±0.35ms 1018.1 KElem/sec
```

Compared to previous PR:
```
group                                                     gc_improvements_1                       gc_improvements_3
-----                                                     -----------------                       -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024    4.63   1117.2±9.07ms 52.4 KElem/sec     1.00    241.0±1.66ms 243.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048    8.96       2.1±0.01s 27.3 KElem/sec     1.00    239.9±2.70ms 244.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256     1.91    471.5±4.76ms 124.3 KElem/sec    1.00    247.4±3.94ms 236.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512     2.76    666.7±6.64ms 87.9 KElem/sec     1.00    241.2±2.06ms 243.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default          2.78    665.6±4.67ms 88.0 KElem/sec     1.00    239.6±1.98ms 244.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         134.66      8.1±0.10s  7.2 KElem/sec    1.00     60.3±1.16ms 972.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         132.44      8.0±0.09s  7.3 KElem/sec    1.00     60.8±1.14ms 964.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          132.22      8.1±0.11s  7.3 KElem/sec    1.00     61.0±1.99ms 960.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          133.27      8.1±0.11s  7.3 KElem/sec    1.00     60.6±1.45ms 966.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               140.04      8.1±0.07s  7.3 KElem/sec    1.00     57.6±0.35ms 1018.1 KElem/sec
```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401
  • Loading branch information
teh-cmc authored Dec 2, 2023
1 parent 7da6b27 commit 0d81047
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 114 deletions.
8 changes: 3 additions & 5 deletions crates/re_arrow_store/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::AtomicU64;

use ahash::HashMap;
use arrow2::datatypes::DataType;
use nohash_hasher::IntMap;
use parking_lot::RwLock;
use smallvec::SmallVec;

use re_log_types::{
DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec, NumInstancesVec, RowId,
RowIdVec, StoreId, TimeInt, TimePoint, TimeRange, Timeline,
Expand Down Expand Up @@ -72,7 +70,7 @@ impl DataStoreConfig {

// ---

pub type InsertIdVec = SmallVec<[u64; 4]>;
pub type InsertIdVec = VecDeque<u64>;

/// Keeps track of datatype information for all component types that have been written to the store
/// so far.
Expand Down Expand Up @@ -325,7 +323,7 @@ impl DataStore {
let entry = oldest_time_per_timeline
.entry(bucket.timeline)
.or_insert(TimeInt::MAX);
if let Some(time) = bucket.inner.read().col_time.first() {
if let Some(time) = bucket.inner.read().col_time.front() {
*entry = TimeInt::min(*entry, (*time).into());
}
}
Expand Down
18 changes: 9 additions & 9 deletions crates/re_arrow_store/src/store_arrow.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};

use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
use nohash_hasher::IntMap;
Expand Down Expand Up @@ -94,10 +94,10 @@ impl PersistentIndexedTable {

fn serialize(
cluster_key: &ComponentName,
col_time: Option<(Timeline, &[i64])>,
col_insert_id: &[u64],
col_row_id: &[RowId],
col_num_instances: &[NumInstances],
col_time: Option<(Timeline, &VecDeque<i64>)>,
col_insert_id: &VecDeque<u64>,
col_row_id: &VecDeque<RowId>,
col_num_instances: &VecDeque<NumInstances>,
table: &IntMap<ComponentName, DataCellColumn>,
) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
re_tracing::profile_function!();
Expand Down Expand Up @@ -129,10 +129,10 @@ fn serialize(
}

fn serialize_control_columns(
col_time: Option<(Timeline, &[i64])>,
col_insert_id: &[u64],
col_row_id: &[RowId],
col_num_instances: &[NumInstances],
col_time: Option<(Timeline, &VecDeque<i64>)>,
col_insert_id: &VecDeque<u64>,
col_row_id: &VecDeque<RowId>,
col_num_instances: &VecDeque<NumInstances>,
) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
re_tracing::profile_function!();

Expand Down
39 changes: 26 additions & 13 deletions crates/re_arrow_store/src/store_gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeMap;

use ahash::{HashMap, HashSet};

use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline};
use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline, VecDequeRemovalExt as _};
use re_types_core::{ComponentName, SizeBytes as _};

use crate::{
Expand Down Expand Up @@ -599,7 +599,7 @@ impl IndexedBucketInner {
// We removed the last row
*time_range = TimeRange::EMPTY;
} else {
*is_sorted = false;
*is_sorted = row_index == 0 || row_index.saturating_add(1) == col_row_id.len();

// We have at least two rows, so we can safely [index] here:
if row_index == 0 {
Expand All @@ -613,25 +613,32 @@ impl IndexedBucketInner {
}

// col_row_id
let removed_row_id = col_row_id.swap_remove(row_index);
let Some(removed_row_id) = col_row_id.swap_remove(row_index) else {
continue;
};
debug_assert_eq!(row_id, removed_row_id);
dropped_num_bytes += removed_row_id.total_size_bytes();

// col_time
let row_time = col_time.swap_remove(row_index);
dropped_num_bytes += row_time.total_size_bytes();
if let Some(row_time) = col_time.swap_remove(row_index) {
dropped_num_bytes += row_time.total_size_bytes();
}

// col_insert_id (if present)
if !col_insert_id.is_empty() {
dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes();
if let Some(insert_id) = col_insert_id.swap_remove(row_index) {
dropped_num_bytes += insert_id.total_size_bytes();
}
}

// col_num_instances
dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();
if let Some(num_instances) = col_num_instances.swap_remove(row_index) {
dropped_num_bytes += num_instances.total_size_bytes();
}

// each data column
for column in columns.values_mut() {
let cell = column.0.swap_remove(row_index);
let cell = column.0.swap_remove(row_index).flatten();

// TODO(#1809): once datatype deduplication is in, we should really not count
// autogenerated keys as part of the memory stats (same on write path).
Expand Down Expand Up @@ -703,24 +710,30 @@ impl PersistentIndexedTable {
let mut diff: Option<StoreDiff> = None;

if let Ok(row_index) = col_row_id.binary_search(&row_id) {
*is_sorted = row_index.saturating_add(1) == col_row_id.len();
*is_sorted = row_index == 0 || row_index.saturating_add(1) == col_row_id.len();

// col_row_id
let removed_row_id = col_row_id.swap_remove(row_index);
let Some(removed_row_id) = col_row_id.swap_remove(row_index) else {
return (None, 0);
};
debug_assert_eq!(row_id, removed_row_id);
dropped_num_bytes += removed_row_id.total_size_bytes();

// col_insert_id (if present)
if !col_insert_id.is_empty() {
dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes();
if let Some(insert_id) = col_insert_id.swap_remove(row_index) {
dropped_num_bytes += insert_id.total_size_bytes();
}
}

// col_num_instances
dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();
if let Some(num_instances) = col_num_instances.swap_remove(row_index) {
dropped_num_bytes += num_instances.total_size_bytes();
}

// each data column
for column in columns.values_mut() {
let cell = column.0.swap_remove(row_index);
let cell = column.0.swap_remove(row_index).flatten();

// TODO(#1809): once datatype deduplication is in, we should really not count
// autogenerated keys as part of the memory stats (same on write path).
Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/src/store_polars.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::all, unused_variables, dead_code)]

use std::collections::BTreeSet;
use std::collections::{BTreeSet, VecDeque};

use arrow2::{
array::{new_empty_array, Array, BooleanArray, ListArray, Utf8Array},
Expand Down Expand Up @@ -268,7 +268,7 @@ impl IndexedBucket {
fn insert_ids_as_series(col_insert_id: &InsertIdVec) -> Series {
re_tracing::profile_function!();

let insert_ids = arrow2::array::UInt64Array::from_slice(col_insert_id.as_slice());
let insert_ids = DataTable::serialize_primitive_deque(col_insert_id);
new_infallible_series(
DataStore::insert_id_component_name().as_ref(),
&insert_ids,
Expand All @@ -281,7 +281,7 @@ fn column_as_series(
num_rows: usize,
datatype: arrow2::datatypes::DataType,
component: ComponentName,
cells: &[Option<DataCell>],
cells: &VecDeque<Option<DataCell>>,
) -> Series {
re_tracing::profile_function!();

Expand Down
11 changes: 5 additions & 6 deletions crates/re_arrow_store/src/store_read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{ops::RangeBounds, sync::atomic::Ordering};
use std::{collections::VecDeque, ops::RangeBounds, sync::atomic::Ordering};

use itertools::Itertools;
use re_log::trace;
use re_log_types::{DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline};
use re_types_core::{ComponentName, ComponentNameSet};
use smallvec::SmallVec;

use crate::{
store::PersistentIndexedTableInner, DataStore, IndexedBucket, IndexedBucketInner, IndexedTable,
Expand Down Expand Up @@ -990,8 +989,8 @@ impl IndexedBucketInner {
{
re_tracing::profile_scope!("control");

fn reshuffle_control_column<T: Copy, const N: usize>(
column: &mut SmallVec<[T; N]>,
fn reshuffle_control_column<T: Copy>(
column: &mut VecDeque<T>,
swaps: &[(usize, usize)],
) {
let source = {
Expand Down Expand Up @@ -1232,8 +1231,8 @@ impl PersistentIndexedTableInner {
{
re_tracing::profile_scope!("control");

fn reshuffle_control_column<T: Copy, const N: usize>(
column: &mut SmallVec<[T; N]>,
fn reshuffle_control_column<T: Copy>(
column: &mut VecDeque<T>,
swaps: &[(usize, usize)],
) {
let source = {
Expand Down
6 changes: 3 additions & 3 deletions crates/re_arrow_store/src/store_sanity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange};
use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange, VecDequeSortingExt as _};
use re_types_core::{ComponentName, Loggable, SizeBytes as _};

use crate::{
Expand Down Expand Up @@ -173,8 +173,8 @@ impl IndexedBucket {
let mut times = col_time.clone();
times.sort();

let expected_min = times.first().copied().unwrap_or(i64::MAX).into();
let expected_max = times.last().copied().unwrap_or(i64::MIN).into();
let expected_min = times.front().copied().unwrap_or(i64::MAX).into();
let expected_max = times.back().copied().unwrap_or(i64::MIN).into();
let expected_time_range = TimeRange::new(expected_min, expected_max);

if expected_time_range != *time_range {
Expand Down
Loading

0 comments on commit 0d81047

Please sign in to comment.