Skip to content

Commit

Permalink
SDK DataLoaders 5: customizable (external) loaders for Rust (#5351)
Browse files Browse the repository at this point in the history
Adds new `RecommendedLoadSettings` that gets passed to all `DataLoader`s
-- builtin and external -- in order to customize their behaviors.

This includes:
- A recommended recording ID
- The ID of the currently opened recording in the viewer (not
implemented)
  - Related: #5350
- A recommended entity path prefix
- A recommended timepoint

```bash
cargo r -p rerun-loader-rust-file -- run_wasm/src/main.rs  --recording-id this-one --entity-path-prefix a/b/c  --time sim_time=1000 --time wall_time=1709204046 --sequence sim_frame=42 | rerun -
```

![image](https://github.com/rerun-io/rerun/assets/2910679/631d9798-c198-4e86-b6f8-32d0b849e7b2)


Checks:
- [x] external loader ran manually (`loader | rerun`)
- [x] external loader via rerun (`rerun xxx.rs`)
- [x] log_file with external loader (`log_file xxx.rs`)

---

Part of series of PR to expose configurable `DataLoader`s to our SDKs:
- #5327 
- #5328 
- #5330
- #5337
- #5351
- #5355
  • Loading branch information
teh-cmc authored Feb 29, 2024
1 parent c413bd5 commit 9e3ffc2
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 66 deletions.
6 changes: 3 additions & 3 deletions crates/re_data_source/src/data_loader/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl DataLoader for ArchetypeLoader {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand All @@ -35,12 +35,12 @@ impl DataLoader for ArchetypeLoader {
.with_context(|| format!("Failed to read file {filepath:?}"))?;
let contents = std::borrow::Cow::Owned(contents);

self.load_from_file_contents(store_id, filepath, contents, tx)
self.load_from_file_contents(settings, filepath, contents, tx)
}

fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
Expand Down
8 changes: 4 additions & 4 deletions crates/re_data_source/src/data_loader/loader_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl crate::DataLoader for DirectoryLoader {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &crate::DataLoaderSettings,
dirpath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand All @@ -39,7 +39,7 @@ impl crate::DataLoader for DirectoryLoader {

let filepath = entry.path();
if filepath.is_file() {
let store_id = store_id.clone();
let settings = settings.clone();
let filepath = filepath.to_owned();
let tx = tx.clone();

Expand All @@ -51,7 +51,7 @@ impl crate::DataLoader for DirectoryLoader {
_ = std::thread::Builder::new()
.name(format!("load_dir_entry({filepath:?})"))
.spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, None) {
let data = match crate::load_file::load(&settings, &filepath, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
Expand All @@ -74,7 +74,7 @@ impl crate::DataLoader for DirectoryLoader {
#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
Expand Down
11 changes: 6 additions & 5 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl crate::DataLoader for ExternalLoader {

fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand All @@ -126,8 +126,9 @@ impl crate::DataLoader for ExternalLoader {
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();

let args = settings.to_cli_args();
for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let args = args.clone();
let filepath = filepath.clone();
let tx = tx.clone();
let tx_feedback = tx_feedback.clone();
Expand All @@ -139,7 +140,7 @@ impl crate::DataLoader for ExternalLoader {

let child = Command::new(exe)
.arg(filepath.clone())
.args(["--recording-id".to_owned(), store_id.to_string()])
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
Expand Down Expand Up @@ -277,12 +278,12 @@ impl crate::DataLoader for ExternalLoader {
#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
// TODO(#5324): You could imagine a world where plugins can be streamed rrd data via their
// standard input… but today is not world.
Err(crate::DataLoaderError::Incompatible(path))
}
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_source/src/data_loader/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl crate::DataLoader for RrdLoader {
fn load_from_path(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl crate::DataLoader for RrdLoader {
fn load_from_file_contents(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
Expand Down
102 changes: 99 additions & 3 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,106 @@ use std::sync::Arc;

use once_cell::sync::Lazy;

use re_log_types::{ArrowMsg, DataRow, LogMsg};
use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};

// ---

/// Recommended settings for the [`DataLoader`].
///
/// The loader is free to ignore some or all of these.
///
/// External [`DataLoader`]s will be passed the following CLI parameters:
/// * `--recording-id <store_id>`
/// * `--opened-recording-id <opened_store_id>` (if set)
/// * `--entity-path-prefix <entity_path_prefix>` (if set)
/// * `--timeless` (if `timepoint` is set to the timeless timepoint)
/// * `--time <timeline1>=<time1> <timeline2>=<time2> ...` (if `timepoint` contains temporal data)
/// * `--sequence <timeline1>=<seq1> <timeline2>=<seq2> ...` (if `timepoint` contains sequence data)
#[derive(Debug, Clone)]
pub struct DataLoaderSettings {
/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
pub store_id: re_log_types::StoreId,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
///
/// Log data to this recording if you want it to appear in a new recording shared by all
/// data-loaders for the current loading session.
//
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,

/// What should the entity paths be prefixed with?
pub entity_path_prefix: Option<EntityPath>,

/// At what time(s) should the data be logged to?
pub timepoint: Option<TimePoint>,
}

impl DataLoaderSettings {
#[inline]
pub fn recommended(store_id: impl Into<re_log_types::StoreId>) -> Self {
Self {
store_id: store_id.into(),
opened_store_id: Default::default(),
entity_path_prefix: Default::default(),
timepoint: Default::default(),
}
}

/// Generates CLI flags from these settings, for external data loaders.
pub fn to_cli_args(&self) -> Vec<String> {
let Self {
store_id,
opened_store_id,
entity_path_prefix,
timepoint,
} = self;

let mut args = Vec::new();

args.extend(["--recording-id".to_owned(), format!("{store_id}")]);

if let Some(opened_store_id) = opened_store_id {
args.extend([
"--opened-recording-id".to_owned(),
format!("{opened_store_id}"),
]);
}

if let Some(entity_path_prefix) = entity_path_prefix {
args.extend([
"--entity-path-prefix".to_owned(),
format!("{entity_path_prefix}"),
]);
}

if let Some(timepoint) = timepoint {
if timepoint.is_timeless() {
args.push("--timeless".to_owned());
}

for (timeline, time) in timepoint.iter() {
match timeline.typ() {
re_log_types::TimeType::Time => {
args.extend([
"--time".to_owned(),
format!("{}={}", timeline.name(), time.as_i64()),
]);
}
re_log_types::TimeType::Sequence => {
args.extend([
"--sequence".to_owned(),
format!("{}={}", timeline.name(), time.as_i64()),
]);
}
}
}
}

args
}
}

/// A [`DataLoader`] loads data from a file path and/or a file's contents.
///
/// Files can be loaded in 3 different ways:
Expand Down Expand Up @@ -90,7 +186,7 @@ pub trait DataLoader: Send + Sync {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &DataLoaderSettings,
path: std::path::PathBuf,
tx: std::sync::mpsc::Sender<LoadedData>,
) -> Result<(), DataLoaderError>;
Expand Down Expand Up @@ -122,7 +218,7 @@ pub trait DataLoader: Send + Sync {
/// with a [`DataLoaderError::Incompatible`] error.
fn load_from_file_contents(
&self,
store_id: re_log_types::StoreId,
settings: &DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
Expand Down
12 changes: 8 additions & 4 deletions crates/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ impl DataSource {
// This `StoreId` will be communicated to all `DataLoader`s, which may or may not
// decide to use it depending on whether they want to share a common recording
// or not.
let store_id = re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
crate::load_from_path(&store_id, file_source, &path, &tx)
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = crate::DataLoaderSettings::recommended(shared_store_id);
crate::load_from_path(&settings, file_source, &path, &tx)
.with_context(|| format!("{path:?}"))?;

if let Some(on_msg) = on_msg {
Expand All @@ -156,9 +158,11 @@ impl DataSource {
// This `StoreId` will be communicated to all `DataLoader`s, which may or may not
// decide to use it depending on whether they want to share a common recording
// or not.
let store_id = re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = crate::DataLoaderSettings::recommended(shared_store_id);
crate::load_from_file_contents(
&store_id,
&settings,
file_source,
&std::path::PathBuf::from(file_contents.name),
std::borrow::Cow::Borrowed(&file_contents.bytes),
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod load_stdin;

pub use self::data_loader::{
iter_loaders, register_custom_data_loader, ArchetypeLoader, DataLoader, DataLoaderError,
DirectoryLoader, LoadedData, RrdLoader,
DataLoaderSettings, DirectoryLoader, LoadedData, RrdLoader,
};
pub use self::data_source::DataSource;
pub use self::load_file::{extension, load_from_file_contents};
Expand Down
Loading

0 comments on commit 9e3ffc2

Please sign in to comment.