Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK DataLoaders 1: barebones Rust support #5327

Merged
merged 4 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_data_source/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn load_from_path(
if !path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"path does not exist: {path:?}",
format!("path does not exist: {path:?}"),
)
.into());
}
Expand Down
2 changes: 2 additions & 0 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ pub enum FileSource {
Cli,
DragAndDrop,
FileDialog,
Sdk,
}

/// The source of a recording or blueprint.
Expand Down Expand Up @@ -358,6 +359,7 @@ impl std::fmt::Display for StoreSource {
FileSource::Cli => write!(f, "File via CLI"),
FileSource::DragAndDrop => write!(f, "File via drag-and-drop"),
FileSource::FileDialog => write!(f, "File via file dialog"),
FileSource::Sdk => write!(f, "File via SDK"),
},
Self::Viewer => write!(f, "Viewer-generated"),
Self::Other(string) => format!("{string:?}").fmt(f), // put it in quotes
Expand Down
7 changes: 7 additions & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ all-features = true
[features]
default = []

## Support for using Rerun's data-loaders directly from the SDK.
##
## See our `log_file` example and <https://www.rerun.io/docs/howto/open-any-file>
## for more information.
data_loaders = ["dep:re_data_source", "dep:re_smart_channel"]

## Support serving a web viewer over HTTP.
##
## Enabling this inflates the binary size quite a bit, since it embeds the viewer wasm.
Expand Down Expand Up @@ -58,6 +64,7 @@ thiserror.workspace = true

# Optional dependencies

re_data_source = { workspace = true, optional = true }
re_smart_channel = { workspace = true, optional = true }
re_ws_comms = { workspace = true, optional = true }
re_web_viewer_server = { workspace = true, optional = true }
Expand Down
145 changes: 143 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::{atomic::AtomicI64, Arc};
use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};

use parking_lot::Mutex;
use re_log_types::{
ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable,
DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId,
Expand Down Expand Up @@ -61,7 +62,7 @@ pub enum RecordingStreamError {
#[error("Failed to spawn background thread '{name}': {err}")]
SpawnThread {
/// Name of the thread
name: &'static str,
name: String,

/// Inner error explaining why the thread failed to spawn.
err: std::io::Error,
Expand All @@ -79,6 +80,11 @@ pub enum RecordingStreamError {
/// An error that can occur because a row in the store has inconsistent columns.
#[error(transparent)]
DataReadError(#[from] re_log_types::DataReadError),

/// An error occurred while attempting to use a [`re_data_source::DataLoader`].
#[cfg(feature = "data_loaders")]
#[error(transparent)]
DataLoaderError(#[from] re_data_source::DataLoaderError),
}

/// Results that can occur when creating/manipulating a [`RecordingStream`].
Expand Down Expand Up @@ -623,6 +629,12 @@ struct RecordingStreamInner {
batcher: DataTableBatcher,
batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,

/// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
/// machinery in the context of this `RecordingStream`.
///
/// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
dataloader_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

pid_at_creation: u32,
}

Expand All @@ -633,6 +645,16 @@ impl Drop for RecordingStreamInner {
return;
}

// Run all pending top-level `DataLoader` threads that were started from the SDK to completion.
//
// TODO(cmc): At some point we might want to make it configurable, though I cannot really
// think of a use case where you'd want to drop those threads immediately upon
// disconnection.
let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock());
for handle in dataloader_handles {
handle.join().ok();
}

// NOTE: The command channel is private, if we're here, nothing is currently capable of
// sending data down the pipeline.
self.batcher.flush_blocking();
Expand Down Expand Up @@ -679,7 +701,10 @@ impl RecordingStreamInner {
let batcher = batcher.clone();
move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release)
})
.map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })?
.map_err(|err| RecordingStreamError::SpawnThread {
name: NAME.into(),
err,
})?
};

Ok(RecordingStreamInner {
Expand All @@ -688,6 +713,7 @@ impl RecordingStreamInner {
cmds_tx,
batcher,
batcher_to_sink_handle: Some(batcher_to_sink_handle),
dataloader_handles: Mutex::new(Vec::new()),
pid_at_creation: std::process::id(),
})
}
Expand Down Expand Up @@ -989,6 +1015,103 @@ impl RecordingStream {

Ok(())
}

/// Loads the file found at `filepath` through every available [`re_data_source::DataLoader`]
/// and logs the resulting data to this stream.
///
/// More than one loader may end up handling the same `filepath`.
///
/// The call blocks until at least one [`re_data_source::DataLoader`] has started streaming
/// data in, or until all of them have failed.
///
/// See <https://www.rerun.io/docs/howto/open-any-file> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_path(
    &self,
    filepath: impl AsRef<std::path::Path>,
) -> RecordingStreamResult<()> {
    // No in-memory contents: the loaders are handed the path only.
    let no_contents: Option<std::borrow::Cow<'_, [u8]>> = None;
    self.log_file(filepath, no_contents)
}

/// Loads the in-memory `contents` through every available [`re_data_source::DataLoader`]
/// and logs the resulting data to this stream.
///
/// `filepath` is passed along to the loaders together with `contents`; more than one loader
/// may end up handling the same `filepath`.
///
/// The call blocks until at least one [`re_data_source::DataLoader`] has started streaming
/// data in, or until all of them have failed.
///
/// See <https://www.rerun.io/docs/howto/open-any-file> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_contents(
    &self,
    filepath: impl AsRef<std::path::Path>,
    contents: std::borrow::Cow<'_, [u8]>,
) -> RecordingStreamResult<()> {
    let with_contents = Some(contents);
    self.log_file(filepath, with_contents)
}

