Skip to content

Commit

Permalink
remove image-msg::frame-id; use timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Apr 30, 2023
1 parent 956a313 commit 8f5f87f
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 75 deletions.
122 changes: 83 additions & 39 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "context.h"
#include "fw-update/fw-update-factory.h"
#ifdef BUILD_WITH_DDS
#include <realdds/dds-time.h>
#include <realdds/dds-device-watcher.h>
#include <realdds/dds-participant.h>
#include <realdds/dds-device.h>
Expand Down Expand Up @@ -63,6 +64,14 @@ struct dds_domain_context
//
static std::map< realdds::dds_domain_id, dds_domain_context > dds_domain_context_by_id;

// Constants for Json lookups
static const std::string frame_number_key( "frame-number", 12 );
static const std::string metadata_key( "metadata", 8 );
static const std::string timestamp_key( "timestamp", 9 );
static const std::string timestamp_domain_key( "timestamp-domain", 16 );
static const std::string depth_units_key( "depth-units", 11 );
static const std::string metadata_header_key( "header", 6 );

#endif // BUILD_WITH_DDS

#include <rsutils/json.h>
Expand Down Expand Up @@ -490,8 +499,14 @@ namespace librealsense
typedef realdds::dds_metadata_syncer syncer_type;
static void frame_releaser( syncer_type::frame_type * f ) { static_cast< frame * >( f )->release(); }

struct streaming_impl
{
syncer_type syncer;
std::atomic< unsigned long long > last_frame_number{ 0 };
};

std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams;
std::map< std::string, syncer_type > _stream_name_to_syncer;
std::map< std::string, streaming_impl > _streaming_by_name;

public:
dds_sensor_proxy( std::string const & sensor_name,
Expand Down Expand Up @@ -612,14 +627,14 @@ namespace librealsense

void handle_video_data( realdds::topics::image_msg && dds_frame,
const std::shared_ptr< stream_profile_interface > & profile,
syncer_type & syncer )
streaming_impl & streaming )
{
frame_additional_data data; // with NO metadata by default!
data.timestamp; // from metadata
data.timestamp_domain; // from metadata
data.depth_units; // from metadata
if( ! rsutils::string::string_to_value( dds_frame.frame_id, data.frame_number ))
data.frame_number = 0;
data.timestamp // in ms
= static_cast< rs2_time_t >( realdds::time_to_double( dds_frame.timestamp ) * 1e3 );
data.timestamp_domain; // from metadata, or leave default (hardware domain)
data.depth_units; // from metadata
data.frame_number; // filled in only once metadata is known

auto vid_profile = dynamic_cast< video_stream_profile_interface * >( profile.get() );
if( ! vid_profile )
Expand All @@ -638,8 +653,8 @@ namespace librealsense

if( _md_enabled )
{
syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID
syncer.hold( new_frame ) );
streaming.syncer.enqueue_frame( dds_frame.timestamp.to_ns(),
streaming.syncer.hold( new_frame ) );
}
else
{
Expand All @@ -649,18 +664,49 @@ namespace librealsense
}
}

