diff --git a/comms/core/src/pipeline/inbound.rs b/comms/core/src/pipeline/inbound.rs index f77d5f66bb..7c6e89dab4 100644 --- a/comms/core/src/pipeline/inbound.rs +++ b/comms/core/src/pipeline/inbound.rs @@ -20,12 +20,15 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{fmt::Display, time::Instant}; +use std::{ + fmt::Display, + time::{Duration, Instant}, +}; use futures::future::FusedFuture; use log::*; use tari_shutdown::ShutdownSignal; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time}; use tower::{Service, ServiceExt}; use crate::bounded_executor::BoundedExecutor; @@ -103,8 +106,19 @@ where .spawn(async move { let timer = Instant::now(); trace!(target: LOG_TARGET, "Start inbound pipeline {}", id); - if let Err(err) = service.oneshot(item).await { - warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err); + match time::timeout(Duration::from_secs(30), service.oneshot(item)).await { + Ok(Ok(_)) => {}, + Ok(Err(err)) => { + warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err); + }, + Err(_) => { + error!( + target: LOG_TARGET, + "Inbound pipeline {} timed out and was aborted. THIS SHOULD NOT HAPPEN: there was a \ + deadlock or excessive delay in processing this pipeline.", + id + ); + }, } trace!( target: LOG_TARGET,