Skip to content

Commit

Permalink
1. fixed l500 matcher hierarchy
Browse files Browse the repository at this point in the history
2.  Added mutex to protect try_dequeue and calling to callback
3. added log parameter to syncer
  • Loading branch information
aangerma committed Feb 17, 2021
1 parent 42c9ffc commit a49d873
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 65 deletions.
4 changes: 2 additions & 2 deletions src/l500/l500-device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ namespace librealsense
[=]() {
auto z16rot = std::make_shared<rotation_transform>(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME);
auto y8rot = std::make_shared<rotation_transform>(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME);
auto sync = std::make_shared<syncer_process_unit>(); // is_zo_enabled_opt );
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // is_zo_enabled_opt );

auto cpb = std::make_shared<composite_processing_block>();
cpb->add(z16rot);
Expand All @@ -310,7 +310,7 @@ namespace librealsense
auto z16rot = std::make_shared<rotation_transform>(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME);
auto y8rot = std::make_shared<rotation_transform>(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME);
auto conf = std::make_shared<confidence_rotation_transform>();
auto sync = std::make_shared<syncer_process_unit>(); // is_zo_enabled_opt );
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // is_zo_enabled_opt );

auto cpb = std::make_shared<composite_processing_block>();
cpb->add(z16rot);
Expand Down
123 changes: 79 additions & 44 deletions src/proc/syncer-processing-block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,63 +10,98 @@

namespace librealsense
{
syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts )
: processing_block("syncer"), _matcher((new timestamp_composite_matcher({})))
, _enable_opts( enable_opts.begin(), enable_opts.end() )
{
_matcher->set_callback([this](frame_holder f, syncronization_environment env)
syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts,
bool log )
: processing_block( "syncer" )
, _enable_opts( enable_opts.begin(), enable_opts.end() )
{
auto f = [this, log]( frame_holder frame, synthetic_source_interface * source ) {
// if the syncer is disabled passthrough the frame
bool enabled = false;
size_t n_opts = 0;
for( auto & wopt : _enable_opts )
{
std::stringstream ss;
ss << "SYNCED: ";
auto composite = dynamic_cast<composite_frame*>(f.frame);
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
auto opt = wopt.lock();
if( opt )
{
auto matched = composite->get_frame(i);
ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number() << ", "<<std::fixed<< matched->get_frame_timestamp()<<" ";
++n_opts;
if( opt->is_true() )
{
enabled = true;
break;
}
}
}
if( n_opts && ! enabled )
{
get_source().frame_ready( std::move( frame ) );
return;
}

LOG_DEBUG(ss.str());
env.matches.enqueue(std::move(f));
});

auto f = [&](frame_holder frame, synthetic_source_interface* source)
{
// if the syncer is disabled passthrough the frame
bool enabled = false;
size_t n_opts = 0;
for( auto& wopt : _enable_opts )
std::lock_guard< std::mutex > lock( _mutex );

if( _matcher == nullptr )
{
auto opt = wopt.lock();
if( opt )
{
++n_opts;
if( opt->is_true() )
{
enabled = true;
break;
}
}
create_matcher( frame, log );
}
if( n_opts && ! enabled )

_matcher->dispatch( std::move( frame ), { source, matches, log } );
}

frame_holder f;
{
std::lock_guard< std::mutex > lock(callback_mutex);

while( matches.try_dequeue( &f ) )
{
get_source().frame_ready( std::move( frame ) );
return;
get_source().frame_ready( std::move( f ) );
}
}
};

single_consumer_frame_queue<frame_holder> matches;
set_processing_callback( std::shared_ptr< rs2_frame_processor_callback >(
new internal_frame_processor_callback< decltype( f ) >( f ) ) );
}