void add_frame_metadata( frame * const f, json && dds_md )
void add_frame_metadata( frame * const f, json && dds_md, streaming_impl & streaming )
{
json const & md_header = dds_md[std::string( "header", 6 )];
json const & md = dds_md[std::string( "metadata", 8 )];
if( dds_md.empty() )
{
// Without MD, we have no way of knowing the frame-number - we assume it's one higher than
// the last
f->additional_data.last_frame_number = streaming.last_frame_number.fetch_add( 1 );
f->additional_data.frame_number = f->additional_data.last_frame_number + 1;
// the frame should already have empty metadata, so no need to do anything else
return;
}

json const & md_header = dds_md[metadata_header_key];
json const & md = dds_md[metadata_key];

// A frame number is "optional". If the server supplies it, we try to use it for the simple fact that,
// otherwise, we have no way of detecting drops without some advanced heuristic tracking the FPS and
// timestamps. If not supplied, we use an increasing counter.
// Note that if we have no metadata, we have no frame-numbers! So we need a way of generating them
if( rsutils::json::get_ex( md_header, frame_number_key, &f->additional_data.frame_number ) )
{
f->additional_data.last_frame_number = streaming.last_frame_number.exchange( f->additional_data.frame_number );
if( f->additional_data.frame_number != f->additional_data.last_frame_number + 1
&& f->additional_data.last_frame_number )
{
LOG_DEBUG( "frame drop? expecting " << f->additional_data.last_frame_number + 1 << "; got "
<< f->additional_data.frame_number );
}
}
else
{
f->additional_data.last_frame_number = streaming.last_frame_number.fetch_add( 1 );
f->additional_data.frame_number = f->additional_data.last_frame_number + 1;
}

// Always expected metadata
f->additional_data.timestamp = rsutils::json::get< rs2_time_t >( md_header, std::string( "timestamp", 9 ) );
f->additional_data.timestamp_domain
= rsutils::json::get< rs2_timestamp_domain >( md_header, std::string( "timestamp-domain", 16 ) );
// Timestamp is already set in the frame - must be communicated in the metadata, but only for syncing
// purposes, so we ignore here. The domain is optional, and really only rs-dds-server communicates it
// because the source is librealsense...
f->additional_data.timestamp;
rsutils::json::get_ex( md_header, timestamp_domain_key, &f->additional_data.timestamp_domain );

// Expected metadata for all depth images
rsutils::json::get_ex( md_header, std::string( "depth-units", 11 ), &f->additional_data.depth_units );
rsutils::json::get_ex( md_header, depth_units_key, &f->additional_data.depth_units );

// Other metadata fields. Metadata fields that are present but unknown by librealsense will be ignored.
auto & metadata = reinterpret_cast< metadata_array & >( f->additional_data.metadata_blob );
Expand All @@ -682,14 +728,14 @@ namespace librealsense

void handle_motion_data( realdds::topics::image_msg && dds_frame,
const std::shared_ptr< stream_profile_interface > & profile,
syncer_type & syncer )
streaming_impl & streaming )
{
frame_additional_data data; // with NO metadata by default!
data.timestamp; // from metadata
data.timestamp_domain; // from metadata
data.depth_units; // from metadata
if( ! rsutils::string::string_to_value( dds_frame.frame_id, data.frame_number ) )
data.frame_number = 0;
data.timestamp
= static_cast< rs2_time_t >( realdds::time_to_double( dds_frame.timestamp ) * 1e-3 ); // milliseconds
data.timestamp_domain; // from metadata, or leave default (hardware domain)
data.depth_units; // from metadata
data.frame_number; // filled in only once metadata is known

auto new_frame_interface
= allocate_new_frame( RS2_EXTENSION_MOTION_FRAME, profile.get(), std::move( data ) );
Expand All @@ -701,8 +747,8 @@ namespace librealsense

if( _md_enabled )
{
syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID
syncer.hold( new_frame ));
streaming.syncer.enqueue_frame( dds_frame.timestamp.to_ns(),
streaming.syncer.hold( new_frame ) );
}
else
{
Expand All @@ -717,10 +763,10 @@ namespace librealsense
if( ! _md_enabled )
return;

auto it = _stream_name_to_syncer.find( stream_name );
if( it != _stream_name_to_syncer.end() )
it->second.enqueue_metadata(
std::stoull( rsutils::json::get< std::string >( dds_md["header"], "frame-id" ) ),
auto it = _streaming_by_name.find( stream_name );
if( it != _streaming_by_name.end() )
it->second.syncer.enqueue_metadata(
rsutils::json::get< realdds::dds_nsec >( dds_md[metadata_header_key], timestamp_key ),
std::move( dds_md ) );
else
throw std::runtime_error( "Stream '" + stream_name
Expand All @@ -734,29 +780,27 @@ namespace librealsense
auto & dds_stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )];
// Opening it will start streaming on the server side automatically
dds_stream->open( "rt/" + _dev->device_info().topic_root + '_' + dds_stream->name(), _dev->subscriber());
auto & syncer = _stream_name_to_syncer[dds_stream->name()];
syncer.on_frame_release( frame_releaser );
syncer.on_frame_ready(
[this]( syncer_type::frame_holder && fh, json && md )
auto & streaming = _streaming_by_name[dds_stream->name()];
streaming.syncer.on_frame_release( frame_releaser );
streaming.syncer.on_frame_ready(
[this, &streaming]( syncer_type::frame_holder && fh, json && md )
{
if( ! md.empty() )
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ) );
// else the frame should already have empty metadata!
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ), streaming );
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr );
} );

