From a49d873589cdbd00ad878648a36b52985b92d673 Mon Sep 17 00:00:00 2001 From: aangerma Date: Wed, 17 Feb 2021 13:02:03 +0200 Subject: [PATCH] 1. fixed l500 matcher hierarchy 2. Added mutex to protect try_dequeue and calling to callback 3. added log parameter to syncer --- src/l500/l500-device.cpp | 4 +- src/proc/syncer-processing-block.cpp | 123 +++++++++++++++++---------- src/proc/syncer-processing-block.h | 14 ++- src/sync.cpp | 31 ++++--- src/sync.h | 9 +- 5 files changed, 116 insertions(+), 65 deletions(-) diff --git a/src/l500/l500-device.cpp b/src/l500/l500-device.cpp index 17005153c7..598882769b 100644 --- a/src/l500/l500-device.cpp +++ b/src/l500/l500-device.cpp @@ -283,7 +283,7 @@ namespace librealsense [=]() { auto z16rot = std::make_shared(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME); auto y8rot = std::make_shared(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME); - auto sync = std::make_shared(); // is_zo_enabled_opt ); + auto sync = std::make_shared(nullptr, false); // is_zo_enabled_opt ); auto cpb = std::make_shared(); cpb->add(z16rot); @@ -310,7 +310,7 @@ namespace librealsense auto z16rot = std::make_shared(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME); auto y8rot = std::make_shared(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME); auto conf = std::make_shared(); - auto sync = std::make_shared(); // is_zo_enabled_opt ); + auto sync = std::make_shared(nullptr, false); // is_zo_enabled_opt ); auto cpb = std::make_shared(); cpb->add(z16rot); diff --git a/src/proc/syncer-processing-block.cpp b/src/proc/syncer-processing-block.cpp index 61ce562b99..52efbf6467 100644 --- a/src/proc/syncer-processing-block.cpp +++ b/src/proc/syncer-processing-block.cpp @@ -10,63 +10,98 @@ namespace librealsense { - syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts ) - : processing_block("syncer"), _matcher((new timestamp_composite_matcher({}))) - , _enable_opts( enable_opts.begin(), enable_opts.end() ) - { - _matcher->set_callback([this](frame_holder f, syncronization_environment env) +syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts, + bool log ) + : processing_block( "syncer" ) + , _enable_opts( enable_opts.begin(), enable_opts.end() ) +{ + auto f = [this, log]( frame_holder frame, synthetic_source_interface * source ) { + // if the syncer is disabled passthrough the frame + bool enabled = false; + size_t n_opts = 0; + for( auto & wopt : _enable_opts ) { - std::stringstream ss; - ss << "SYNCED: "; - auto composite = dynamic_cast(f.frame); - for (int i = 0; i < composite->get_embedded_frames_count(); i++) + auto opt = wopt.lock(); + if( opt ) { - auto matched = composite->get_frame(i); - ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number() << ", "<get_frame_timestamp()<<" "; + ++n_opts; + if( opt->is_true() ) + { + enabled = true; + break; + } } + } + if( n_opts && ! enabled ) + { + get_source().frame_ready( std::move( frame ) ); + return; + } - LOG_DEBUG(ss.str()); - env.matches.enqueue(std::move(f)); - }); - - auto f = [&](frame_holder frame, synthetic_source_interface* source) { - // if the syncer is disabled passthrough the frame - bool enabled = false; - size_t n_opts = 0; - for( auto& wopt : _enable_opts ) + std::lock_guard< std::mutex > lock( _mutex ); + + if( _matcher == nullptr ) { - auto opt = wopt.lock(); - if( opt ) - { - ++n_opts; - if( opt->is_true() ) - { - enabled = true; - break; - } - } + create_matcher( frame, log ); } - if( n_opts && ! enabled ) + + _matcher->dispatch( std::move( frame ), { source, matches, log } ); + } + + frame_holder f; + { + std::lock_guard< std::mutex > lock(callback_mutex); + + while( matches.try_dequeue( &f ) ) { - get_source().frame_ready( std::move( frame ) ); - return; + get_source().frame_ready( std::move( f ) ); } + } + }; - single_consumer_frame_queue matches; + set_processing_callback( std::shared_ptr< rs2_frame_processor_callback >( + new internal_frame_processor_callback< decltype( f ) >( f ) ) ); +} +void syncer_process_unit::create_matcher( const frame_holder & frame, bool log ) +{ + auto sensor = frame.frame->get_sensor().get(); + const device_interface * dev = nullptr; + try + { + dev = sensor->get_device().shared_from_this().get(); + } + catch( const std::bad_weak_ptr & ) + { + LOG_WARNING( "Device destroyed" ); + } + if( dev ) + { + _matcher = dev->create_matcher( frame ); + } + else + { + _matcher = std::shared_ptr< matcher >( new timestamp_composite_matcher( {} ) ); + } + + _matcher->set_callback( [this, log]( frame_holder f, syncronization_environment env ) { + if( log ) + { + std::stringstream ss; + ss << "SYNCED: "; + auto composite = dynamic_cast< composite_frame * >( f.frame ); + for( int i = 0; i < composite->get_embedded_frames_count(); i++ ) { - std::lock_guard lock(_mutex); - _matcher->dispatch(std::move(frame), { source, matches }); + auto matched = composite->get_frame( i ); + ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number() + << ", " << std::fixed << matched->get_frame_timestamp() << " "; } - frame_holder f; - while (matches.try_dequeue(&f)) - get_source().frame_ready(std::move(f)); - - }; + LOG_DEBUG( ss.str() ); + } - set_processing_callback(std::shared_ptr( - new internal_frame_processor_callback(f))); - } + env.matches.enqueue( std::move( f ) ); + } ); } +} // namespace librealsense diff --git a/src/proc/syncer-processing-block.h b/src/proc/syncer-processing-block.h index 458d9b54b8..dff91a7abd 100644 --- a/src/proc/syncer-processing-block.h +++ b/src/proc/syncer-processing-block.h @@ -19,10 +19,10 @@ namespace librealsense class syncer_process_unit : public processing_block { public: - syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts ); + syncer_process_unit(std::initializer_list< bool_option::ptr > enable_opts, bool log = true); - syncer_process_unit( bool_option::ptr is_enabled_opt = nullptr ) - : syncer_process_unit( { is_enabled_opt } ) {} + syncer_process_unit( bool_option::ptr is_enabled_opt = nullptr, bool log = true) + : syncer_process_unit( { is_enabled_opt }, log) {} void add_enabling_option( bool_option::ptr is_enabled_opt ) { @@ -34,7 +34,13 @@ namespace librealsense _matcher.reset(); } private: - std::unique_ptr _matcher; + void create_matcher(const frame_holder& frame, bool log = true); + + std::shared_ptr _matcher; std::vector< std::weak_ptr > _enable_opts; + + single_consumer_frame_queue matches; + std::mutex callback_mutex; + bool _log; }; } diff --git a/src/sync.cpp b/src/sync.cpp index a1d79974aa..ecd7129361 100644 --- a/src/sync.cpp +++ b/src/sync.cpp @@ -32,6 +32,14 @@ namespace librealsense return s.str(); } + void log_if_enable(std::string str, syncronization_environment env) + { + if (env.log) + { + LOG_DEBUG(str); + } + } + matcher::matcher(std::vector streams_id) : _streams_id(streams_id){} @@ -100,7 +108,7 @@ namespace librealsense { std::stringstream s; s <<_name<<"--> "<< f->get_stream()->get_stream_type() << " " << f->get_frame_number() << ", "<get_frame_timestamp()<<"\n"; - LOG_DEBUG(s.str()); + log_if_enable(s.str(), env); sync(std::move(f), env); } @@ -142,9 +150,10 @@ namespace librealsense void composite_matcher::dispatch(frame_holder f, syncronization_environment env) { + std::stringstream s; - s <<"DISPATCH "<<_name<<"--> "<< frame_to_string(f) <<"\n"; - LOG_DEBUG(s.str()); + s << "DISPATCH " << _name << "--> " << frame_to_string(f) << "\n"; + log_if_enable(s.str(), env); clean_inactive_streams(f); auto matcher = find_matcher(f); @@ -264,7 +273,7 @@ namespace librealsense { std::ostringstream s; s <<"SYNC "<<_name<<"--> "<< frame_to_string(f)<<"\n"; - LOG_DEBUG(s.str()); + log_if_enable(s.str(), env); update_next_expected(f); auto matcher = find_matcher(f); @@ -333,14 +342,14 @@ namespace librealsense { for (auto i : missing_streams) { - if (!skip_missing_stream(synced_frames, i)) + if (!skip_missing_stream(synced_frames, i, env)) { s << _name<<" "<get_streams()) s << stream<<" next expected "<get_streams()) s << stream << " next expected " << std::fixed << _next_expected[i]<<" "; - LOG_DEBUG(s.str()); + log_if_enable(s.str(), env); } } @@ -377,7 +386,7 @@ namespace librealsense if (old_frames) { - LOG_DEBUG(s.str()); + log_if_enable(s.str(), env); } std::sort(match.begin(), match.end(), [](const frame_holder& f1, const frame_holder& f2) @@ -442,7 +451,7 @@ namespace librealsense } } - bool frame_number_composite_matcher::skip_missing_stream(std::vector synced, matcher* missing) + bool frame_number_composite_matcher::skip_missing_stream(std::vector synced, matcher* missing, syncronization_environment env) { frame_holder* synced_frame; @@ -572,7 +581,7 @@ namespace librealsense } } - bool timestamp_composite_matcher::skip_missing_stream(std::vector synced, matcher* missing) + bool timestamp_composite_matcher::skip_missing_stream(std::vector synced, matcher* missing, syncronization_environment env) { if(!missing->get_active()) return true; @@ -595,7 +604,7 @@ namespace librealsense //next expected of the missing stream didn't updated yet if((*synced_frame)->get_frame_timestamp() > next_expected && abs((*synced_frame)->get_frame_timestamp()- next_expected)& matches; + bool log; }; typedef int stream_id; @@ -117,7 +118,7 @@ namespace librealsense virtual bool are_equivalent(frame_holder& a, frame_holder& b) = 0; virtual bool is_smaller_than(frame_holder& a, frame_holder& b) = 0; - virtual bool skip_missing_stream(std::vector synced, matcher* missing) = 0; + virtual bool skip_missing_stream(std::vector synced, matcher* missing, syncronization_environment env) = 0; virtual void clean_inactive_streams(frame_holder& f) = 0; virtual void update_last_arrived(frame_holder& f, matcher* m) = 0; @@ -144,7 +145,7 @@ namespace librealsense void sync(frame_holder f, syncronization_environment env) override; virtual bool are_equivalent(frame_holder& a, frame_holder& b) override { return false; } virtual bool is_smaller_than(frame_holder& a, frame_holder& b) override { return false; } - virtual bool skip_missing_stream(std::vector synced, matcher* missing) override { return false; } + virtual bool skip_missing_stream(std::vector synced, matcher* missing, syncronization_environment env) override { return false; } virtual void clean_inactive_streams(frame_holder& f) override {} virtual void update_last_arrived(frame_holder& f, matcher* m) override {} @@ -159,7 +160,7 @@ namespace librealsense virtual void update_last_arrived(frame_holder& f, matcher* m) override; bool are_equivalent(frame_holder& a, frame_holder& b) override; bool is_smaller_than(frame_holder& a, frame_holder& b) override; - bool skip_missing_stream(std::vector synced, matcher* missing) override; + bool skip_missing_stream(std::vector synced, matcher* missing, syncronization_environment env) override; void clean_inactive_streams(frame_holder& f) override; void update_next_expected(const frame_holder& f) override; @@ -175,7 +176,7 @@ namespace librealsense bool is_smaller_than(frame_holder& a, frame_holder& b) override; virtual void update_last_arrived(frame_holder& f, matcher* m) override; void clean_inactive_streams(frame_holder& f) override; - bool skip_missing_stream(std::vector synced, matcher* missing) override; + bool skip_missing_stream(std::vector synced, matcher* missing, syncronization_environment env) override; void update_next_expected(const frame_holder & f) override; private: