
Fix garbage collection #1560

Merged 2 commits on Mar 13, 2023

35 changes: 34 additions & 1 deletion crates/re_arrow_store/src/store_gc.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;

use arrow2::array::{Array, ListArray};
use re_log::info;
use re_log::{info, trace};
use re_log_types::{ComponentName, TimeRange, Timeline};

use crate::{ComponentBucket, DataStore};
@@ -110,6 +110,7 @@ impl DataStore {
) -> Vec<Box<dyn Array>> {
let mut dropped = Vec::<Box<dyn Array>>::new();

let mut i = 0usize;
while drop_at_least_size_bytes > 0.0 {
// Find and drop the earliest (in terms of _insertion order_) primary component bucket
// that we can find.
.and_then(|table| (table.buckets.len() > 1).then(|| table.buckets.pop_front()))
.flatten()
else {
trace!(
kind = "gc",
id = self.gc_id,
timeline = %primary_timeline.name(),
%primary_component,
iter = i,
remaining = re_format::format_bytes(drop_at_least_size_bytes),
"no more primary buckets, giving up"
);
break;
};

drop_at_least_size_bytes -= primary_bucket.total_size_bytes() as f64;

trace!(
kind = "gc",
id = self.gc_id,
timeline = %primary_timeline.name(),
%primary_component,
iter = i,
reclaimed = re_format::format_bytes(primary_bucket.total_size_bytes() as f64),
remaining = re_format::format_bytes(drop_at_least_size_bytes),
"found & dropped primary component bucket"
);

// From there, find and drop all component buckets (in _insertion order_) that do not
// contain any data more recent than the time range covered by the primary
// component bucket (for the primary timeline!).
if primary_bucket.encompasses(primary_timeline, &bucket.time_ranges) {
let bucket = table.buckets.pop_front().unwrap();
drop_at_least_size_bytes -= bucket.total_size_bytes() as f64;
trace!(
kind = "gc",
id = self.gc_id,
timeline = %primary_timeline.name(),
%primary_component,
iter = i,
reclaimed = re_format::format_bytes(bucket.total_size_bytes() as f64),
remaining = re_format::format_bytes(drop_at_least_size_bytes),
"found & dropped secondary component bucket"
);
} else {
break;
}
}

i += 1;
}

// We don't collect indices: they behave as tombstones.
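In short, the loop above pops the oldest primary-component bucket (always leaving at least one behind), then pops every secondary-component bucket whose data is fully encompassed by that bucket's range on the primary timeline, and repeats until the requested number of bytes has been reclaimed or no primary buckets remain. A minimal standalone sketch of that strategy, using simplified placeholder types rather than the real `DataStore`/`ComponentBucket` internals:

```rust
// Simplified sketch only: `Bucket` stands in for the real component buckets,
// and "time" is a single i64 on the primary timeline.
use std::collections::VecDeque;

struct Bucket {
    size_bytes: u64,
    max_time: i64, // latest time this bucket covers on the primary timeline
}

/// Drop the oldest primary bucket, then every secondary bucket it encompasses,
/// until at least `target_bytes` have been reclaimed.
fn gc(primary: &mut VecDeque<Bucket>, secondary: &mut VecDeque<Bucket>, mut target_bytes: f64) {
    while target_bytes > 0.0 {
        // Never drop the last primary bucket.
        let Some(dropped) = (primary.len() > 1).then(|| primary.pop_front()).flatten() else {
            break; // no more primary buckets, give up
        };
        target_bytes -= dropped.size_bytes as f64;

        // Secondary buckets with no data newer than the dropped primary bucket go too.
        while secondary.front().is_some_and(|b| b.max_time <= dropped.max_time) {
            let b = secondary.pop_front().unwrap();
            target_bytes -= b.size_bytes as f64;
        }
    }
}

fn main() {
    let mut primary: VecDeque<Bucket> =
        (0..4).map(|t| Bucket { size_bytes: 1024, max_time: t }).collect();
    let mut secondary: VecDeque<Bucket> =
        (0..4).map(|t| Bucket { size_bytes: 512, max_time: t }).collect();

    gc(&mut primary, &mut secondary, 2048.0);
    println!("{} primary / {} secondary buckets left", primary.len(), secondary.len());
}
```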
14 changes: 12 additions & 2 deletions crates/re_data_store/src/log_db.rs
@@ -37,11 +37,21 @@ impl Default for EntityDb {
data_store: re_arrow_store::DataStore::new(
InstanceKey::name(),
DataStoreConfig {
component_bucket_size_bytes: 1024 * 1024, // 1 MiB
// Garbage collection of the datastore is currently driven by the `MsgId`
// component column, as a workaround for the `MsgId` mismatch issue.
Member:

What is "the MsgId mismatch issue"? Is there an actual issue describing it?

Member Author:

Essentially, the store has no easy way of returning a list of MsgIds to the main app when it garbage collects data (because there's no such thing as a message in the store: it gets split into an arbitrary number of index and component buckets during insertion).
We do some major hacks in the current GC implementation to somewhat work around the issue, but this leads to a whole bunch of other problems, one of which is addressed by this PR.

Waiting for us to discuss more during this week's session before opening issues.
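
To make that mismatch concrete: on insertion a message is exploded into one cell per component, each appended to that component's own bucket sequence, and the `MsgId` itself ends up as just another component column, so once a bucket is dropped there is no message boundary left to report back. A rough, self-contained sketch of that shape, with placeholder types rather than the actual store internals:

```rust
use std::collections::HashMap;

type MsgId = u64;

/// Placeholder component bucket: raw cells, no message boundaries.
#[derive(Default)]
struct Bucket {
    cells: Vec<Vec<u8>>,
}

/// Placeholder store: one bucket list per component column.
#[derive(Default)]
struct Store {
    columns: HashMap<&'static str, Vec<Bucket>>,
}

impl Store {
    /// Insertion explodes a message into per-component cells; the MsgId goes
    /// into its own component column like any other component.
    fn insert(&mut self, msg_id: MsgId, components: &[(&'static str, Vec<u8>)]) {
        for (name, data) in components.iter().cloned() {
            self.push_cell(name, data);
        }
        self.push_cell("msg_id", msg_id.to_le_bytes().to_vec());
    }

    fn push_cell(&mut self, column: &'static str, data: Vec<u8>) {
        let buckets = self.columns.entry(column).or_default();
        if buckets.is_empty() {
            buckets.push(Bucket::default());
        }
        buckets.last_mut().unwrap().cells.push(data);
    }

    /// Dropping a bucket yields raw cells only: unless `column` happens to be
    /// the MsgId column, there is no way to tell which messages were affected.
    fn drop_oldest_bucket(&mut self, column: &str) -> Option<Bucket> {
        let buckets = self.columns.get_mut(column)?;
        (!buckets.is_empty()).then(|| buckets.remove(0))
    }
}

fn main() {
    let mut store = Store::default();
    store.insert(1, &[("point", vec![0xAB]), ("color", vec![0xCD])]);
    store.insert(2, &[("point", vec![0xEF])]);

    // Dropping a "point" bucket reclaims memory, but says nothing about
    // which MsgIds (1 and/or 2) just lost data.
    let dropped = store.drop_oldest_bucket("point").unwrap();
    println!("dropped {} point cells", dropped.cells.len());
}
```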

//
// Since this component is only a few bytes large, trying to trigger a GC
// based on bucket size is a lost cause, so make sure to have a small enough
// row limit.
//
// TODO(cmc): Reassess once the whole `MsgId` mismatch issue is resolved
// (probably once batching is implemented).
component_bucket_nb_rows: 128,
component_bucket_size_bytes: 10 * 1024 * 1024, // 10 MiB
// We do not garbage collect index buckets at the moment, and so the size of
// individual index buckets is irrelevant, only their total number of rows
// matter.
// See <pr-link> for details.
// See https://github.com/rerun-io/rerun/pull/1558 for details.
//
// TODO(cmc): Bring back index GC once the whole `MsgId` mismatch issue is
// resolved (probably once batching is implemented).
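
For a sense of scale behind that row limit: assuming a `MsgId` cell on the order of 16 bytes (the exact size is an assumption here; the comment above only says it is a few bytes), a `MsgId` bucket would need hundreds of thousands of rows before the 10 MiB size threshold could ever be reached, which is why the 128-row cap is what actually drives GC in practice. A quick back-of-the-envelope check:

```rust
fn main() {
    // Illustrative numbers only; the per-row size is an assumption.
    let bytes_per_msg_id_row = 16u64;
    let size_limit_bytes = 10 * 1024 * 1024u64; // component_bucket_size_bytes
    let row_limit = 128u64; // component_bucket_nb_rows

    // Rows needed before the size-based threshold would be reached:
    println!("size limit hit after ~{} rows", size_limit_bytes / bytes_per_msg_id_row); // 655360
    // ...versus the row-based threshold:
    println!("row limit hit after {} rows", row_limit); // 128
}
```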