Skip to content

Commit

Permalink
SDK DataLoaders 6: customizable (external) loaders for Python (#5355)
Browse files Browse the repository at this point in the history
Introduces the new `DataLoaderSettings` business to Python and updates the
examples accordingly (`external_data_loader` & `log_file`).

```bash
python examples/python/external_data_loader/main.py --recording-id this-one --entity-path-prefix a/b/c  --time sim_time=1000 --time wall_time=1709204046 --sequence sim_frame=42 examples/python/dna/main.py | rerun -
```

![image](https://github.com/rerun-io/rerun/assets/2910679/bfda567d-3d16-42cd-be8e-8b1a0767a784)



Checks:
- [x] external loader ran manually (`python loader | rerun`)
- [x] external loader via rerun (`rerun xxx.py`)
- [x] log_file with external loader (`log_file xxx.py`)

---

Part of series of PR to expose configurable `DataLoader`s to our SDKs:
- #5327 
- #5328 
- #5330
- #5337
- #5351
- #5355
- #5361
  • Loading branch information
teh-cmc authored Mar 1, 2024
1 parent 020eab4 commit 166721d
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 33 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct DataLoaderSettings {
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,

/// What should the entity paths be prefixed with?
/// What should the logged entity paths be prefixed with?
pub entity_path_prefix: Option<EntityPath>,

/// At what time(s) should the data be logged to?
Expand Down
47 changes: 45 additions & 2 deletions examples/python/external_data_loader/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# It is up to you whether you make use of that shared recording ID or not.
# If you use it, the data will end up in the same recording as all other plugins interested in
# that file, otherwise you can just create a dedicated recording for it. Or both.
#
# Check out `re_data_source::DataLoaderSettings` documentation for an exhaustive listing of
# the available CLI parameters.
parser = argparse.ArgumentParser(
description="""
This is an example executable data-loader plugin for the Rerun Viewer.
Expand All @@ -28,7 +31,23 @@
"""
)
parser.add_argument("filepath", type=str)
parser.add_argument("--recording-id", type=str)
parser.add_argument("--recording-id", type=str, help="optional recommended ID for the recording")
parser.add_argument("--entity-path-prefix", type=str, help="optional prefix for all entity paths")
parser.add_argument(
"--timeless", action="store_true", default=False, help="optionally mark data to be logged as timeless"
)
parser.add_argument(
"--time",
type=str,
action="append",
help="optional timestamps to log at (e.g. `--time sim_time=1709203426`)",
)
parser.add_argument(
"--sequence",
type=str,
action="append",
help="optional sequences to log at (e.g. `--sequence sim_frame=42`)",
)
args = parser.parse_args()


Expand All @@ -44,10 +63,34 @@ def main() -> None:
# The most important part of this: log to standard output so the Rerun Viewer can ingest it!
rr.stdout()

set_time_from_args()

if args.entity_path_prefix:
entity_path = f"{args.entity_path_prefix}/{args.filepath}"
else:
entity_path = args.filepath

with open(args.filepath) as file:
body = file.read()
text = f"""## Some Python code\n```python\n{body}\n```\n"""
rr.log(args.filepath, rr.TextDocument(text, media_type=rr.MediaType.MARKDOWN), timeless=True)
rr.log(entity_path, rr.TextDocument(text, media_type=rr.MediaType.MARKDOWN), timeless=args.timeless)


def set_time_from_args() -> None:
    """Apply the `--time` / `--sequence` CLI arguments to Rerun's time context.

    Each argument is a `timeline_name=value` pair (e.g. `sim_time=1709203426`,
    `sim_frame=42`); malformed entries are silently skipped.  When `--timeless`
    was passed, all time arguments are ignored.
    """
    if args.timeless:
        return

    # Temporal timelines, e.g. `--time sim_time=1709203426` (seconds).
    for time_str in args.time or []:
        parts = time_str.split("=")
        if len(parts) != 2:
            continue
        timeline_name, time = parts
        rr.set_time_seconds(timeline_name, float(time))

    # Sequence timelines, e.g. `--sequence sim_frame=42` (integer frame index).
    # Bug fix: this loop previously iterated `args.time` again, so `--sequence`
    # values were never applied (and `args.sequence` being None was not handled).
    for seq_str in args.sequence or []:
        parts = seq_str.split("=")
        if len(parts) != 2:
            continue
        timeline_name, seq = parts
        rr.set_time_sequence(timeline_name, int(seq))


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions examples/python/log_file/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
# Log every input file through Rerun's data-loader machinery, grouping all of
# them under a common `log_file_example` entity-path prefix.
for filepath in args.filepaths:
    if not args.from_contents:
        # Either log the file using its path…
        rr.log_file_from_path(filepath, entity_path_prefix="log_file_example")
    else:
        # …or using its contents if you already have them loaded for some reason.
        # NOTE(review): failures are deliberately best-effort here (example code);
        # any unreadable file is skipped silently.
        try:
            with open(filepath, "rb") as file:
                rr.log_file_from_contents(filepath, file.read(), entity_path_prefix="log_file_example")
        except Exception:
            pass

Expand Down
2 changes: 1 addition & 1 deletion examples/rust/external_data_loader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct Args {
#[argh(option)]
entity_path_prefix: Option<String>,

/// optional mark data to be logged as timeless
/// optionally mark data to be logged as timeless
#[argh(switch)]
timeless: bool,

Expand Down
43 changes: 41 additions & 2 deletions rerun_py/rerun_sdk/rerun/_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,14 @@ def log_components(
)


# TODO(#3841): expose timepoint settings once we implement stateless APIs
@catch_and_log_exceptions()
def log_file_from_path(
file_path: str | Path,
*,
recording_id: str | None = None,
entity_path_prefix: str | None = None,
timeless: bool | None = None,
recording: RecordingStream | None = None,
) -> None:
r"""
Expand All @@ -304,21 +308,40 @@ def log_file_from_path(
file_path:
Path to the file to be logged.
recording_id:
The recommended `RecordingId` to log the data to.
entity_path_prefix:
What should the logged entity paths be prefixed with?
timeless:
Should the logged data be timeless?
recording:
Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
defaults to the current active data recording, if there is one. See
also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
"""

bindings.log_file_from_path(Path(file_path), recording=recording)
bindings.log_file_from_path(
Path(file_path),
recording_id=recording_id,
entity_path_prefix=entity_path_prefix,
timeless=timeless,
recording=recording,
)


# TODO(cmc): expose timepoint settings once we implement stateless APIs
@catch_and_log_exceptions()
def log_file_from_contents(
file_path: str | Path,
file_contents: bytes,
*,
recording_id: str | None = None,
entity_path_prefix: str | None = None,
timeless: bool | None = None,
recording: RecordingStream | None = None,
) -> None:
r"""
Expand All @@ -339,14 +362,30 @@ def log_file_from_contents(
file_contents:
Contents to be logged.
recording_id:
The recommended `RecordingId` to log the data to.
entity_path_prefix:
What should the logged entity paths be prefixed with?
timeless:
Should the logged data be timeless?
recording:
Specifies the [`rerun.RecordingStream`][] to use. If left unspecified,
defaults to the current active data recording, if there is one. See
also: [`rerun.init`][], [`rerun.set_global_data_recording`][].
"""

bindings.log_file_from_contents(Path(file_path), file_contents, recording=recording)
bindings.log_file_from_contents(
Path(file_path),
file_contents,
recording_id=recording_id,
entity_path_prefix=entity_path_prefix,
timeless=timeless,
recording=recording,
)


def escape_entity_path_part(part: str) -> str:
Expand Down
87 changes: 62 additions & 25 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,59 +967,96 @@ fn log_arrow_msg(
/// Logs the file at `file_path` by letting the registered `DataLoader`s read it
/// from disk themselves.
///
/// Thin wrapper around [`log_file`] with `file_contents = None`; see that helper
/// for how `recording_id`, `entity_path_prefix` and `timeless` are turned into
/// `DataLoaderSettings`.
#[pyfunction]
#[pyo3(signature = (
    file_path,
    recording_id = None,
    entity_path_prefix = None,
    timeless = None,
    recording = None,
))]
fn log_file_from_path(
    py: Python<'_>,
    file_path: std::path::PathBuf,
    recording_id: Option<String>,
    entity_path_prefix: Option<String>,
    timeless: Option<bool>,
    recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
    // NOTE: the pre-refactor body (resolving the store id and calling
    // `recording.log_file_from_path` directly) is gone — all of that now lives in
    // the shared `log_file` helper, also used by `log_file_from_contents`.
    log_file(
        py,
        file_path,
        None,
        recording_id,
        entity_path_prefix,
        timeless,
        recording,
    )
}

/// Logs the given `file_contents` as if they were read from `file_path`, letting
/// the registered `DataLoader`s interpret the bytes.
///
/// Thin wrapper around [`log_file`] with `file_contents = Some(…)`; see that helper
/// for how `recording_id`, `entity_path_prefix` and `timeless` are turned into
/// `DataLoaderSettings`.
#[pyfunction]
#[pyo3(signature = (
file_path,
file_contents,
recording_id = None,
entity_path_prefix = None,
timeless = None,
recording=None,
))]
fn log_file_from_contents(
py: Python<'_>,
file_path: std::path::PathBuf,
file_contents: &[u8],
recording_id: Option<String>,
entity_path_prefix: Option<String>,
timeless: Option<bool>,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
// Delegate to the shared helper; `Some(file_contents)` selects the
// from-contents code path instead of reading from disk.
log_file(
py,
file_path,
Some(file_contents),
recording_id,
entity_path_prefix,
timeless,
recording,
)
}

