Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further improve C++ logging for many individual log calls by introducing a component type registry #4296

Merged
merged 9 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"type": "cppdbg",
"request": "launch",
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/build/examples/cpp/minimal/example_minimal",
"program": "${workspaceFolder}/build/debug/examples/cpp/minimal/example_minimal",
"args": [],
"stopAtEntry": false,
"environment": [],
Expand Down
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
],
"C_Cpp.default.configurationProvider": "ms-vscode.cmake-tools", // Use cmake-tools to grab configs.
"C_Cpp.autoAddFileAssociations": false,
"cmake.buildDirectory": "${workspaceRoot}/build/",
"cmake.buildDirectory": "${workspaceRoot}/build/debug",
"cmake.generator": "Ninja", // Use Ninja, just like we do in our just/pixi command.
"rust-analyzer.showUnlinkedFileNotification": false,
"ruff.format.args": [
Expand Down
11 changes: 8 additions & 3 deletions crates/re_types_builder/src/codegen/cpp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1266,10 +1266,10 @@ fn component_to_data_cell_method(
#NEWLINE_TOKEN
#todo_pool
arrow::MemoryPool* pool = arrow::default_memory_pool();
auto datatype = arrow_datatype();
#NEWLINE_TOKEN
#NEWLINE_TOKEN

ARROW_ASSIGN_OR_RAISE(auto builder, arrow::MakeBuilder(arrow_datatype(), pool))
ARROW_ASSIGN_OR_RAISE(auto builder, arrow::MakeBuilder(datatype, pool))
if (instances && num_instances > 0) {
RR_RETURN_NOT_OK(#type_ident::fill_arrow_array_builder(
static_cast<arrow::#arrow_builder_type*>(builder.get()),
Expand All @@ -1281,10 +1281,15 @@ fn component_to_data_cell_method(
ARROW_RETURN_NOT_OK(builder->Finish(&array));
#NEWLINE_TOKEN
#NEWLINE_TOKEN
// Lazily register component type.
static const Result<ComponentTypeHandle> component_type = ComponentType(NAME, datatype).register_component();
RR_RETURN_NOT_OK(component_type.error);
#NEWLINE_TOKEN
#NEWLINE_TOKEN
DataCell cell;
cell.num_instances = num_instances;
cell.component_name = #type_ident::NAME;
cell.array = std::move(array);
cell.component_type = component_type.value;
return cell;
},
inline: false,
Expand Down
36 changes: 36 additions & 0 deletions crates/rerun_c/src/component_type_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use re_sdk::ComponentName;

use crate::CComponentTypeHandle;

/// A registered component type: a component name together with its Arrow datatype.
pub struct ComponentType {
/// Name of the component.
pub name: ComponentName,
/// Arrow datatype associated with the component.
pub datatype: arrow2::datatypes::DataType,
}

/// Registry of component types, addressed by `CComponentTypeHandle`.
///
/// The derived `Default` yields an empty registry.
#[derive(Default)]
pub struct ComponentTypeRegistry {
/// Handle that will be returned by the next call to `register`.
next_id: CComponentTypeHandle,
/// Registered types; `get` uses the handle as an index into this vector.
types: Vec<ComponentType>,
}

impl ComponentTypeRegistry {
pub fn register(
&mut self,
name: ComponentName,
Wumpf marked this conversation as resolved.
Show resolved Hide resolved
datatype: arrow2::datatypes::DataType,
) -> CComponentTypeHandle {
let id = self.next_id;
self.next_id += 1;
self.types.push(ComponentType { name, datatype });
id
}

pub fn get(&self, id: CComponentTypeHandle) -> Option<&ComponentType> {
self.types.get(id as usize)
}
}

/// All registered component types.
///
/// Lazily initialized to an empty registry on first access; all access goes through the `RwLock`.
pub static COMPONENT_TYPES: Lazy<RwLock<ComponentTypeRegistry>> = Lazy::new(RwLock::default);
193 changes: 96 additions & 97 deletions crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@
#![crate_type = "staticlib"]
#![allow(clippy::missing_safety_doc, clippy::undocumented_unsafe_blocks)] // Too much unsafe

mod component_type_registry;
mod error;
mod ptr;
mod recording_streams;

use std::ffi::{c_char, CString};

use component_type_registry::COMPONENT_TYPES;
use once_cell::sync::Lazy;
use parking_lot::Mutex;

use re_sdk::{
external::re_log_types::{self},
log::{DataCell, DataRow},
ComponentName, EntityPath, RecordingStream, RecordingStreamBuilder, StoreKind, TimePoint,
};
use recording_streams::{recording_stream, RECORDING_STREAMS};

// ----------------------------------------------------------------------------
// Types:
Expand All @@ -41,7 +44,13 @@ impl CStringView {
}
}

type CRecordingStream = u32;
pub type CRecordingStream = u32;

pub type CComponentTypeHandle = u32;

pub const RR_REC_STREAM_CURRENT_RECORDING: CRecordingStream = 0xFFFFFFFF;
pub const RR_REC_STREAM_CURRENT_BLUEPRINT: CRecordingStream = 0xFFFFFFFE;
pub const RR_COMPONENT_TYPE_HANDLE_INVALID: CComponentTypeHandle = 0xFFFFFFFF;

/// C version of [`re_sdk::SpawnOptions`].
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -109,13 +118,15 @@ pub struct CStoreInfo {
}

#[repr(C)]
pub struct CDataCell {
pub component_name: CStringView,
pub struct CComponentType {
pub name: CStringView,
pub schema: arrow2::ffi::ArrowSchema,
}

#[repr(C)]
pub struct CDataCell {
pub component_type: CComponentTypeHandle,
pub array: arrow2::ffi::ArrowArray,

/// TODO(andreas): Use a schema registry.
pub schema: arrow2::ffi::ArrowSchema,
}

#[repr(C)]
Expand All @@ -136,6 +147,7 @@ pub enum CErrorCode {
InvalidStringArgument,
InvalidRecordingStreamHandle,
InvalidSocketAddress,
InvalidComponentTypeHandle,

_CategoryRecordingStream = 0x0000_00100,
RecordingStreamCreationFailure,
Expand All @@ -158,56 +170,6 @@ pub struct CError {
pub message: [c_char; Self::MAX_MESSAGE_SIZE_BYTES],
}

// ----------------------------------------------------------------------------
// Global data:

const RERUN_REC_STREAM_CURRENT_RECORDING: CRecordingStream = 0xFFFFFFFF;
const RERUN_REC_STREAM_CURRENT_BLUEPRINT: CRecordingStream = 0xFFFFFFFE;

/// Table of recording streams, keyed by their `CRecordingStream` handle.
///
/// The derived `Default` yields an empty table.
#[derive(Default)]
pub struct RecStreams {
/// Handle that will be returned by the next call to `insert`.
next_id: CRecordingStream,
/// Streams currently stored, keyed by the handle returned from `insert`.
streams: ahash::HashMap<CRecordingStream, RecordingStream>,
}

impl RecStreams {
fn insert(&mut self, stream: RecordingStream) -> CRecordingStream {
let id = self.next_id;
self.next_id += 1;
self.streams.insert(id, stream);
id
}

fn get(&self, id: CRecordingStream) -> Option<RecordingStream> {
match id {
RERUN_REC_STREAM_CURRENT_RECORDING => RecordingStream::get(StoreKind::Recording, None)
.or(Some(RecordingStream::disabled())),
RERUN_REC_STREAM_CURRENT_BLUEPRINT => RecordingStream::get(StoreKind::Blueprint, None)
.or(Some(RecordingStream::disabled())),
_ => self.streams.get(&id).cloned(),
}
}

fn remove(&mut self, id: CRecordingStream) -> Option<RecordingStream> {
match id {
RERUN_REC_STREAM_CURRENT_BLUEPRINT | RERUN_REC_STREAM_CURRENT_RECORDING => None,
_ => self.streams.remove(&id),
}
}
}

/// All recording streams created from C.
///
/// Lazily initialized to an empty table on first access; all access goes through the `Mutex`.
static RECORDING_STREAMS: Lazy<Mutex<RecStreams>> = Lazy::new(Mutex::default);

/// Access a C created recording stream.
///
/// Locks `RECORDING_STREAMS` for the duration of the lookup.
///
/// # Errors
/// Returns `CError::invalid_recording_stream_handle()` if `stream` does not resolve
/// to a recording stream.
#[allow(clippy::result_large_err)]
fn recording_stream(stream: CRecordingStream) -> Result<RecordingStream, CError> {
    RECORDING_STREAMS
        .lock()
        .get(stream)
        // Build the (large) error value lazily so the success path never pays for it.
        .ok_or_else(CError::invalid_recording_stream_handle)
}

// ----------------------------------------------------------------------------
// Public functions:

Expand Down Expand Up @@ -245,6 +207,42 @@ pub extern "C" fn rr_spawn(spawn_opts: *const CSpawnOptions, error: *mut CError)
}
}

/// Registers a component type described by a C struct and returns its handle.
///
/// The component name is read from `component_type.name`, the Arrow datatype is
/// imported from the FFI schema, and both are appended to `COMPONENT_TYPES`.
///
/// # Errors
/// Propagates the error from reading the name string, or returns
/// `CErrorCode::ArrowFfiSchemaImportError` when the schema cannot be imported.
#[allow(clippy::result_large_err)]
#[allow(unsafe_code)]
fn rr_register_component_type_impl(
    component_type: &CComponentType,
) -> Result<CComponentTypeHandle, CError> {
    let name = ComponentName::from(component_type.name.as_str("component_type.name")?);

    let imported = unsafe { arrow2::ffi::import_field_from_c(&component_type.schema) };
    let field = match imported {
        Ok(field) => field,
        Err(err) => {
            return Err(CError::new(
                CErrorCode::ArrowFfiSchemaImportError,
                &format!("Failed to import ffi schema: {err}"),
            ));
        }
    };

    // Only the datatype of the imported field is kept in the registry.
    let mut registry = COMPONENT_TYPES.write();
    Ok(registry.register(name, field.data_type))
}

/// C entry point: registers a component type and returns its handle.
///
/// On failure the error is written to `error` and
/// `RR_COMPONENT_TYPE_HANDLE_INVALID` is returned instead of a valid handle.
#[allow(unsafe_code)]
#[no_mangle]
pub extern "C" fn rr_register_component_type(
    // Note that since this is passed by value, Arrow2 will release the schema on drop!
    component_type: CComponentType,
    error: *mut CError,
) -> u32 {
    rr_register_component_type_impl(&component_type).unwrap_or_else(|err| {
        err.write_error(error);
        RR_COMPONENT_TYPE_HANDLE_INVALID
    })
}

#[allow(clippy::result_large_err)]
fn rr_recording_stream_new_impl(
store_info: *const CStoreInfo,
Expand Down Expand Up @@ -587,50 +585,51 @@ fn rr_log_impl(

let data_cells = unsafe { std::slice::from_raw_parts_mut(data_cells, num_data_cells) };

for data_cell in data_cells {
// Arrow2 implements drop for ArrowArray and ArrowSchema.
//
// Therefore, for things to work correctly we have to take ownership of the data cell!
// The C interface is documented to take ownership of the data cell - the user should NOT call `release`.
// This makes sense because from here on out we want to manage the lifetime of the underlying schema and array data:
// the schema won't survive a loop iteration since it's passed by reference for import, whereas the ArrowArray lives
// on longer within the resulting arrow::Array.
let CDataCell {
component_name,
array,
schema,
} = unsafe { std::ptr::read(data_cell) };

// It would be nice to now mark the data_cell as "consumed" by setting the original release method to nullptr.
// This would signify to the calling code that the data_cell is no longer owned.
// However, Arrow2 doesn't allow us to access the fields of the ArrowArray and ArrowSchema structs.

let component_name = component_name.as_str("data_cells[i].component_name")?;
let component_name = ComponentName::from(component_name);

let field = unsafe { arrow2::ffi::import_field_from_c(&schema) }.map_err(|err| {
CError::new(
CErrorCode::ArrowFfiSchemaImportError,
&format!("Failed to import ffi schema: {err}"),
)
})?;

let values =
unsafe { arrow2::ffi::import_array_from_c(array, field.data_type) }.map_err(|err| {
{
let component_type_registry = COMPONENT_TYPES.read();

for data_cell in data_cells {
// Arrow2 implements drop for ArrowArray and ArrowSchema.
//
// Therefore, for things to work correctly we have to take ownership of the data cell!
// The C interface is documented to take ownership of the data cell - the user should NOT call `release`.
// This makes sense because from here on out we want to manage the lifetime of the underlying schema and array data:
// the schema won't survive a loop iteration since it's passed by reference for import, whereas the ArrowArray lives
// on longer within the resulting arrow::Array.
let CDataCell {
component_type,
array,
} = unsafe { std::ptr::read(data_cell) };

// It would be nice to now mark the data_cell as "consumed" by setting the original release method to nullptr.
// This would signify to the calling code that the data_cell is no longer owned.
// However, Arrow2 doesn't allow us to access the fields of the ArrowArray and ArrowSchema structs.

let component_type = component_type_registry.get(component_type).ok_or_else(|| {
CError::new(
CErrorCode::ArrowFfiArrayImportError,
&format!("Failed to import ffi array: {err}"),
CErrorCode::InvalidComponentTypeHandle,
&format!("Invalid component type handle: {component_type}"),
)
})?;

cells.push(
DataCell::try_from_arrow(component_name, values).map_err(|err| {
CError::new(
CErrorCode::ArrowDataCellError,
&format!("Failed to create arrow datacell: {err}"),
)
})?,
);
let values =
unsafe { arrow2::ffi::import_array_from_c(array, component_type.datatype.clone()) }
.map_err(|err| {
CError::new(
CErrorCode::ArrowFfiArrayImportError,
&format!("Failed to import ffi array: {err}"),
)
})?;

cells.push(
DataCell::try_from_arrow(component_type.name, values).map_err(|err| {
CError::new(
CErrorCode::ArrowDataCellError,
&format!("Failed to create arrow datacell: {err}"),
)
})?,
);
}
}

let data_row = DataRow::from_cells(
Expand Down
51 changes: 51 additions & 0 deletions crates/rerun_c/src/recording_streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use re_sdk::{RecordingStream, StoreKind};

use crate::{
CError, CRecordingStream, RR_REC_STREAM_CURRENT_BLUEPRINT, RR_REC_STREAM_CURRENT_RECORDING,
};

/// Table of recording streams, keyed by their `CRecordingStream` handle.
///
/// The derived `Default` yields an empty table.
#[derive(Default)]
pub struct RecStreams {
/// Handle that will be returned by the next call to `insert`.
next_id: CRecordingStream,
/// Streams currently stored, keyed by the handle returned from `insert`.
streams: ahash::HashMap<CRecordingStream, RecordingStream>,
}

impl RecStreams {
pub fn insert(&mut self, stream: RecordingStream) -> CRecordingStream {
let id = self.next_id;
self.next_id += 1;
self.streams.insert(id, stream);
id
}

pub fn get(&self, id: CRecordingStream) -> Option<RecordingStream> {
match id {
RR_REC_STREAM_CURRENT_RECORDING => RecordingStream::get(StoreKind::Recording, None)
.or(Some(RecordingStream::disabled())),
RR_REC_STREAM_CURRENT_BLUEPRINT => RecordingStream::get(StoreKind::Blueprint, None)
.or(Some(RecordingStream::disabled())),
_ => self.streams.get(&id).cloned(),
}
}

pub fn remove(&mut self, id: CRecordingStream) -> Option<RecordingStream> {
match id {
RR_REC_STREAM_CURRENT_BLUEPRINT | RR_REC_STREAM_CURRENT_RECORDING => None,
_ => self.streams.remove(&id),
}
}
}

/// All recording streams created from C.
///
/// Lazily initialized to an empty table on first access; all access goes through the `Mutex`.
pub static RECORDING_STREAMS: Lazy<Mutex<RecStreams>> = Lazy::new(Mutex::default);

/// Access a C created recording stream.
///
/// Locks `RECORDING_STREAMS` for the duration of the lookup.
///
/// # Errors
/// Returns `CError::invalid_recording_stream_handle()` if `stream` does not resolve
/// to a recording stream.
#[allow(clippy::result_large_err)]
pub fn recording_stream(stream: CRecordingStream) -> Result<RecordingStream, CError> {
    RECORDING_STREAMS
        .lock()
        .get(stream)
        // Build the (large) error value lazily so the success path never pays for it.
        .ok_or_else(CError::invalid_recording_stream_handle)
}
Loading
Loading