Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed deadlock on pipeline.stop() with playback device in non realtime #8625

Merged
merged 6 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ class dispatcher
int timeout_ms = 5000;
while (_is_alive)
{
{
std::unique_lock<std::mutex> lock(_was_flushed_mutex);
_was_flushed = false;
}

std::function<void(cancellable_timer)> item;

if (_queue.dequeue(&item, timeout_ms))
Expand Down Expand Up @@ -293,6 +298,7 @@ class dispatcher

void stop()
{
_is_alive = false;
{
std::unique_lock<std::mutex> lock(_was_stopped_mutex);

Expand All @@ -304,11 +310,6 @@ class dispatcher

_queue.clear();

{
std::unique_lock<std::mutex> lock(_was_flushed_mutex);
_was_flushed = false;
}

std::unique_lock<std::mutex> lock_was_flushed(_was_flushed_mutex);
_was_flushed_cv.wait_for(lock_was_flushed, std::chrono::hours(999999), [&]() { return _was_flushed.load(); });

Expand All @@ -319,7 +320,6 @@ class dispatcher
{
stop();
_queue.clear();
_is_alive = false;

if (_thread.joinable())
_thread.join();
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ namespace librealsense
{
try
{
_syncer->stop();
_aggregator->stop();
auto dev = _active_profile->get_device();
if (auto playback = As<librealsense::playback_device>(dev))
Expand Down
7 changes: 7 additions & 0 deletions src/proc/syncer-processing-block.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ namespace librealsense
_enable_opts.push_back( is_enabled_opt );
}

// Stopping the syncer means no more frames will be enqueued, and any existing frames
// pending dispatch will be lost!
void stop()
maloel marked this conversation as resolved.
Show resolved Hide resolved
{
_matcher->stop();
}

~syncer_process_unit()
{
_matcher.reset();
Expand Down
7 changes: 7 additions & 0 deletions src/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ namespace librealsense
return matcher;
}

void composite_matcher::stop()
{
for (auto& fq : _frames_queue)
{
fq.second.clear();
maloel marked this conversation as resolved.
Show resolved Hide resolved
}
}

std::string composite_matcher::frames_to_string(std::vector<librealsense::matcher*> matchers)
{
Expand Down
4 changes: 4 additions & 0 deletions src/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ namespace librealsense
virtual const std::vector<stream_id>& get_streams() const = 0;
virtual const std::vector<rs2_stream>& get_streams_types() const = 0;
virtual std::string get_name() const = 0;
virtual void stop() = 0;
};

class matcher: public matcher_interface
Expand All @@ -99,6 +100,7 @@ namespace librealsense
virtual std::string get_name() const override;
bool get_active() const;
void set_active(const bool active);
virtual void stop() override {}

protected:
std::vector<stream_id> _streams_id;
Expand Down Expand Up @@ -134,6 +136,8 @@ namespace librealsense
std::string frames_to_string(std::vector<librealsense::matcher*> matchers);
void sync(frame_holder f, const syncronization_environment& env) override;
std::shared_ptr<matcher> find_matcher(const frame_holder& f);
virtual void stop() override;


protected:
virtual void update_next_expected(const frame_holder& f) = 0;
Expand Down
1 change: 1 addition & 0 deletions unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ list(APPEND PP_Rosbag_Recordings_List
[pointcloud]_all_combinations_depth_color.bag
single_depth_color_640x480.bag
D435i_Depth_and_IMU.bag
recording_deadlock.bag
)
set(PP_Rosbag_Recordings_URL https://librealsense.intel.com/rs-tests/Rosbag_unit_test_records/)
foreach(i ${PP_Rosbag_Recordings_List})
Expand Down
33 changes: 33 additions & 0 deletions unit-tests/func/rec-play/test-non-realtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# License: Apache 2.0. See LICENSE file in root directory.
# Copyright(c) 2021 Intel Corporation. All Rights Reserved.

import os
import pyrealsense2 as rs2
from rspy import test
import tempfile

# This test checks that stop of pipeline with playback file
# and non realtime mode is not stuck due to deadlock of
# pipeline stop thread and syncer blocking enqueue thread (DSO-15157)
#############################################################################################
test.start("Playback with non realtime isn't stuck at stop")

filename = tempfile.gettempdir() + os.sep + 'recording_deadlock.bag'

pipeline = rs2.pipeline()
config = rs2.config()
config.enable_all_streams()
config.enable_device_from_file(filename, repeat_playback=False)
profile = pipeline.start(config)
device = profile.get_device().as_playback().set_real_time(False)
success = True
while success:
success, _ = pipeline.try_wait_for_frames(1000)
print("stopping...")
pipeline.stop()
print("stopped")

test.finish()
#############################################################################################

test.print_results_and_exit()