From 0d81047c7ad21917e08130db8bf3875f0a9c1887 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Sat, 2 Dec 2023 12:55:08 +0100
Subject: [PATCH] GC improvements 3: turn datastore columns into ring-buffers
 (#4397)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This turns every single column in `DataStore`/`DataTable` into a ring-buffer
(`VecDeque`).

This means that on the common/happy path of data being ingested in order:

1. inserting new rows doesn't require re-sorting the bucket (that's nothing
   new), and
2. garbage collecting rows doesn't require re-sorting the bucket or copying
   anything (that's very new).

This leads to very significant performance improvements on the common path.
(A minimal `Vec`-vs-`VecDeque` sketch illustrating point 2 follows the series
list below.)

- Fixes #1823

### Benchmarks

Compared to `main`:
```
group                                                     gc_improvements_0                        gc_improvements_3
-----                                                     -----------------                        -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024     4.50  1084.0±4.47ms  54.1 KElem/sec     1.00   241.0±1.66ms 243.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048     8.86      2.1±0.02s  27.6 KElem/sec     1.00   239.9±2.70ms 244.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256      1.88   465.8±2.50ms 125.8 KElem/sec     1.00   247.4±3.94ms 236.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512      2.72   655.3±2.61ms  89.4 KElem/sec     1.00   241.2±2.06ms 243.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default           2.72   652.8±4.12ms  89.8 KElem/sec     1.00   239.6±1.98ms 244.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024         40.21      2.4±0.05s  24.2 KElem/sec     1.00    60.3±1.16ms 972.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048         40.08      2.4±0.03s  24.1 KElem/sec     1.00    60.8±1.14ms 964.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256          40.97      2.5±0.08s  23.5 KElem/sec     1.00    61.0±1.99ms 960.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512          39.45      2.4±0.02s  24.5 KElem/sec     1.00    60.6±1.45ms 966.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/default               41.78      2.4±0.03s  24.4 KElem/sec     1.00    57.6±0.35ms 1018.1 KElem/sec
```

Compared to previous PR:
```
group                                                     gc_improvements_1                        gc_improvements_3
-----                                                     -----------------                        -----------------
.../plotting_dashboard/drop_at_least=0.3/bucketsz=1024     4.63  1117.2±9.07ms  52.4 KElem/sec     1.00   241.0±1.66ms 243.1 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=2048     8.96      2.1±0.01s  27.3 KElem/sec     1.00   239.9±2.70ms 244.3 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=256      1.91   471.5±4.76ms 124.3 KElem/sec     1.00   247.4±3.94ms 236.8 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/bucketsz=512      2.76   666.7±6.64ms  87.9 KElem/sec     1.00   241.2±2.06ms 243.0 KElem/sec
.../plotting_dashboard/drop_at_least=0.3/default           2.78   665.6±4.67ms  88.0 KElem/sec     1.00   239.6±1.98ms 244.6 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=1024        134.66      8.1±0.10s   7.2 KElem/sec     1.00    60.3±1.16ms 972.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=2048        132.44      8.0±0.09s   7.3 KElem/sec     1.00    60.8±1.14ms 964.3 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=256         132.22      8.1±0.11s   7.3 KElem/sec     1.00    61.0±1.99ms 960.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/bucketsz=512         133.27      8.1±0.11s   7.3 KElem/sec     1.00    60.6±1.45ms 966.9 KElem/sec
.../timeless_logs/drop_at_least=0.3/default              140.04      8.1±0.07s   7.3 KElem/sec     1.00    57.6±0.35ms 1018.1 KElem/sec
```

---

Part of the GC improvements series:
- #4394
- #4395
- #4396
- #4397
- #4398
- #4399
- #4400
- #4401

---
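Below is a minimal, self-contained sketch (not part of the patch) of the
asymptotic difference the description refers to; the variable names are made
up for illustration:

```rust
use std::collections::VecDeque;

fn main() {
    // With a `Vec`, dropping the oldest row from the front of a sorted
    // column shifts every remaining element down: O(n) per removal.
    let mut col_time_vec: Vec<i64> = (0..1024).collect();
    col_time_vec.remove(0); // O(n) memmove

    // With a `VecDeque` (a growable ring-buffer), the same removal just
    // bumps the head index: O(1), no re-sorting, no copying.
    let mut col_time_deque: VecDeque<i64> = (0..1024).collect();
    col_time_deque.pop_front(); // O(1)

    assert!(col_time_vec.iter().eq(col_time_deque.iter()));
}
```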
 crates/re_arrow_store/src/store.rs        |  8 +--
 crates/re_arrow_store/src/store_arrow.rs  | 18 +++---
 crates/re_arrow_store/src/store_gc.rs     | 39 ++++++++----
 crates/re_arrow_store/src/store_polars.rs |  6 +-
 crates/re_arrow_store/src/store_read.rs   | 11 ++--
 crates/re_arrow_store/src/store_sanity.rs |  6 +-
 crates/re_arrow_store/src/store_write.rs  | 77 +++++++++--------
 crates/re_log_types/src/data_table.rs     | 71 +++++++++++++--------
 crates/re_sdk/Cargo.toml                  |  1 +
 crates/re_types_core/src/size_bytes.rs    | 13 +++-
 10 files changed, 136 insertions(+), 114 deletions(-)

diff --git a/crates/re_arrow_store/src/store.rs b/crates/re_arrow_store/src/store.rs
index 8431ebe84646..345c74288cde 100644
--- a/crates/re_arrow_store/src/store.rs
+++ b/crates/re_arrow_store/src/store.rs
@@ -1,12 +1,10 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, VecDeque};
 use std::sync::atomic::AtomicU64;
 
 use ahash::HashMap;
 use arrow2::datatypes::DataType;
 use nohash_hasher::IntMap;
 use parking_lot::RwLock;
-use smallvec::SmallVec;
-
 use re_log_types::{
     DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec, NumInstancesVec, RowId,
     RowIdVec, StoreId, TimeInt, TimePoint, TimeRange, Timeline,
@@ -72,7 +70,7 @@ impl DataStoreConfig {
 
 // ---
 
-pub type InsertIdVec = SmallVec<[u64; 4]>;
+pub type InsertIdVec = VecDeque<u64>;
 
 /// Keeps track of datatype information for all component types that have been written to the store
 /// so far.
@@ -325,7 +323,7 @@ impl DataStore {
             let entry = oldest_time_per_timeline
                 .entry(bucket.timeline)
                 .or_insert(TimeInt::MAX);
-            if let Some(time) = bucket.inner.read().col_time.first() {
+            if let Some(time) = bucket.inner.read().col_time.front() {
                 *entry = TimeInt::min(*entry, (*time).into());
             }
         }

diff --git a/crates/re_arrow_store/src/store_arrow.rs b/crates/re_arrow_store/src/store_arrow.rs
index dcae7a56b3ab..8148ede60b54 100644
--- a/crates/re_arrow_store/src/store_arrow.rs
+++ b/crates/re_arrow_store/src/store_arrow.rs
@@ -1,4 +1,4 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, VecDeque};
 
 use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
 use nohash_hasher::IntMap;
@@ -94,10 +94,10 @@ impl PersistentIndexedTable {
 
 fn serialize(
     cluster_key: &ComponentName,
-    col_time: Option<(Timeline, &[i64])>,
-    col_insert_id: &[u64],
-    col_row_id: &[RowId],
-    col_num_instances: &[NumInstances],
+    col_time: Option<(Timeline, &VecDeque<i64>)>,
+    col_insert_id: &VecDeque<u64>,
+    col_row_id: &VecDeque<RowId>,
+    col_num_instances: &VecDeque<NumInstances>,
     table: &IntMap<ComponentName, DataCellColumn>,
 ) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
     re_tracing::profile_function!();
@@ -129,10 +129,10 @@ fn serialize(
 }
 
 fn serialize_control_columns(
-    col_time: Option<(Timeline, &[i64])>,
-    col_insert_id: &[u64],
-    col_row_id: &[RowId],
-    col_num_instances: &[NumInstances],
+    col_time: Option<(Timeline, &VecDeque<i64>)>,
+    col_insert_id: &VecDeque<u64>,
+    col_row_id: &VecDeque<RowId>,
+    col_num_instances: &VecDeque<NumInstances>,
 ) -> DataTableResult<(Schema, Vec<Box<dyn Array>>)> {
     re_tracing::profile_function!();

diff --git a/crates/re_arrow_store/src/store_gc.rs b/crates/re_arrow_store/src/store_gc.rs
index ad8b92e7d5a6..a0d6a00d85c9 100644
--- a/crates/re_arrow_store/src/store_gc.rs
+++ b/crates/re_arrow_store/src/store_gc.rs
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;
 
 use ahash::{HashMap, HashSet};
 
-use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline};
+use re_log_types::{EntityPath, RowId, TimeInt, TimeRange, Timeline, VecDequeRemovalExt as _};
 use re_types_core::{ComponentName, SizeBytes as _};
 
 use crate::{
@@ -599,7 +599,7 @@ impl IndexedBucketInner {
             // We removed the last row
             *time_range = TimeRange::EMPTY;
         } else {
-            *is_sorted = false;
+            *is_sorted = row_index == 0 || row_index.saturating_add(1) == col_row_id.len();
 
             // We have at least two rows, so we can safely [index] here:
             if row_index == 0 {
@@ -613,25 +613,32 @@ impl IndexedBucketInner {
             }
 
             // col_row_id
-            let removed_row_id = col_row_id.swap_remove(row_index);
+            let Some(removed_row_id) = col_row_id.swap_remove(row_index) else {
+                continue;
+            };
             debug_assert_eq!(row_id, removed_row_id);
             dropped_num_bytes += removed_row_id.total_size_bytes();
 
             // col_time
-            let row_time = col_time.swap_remove(row_index);
-            dropped_num_bytes += row_time.total_size_bytes();
+            if let Some(row_time) = col_time.swap_remove(row_index) {
+                dropped_num_bytes += row_time.total_size_bytes();
+            }
 
             // col_insert_id (if present)
             if !col_insert_id.is_empty() {
-                dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes();
+                if let Some(insert_id) = col_insert_id.swap_remove(row_index) {
+                    dropped_num_bytes += insert_id.total_size_bytes();
+                }
             }
 
             // col_num_instances
-            dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();
+            if let Some(num_instances) = col_num_instances.swap_remove(row_index) {
+                dropped_num_bytes += num_instances.total_size_bytes();
+            }
 
             // each data column
             for column in columns.values_mut() {
-                let cell = column.0.swap_remove(row_index);
+                let cell = column.0.swap_remove(row_index).flatten();
 
                 // TODO(#1809): once datatype deduplication is in, we should really not count
                 // autogenerated keys as part of the memory stats (same on write path).
@@ -703,24 +710,30 @@ impl PersistentIndexedTable {
         let mut diff: Option<StoreDiff> = None;
 
         if let Ok(row_index) = col_row_id.binary_search(&row_id) {
-            *is_sorted = row_index.saturating_add(1) == col_row_id.len();
+            *is_sorted = row_index == 0 || row_index.saturating_add(1) == col_row_id.len();
 
             // col_row_id
-            let removed_row_id = col_row_id.swap_remove(row_index);
+            let Some(removed_row_id) = col_row_id.swap_remove(row_index) else {
+                return (None, 0);
+            };
             debug_assert_eq!(row_id, removed_row_id);
             dropped_num_bytes += removed_row_id.total_size_bytes();
 
             // col_insert_id (if present)
             if !col_insert_id.is_empty() {
-                dropped_num_bytes += col_insert_id.swap_remove(row_index).total_size_bytes();
+                if let Some(insert_id) = col_insert_id.swap_remove(row_index) {
+                    dropped_num_bytes += insert_id.total_size_bytes();
+                }
             }
 
             // col_num_instances
-            dropped_num_bytes += col_num_instances.swap_remove(row_index).total_size_bytes();
+            if let Some(num_instances) = col_num_instances.swap_remove(row_index) {
+                dropped_num_bytes += num_instances.total_size_bytes();
+            }
 
             // each data column
             for column in columns.values_mut() {
-                let cell = column.0.swap_remove(row_index);
+                let cell = column.0.swap_remove(row_index).flatten();
 
                 // TODO(#1809): once datatype deduplication is in, we should really not count
                 // autogenerated keys as part of the memory stats (same on write path).
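The `swap_remove` calls above come from the new `VecDequeRemovalExt` trait
rather than from std. Judging by the `is_sorted` bookkeeping (a removal at
either end preserves ordering), it plausibly swaps the victim toward the
nearest end. A hypothetical sketch built on std's actual
`swap_remove_front`/`swap_remove_back`; the real trait may differ:

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `VecDequeRemovalExt::swap_remove`: O(1) removal
/// that swaps the victim with whichever end of the ring-buffer is closer,
/// then pops that end.
fn swap_remove<T>(deque: &mut VecDeque<T>, index: usize) -> Option<T> {
    if index < deque.len() / 2 {
        deque.swap_remove_front(index) // swap with the front element, pop the front
    } else {
        deque.swap_remove_back(index) // swap with the back element, pop the back
    }
}

fn main() {
    let mut col: VecDeque<i64> = (0..5).collect();
    // Removing at either end keeps the remaining elements in order, which is
    // why the GC above leaves `is_sorted` set only for removals at the ends.
    assert_eq!(swap_remove(&mut col, 0), Some(0));
    assert!(col.iter().copied().eq(1..5));
}
```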
diff --git a/crates/re_arrow_store/src/store_polars.rs b/crates/re_arrow_store/src/store_polars.rs
index 0d86e64accad..5417236789d9 100644
--- a/crates/re_arrow_store/src/store_polars.rs
+++ b/crates/re_arrow_store/src/store_polars.rs
@@ -1,6 +1,6 @@
 #![allow(clippy::all, unused_variables, dead_code)]
 
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, VecDeque};
 
 use arrow2::{
     array::{new_empty_array, Array, BooleanArray, ListArray, Utf8Array},
@@ -268,7 +268,7 @@ impl IndexedBucket {
 fn insert_ids_as_series(col_insert_id: &InsertIdVec) -> Series {
     re_tracing::profile_function!();
 
-    let insert_ids = arrow2::array::UInt64Array::from_slice(col_insert_id.as_slice());
+    let insert_ids = DataTable::serialize_primitive_deque(col_insert_id);
     new_infallible_series(
         DataStore::insert_id_component_name().as_ref(),
         &insert_ids,
@@ -281,7 +281,7 @@ fn column_as_series(
     num_rows: usize,
     datatype: arrow2::datatypes::DataType,
     component: ComponentName,
-    cells: &[Option<DataCell>],
+    cells: &VecDeque<Option<DataCell>>,
 ) -> Series {
     re_tracing::profile_function!();

diff --git a/crates/re_arrow_store/src/store_read.rs b/crates/re_arrow_store/src/store_read.rs
index c35d9ecfbf48..59a6fc7ffd1b 100644
--- a/crates/re_arrow_store/src/store_read.rs
+++ b/crates/re_arrow_store/src/store_read.rs
@@ -1,10 +1,9 @@
-use std::{ops::RangeBounds, sync::atomic::Ordering};
+use std::{collections::VecDeque, ops::RangeBounds, sync::atomic::Ordering};
 
 use itertools::Itertools;
 use re_log::trace;
 use re_log_types::{DataCell, EntityPath, RowId, TimeInt, TimePoint, TimeRange, Timeline};
 use re_types_core::{ComponentName, ComponentNameSet};
-use smallvec::SmallVec;
 
 use crate::{
     store::PersistentIndexedTableInner, DataStore, IndexedBucket, IndexedBucketInner, IndexedTable,
@@ -990,8 +989,8 @@ impl IndexedBucketInner {
         {
             re_tracing::profile_scope!("control");
 
-            fn reshuffle_control_column<T: Copy, const N: usize>(
-                column: &mut SmallVec<[T; N]>,
+            fn reshuffle_control_column<T: Copy>(
+                column: &mut VecDeque<T>,
                 swaps: &[(usize, usize)],
             ) {
                 let source = {
@@ -1232,8 +1231,8 @@ impl PersistentIndexedTableInner {
         {
             re_tracing::profile_scope!("control");
 
-            fn reshuffle_control_column<T: Copy, const N: usize>(
-                column: &mut SmallVec<[T; N]>,
+            fn reshuffle_control_column<T: Copy>(
+                column: &mut VecDeque<T>,
                 swaps: &[(usize, usize)],
             ) {
                 let source = {

diff --git a/crates/re_arrow_store/src/store_sanity.rs b/crates/re_arrow_store/src/store_sanity.rs
index 1ec3a355d04c..50e79008d3bb 100644
--- a/crates/re_arrow_store/src/store_sanity.rs
+++ b/crates/re_arrow_store/src/store_sanity.rs
@@ -1,4 +1,4 @@
-use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange};
+use re_log_types::{DataCellColumn, NumInstances, RowId, TimeRange, VecDequeSortingExt as _};
 use re_types_core::{ComponentName, Loggable, SizeBytes as _};
 
 use crate::{
@@ -173,8 +173,8 @@ impl IndexedBucket {
             let mut times = col_time.clone();
             times.sort();
 
-            let expected_min = times.first().copied().unwrap_or(i64::MAX).into();
-            let expected_max = times.last().copied().unwrap_or(i64::MIN).into();
+            let expected_min = times.front().copied().unwrap_or(i64::MAX).into();
+            let expected_max = times.back().copied().unwrap_or(i64::MIN).into();
             let expected_time_range = TimeRange::new(expected_min, expected_max);
 
             if expected_time_range != *time_range {
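For reference, `times.sort()` above comes from the new `VecDequeSortingExt`
trait (std's `VecDeque` has no `sort`). A plausible sketch of such an
extension, assuming it simply linearizes the ring-buffer first; the real
implementation may differ:

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `VecDequeSortingExt::sort`: rotate the ring so
/// its contents form one contiguous slice, then sort that slice in place.
fn sort<T: Ord>(deque: &mut VecDeque<T>) {
    deque.make_contiguous().sort();
}

fn main() {
    let mut times: VecDeque<i64> = [3, 1, 2].into_iter().collect();
    sort(&mut times);
    assert!(times.iter().copied().eq([1, 2, 3]));
}
```

The same `make_contiguous` trick shows up in the `store_write.rs` split path
just below, which linearizes `col_time1` before handing it to
`find_split_index` as a plain slice.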
diff --git a/crates/re_arrow_store/src/store_write.rs b/crates/re_arrow_store/src/store_write.rs
index 6db010dce747..ff759c131eab 100644
--- a/crates/re_arrow_store/src/store_write.rs
+++ b/crates/re_arrow_store/src/store_write.rs
@@ -2,11 +2,11 @@ use arrow2::datatypes::DataType;
 use itertools::Itertools as _;
 use nohash_hasher::{IntMap, IntSet};
 use parking_lot::RwLock;
-use smallvec::SmallVec;
 
 use re_log::{debug, trace};
 use re_log_types::{
     DataCell, DataCellColumn, DataCellError, DataRow, RowId, TimeInt, TimePoint, TimeRange,
+    VecDequeRemovalExt as _,
 };
 use re_types_core::{
     components::InstanceKey, ComponentName, ComponentNameSet, Loggable, SizeBytes as _,
@@ -339,7 +339,7 @@ impl IndexedTable {
 
         let (bucket_upper_bound, bucket_len) = {
             let guard = bucket.inner.read();
-            (guard.col_time.last().copied(), guard.col_time.len())
+            (guard.col_time.back().copied(), guard.col_time.len())
         };
 
         if let Some(upper_bound) = bucket_upper_bound {
@@ -447,23 +447,23 @@ impl IndexedBucket {
 
         // append time to primary column and update time range appropriately
 
-        if let (Some(last_time), Some(last_row_id)) = (col_time.last(), col_row_id.last()) {
+        if let (Some(last_time), Some(last_row_id)) = (col_time.back(), col_row_id.back()) {
            // NOTE: Within a single timestamp, we use the Row ID as tie-breaker
            *is_sorted &= (*last_time, *last_row_id) <= (time.as_i64(), row.row_id());
         }
 
-        col_time.push(time.as_i64());
+        col_time.push_back(time.as_i64());
         *time_range = TimeRange::new(time_range.min.min(time), time_range.max.max(time));
         size_bytes_added += time.as_i64().total_size_bytes();
 
         // update all control columns
         if let Some(insert_id) = insert_id {
-            col_insert_id.push(insert_id);
+            col_insert_id.push_back(insert_id);
             size_bytes_added += insert_id.total_size_bytes();
         }
-        col_row_id.push(row.row_id());
+        col_row_id.push_back(row.row_id());
         size_bytes_added += row.row_id().total_size_bytes();
-        col_num_instances.push(row.num_instances());
+        col_num_instances.push_back(row.num_instances());
         size_bytes_added += row.num_instances().total_size_bytes();
 
         // insert auto-generated cluster cell if present
@@ -476,7 +476,7 @@ impl IndexedBucket {
                 column
             });
             size_bytes_added += cluster_cell.total_size_bytes();
-            column.0.push(Some(cluster_cell.clone()));
+            column.0.push_back(Some(cluster_cell.clone()));
         }
 
         // append components to their respective columns (2-way merge)
@@ -491,7 +491,7 @@ impl IndexedBucket {
                 column
             });
             size_bytes_added += cell.total_size_bytes();
-            column.0.push(Some(cell.clone() /* shallow */));
+            column.0.push_back(Some(cell.clone() /* shallow */));
         }
 
         // 2-way merge, step 2: right-to-left
@@ -506,7 +506,7 @@ impl IndexedBucket {
             if !components.contains(component_name) {
                 let none_cell: Option<DataCell> = None;
                 size_bytes_added += none_cell.total_size_bytes();
-                column.0.push(none_cell);
+                column.0.push_back(none_cell);
             }
         }
 
@@ -583,35 +583,22 @@ impl IndexedBucket {
         // Used in debug builds to assert that we've left everything in a sane state.
         let _num_rows = col_time1.len();
 
-        fn split_off_column<T: Copy, const N: usize>(
-            column: &mut SmallVec<[T; N]>,
-            split_idx: usize,
-        ) -> SmallVec<[T; N]> {
-            if split_idx >= column.len() {
-                return SmallVec::default();
-            }
-
-            let second_half = SmallVec::from_slice(&column[split_idx..]);
-            column.truncate(split_idx);
-            second_half
-        }
-
         let (min2, bucket2) = {
-            let split_idx = find_split_index(col_time1).expect("must be splittable at this point");
+            col_time1.make_contiguous();
+            let (times1, &[]) = col_time1.as_slices() else {
+                unreachable!();
+            };
+            let split_idx = find_split_index(times1).expect("must be splittable at this point");
 
             let (time_range2, col_time2, col_insert_id2, col_row_id2, col_num_instances2) = {
                 re_tracing::profile_scope!("control");
+                // update everything _in place_!
                 (
-                    // this updates `time_range1` in-place!
-                    split_time_range_off(split_idx, col_time1, time_range1),
-                    // this updates `col_time1` in-place!
-                    split_off_column(col_time1, split_idx),
-                    // this updates `col_insert_id1` in-place!
-                    split_off_column(col_insert_id1, split_idx),
-                    // this updates `col_row_id1` in-place!
-                    split_off_column(col_row_id1, split_idx),
-                    // this updates `col_num_instances1` in-place!
-                    split_off_column(col_num_instances1, split_idx),
+                    split_time_range_off(split_idx, times1, time_range1),
+                    col_time1.split_off_or_default(split_idx),
+                    col_insert_id1.split_off_or_default(split_idx),
+                    col_row_id1.split_off_or_default(split_idx),
+                    col_num_instances1.split_off_or_default(split_idx),
                 )
             };
 
@@ -622,15 +609,11 @@ impl IndexedBucket {
                 .iter_mut()
                 .map(|(name, column1)| {
                     if split_idx >= column1.len() {
-                        return (*name, DataCellColumn(SmallVec::default()));
+                        return (*name, DataCellColumn(Default::default()));
                     }
 
                     // this updates `column1` in-place!
-                    let column2 = DataCellColumn({
-                        let second_half = SmallVec::from(&column1.0[split_idx..]);
-                        column1.0.truncate(split_idx);
-                        second_half
-                    });
+                    let column2 = DataCellColumn(column1.split_off(split_idx));
                     (*name, column2)
                 })
                 .collect()
@@ -839,7 +822,7 @@ impl PersistentIndexedTableInner {
             is_sorted,
         } = self;
 
-        if let Some(last_row_id) = col_row_id.last() {
+        if let Some(last_row_id) = col_row_id.back() {
             *is_sorted &= *last_row_id <= row.row_id();
         }
 
@@ -848,10 +831,10 @@ impl PersistentIndexedTableInner {
 
         // --- update all control columns ---
 
         if let Some(insert_id) = insert_id {
-            col_insert_id.push(insert_id);
+            col_insert_id.push_back(insert_id);
         }
-        col_row_id.push(row.row_id());
-        col_num_instances.push(row.num_instances());
+        col_row_id.push_back(row.row_id());
+        col_num_instances.push_back(row.num_instances());
 
         // --- append components to their respective columns (2-way merge) ---
 
@@ -860,7 +843,7 @@ impl PersistentIndexedTableInner {
             let column = columns
                 .entry(cluster_cell.component_name())
                 .or_insert_with(|| DataCellColumn::empty(num_rows));
-            column.0.push(Some(cluster_cell.clone()));
+            column.0.push_back(Some(cluster_cell.clone()));
         }
 
         // 2-way merge, step 1: left-to-right
@@ -868,7 +851,7 @@ impl PersistentIndexedTableInner {
             let column = columns
                 .entry(cell.component_name())
                 .or_insert_with(|| DataCellColumn::empty(num_rows));
-            column.0.push(Some(cell.clone() /* shallow */));
+            column.0.push_back(Some(cell.clone() /* shallow */));
         }
 
         // 2-way merge, step 2: right-to-left
@@ -881,7 +864,7 @@ impl PersistentIndexedTableInner {
             }
 
             if !components.contains(component) {
-                column.0.push(None);
+                column.0.push_back(None);
             }
         }
     }

diff --git a/crates/re_log_types/src/data_table.rs b/crates/re_log_types/src/data_table.rs
index 74661e5ddf61..284060382575 100644
--- a/crates/re_log_types/src/data_table.rs
+++ b/crates/re_log_types/src/data_table.rs
@@ -1,9 +1,8 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, VecDeque};
 
 use ahash::HashMap;
 use itertools::{izip, Itertools as _};
 use nohash_hasher::IntSet;
-use smallvec::SmallVec;
 
 use re_types_core::{ComponentName, Loggable, SizeBytes};
 
@@ -51,19 +50,19 @@ pub type DataTableResult<T> = ::std::result::Result<T, DataTableError>;
 
 // ---
 
-pub type RowIdVec = SmallVec<[RowId; 4]>;
+pub type RowIdVec = VecDeque<RowId>;
 
-pub type TimeOptVec = SmallVec<[Option<i64>; 4]>;
+pub type TimeOptVec = VecDeque<Option<i64>>;
 
-pub type TimePointVec = SmallVec<[TimePoint; 4]>;
+pub type TimePointVec = VecDeque<TimePoint>;
 
-pub type ErasedTimeVec = SmallVec<[i64; 4]>;
+pub type ErasedTimeVec = VecDeque<i64>;
 
-pub type EntityPathVec = SmallVec<[EntityPath; 4]>;
+pub type EntityPathVec = VecDeque<EntityPath>;
 
-pub type NumInstancesVec = SmallVec<[NumInstances; 4]>;
+pub type NumInstancesVec = VecDeque<NumInstances>;
 
-pub type DataCellOptVec = SmallVec<[Option<DataCell>; 4]>;
+pub type DataCellOptVec = VecDeque<Option<DataCell>>;
 
 /// A column's worth of [`DataCell`]s: a sparse collection of [`DataCell`]s that share the same
 /// underlying type and likely point to shared, contiguous memory.
@@ -73,7 +72,7 @@
 pub struct DataCellColumn(pub DataCellOptVec);
 
 impl std::ops::Deref for DataCellColumn {
-    type Target = [Option<DataCell>];
+    type Target = VecDeque<Option<DataCell>>;
 
     #[inline]
     fn deref(&self) -> &Self::Target {
@@ -81,8 +80,6 @@ impl std::ops::Deref for DataCellColumn {
     }
 }
 
-// TODO(cmc): Those Deref don't actually do their job most of the time for some reason…
-
 impl std::ops::DerefMut for DataCellColumn {
     #[inline]
     fn deref_mut(&mut self) -> &mut Self::Target {
@@ -109,7 +106,7 @@ impl std::ops::IndexMut<usize> for DataCellColumn {
 impl DataCellColumn {
     #[inline]
     pub fn empty(num_rows: usize) -> Self {
-        Self(smallvec::smallvec![None; num_rows])
+        Self(vec![None; num_rows].into())
     }
 
     /// Compute and cache the size of each individual underlying [`DataCell`].
@@ -418,12 +415,12 @@ impl DataTable {
                 match col_timelines.entry(*timeline) {
                     std::collections::btree_map::Entry::Vacant(entry) => {
                         entry
-                            .insert(smallvec::smallvec![None; i])
-                            .push(Some(time.as_i64()));
+                            .insert(vec![None; i].into())
+                            .push_back(Some(time.as_i64()));
                     }
                     std::collections::btree_map::Entry::Occupied(mut entry) => {
                         let entry = entry.get_mut();
-                        entry.push(Some(time.as_i64()));
+                        entry.push_back(Some(time.as_i64()));
                     }
                 }
             }
@@ -431,7 +428,7 @@ impl DataTable {
             // handle potential sparseness
             for (timeline, col_time) in &mut col_timelines {
                 if timepoint.get(timeline).is_none() {
-                    col_time.push(None);
+                    col_time.push_back(None);
                 }
             }
         }
@@ -439,10 +436,7 @@ impl DataTable {
         // Pre-allocate all columns (one per component).
         let mut columns = BTreeMap::default();
         for component in components {
-            columns.insert(
-                component,
-                DataCellColumn(smallvec::smallvec![None; column.len()]),
-            );
+            columns.insert(component, DataCellColumn(vec![None; column.len()].into()));
         }
 
         // Fill all columns (where possible: data is likely sparse).
@@ -565,6 +559,7 @@ use arrow2::{
     chunk::Chunk,
     datatypes::{DataType, Field, Schema, TimeUnit},
     offset::Offsets,
+    types::NativeType,
 };
 
 pub const METADATA_KIND: &str = "rerun.kind";
@@ -624,7 +619,7 @@ impl DataTable {
         timeline: Timeline,
         times: &TimeOptVec,
     ) -> (Field, Box<dyn Array>) {
-        let data = PrimitiveArray::from(times.as_slice()).to(timeline.datatype());
+        let data = DataTable::serialize_primitive_deque_opt(times).to(timeline.datatype());
         let field = Field::new(timeline.name().as_str(), data.data_type().clone(), false)
             .with_metadata([(METADATA_KIND.to_owned(), METADATA_KIND_TIME.to_owned())].into());
 
@@ -694,7 +689,7 @@ impl DataTable {
 
     /// Serializes a single control column: an iterable of dense arrow-like data.
     pub fn serialize_control_column<'a, C: re_types_core::Component + 'a>(
-        values: &'a [C],
+        values: &'a VecDeque<C>,
     ) -> DataTableResult<(Field, Box<dyn Array>)>
     where
         std::borrow::Cow<'a, C>: std::convert::From<&'a C>,
@@ -720,12 +715,12 @@ impl DataTable {
     /// Serializes a single control column; optimized path for primitive datatypes.
     pub fn serialize_primitive_column<T: NativeType>(
         name: &str,
-        values: &[T],
+        values: &VecDeque<T>,
         datatype: Option<DataType>,
     ) -> (Field, Box<dyn Array>) {
         re_tracing::profile_function!();
 
-        let data = PrimitiveArray::from_slice(values);
+        let data = DataTable::serialize_primitive_deque(values);
 
         let datatype = datatype.unwrap_or(data.data_type().clone());
         let data = data.to(datatype.clone()).boxed();
@@ -779,7 +774,7 @@ impl DataTable {
     /// Serializes a single data column.
     pub fn serialize_data_column(
         name: &str,
-        column: &[Option<DataCell>],
+        column: &VecDeque<Option<DataCell>>,
     ) -> DataTableResult<(Field, Box<dyn Array>)> {
         re_tracing::profile_function!();
 
        /// * Before: `[C, C, C, C, C, C, C, …]`
        /// * After: `ListArray[ [[C, C], [C, C, C], None, [C], [C], …] ]`
        fn data_to_lists(
-            column: &[Option<DataCell>],
+            column: &VecDeque<Option<DataCell>>,
             data: Box<dyn Array>,
             ext_name: Option<String>,
         ) -> Box<dyn Array> {
@@ -858,6 +853,28 @@ impl DataTable {
 
         Ok((field, data))
     }
+
+    pub fn serialize_primitive_deque_opt<T: NativeType>(
+        data: &VecDeque<Option<T>>,
+    ) -> PrimitiveArray<T> {
+        let datatype = T::PRIMITIVE.into();
+        let values = data
+            .iter()
+            .copied()
+            .map(Option::unwrap_or_default)
+            .collect();
+        let validity = data
+            .iter()
+            .any(Option::is_none)
+            .then(|| data.iter().map(Option::is_some).collect());
+        PrimitiveArray::new(datatype, values, validity)
+    }
+
+    pub fn serialize_primitive_deque<T: NativeType>(data: &VecDeque<T>) -> PrimitiveArray<T> {
+        let datatype = T::PRIMITIVE.into();
+        let values = data.iter().copied().collect();
+        PrimitiveArray::new(datatype, values, None)
+    }
 }
 
 impl DataTable {

diff --git a/crates/re_sdk/Cargo.toml b/crates/re_sdk/Cargo.toml
index 406f458689d6..66fddc4d2ebd 100644
--- a/crates/re_sdk/Cargo.toml
+++ b/crates/re_sdk/Cargo.toml
@@ -69,6 +69,7 @@ webbrowser = { workspace = true, optional = true }
 
 [dev-dependencies]
 re_arrow_store.workspace = true
+re_log_types = { workspace = true, features = ["testing"] }
 
 itertools.workspace = true
 ndarray-rand.workspace = true

diff --git a/crates/re_types_core/src/size_bytes.rs b/crates/re_types_core/src/size_bytes.rs
index 1f0a002990c4..eb73698ed89e 100644
--- a/crates/re_types_core/src/size_bytes.rs
+++ b/crates/re_types_core/src/size_bytes.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, VecDeque};
 
 use arrow2::datatypes::{DataType, Field};
 use smallvec::SmallVec;
@@ -69,6 +69,17 @@ impl<T: SizeBytes> SizeBytes for Vec<T> {
     }
 }
 
+impl<T: SizeBytes> SizeBytes for VecDeque<T> {
+    /// Does not take capacity into account.
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        // TODO(cmc): This is sub-optimal if these types are PODs.
+
+        // NOTE: It's all on the heap at this point.
+        self.iter().map(SizeBytes::total_size_bytes).sum::<u64>()
+    }
+}
+
 impl<T: SizeBytes, const N: usize> SizeBytes for SmallVec<[T; N]> {
     /// Does not take capacity into account.
     #[inline]
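Finally, a usage sketch (not from the patch) of the two new deque
serialization helpers added above, assuming they are called as plain
associated functions on `DataTable`:

```rust
use std::collections::VecDeque;

use re_log_types::DataTable;

fn main() {
    // Dense path: no validity bitmap is ever allocated.
    let col_time: VecDeque<i64> = [1, 2, 3].into_iter().collect();
    let array = DataTable::serialize_primitive_deque(&col_time);
    assert!(array.validity().is_none());

    // Sparse path: a validity bitmap is built only if at least one slot is
    // `None`, mirroring the `any(Option::is_none)` check above.
    let col_time_opt: VecDeque<Option<i64>> = [Some(1), None, Some(3)].into_iter().collect();
    let array = DataTable::serialize_primitive_deque_opt(&col_time_opt);
    assert!(array.validity().is_some());
}
```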