Skip to content

Commit

Permalink
review: add timeout to inbound pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 12, 2022
1 parent ce3bd85 commit 2153f74
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions comms/core/src/pipeline/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fmt::Display, time::Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};

use futures::future::FusedFuture;
use log::*;
use tari_shutdown::ShutdownSignal;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time};
use tower::{Service, ServiceExt};

use crate::bounded_executor::BoundedExecutor;
Expand Down Expand Up @@ -103,8 +106,19 @@ where
.spawn(async move {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Start inbound pipeline {}", id);
if let Err(err) = service.oneshot(item).await {
warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
match time::timeout(Duration::from_secs(30), service.oneshot(item)).await {
Ok(Ok(_)) => {},
Ok(Err(err)) => {
warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
},
Err(_) => {
error!(
target: LOG_TARGET,
"Inbound pipeline {} timed out and was aborted. THIS SHOULD NOT HAPPEN: there was a \
deadlock or excessive delay in processing this pipeline.",
id
);
},
}
trace!(
target: LOG_TARGET,
Expand Down

0 comments on commit 2153f74

Please sign in to comment.