Skip to content

Commit

Permalink
Removes all usage of block_on, and use a oneshot channel instead.
Browse files Browse the repository at this point in the history
Calling `block_on` panics in certain context.
For instance, it panics when it is called in a the context of another
call to block.

Using it in tantivy is unnecessary. We replace it by a thin wrapper
around a oneshot channel that supports both async/sync.
  • Loading branch information
fulmicoton committed Mar 18, 2022
1 parent 958b2be commit 0d348e3
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 146 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["search", "information", "retrieval"]
edition = "2018"

[dependencies]
oneshot = "0.1"
base64 = "0.13"
byteorder = "1.4.3"
crc32fast = "1.2.1"
Expand Down
8 changes: 3 additions & 5 deletions src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,6 @@ pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {

#[cfg(test)]
mod tests {

use futures::executor::block_on;
use serde_json::Value;

use super::agg_req::{Aggregation, Aggregations, BucketAggregation};
Expand Down Expand Up @@ -348,7 +346,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -549,7 +547,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down Expand Up @@ -984,7 +982,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}

Expand Down
4 changes: 3 additions & 1 deletion src/directory/file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ impl FileWatcher {
if metafile_has_changed {
info!("Meta file {:?} was modified", path);
current_checksum_opt = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
// We actually ignore callbacks failing here.
// We just wait for the end of their execution.
let _ = callbacks.broadcast().wait();
}
}

Expand Down
9 changes: 3 additions & 6 deletions src/directory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;

use futures::channel::oneshot;
use futures::executor::block_on;

use super::*;

#[cfg(feature = "mmap")]
Expand Down Expand Up @@ -249,8 +246,8 @@ fn test_lock_blocking(directory: &dyn Directory) {
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
in_thread_clone.store(true, SeqCst);
let _just_sync = block_on(receiver);
// explicitely droping lock_a_res. It would have been sufficient to just force it
let _just_sync = receiver.recv();
// explicitely dropping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
});
Expand All @@ -273,7 +270,7 @@ fn test_lock_blocking(directory: &dyn Directory) {
assert!(in_thread.load(SeqCst));
assert!(lock_a_res.is_ok());
});
assert!(block_on(receiver2).is_ok());
assert!(receiver2.recv().is_ok());
assert!(sender.send(()).is_ok());
assert!(join_handle.join().is_ok());
}
53 changes: 19 additions & 34 deletions src/directory/watch_event_router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::{Arc, RwLock, Weak};

use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use crate::ScheduledResult;

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
Expand Down Expand Up @@ -74,12 +73,11 @@ impl WatchCallbackList {
}