fn log_file(
py: Python<'_>,
file_path: std::path::PathBuf,
file_contents: Option<&[u8]>,
recording_id: Option<String>,
entity_path_prefix: Option<String>,
timeless: Option<bool>,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
let Some(recording) = get_data_recording(recording) else {
return Ok(());
};

let Some(recording_id) = recording.store_info().map(|info| info.store_id.clone()) else {
let Some(recording_id) = recording
.store_info()
.map(|info| info.store_id.clone())
.or(recording_id.map(|id| StoreId::from_string(StoreKind::Recording, id)))
else {
return Ok(());
};
let settings = rerun::DataLoaderSettings::recommended(recording_id);

recording
.log_file_from_contents(
&settings,
file_path,
std::borrow::Cow::Borrowed(file_contents),
)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
let settings = rerun::DataLoaderSettings {
store_id: recording_id,
opened_store_id: None,
entity_path_prefix: entity_path_prefix.map(Into::into),
timepoint: timeless.unwrap_or(false).then(TimePoint::timeless),
};

if let Some(contents) = file_contents {
recording
.log_file_from_contents(&settings, file_path, std::borrow::Cow::Borrowed(contents))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
} else {
recording
.log_file_from_path(&settings, file_path)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
}

py.allow_threads(flush_garbage_queue);

Expand Down

0 comments on commit 166721d

Please sign in to comment.