void syncer_process_unit::create_matcher( const frame_holder & frame, bool log )
{
auto sensor = frame.frame->get_sensor().get();
const device_interface * dev = nullptr;
try
{
dev = sensor->get_device().shared_from_this().get();
}
catch( const std::bad_weak_ptr & )
{
LOG_WARNING( "Device destroyed" );
}
if( dev )
{
_matcher = dev->create_matcher( frame );
}
else
{
_matcher = std::shared_ptr< matcher >( new timestamp_composite_matcher( {} ) );
}

_matcher->set_callback( [this, log]( frame_holder f, syncronization_environment env ) {
if( log )
{
std::stringstream ss;
ss << "SYNCED: ";
auto composite = dynamic_cast< composite_frame * >( f.frame );
for( int i = 0; i < composite->get_embedded_frames_count(); i++ )
{
std::lock_guard<std::mutex> lock(_mutex);
_matcher->dispatch(std::move(frame), { source, matches });
auto matched = composite->get_frame( i );
ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number()
<< ", " << std::fixed << matched->get_frame_timestamp() << " ";
}

frame_holder f;
while (matches.try_dequeue(&f))
get_source().frame_ready(std::move(f));

};
LOG_DEBUG( ss.str() );
}

set_processing_callback(std::shared_ptr<rs2_frame_processor_callback>(
new internal_frame_processor_callback<decltype(f)>(f)));
}
env.matches.enqueue( std::move( f ) );
} );
}
} // namespace librealsense
14 changes: 10 additions & 4 deletions src/proc/syncer-processing-block.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ namespace librealsense
class syncer_process_unit : public processing_block
{
public:
syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts );
syncer_process_unit(std::initializer_list< bool_option::ptr > enable_opts, bool log = true);

syncer_process_unit( bool_option::ptr is_enabled_opt = nullptr )
: syncer_process_unit( { is_enabled_opt } ) {}
syncer_process_unit( bool_option::ptr is_enabled_opt = nullptr, bool log = true)
: syncer_process_unit( { is_enabled_opt }, log) {}

void add_enabling_option( bool_option::ptr is_enabled_opt )
{
Expand All @@ -34,7 +34,13 @@ namespace librealsense
_matcher.reset();
}
private:
std::unique_ptr<timestamp_composite_matcher> _matcher;
void create_matcher(const frame_holder& frame, bool log = true);

std::shared_ptr<matcher> _matcher;
std::vector< std::weak_ptr<bool_option> > _enable_opts;

single_consumer_frame_queue<frame_holder> matches;
std::mutex callback_mutex;
bool _log;
};
}
31 changes: 20 additions & 11 deletions src/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ namespace librealsense
return s.str();
}

void log_if_enable(std::string str, syncronization_environment env)
{
if (env.log)
{
LOG_DEBUG(str);
}
}

matcher::matcher(std::vector<stream_id> streams_id)
: _streams_id(streams_id){}

Expand Down Expand Up @@ -100,7 +108,7 @@ namespace librealsense
{
std::stringstream s;
s <<_name<<"--> "<< f->get_stream()->get_stream_type() << " " << f->get_frame_number() << ", "<<std::fixed<< f->get_frame_timestamp()<<"\n";
LOG_DEBUG(s.str());
log_if_enable(s.str(), env);

sync(std::move(f), env);
}
Expand Down Expand Up @@ -142,9 +150,10 @@ namespace librealsense