if( Is< realdds::dds_video_stream >( dds_stream ) )
{
As< realdds::dds_video_stream >( dds_stream )->on_data_available(
[profile, this, dds_stream]( realdds::topics::image_msg && dds_frame ) {
handle_video_data( std::move( dds_frame ), profile, _stream_name_to_syncer[dds_stream->name()] );
handle_video_data( std::move( dds_frame ), profile, _streaming_by_name[dds_stream->name()] );
} );
}
else if( Is< realdds::dds_motion_stream >( dds_stream ) )
{
As< realdds::dds_motion_stream >( dds_stream )->on_data_available(
[profile, this, dds_stream]( realdds::topics::image_msg && dds_frame ) {
handle_motion_data( std::move( dds_frame ), profile, _stream_name_to_syncer[dds_stream->name()] );
handle_motion_data( std::move( dds_frame ), profile, _streaming_by_name[dds_stream->name()] );
} );
}
else
Expand All @@ -770,7 +814,7 @@ namespace librealsense

void stop()
{
_stream_name_to_syncer.clear();
_streaming_by_name.clear();

for( auto & profile : sensor_base::get_active_streams() )
{
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/doc/DDS topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ For instance, using `rs2::sensor.open( profile )` will be converted to DDS **ope

## Metadata

Metadata samples are refering to a specific, streaming, stream. They include some mandatory fields in the header, i.e. frame ID and timestamp, and other fields that are relevant to the specific frame and its type, e.g, "exposure" for images.
Metadata samples are refering to a specific, streaming, stream. They include some mandatory fields in the header, i.e. frame number and timestamp, and other fields that are relevant to the specific frame and its type, e.g, "exposure" for images.

"stream-name":"color",
"header":{"frame-id":"1234", "timestamp":"123456789", "timestamp-domain":"0"},
"header":{"frame-number":1234, "timestamp":123456789, "timestamp-domain":0},
"metadata":{"Exposure":"123", "Gain":"456"}


2 changes: 0 additions & 2 deletions third-party/realdds/include/realdds/dds-metadata-syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ class dds_metadata_syncer
on_frame_ready_callback _on_frame_ready = nullptr;
on_metadata_dropped_callback _on_metadata_dropped = nullptr;

key_type _last_frame_id = 0;

public:
void enqueue_frame( key_type, frame_holder && );
void enqueue_metadata( key_type, metadata_type && );
Expand Down
1 change: 0 additions & 1 deletion third-party/realdds/include/realdds/topics/image-msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class image_msg
eprosima::fastdds::dds::SampleInfo * optional_info = nullptr );

std::vector< uint8_t > raw_data;
std::string frame_id;
int width = -1;
int height = -1;
dds_time timestamp;
Expand Down
9 changes: 6 additions & 3 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,18 @@ PYBIND11_MODULE(NAME, m) {
using realdds::dds_nsec;
py::class_< dds_time >( m, "time" )
.def( py::init<>() )
.def( py::init< int32_t, uint32_t >() ) // sec, nsec
.def( py::init< long double >() ) // inexact (1.001 -> 1.000999999)
.def( py::init( []( dds_nsec ns ) { return realdds::time_from( ns ); } ) ) // exact
.def_readwrite( "seconds", &dds_time::seconds )
.def_readwrite( "nanosec", &dds_time::nanosec )
.def( "to_ns", &dds_time::to_ns )
.def_static( "from_ns", []( dds_nsec ns ) { return realdds::time_from( ns ); } )
.def_static( "from_double", []( long double d ) { return realdds::dds_time( d ); } )
.def( "to_double", &realdds::time_to_double )
.def( "__repr__", &realdds::time_to_string );
.def( "__repr__", &realdds::time_to_string )
.def( pybind11::self == pybind11::self )
.def( pybind11::self != pybind11::self );


// We need a timestamp function that returns timestamps in the same domain as the sample-info timestamps
Expand Down Expand Up @@ -451,7 +456,6 @@ PYBIND11_MODULE(NAME, m) {
using image_msg = realdds::topics::image_msg;
py::class_< image_msg, std::shared_ptr< image_msg > >( m, "image_msg" )
.def( py::init<>() )
.def_readwrite( "frame_id", &image_msg::frame_id )
.def_readwrite( "data", &image_msg::raw_data )
.def_readwrite( "width", &image_msg::width )
.def_readwrite( "height", &image_msg::height )
Expand Down Expand Up @@ -605,7 +609,6 @@ PYBIND11_MODULE(NAME, m) {
image_msg img_copy;
img_copy.raw_data = img.raw_data;
img_copy.timestamp = img.timestamp;
img_copy.frame_id = img.frame_id;
img_copy.width = img.width;
img_copy.height = img.height;
self.publish_image( std::move( img_copy ) );
Expand Down
3 changes: 0 additions & 3 deletions third-party/realdds/src/dds-metadata-syncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ void dds_metadata_syncer::enqueue_frame( key_type id, frame_holder && frame )
DDS_THROW( runtime_error,
"frame " + std::to_string( id ) + " cannot be enqueued after "
+ std::to_string( _frame_queue.back().first ) );
if( id - _last_frame_id > 1 && _last_frame_id )
LOG_DEBUG( "frame drop: expecting " << _last_frame_id + 1 << "; got " << id );
_last_frame_id = id;
// We must push the new one before releasing the lock, else someone else may push theirs ahead of ours
_frame_queue.push_back( key_frame{ id, std::move( frame ) } );
while( _frame_queue.size() > max_frame_queue_size )
Expand Down
21 changes: 18 additions & 3 deletions third-party/realdds/src/dds-stream-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,38 @@ void dds_video_stream_server::publish_image( topics::image_msg && image )

// LOG_DEBUG( "publishing a DDS video frame for topic: " << _writer->topic()->get()->get_name() );
sensor_msgs::msg::Image raw_image;
raw_image.header().frame_id() = image.frame_id;

// "The frame_id in a message specifies the point of reference for data contained in that message."
//
// I.e., the frame_id in a ROS header is the ID of the sensor collecting the data, and only relevant
// in context of a "/tf" topic that describes transformations between "frame" entities in the world.
//
// See here: https://answers.ros.org/question/34684/header-frame_id/
//
// For us, for now, we specify the name of the sensor and the device that owns it (TODO):
//
raw_image.header().frame_id() = sensor_name();

raw_image.header().stamp().sec() = image.timestamp.seconds;
raw_image.header().stamp().nanosec() = image.timestamp.nanosec;

raw_image.encoding() = _image_header.format.to_string();
raw_image.height() = _image_header.height;
raw_image.width() = _image_header.width;
raw_image.step() = uint32_t( image.raw_data.size() / _image_header.height );

raw_image.is_bigendian() = false;

raw_image.data() = std::move( image.raw_data );
LOG_DEBUG( "publishing '" << name() << "' " << raw_image.encoding() << " frame @ " << time_to_string( image.timestamp ) );

LOG_DEBUG( "publishing '" << name() << "' " << raw_image.encoding() << " frame @ " << time_to_string( image.timestamp ) );
DDS_API_CALL( _writer->get()->write( &raw_image ) );
}


void dds_motion_stream_server::publish_motion( topics::image_msg && image )
{
// Same as publish_image() for now
if( ! is_streaming() )
DDS_THROW( runtime_error, "stream '" + name() + "' cannot publish before start_streaming()" );

Expand All @@ -248,7 +263,7 @@ void dds_motion_stream_server::publish_motion( topics::image_msg && image )

// LOG_DEBUG( "publishing a DDS video frame for topic: " << _writer->topic()->get()->get_name() );
sensor_msgs::msg::Image raw_image;
raw_image.header().frame_id() = image.frame_id;
raw_image.header().frame_id() = sensor_name();
raw_image.header().stamp().sec() = image.timestamp.seconds;
raw_image.header().stamp().nanosec() = image.timestamp.nanosec;
raw_image.encoding() = _image_header.format.to_string();
Expand Down
3 changes: 2 additions & 1 deletion third-party/realdds/src/dds-time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ std::string time_to_string( dds_time const & t )
std::string nsec = std::to_string( t.nanosec );
if( t.nanosec )
{
nsec.insert( 0, 9 - nsec.length(), '0' );
// DDS spec 2.3.2: "the nanosec field must verify 0 <= nanosec < 1000000000"
nsec.insert( 0, 9 - nsec.length(), '0' ); // will throw if more than 9 digits!
while( nsec.length() > 1 && nsec.back() == '0' )
nsec.pop_back();
}
Expand Down
2 changes: 0 additions & 2 deletions third-party/realdds/src/topics/image-msg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace topics {
image_msg::image_msg( sensor_msgs::msg::Image && rhs )
{
raw_data = std::move( rhs.data() );
frame_id = std::move( rhs.header().frame_id() );
width = std::move( rhs.width() );
height = std::move( rhs.height() );
timestamp = dds_time( rhs.header().stamp().sec(), rhs.header().stamp().nanosec() );
Expand All @@ -30,7 +29,6 @@ image_msg::image_msg( sensor_msgs::msg::Image && rhs )
image_msg & image_msg::operator=( sensor_msgs::msg::Image && rhs )
{
raw_data = std::move( rhs.data() );
frame_id = std::move( rhs.header().frame_id() );
width = std::move( rhs.width() );
height = std::move( rhs.height() );
timestamp = dds_time( rhs.header().stamp().sec(), rhs.header().stamp().nanosec() );
Expand Down
Loading

0 comments on commit 8f5f87f

Please sign in to comment.