Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes all usage of block_on, and use a oneshot channel instead. #1315

Merged
merged 5 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"]
edition = "2018"

[dependencies]
oneshot = "0.1"
base64 = "0.13"
byteorder = "1.4.3"
crc32fast = "1.2.1"
Expand All @@ -32,7 +33,6 @@ fs2={ version = "0.4.3", optional = true }
levenshtein_automata = "0.2"
uuid = { version = "0.8.2", features = ["v4", "serde"] }
crossbeam = "0.8.1"
futures = { version = "0.3.15", features = ["thread-pool"] }
tantivy-query-grammar = { version="0.15.0", path="./query-grammar" }
tantivy-bitpacker = { version="0.1", path="./bitpacker" }
common = { version = "0.2", path = "./common/", package = "tantivy-common" }
Expand Down Expand Up @@ -71,6 +71,7 @@ criterion = "0.3.5"
test-log = "0.2.8"
env_logger = "0.9.0"
pprof = {version= "0.7", features=["flamegraph", "criterion"]}
futures = "0.3.15"

[dev-dependencies.fail]
version = "0.5"
Expand Down
8 changes: 3 additions & 5 deletions src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {

#[cfg(test)]
mod tests {

use futures::executor::block_on;
use serde_json::Value;

use super::agg_req::{Aggregation, Aggregations, BucketAggregation};
Expand Down Expand Up @@ -348,7 +346,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -549,7 +547,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -984,7 +982,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down
12 changes: 6 additions & 6 deletions src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,24 +781,24 @@ mod tests {
for i in 0u64..8_000u64 {
writer.add_document(doc!(field => i))?;
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));

writer.commit()?;
let mem_right_after_commit = directory.total_mem_usage();
assert!(receiver.recv().is_ok());

let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;

assert_eq!(reader.searcher().num_docs(), 8_000);
assert_eq!(reader.searcher().segment_readers().len(), 8);

writer.wait_merging_threads()?;

let mem_right_after_merge_finished = directory.total_mem_usage();

reader.reload().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 8_000);
assert!(
mem_right_after_merge_finished < mem_right_after_commit,
Expand Down
4 changes: 3 additions & 1 deletion src/directory/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ impl FileWatcher {
if metafile_has_changed {
info!("Meta file {:?} was modified", path);
current_checksum_opt = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
// We actually ignore callbacks failing here.
// We just wait for the end of their execution.
let _ = callbacks.broadcast().wait();
}
}

Expand Down
9 changes: 3 additions & 6 deletions src/directory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::oneshot;
use futures::executor::block_on;

use super::*;

#[cfg(feature = "mmap")]
Expand Down Expand Up @@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) {
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
in_thread_clone.store(true, SeqCst);
let _just_sync = block_on(receiver);
// explicitely droping lock_a_res. It would have been sufficient to just force it
let _just_sync = receiver.recv();
// explicitely dropping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
});
Expand All @@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(in_thread.load(SeqCst));
assert!(lock_a_res.is_ok());
});
assert!(block_on(receiver2).is_ok());
assert!(receiver2.recv().is_ok());
assert!(sender.send(()).is_ok());
assert!(join_handle.join().is_ok());
}
53 changes: 19 additions & 34 deletions src/directory/watch_event_router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::{Arc, RwLock, Weak};

use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use crate::FutureResult;

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
Expand Down Expand Up @@ -74,12 +73,11 @@ impl WatchCallbackList {
}

/// Triggers all callbacks
pub fn broadcast(&self) -> impl Future<Output = ()> {
pub fn broadcast(&self) -> FutureResult<()> {
let callbacks = self.list_callback();
let (sender, receiver) = oneshot::channel();
let result = receiver.unwrap_or_else(|_| ());
let (result, sender) = FutureResult::create("One of the callback panicked.");
if callbacks.is_empty() {
let _ = sender.send(());
let _ = sender.send(Ok(()));
return result;
}
let spawn_res = std::thread::Builder::new()
Expand All @@ -88,7 +86,7 @@ impl WatchCallbackList {
for callback in callbacks {
callback.call();
}
let _ = sender.send(());
let _ = sender.send(Ok(()));
});
if let Err(err) = spawn_res {
error!(
Expand All @@ -106,8 +104,6 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::executor::block_on;

use crate::directory::{WatchCallback, WatchCallbackList};

#[test]
Expand All @@ -118,22 +114,18 @@ mod tests {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(0, counter.load(Ordering::SeqCst));
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(1, counter.load(Ordering::SeqCst));
block_on(async {
(
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
}

Expand All @@ -150,19 +142,15 @@ mod tests {
let handle_a = watch_event_router.subscribe(inc_callback(1));
let handle_a2 = watch_event_router.subscribe(inc_callback(10));
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
futures::join!(
watch_event_router.broadcast(),
watch_event_router.broadcast()
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(22, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
mem::drop(handle_a2);
block_on(watch_event_router.broadcast());
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
}

Expand All @@ -176,15 +164,12 @@ mod tests {
});
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
let future1 = watch_event_router.broadcast();
let future2 = watch_event_router.broadcast();
futures::join!(future1, future2)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
let _ = watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
}
}
3 changes: 1 addition & 2 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,7 @@ mod tests {
.map(SegmentReader::segment_id)
.collect();
assert_eq!(segment_ids.len(), 2);
let merge_future = index_writer.merge(&segment_ids[..]);
futures::executor::block_on(merge_future)?;
index_writer.merge(&segment_ids[..]).wait().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().segment_readers().len(), 1);
Ok(())
Expand Down
5 changes: 2 additions & 3 deletions src/fastfield/multivalued/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter;
mod tests {

use chrono::Duration;
use futures::executor::block_on;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
use test_log::test;
Expand Down Expand Up @@ -268,7 +267,7 @@ mod tests {
IndexingOp::Merge => {
let segment_ids = index.searchable_segment_ids()?;
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.segment_updater().wait_merging_thread()?;
}
}
Expand All @@ -283,7 +282,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if !segment_ids.is_empty() {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait()?;
assert!(index_writer.wait_merging_threads().is_ok());
}
}
Expand Down
Loading