void composite_matcher::dispatch(frame_holder f, syncronization_environment env)
{

std::stringstream s;
s <<"DISPATCH "<<_name<<"--> "<< frame_to_string(f) <<"\n";
LOG_DEBUG(s.str());
s << "DISPATCH " << _name << "--> " << frame_to_string(f) << "\n";
log_if_enable(s.str(), env);

clean_inactive_streams(f);
auto matcher = find_matcher(f);
Expand Down Expand Up @@ -264,7 +273,7 @@ namespace librealsense
{
std::ostringstream s;
s <<"SYNC "<<_name<<"--> "<< frame_to_string(f)<<"\n";
LOG_DEBUG(s.str());
log_if_enable(s.str(), env);

update_next_expected(f);
auto matcher = find_matcher(f);
Expand Down Expand Up @@ -333,14 +342,14 @@ namespace librealsense
{
for (auto i : missing_streams)
{
if (!skip_missing_stream(synced_frames, i))
if (!skip_missing_stream(synced_frames, i, env))
{
s << _name<<" "<<frames_to_string(synced_frames )<<" Wait for missing stream: ";

for (auto&& stream : i->get_streams())
s << stream<<" next expected "<<std::fixed<< _next_expected[i];
synced_frames.clear();
LOG_DEBUG(s.str());
log_if_enable(s.str(), env);
break;
}
else
Expand All @@ -349,7 +358,7 @@ namespace librealsense
s << _name << " " << frames_to_string(synced_frames) << " Skipped missing stream: ";
for (auto&& stream : i->get_streams())
s << stream << " next expected " << std::fixed << _next_expected[i]<<" ";
LOG_DEBUG(s.str());
log_if_enable(s.str(), env);
}

}
Expand Down Expand Up @@ -377,7 +386,7 @@ namespace librealsense

if (old_frames)
{
LOG_DEBUG(s.str());
log_if_enable(s.str(), env);
}

std::sort(match.begin(), match.end(), [](const frame_holder& f1, const frame_holder& f2)
Expand Down Expand Up @@ -442,7 +451,7 @@ namespace librealsense
}
}

bool frame_number_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing)
bool frame_number_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env)
{
frame_holder* synced_frame;

Expand Down Expand Up @@ -572,7 +581,7 @@ namespace librealsense
}
}

bool timestamp_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing)
bool timestamp_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env)
{
if(!missing->get_active())
return true;
Expand All @@ -595,7 +604,7 @@ namespace librealsense
//next expected of the missing stream didn't updated yet
if((*synced_frame)->get_frame_timestamp() > next_expected && abs((*synced_frame)->get_frame_timestamp()- next_expected)<gap*10)
{
LOG_DEBUG("next expected of the missing stream didn't updated yet");
log_if_enable("next expected of the missing stream didn't updated yet", env);
return false;
}

Expand Down
9 changes: 5 additions & 4 deletions src/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace librealsense
synthetic_source_interface* source;
//sync_lock& lock_ref;
single_consumer_frame_queue<frame_holder>& matches;
bool log;
};

typedef int stream_id;
Expand Down Expand Up @@ -117,7 +118,7 @@ namespace librealsense

virtual bool are_equivalent(frame_holder& a, frame_holder& b) = 0;
virtual bool is_smaller_than(frame_holder& a, frame_holder& b) = 0;
virtual bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) = 0;
virtual bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env) = 0;
virtual void clean_inactive_streams(frame_holder& f) = 0;
virtual void update_last_arrived(frame_holder& f, matcher* m) = 0;

Expand All @@ -144,7 +145,7 @@ namespace librealsense
void sync(frame_holder f, syncronization_environment env) override;
virtual bool are_equivalent(frame_holder& a, frame_holder& b) override { return false; }
virtual bool is_smaller_than(frame_holder& a, frame_holder& b) override { return false; }
virtual bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) override { return false; }
virtual bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env) override { return false; }
virtual void clean_inactive_streams(frame_holder& f) override {}
virtual void update_last_arrived(frame_holder& f, matcher* m) override {}

Expand All @@ -159,7 +160,7 @@ namespace librealsense
virtual void update_last_arrived(frame_holder& f, matcher* m) override;
bool are_equivalent(frame_holder& a, frame_holder& b) override;
bool is_smaller_than(frame_holder& a, frame_holder& b) override;
bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) override;
bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env) override;
void clean_inactive_streams(frame_holder& f) override;
void update_next_expected(const frame_holder& f) override;

Expand All @@ -175,7 +176,7 @@ namespace librealsense
bool is_smaller_than(frame_holder& a, frame_holder& b) override;
virtual void update_last_arrived(frame_holder& f, matcher* m) override;
void clean_inactive_streams(frame_holder& f) override;
bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) override;
bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env) override;
void update_next_expected(const frame_holder & f) override;

private:
Expand Down

0 comments on commit a49d873

Please sign in to comment.