diff --git a/src/util.rs b/src/util.rs index a397998d..7a18fa83 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,7 +1,13 @@ + +use std; use std::collections::HashMap; +use std::sync::mpsc::{self, Receiver}; +use std::thread; use time; +use errors::*; + fn escape_for_slack(str: &str) -> String { str.replace("&", "&").replace("<", "<").replace(">", ">") } @@ -88,6 +94,33 @@ pub fn parse_query(query_params: Option<&str>) -> HashMap { .collect::>() } +// cf. https://github.com/rust-lang/rust/issues/39364 +pub fn recv_timeout(rx: &Receiver, timeout: std::time::Duration) -> Result { + let sleep_time = std::time::Duration::from_millis(50); + let mut time_left = timeout; + loop { + match rx.try_recv() { + Ok(r) => { + return Ok(r); + } + Err(mpsc::TryRecvError::Empty) => { + match time_left.checked_sub(sleep_time) { + Some(sub) => { + time_left = sub; + thread::sleep(sleep_time); + } + None => { + return Err("Timed out waiting".into()); + } + } + } + Err(mpsc::TryRecvError::Disconnected) => { + return Err("Channel disconnected!".into()); + } + }; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/github_handler_test.rs b/tests/github_handler_test.rs index 57ce2b24..ae1548e8 100644 --- a/tests/github_handler_test.rs +++ b/tests/github_handler_test.rs @@ -25,6 +25,7 @@ use octobot::repo_version::RepoVersionRequest; use octobot::repos; use octobot::server::github_handler::GithubEventHandler; use octobot::slack::{self, SlackAttachmentBuilder}; +use octobot::util; use octobot::worker::{WorkMessage, WorkSender}; use mocks::mock_github::MockGithub; @@ -56,7 +57,9 @@ impl GithubHandlerTest { let rx = self.pr_merge_rx.take().unwrap(); thread::spawn(move || { for branch in branches { - let msg = rx.recv_timeout(timeout).expect(&format!("expected to recv msg for branch: {}", branch)); + let msg = util::recv_timeout(&rx, timeout).expect( + &format!("expected to recv msg for branch: {}", branch), + ); match msg { WorkMessage::WorkItem(req) => { assert_eq!(branch, req.target_branch); @@ -67,7 +70,61 @@ impl GithubHandlerTest { }; } - let last_message = rx.recv_timeout(timeout); + let last_message = util::recv_timeout(&rx, timeout); + assert!(last_message.is_err()); + }) + } + + fn expect_will_force_push_notify(&mut self, before_hash: &'static str, after_hash: &'static str) -> JoinHandle<()> { + let timeout = Duration::from_millis(300); + let rx = self.force_push_rx.take().expect("force-push message"); + thread::spawn(move || { + let msg = util::recv_timeout(&rx, timeout).expect(&format!( + "expected to recv force-push for {} -> {}", + before_hash, + after_hash + )); + match msg { + WorkMessage::WorkItem(req) => { + assert_eq!(before_hash, req.before_hash); + assert_eq!(after_hash, req.after_hash); + } + _ => { + panic!("Unexpected messages: {:?}", msg); + } + }; + + let last_message = util::recv_timeout(&rx, timeout); + assert!(last_message.is_err()); + }) + } + + fn expect_will_not_force_push_notify(&mut self) -> JoinHandle<()> { + let timeout = Duration::from_millis(300); + let rx = self.force_push_rx.take().expect("force-push message"); + thread::spawn(move || { + let last_message = util::recv_timeout(&rx, timeout); + assert!(last_message.is_err()); + }) + } + + fn expect_will_run_version_script(&mut self, branch: &'static str, commit_hash: &'static str) -> JoinHandle<()> { + // Note: no expectations are set on mock_jira since we have stubbed out the background worker thread + let timeout = Duration::from_millis(300); + let rx = self.repo_version_rx.take().unwrap(); + thread::spawn(move || { + let msg = util::recv_timeout(&rx, timeout).expect(&format!("expected to recv version script msg")); + match msg { + WorkMessage::WorkItem(req) => { + assert_eq!(branch, req.branch); + assert_eq!(commit_hash, req.commit_hash); + } + _ => { + panic!("Unexpected messages: {:?}", msg); + } + }; + + let last_message = util::recv_timeout(&rx, timeout); assert!(last_message.is_err()); }) } @@ -1239,26 +1296,7 @@ fn test_push_force_notify() { ]); // Setup background thread to validate force-push msg - let expect_thread; - { - let timeout = Duration::from_millis(300); - let rx = test.force_push_rx.take().expect("force-push message"); - expect_thread = thread::spawn(move || { - let msg = rx.recv_timeout(timeout).expect(&format!("expected to recv msg")); - match msg { - WorkMessage::WorkItem(req) => { - assert_eq!("abcdef0000", req.before_hash); - assert_eq!("1111abcdef", req.after_hash); - } - _ => { - panic!("Unexpected messages: {:?}", msg); - } - }; - - let last_message = rx.recv_timeout(timeout); - assert!(last_message.is_err()); - }); - } + let expect_thread = test.expect_will_force_push_notify("abcdef0000", "1111abcdef"); let resp = test.handler.handle_event().expect("handled event"); assert_eq!((StatusCode::Ok, "push".into()), resp); @@ -1296,15 +1334,7 @@ fn test_push_force_notify_wip() { // Note: not slack expectations here. It should not notify slack for WIP PRs. // Setup background thread to validate force-push msg - let expect_thread; - { - let timeout = Duration::from_millis(300); - let rx = test.force_push_rx.take().unwrap(); - expect_thread = thread::spawn(move || { - let last_message = rx.recv_timeout(timeout); - assert!(last_message.is_err()); - }); - } + let expect_thread = test.expect_will_not_force_push_notify(); let resp = test.handler.handle_event().unwrap(); assert_eq!((StatusCode::Ok, "push".into()), resp); @@ -1358,15 +1388,7 @@ fn test_push_force_notify_ignored() { ]); // Setup background thread to validate force-push msg - let expect_thread; - { - let timeout = Duration::from_millis(300); - let rx = test.force_push_rx.take().unwrap(); - expect_thread = thread::spawn(move || { - let last_message = rx.recv_timeout(timeout); - assert!(last_message.is_err()); - }); - } + let expect_thread = test.expect_will_not_force_push_notify(); let resp = test.handler.handle_event().unwrap(); assert_eq!((StatusCode::Ok, "push".into()), resp); @@ -1416,11 +1438,11 @@ fn some_jira_commits() -> Vec { fn many_jira_commits() -> Vec { let commit = Commit { - sha: "ffeedd00110011".into(), - html_url: "http://commit/ffeedd00110011".into(), - author: Some(User::new("bob-author")), - commit: CommitDetails { message: "Fix [SER-1] Add the feature\n\nThe body ([OTHER-123])".into() }, - }; + sha: "ffeedd00110011".into(), + html_url: "http://commit/ffeedd00110011".into(), + author: Some(User::new("bob-author")), + commit: CommitDetails { message: "Fix [SER-1] Add the feature\n\nThe body ([OTHER-123])".into() }, + }; return (0..21).collect::>().into_iter().map(|_| commit.clone()).collect(); } @@ -1657,27 +1679,7 @@ fn test_jira_push_triggers_version_script() { test.handler.data.commits = Some(some_jira_push_commits()); // Setup background thread to validate version msg - // Note: no expectations are set on mock_jira since we have stubbed out the background worker thread - let expect_thread; - { - let timeout = Duration::from_millis(300); - let rx = test.repo_version_rx.take().unwrap(); - expect_thread = thread::spawn(move || { - let msg = rx.recv_timeout(timeout).expect(&format!("expected to recv msg")); - match msg { - WorkMessage::WorkItem(req) => { - assert_eq!("master", req.branch); - assert_eq!("1111abcdef", req.commit_hash); - } - _ => { - panic!("Unexpected messages: {:?}", msg); - } - }; - - let last_message = rx.recv_timeout(timeout); - assert!(last_message.is_err()); - }); - } + let expect_thread = test.expect_will_run_version_script("master", "1111abcdef"); let resp = test.handler.handle_event().unwrap(); assert_eq!((StatusCode::Ok, "push".into()), resp); diff --git a/tests/mocks/mock_slack.rs b/tests/mocks/mock_slack.rs index f3287d8e..2d80dbff 100644 --- a/tests/mocks/mock_slack.rs +++ b/tests/mocks/mock_slack.rs @@ -4,6 +4,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use octobot::slack::SlackRequest; +use octobot::util; use octobot::worker::{WorkMessage, WorkSender}; pub struct MockSlack { @@ -25,7 +26,7 @@ impl MockSlack { let thread = Some(thread::spawn(move || { let timeout = Duration::from_millis(1000); loop { - let req = slack_rx.recv_timeout(timeout); + let req = util::recv_timeout(&slack_rx, timeout); match req { Ok(WorkMessage::WorkItem(req)) => { let front = expected_calls2.lock().unwrap().pop();