/// Triggers all callbacks
pub fn broadcast(&self) -> impl Future<Output = ()> {
pub fn broadcast(&self) -> ScheduledResult<()> {
let callbacks = self.list_callback();
let (sender, receiver) = oneshot::channel();
let result = receiver.unwrap_or_else(|_| ());
let (result, sender) = ScheduledResult::create("One of the callback panicked.");
if callbacks.is_empty() {
let _ = sender.send(());
let _ = sender.send(Ok(()));
return result;
}
let spawn_res = std::thread::Builder::new()
Expand All @@ -88,7 +86,7 @@ impl WatchCallbackList {
for callback in callbacks {
callback.call();
}
let _ = sender.send(());
let _ = sender.send(Ok(()));
});
if let Err(err) = spawn_res {
error!(
Expand All @@ -106,8 +104,6 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::executor::block_on;

use crate::directory::{WatchCallback, WatchCallbackList};

#[test]
Expand All @@ -118,22 +114,18 @@ mod tests {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(0, counter.load(Ordering::SeqCst));
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(1, counter.load(Ordering::SeqCst));
block_on(async {
(
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(4, counter.load(Ordering::SeqCst));
}

Expand All @@ -150,19 +142,15 @@ mod tests {
let handle_a = watch_event_router.subscribe(inc_callback(1));
let handle_a2 = watch_event_router.subscribe(inc_callback(10));
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
futures::join!(
watch_event_router.broadcast(),
watch_event_router.broadcast()
)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(22, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
mem::drop(handle_a2);
block_on(watch_event_router.broadcast());
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(32, counter.load(Ordering::SeqCst));
}

Expand All @@ -176,15 +164,12 @@ mod tests {
});
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
block_on(async {
let future1 = watch_event_router.broadcast();
let future2 = watch_event_router.broadcast();
futures::join!(future1, future2)
});
watch_event_router.broadcast().wait().unwrap();
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
let _ = watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
watch_event_router.broadcast().wait().unwrap();
assert_eq!(2, counter.load(Ordering::SeqCst));
}
}
3 changes: 1 addition & 2 deletions src/fastfield/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,7 @@ mod tests {
.map(SegmentReader::segment_id)
.collect();
assert_eq!(segment_ids.len(), 2);
let merge_future = index_writer.merge(&segment_ids[..]);
futures::executor::block_on(merge_future)?;
index_writer.merge(&segment_ids[..]).wait().unwrap();
reader.reload()?;
assert_eq!(reader.searcher().segment_readers().len(), 1);
Ok(())
Expand Down
5 changes: 2 additions & 3 deletions src/fastfield/multivalued/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use self::writer::MultiValuedFastFieldWriter;
mod tests {

use chrono::Duration;
use futures::executor::block_on;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};
use test_log::test;
Expand Down Expand Up @@ -268,7 +267,7 @@ mod tests {
IndexingOp::Merge => {
let segment_ids = index.searchable_segment_ids()?;
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.segment_updater().wait_merging_thread()?;
}
}
Expand All @@ -283,7 +282,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if !segment_ids.is_empty() {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait()?;
assert!(index_writer.wait_merging_threads().is_ok());
}
}
Expand Down
22 changes: 9 additions & 13 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::thread::JoinHandle;

use common::BitSet;
use crossbeam::channel;
use futures::executor::block_on;
use futures::future::Future;
use smallvec::smallvec;

use super::operation::{AddOperation, UserOperation};
Expand All @@ -24,7 +22,7 @@ use crate::indexer::operation::DeleteOperation;
use crate::indexer::stamper::Stamper;
use crate::indexer::{MergePolicy, SegmentEntry, SegmentWriter};
use crate::schema::{Document, IndexRecordOption, Term};
use crate::Opstamp;
use crate::{Opstamp, ScheduledResult};

// Size of the margin for the `memory_arena`. A segment is closed when the remaining memory
// in the `memory_arena` goes below MARGIN_IN_BYTES.
Expand Down Expand Up @@ -214,7 +212,7 @@ fn index_documents(
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
segment_updater.schedule_add_segment(segment_entry).wait()?;
Ok(())
}

Expand Down Expand Up @@ -368,7 +366,9 @@ impl IndexWriter {
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
block_on(self.segment_updater.schedule_add_segment(segment_entry))
self.segment_updater
.schedule_add_segment(segment_entry)
.wait()
}

/// Creates a new segment.
Expand Down Expand Up @@ -516,13 +516,10 @@ impl IndexWriter {
/// Merges a given list of segments
///
/// `segment_ids` is required to be non-empty.
pub fn merge(
&mut self,
segment_ids: &[SegmentId],
) -> impl Future<Output = crate::Result<SegmentMeta>> {
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> ScheduledResult<SegmentMeta> {
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
let segment_updater = self.segment_updater.clone();
async move { segment_updater.start_merge(merge_operation)?.await }
segment_updater.start_merge(merge_operation)
}

/// Closes the current document channel send.
Expand Down Expand Up @@ -781,7 +778,6 @@ impl Drop for IndexWriter {
mod tests {
use std::collections::{HashMap, HashSet};

use futures::executor::block_on;
use proptest::prelude::*;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
Expand Down Expand Up @@ -1456,7 +1452,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.segment_updater().wait_merging_thread().is_ok());
}
}
Expand All @@ -1472,7 +1468,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
if segment_ids.len() >= 2 {
block_on(index_writer.merge(&segment_ids)).unwrap();
index_writer.merge(&segment_ids).wait().unwrap();
assert!(index_writer.wait_merging_threads().is_ok());
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/indexer/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,6 @@ impl IndexMerger {
#[cfg(test)]
mod tests {
use byteorder::{BigEndian, ReadBytesExt};
use futures::executor::block_on;
use schema::FAST;

use crate::collector::tests::{
Expand Down Expand Up @@ -1207,7 +1206,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
{
Expand Down Expand Up @@ -1456,7 +1455,7 @@ mod tests {
{
// merging the segments
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
Expand Down Expand Up @@ -1549,7 +1548,7 @@ mod tests {
{
// Test merging a single segment in order to remove deletes.
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;

let searcher = reader.searcher();
Expand Down Expand Up @@ -1769,7 +1768,10 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_for_tests().unwrap();
block_on(index_writer.merge(&segment_ids)).expect("Merging failed");
index_writer
.merge(&segment_ids)
.wait()
.expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
reader.reload().unwrap();
test_searcher(
Expand Down Expand Up @@ -1824,7 +1826,7 @@ mod tests {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
reader.reload()?;
// commit has not been called yet. The document should still be
// there.
Expand All @@ -1851,7 +1853,7 @@ mod tests {
index_writer.commit()?;
index_writer.delete_term(Term::from_field_u64(int_field, 1));
let segment_ids = index.searchable_segment_ids()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;

// assert delete has not been committed
reader.reload()?;
Expand Down Expand Up @@ -1952,7 +1954,7 @@ mod tests {
{
let segment_ids = index.searchable_segment_ids()?;
let mut index_writer = index.writer_for_tests()?;
block_on(index_writer.merge(&segment_ids))?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
reader.reload()?;
Expand Down Expand Up @@ -2080,7 +2082,7 @@ mod tests {
.iter()
.map(|reader| reader.segment_id())
.collect();
block_on(writer.merge(&segment_ids[..]))?;
writer.merge(&segment_ids[..]).wait()?;

reader.reload()?;
let searcher = reader.searcher();
Expand Down
Loading

0 comments on commit 0d348e3

Please sign in to comment.