/// Shared implementation behind [`RecordingStream::log_file_from_path`] and
/// [`RecordingStream::log_file_from_contents`].
///
/// Hands `filepath` (and `contents`, when provided) over to `re_data_source`, then spawns a
/// background thread that forwards every message received from the loaders into this stream
/// via `record_msg`. The thread's handle is stored in `dataloader_handles`, which is drained
/// and joined when the stream disconnects.
///
/// If there is no recording to log to, this logs a one-time warning and returns `Ok(())`.
///
/// # Errors
///
/// Returns an error if `re_data_source` fails to start loading, or if the forwarding thread
/// cannot be spawned.
#[cfg(feature = "data_loaders")]
fn log_file(
    &self,
    filepath: impl AsRef<std::path::Path>,
    contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> RecordingStreamResult<()> {
    // Bail out first: without a recording there is nothing to set up.
    let Some(store_id) = self.store_info().map(|info| info.store_id.clone()) else {
        re_log::warn_once!("Recording disabled - call to log_file() ignored");
        return Ok(());
    };

    let filepath = filepath.as_ref();
    let has_contents = contents.is_some();

    let (tx, rx) = re_smart_channel::smart_channel(
        re_smart_channel::SmartMessageSource::Sdk,
        re_smart_channel::SmartChannelSource::File(filepath.into()),
    );

    if let Some(contents) = contents {
        re_data_source::load_from_file_contents(
            &store_id,
            re_log_types::FileSource::Sdk,
            filepath,
            contents,
            &tx,
        )?;
    } else {
        re_data_source::load_from_path(
            &store_id,
            re_log_types::FileSource::Sdk,
            filepath,
            &tx,
        )?;
    }
    // Release our own sender; presumably the loaders keep their own handles for as long as
    // they are still producing data.
    drop(tx);

    // Name the thread after the public entry point that was used, to ease debugging.
    let thread_name = if has_contents {
        format!("log_file_from_contents({filepath:?})")
    } else {
        format!("log_file_from_path({filepath:?})")
    };
    let handle = std::thread::Builder::new()
        .name(thread_name.clone())
        .spawn({
            // NOTE(review): this thread keeps a clone of the stream alive. If that clone ends
            // up being the last owner, the inner `Drop` (which joins the dataloader handles)
            // would run on this very thread -- confirm that this cannot result in a self-join.
            let this = self.clone();
            move || {
                // We can safely ignore the error on `recv()` as we're in complete control of
                // both ends of the channel.
                while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
                    this.record_msg(msg);
                }
            }
        })
        .map_err(|err| RecordingStreamError::SpawnThread {
            name: thread_name,
            err,
        })?;

    debug_assert!(
        self.inner.is_some(),
        "recording should always be fully init at this stage"
    );
    // Keep track of the thread so that it can be joined on disconnection.
    if let Some(inner) = self.inner.as_ref() {
        inner.dataloader_handles.lock().push(handle);
    }

    Ok(())
}
}

#[allow(clippy::needless_pass_by_value)]
Expand Down Expand Up @@ -1450,6 +1573,22 @@ impl RecordingStream {
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn disconnect(&self) {
    let inner = match &*self.inner {
        Some(inner) => inner,
        None => {
            re_log::warn_once!("Recording disabled - call to disconnect() ignored");
            return;
        }
    };

    // Top-level `DataLoader` threads that were started from the SDK must run to completion
    // before we disconnect.
    //
    // TODO(cmc): At some point we might want to make it configurable, though I cannot really
    // think of a use case where you'd want to drop those threads immediately upon
    // disconnection.
    let pending_handles = {
        // Only hold the lock long enough to take ownership of the handles: joining happens
        // after the guard has been released.
        let mut handles = inner.dataloader_handles.lock();
        std::mem::take(&mut *handles)
    };
    pending_handles.into_iter().for_each(|handle| {
        handle.join().ok();
    });

    let buffered_sink = crate::sink::BufferedSink::new();
    self.set_sink(Box::new(buffered_sink));
}
}
Expand All @@ -1465,11 +1604,13 @@ impl fmt::Debug for RecordingStream {
cmds_tx: _,
batcher: _,
batcher_to_sink_handle: _,
dataloader_handles,
pid_at_creation,
}) => f
.debug_struct("RecordingStream")
.field("info", &info)
.field("tick", &tick)
.field("pending_dataloaders", &dataloader_handles.lock().len())
.field("pid_at_creation", &pid_at_creation)
.finish_non_exhaustive(),
None => write!(f, "RecordingStream {{ disabled }}"),
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/viewer_analytics/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn open_recording(
re_log_types::FileSource::Cli => "file_cli".to_owned(),
re_log_types::FileSource::DragAndDrop => "file_drag_and_drop".to_owned(),
re_log_types::FileSource::FileDialog => "file_dialog".to_owned(),
re_log_types::FileSource::Sdk => "file_sdk".to_owned(),
},
S::Viewer => "viewer".to_owned(),
S::Other(other) => other.clone(),
Expand Down
Loading
Loading